Files
jellydisc/jellyfin/api_client.py

146 lines
4.9 KiB
Python

from settings import settings
from jellyfin_apiclient_python import JellyfinClient
from getmac import get_mac_address
from jellyfin.models import JellyfinMediaItem, JellyfinMediaType
from datetime import datetime
import logging
import time
import os
AUTH_TIMEOUT = 10 * 60 # 10 minutes
class JellyfinApiClient:
def __init__(self):
machine_name = os.uname().nodename
unique_id = get_mac_address(hostname="localhost")
self.logger = logging.getLogger('JellyfinApiClient')
self.logger.info("Connecting to Jellyfin server...")
self.client = JellyfinClient()
self.client.config.app('jellydisc', '0.0.1', machine_name, unique_id)
self.client.config.data['auth.ssl'] = True
self.last_auth_time = None
self.authenticate()
self.logger.info("Connected to Jellyfin server.")
def authenticate(self):
self.logger.info("Authenticating with Jellyfin server...")
self.client.auth.connect_to_address(settings.jellyfin_server_url)
self.client.auth.login(
settings.jellyfin_server_url,
settings.jellyfin_username,
settings.jellyfin_password
)
self.last_auth_time = time.time()
self.logger.info("Authenticated with Jellyfin server.")
def get_current_playback(self) -> JellyfinMediaItem | None:
if time.time() - self.last_auth_time > AUTH_TIMEOUT:
self.authenticate()
self.logger.info("Fetching current playback information...")
sessions = self.client.jellyfin.get_sessions()
if not sessions:
self.logger.info("No active playback found.")
return None
for session in sessions:
if session.get('NowPlayingItem'):
self.logger.info("Current playback information fetched.")
return self.to_model(session['NowPlayingItem'])
self.logger.info("No active playback found.")
return None
def get_image_url(self, media_id: str) -> str:
server_address = settings.jellyfin_server_url.rstrip('/')
return f"{server_address}/Items/{media_id}/Images/Primary?maxWidth=300&maxHeight=300"
def to_model(self, item: dict) -> JellyfinMediaItem:
media_type = item.get('Type')
if media_type == 'Audio':
return self.to_music_model(item)
elif media_type == 'Episode':
return self.to_episode_model(item)
elif media_type == 'Movie':
return self.to_movie_model(item)
raise ValueError(f"Unsupported media type: {media_type}")
def get_playback_info(self, media: dict) -> tuple[int, int]:
media_id = media.get('Id')
playback_info = self.client.jellyfin.get_userdata_for_item(media_id)
runtime_ticks = media.get('RunTimeTicks', 0)
total_runtime_seconds = runtime_ticks // 10_000_000
playback_position_ticks = playback_info.get('PlaybackPositionTicks', 0)
playback_position_seconds = playback_position_ticks // 10_000_000
start = int(datetime.now().timestamp()) - playback_position_seconds
end = start + total_runtime_seconds
return (start, end)
def to_music_model(self, item: dict) -> JellyfinMediaItem:
media_id = item.get('Id')
(start, end) = self.get_playback_info(item)
return JellyfinMediaItem(
id=media_id,
name=item.get('Name'),
type=JellyfinMediaType.AUDIO,
image_url=self.get_image_url(media_id),
start=start,
end=end,
metadata={
'artist': item.get('AlbumArtist'),
}
)
def to_movie_model(self, item: dict) -> JellyfinMediaItem:
media_id = item.get('Id')
premiere_date = item.get('PremiereDate')
(start, end) = self.get_playback_info(item)
return JellyfinMediaItem(
id=media_id,
name=item.get('Name'),
type=JellyfinMediaType.MOVIE,
image_url=self.get_image_url(media_id),
start=start,
end=end,
metadata={
'date': datetime.fromisoformat(premiere_date).strftime('%d/%m/%Y') if premiere_date else None
}
)
def to_episode_model(self, item: dict) -> JellyfinMediaItem:
media_id = item.get('Id')
seris_name = item.get('SeriesName')
season_number = item.get('ParentIndexNumber')
episode_number = item.get('IndexNumber')
subtitle=f"S{season_number:02}E{episode_number:02} of {seris_name}"
(start, end) = self.get_playback_info(item)
return JellyfinMediaItem(
id=media_id,
name=item.get('Name'),
type=JellyfinMediaType.EPISODE,
image_url=self.get_image_url(media_id),
start=start,
end=end,
metadata={
'subtitle': subtitle,
}
)