lb / tvdbApiClient.py
ChandimaPrabath's picture
season metadata api update
97c0fdb
import json
import os
import logging
from pathlib import Path
import tvdb_v4_official
from utils import save_to_json
# TheTVDB API key, read from the environment; None when the variable is unset.
THETVDB_API_KEY = os.getenv("THETVDB_API_KEY")
# Root cache directory, read from the environment; None when the variable is unset.
CACHE_DIR = os.getenv("CACHE_DIR")
# Directory under the cache root that holds the per-series metadata folders.
# NOTE(review): os.path.join raises TypeError here, at import time, if
# CACHE_DIR is unset (os.getenv returned None) -- confirm the deployment
# always defines it.
SAVE_DIR = os.path.join(CACHE_DIR, "metadata")
# Configure logging at INFO level with a "timestamp - level - message" format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize the module-wide TVDB client with the API key read above.
tvdb = tvdb_v4_official.TVDB(THETVDB_API_KEY)
def get_series_info(series_id):
    """Return the extended TVDB record for ``series_id``, or ``None`` on failure.

    The module-level ``tvdb`` client is asked for the extended series record
    with ``meta="episodes"``. Any exception raised while fetching (or while
    logging the success message) is logged as an error and swallowed, so
    callers only have to check the result for ``None``.
    """
    result = None
    try:
        result = tvdb.get_series_extended(series_id, meta="episodes")
        logging.info("Series info fetched successfully.")
    except Exception as err:
        logging.error(f"Error fetching series info: {err}")
        # Reset explicitly: the failure may have happened after the fetch.
        result = None
    return result
def filter_episode_data(episode):
    """Project an episode mapping onto the fixed set of cached fields.

    Every field listed below is always present in the returned dict, in this
    order; a field missing from ``episode`` is stored as ``None``. Keys of
    ``episode`` that are not listed are dropped.
    """
    kept_fields = (
        "id",
        "seriesId",
        "name",
        "aired",
        "runtime",
        "overview",
        "image",
        "imageType",
        "isMovie",
        "number",
        "absoluteNumber",
        "seasonNumber",
        "finaleType",
        "year",
    )
    return {field: episode.get(field) for field in kept_fields}
async def fetch_and_cache_seasons(series_id, original_title):
    """Fetch a series' episodes from TVDB and cache them as one JSON file per season.

    Episodes are grouped by their ``seasonNumber`` (season 0 is stored as
    "Specials", every other value as "Season <n>"), reduced to the cached
    fields by ``filter_episode_data``, sorted by episode ``number`` and handed
    to ``save_to_json`` as ``SAVE_DIR/<original_title>/<season>.json``.

    Args:
        series_id: TVDB identifier of the series.
        original_title: Title used (via ``str()``) as the series folder name.

    Returns:
        None. On a failed series lookup or episode fetch the error is logged
        and the function returns without writing anything.
    """
    # NOTE(review): the tvdb client calls below are plain (non-awaited) calls
    # inside a coroutine; presumably they block the event loop while running.
    series_info = get_series_info(series_id)
    if not series_info:
        logging.error("Series info could not be fetched.")
        return

    # Fetch all episodes for the series.
    try:
        response = tvdb.get_series_episodes(series_id, lang="eng")
        # `or []` also covers an "episodes" key that is present but null,
        # which would otherwise crash the grouping loop below.
        all_episodes = response.get('episodes') or []
    except Exception as e:
        logging.error(f"Error fetching episodes for series ID {series_id}: {e}")
        return

    # Organize episodes by season.
    all_seasons = {}
    for episode in all_episodes:
        season_number = episode.get('seasonNumber')
        season_key = "Specials" if season_number == 0 else f"Season {season_number}"
        all_seasons.setdefault(season_key, []).append(filter_episode_data(episode))

    # Create the folder for the series.
    # NOTE(review): original_title is used verbatim as a path component, so a
    # title containing a path separator yields nested folders -- confirm
    # callers pass sanitized titles.
    series_folder = Path(SAVE_DIR) / str(original_title)
    series_folder.mkdir(parents=True, exist_ok=True)

    # Save the episodes of each season in a separate JSON file.
    for season_key, episodes in sorted(all_seasons.items()):
        episodes_sorted = sorted(episodes, key=_episode_sort_key)
        season_file = series_folder / f"{season_key}.json"
        await save_to_json(episodes_sorted, season_file)


def _episode_sort_key(episode):
    """Sort key ordering episodes by ``number``, with unnumbered ones last.

    Comparing ``None`` with an int raises ``TypeError``, so a missing number
    is mapped to a tuple that sorts after every numbered episode.
    """
    number = episode.get('number')
    if number is None:
        return (True, 0)
    return (False, number)
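# Example usage (commented out):
# async def main(series_id, original_title):
#     """Main function to fetch and cache episodes asynchronously."""
#     await fetch_and_cache_seasons(series_id, original_title)
# if __name__ == "__main__":
#     import asyncio
#     # Replace with your series ID and the title to use as the folder name
#     SERIES_ID = "315103"
#     ORIGINAL_TITLE = "Example Series"
#     asyncio.run(main(SERIES_ID, ORIGINAL_TITLE))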