| import base64 |
| import hashlib |
| import json |
| import re |
| import aiohttp |
| import bencodepy |
| import PTT |
| import asyncio |
|
|
| from RTN import parse, title_match |
| from curl_cffi import requests |
| from fastapi import Request |
|
|
| from comet.utils.logger import logger |
| from comet.utils.models import settings, ConfigModel |
|
|
# Emoji displayed for each language key.
# Keys are lowercase; get_language_emoji() lowercases its argument before the
# lookup and falls back to the raw input for anything not listed here.
# "multi" is the pseudo-language that format_title() prepends for releases
# flagged as dubbed.
languages_emojis = {
    "multi": "🌎",
    "en": "🇬🇧",
    "ja": "🇯🇵",
    "zh": "🇨🇳",
    "ru": "🇷🇺",
    "ar": "🇸🇦",
    "pt": "🇵🇹",
    "es": "🇪🇸",
    "fr": "🇫🇷",
    "de": "🇩🇪",
    "it": "🇮🇹",
    "ko": "🇰🇷",
    "hi": "🇮🇳",
    "bn": "🇧🇩",
    "pa": "🇵🇰",
    "mr": "🇮🇳",
    "gu": "🇮🇳",
    "ta": "🇮🇳",
    "te": "🇮🇳",
    "kn": "🇮🇳",
    "ml": "🇮🇳",
    "th": "🇹🇭",
    "vi": "🇻🇳",
    "id": "🇮🇩",
    "tr": "🇹🇷",
    "he": "🇮🇱",
    "fa": "🇮🇷",
    "uk": "🇺🇦",
    "el": "🇬🇷",
    "lt": "🇱🇹",
    "lv": "🇱🇻",
    "et": "🇪🇪",
    "pl": "🇵🇱",
    "cs": "🇨🇿",
    "sk": "🇸🇰",
    "hu": "🇭🇺",
    "ro": "🇷🇴",
    "bg": "🇧🇬",
    "sr": "🇷🇸",
    "hr": "🇭🇷",
    "sl": "🇸🇮",
    "nl": "🇳🇱",
    "da": "🇩🇰",
    "fi": "🇫🇮",
    "sv": "🇸🇪",
    "no": "🇳🇴",
    "ms": "🇲🇾",
    # NOTE(review): not a country flag — presumably stands for "Latino"
    # (Latin-American Spanish) rather than Latin; confirm with the parser's codes.
    "la": "💃🏻",
}
|
|
|
|
def get_language_emoji(language: str):
    """Return the emoji registered for ``language``.

    The lookup is case-insensitive. When the language has no entry in
    ``languages_emojis`` the argument is returned unchanged (original casing).
    """
    return languages_emojis.get(language.lower(), language)
|
|
|
|
# Accented / special Latin letters and the plain ASCII text that replaces them.
# Every key must be exactly ONE code point: str.maketrans() raises ValueError
# for longer string keys.
translation_table = {
    "ā": "a",
    "ă": "a",
    "ą": "a",
    "ć": "c",
    "č": "c",
    "ç": "c",
    "ĉ": "c",
    "ċ": "c",
    "ď": "d",
    "đ": "d",
    "è": "e",
    "é": "e",
    "ê": "e",
    "ë": "e",
    "ē": "e",
    "ĕ": "e",
    "ę": "e",
    "ě": "e",
    "ĝ": "g",
    "ğ": "g",
    "ġ": "g",
    "ģ": "g",
    "ĥ": "h",
    "î": "i",
    "ï": "i",
    "ì": "i",
    "í": "i",
    "ī": "i",
    "ĩ": "i",
    "ĭ": "i",
    "ı": "i",
    "ĵ": "j",
    "ķ": "k",
    "ĺ": "l",
    "ļ": "l",
    "ł": "l",
    "ń": "n",
    "ň": "n",
    "ñ": "n",
    "ņ": "n",
    # U+0149 (n preceded by apostrophe). Written as an escape because Unicode
    # compatibility normalisation (NFKC) silently turns the literal into the
    # two-character sequence U+02BC + "n", which str.maketrans() rejects.
    "\u0149": "n",
    "ó": "o",
    "ô": "o",
    "õ": "o",
    "ö": "o",
    "ø": "o",
    "ō": "o",
    "ő": "o",
    "œ": "oe",
    "ŕ": "r",
    "ř": "r",
    "ŗ": "r",
    "š": "s",
    "ş": "s",
    "ś": "s",
    "ș": "s",
    "ß": "ss",
    "ť": "t",
    "ţ": "t",
    "ū": "u",
    "ŭ": "u",
    "ũ": "u",
    "û": "u",
    "ü": "u",
    "ù": "u",
    "ú": "u",
    "ų": "u",
    "ű": "u",
    "ŵ": "w",
    "ý": "y",
    "ÿ": "y",
    "ŷ": "y",
    "ž": "z",
    "ż": "z",
    "ź": "z",
    "æ": "ae",
    "ǎ": "a",
    "ǧ": "g",
    "ə": "e",
    "ƒ": "f",
    "ǐ": "i",
    "ǒ": "o",
    "ǔ": "u",
    "ǚ": "u",
    "ǜ": "u",
    "ǹ": "n",
    "ǻ": "a",
    "ǽ": "ae",
    "ǿ": "o",
}

