"""HTTP API in front of the media ``LoadBalancer``.

Exposes endpoints to register worker instances, browse the catalogue
(recent items, genres, file structure), read cached metadata/cards from
``CACHE_DIR`` and request a movie or episode download on an instance.
"""
import logging
import os
import urllib.parse
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from LoadBalancer import LoadBalancer
from utils import read_json_file, is_valid_url
from tvdb import recent_list, genre_list

# Runtime configuration. os.getenv returns None when a variable is unset;
# the metadata endpoints pass CACHE_DIR to os.path.join, which raises
# TypeError on None, so CACHE_DIR must be set for them to work.
CACHE_DIR = os.getenv("CACHE_DIR")
TOKEN = os.getenv("TOKEN")
REPO = os.getenv("REPO")

# Artwork "type" ids used to pick card images out of the cached metadata.
_MOVIE_BANNER_TYPE = 15
_MOVIE_PORTRAIT_TYPE = 14
_SERIES_BANNER_TYPE = 3
_SERIES_PORTRAIT_TYPE = 2

app = FastAPI()

origins = ["*"]

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Assigned in startup_event(); declared here so the module-level global is
# explicit instead of only coming into existence at startup.
load_balancer: Optional[LoadBalancer] = None


def _cached_json_path(media_dir: str, title: str) -> str:
    """Return the cache file path for *title* under ``CACHE_DIR/<media_dir>``.

    The title is URL-quoted to form the file name, matching how the
    metadata and card endpoints look files up.
    """
    return os.path.join(CACHE_DIR, media_dir, f"{urllib.parse.quote(title)}.json")


def _pick_english(entries: List[Dict[str, Any]], field: str) -> Optional[Any]:
    """Return ``entry[field]`` of the first entry whose language is ``'eng'``."""
    for entry in entries:
        if entry['language'] == 'eng':
            return entry.get(field)
    return None


def _build_card(data: Dict[str, Any], title: str, banner_type: int,
                portrait_type: int) -> Dict[str, Any]:
    """Build the card payload shared by the movie and series card endpoints.

    :param data: Parsed cache file; the card fields live under ``data['data']``.
    :param title: Requested title, used when no English name translation exists.
    :param banner_type: Artwork ``type`` id collected into ``banner``.
    :param portrait_type: Artwork ``type`` id collected into ``portrait``.
    :return: Dict with title, year, image, portrait, banner, overview, trailers.
    :raises KeyError: If ``image``, ``trailers`` or ``year`` is missing.
    """
    details = data['data']
    image = details['image']
    trailers = details['trailers'] or []

    translations = details.get('translations') or {}
    names = translations.get('nameTranslations') or []
    overviews = translations.get('overviewTranslations') or []

    eng_title = _pick_english(names, 'name')
    overview = _pick_english(overviews, 'overview')
    # If no English overview is found and there's only one translation, use it
    if not overview and len(overviews) == 1:
        overview = overviews[0].get('overview')

    # Single pass over the artworks instead of one pass per artwork type.
    banner = []
    portrait = []
    for artwork in details.get('artworks') or []:
        if artwork['type'] == banner_type:
            banner.append(artwork)
        elif artwork['type'] == portrait_type:
            portrait.append(artwork)

    return {
        'title': eng_title or title,
        'year': details['year'],
        'image': image,
        'portrait': portrait,
        'banner': banner,
        'overview': overview,
        'trailers': trailers,
    }


def _find_episode_path(file_structure: List[Dict[str, Any]], title: str,
                       season: str, episode: str) -> Optional[str]:
    """Return the path of the first file matching title/season/episode.

    Walks ``tv`` -> show directory -> season directory -> file, using the
    same substring matching as the endpoint always did (case-insensitive for
    the title, case-sensitive for season and episode).
    """
    title_key = title.lower()
    for directory in file_structure:
        if directory['type'] != 'directory' or directory['path'] != 'tv':
            continue
        for show_dir in directory['contents']:
            if show_dir['type'] != 'directory' or title_key not in show_dir['path'].lower():
                continue
            for season_dir in show_dir['contents']:
                if season_dir['type'] != 'directory' or season not in season_dir['path']:
                    continue
                for episode_file in season_dir['contents']:
                    if episode_file['type'] == 'file' and episode in episode_file['path']:
                        return episode_file['path']
    return None


@app.on_event("startup")
async def startup_event():
    """Create the shared LoadBalancer when the application starts."""
    global load_balancer
    load_balancer = LoadBalancer(cache_dir=CACHE_DIR, token=TOKEN, repo=REPO)


@app.get("/")
def greet_json():
    """Return the load balancer version."""
    return {"Version": load_balancer.version}


