import base64
import hashlib
import re
import aiohttp
import bencodepy
import PTT
import asyncio
import orjson
import time
import copy

from RTN import parse, title_match
from curl_cffi import requests
from fastapi import Request

from comet.utils.logger import logger
from comet.utils.models import database, settings, ConfigModel

languages_emojis = {
    "unknown": "❓",  # Unknown
    "multi": "🌎",  # Dubbed
    "en": "🇬🇧",  # English
    "ja": "🇯🇵",  # Japanese
    "zh": "🇨🇳",  # Chinese
    "ru": "🇷🇺",  # Russian
    "ar": "🇸🇦",  # Arabic
    "pt": "🇵🇹",  # Portuguese
    "es": "🇪🇸",  # Spanish
    "fr": "🇫🇷",  # French
    "de": "🇩🇪",  # German
    "it": "🇮🇹",  # Italian
    "ko": "🇰🇷",  # Korean
    "hi": "🇮🇳",  # Hindi
    "bn": "🇧🇩",  # Bengali
    "pa": "🇵🇰",  # Punjabi
    "mr": "🇮🇳",  # Marathi
    "gu": "🇮🇳",  # Gujarati
    "ta": "🇮🇳",  # Tamil
    "te": "🇮🇳",  # Telugu
    "kn": "🇮🇳",  # Kannada
    "ml": "🇮🇳",  # Malayalam
    "th": "🇹🇭",  # Thai
    "vi": "🇻🇳",  # Vietnamese
    "id": "🇮🇩",  # Indonesian
    "tr": "🇹🇷",  # Turkish
    "he": "🇮🇱",  # Hebrew
    "fa": "🇮🇷",  # Persian
    "uk": "🇺🇦",  # Ukrainian
    "el": "🇬🇷",  # Greek
    "lt": "🇱🇹",  # Lithuanian
    "lv": "🇱🇻",  # Latvian
    "et": "🇪🇪",  # Estonian
    "pl": "🇵🇱",  # Polish
    "cs": "🇨🇿",  # Czech
    "sk": "🇸🇰",  # Slovak
    "hu": "🇭🇺",  # Hungarian
    "ro": "🇷🇴",  # Romanian
    "bg": "🇧🇬",  # Bulgarian
    "sr": "🇷🇸",  # Serbian
    "hr": "🇭🇷",  # Croatian
    "sl": "🇸🇮",  # Slovenian
    "nl": "🇳🇱",  # Dutch
    "da": "🇩🇰",  # Danish
    "fi": "🇫🇮",  # Finnish
    "sv": "🇸🇪",  # Swedish
    "no": "🇳🇴",  # Norwegian
    "ms": "🇲🇾",  # Malay
    "la": "💃🏻",  # Latino
}


def get_language_emoji(language: str):
    # Fall back to the raw language string when we have no emoji for it
    language_formatted = language.lower()
    return languages_emojis.get(language_formatted, language)


# Maps accented/decorated characters to plain ASCII for title comparison
translation_table = {
    "ā": "a", "ă": "a", "ą": "a", "ć": "c", "č": "c", "ç": "c",
    "ĉ": "c", "ċ": "c", "ď": "d", "đ": "d", "è": "e", "é": "e",
    "ê": "e", "ë": "e", "ē": "e", "ĕ": "e", "ę": "e", "ě": "e",
    "ĝ": "g", "ğ": "g", "ġ": "g", "ģ": "g", "ĥ": "h", "î": "i",
    "ï": "i", "ì": "i", "í": "i", "ī": "i", "ĩ": "i", "ĭ": "i",
    "ı": "i", "ĵ": "j", "ķ": "k", "ĺ": "l", "ļ": "l", "ł": "l",
    "ń": "n", "ň": "n", "ñ": "n", "ņ": "n", "ŉ": "n", "ó": "o",
    "ô": "o", "õ": "o", "ö": "o", "ø": "o", "ō": "o", "ő": "o",
    "œ": "oe", "ŕ": "r", "ř": "r", "ŗ": "r", "š": "s", "ş": "s",
    "ś": "s", "ș": "s", "ß": "ss", "ť": "t", "ţ": "t", "ū": "u",
    "ŭ": "u", "ũ": "u", "û": "u", "ü": "u", "ù": "u", "ú": "u",
    "ų": "u", "ű": "u", "ŵ": "w", "ý": "y", "ÿ": "y", "ŷ": "y",
    "ž": "z", "ż": "z", "ź": "z", "æ": "ae", "ǎ": "a", "ǧ": "g",
    "ə": "e", "ƒ": "f", "ǐ": "i", "ǒ": "o", "ǔ": "u", "ǚ": "u",
    "ǜ": "u", "ǹ": "n", "ǻ": "a", "ǽ": "ae", "ǿ": "o",
}
translation_table = str.maketrans(translation_table)
info_hash_pattern = re.compile(r"\b([a-fA-F0-9]{40})\b")


def translate(title: str):
    return title.translate(translation_table)


def is_video(title: str):
    video_extensions = (
        ".3g2", ".3gp", ".amv", ".asf", ".avi", ".drc", ".f4a", ".f4b",
        ".f4p", ".f4v", ".flv", ".gif", ".gifv", ".m2v", ".m4p", ".m4v",
        ".mkv", ".mov", ".mp2", ".mp4", ".mpg", ".mpeg", ".mpv", ".mng",
        ".mpe", ".mxf", ".nsv", ".ogg", ".ogv", ".qt", ".rm", ".rmvb",
        ".roq", ".svi", ".webm", ".wmv", ".yuv",
    )
    return title.endswith(video_extensions)


def bytes_to_size(size: int):
    # Walk up the unit ladder in 1024 steps until the value fits
    sizes = ["Bytes", "KB", "MB", "GB", "TB"]
    if size == 0:
        return "0 Bytes"

    i = 0
    while size >= 1024 and i < len(sizes) - 1:
        size /= 1024
        i += 1

    return f"{round(size, 2)} {sizes[i]}"
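
# Illustrative doctest-style examples for bytes_to_size above and
# size_to_bytes below (not part of the original module; values assume
# the 1024-based units these helpers use):
# >>> bytes_to_size(0)
# '0 Bytes'
# >>> bytes_to_size(1536)
# '1.5 KB'
# >>> size_to_bytes("1.5 gb")
# 1610612736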

def size_to_bytes(size_str: str):
    # Inverse of bytes_to_size: parse strings like "1.5 GB" into a byte count
    sizes = ["bytes", "kb", "mb", "gb", "tb"]
    try:
        value, unit = size_str.split()
        value = float(value)
        unit = unit.lower()
        if unit not in sizes:
            return None
        multiplier = 1024 ** sizes.index(unit)
        return int(value * multiplier)
    except Exception:
        return None


def config_check(b64config: str):
    try:
        config = orjson.loads(base64.b64decode(b64config).decode())
        validated_config = ConfigModel(**config)
        return validated_config.model_dump()
    except Exception:
        return False


def get_debrid_extension(debridService: str, debridApiKey: str = None):
    if debridApiKey == "":
        return "TORRENT"

    debrid_extensions = {
        "realdebrid": "RD",
        "alldebrid": "AD",
        "premiumize": "PM",
        "torbox": "TB",
        "debridlink": "DL",
    }

    return debrid_extensions.get(debridService, None)


async def get_indexer_manager(
    session: aiohttp.ClientSession,
    indexer_manager_type: str,
    indexers: list,
    query: str,
):
    results = []
    try:
        indexers = [indexer.replace("_", " ") for indexer in indexers]

        if indexer_manager_type == "jackett":

            async def fetch_jackett_results(
                session: aiohttp.ClientSession, indexer: str, query: str
            ):
                try:
                    async with session.get(
                        f"{settings.INDEXER_MANAGER_URL}/api/v2.0/indexers/all/results?apikey={settings.INDEXER_MANAGER_API_KEY}&Query={query}&Tracker[]={indexer}",
                        timeout=aiohttp.ClientTimeout(
                            total=settings.INDEXER_MANAGER_TIMEOUT
                        ),
                    ) as response:
                        response_json = await response.json()
                        return response_json.get("Results", [])
                except Exception as e:
                    logger.warning(
                        f"Exception while fetching Jackett results for indexer {indexer}: {e}"
                    )
                    return []

            # Query every configured indexer concurrently
            tasks = [
                fetch_jackett_results(session, indexer, query)
                for indexer in indexers
            ]
            all_results = await asyncio.gather(*tasks)

            for result_set in all_results:
                results.extend(result_set)
        elif indexer_manager_type == "prowlarr":
            get_indexers = await session.get(
                f"{settings.INDEXER_MANAGER_URL}/api/v1/indexer",
                headers={"X-Api-Key": settings.INDEXER_MANAGER_API_KEY},
            )
            get_indexers = await get_indexers.json()

            indexers_id = []
            for indexer in get_indexers:
                if (
                    indexer["name"].lower() in indexers
                    or indexer["definitionName"].lower() in indexers
                ):
                    indexers_id.append(indexer["id"])

            response = await session.get(
                f"{settings.INDEXER_MANAGER_URL}/api/v1/search?query={query}&indexerIds={'&indexerIds='.join(str(indexer_id) for indexer_id in indexers_id)}&type=search",
                headers={"X-Api-Key": settings.INDEXER_MANAGER_API_KEY},
            )
            response = await response.json()

            # Normalise Prowlarr fields to the Jackett-style keys used downstream
            for result in response:
                result["InfoHash"] = result.get("infoHash")
                result["Title"] = result["title"]
                result["Size"] = result["size"]
                result["Link"] = result.get("downloadUrl")
                result["Tracker"] = result["indexer"]

                results.append(result)
    except Exception as e:
        logger.warning(
            f"Exception while getting {indexer_manager_type} results for {query} with {indexers}: {e}"
        )

    return results


async def get_zilean(
    session: aiohttp.ClientSession, name: str, log_name: str, season: int, episode: int
):
    results = []
    try:
        show = f"&season={season}&episode={episode}"
        get_dmm = await session.get(
            f"{settings.ZILEAN_URL}/dmm/filtered?query={name}{show if season else ''}"
        )
        get_dmm = await get_dmm.json()

        if isinstance(get_dmm, list):
            take_first = get_dmm[: settings.ZILEAN_TAKE_FIRST]
            for result in take_first:
                results.append(
                    {
                        "Title": result["raw_title"],
                        "InfoHash": result["info_hash"],
                        "Size": int(result["size"]),
                        "Tracker": "DMM",
                    }
                )

        logger.info(f"{len(results)} torrents found for {log_name} with Zilean")
    except Exception as e:
        logger.warning(
            f"Exception while getting torrents for {log_name} with Zilean: {e}"
        )

    return results
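
# Every scraper helper (indexer manager, Zilean, Torrentio, MediaFusion)
# normalises its results to the same dict shape, e.g. (illustrative values):
# {
#     "Title": "Example.Movie.2020.1080p.WEB-DL.x264",
#     "InfoHash": "0123456789abcdef0123456789abcdef01234567",
#     "Size": 1610612736,
#     "Tracker": "DMM",
# }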

async def get_torrentio(log_name: str, type: str, full_id: str):
    results = []
    try:
        try:
            get_torrentio = requests.get(
                f"https://torrentio.strem.fun/stream/{type}/{full_id}.json"
            ).json()
        except Exception:
            # Direct request failed (likely blacklisted IP); retry through the proxy
            get_torrentio = requests.get(
                f"https://torrentio.strem.fun/stream/{type}/{full_id}.json",
                proxies={
                    "http": settings.DEBRID_PROXY_URL,
                    "https": settings.DEBRID_PROXY_URL,
                },
            ).json()

        for torrent in get_torrentio["streams"]:
            title_full = torrent["title"]
            title = title_full.split("\n")[0]
            tracker = title_full.split("⚙️ ")[1].split("\n")[0]
            size = size_to_bytes(title_full.split("💾 ")[1].split(" ⚙️")[0])

            results.append(
                {
                    "Title": title,
                    "InfoHash": torrent["infoHash"],
                    "Size": size,
                    "Tracker": f"Torrentio|{tracker}",
                }
            )

        logger.info(f"{len(results)} torrents found for {log_name} with Torrentio")
    except Exception as e:
        logger.warning(
            f"Exception while getting torrents for {log_name} with Torrentio, your IP is most likely blacklisted (you should try proxying Comet): {e}"
        )

    return results


async def get_mediafusion(log_name: str, type: str, full_id: str):
    results = []
    try:
        try:
            get_mediafusion = requests.get(
                f"{settings.MEDIAFUSION_URL}/stream/{type}/{full_id}.json"
            ).json()
        except Exception:
            get_mediafusion = requests.get(
                f"{settings.MEDIAFUSION_URL}/stream/{type}/{full_id}.json",
                proxies={
                    "http": settings.DEBRID_PROXY_URL,
                    "https": settings.DEBRID_PROXY_URL,
                },
            ).json()

        for torrent in get_mediafusion["streams"]:
            title_full = torrent["description"]
            title = title_full.split("\n")[0].replace("📂 ", "").replace("/", "")
            tracker = title_full.split("🔗 ")[1]

            results.append(
                {
                    "Title": title,
                    "InfoHash": torrent["infoHash"],
                    "Size": torrent["behaviorHints"][
                        "videoSize"
                    ],  # not the pack size, but still useful for Prowlarr users
                    "Tracker": f"MediaFusion|{tracker}",
                }
            )

        logger.info(f"{len(results)} torrents found for {log_name} with MediaFusion")
    except Exception as e:
        logger.warning(
            f"Exception while getting torrents for {log_name} with MediaFusion, your IP is most likely blacklisted (you should try proxying Comet): {e}"
        )

    return results


async def filter(
    torrents: list,
    name: str,
    year: int,
    year_end: int,
    aliases: dict,
    remove_adult_content: bool,
):
    results = []
    for torrent in torrents:
        index = torrent[0]
        title = torrent[1]

        if "\n" in title:  # Torrentio title parsing
            title = title.split("\n")[1]

        parsed = parse(title)

        if remove_adult_content and parsed.adult:
            results.append((index, False))
            continue

        if parsed.parsed_title and not title_match(
            name, parsed.parsed_title, aliases=aliases
        ):
            results.append((index, False))
            continue

        if year and parsed.year:
            if year_end is not None:
                # Series: accept anything inside the show's run
                if not (year <= parsed.year <= year_end):
                    results.append((index, False))
                    continue
            else:
                # Movies: allow one year of slack either way
                if year < (parsed.year - 1) or year > (parsed.year + 1):
                    results.append((index, False))
                    continue

        results.append((index, True))
    return results


async def get_torrent_hash(session: aiohttp.ClientSession, torrent: tuple):
    index = torrent[0]
    torrent = torrent[1]
    if "InfoHash" in torrent and torrent["InfoHash"] is not None:
        return (index, torrent["InfoHash"].lower())

    url = torrent["Link"]
    try:
        timeout = aiohttp.ClientTimeout(total=settings.GET_TORRENT_TIMEOUT)
        response = await session.get(url, allow_redirects=False, timeout=timeout)
        if response.status == 200:
            # Got the .torrent file: hash its bencoded info dict
            torrent_data = await response.read()
            torrent_dict = bencodepy.decode(torrent_data)
            info = bencodepy.encode(torrent_dict[b"info"])
            hash = hashlib.sha1(info).hexdigest()
        else:
            # Redirected: try to pull the hash out of the Location header
            location = response.headers.get("Location", "")
            if not location:
                return (index, None)

            match = info_hash_pattern.search(location)
            if not match:
                return (index, None)

            hash = match.group(1)

        return (index, hash.lower())
    except Exception as e:
        logger.warning(
            f"Exception while getting torrent info hash for {torrent['indexer'] if 'indexer' in torrent else (torrent['Tracker'] if 'Tracker' in torrent else '')}|{url}: {e}"
        )

        return (index, None)
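
# Illustrative example of the redirect branch above: many indexers answer a
# download link with a magnet redirect instead of a .torrent file, e.g.
#   Location: magnet:?xt=urn:btih:0123456789abcdef0123456789abcdef01234567&dn=...
# info_hash_pattern then extracts the 40-hex digest straight from that header.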

def get_balanced_hashes(hashes: dict, config: dict):
    max_results = config["maxResults"]
    max_results_per_resolution = config["maxResultsPerResolution"]
    max_size = config["maxSize"]
    config_resolutions = [resolution.lower() for resolution in config["resolutions"]]
    include_all_resolutions = "all" in config_resolutions
    remove_trash = config["removeTrash"]

    languages = [language.lower() for language in config["languages"]]
    include_all_languages = "all" in languages
    if not include_all_languages:
        config_languages = [
            code
            for code, name in PTT.parse.LANGUAGES_TRANSLATION_TABLE.items()
            if name.lower() in languages
        ]

    hashes_by_resolution = {}
    for hash, hash_data in hashes.items():
        if remove_trash and not hash_data["fetch"]:
            continue

        hash_info = hash_data["data"]

        if max_size != 0 and hash_info["size"] > max_size:
            continue

        if (
            not include_all_languages
            and not any(lang in hash_info["languages"] for lang in config_languages)
            and ("multi" not in languages if hash_info["dubbed"] else True)
            and not (len(hash_info["languages"]) == 0 and "unknown" in languages)
        ):
            continue

        resolution = hash_info["resolution"]
        if not include_all_resolutions and resolution not in config_resolutions:
            continue

        if resolution not in hashes_by_resolution:
            hashes_by_resolution[resolution] = []
        hashes_by_resolution[resolution].append(hash)

    if config["reverseResultOrder"]:
        hashes_by_resolution = {
            res: lst[::-1] for res, lst in hashes_by_resolution.items()
        }

    total_resolutions = len(hashes_by_resolution)
    if (
        max_results == 0 and max_results_per_resolution == 0
    ) or total_resolutions == 0:
        return hashes_by_resolution

    hashes_per_resolution = (
        max_results // total_resolutions
        if max_results > 0
        else max_results_per_resolution
    )
    extra_hashes = max_results % total_resolutions

    balanced_hashes = {}
    for resolution, hash_list in hashes_by_resolution.items():
        selected_count = hashes_per_resolution + (1 if extra_hashes > 0 else 0)
        if max_results_per_resolution > 0:
            selected_count = min(selected_count, max_results_per_resolution)
        balanced_hashes[resolution] = hash_list[:selected_count]
        if extra_hashes > 0:
            extra_hashes -= 1

    selected_total = sum(len(hashes) for hashes in balanced_hashes.values())
    if selected_total < max_results:
        # Backfill from resolutions that still have unused results
        missing_hashes = max_results - selected_total
        for resolution, hash_list in hashes_by_resolution.items():
            if missing_hashes <= 0:
                break
            current_count = len(balanced_hashes[resolution])
            available_hashes = hash_list[current_count : current_count + missing_hashes]
            balanced_hashes[resolution].extend(available_hashes)
            missing_hashes -= len(available_hashes)

    return balanced_hashes


def format_metadata(data: dict):
    extras = []
    if data["quality"]:
        extras.append(data["quality"])
    if data["hdr"]:
        extras.extend(data["hdr"])
    if data["codec"]:
        extras.append(data["codec"])
    if data["audio"]:
        extras.extend(data["audio"])
    if data["channels"]:
        extras.extend(data["channels"])
    if data["bit_depth"]:
        extras.append(data["bit_depth"])
    if data["network"]:
        extras.append(data["network"])
    if data["group"]:
        extras.append(data["group"])

    return "|".join(extras)
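
# Worked example for get_balanced_hashes above (assumed config, illustrative
# only): with maxResults=5, maxResultsPerResolution=0 and three resolutions,
# hashes_per_resolution = 5 // 3 = 1 and extra_hashes = 5 % 3 = 2, so the
# first two resolutions get 2 results each and the third gets 1; any
# remaining shortfall is backfilled in the second pass.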
data["group"]: extras.append(data["group"]) return "|".join(extras) def format_title(data: dict, config: dict): result_format = config["resultFormat"] has_all = "All" in result_format title = "" if has_all or "Title" in result_format: title += f"{data['title']}\n" if has_all or "Metadata" in result_format: metadata = format_metadata(data) if metadata != "": title += f"๐Ÿ’ฟ {metadata}\n" if has_all or "Size" in result_format: title += f"๐Ÿ’พ {bytes_to_size(data['size'])} " if has_all or "Tracker" in result_format: title += f"๐Ÿ”Ž {data['tracker'] if 'tracker' in data else '?'}" if has_all or "Languages" in result_format: languages = data["languages"] if data["dubbed"]: languages.insert(0, "multi") if languages: formatted_languages = "/".join( get_language_emoji(language) for language in languages ) languages_str = "\n" + formatted_languages title += f"{languages_str}" if title == "": # Without this, Streamio shows SD as the result, which is confusing title = "Empty result format configuration" return title def get_client_ip(request: Request): return ( request.headers["cf-connecting-ip"] if "cf-connecting-ip" in request.headers else request.client.host ) async def get_aliases(session: aiohttp.ClientSession, media_type: str, media_id: str): aliases = {} try: response = await session.get( f"https://api.trakt.tv/{media_type}/{media_id}/aliases" ) for aliase in await response.json(): country = aliase["country"] if country not in aliases: aliases[country] = [] aliases[country].append(aliase["title"]) except: pass return aliases async def add_torrent_to_cache( config: dict, name: str, season: int, episode: int, sorted_ranked_files: dict ): # trace of which indexers were used when cache was created - not optimal indexers = config["indexers"].copy() if settings.SCRAPE_TORRENTIO: indexers.append("torrentio") if settings.SCRAPE_MEDIAFUSION: indexers.append("mediafusion") if settings.ZILEAN_URL: indexers.append("dmm") for indexer in indexers: hash = f"searched-{indexer}-{name}-{season}-{episode}" searched = copy.deepcopy( sorted_ranked_files[list(sorted_ranked_files.keys())[0]] ) searched["infohash"] = hash searched["data"]["tracker"] = indexer sorted_ranked_files[hash] = searched values = [ { "debridService": config["debridService"], "info_hash": sorted_ranked_files[torrent]["infohash"], "name": name, "season": season, "episode": episode, "tracker": sorted_ranked_files[torrent]["data"]["tracker"] .split("|")[0] .lower(), "data": orjson.dumps(sorted_ranked_files[torrent]).decode("utf-8"), "timestamp": time.time(), } for torrent in sorted_ranked_files ] query = f""" INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''} INTO cache (debridService, info_hash, name, season, episode, tracker, data, timestamp) VALUES (:debridService, :info_hash, :name, :season, :episode, :tracker, :data, :timestamp) {' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''} """ await database.execute_many(query, values)