| | import os |
| | import requests |
| | import json |
| | import urllib.request |
| | import time |
| | from threading import Thread, Event |
| | from requests.exceptions import RequestException |
| | from tqdm import tqdm |
| | from indexer import indexer |
| | import logging |
| |
|
# Root directory for cached media, taken from the environment.
# NOTE(review): this may be None when the CACHE_DIR env var is unset; each
# Instance receives its own cache_dir via __init__ — confirm which of the two
# is authoritative before relying on this module-level value.
CACHE_DIR = os.getenv("CACHE_DIR")

# Module-wide registry of in-flight and finished downloads, keyed by film or
# episode id. Each entry tracks total/downloaded bytes, status, and timing.
download_progress = {}
| |
|
| | class Instance: |
    def __init__(self, id, url, cache_dir, index_file, token, repo, load_balancer_api, max_retries=20, initial_delay=1):
        """Set up a cache instance: seed store files, index the cache, register
        with the load balancer, and start the background maintenance threads.

        Args:
            id: Unique identifier for this instance.
            url: Publicly reachable URL of this instance.
            cache_dir: Directory where downloaded media and store JSONs live.
            index_file: Path to the JSON index produced by the indexer.
            token: Authorization bearer token used for downloads.
            repo: Repository identifier (stored, not used in this chunk).
            load_balancer_api: Client object exposing register_instance(id, url).
            max_retries: Registration retry budget (see register_to_load_balancer).
            initial_delay: Initial backoff delay in seconds between retries.

        Raises:
            FileNotFoundError: If the indexer does not produce index_file
                (via run_indexer_and_load).
        """
        self.version = "0.0.0.1 Alpha"
        self.id = id
        self.url = url
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        # Per-instance store files live inside the cache directory.
        self.FILM_STORE_JSON_PATH = os.path.join(cache_dir, "film_store.json")
        self.TV_STORE_JSON_PATH = os.path.join(cache_dir, "tv_store.json")
        self.download_threads = {}
        # Populated by run_indexer_and_load() below.
        self.file_structure = None
        self.load_balancer_api = load_balancer_api
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.last_report_time = time.time()
        self.re_register_event = Event()

        # Ensure the cache directory exists before touching files inside it.
        if not os.path.exists(self.CACHE_DIR):
            os.makedirs(self.CACHE_DIR)

        # Seed both store files with an empty JSON object if missing.
        for path in [self.FILM_STORE_JSON_PATH, self.TV_STORE_JSON_PATH]:
            if not os.path.exists(path):
                with open(path, 'w') as json_file:
                    json.dump({}, json_file)

        # Build the initial cache index (blocking).
        self.run_indexer_and_load()

        # Register with the load balancer, then keep registration fresh from a
        # daemon thread so the process can exit without joining it.
        self.register_to_load_balancer()
        registration_thread = Thread(target=self.monitor_registration)
        registration_thread.daemon = True
        registration_thread.start()

        # Periodically refresh the index in the background (daemon thread).
        indexer_thread = Thread(target=self.run_indexer_periodically)
        indexer_thread.daemon = True
        indexer_thread.start()
| |
|
| | def run_indexer_and_load(self): |
| | """Runs the indexer and loads the file structure from INDEX_FILE.""" |
| | indexer() |
| | if not os.path.exists(self.INDEX_FILE): |
| | raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.") |
| |
|
| | with open(self.INDEX_FILE, 'r') as f: |
| | self.file_structure = json.load(f) |
| | logging.info("File structure reloaded successfully.") |
| |
|
| | def run_indexer_periodically(self): |
| | """Periodically reruns the indexer and reloads the file structure.""" |
| | while True: |
| | time.sleep(120) |
| | logging.info("Re-running indexer and reloading file structure.") |
| | self.run_indexer_and_load() |
| |
|
| | def compile_report(self): |
| | self.last_report_time = time.time() |
| | |
| | film_store_path = os.path.join(self.CACHE_DIR, "film_store.json") |
| | tv_store_path = os.path.join(self.CACHE_DIR, "tv_store.json") |
| | cache_size = self.get_cache_size() |
| |
|
| | report = { |
| | "instance_id": self.id, |
| | "instance_url": self.url, |
| | "film_store": self.read_json(film_store_path), |
| | "tv_store": self.read_json(tv_store_path), |
| | "cache_size": cache_size |
| | } |
| | return report |
| |
|
    def register_to_load_balancer(self):
        """Register this instance with the load balancer once and log the outcome.

        NOTE(review): this definition is DEAD CODE — it is shadowed by the
        second ``register_to_load_balancer`` defined later in this class (the
        retry/backoff variant); Python keeps only the last binding. Consider
        removing one of the two definitions.
        """
        result = self.load_balancer_api.register_instance(self.id, self.url)
        if result is not None:
            logging.info(f'Registered instance {self.id} to load balancer.')
        else:
            logging.error(f'Failed to register instance {self.id} to load balancer.')
