load-balancer / app.py
ChandimaPrabath's picture
genre list
fdcd21d
raw
history blame
10.6 kB
from fastapi import FastAPI,HTTPException, Request, Query
from fastapi.responses import JSONResponse
from typing import Optional, List
from LoadBalancer import LoadBalancer
import logging
import os
import urllib.parse
from utils import read_json_file, is_valid_url
from tvdb import recent_list, genre_list
CACHE_DIR = os.getenv("CACHE_DIR")
TOKEN = os.getenv("TOKEN")
REPO = os.getenv("REPO")
app = FastAPI()
@app.on_event("startup")
async def startup_event():
global load_balancer
load_balancer = LoadBalancer(cache_dir=CACHE_DIR, token=TOKEN, repo=REPO)
@app.get("/")
def greet_json():
return {"Version": load_balancer.version}
@app.post("/api/post/register")
async def register_instance(request: Request):
try:
data = await request.json()
if not data or "url" not in data:
return JSONResponse(content={"error": "No URL provided"}, status_code=400)
url = data["url"]
if not is_valid_url(url):
return JSONResponse(content={"error": "Invalid URL"}, status_code=400)
# Register the instance
load_balancer.register_instance(url)
logging.info(f"Instance registered: {url}")
return JSONResponse(content={"message": f"Instance {url} registered successfully"}, status_code=200)
except Exception as e:
logging.error(f"Error registering instance: {e}")
return JSONResponse(content={"error": "Failed to register instance"}, status_code=500)
@app.get("/api/get/file_structure")
async def get_file_structure():
return load_balancer.file_structure
@app.get("/api/get/movie/store")
async def get_movie_store():
return load_balancer.FILM_STORE
@app.get("/api/get/series/store")
async def get_series_store():
return load_balancer.TV_STORE
@app.get("/api/get/movie/all")
async def get_all_movies_api():
return load_balancer.get_all_films()
@app.get("/api/get/series/all")
async def get_all_tvshows_api():
return load_balancer.get_all_tv_shows()
@app.get("/api/get/recent")
async def get_recent_items(limit: int = 5):
# Get sorted entries
recent_films = recent_list.get_sorted_entries('film')
recent_series = recent_list.get_sorted_entries('series')
# Slice the lists to only return the desired number of items
limited_films = recent_films[:limit]
limited_series = recent_series[:limit]
# Return combined results
return JSONResponse(content={
'movies': limited_films,
'series': limited_series
})
@app.get("/api/get/genre")
async def get_genre_items(genre: List[str] = Query(...), media_type: Optional[str] = None, limit: int = 5):
"""
Get recent items from specified genres with an optional media type filter and a limit on the number of results.
:param genre: The genres to filter by (e.g., 'Comedy').
:param media_type: Optional. Filter by media type ('movie' or 'series').
:param limit: The maximum number of items to return.
:return: A JSON response containing the filtered items.
"""
# Get sorted entries based on genres and media type
entries = genre_list.get_entries_by_multiple_genres(genre, media_type=media_type)
# Slice the list to only return the desired number of items
limited_entries = entries[:limit]
# Organize the results by media type
results = {
'movies': [entry for entry in limited_entries if entry[4] == 'movie'],
'series': [entry for entry in limited_entries if entry[4] == 'series']
}
# Return the results in a JSON response
return JSONResponse(content=results)
@app.get("/api/get/movie/metadata/{title}")
async def get_movie_metadata_api(title: str):
"""Endpoint to get the movie metadata by title."""
if not title:
raise HTTPException(status_code=400, detail="No title provided")
full_dir_path = os.path.join(CACHE_DIR, 'movie')
json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json")
if os.path.exists(json_cache_path):
data = await read_json_file(json_cache_path)
return JSONResponse(content=data)
raise HTTPException(status_code=404, detail="Metadata not found")
@app.get("/api/get/movie/card/{title}")
async def get_movie_card_api(title: str):
"""Endpoint to get the movie metadata by title."""
if not title:
raise HTTPException(status_code=400, detail="No title provided")
full_dir_path = os.path.join(CACHE_DIR, 'movie')
json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json")
if os.path.exists(json_cache_path):
data = await read_json_file(json_cache_path)
image = data['data']['image']
eng_title = None
if data['data'].get('translations') and data['data']['translations'].get('nameTranslations'):
for name in data['data']['translations']['nameTranslations']:
if name['language'] == 'eng':
eng_title = name.get('name')
break
year = data['data']['year']
return JSONResponse(content={'title':eng_title, 'year': year, 'image': image})
raise HTTPException(status_code=404, detail="Card not found")
@app.get("/api/get/series/metadata/{title}")
async def get_series_metadata_api(title: str):
"""Endpoint to get the TV show metadata by title."""
if not title:
raise HTTPException(status_code=400, detail="No title provided")
full_dir_path = os.path.join(CACHE_DIR, 'series')
json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json")
if os.path.exists(json_cache_path):
data = await read_json_file(json_cache_path)
# Add the file structure to the metadata
tv_structure_data = load_balancer.get_tv_structure(title)
if tv_structure_data:
data['file_structure'] = tv_structure_data
return JSONResponse(content=data)
raise HTTPException(status_code=404, detail="Metadata not found")
@app.get("/api/get/series/card/{title}")
async def get_series_card_api(title: str):
"""Endpoint to get the TV show metadata by title."""
if not title:
raise HTTPException(status_code=400, detail="No title provided")
full_dir_path = os.path.join(CACHE_DIR, 'series')
json_cache_path = os.path.join(full_dir_path,f"{urllib.parse.quote(title)}.json")
if os.path.exists(json_cache_path):
data = await read_json_file(json_cache_path)
image = data['data']['image']
eng_title = None
if data['data'].get('translations') and data['data']['translations'].get('nameTranslations'):
for name in data['data']['translations']['nameTranslations']:
if name['language'] == 'eng':
eng_title = name.get('name')
break
year = data['data']['year']
return JSONResponse(content={'title':eng_title, 'year': year, 'image': image})
raise HTTPException(status_code=404, detail="Card not found")
@app.get("/api/get/series/metadata/{series_id}/{season}")
async def get_season_metadata_api(series_id: int, season: str):
"""Endpoint to get the TV show season metadata by id and season."""
if not season:
raise HTTPException(status_code=400, detail="Season must be provided and cannot be empty")
# Convert series_id to string before joining the path
json_cache_path = os.path.join(CACHE_DIR, "metadata", str(series_id), f"{season}.json")
print(json_cache_path)
if os.path.exists(json_cache_path):
data = await read_json_file(json_cache_path)
return JSONResponse(content=data)
raise HTTPException(status_code=404, detail="Metadata not found")
@app.get('/api/get/instances')
async def get_instances():
return load_balancer.instances
@app.get('/api/get/instances/health')
async def get_instances_health():
return load_balancer.instances_health
@app.get("/api/get/movie/{title}")
async def get_movie_api(title: str):
"""Endpoint to get the movie by title."""
if not title:
raise HTTPException(status_code=400, detail="Title parameter is required")
# Check if the movie is already cached
if title in load_balancer.FILM_STORE:
url = load_balancer.FILM_STORE[title]
return JSONResponse(content={"url": url})
movie_path = load_balancer.find_movie_path(title)
if not movie_path:
raise HTTPException(status_code=404, detail="Movie not found")
# Start the download in an instance
response = load_balancer.download_film_to_best_instance(title=title)
if response:
return JSONResponse(content=response)
@app.get("/api/get/series/{title}/{season}/{episode}")
async def get_tv_show_api(title: str, season: str, episode: str):
"""Endpoint to get the TV show by title, season, and episode."""
if not title or not season or not episode:
raise HTTPException(status_code=400, detail="Title, season, and episode parameters are required")
# Check if the episode is already cached
if title in load_balancer.TV_STORE and season in load_balancer.TV_STORE[title]:
for ep in load_balancer.TV_STORE[title][season]:
if episode in ep:
url = load_balancer.TV_STORE[title][season][ep]
return JSONResponse(content={"url": url})
tv_path = load_balancer.find_tv_path(title)
if not tv_path:
raise HTTPException(status_code=404, detail="TV show not found")
episode_path = None
for directory in load_balancer.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'tv':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
for season_dir in sub_directory['contents']:
if season_dir['type'] == 'directory' and season in season_dir['path']:
for episode_file in season_dir['contents']:
if episode_file['type'] == 'file' and episode in episode_file['path']:
episode_path = episode_file['path']
break
if not episode_path:
raise HTTPException(status_code=404, detail="Episode not found")
# Start the download in an instance
response = load_balancer.download_episode_to_best_instance(title=title, season=season, episode=episode)
if response:
return JSONResponse(content=response)