|
|
from fastapi import FastAPI,HTTPException, Request, Query |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import JSONResponse |
|
|
from typing import Optional, List |
|
|
from LoadBalancer import LoadBalancer |
|
|
import logging |
|
|
import os |
|
|
import urllib.parse |
|
|
from utils import read_json_file, is_valid_url |
|
|
from tvdb import recent_list, genre_list |
|
|
|
|
|
# Runtime configuration read from the environment; each value is None when
# the corresponding variable is not set.
CACHE_DIR = os.environ.get("CACHE_DIR")
TOKEN = os.environ.get("TOKEN")
REPO = os.environ.get("REPO")

app = FastAPI()

# Origins accepted by the CORS middleware ("*" matches every origin).
origins = ["*"]

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
_CORS_OPTIONS = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Create the module-wide ``load_balancer`` when the application starts."""
    global load_balancer

    balancer_settings = {
        "cache_dir": CACHE_DIR,
        "token": TOKEN,
        "repo": REPO,
    }
    load_balancer = LoadBalancer(**balancer_settings)
|
|
|
|
|
@app.get("/")
def greet_json():
    """Return the load balancer's ``version`` under the ``Version`` key."""
    current_version = load_balancer.version
    return {"Version": current_version}
|
|
|
|
|
@app.post("/api/post/register")
async def register_instance(request: Request):
    """Register an instance URL with the load balancer.

    The request body must be a JSON object containing a ``url`` key.

    Responses:
        200: the URL passed validation and was handed to
            ``load_balancer.register_instance``.
        400: the body is not valid JSON, is not an object, has no ``url``
            key, or the URL is rejected by ``is_valid_url``.
        500: an unexpected error occurred while registering.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; a malformed or empty body is a client error, not a 500.
        return JSONResponse(content={"error": "Invalid JSON body"}, status_code=400)

    # Only a JSON object can carry the "url" key; lists, strings and numbers
    # would otherwise fail later with a TypeError and surface as a 500.
    if not isinstance(data, dict) or "url" not in data:
        return JSONResponse(content={"error": "No URL provided"}, status_code=400)

    url = data["url"]
    try:
        if not is_valid_url(url):
            return JSONResponse(content={"error": "Invalid URL"}, status_code=400)

        load_balancer.register_instance(url)
        logging.info(f"Instance registered: {url}")
        return JSONResponse(
            content={"message": f"Instance {url} registered successfully"},
            status_code=200,
        )
    except Exception as e:
        # Top-level boundary for this endpoint: log with traceback and report
        # a generic failure so internals are not leaked to the client.
        logging.exception("Error registering instance: %s", e)
        return JSONResponse(content={"error": "Failed to register instance"}, status_code=500)
|
|
|
|
|
@app.get("/api/get/file_structure")
async def get_file_structure():
    """Return the load balancer's ``file_structure`` attribute."""
    structure = load_balancer.file_structure
    return structure
|
|
|
|
|
@app.get("/api/get/movie/store")
async def get_movie_store():
    """Return the load balancer's ``FILM_STORE`` attribute."""
    film_store = load_balancer.FILM_STORE
    return film_store
|
|
|
|
|
@app.get("/api/get/series/store")
async def get_series_store():
    """Return the load balancer's ``TV_STORE`` attribute."""
    tv_store = load_balancer.TV_STORE
    return tv_store
|
|
|
|
|
@app.get("/api/get/movie/all")
async def get_all_movies_api():
    """Return whatever ``load_balancer.get_all_films()`` yields."""
    all_films = load_balancer.get_all_films()
    return all_films
|
|
|
|
|
@app.get("/api/get/series/all")
async def get_all_tvshows_api():
    """Return whatever ``load_balancer.get_all_tv_shows()`` yields."""
    all_shows = load_balancer.get_all_tv_shows()
    return all_shows
|
|
|
|
|
@app.get("/api/get/recent")
async def get_recent_items(limit: int = 5):
    """Return the leading entries of the recent films and series lists.

    :param limit: Maximum number of items returned per media type. Must be
        zero or greater.
    :return: A JSON response with ``movies`` and ``series`` lists, each cut to
        the first ``limit`` entries of ``recent_list.get_sorted_entries``.
    :raises HTTPException: 400 when ``limit`` is negative.
    """
    # A negative limit would slice as [:-n] and return everything *except*
    # the last n entries, which is never what the caller asked for.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be a non-negative integer")

    recent_films = recent_list.get_sorted_entries('film')
    recent_series = recent_list.get_sorted_entries('series')

    return JSONResponse(content={
        'movies': recent_films[:limit],
        'series': recent_series[:limit],
    })
