# Flask media cache/streaming server (Hugging Face Space).
# NOTE: the page-scrape banner "Spaces: / Build error / Build error" that
# preceded this file was Hugging Face UI chrome, not source code.
from flask import Flask, jsonify, request, send_from_directory
import os
import json
import threading
import urllib.parse
from hf_scrapper import download_file, get_system_proxies, get_download_progress
from indexer import indexer
from tvdb import fetch_and_cache_json
import re
import psutil  # NOTE(review): unused in this chunk -- possibly used elsewhere; verify
import shutil  # NOTE(review): unused in this chunk -- possibly intended for cache clearing

app = Flask(__name__)

# Constants and Configuration
# NOTE(review): os.getenv returns None when the variable is unset, and the
# os.path.join calls below would then raise TypeError -- these env vars are
# effectively required at import time.
CACHE_DIR = os.getenv("CACHE_DIR")
INDEX_FILE = os.getenv("INDEX_FILE")
TOKEN = os.getenv("TOKEN")  # Hugging Face token passed to download_file
FILM_STORE_JSON_PATH = os.path.join(CACHE_DIR, "film_store.json")
TV_STORE_JSON_PATH = os.path.join(CACHE_DIR, "tv_store.json")
REPO = os.getenv("REPO")  # Hugging Face repo id used to build download URLs

# Registry of in-flight download threads, keyed by film_id / episode_id.
download_threads = {}

# Ensure CACHE_DIR exists
if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)

# Create empty JSON stores (title -> cached file path) on first run.
for path in [FILM_STORE_JSON_PATH, TV_STORE_JSON_PATH]:
    if not os.path.exists(path):
        with open(path, 'w') as json_file:
            json.dump({}, json_file)

# Index the file structure (side effect: writes INDEX_FILE)
indexer()

# Load the file structure JSON produced by indexer()
if not os.path.exists(INDEX_FILE):
    raise FileNotFoundError(f"{INDEX_FILE} not found. Please make sure the file exists.")
with open(INDEX_FILE, 'r') as f:
    file_structure = json.load(f)
| # Function Definitions | |
def load_json(file_path):
    """Read *file_path* and return its parsed JSON content."""
    with open(file_path, 'r') as fh:
        data = json.load(fh)
    return data
def find_movie_path(json_data, title):
    """Locate a film file whose path contains *title* (case-insensitive).

    Walks the index: top-level 'films' directory -> per-film sub-directories
    -> files.  Returns the first matching file path, or None.
    """
    needle = title.lower()
    for top in json_data:
        if top['type'] != 'directory' or top['path'] != 'films':
            continue
        for film_dir in top['contents']:
            if film_dir['type'] != 'directory':
                continue
            for entry in film_dir['contents']:
                if entry['type'] == 'file' and needle in entry['path'].lower():
                    return entry['path']
    return None
def find_tv_path(json_data, title):
    """Return the directory path of the TV show whose folder name contains
    *title* (case-insensitive), or None when no show matches."""
    needle = title.lower()
    for top in json_data:
        if top['type'] != 'directory' or top['path'] != 'tv':
            continue
        for show in top['contents']:
            if show['type'] == 'directory' and needle in show['path'].lower():
                return show['path']
    return None
def get_tv_structure(json_data, title):
    """Return the whole indexed sub-tree (dict) of the TV show whose folder
    name contains *title* (case-insensitive), or None when absent.

    Like find_tv_path, but yields the directory node itself rather than
    just its path, so callers can inspect seasons/episodes.
    """
    needle = title.lower()
    for top in json_data:
        if top['type'] != 'directory' or top['path'] != 'tv':
            continue
        for show in top['contents']:
            if show['type'] == 'directory' and needle in show['path'].lower():
                return show
    return None
def get_film_id(title):
    """Derive a stable film identifier: spaces become underscores, lowercased."""
    return "_".join(title.split(" ")).lower()
def _parse_title_year(name):
    """Split a media folder/file name into (title, year).

    Recognises a parenthesised year -- "Name (2010)" -- or, failing that,
    a bare trailing 4-digit token -- "Name 2010".  Returns (name, None)
    when no year can be extracted.
    """
    match = re.search(r'\((\d{4})\)', name)
    if match:
        # Group 1 is guaranteed to be exactly four digits by the pattern.
        return name[:match.start()].strip(), int(match.group(1))
    parts = name.rsplit(' ', 1)
    if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
        return parts[0].strip(), int(parts[-1])
    return name, None