| |
|
| | def monitor_registration(self): |
| | while True: |
| | if time.time() - self.last_report_time > 60: |
| | logging.info('1 minute passed since last report. Re-registering...') |
| | self.register_to_load_balancer() |
| | self.last_report_time = time.time() |
| | time.sleep(30) |
| |
|
| | def get_cache_size(self): |
| | total_size = 0 |
| | for dirpath, dirnames, filenames in os.walk(CACHE_DIR): |
| | for f in filenames: |
| | fp = os.path.join(dirpath, f) |
| | total_size += os.path.getsize(fp) |
| | return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"} |
| |
|
| | @staticmethod |
| | def read_json(file_path): |
| | if os.path.exists(file_path): |
| | with open(file_path, 'r') as json_file: |
| | return json.load(json_file) |
| | return {} |
| |
|
| | @staticmethod |
| | def get_system_proxies(): |
| | """ |
| | Retrieves the system's HTTP and HTTPS proxies. |
| | |
| | Returns: |
| | dict: A dictionary containing the proxies. |
| | """ |
| | try: |
| | proxies = urllib.request.getproxies() |
| | print("System proxies:", proxies) |
| | return { |
| | "http": proxies.get("http"), |
| | "https": proxies.get("http") |
| | } |
| | except Exception as e: |
| | print(f"Error getting system proxies: {e}") |
| | return {} |
| |
|
| | def download_film(self, file_url, token, cache_path, proxies, film_id, title, chunk_size=100 * 1024 * 1024): |
| | """ |
| | Downloads a file from the specified URL and saves it to the cache path. |
| | Tracks the download progress. |
| | |
| | Args: |
| | file_url (str): The URL of the file to download. |
| | token (str): The authorization token for the request. |
| | cache_path (str): The path to save the downloaded file. |
| | proxies (dict): Proxies for the request. |
| | film_id (str): Unique identifier for the film download. |
| | title (str): The title of the film. |
| | chunk_size (int): Size of each chunk to download. |
| | """ |
| | print(f"Downloading file from URL: {file_url} to {cache_path} with proxies: {proxies}") |
| | headers = {'Authorization': f'Bearer {token}'} |
| | try: |
| | response = requests.get(file_url, headers=headers, proxies=proxies, stream=True) |
| | response.raise_for_status() |
| | |
| | total_size = int(response.headers.get('content-length', 0)) |
| | download_progress[film_id] = {"total": total_size, "downloaded": 0, "status": "Downloading", "start_time": time.time()} |
| |
|
| | os.makedirs(os.path.dirname(cache_path), exist_ok=True) |
| | with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar: |
| | for data in response.iter_content(chunk_size=chunk_size): |
| | file.write(data) |
| | pbar.update(len(data)) |
| | download_progress[film_id]["downloaded"] += len(data) |
| | |
| | print(f'File cached to {cache_path} successfully.') |
| | self.update_film_store_json(title, cache_path) |
| | download_progress[film_id]["status"] = "Completed" |
| | except RequestException as e: |
| | print(f"Error downloading file: {e}") |
| | download_progress[film_id]["status"] = "Failed" |
| | except IOError as e: |
| | print(f"Error writing file {cache_path}: {e}") |
| | download_progress[film_id]["status"] = "Failed" |
| | finally: |
| | if download_progress[film_id]["status"] != "Downloading": |
| | download_progress[film_id]["end_time"] = time.time() |
| |
|
| | @staticmethod |
| | def get_download_progress(id): |
| | """ |
| | Gets the download progress for a specific film. |
| | |
| | Args: |
| | film_id (str): The unique identifier for the film download. |
| | |
| | Returns: |
| | dict: A dictionary containing the total size, downloaded size, progress percentage, status, and ETA. |
| | """ |
| | if id in download_progress: |
| | total = download_progress[id]["total"] |
| | downloaded = download_progress[id]["downloaded"] |
| | status = download_progress[id].get("status", "In Progress") |
| | progress = (downloaded / total) * 100 if total > 0 else 0 |
| |
|
| | eta = None |
| | if status == "Downloading" and downloaded > 0: |
| | elapsed_time = time.time() - download_progress[id]["start_time"] |
| | estimated_total_time = elapsed_time * (total / downloaded) |
| | eta = estimated_total_time - elapsed_time |
| | elif status == "Completed": |
| | eta = 0 |
| |
|
| | return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta} |
| | return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None} |
| |
|
| | def update_film_store_json(self,title, cache_path): |
| | """ |
| | Updates the film store JSON with the new file. |
| | |
| | Args: |
| | title (str): The title of the film. |
| | cache_path (str): The local path where the file is saved. |
| | """ |
| | film_store_data = {} |
| | if os.path.exists(self.FILM_STORE_JSON_PATH): |
| | with open(self.FILM_STORE_JSON_PATH, 'r') as json_file: |
| | film_store_data = json.load(json_file) |
| |
|
| | film_store_data[title] = cache_path |
| |
|
| | with open(self.FILM_STORE_JSON_PATH, 'w') as json_file: |
| | json.dump(film_store_data, json_file, indent=2) |
| | print(f'Film store updated with {title}.') |
| |
|
| | def download_episode(self, file_url, token, cache_path, proxies, episode_id, title, chunk_size=100 * 1024 * 1024): |
| | """ |
| | Downloads a file from the specified URL and saves it to the cache path. |
| | Tracks the download progress. |
| | |
| | Args: |
| | file_url (str): The URL of the file to download. |
| | token (str): The authorization token for the request. |
| | cache_path (str): The path to save the downloaded file. |
| | proxies (dict): Proxies for the request. |
| | episode_id (str): Unique identifier for the film download. |
| | title (str): The title of the film. |
| | chunk_size (int): Size of each chunk to download. |
| | """ |
| | print(f"Downloading file from URL: {file_url} to {cache_path} with proxies: {proxies}") |
| | headers = {'Authorization': f'Bearer {token}'} |
| | try: |
| | response = requests.get(file_url, headers=headers, proxies=proxies, stream=True) |
| | response.raise_for_status() |
| | |
| | total_size = int(response.headers.get('content-length', 0)) |
| | download_progress[episode_id] = {"total": total_size, "downloaded": 0, "status": "Downloading", "start_time": time.time()} |
| |
|
| | os.makedirs(os.path.dirname(cache_path), exist_ok=True) |
| | with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar: |
| | for data in response.iter_content(chunk_size=chunk_size): |
| | file.write(data) |
| | pbar.update(len(data)) |
| | download_progress[episode_id]["downloaded"] += len(data) |
| | |
| | print(f'File cached to {cache_path} successfully.') |
| | self.update_tv_store_json(title, cache_path) |
| | download_progress[episode_id]["status"] = "Completed" |
| | except RequestException as e: |
| | print(f"Error downloading file: {e}") |
| | download_progress[episode_id]["status"] = "Failed" |
| | except IOError as e: |
| | print(f"Error writing file {cache_path}: {e}") |
| | download_progress[episode_id]["status"] = "Failed" |
| | finally: |
| | if download_progress[episode_id]["status"] != "Downloading": |
| | download_progress[episode_id]["end_time"] = time.time() |
| |
|
| | def update_tv_store_json(self, title, cache_path): |
| | """ |
| | Updates the TV store JSON with the new file, organizing by title, season, and episode. |
| | |
| | Args: |
| | title (str): The title of the TV show. |
| | cache_path (str): The local path where the file is saved. |
| | """ |
| | tv_store_data = {} |
| | if os.path.exists(self.TV_STORE_JSON_PATH): |
| | with open(self.TV_STORE_JSON_PATH, 'r') as json_file: |
| | tv_store_data = json.load(json_file) |
| |
|
| | |
| | season_part = os.path.basename(os.path.dirname(cache_path)) |
| | episode_part = os.path.basename(cache_path) |
| |
|
| | |
| | if title not in tv_store_data: |
| | tv_store_data[title] = {} |
| | |
| | if season_part not in tv_store_data[title]: |
| | tv_store_data[title][season_part] = {} |
| |
|
| | |
| | tv_store_data[title][season_part][episode_part] = cache_path |
| |
|
| | with open(self.TV_STORE_JSON_PATH, 'w') as json_file: |
| | json.dump(tv_store_data, json_file, indent=2) |
| |
|
| | print(f'TV store updated with {title}, {season_part}, {episode_part}.') |
| |
|
| |
|
| | def load_json(self, file_path): |
| | """Load JSON data from a file.""" |
| | with open(file_path, 'r') as file: |
| | return json.load(file) |
| |
|
| | def find_movie_path(self, title): |
| | """Find the path of the movie in the JSON data based on the title.""" |
| | for directory in self.file_structure: |
| | if directory['type'] == 'directory' and directory['path'] == 'films': |
| | for sub_directory in directory['contents']: |
| | if sub_directory['type'] == 'directory': |
| | for item in sub_directory['contents']: |
| | if item['type'] == 'file' and title.lower() in item['path'].lower(): |
| | return item['path'] |
| | return None |
| |
|
| | def find_tv_path(self, title): |
| | """Find the path of the TV show in the JSON data based on the title.""" |
| | for directory in self.file_structure: |
| | if directory['type'] == 'directory' and directory['path'] == 'tv': |
| | for sub_directory in directory['contents']: |
| | if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): |
| | return sub_directory['path'] |
| | return None |
| |
|
| | def get_tv_structure(self, title): |
| | """Find the path of the TV show in the JSON data based on the title.""" |
| | for directory in self.file_structure: |
| | if directory['type'] == 'directory' and directory['path'] == 'tv': |
| | for sub_directory in directory['contents']: |
| | if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower(): |
| | return sub_directory |
| | return None |
| |
|
| | def get_film_id(self, title): |
| | """Generate a film ID based on the title.""" |
| | return title.replace(" ", "_").lower() |
| |
|
| | def bytes_to_human_readable(self, num, suffix="B"): |
| | for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: |
| | if abs(num) < 1024.0: |
| | return f"{num:3.1f} {unit}{suffix}" |
| | num /= 1024.0 |
| | return f"{num:.1f} Y{suffix}" |
| |
|
| | def encode_episodeid(self, title, season, episode): |
| | return f"{title}_{season}_{episode}" |
| |
|
| | def get_all_tv_shows(self): |
| | """Get all TV shows from the indexed cache structure JSON file.""" |
| | tv_shows = {} |
| | for directory in self.file_structure: |
| | if directory['type'] == 'directory' and directory['path'] == 'tv': |
| | for sub_directory in directory['contents']: |
| | if sub_directory['type'] == 'directory': |
| | show_title = sub_directory['path'].split('/')[-1] |
| | tv_shows[show_title] = [] |
| | for season_directory in sub_directory['contents']: |
| | if season_directory['type'] == 'directory': |
| | season = season_directory['path'].split('/')[-1] |
| | for episode in season_directory['contents']: |
| | if episode['type'] == 'file': |
| | tv_shows[show_title].append({ |
| | "season": season, |
| | "episode": episode['path'].split('/')[-1], |
| | "path": episode['path'] |
| | }) |
| | return tv_shows |
| |
|
| | def get_all_films(self): |
| | """Get all films from the indexed cache structure JSON file.""" |
| | films = [] |
| | for directory in self.file_structure: |
| | if directory['type'] == 'directory' and directory['path'] == 'films': |
| | for sub_directory in directory['contents']: |
| | if sub_directory['type'] == 'directory': |
| | films.append(sub_directory['path']) |
| | return films |
| |
|
| | def register_to_load_balancer(self): |
| | retries = 0 |
| | delay = self.initial_delay |
| | max_delay = 120 |
| |
|
| | while True: |
| | try: |
| | result = self.load_balancer_api.register_instance(self.id, self.url) |
| | if result: |
| | logging.info(f'Successfully registered instance {self.id} to load balancer.') |
| | return result |
| |
|
| | except Exception as e: |
| | logging.error(f'Error during registration: {e}') |
| | |
| | retries += 1 |
| | logging.warning(f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...') |
| | time.sleep(delay) |
| | delay = min(delay * 2, max_delay) |