|
|
|
|
|
@app.get("/api/get/genre_categories")
async def get_genre_categories(media_type: Optional[str] = Query(None, description="Filter by media type: 'movie' or 'series'")):
    """
    List every genre together with its density (number of media items).

    Query Parameters:
        media_type: Optional. When given, only entries whose fourth field
            equals this value are counted; otherwise every entry is counted.

    Returns:
        A JSON response of the form
        {"genres": [{'name': 'Comedy', 'density': 12}, ...]},
        ordered by sorting the genre items.

    Raises:
        HTTPException: 500 if anything fails while building the list.
    """
    try:
        categories = []
        for genre_name, genre_data in sorted(genre_list.genres.items()):
            density = 0
            for entry in genre_data["entries"].values():
                if media_type is None or entry[3] == media_type:
                    density += 1
            categories.append({"name": genre_name, "density": density})

        return JSONResponse(content={"genres": categories})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error retrieving genre categories: {str(e)}")
|
|
|
|
|
@app.get("/api/get/genre")
async def get_genre_items(
    genre: List[str] = Query(...),
    media_type: Optional[str] = None,
    limit: int = 5,
    page: int = 1
):
    """
    Get items from the specified genres, split into movies and series and paginated.

    :param genre: The genres to filter by (e.g., 'Comedy').
    :param media_type: Optional. Passed through to the genre lookup as a media type filter.
    :param limit: The maximum number of items to return for each media type (>= 0).
    :param page: The 1-based page number for pagination (>= 1).
    :return: A JSON response with the page of ``movies`` and ``series`` titles,
        the echoed ``page``/``limit``, and the unpaginated totals.
    :raises HTTPException: 400 when ``limit`` is negative or ``page`` is below 1.
    """
    # Negative slice bounds would silently return items counted from the end
    # of the lists, so reject out-of-range pagination up front.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be a non-negative integer")
    if page < 1:
        raise HTTPException(status_code=400, detail="page must be 1 or greater")

    entries = genre_list.get_entries_by_multiple_genres(genre, media_type=media_type)

    # entry[0] is used as the title and entry[4] as the media type.
    movies = [{'title': entry[0]} for entry in entries if entry[4] == 'movie']
    series = [{'title': entry[0]} for entry in entries if entry[4] == 'series']

    start = (page - 1) * limit
    end = start + limit

    results = {
        'movies': movies[start:end],
        'series': series[start:end],
        'page': page,
        'limit': limit,
        'total_movies': len(movies),
        'total_series': len(series),
    }
    return JSONResponse(content=results)
|
|
|
|
|
|
|
|
@app.get("/api/get/movie/metadata/{title}")
async def get_movie_metadata_api(title: str):
    """Serve the cached metadata JSON for a movie, looked up by title.

    Responds with 400 when the title is empty and 404 when no cache file
    exists for the URL-quoted title under ``CACHE_DIR/movie``.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    cache_file_name = f"{urllib.parse.quote(title)}.json"
    cache_file = os.path.join(CACHE_DIR, 'movie', cache_file_name)

    if not os.path.exists(cache_file):
        raise HTTPException(status_code=404, detail="Metadata not found")

    metadata = await read_json_file(cache_file)
    return JSONResponse(content=metadata)
|
|
|
|
|
# Artwork ``type`` codes that select banner/portrait images per media kind.
# NOTE(review): presumably TVDB artwork type ids — confirm against the metadata source.
_MOVIE_BANNER_TYPE = 15
_MOVIE_PORTRAIT_TYPE = 14
_SERIES_BANNER_TYPE = 3
_SERIES_PORTRAIT_TYPE = 2


def _first_english(translations, field):
    """Return ``field`` of the first translation whose language is 'eng', else None."""
    for translation in translations:
        if translation.get('language') == 'eng':
            return translation.get(field)
    return None


def _build_card(data, title, banner_type, portrait_type):
    """Build the card payload shared by the movie and series card endpoints.

    :param data: Parsed cache file; its ``data`` key holds the record.
    :param title: Requested title, used when no English name translation exists.
    :param banner_type: Artwork ``type`` value collected into ``banner``.
    :param portrait_type: Artwork ``type`` value collected into ``portrait``.
    :return: Dict with title, year, image, portrait, banner, overview, trailers.
    """
    details = data['data']
    translations = details.get('translations') or {}

    eng_title = _first_english(translations.get('nameTranslations') or [], 'name')

    overviews = translations.get('overviewTranslations') or []
    overview = _first_english(overviews, 'overview')
    # With no English overview, fall back to the sole available translation.
    if not overview and len(overviews) == 1:
        overview = overviews[0].get('overview')

    # Single pass over the artworks, partitioned by type code.
    banner = []
    portrait = []
    for artwork in details.get('artworks') or []:
        artwork_type = artwork.get('type')
        if artwork_type == banner_type:
            banner.append(artwork)
        elif artwork_type == portrait_type:
            portrait.append(artwork)

    # Optional fields use .get so a sparse record yields nulls instead of a 500.
    return {
        'title': eng_title or title,
        'year': details.get('year'),
        'image': details.get('image'),
        'portrait': portrait,
        'banner': banner,
        'overview': overview,
        'trailers': details.get('trailers') or [],
    }


@app.get("/api/get/movie/card/{title}")
async def get_movie_card_api(title: str):
    """Endpoint to get a movie's card (title, year, artwork, overview, trailers) by title.

    Responds with 400 when the title is empty and 404 when no cache file
    exists for the URL-quoted title under ``CACHE_DIR/movie``.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    full_dir_path = os.path.join(CACHE_DIR, 'movie')
    json_cache_path = os.path.join(full_dir_path, f"{urllib.parse.quote(title)}.json")

    if not os.path.exists(json_cache_path):
        raise HTTPException(status_code=404, detail="Card not found")

    data = await read_json_file(json_cache_path)
    card = _build_card(data, title, _MOVIE_BANNER_TYPE, _MOVIE_PORTRAIT_TYPE)
    return JSONResponse(content=card)


