import os
import re
import urllib.parse
import asyncio
import time
import logging
from threading import Event, Thread

from indexer import indexer
from tvdb import fetch_and_cache_json
from utils import convert_to_gb
from api import InstancesAPI

CACHE_DIR = os.getenv("CACHE_DIR")


class LoadBalancer:
    """Coordinate a pool of worker instances that cache and serve media.

    Responsibilities:
      * keep an index of the on-disk media layout (via ``indexer()``),
      * prefetch and cache metadata JSON for every film and TV show,
      * poll registered instances for their film/TV stores and health,
      * route download requests to the instance with the most free space.
    """

    def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Initialize state, index media, start prefetching and polling.

        Args:
            cache_dir: Directory where metadata JSON files are cached.
            token: Auth token (stored; used by callers/instances).
            repo: Repository identifier (stored; used by callers/instances).
            polling_interval: Seconds between instance report polls.
            max_retries: Retry budget for operations (stored for callers).
            initial_delay: Initial backoff delay (stored for callers).
        """
        self.version = "0.0.0.8 Alpha Debug"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE = {}
        self.TV_STORE = {}
        self.file_structure = None
        self.previous_file_structure = None  # To keep track of previous content

        # Caches for films and TV shows so callers never see an empty list
        # just because a re-index transiently failed.
        self.cached_films = []
        self.cached_tv_shows = {}

        # Ensure CACHE_DIR exists (exist_ok avoids the check/create race).
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        # Initialize file structure, update caches, and start prefetching.
        self.file_structure = indexer()
        self.update_media_cache()
        self.start_prefetching()

        # Start polling in a daemon thread so it never blocks shutdown.
        polling_thread = Thread(target=self.start_polling, daemon=True)
        polling_thread.start()

        # Start periodic tasks.
        # NOTE(review): asyncio.create_task requires a running event loop at
        # construction time (e.g. inside FastAPI startup); it raises
        # RuntimeError otherwise — confirm all call sites construct this
        # object from within a running loop.
        asyncio.create_task(self.run_periodic_tasks())

    def update_media_cache(self):
        """Rebuild cached film/TV listings from ``self.file_structure``.

        Only overwrites a cache when the freshly computed data is non-empty,
        preserving the last valid cache across transient indexing errors.
        """
        new_films = []
        new_tv_shows = {}
        for directory in self.file_structure:
            if directory.get('type') != 'directory':
                continue
            if directory.get('path') == 'films':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') == 'directory':
                        new_films.append(sub_directory.get('path'))
            elif directory.get('path') == 'tv':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') != 'directory':
                        continue
                    show_title = sub_directory.get('path').split('/')[-1]
                    episodes_list = []
                    for season_directory in sub_directory.get('contents', []):
                        if season_directory.get('type') != 'directory':
                            continue
                        season = season_directory.get('path').split('/')[-1]
                        for episode in season_directory.get('contents', []):
                            if episode.get('type') == 'file':
                                episodes_list.append({
                                    "season": season,
                                    "episode": episode.get('path').split('/')[-1],
                                    "path": episode.get('path'),
                                })
                    if episodes_list:
                        new_tv_shows[show_title] = episodes_list
        if new_films:
            self.cached_films = new_films
        if new_tv_shows:
            self.cached_tv_shows = new_tv_shows

    async def run_periodic_tasks(self):
        """Re-run the indexer and metadata prefetch every 5 minutes."""
        while not self.stop_event.is_set():
            new_file_structure = indexer()  # Re-run indexer
            # Only update if the new file_structure is non-empty.
            if new_file_structure:
                self.file_structure = new_file_structure
                self.update_media_cache()
            await self.start_prefetching()  # Start prefetching metadata
            await asyncio.sleep(300)  # Sleep for 5 minutes

    def start_prefetching(self):
        """Schedule metadata prefetching on the running event loop."""
        return asyncio.create_task(self.prefetch_metadata())

    async def prefetch_metadata(self):
        """Prefetch metadata for all items in the file structure.

        Parses an optional 4-digit year out of each title — either a
        ``(YYYY)`` suffix or a trailing bare ``YYYY`` — then fetches and
        caches a metadata JSON file per title, skipping already-cached ones.
        """
        tasks = []
        for item in self.file_structure:
            if 'contents' not in item:
                continue
            for sub_item in item['contents']:
                original_title = sub_item['path'].split('/')[-1]
                media_type = 'series' if item['path'].startswith('tv') else 'movie'
                title = original_title
                year = None

                # Extract year from the title if available: "(1999)" form
                # first, then a trailing bare "1999".
                match = re.search(r'\((\d{4})\)', original_title)
                if match:
                    year_str = match.group(1)
                    if year_str.isdigit() and len(year_str) == 4:
                        title = original_title[:match.start()].strip()
                        year = int(year_str)
                else:
                    parts = original_title.rsplit(' ', 1)
                    if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
                        title = parts[0].strip()
                        year = int(parts[-1])

                # Schedule the fetch-and-cache task unless already cached.
                json_cache_path = os.path.join(
                    self.CACHE_DIR, f"{urllib.parse.quote(original_title)}.json")
                if not os.path.exists(json_cache_path):
                    tasks.append(fetch_and_cache_json(original_title, title, media_type, year))
                else:
                    logging.info(f"Skipping.. {original_title} metadata already cached")

        # Run all tasks concurrently.
        await asyncio.gather(*tasks)

    def get_reports(self):
        """Poll every instance for its report and rebuild the URL stores.

        Instances that fail to report are removed from the pool.  The film
        and TV stores are swapped in atomically only after all reports are
        processed.
        """
        reports = self.instances_api.fetch_reports()
        temp_film_store = {}
        temp_tv_store = {}

        # Iterate over a copy: remove_instance mutates self.instances.
        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_film_store, temp_tv_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.FILM_STORE = temp_film_store
        self.TV_STORE = temp_tv_store

    def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
        """Merge one instance's report into the temporary URL stores.

        Builds retrieval URLs (spaces percent-encoded) for every film and
        episode the instance holds, then records the instance's health.
        """
        film_store = report.get('film_store', {})
        tv_store = report.get('tv_store', {})
        cache_size = report.get('cache_size')

        logging.info(f"Processing report from {instance_url}")

        # Update temporary film store.
        for title, path in film_store.items():
            url = f"{instance_url}/api/get/film/{title.replace(' ', '%20')}"
            temp_film_store[title] = url

        # Update temporary TV store.
        for title, seasons in tv_store.items():
            if title not in temp_tv_store:
                temp_tv_store[title] = {}
            for season, episodes in seasons.items():
                if season not in temp_tv_store[title]:
                    temp_tv_store[title][season] = {}
                for episode, path in episodes.items():
                    url = (f"{instance_url}/api/get/tv/{title.replace(' ', '%20')}/"
                           f"{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}")
                    temp_tv_store[title][season][episode] = url

        logging.info("Film and TV Stores processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Poll instance reports until ``stop_event`` is set (blocking)."""
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            self.get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal the polling loop (and periodic tasks) to stop."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def register_instance(self, instance_url):
        """Add an instance URL to the pool (idempotent)."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        """Remove an instance URL and its health record (idempotent)."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def update_instances_health(self, instance, cache_size):
        """Record an instance's used cache size against a fixed 50 GB total.

        NOTE(review): ``cache_size`` is expected to be a mapping containing
        a ``"cache_size"`` key (as produced by the instance report) — confirm
        against the instance API.
        """
        self.instances_health[instance] = {
            "used": cache_size["cache_size"],
            "total": "50 GB",  # Hard-coded capacity assumption for all instances.
        }
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def _select_best_instance(self):
        """Return the instance URL with the most free space, or None.

        Shared by the film and episode download routers.
        """
        best_instance = None
        max_free_space = -1
        for instance_url, space_info in self.instances_health.items():
            total_space = convert_to_gb(space_info['total'])
            used_space = convert_to_gb(space_info['used'])
            free_space = total_space - used_space
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url
        return best_instance

    def download_film_to_best_instance(self, title):
        """Download a film to the instance with the most free space.

        Args:
            title: Film title to download.

        Returns:
            dict with ``film_id``, ``status`` and ``progress_url`` on
            success, or a dict with an ``error`` key if no instance is
            available.
        """
        best_instance = self._select_best_instance()
        if best_instance:
            result = self.instances_api.download_film(best_instance, title)
            film_id = result["film_id"]
            status = result["status"]
            progress_url = f'{best_instance}/api/get/progress/{film_id}'
            return {
                "film_id": film_id,
                "status": status,
                "progress_url": progress_url,
            }
        logging.error("No suitable instance found for downloading the film.")
        return {"error": "No suitable instance found for downloading the film."}

    def download_episode_to_best_instance(self, title, season, episode):
        """Download an episode to the instance with the most free space.

        Args:
            title: Show title.
            season: Season identifier.
            episode: Episode identifier.

        Returns:
            dict with ``episode_id``, ``status`` and ``progress_url`` on
            success, or a dict with an ``error`` key if no instance is
            available.
        """
        best_instance = self._select_best_instance()
        if best_instance:
            result = self.instances_api.download_episode(best_instance, title, season, episode)
            episode_id = result["episode_id"]
            status = result["status"]
            progress_url = f'{best_instance}/api/get/progress/{episode_id}'
            return {
                "episode_id": episode_id,
                "status": status,
                "progress_url": progress_url,
            }
        # Bug fix: message previously said "film" here (copy-paste).
        logging.error("No suitable instance found for downloading the episode.")
        return {"error": "No suitable instance found for downloading the episode."}

    def find_movie_path(self, title):
        """Return the file path of the first film matching ``title``, or None.

        Matching is a case-insensitive substring test against file paths
        under the ``films`` directory.
        """
        for directory in self.file_structure:
            if directory.get('type') == 'directory' and directory.get('path') == 'films':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') == 'directory':
                        for item in sub_directory.get('contents', []):
                            if item.get('type') == 'file' and title.lower() in item.get('path').lower():
                                return item.get('path')
        return None

    def find_tv_path(self, title):
        """Return the directory path of the first TV show matching ``title``, or None."""
        for directory in self.file_structure:
            if directory.get('type') == 'directory' and directory.get('path') == 'tv':
                for sub_directory in directory.get('contents', []):
                    if (sub_directory.get('type') == 'directory'
                            and title.lower() in sub_directory.get('path').lower()):
                        return sub_directory.get('path')
        return None

    def get_tv_structure(self, title):
        """Return the directory subtree of the first TV show matching ``title``, or None."""
        for directory in self.file_structure:
            if directory.get('type') == 'directory' and directory.get('path') == 'tv':
                for sub_directory in directory.get('contents', []):
                    if (sub_directory.get('type') == 'directory'
                            and title.lower() in sub_directory.get('path').lower()):
                        return sub_directory
        return None

    def get_film_id(self, title):
        """Generate a film ID based on the title (lowercase, underscores)."""
        return title.replace(" ", "_").lower()

    def get_all_tv_shows(self):
        """Return the cached TV shows."""
        return self.cached_tv_shows

    def get_all_films(self):
        """Return the cached films."""
        return self.cached_films