@app.post("/api/post/register")
async def register_instance(request: Request):
    """Register a worker instance from a JSON body of the form ``{"url": ...}``.

    Responds 400 when the body is not a JSON object with a valid ``url`` and
    500 when parsing or registration raises.
    """
    try:
        data = await request.json()
        # A JSON list/string/number body has no "url" key; reject it as a
        # client error instead of failing later on data["url"].
        if not isinstance(data, dict) or "url" not in data:
            return JSONResponse(content={"error": "No URL provided"}, status_code=400)

        url = data["url"]
        if not is_valid_url(url):
            return JSONResponse(content={"error": "Invalid URL"}, status_code=400)

        # Register the instance
        load_balancer.register_instance(url)
        logging.info("Instance registered: %s", url)

        return JSONResponse(content={"message": f"Instance {url} registered successfully"}, status_code=200)

    except Exception as e:
        logging.error("Error registering instance: %s", e)
        return JSONResponse(content={"error": "Failed to register instance"}, status_code=500)


@app.get("/api/get/file_structure")
async def get_file_structure():
    """Return the load balancer's file structure."""
    return load_balancer.file_structure


@app.get("/api/get/movie/store")
async def get_movie_store():
    """Return the load balancer's FILM_STORE."""
    return load_balancer.FILM_STORE


@app.get("/api/get/series/store")
async def get_series_store():
    """Return the load balancer's TV_STORE."""
    return load_balancer.TV_STORE


@app.get("/api/get/movie/all")
async def get_all_movies_api():
    """Return the result of ``load_balancer.get_all_films()``."""
    return load_balancer.get_all_films()


@app.get("/api/get/series/all")
async def get_all_tvshows_api():
    """Return the result of ``load_balancer.get_all_tv_shows()``."""
    return load_balancer.get_all_tv_shows()


@app.get("/api/get/recent")
async def get_recent_items(limit: int = Query(5, ge=0)):
    """
    Get the most recent films and series.

    :param limit: Maximum number of items returned per media type. Must be
        non-negative; a negative value would otherwise slice items off the
        end of the list instead of limiting it.
    :return: A JSON response with ``movies`` and ``series`` lists.
    """
    # Get sorted entries
    recent_films = recent_list.get_sorted_entries('film')
    recent_series = recent_list.get_sorted_entries('series')

    # Return combined results, sliced to the desired number of items
    return JSONResponse(content={
        'movies': recent_films[:limit],
        'series': recent_series[:limit]
    })


@app.get("/api/get/genre_categories")
async def get_genre_categories(media_type: Optional[str] = Query(None, description="Filter by media type: 'movie' or 'series'")):
    """
    Retrieve all available genre categories along with their density
    (number of media items).

    Query Parameters:
        media_type: Optional. Filter by media type ('movie' or 'series').
            If not provided, every entry of the genre is counted.

    Returns:
        A JSON response of the form:
        {'genres': [{'name': 'Comedy', 'density': 12}, {'name': 'Drama', 'density': 8}, ...]}
        with genres sorted by name.
    """
    try:
        # NOTE(review): the media type is read from entry[3] here but from
        # entry[4] in get_genre_items -- confirm which index is correct.
        categories = [
            {
                "name": genre,
                "density": sum(
                    1 for entry in data["entries"].values()
                    if media_type is None or entry[3] == media_type
                )
            }
            for genre, data in sorted(genre_list.genres.items())
        ]
        return JSONResponse(content={"genres": categories})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving genre categories: {str(e)}") from e


@app.get("/api/get/genre")
async def get_genre_items(
    genre: List[str] = Query(...),
    media_type: Optional[str] = None,
    limit: int = Query(5, ge=0),
    page: int = Query(1, ge=1)
):
    """
    Get recent items from specified genres with an optional media type
    filter, a limit on the number of results, and pagination.

    :param genre: The genres to filter by (e.g., 'Comedy').
    :param media_type: Optional. Filter by media type ('movie' or 'series').
    :param limit: The maximum number of items to return for each media type
        (non-negative).
    :param page: The 1-based page number for pagination.
    :return: A JSON response containing the filtered items, the requested
        page/limit and the total number of movies and series.
    """
    # Get sorted entries based on genres and media type
    entries = genre_list.get_entries_by_multiple_genres(genre, media_type=media_type)

    # Separate entries by media type and include only the title
    movies = [{'title': entry[0]} for entry in entries if entry[4] == 'movie']
    series = [{'title': entry[0]} for entry in entries if entry[4] == 'series']

    # Calculate pagination
    start = (page - 1) * limit
    end = start + limit

    # Organize the results by media type
    results = {
        'movies': movies[start:end],
        'series': series[start:end],
        'page': page,
        'limit': limit,
        'total_movies': len(movies),
        'total_series': len(series)
    }

    # Return the results in a JSON response
    return JSONResponse(content=results)


@app.get("/api/get/movie/metadata/{title}")
async def get_movie_metadata_api(title: str):
    """Endpoint to get the cached movie metadata by title (404 if not cached)."""
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    json_cache_path = _cached_json_path('movie', title)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)
        return JSONResponse(content=data)

    raise HTTPException(status_code=404, detail="Metadata not found")


@app.get("/api/get/movie/card/{title}")
async def get_movie_card_api(title: str):
    """Endpoint to get the movie card (title, year, images, overview, trailers) by title."""
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    json_cache_path = _cached_json_path('movie', title)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)
        card = _build_card(data, title, _MOVIE_BANNER_TYPE, _MOVIE_PORTRAIT_TYPE)
        return JSONResponse(content=card)

    raise HTTPException(status_code=404, detail="Card not found")