@app.get("/api/get/series/metadata/{title}")
async def get_series_metadata_api(title: str):
    """Endpoint to get the TV show metadata by title.

    When ``load_balancer.get_tv_structure`` returns a truthy value for the
    title, it is attached to the response under ``file_structure``.
    Responds with 400 when the title is empty and 404 when no cache file
    exists for the URL-quoted title under ``CACHE_DIR/series``.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    full_dir_path = os.path.join(CACHE_DIR, 'series')
    json_cache_path = os.path.join(full_dir_path, f"{urllib.parse.quote(title)}.json")

    if not os.path.exists(json_cache_path):
        raise HTTPException(status_code=404, detail="Metadata not found")

    data = await read_json_file(json_cache_path)

    tv_structure_data = load_balancer.get_tv_structure(title)
    if tv_structure_data:
        data['file_structure'] = tv_structure_data

    return JSONResponse(content=data)


@app.get("/api/get/series/card/{title}")
async def get_series_card_api(title: str):
    """Endpoint to get a TV show's card (title, year, artwork, overview, trailers) by title.

    Responds with 400 when the title is empty and 404 when no cache file
    exists for the URL-quoted title under ``CACHE_DIR/series``.
    """
    if not title:
        raise HTTPException(status_code=400, detail="No title provided")

    full_dir_path = os.path.join(CACHE_DIR, 'series')
    json_cache_path = os.path.join(full_dir_path, f"{urllib.parse.quote(title)}.json")

    if not os.path.exists(json_cache_path):
        raise HTTPException(status_code=404, detail="Card not found")

    data = await read_json_file(json_cache_path)
    card = _build_card(data, title, _SERIES_BANNER_TYPE, _SERIES_PORTRAIT_TYPE)
    return JSONResponse(content=card)
|
|
|
|
|
|
|
|
@app.get("/api/get/series/metadata/{title}/{season}")
async def get_season_metadata_api(title: str, season: str):
    """Endpoint to get the TV show season metadata by title and season.

    The file is looked up at ``CACHE_DIR/metadata/<title>/<season>.json``.

    :raises HTTPException: 400 when ``season`` is empty or when the title and
        season resolve to a path outside the metadata directory; 404 when the
        cache file does not exist.
    """
    if not season:
        raise HTTPException(status_code=400, detail="Season must be provided and cannot be empty")

    metadata_root = os.path.abspath(os.path.join(CACHE_DIR, "metadata"))
    json_cache_path = os.path.abspath(os.path.join(metadata_root, title, f"{season}.json"))

    # title and season come straight from the URL; refuse anything (such as a
    # ".." component) that would resolve outside the metadata directory.
    try:
        inside_root = os.path.commonpath([metadata_root, json_cache_path]) == metadata_root
    except ValueError:
        # Raised on Windows when the two paths are on different drives.
        inside_root = False
    if not inside_root:
        raise HTTPException(status_code=400, detail="Invalid title or season")

    logging.debug("Season metadata cache path: %s", json_cache_path)

    if os.path.exists(json_cache_path):
        data = await read_json_file(json_cache_path)
        return JSONResponse(content=data)

    raise HTTPException(status_code=404, detail="Metadata not found")
|
|
|
|
|
@app.get('/api/get/instances')
async def get_instances():
    """Return the load balancer's ``instances`` attribute."""
    registered_instances = load_balancer.instances
    return registered_instances
