#!/usr/bin/env python3 # coding: utf-8 # ytdlbot - __init__.py.py import logging import pathlib import re import shutil import tempfile import time import uuid from http.cookiejar import MozillaCookieJar from urllib.parse import quote_plus, urlparse import ffmpeg def sizeof_fmt(num: int, suffix="B"): for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, "Yi", suffix) def timeof_fmt(seconds: int | float): periods = [("d", 86400), ("h", 3600), ("m", 60), ("s", 1)] result = "" for period_name, period_seconds in periods: if seconds >= period_seconds: period_value, seconds = divmod(seconds, period_seconds) result += f"{int(period_value)}{period_name}" return result def is_youtube(url: str) -> bool: try: if not url or not isinstance(url, str): return False parsed = urlparse(url) return parsed.netloc.lower() in {'youtube.com', 'www.youtube.com', 'youtu.be'} except Exception: return False def adjust_formats(formats): # high: best quality 1080P, 2K, 4K, 8K # medium: 720P # low: 480P mapping = {"high": [], "medium": [720], "low": [480]} # formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]") # formats.insert(1, f"bestvideo[vcodec^=avc][height={m}]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best") # # if settings[2] == "audio": # formats.insert(0, "bestaudio[ext=m4a]") # # if settings[2] == "document": # formats.insert(0, None) def current_time(ts=None): return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts)) def clean_tempfile(): patterns = ["ytdl*", "spdl*", "leech*", "direct*"] temp_path = pathlib.Path(TMPFILE_PATH or tempfile.gettempdir()) for pattern in patterns: for item in temp_path.glob(pattern): if time.time() - item.stat().st_ctime > 3600: shutil.rmtree(item, ignore_errors=True) def shorten_url(url, CAPTION_URL_LENGTH_LIMIT): # Shortens a URL by cutting it to a specified length. shortened_url = url[: CAPTION_URL_LENGTH_LIMIT - 3] + "..." return shortened_url def extract_filename(response): try: content_disposition = response.headers.get("content-disposition") if content_disposition: filename = re.findall("filename=(.+)", content_disposition)[0] return filename except (TypeError, IndexError): pass # Handle potential exceptions during extraction # Fallback if Content-Disposition header is missing filename = response.url.rsplit("/")[-1] if not filename: filename = quote_plus(response.url) return filename def extract_url_and_name(message_text): # Regular expression to match the URL url_pattern = r"(https?://[^\s]+)" # Regular expression to match the new name after '-n' name_pattern = r"-n\s+(.+)$" # Find the URL in the message_text url_match = re.search(url_pattern, message_text) url = url_match.group(0) if url_match else None # Find the new name in the message_text name_match = re.search(name_pattern, message_text) new_name = name_match.group(1) if name_match else None return url, new_name