def prefetch_metadata():
    """Prefetch metadata for all items in the file structure.

    For every entry directly under a top-level index directory, derive a
    clean title plus optional year and warm the TVDB JSON cache via
    fetch_and_cache_json.  (Refactor: the inline title/year parsing was
    extracted into _parse_title_year, and the media-type decision -- which
    depends only on the parent directory -- is hoisted out of the inner loop.)
    """
    for item in file_structure:
        if 'contents' not in item:
            continue
        # Entries under 'tv*' are series; everything else is treated as a movie.
        media_type = 'series' if item['path'].startswith('tv') else 'movie'
        for sub_item in item['contents']:
            original_title = sub_item['path'].split('/')[-1]
            title, year = _parse_title_year(original_title)
            fetch_and_cache_json(original_title, title, media_type, year)
def bytes_to_human_readable(num, suffix="B"):
    """Format a byte count with a binary-prefix unit, e.g. 1536 -> '1.5 KB'."""
    value = num
    for prefix in ("", "K", "M", "G", "T", "P", "E", "Z"):
        if abs(value) < 1024.0:
            return f"{value:3.1f} {prefix}{suffix}"
        value = value / 1024.0
    # Anything past zetta falls through to yotta.
    return f"{value:.1f} Y{suffix}"
def get_all_tv_shows(indexed_cache):
    """Map each TV show title to its episode list.

    Returns {show_title: [{"season", "episode", "path"}, ...]} built from
    the indexed structure: 'tv' -> show dirs -> season dirs -> episode files.
    """
    shows = {}
    for root in indexed_cache:
        if root['type'] != 'directory' or root['path'] != 'tv':
            continue
        for show_dir in root['contents']:
            if show_dir['type'] != 'directory':
                continue
            show_title = show_dir['path'].rsplit('/', 1)[-1]
            shows[show_title] = []
            for season_dir in show_dir['contents']:
                if season_dir['type'] != 'directory':
                    continue
                season_name = season_dir['path'].rsplit('/', 1)[-1]
                for entry in season_dir['contents']:
                    if entry['type'] == 'file':
                        shows[show_title].append({
                            "season": season_name,
                            "episode": entry['path'].rsplit('/', 1)[-1],
                            "path": entry['path'],
                        })
    return shows
def get_all_films(indexed_cache):
    """Return the path of every film directory under the top-level 'films' dir."""
    return [
        sub['path']
        for top in indexed_cache
        if top['type'] == 'directory' and top['path'] == 'films'
        for sub in top['contents']
        if sub['type'] == 'directory'
    ]
def start_prefetching():
    """Thread target that runs the (blocking) metadata prefetch to completion.

    Despite the name, this function does not spawn a thread itself; it is
    passed as the target of the daemon thread started at module level below.
    """
    prefetch_metadata()
# Start prefetching metadata on a background daemon thread so import (and
# the Flask server below) is not blocked by the TVDB network calls.
thread = threading.Thread(target=start_prefetching)
thread.daemon = True  # don't keep the process alive for prefetching
thread.start()
| # API Endpoints | |
def get_movie_api():
    """Endpoint to get the movie by title.

    Query args: title (required).
    Behaviour: serve the cached file when the film store has a live path;
    otherwise start a background download from the HF repo and return the
    film_id so clients can poll progress.
    NOTE(review): no @app.route decorator is visible in this file -- confirm
    routes are registered elsewhere, otherwise this view is unreachable.
    """
    title = request.args.get('title')
    if not title:
        return jsonify({"error": "Title parameter is required"}), 400

    # Load the film store JSON (title -> local path of completed downloads)
    with open(FILM_STORE_JSON_PATH, 'r') as json_file:
        film_store_data = json.load(json_file)

    # Check if the film is already cached; fall through if the file vanished.
    if title in film_store_data:
        cache_path = film_store_data[title]
        if os.path.exists(cache_path):
            return send_from_directory(os.path.dirname(cache_path), os.path.basename(cache_path))

    movie_path = find_movie_path(file_structure, title)
    if not movie_path:
        return jsonify({"error": "Movie not found"}), 404

    cache_path = os.path.join(CACHE_DIR, movie_path)
    file_url = f"https://huggingface.co/{REPO}/resolve/main/{movie_path}"
    proxies = get_system_proxies()
    film_id = get_film_id(title)

    # Start the download in a separate thread if not already downloading
    # (download_threads keeps one thread per film_id).
    if film_id not in download_threads or not download_threads[film_id].is_alive():
        thread = threading.Thread(target=download_file, args=(file_url, TOKEN, cache_path, proxies, film_id, title))
        download_threads[film_id] = thread
        thread.start()

    return jsonify({"status": "Download started", "film_id": film_id})