|
|
|
|
|
@app.get('/api/get/instances/health')
async def get_instances_health():
    """Return the load balancer's ``instances_health`` attribute."""
    health_report = load_balancer.instances_health
    return health_report
|
|
|
|
|
@app.get("/api/get/movie/{title}")
async def get_movie_api(title: str):
    """Endpoint to get the movie by title.

    If the title is already in ``load_balancer.FILM_STORE`` its stored URL is
    returned as ``{"url": ...}``. Otherwise the movie is located with
    ``find_movie_path`` and a download is requested through
    ``download_film_to_best_instance``, whose response is returned as JSON.

    :raises HTTPException: 400 when the title is empty, 404 when the movie
        cannot be located, 500 when the download request yields no response.
    """
    if not title:
        raise HTTPException(status_code=400, detail="Title parameter is required")

    if title in load_balancer.FILM_STORE:
        url = load_balancer.FILM_STORE[title]
        return JSONResponse(content={"url": url})

    movie_path = load_balancer.find_movie_path(title)
    if not movie_path:
        raise HTTPException(status_code=404, detail="Movie not found")

    response = load_balancer.download_film_to_best_instance(title=title)
    if not response:
        # Previously this fell through and answered 200 with a null body,
        # hiding the failure from the client.
        raise HTTPException(status_code=500, detail="Failed to start movie download")

    return JSONResponse(content=response)
|
|
|
|
|
def _find_episode_path(file_structure, title, season, episode):
    """Return the path of the first matching episode file under the 'tv' tree, or None.

    A show directory matches when its path contains ``title`` (case-insensitive);
    season directories and episode files match when their paths contain
    ``season`` and ``episode`` respectively.
    """
    wanted_title = title.lower()
    for directory in file_structure:
        if directory['type'] != 'directory' or directory['path'] != 'tv':
            continue
        for show_dir in directory['contents']:
            if show_dir['type'] != 'directory' or wanted_title not in show_dir['path'].lower():
                continue
            for season_dir in show_dir['contents']:
                if season_dir['type'] != 'directory' or season not in season_dir['path']:
                    continue
                for episode_file in season_dir['contents']:
                    if episode_file['type'] == 'file' and episode in episode_file['path']:
                        # Stop at the first hit instead of scanning the rest of the tree.
                        return episode_file['path']
    return None


@app.get("/api/get/series/{title}/{season}/{episode}")
async def get_tv_show_api(title: str, season: str, episode: str):
    """Endpoint to get the TV show by title, season, and episode.

    If ``load_balancer.TV_STORE`` already holds an episode key containing
    ``episode`` for this title and season, its URL is returned as
    ``{"url": ...}``. Otherwise the episode is located in the file structure
    and a download is requested through ``download_episode_to_best_instance``,
    whose response is returned as JSON.

    :raises HTTPException: 400 when any parameter is empty, 404 when the show
        or the episode cannot be located, 500 when the download request yields
        no response.
    """
    if not title or not season or not episode:
        raise HTTPException(status_code=400, detail="Title, season, and episode parameters are required")

    if title in load_balancer.TV_STORE and season in load_balancer.TV_STORE[title]:
        stored_episodes = load_balancer.TV_STORE[title][season]
        for ep in stored_episodes:
            # Substring match: stored keys are expected to contain the episode id.
            if episode in ep:
                return JSONResponse(content={"url": stored_episodes[ep]})

    tv_path = load_balancer.find_tv_path(title)
    if not tv_path:
        raise HTTPException(status_code=404, detail="TV show not found")

    episode_path = _find_episode_path(load_balancer.file_structure, title, season, episode)
    if not episode_path:
        raise HTTPException(status_code=404, detail="Episode not found")

    response = load_balancer.download_episode_to_best_instance(title=title, season=season, episode=episode)
    if not response:
        # Previously this fell through and answered 200 with a null body,
        # hiding the failure from the client.
        raise HTTPException(status_code=500, detail="Failed to start episode download")

    return JSONResponse(content=response)