@app.get("/api/get/series/metadata/{title}")
async def get_series_metadata_api(title: str):
    """Endpoint to get the cached TV show metadata by title (404 if not cached)."""
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    json_cache_path = _cached_json_path('series', title)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)

        # Add the file structure to the metadata
        tv_structure_data = load_balancer.get_tv_structure(title)
        if tv_structure_data:
            data['file_structure'] = tv_structure_data

        return JSONResponse(content=data)

    raise HTTPException(status_code=404, detail="Metadata not found")


@app.get("/api/get/series/card/{title}")
async def get_series_card_api(title: str):
    """Endpoint to get the TV show card (title, year, images, overview, trailers) by title."""
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    json_cache_path = _cached_json_path('series', title)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)
        card = _build_card(data, title, _SERIES_BANNER_TYPE, _SERIES_PORTRAIT_TYPE)
        return JSONResponse(content=card)

    raise HTTPException(status_code=404, detail="Card not found")


@app.get("/api/get/series/metadata/{title}/{season}")
async def get_season_metadata_api(title: str, season: str):
    """Endpoint to get the TV show season metadata by title and season.

    Reads ``CACHE_DIR/metadata/<title>/<season>.json``. Responds 400 when the
    season is empty or the resulting path would leave the metadata directory,
    and 404 when the file does not exist.
    """
    if not season:
        raise HTTPException(status_code=400, detail="Season must be provided and cannot be empty")

    json_cache_path = os.path.join(CACHE_DIR, "metadata", title, f"{season}.json")

    # title and season come straight from the URL and are not quoted, so a
    # value such as ".." could point outside the metadata directory. Refuse
    # any path that does not stay inside it.
    metadata_root = os.path.abspath(os.path.join(CACHE_DIR, "metadata"))
    if not os.path.abspath(json_cache_path).startswith(metadata_root + os.sep):
        raise HTTPException(status_code=400, detail="Invalid title or season")

    logging.debug("Season metadata path: %s", json_cache_path)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)
        return JSONResponse(content=data)

    raise HTTPException(status_code=404, detail="Metadata not found")


@app.get('/api/get/instances')
async def get_instances():
    """Return the registered instances."""
    return load_balancer.instances


@app.get('/api/get/instances/health')
async def get_instances_health():
    """Return the load balancer's instance health data."""
    return load_balancer.instances_health


@app.get("/api/get/movie/{title}")
async def get_movie_api(title: str):
    """Endpoint to get the movie by title.

    Returns the stored URL when the title is in FILM_STORE; otherwise starts
    a download on an instance and returns that response. Responds 404 when
    the movie is unknown and 500 when the download call yields no response.
    """
    if not title:
        raise HTTPException(status_code=400, detail="Title parameter is required")

    # Check if the movie is already cached
    if title in load_balancer.FILM_STORE:
        url = load_balancer.FILM_STORE[title]
        return JSONResponse(content={"url": url})

    movie_path = load_balancer.find_movie_path(title)

    if not movie_path:
        raise HTTPException(status_code=404, detail="Movie not found")

    # Start the download in an instance
    response = load_balancer.download_film_to_best_instance(title=title)
    if not response:
        # Previously this fell through and answered 200 with a null body.
        raise HTTPException(status_code=500, detail="Failed to start movie download")
    return JSONResponse(content=response)


@app.get("/api/get/series/{title}/{season}/{episode}")
async def get_tv_show_api(title: str, season: str, episode: str):
    """Endpoint to get the TV show by title, season, and episode.

    Returns the stored URL when the episode is in TV_STORE; otherwise starts
    a download on an instance and returns that response. Responds 404 when
    the show or episode is unknown and 500 when the download call yields no
    response.
    """
    if not title or not season or not episode:
        raise HTTPException(status_code=400, detail="Title, season, and episode parameters are required")

    # Check if the episode is already cached
    # NOTE(review): `episode in ep` is a substring match on the stored key,
    # so a short episode value can match more than one key -- confirm the
    # key format makes this unambiguous.
    if title in load_balancer.TV_STORE and season in load_balancer.TV_STORE[title]:
        for ep in load_balancer.TV_STORE[title][season]:
            if episode in ep:
                url = load_balancer.TV_STORE[title][season][ep]
                return JSONResponse(content={"url": url})

    tv_path = load_balancer.find_tv_path(title)

    if not tv_path:
        raise HTTPException(status_code=404, detail="TV show not found")

    episode_path = _find_episode_path(load_balancer.file_structure, title, season, episode)

    if not episode_path:
        raise HTTPException(status_code=404, detail="Episode not found")

    # Start the download in an instance
    response = load_balancer.download_episode_to_best_instance(title=title, season=season, episode=episode)
    if not response:
        # Previously this fell through and answered 200 with a null body.
        raise HTTPException(status_code=500, detail="Failed to start episode download")
    return JSONResponse(content=response)