# comet/utils/general.py
import base64
import hashlib
import re
import aiohttp
import bencodepy
import PTT
import asyncio
import orjson
import time
import copy
from RTN import parse, title_match
from curl_cffi import requests
from fastapi import Request
from comet.utils.logger import logger
from comet.utils.models import database, settings, ConfigModel
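
# Emoji flags used when rendering stream titles (keys are mostly ISO 639-1 codes)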
languages_emojis = {
"unknown": "โ“", # Unknown
"multi": "๐ŸŒŽ", # Dubbed
"en": "๐Ÿ‡ฌ๐Ÿ‡ง", # English
"ja": "๐Ÿ‡ฏ๐Ÿ‡ต", # Japanese
"zh": "๐Ÿ‡จ๐Ÿ‡ณ", # Chinese
"ru": "๐Ÿ‡ท๐Ÿ‡บ", # Russian
"ar": "๐Ÿ‡ธ๐Ÿ‡ฆ", # Arabic
"pt": "๐Ÿ‡ต๐Ÿ‡น", # Portuguese
"es": "๐Ÿ‡ช๐Ÿ‡ธ", # Spanish
"fr": "๐Ÿ‡ซ๐Ÿ‡ท", # French
"de": "๐Ÿ‡ฉ๐Ÿ‡ช", # German
"it": "๐Ÿ‡ฎ๐Ÿ‡น", # Italian
"ko": "๐Ÿ‡ฐ๐Ÿ‡ท", # Korean
"hi": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Hindi
"bn": "๐Ÿ‡ง๐Ÿ‡ฉ", # Bengali
"pa": "๐Ÿ‡ต๐Ÿ‡ฐ", # Punjabi
"mr": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Marathi
"gu": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Gujarati
"ta": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Tamil
"te": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Telugu
"kn": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Kannada
"ml": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Malayalam
"th": "๐Ÿ‡น๐Ÿ‡ญ", # Thai
"vi": "๐Ÿ‡ป๐Ÿ‡ณ", # Vietnamese
"id": "๐Ÿ‡ฎ๐Ÿ‡ฉ", # Indonesian
"tr": "๐Ÿ‡น๐Ÿ‡ท", # Turkish
"he": "๐Ÿ‡ฎ๐Ÿ‡ฑ", # Hebrew
"fa": "๐Ÿ‡ฎ๐Ÿ‡ท", # Persian
"uk": "๐Ÿ‡บ๐Ÿ‡ฆ", # Ukrainian
"el": "๐Ÿ‡ฌ๐Ÿ‡ท", # Greek
"lt": "๐Ÿ‡ฑ๐Ÿ‡น", # Lithuanian
"lv": "๐Ÿ‡ฑ๐Ÿ‡ป", # Latvian
"et": "๐Ÿ‡ช๐Ÿ‡ช", # Estonian
"pl": "๐Ÿ‡ต๐Ÿ‡ฑ", # Polish
"cs": "๐Ÿ‡จ๐Ÿ‡ฟ", # Czech
"sk": "๐Ÿ‡ธ๐Ÿ‡ฐ", # Slovak
"hu": "๐Ÿ‡ญ๐Ÿ‡บ", # Hungarian
"ro": "๐Ÿ‡ท๐Ÿ‡ด", # Romanian
"bg": "๐Ÿ‡ง๐Ÿ‡ฌ", # Bulgarian
"sr": "๐Ÿ‡ท๐Ÿ‡ธ", # Serbian
"hr": "๐Ÿ‡ญ๐Ÿ‡ท", # Croatian
"sl": "๐Ÿ‡ธ๐Ÿ‡ฎ", # Slovenian
"nl": "๐Ÿ‡ณ๐Ÿ‡ฑ", # Dutch
"da": "๐Ÿ‡ฉ๐Ÿ‡ฐ", # Danish
"fi": "๐Ÿ‡ซ๐Ÿ‡ฎ", # Finnish
"sv": "๐Ÿ‡ธ๐Ÿ‡ช", # Swedish
"no": "๐Ÿ‡ณ๐Ÿ‡ด", # Norwegian
"ms": "๐Ÿ‡ฒ๐Ÿ‡พ", # Malay
"la": "๐Ÿ’ƒ๐Ÿป", # Latino
}
def get_language_emoji(language: str):
    # Fall back to the raw language string when no emoji is mapped
    return languages_emojis.get(language.lower(), language)
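
# Transliteration map from accented/special characters to ASCII, used to
# normalize titles before matching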
translation_table = {
"ฤ": "a",
"ฤƒ": "a",
"ฤ…": "a",
"ฤ‡": "c",
"ฤ": "c",
"รง": "c",
"ฤ‰": "c",
"ฤ‹": "c",
"ฤ": "d",
"ฤ‘": "d",
"รจ": "e",
"รฉ": "e",
"รช": "e",
"รซ": "e",
"ฤ“": "e",
"ฤ•": "e",
"ฤ™": "e",
"ฤ›": "e",
"ฤ": "g",
"ฤŸ": "g",
"ฤก": "g",
"ฤฃ": "g",
"ฤฅ": "h",
"รฎ": "i",
"รฏ": "i",
"รฌ": "i",
"รญ": "i",
"ฤซ": "i",
"ฤฉ": "i",
"ฤญ": "i",
"ฤฑ": "i",
"ฤต": "j",
"ฤท": "k",
"ฤบ": "l",
"ฤผ": "l",
"ล‚": "l",
"ล„": "n",
"ลˆ": "n",
"รฑ": "n",
"ล†": "n",
"ล‰": "n",
"รณ": "o",
"รด": "o",
"รต": "o",
"รถ": "o",
"รธ": "o",
"ล": "o",
"ล‘": "o",
"ล“": "oe",
"ล•": "r",
"ล™": "r",
"ล—": "r",
"ลก": "s",
"ลŸ": "s",
"ล›": "s",
"ศ™": "s",
"รŸ": "ss",
"ลฅ": "t",
"ลฃ": "t",
"ลซ": "u",
"ลญ": "u",
"ลฉ": "u",
"รป": "u",
"รผ": "u",
"รน": "u",
"รบ": "u",
"ลณ": "u",
"ลฑ": "u",
"ลต": "w",
"รฝ": "y",
"รฟ": "y",
"ลท": "y",
"ลพ": "z",
"ลผ": "z",
"ลบ": "z",
"รฆ": "ae",
"วŽ": "a",
"วง": "g",
"ษ™": "e",
"ฦ’": "f",
"ว": "i",
"ว’": "o",
"ว”": "u",
"วš": "u",
"วœ": "u",
"วน": "n",
"วป": "a",
"วฝ": "ae",
"วฟ": "o",
}
translation_table = str.maketrans(translation_table)
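# Matches a 40-character hexadecimal (SHA-1) torrent info hash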
info_hash_pattern = re.compile(r"\b([a-fA-F0-9]{40})\b")
def translate(title: str):
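    """Transliterate accented characters in a title to plain ASCII."""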
return title.translate(translation_table)
def is_video(title: str):
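    """Check whether a filename ends with a known video extension."""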
video_extensions = (
".3g2",
".3gp",
".amv",
".asf",
".avi",
".drc",
".f4a",
".f4b",
".f4p",
".f4v",
".flv",
".gif",
".gifv",
".m2v",
".m4p",
".m4v",
".mkv",
".mov",
".mp2",
".mp4",
".mpg",
".mpeg",
".mpv",
".mng",
".mpe",
".mxf",
".nsv",
".ogg",
".ogv",
".qt",
".rm",
".rmvb",
".roq",
".svi",
".webm",
".wmv",
".yuv",
)
return title.endswith(video_extensions)
def bytes_to_size(bytes: int):
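    """Convert a byte count to a human-readable string (e.g. 1536 -> "1.5 KB")."""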
sizes = ["Bytes", "KB", "MB", "GB", "TB"]
    if bytes == 0:
        return "0 Bytes"
i = 0
while bytes >= 1024 and i < len(sizes) - 1:
bytes /= 1024
i += 1
return f"{round(bytes, 2)} {sizes[i]}"
def size_to_bytes(size_str: str):
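    """Parse a size string like "1.5 GB" into a byte count; returns None if unparseable."""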
sizes = ["bytes", "kb", "mb", "gb", "tb"]
try:
value, unit = size_str.split()
value = float(value)
unit = unit.lower()
if unit not in sizes:
return None
multiplier = 1024 ** sizes.index(unit)
return int(value * multiplier)
    except Exception:
        return None
def config_check(b64config: str):
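    """Decode and validate a base64-encoded user config; returns the config dict, or False if invalid."""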
try:
config = orjson.loads(base64.b64decode(b64config).decode())
validated_config = ConfigModel(**config)
return validated_config.model_dump()
    except Exception:
return False
def get_debrid_extension(debridService: str, debridApiKey: str = None):
if debridApiKey == "":
return "TORRENT"
debrid_extensions = {
"realdebrid": "RD",
"alldebrid": "AD",
"premiumize": "PM",
"torbox": "TB",
"debridlink": "DL",
}
return debrid_extensions.get(debridService, None)
async def get_indexer_manager(
session: aiohttp.ClientSession,
indexer_manager_type: str,
indexers: list,
query: str,
):
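    """Search the configured indexer manager (Jackett or Prowlarr) and return the raw results."""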
results = []
try:
indexers = [indexer.replace("_", " ") for indexer in indexers]
if indexer_manager_type == "jackett":
async def fetch_jackett_results(
session: aiohttp.ClientSession, indexer: str, query: str
):
try:
async with session.get(
f"{settings.INDEXER_MANAGER_URL}/api/v2.0/indexers/all/results?apikey={settings.INDEXER_MANAGER_API_KEY}&Query={query}&Tracker[]={indexer}",
timeout=aiohttp.ClientTimeout(
total=settings.INDEXER_MANAGER_TIMEOUT
),
) as response:
response_json = await response.json()
return response_json.get("Results", [])
except Exception as e:
logger.warning(
f"Exception while fetching Jackett results for indexer {indexer}: {e}"
)
return []
tasks = [
fetch_jackett_results(session, indexer, query) for indexer in indexers
]
all_results = await asyncio.gather(*tasks)
for result_set in all_results:
results.extend(result_set)
elif indexer_manager_type == "prowlarr":
get_indexers = await session.get(
f"{settings.INDEXER_MANAGER_URL}/api/v1/indexer",
headers={"X-Api-Key": settings.INDEXER_MANAGER_API_KEY},
)
get_indexers = await get_indexers.json()
indexers_id = []
for indexer in get_indexers:
if (
indexer["name"].lower() in indexers
or indexer["definitionName"].lower() in indexers
):
indexers_id.append(indexer["id"])
response = await session.get(
f"{settings.INDEXER_MANAGER_URL}/api/v1/search?query={query}&indexerIds={'&indexerIds='.join(str(indexer_id) for indexer_id in indexers_id)}&type=search",
headers={"X-Api-Key": settings.INDEXER_MANAGER_API_KEY},
)
response = await response.json()
for result in response:
result["InfoHash"] = (
result["infoHash"] if "infoHash" in result else None
)
result["Title"] = result["title"]
result["Size"] = result["size"]
result["Link"] = (
result["downloadUrl"] if "downloadUrl" in result else None
)
result["Tracker"] = result["indexer"]
results.append(result)
except Exception as e:
logger.warning(
f"Exception while getting {indexer_manager_type} results for {query} with {indexers}: {e}"
)
return results
async def get_zilean(
session: aiohttp.ClientSession, name: str, log_name: str, season: int, episode: int
):
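    """Search a Zilean (DMM) instance for torrents by name and optional season/episode."""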
results = []
try:
show = f"&season={season}&episode={episode}"
get_dmm = await session.get(
f"{settings.ZILEAN_URL}/dmm/filtered?query={name}{show if season else ''}"
)
get_dmm = await get_dmm.json()
if isinstance(get_dmm, list):
take_first = get_dmm[: settings.ZILEAN_TAKE_FIRST]
            for result in take_first:
                results.append(
                    {
                        "Title": result["raw_title"],
                        "InfoHash": result["info_hash"],
                        "Size": int(result["size"]),
                        "Tracker": "DMM",
                    }
                )
logger.info(f"{len(results)} torrents found for {log_name} with Zilean")
except Exception as e:
logger.warning(
f"Exception while getting torrents for {log_name} with Zilean: {e}"
)
return results
async def get_torrentio(log_name: str, type: str, full_id: str):
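    """Scrape the public Torrentio Stremio addon, retrying through the debrid proxy if the direct request fails."""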
results = []
try:
try:
get_torrentio = requests.get(
f"https://torrentio.strem.fun/stream/{type}/{full_id}.json"
).json()
        except Exception:
get_torrentio = requests.get(
f"https://torrentio.strem.fun/stream/{type}/{full_id}.json",
proxies={
"http": settings.DEBRID_PROXY_URL,
"https": settings.DEBRID_PROXY_URL,
},
).json()
for torrent in get_torrentio["streams"]:
title_full = torrent["title"]
title = title_full.split("\n")[0]
tracker = title_full.split("โš™๏ธ ")[1].split("\n")[0]
size = size_to_bytes(title_full.split("๐Ÿ’พ ")[1].split(" โš™๏ธ")[0])
results.append(
{
"Title": title,
"InfoHash": torrent["infoHash"],
"Size": size,
"Tracker": f"Torrentio|{tracker}",
}
)
logger.info(f"{len(results)} torrents found for {log_name} with Torrentio")
except Exception as e:
logger.warning(
f"Exception while getting torrents for {log_name} with Torrentio, your IP is most likely blacklisted (you should try proxying Comet): {e}"
)
return results
async def get_mediafusion(log_name: str, type: str, full_id: str):
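    """Scrape a MediaFusion instance, retrying through the debrid proxy if the direct request fails."""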
results = []
try:
try:
get_mediafusion = requests.get(
f"{settings.MEDIAFUSION_URL}/stream/{type}/{full_id}.json"
).json()
        except Exception:
get_mediafusion = requests.get(
f"{settings.MEDIAFUSION_URL}/stream/{type}/{full_id}.json",
proxies={
"http": settings.DEBRID_PROXY_URL,
"https": settings.DEBRID_PROXY_URL,
},
).json()
for torrent in get_mediafusion["streams"]:
title_full = torrent["description"]
title = title_full.split("\n")[0].replace("๐Ÿ“‚ ", "").replace("/", "")
tracker = title_full.split("๐Ÿ”— ")[1]
results.append(
{
"Title": title,
"InfoHash": torrent["infoHash"],
"Size": torrent["behaviorHints"][
"videoSize"
], # not the pack size but still useful for prowlarr userss
"Tracker": f"MediaFusion|{tracker}",
}
)
logger.info(f"{len(results)} torrents found for {log_name} with MediaFusion")
except Exception as e:
logger.warning(
f"Exception while getting torrents for {log_name} with MediaFusion, your IP is most likely blacklisted (you should try proxying Comet): {e}"
)
return results
async def filter(
torrents: list,
name: str,
year: int,
year_end: int,
aliases: dict,
remove_adult_content: bool,
):
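    """Flag each (index, title) pair as kept or rejected based on adult content, title match and year."""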
results = []
for torrent in torrents:
index = torrent[0]
title = torrent[1]
if "\n" in title: # Torrentio title parsing
title = title.split("\n")[1]
parsed = parse(title)
if remove_adult_content and parsed.adult:
results.append((index, False))
continue
if parsed.parsed_title and not title_match(
name, parsed.parsed_title, aliases=aliases
):
results.append((index, False))
continue
if year and parsed.year:
if year_end is not None:
if not (year <= parsed.year <= year_end):
results.append((index, False))
continue
else:
if year < (parsed.year - 1) or year > (parsed.year + 1):
results.append((index, False))
continue
results.append((index, True))
return results
async def get_torrent_hash(session: aiohttp.ClientSession, torrent: tuple):
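    """Return (index, info_hash), downloading and bencode-decoding the .torrent file when the indexer result only provides a download link."""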
index = torrent[0]
torrent = torrent[1]
if "InfoHash" in torrent and torrent["InfoHash"] is not None:
return (index, torrent["InfoHash"].lower())
url = torrent["Link"]
try:
timeout = aiohttp.ClientTimeout(total=settings.GET_TORRENT_TIMEOUT)
response = await session.get(url, allow_redirects=False, timeout=timeout)
if response.status == 200:
torrent_data = await response.read()
torrent_dict = bencodepy.decode(torrent_data)
info = bencodepy.encode(torrent_dict[b"info"])
hash = hashlib.sha1(info).hexdigest()
else:
location = response.headers.get("Location", "")
if not location:
return (index, None)
match = info_hash_pattern.search(location)
if not match:
return (index, None)
            hash = match.group(1)
return (index, hash.lower())
except Exception as e:
logger.warning(
f"Exception while getting torrent info hash for {torrent['indexer'] if 'indexer' in torrent else (torrent['Tracker'] if 'Tracker' in torrent else '')}|{url}: {e}"
)
return (index, None)
def get_balanced_hashes(hashes: dict, config: dict):
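    """Filter hashes by language, resolution and size, then spread the configured result budget evenly across resolutions."""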
max_results = config["maxResults"]
max_results_per_resolution = config["maxResultsPerResolution"]
max_size = config["maxSize"]
config_resolutions = [resolution.lower() for resolution in config["resolutions"]]
include_all_resolutions = "all" in config_resolutions
remove_trash = config["removeTrash"]
languages = [language.lower() for language in config["languages"]]
include_all_languages = "all" in languages
if not include_all_languages:
config_languages = [
code
for code, name in PTT.parse.LANGUAGES_TRANSLATION_TABLE.items()
if name.lower() in languages
]
hashes_by_resolution = {}
for hash, hash_data in hashes.items():
if remove_trash and not hash_data["fetch"]:
continue
hash_info = hash_data["data"]
if max_size != 0 and hash_info["size"] > max_size:
continue
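        # Skip unless a configured language matches, the torrent is a dub while
        # "multi" is wanted, or it has no detected language while "unknown" is wanted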
if (
not include_all_languages
and not any(lang in hash_info["languages"] for lang in config_languages)
and ("multi" not in languages if hash_info["dubbed"] else True)
and not (len(hash_info["languages"]) == 0 and "unknown" in languages)
):
continue
resolution = hash_info["resolution"]
if not include_all_resolutions and resolution not in config_resolutions:
continue
if resolution not in hashes_by_resolution:
hashes_by_resolution[resolution] = []
hashes_by_resolution[resolution].append(hash)
if config["reverseResultOrder"]:
hashes_by_resolution = {
res: lst[::-1] for res, lst in hashes_by_resolution.items()
}
total_resolutions = len(hashes_by_resolution)
    if (max_results == 0 and max_results_per_resolution == 0) or total_resolutions == 0:
return hashes_by_resolution
hashes_per_resolution = (
max_results // total_resolutions
if max_results > 0
else max_results_per_resolution
)
extra_hashes = max_results % total_resolutions
balanced_hashes = {}
for resolution, hash_list in hashes_by_resolution.items():
selected_count = hashes_per_resolution + (1 if extra_hashes > 0 else 0)
if max_results_per_resolution > 0:
selected_count = min(selected_count, max_results_per_resolution)
balanced_hashes[resolution] = hash_list[:selected_count]
if extra_hashes > 0:
extra_hashes -= 1
selected_total = sum(len(hashes) for hashes in balanced_hashes.values())
if selected_total < max_results:
missing_hashes = max_results - selected_total
for resolution, hash_list in hashes_by_resolution.items():
if missing_hashes <= 0:
break
current_count = len(balanced_hashes[resolution])
available_hashes = hash_list[current_count : current_count + missing_hashes]
balanced_hashes[resolution].extend(available_hashes)
missing_hashes -= len(available_hashes)
return balanced_hashes
def format_metadata(data: dict):
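    """Join the parsed quality/HDR/codec/audio/etc. attributes into a pipe-separated string."""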
extras = []
if data["quality"]:
extras.append(data["quality"])
if data["hdr"]:
extras.extend(data["hdr"])
if data["codec"]:
extras.append(data["codec"])
if data["audio"]:
extras.extend(data["audio"])
if data["channels"]:
extras.extend(data["channels"])
if data["bit_depth"]:
extras.append(data["bit_depth"])
if data["network"]:
extras.append(data["network"])
if data["group"]:
extras.append(data["group"])
return "|".join(extras)
def format_title(data: dict, config: dict):
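    """Build the stream title shown in Stremio according to the user's resultFormat settings."""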
result_format = config["resultFormat"]
has_all = "All" in result_format
title = ""
if has_all or "Title" in result_format:
title += f"{data['title']}\n"
if has_all or "Metadata" in result_format:
metadata = format_metadata(data)
if metadata != "":
title += f"๐Ÿ’ฟ {metadata}\n"
if has_all or "Size" in result_format:
title += f"๐Ÿ’พ {bytes_to_size(data['size'])} "
if has_all or "Tracker" in result_format:
title += f"๐Ÿ”Ž {data['tracker'] if 'tracker' in data else '?'}"
if has_all or "Languages" in result_format:
languages = data["languages"]
if data["dubbed"]:
languages.insert(0, "multi")
if languages:
formatted_languages = "/".join(
get_language_emoji(language) for language in languages
)
languages_str = "\n" + formatted_languages
title += f"{languages_str}"
if title == "":
        # Without this, Stremio shows SD as the result, which is confusing
title = "Empty result format configuration"
return title
def get_client_ip(request: Request):
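    """Return the real client IP, preferring Cloudflare's cf-connecting-ip header."""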
    return request.headers.get("cf-connecting-ip", request.client.host)
async def get_aliases(session: aiohttp.ClientSession, media_type: str, media_id: str):
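    """Fetch alternative titles for a movie/show from Trakt, grouped by country code."""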
aliases = {}
try:
response = await session.get(
f"https://api.trakt.tv/{media_type}/{media_id}/aliases"
)
        for alias in await response.json():
            country = alias["country"]
            if country not in aliases:
                aliases[country] = []
            aliases[country].append(alias["title"])
    except Exception:
pass
return aliases
async def add_torrent_to_cache(
config: dict, name: str, season: int, episode: int, sorted_ranked_files: dict
):
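    """Persist ranked torrents to the cache table, adding marker rows recording which indexers were searched."""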
    # Record which indexers were used when this cache entry was created
    # (stored as extra marker rows - not optimal)
indexers = config["indexers"].copy()
if settings.SCRAPE_TORRENTIO:
indexers.append("torrentio")
if settings.SCRAPE_MEDIAFUSION:
indexers.append("mediafusion")
if settings.ZILEAN_URL:
indexers.append("dmm")
for indexer in indexers:
hash = f"searched-{indexer}-{name}-{season}-{episode}"
searched = copy.deepcopy(
sorted_ranked_files[list(sorted_ranked_files.keys())[0]]
)
searched["infohash"] = hash
searched["data"]["tracker"] = indexer
sorted_ranked_files[hash] = searched
values = [
{
"debridService": config["debridService"],
"info_hash": sorted_ranked_files[torrent]["infohash"],
"name": name,
"season": season,
"episode": episode,
"tracker": sorted_ranked_files[torrent]["data"]["tracker"]
.split("|")[0]
.lower(),
"data": orjson.dumps(sorted_ranked_files[torrent]).decode("utf-8"),
"timestamp": time.time(),
}
for torrent in sorted_ranked_files
]
query = f"""
INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}
INTO cache (debridService, info_hash, name, season, episode, tracker, data, timestamp)
VALUES (:debridService, :info_hash, :name, :season, :episode, :tracker, :data, :timestamp)
{' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}
"""
await database.execute_many(query, values)