151 lines
5.2 KiB
Python
151 lines
5.2 KiB
Python
from settings import settings
|
|
from jellyfin_apiclient_python import JellyfinClient
|
|
from getmac import get_mac_address
|
|
from jellyfin.models import JellyfinMediaItem, JellyfinMediaType
|
|
from datetime import datetime
|
|
import logging
|
|
import time
|
|
import os
|
|
|
|
class JellyfinApiClient:
|
|
def __init__(self):
|
|
machine_name = os.uname().nodename
|
|
unique_id = get_mac_address(hostname="localhost")
|
|
|
|
self.logger = logging.getLogger('JellyfinApiClient')
|
|
|
|
self.logger.info("Connecting to Jellyfin server...")
|
|
self.client = JellyfinClient()
|
|
self.client.config.app('jellydisc', '0.1.1', machine_name, unique_id)
|
|
self.client.config.data['auth.ssl'] = True
|
|
|
|
self.last_auth_time = None
|
|
self.authenticate()
|
|
|
|
self.logger.info("Connected to Jellyfin server.")
|
|
|
|
def authenticate(self):
|
|
self.logger.info("Authenticating with Jellyfin server...")
|
|
self.client.auth.connect_to_address(settings.jellyfin_server_url)
|
|
self.client.auth.login(
|
|
settings.jellyfin_server_url,
|
|
settings.jellyfin_username,
|
|
settings.jellyfin_password
|
|
)
|
|
self.last_auth_time = time.time()
|
|
self.logger.info("Authenticated with Jellyfin server.")
|
|
|
|
def get_current_playback(self) -> JellyfinMediaItem | None:
|
|
if time.time() - self.last_auth_time > settings.jellyfin_auth_timeout:
|
|
self.authenticate()
|
|
|
|
self.logger.info("Fetching current playback information...")
|
|
sessions = self.client.jellyfin.get_sessions()
|
|
|
|
if not sessions:
|
|
self.logger.info("No active playback found.")
|
|
return None
|
|
|
|
for session in sessions:
|
|
session_id = session.get('Id')
|
|
current_item = self.client.jellyfin.get_now_playing(session_id)
|
|
|
|
if current_item and current_item.get('Type') in ['Audio', 'Episode', 'Movie']:
|
|
self.logger.info("Current playback information fetched.")
|
|
return self.to_model(current_item)
|
|
|
|
self.logger.info("No active playback found.")
|
|
return None
|
|
|
|
def get_image_url(self, media_id: str) -> str:
|
|
server_address = settings.jellyfin_server_url.rstrip('/')
|
|
return f"{server_address}/Items/{media_id}/Images/Primary?maxWidth=300&maxHeight=300"
|
|
|
|
def to_model(self, item: dict) -> JellyfinMediaItem:
|
|
media_type = item.get('Type')
|
|
|
|
if media_type == 'Audio':
|
|
return self.to_music_model(item)
|
|
elif media_type == 'Episode':
|
|
return self.to_episode_model(item)
|
|
elif media_type == 'Movie':
|
|
return self.to_movie_model(item)
|
|
|
|
def get_playback_info(self, media: dict) -> tuple[int, int]:
|
|
play_state = media.get('PlayState')
|
|
|
|
runtime_ticks = media.get('RunTimeTicks')
|
|
total_runtime_seconds = runtime_ticks // 10_000_000
|
|
|
|
playback_position_ticks = play_state.get('PositionTicks')
|
|
playback_position_seconds = playback_position_ticks // 10_000_000
|
|
|
|
start = int(datetime.now().timestamp()) - playback_position_seconds
|
|
end = start + total_runtime_seconds
|
|
|
|
return (start, end)
|
|
|
|
def to_music_model(self, item: dict) -> JellyfinMediaItem:
|
|
media_id = item.get('Id')
|
|
parent_id = item.get('ParentId')
|
|
premiere_date = item.get('PremiereDate')
|
|
premiere_year = datetime.fromisoformat(premiere_date).year if premiere_date else None
|
|
|
|
(start, end) = self.get_playback_info(item)
|
|
|
|
|
|
return JellyfinMediaItem(
|
|
id=media_id,
|
|
name=item.get('Name'),
|
|
type=JellyfinMediaType.AUDIO,
|
|
image_url=self.get_image_url(parent_id),
|
|
start=start,
|
|
end=end,
|
|
metadata={
|
|
'artist': item.get('AlbumArtist'),
|
|
'album': f"{item.get('Album')} ({premiere_year})" if premiere_date else item.get('Album')
|
|
}
|
|
)
|
|
|
|
def to_movie_model(self, item: dict) -> JellyfinMediaItem:
|
|
media_id = item.get('Id')
|
|
premiere_date = item.get('PremiereDate')
|
|
|
|
(start, end) = self.get_playback_info(item)
|
|
|
|
return JellyfinMediaItem(
|
|
id=media_id,
|
|
name=item.get('Name'),
|
|
type=JellyfinMediaType.MOVIE,
|
|
image_url=self.get_image_url(media_id),
|
|
start=start,
|
|
end=end,
|
|
metadata={
|
|
'date': datetime.fromisoformat(premiere_date).strftime('%d/%m/%Y') if premiere_date else None
|
|
}
|
|
)
|
|
|
|
def to_episode_model(self, item: dict) -> JellyfinMediaItem:
|
|
media_id = item.get('Id')
|
|
parent = self.client.jellyfin.get_item(item.get('ParentId'))
|
|
parent_id = item.get('ParentId')
|
|
series_name = item.get('SeriesName')
|
|
season_number = item.get('ParentIndexNumber')
|
|
episode_number = item.get('IndexNumber')
|
|
|
|
subtitle=f"S{season_number:02}E{episode_number:02} of {series_name}"
|
|
|
|
(start, end) = self.get_playback_info(item)
|
|
|
|
return JellyfinMediaItem(
|
|
id=media_id,
|
|
name=item.get('Name'),
|
|
type=JellyfinMediaType.EPISODE,
|
|
image_url=self.get_image_url(parent_id),
|
|
start=start,
|
|
end=end,
|
|
metadata={
|
|
'subtitle': subtitle,
|
|
}
|
|
)
|