def get_tv_show_api():
    """Endpoint to get the TV show by title, season, and episode.

    Query args: title, season, episode (all required).
    Behaviour: serve the cached episode file when present in the TV store;
    otherwise locate the episode in the index, start a background download,
    and return the episode_id for progress polling.
    """
    title = request.args.get('title')
    season = request.args.get('season')
    episode = request.args.get('episode')
    if not title or not season or not episode:
        return jsonify({"error": "Title, season, and episode parameters are required"}), 400

    # Load the TV store JSON (episode_key -> local path of completed downloads)
    with open(TV_STORE_JSON_PATH, 'r') as json_file:
        tv_store_data = json.load(json_file)

    # Check if the episode is already cached
    episode_key = f"{title}_S{season}_E{episode}"
    if episode_key in tv_store_data:
        cache_path = tv_store_data[episode_key]
        if os.path.exists(cache_path):
            return send_from_directory(os.path.dirname(cache_path), os.path.basename(cache_path))

    tv_path = find_tv_path(file_structure, title)
    if not tv_path:
        return jsonify({"error": "TV show not found"}), 404

    # Search the index for the episode file: tv -> show -> "Season N" -> file.
    # NOTE(review): the break below only exits the innermost loop, so a later
    # match can overwrite episode_path; also endswith(f'{episode}') is a loose
    # suffix match (e.g. episode "1" matches any path ending in 1) -- confirm
    # callers pass full episode filenames.
    episode_path = None
    for directory in file_structure:
        if directory['type'] == 'directory' and directory['path'] == 'tv':
            for sub_directory in directory['contents']:
                if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                    for season_dir in sub_directory['contents']:
                        if season_dir['type'] == 'directory' and season_dir['path'].endswith(f'Season {season}'):
                            for episode_file in season_dir['contents']:
                                if episode_file['type'] == 'file' and episode_file['path'].endswith(f'{episode}'):
                                    episode_path = episode_file['path']
                                    break
    if not episode_path:
        return jsonify({"error": "Episode not found"}), 404

    cache_path = os.path.join(CACHE_DIR, episode_path)
    file_url = f"https://huggingface.co/{REPO}/resolve/main/{episode_path}"
    proxies = get_system_proxies()
    episode_id = f"{title}_S{season}_E{episode}"

    # Start the download in a separate thread if not already downloading
    if episode_id not in download_threads or not download_threads[episode_id].is_alive():
        thread = threading.Thread(target=download_file, args=(file_url, TOKEN, cache_path, proxies, episode_id, title))
        download_threads[episode_id] = thread
        thread.start()

    return jsonify({"status": "Download started", "episode_id": episode_id})
def get_progress_api(id):
    """Endpoint to get the download progress of a movie or TV show episode.

    *id* is a film_id or episode_id previously returned by the download
    endpoints.  NOTE(review): the parameter shadows the builtin `id`; it is
    kept as-is because the URL rule (registered elsewhere) presumably binds
    this exact name.
    """
    progress = get_download_progress(id)
    return jsonify({"id": id, "progress": progress})
def get_film_id_by_title_api():
    """Endpoint to resolve a movie title to its film_id.

    Query args: title (required).  Returns 400 when the title is missing.
    """
    title = request.args.get('title')
    if title:
        return jsonify({"film_id": get_film_id(title)})
    return jsonify({"error": "Title parameter is required"}), 400
def get_episode_id_api():
    """Endpoint to build an episode ID from title, season and episode.

    Query args: title, season, episode (all required).  The ID format
    ("{title}_S{season}_E{episode}") matches the TV store keys.
    """
    params = [request.args.get(name) for name in ('title', 'season', 'episode')]
    if not all(params):
        return jsonify({"error": "Title, season, and episode parameters are required"}), 400
    show, season_no, episode_no = params
    return jsonify({"episode_id": f"{show}_S{season_no}_E{episode_no}"})
