Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import logging | |
| from pathlib import Path | |
| import tvdb_v4_official | |
| from utils import save_to_json | |
# Runtime configuration comes from the environment.
# NOTE: CACHE_DIR must be set; when it is missing, os.path.join() below
# receives None and raises TypeError while the module is being imported.
THETVDB_API_KEY = os.environ.get("THETVDB_API_KEY")
CACHE_DIR = os.environ.get("CACHE_DIR")

# Per-series episode metadata is cached under <CACHE_DIR>/metadata.
SAVE_DIR = os.path.join(CACHE_DIR, "metadata")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Shared TVDB client used by every function in this module.
tvdb = tvdb_v4_official.TVDB(THETVDB_API_KEY)
def get_series_info(series_id):
    """Return the extended TVDB record for a series, or None on failure.

    The record is requested with ``meta="episodes"``. Any exception raised
    while fetching is logged and swallowed, so callers must check for None.
    """
    try:
        details = tvdb.get_series_extended(series_id, meta="episodes")
        logging.info("Series info fetched successfully.")
    except Exception as err:
        logging.error(f"Error fetching series info: {err}")
        return None
    return details
# Episode fields kept in the cache, in the order they are written out.
_EPISODE_FIELDS = (
    "id",
    "seriesId",
    "name",
    "aired",
    "runtime",
    "overview",
    "image",
    "imageType",
    "isMovie",
    "number",
    "absoluteNumber",
    "seasonNumber",
    "finaleType",
    "year",
)


def filter_episode_data(episode):
    """Return a copy of *episode* reduced to the cached fields.

    Fields that are absent from *episode* are present in the result with
    the value None; any other keys of *episode* are dropped.
    """
    return {field: episode.get(field) for field in _EPISODE_FIELDS}
def _season_key(season_number):
    """Return the cache file stem for a season number (0 is "Specials")."""
    if season_number == 0:
        return "Specials"
    return f"Season {season_number}"


def _episode_order(episode):
    """Sort key ordering episodes by number, with missing numbers last."""
    number = episode.get('number')
    # A bare None cannot be compared with an int, so sort on a
    # (is-missing, number) pair instead of the raw value.
    return (number is None, 0 if number is None else number)


async def fetch_and_cache_seasons(series_id):
    """Fetch a series' episodes from TVDB and cache one JSON file per season.

    Episodes are grouped by their ``seasonNumber`` (season 0 is stored as
    "Specials"), reduced with ``filter_episode_data``, sorted by episode
    number and written to ``SAVE_DIR/<series_id>/<season>.json`` through
    ``save_to_json``.

    Args:
        series_id: TVDB series identifier; also used as the folder name.

    Returns:
        None. A failed fetch is logged and ends the call early without
        writing anything.
    """
    series_info = get_series_info(series_id)
    if not series_info:
        logging.error("Series info could not be fetched.")
        return

    # Fetch all episodes for the series.
    # NOTE(review): no page argument is passed, so presumably only the
    # first page of results is returned -- confirm against the TVDB client
    # for very long-running series.
    try:
        response = tvdb.get_series_episodes(series_id, lang="eng")
        # Treat an explicit null "episodes" value like a missing one.
        all_episodes = response.get('episodes') or []
    except Exception as e:
        logging.error(f"Error fetching episodes for series ID {series_id}: {e}")
        return

    # Organize episodes by season.
    all_seasons = {}
    for episode in all_episodes:
        season_key = _season_key(episode.get('seasonNumber'))
        all_seasons.setdefault(season_key, []).append(filter_episode_data(episode))

    # Create folder for the series.
    series_folder = Path(SAVE_DIR) / str(series_id)
    series_folder.mkdir(parents=True, exist_ok=True)

    # Save episodes for each season in separate JSON files.
    for season_key, episodes in sorted(all_seasons.items()):
        episodes_sorted = sorted(episodes, key=_episode_order)
        season_file = series_folder / f"{season_key}.json"
        await save_to_json(episodes_sorted, season_file)
async def main(series_id):
    """Entry coroutine: cache every season of the series *series_id*."""
    await fetch_and_cache_seasons(series_id)
if __name__ == "__main__":
    import asyncio

    # Replace with the TVDB ID of the series you want to cache.
    SERIES_ID = "315103"

    asyncio.run(main(SERIES_ID))