# Rebind the name to the code-point keyed table that str.translate() expects.
translation_table = str.maketrans(translation_table)

# A standalone run of exactly 40 hex digits (the length of a hex SHA-1 digest);
# group 1 is the digest itself.
info_hash_pattern = re.compile(r"\b([a-fA-F0-9]{40})\b")
|
|
|
|
def translate(title: str):
    """Return ``title`` with each character listed in ``translation_table``
    replaced by its mapped text; all other characters are kept as-is."""
    converted = title.translate(translation_table)
    return converted
|
|
|
|
# Lower-case file extensions accepted by is_video(). Built once at import time
# (the original rebuilt a list with many duplicates on every call).
_VIDEO_EXTENSIONS = (
    ".mkv",
    ".mp4",
    ".avi",
    ".mov",
    ".flv",
    ".wmv",
    ".webm",
    ".mpg",
    ".mpeg",
    ".m4v",
    ".3gp",
    ".3g2",
    ".ogv",
    ".ogg",
    ".drc",
    ".gif",
    ".gifv",
    ".mng",
    ".qt",
    ".yuv",
    ".rm",
    ".rmvb",
    ".asf",
    ".amv",
    ".m4p",
    ".mp2",
    ".mpe",
    ".mpv",
    ".m2v",
    ".svi",
    ".mxf",
    ".roq",
    ".nsv",
    ".f4v",
    ".f4p",
    ".f4a",
    ".f4b",
)


def is_video(title: str):
    """Return True when ``title`` ends with one of the known video extensions.

    The comparison ignores case, so ``"Movie.MKV"`` is accepted just like
    ``"movie.mkv"``. The leading dot is part of the match: a bare ``"mkv"``
    is not a video file name.
    """
    return title.lower().endswith(_VIDEO_EXTENSIONS)