def get_cache_size_api():
    """Endpoint reporting the total on-disk size of CACHE_DIR, human readable."""
    total_bytes = sum(
        os.path.getsize(os.path.join(dirpath, name))
        for dirpath, _dirnames, filenames in os.walk(CACHE_DIR)
        for name in filenames
    )
    return jsonify({"cache_size": bytes_to_human_readable(total_bytes)})
def clear_cache_api():
    """Endpoint to clear the download cache.

    Deletes every file under CACHE_DIR and then prunes the now-empty
    sub-directories bottom-up.  (Fix: the previous implementation removed
    only the files, leaving empty directory trees behind on every clear.)
    CACHE_DIR itself is preserved.  As before, the film/TV store JSON files
    live under CACHE_DIR and are removed too.
    """
    # topdown=False yields leaves first, so by the time we reach a directory
    # its files are gone and its sub-directories have been rmdir'd.
    for dirpath, dirnames, filenames in os.walk(CACHE_DIR, topdown=False):
        for f in filenames:
            os.remove(os.path.join(dirpath, f))
        if dirpath != CACHE_DIR:
            os.rmdir(dirpath)
    return jsonify({"status": "Cache cleared"})
def get_tv_store_api():
    """Endpoint to get the TV store JSON (episode_key -> cached path)."""
    if not os.path.exists(TV_STORE_JSON_PATH):
        return jsonify({}), 404
    with open(TV_STORE_JSON_PATH, 'r') as fh:
        return jsonify(json.load(fh))
def get_film_store_api():
    """Endpoint to get the film store JSON (title -> cached path).

    (Fix: docstring previously said "TV store" and the local variable was
    named tv_store_data -- a copy/paste from get_tv_store_api.)
    """
    if os.path.exists(FILM_STORE_JSON_PATH):
        with open(FILM_STORE_JSON_PATH, 'r') as json_file:
            film_store_data = json.load(json_file)
        return jsonify(film_store_data)
    return jsonify({}), 404
def get_film_metadata_api():
    """Endpoint to get the film metadata by title.

    Reads the cached TVDB JSON written under CACHE_DIR using the
    URL-quoted title as the file name.
    """
    title = request.args.get('title')
    if not title:
        return jsonify({'error': 'No title provided'}), 400
    cache_file = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(cache_file):
        return jsonify({'error': 'Metadata not found'}), 404
    with open(cache_file, 'r') as fh:
        return jsonify(json.load(fh))
def get_tv_metadata_api():
    """Endpoint to get the TV show metadata by title.

    Reads the cached TVDB JSON (keyed by URL-quoted title) and, when the
    show is present in the index, attaches its season/episode layout under
    the 'file_structure' key.
    """
    title = request.args.get('title')
    if not title:
        return jsonify({'error': 'No title provided'}), 400
    cache_file = os.path.join(CACHE_DIR, f"{urllib.parse.quote(title)}.json")
    if not os.path.exists(cache_file):
        return jsonify({'error': 'Metadata not found'}), 404
    with open(cache_file, 'r') as fh:
        data = json.load(fh)
    # Add the file structure to the metadata when the show is indexed.
    tv_structure_data = get_tv_structure(file_structure, title)
    if tv_structure_data:
        data['file_structure'] = tv_structure_data
    return jsonify(data)
def get_all_films_api():
    """Endpoint: list every indexed film directory path.

    NOTE(review): returns a bare list rather than a jsonify() response,
    unlike the other endpoints -- relies on Flask's automatic serialization.
    """
    return get_all_films(file_structure)
def get_all_tvshows_api():
    """Endpoint: map every indexed TV show to its episode list.

    NOTE(review): returns a bare dict rather than a jsonify() response,
    unlike the other endpoints -- relies on Flask's automatic serialization.
    """
    return get_all_tv_shows(file_structure)
| # Routes | |
def index():
    """Health-check route body: report that the server is alive."""
    status_message = "Server Running"
    return status_message
# Main entry point
if __name__ == "__main__":
    # Bind on all interfaces, port 7860 (the Hugging Face Spaces convention).
    # NOTE(review): debug=True enables the Werkzeug debugger/reloader --
    # do not expose this publicly in production.
    app.run(debug=True, host="0.0.0.0", port=7860)