|
|
|
|
def bytes_to_size(bytes: int):
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, then rounded to two decimals. Zero is special-cased
    to ``"0 Byte"``.
    """
    if bytes == 0:
        return "0 Byte"

    units = ("Bytes", "KB", "MB", "GB", "TB")
    amount = bytes
    for unit in units[:-1]:
        # Stop at the first unit in which the amount is below 1024.
        if not amount >= 1024:
            break
        amount /= 1024
    else:
        # Every smaller unit overflowed: report in the largest one.
        unit = units[-1]

    return f"{round(amount, 2)} {unit}"
|
|
|
|
def config_check(b64config: str):
    """Decode and validate a base64-encoded JSON configuration.

    Returns the validated configuration as a dict (``ConfigModel.model_dump()``)
    or ``False`` when decoding, JSON parsing or model validation fails.
    """
    try:
        config = json.loads(base64.b64decode(b64config).decode())
        validated_config = ConfigModel(**config)
        return validated_config.model_dump()
    except Exception:
        # Any malformed input means "invalid config". Unlike a bare ``except:``
        # this lets KeyboardInterrupt / SystemExit propagate.
        return False
|
|
|
|
def get_debrid_extension(debridService: str):
    """Return the two-letter abbreviation for a debrid service name.

    Unknown service names yield ``None``.
    """
    abbreviations = {
        "realdebrid": "RD",
        "alldebrid": "AD",
        "premiumize": "PM",
        "torbox": "TB",
        "debridlink": "DL",
    }
    return abbreviations.get(debridService)
|
|
|
|
async def get_indexer_manager(
    session: aiohttp.ClientSession,
    indexer_manager_type: str,
    indexers: list,
    query: str,
):
    """Search the configured indexer manager for ``query``.

    Args:
        session: aiohttp session used for every request.
        indexer_manager_type: ``"jackett"`` or ``"prowlarr"``; any other value
            yields an empty list.
        indexers: indexer names; underscores are replaced by spaces before use.
        query: free-text search term (sent URL-encoded).

    Returns:
        A list of result dicts. Jackett results are returned as received;
        Prowlarr results additionally get the Jackett-style keys ``InfoHash``,
        ``Title``, ``Size``, ``Link`` and ``Tracker``.

    Never raises: failures are logged as warnings and whatever was collected
    so far is returned.
    """
    results = []
    try:
        indexers = [indexer.replace("_", " ") for indexer in indexers]

        if indexer_manager_type == "jackett":

            async def fetch_jackett_results(
                session: aiohttp.ClientSession, indexer: str, query: str
            ):
                # One request per indexer so a single slow or failing indexer
                # only costs its own results.
                try:
                    async with session.get(
                        f"{settings.INDEXER_MANAGER_URL}/api/v2.0/indexers/all/results",
                        # ``params`` percent-encodes the values; interpolating
                        # them into the URL broke on titles containing "&",
                        # "#" or "+".
                        params={
                            "apikey": str(settings.INDEXER_MANAGER_API_KEY),
                            "Query": query,
                            "Tracker[]": indexer,
                        },
                        timeout=aiohttp.ClientTimeout(
                            total=settings.INDEXER_MANAGER_TIMEOUT
                        ),
                    ) as response:
                        response_json = await response.json()
                        return response_json.get("Results", [])
                except Exception as e:
                    logger.warning(
                        f"Exception while fetching Jackett results for indexer {indexer}: {e}"
                    )
                    return []

            tasks = [
                fetch_jackett_results(session, indexer, query) for indexer in indexers
            ]
            all_results = await asyncio.gather(*tasks)

            for result_set in all_results:
                results.extend(result_set)

        elif indexer_manager_type == "prowlarr":
            headers = {"X-Api-Key": settings.INDEXER_MANAGER_API_KEY}

            # Resolve the requested indexer names to Prowlarr's numeric ids.
            async with session.get(
                f"{settings.INDEXER_MANAGER_URL}/api/v1/indexer",
                headers=headers,
            ) as indexers_response:
                available_indexers = await indexers_response.json()

            indexers_id = [
                indexer["id"]
                for indexer in available_indexers
                if indexer["name"].lower() in indexers
                or indexer["definitionName"].lower() in indexers
            ]

            # Keep the original request shape: one ``indexerIds`` pair per id,
            # or a single empty value when no indexer matched.
            id_values = [str(indexer_id) for indexer_id in indexers_id] or [""]
            search_params = [("query", query)]
            search_params.extend(("indexerIds", value) for value in id_values)
            search_params.append(("type", "search"))

            async with session.get(
                f"{settings.INDEXER_MANAGER_URL}/api/v1/search",
                params=search_params,
                headers=headers,
            ) as search_response:
                found = await search_response.json()

            for result in found:
                # Mirror Prowlarr's camelCase fields under the Jackett-style
                # keys used by the rest of this function's output.
                result["InfoHash"] = result.get("infoHash")
                result["Title"] = result["title"]
                result["Size"] = result["size"]
                result["Link"] = result.get("downloadUrl")
                result["Tracker"] = result["indexer"]

                results.append(result)
    except Exception as e:
        logger.warning(
            f"Exception while getting {indexer_manager_type} results for {query} with {indexers}: {e}"
        )

    return results
|
|
|
|
async def get_zilean(
    session: aiohttp.ClientSession, name: str, log_name: str, season: int, episode: int
):
    """Query Zilean's ``/dmm/filtered`` endpoint for torrents matching ``name``.

    Args:
        session: aiohttp session used for the request.
        name: title to search for (sent URL-encoded).
        log_name: label used in log messages only.
        season: when truthy, ``season`` and ``episode`` are added to the query.
        episode: episode number sent together with ``season``.

    Returns:
        A list of dicts with ``Title``, ``InfoHash``, ``Size`` and ``Tracker``
        (always ``"DMM"``), limited to the first ``settings.ZILEAN_TAKE_FIRST``
        entries. Never raises: failures are logged and the partial list returned.
    """
    results = []
    try:
        # ``params`` percent-encodes the values; interpolating the raw title
        # into the URL broke on names containing "&", "#" or "+".
        params = {"query": name}
        if season:
            params["season"] = str(season)
            params["episode"] = str(episode)

        async with session.get(
            f"{settings.ZILEAN_URL}/dmm/filtered", params=params
        ) as response:
            get_dmm = await response.json()

        # Only a JSON list is processed; any other payload yields no results.
        if isinstance(get_dmm, list):
            for result in get_dmm[: settings.ZILEAN_TAKE_FIRST]:
                results.append(
                    {
                        "Title": result["raw_title"],
                        "InfoHash": result["info_hash"],
                        "Size": result["size"],
                        "Tracker": "DMM",
                    }
                )

        logger.info(f"{len(results)} torrents found for {log_name} with Zilean")
    except Exception as e:
        logger.warning(
            f"Exception while getting torrents for {log_name} with Zilean: {e}"
        )

    return results
|
|
|
|
async def get_torrentio(log_name: str, type: str, full_id: str):
    """Fetch the Torrentio stream list for ``type``/``full_id``.

    The request is tried directly first and, if that fails, once more through
    ``settings.DEBRID_PROXY_URL``. The HTTP client is synchronous, so both
    attempts run in the default executor to avoid blocking the event loop.

    Returns:
        A list of dicts with ``Title``, ``InfoHash``, ``Size`` (always ``None``)
        and ``Tracker`` (``"Torrentio|<source>"``). Never raises: failures are
        logged and whatever was collected so far is returned.
    """
    results = []
    try:
        url = f"https://torrentio.strem.fun/stream/{type}/{full_id}.json"

        def fetch_direct():
            return requests.get(url).json()

        def fetch_via_proxy():
            return requests.get(
                url,
                proxies={
                    "http": settings.DEBRID_PROXY_URL,
                    "https": settings.DEBRID_PROXY_URL,
                },
            ).json()

        loop = asyncio.get_running_loop()
        try:
            payload = await loop.run_in_executor(None, fetch_direct)
        except Exception:
            # Direct access failed (network error or non-JSON reply): retry
            # through the proxy. Cancellation is not an Exception and is
            # deliberately not swallowed here.
            payload = await loop.run_in_executor(None, fetch_via_proxy)

        for torrent in payload["streams"]:
            title = torrent["title"]
            # Everything before the "👤" line is kept as the title.
            title_full = title.split("\n👤")[0]
            # The source name follows the "⚙️ " marker, up to the end of that line.
            tracker = title.split("⚙️ ")[1].split("\n")[0]

            results.append(
                {
                    "Title": title_full,
                    "InfoHash": torrent["infoHash"],
                    "Size": None,
                    "Tracker": f"Torrentio|{tracker}",
                }
            )

        logger.info(f"{len(results)} torrents found for {log_name} with Torrentio")
    except Exception as e:
        logger.warning(
            f"Exception while getting torrents for {log_name} with Torrentio, your IP is most likely blacklisted (you should try proxying Comet): {e}"
        )

    return results
|
|
|
|
async def filter(torrents: list, name: str, year: int):
    """Decide, per torrent, whether its title matches ``name`` (and ``year``).

    Each item of ``torrents`` is indexable as ``(index, title)``. For
    multi-line titles only the second line is parsed. A torrent is rejected
    when its parsed title does not match ``name``, or when both ``year`` and
    the parsed year are set and differ.

    Returns a list of ``(index, bool)`` tuples in input order.
    """
    verdicts = []
    for entry in torrents:
        position, raw_title = entry[0], entry[1]

        if "\n" in raw_title:
            raw_title = raw_title.split("\n")[1]

        parsed = parse(raw_title)
        if title_match(name, parsed.parsed_title):
            # A missing year on either side counts as compatible.
            accepted = not (year and parsed.year and year != parsed.year)
        else:
            accepted = False

        verdicts.append((position, accepted))

    return verdicts
|
|
|
|
async def get_torrent_hash(session: "aiohttp.ClientSession", torrent: tuple):
    """Resolve the info hash of one search result.

    Args:
        session: aiohttp session used when the hash has to be fetched.
        torrent: ``(index, result_dict)`` pair.

    Returns:
        ``(index, lowercase_hex_hash)``, or ``(index, None)`` when no hash can
        be determined. Resolution order:

        1. the result's own ``InfoHash``, if present and not ``None``;
        2. the SHA-1 of the bencoded ``info`` dict of the file at ``Link``
           (HTTP 200);
        3. a 40-hex-digit token in the ``Location`` header of any other
           response (redirects are not followed, so a redirect to a magnet
           link exposes the hash there).

    Never raises for download/parse problems: they are logged as warnings.
    """
    index, torrent = torrent[0], torrent[1]

    known_hash = torrent.get("InfoHash")
    if known_hash is not None:
        return (index, known_hash.lower())

    # Results without a usable link cannot be resolved. Bail out instead of
    # raising KeyError or issuing a request that is bound to fail.
    url = torrent.get("Link")
    if not url:
        return (index, None)

    try:
        timeout = aiohttp.ClientTimeout(total=settings.GET_TORRENT_TIMEOUT)
        # ``async with`` releases the connection on every path, including errors.
        async with session.get(
            url, allow_redirects=False, timeout=timeout
        ) as response:
            if response.status == 200:
                torrent_data = await response.read()
                torrent_dict = bencodepy.decode(torrent_data)
                info = bencodepy.encode(torrent_dict[b"info"])
                # hexdigest() is already lowercase.
                return (index, hashlib.sha1(info).hexdigest())

            location = response.headers.get("Location", "")

        match = info_hash_pattern.search(location) if location else None
        if match:
            return (index, match.group(1).lower())
    except Exception as e:
        logger.warning(
            f"Exception while getting torrent info hash for {torrent['indexer'] if 'indexer' in torrent else (torrent['Tracker'] if 'Tracker' in torrent else '')}|{url}: {e}"
        )

    return (index, None)
|
|
|
|
def get_balanced_hashes(hashes: dict, config: dict):
    """Filter hashes by size, language and resolution, then cap them evenly.

    Args:
        hashes: mapping of hash -> ``{"data": {...}}`` where the inner dict
            provides ``size``, ``languages``, ``dubbed`` and ``resolution``.
        config: provides ``maxResults``, ``maxSize``, ``resolutions`` and
            ``languages`` (``0`` disables a numeric limit, ``"All"`` disables
            a list filter).

    Returns:
        A dict of resolution -> list of hashes. With ``maxResults == 0`` (or
        nothing left after filtering) every surviving hash is returned.
        Otherwise the budget is split evenly across resolutions, earlier
        resolutions receiving the remainder, and unused budget is handed to
        resolutions that still have hashes left.
    """
    result_cap = config["maxResults"]
    size_cap = config["maxSize"]

    wanted_resolutions = [entry.lower() for entry in config["resolutions"]]
    any_resolution = "all" in wanted_resolutions

    wanted_languages = [entry.lower() for entry in config["languages"]]
    any_language = "all" in wanted_languages
    wanted_codes = []
    if not any_language:
        # Translate the configured language names into the parser's codes.
        wanted_codes = [
            code
            for code, label in PTT.parse.LANGUAGES_TRANSLATION_TABLE.items()
            if label.lower() in wanted_languages
        ]

    def language_mismatch(info):
        """True when the entry fails the language filter."""
        if any_language:
            return False
        if any(code in info["languages"] for code in wanted_codes):
            return False
        if info["dubbed"]:
            # Dubbed entries are still accepted when "multi" was requested.
            return "multi" not in wanted_languages
        return True

    grouped = {}
    for info_hash, entry in hashes.items():
        info = entry["data"]

        oversized = size_cap != 0 and info["size"] > size_cap
        if oversized or language_mismatch(info):
            continue

        resolution = info["resolution"]
        if any_resolution or resolution in wanted_resolutions:
            grouped.setdefault(resolution, []).append(info_hash)

    bucket_count = len(grouped)
    if result_cap == 0 or bucket_count == 0:
        return grouped

    # First pass: equal share per resolution; the first ``bonus_slots``
    # resolutions get one extra.
    base_quota, bonus_slots = divmod(result_cap, bucket_count)
    balanced = {}
    for position, (resolution, bucket) in enumerate(grouped.items()):
        quota = base_quota + 1 if position < bonus_slots else base_quota
        balanced[resolution] = bucket[:quota]

    # Second pass: redistribute whatever budget short buckets left unused.
    shortfall = result_cap - sum(len(bucket) for bucket in balanced.values())
    for resolution, bucket in grouped.items():
        if shortfall <= 0:
            break
        already_taken = len(balanced[resolution])
        top_up = bucket[already_taken : already_taken + shortfall]
        balanced[resolution].extend(top_up)
        shortfall -= len(top_up)

    return balanced
|
|
|
|
def format_metadata(data: dict):
    """Join the non-empty technical fields of ``data`` with ``"|"``.

    Fields are emitted in a fixed order: quality, hdr, codec, audio, channels,
    bit_depth, network, group. ``hdr``, ``audio`` and ``channels`` are
    sequences whose items are added individually; the others are single
    values. Returns an empty string when every field is empty.
    """
    # (key, holds_multiple_values) in output order.
    layout = (
        ("quality", False),
        ("hdr", True),
        ("codec", False),
        ("audio", True),
        ("channels", True),
        ("bit_depth", False),
        ("network", False),
        ("group", False),
    )

    parts = []
    for key, holds_many in layout:
        value = data[key]
        if not value:
            continue
        if holds_many:
            parts.extend(value)
        else:
            parts.append(value)

    return "|".join(parts)
|
|
|
|
def format_title(data: dict, config: dict):
    """Build the multi-line display title for one result.

    ``config["resultFormat"]`` selects the sections to render (``"All"``
    enables every section): Title, Metadata, Size, Tracker, Languages.

    ``data`` is only read. In particular ``data["languages"]`` is never
    modified, so the same dict can safely be formatted more than once.

    Returns the assembled string, or a fixed notice when no section produced
    any output.
    """
    result_format = config["resultFormat"]
    show_all = "All" in result_format

    def enabled(section):
        return show_all or section in result_format

    title = ""
    if enabled("Title"):
        title += f"{data['title']}\n"

    if enabled("Metadata"):
        metadata = format_metadata(data)
        if metadata != "":
            title += f"💿 {metadata}\n"

    if enabled("Size"):
        title += f"💾 {bytes_to_size(data['size'])} "

    if enabled("Tracker"):
        title += f"🔎 {data['tracker'] if 'tracker' in data else '?'}"

    if enabled("Languages"):
        # Work on a copy: inserting "multi" into the caller's list made every
        # further call on the same data prepend another "multi".
        languages = list(data["languages"] or [])
        if data["dubbed"]:
            languages.insert(0, "multi")
        formatted_languages = "/".join(
            get_language_emoji(language) for language in languages
        )
        if formatted_languages:
            title += "\n" + formatted_languages

    if title == "":
        title = "Empty result format configuration"

    return title
|
|
|
|
def get_client_ip(request: Request):
    """Return the client address for ``request``.

    The ``cf-connecting-ip`` header wins when present; otherwise the host of
    the request's client is used.
    """
    headers = request.headers
    if "cf-connecting-ip" in headers:
        return headers["cf-connecting-ip"]
    return request.client.host
|
|