#!/usr/bin/env python3
# coding: utf-8

# ytdlbot - types.py

import hashlib
import json
import logging
import re
import tempfile
import uuid
from abc import ABC, abstractmethod
from io import StringIO
from pathlib import Path
from types import SimpleNamespace
from typing import final

import ffmpeg
import filetype
from pyrogram import enums, types
from tqdm import tqdm

from config import TG_NORMAL_MAX_SIZE, Types
from database import Redis
from database.model import (
    check_quota,
    get_format_settings,
    get_free_quota,
    get_paid_quota,
    get_quality_settings,
    use_quota,
)
from engine.helper import debounce, sizeof_fmt


def generate_input_media(file_paths: list, cap: str) -> list:
    """Wrap each path in the pyrogram ``InputMedia*`` type matching its MIME type.

    Args:
        file_paths: Paths of the files to send as one media group.
        cap: Caption attached to the first item of the group.

    Returns:
        A list of ``InputMedia*`` objects in the same order as ``file_paths``;
        empty when ``file_paths`` is empty.
    """
    input_media = []
    for path in file_paths:
        # guess_mime() returns None for unrecognised files; treat that as
        # "no known type" so the file falls through to a plain document.
        mime = filetype.guess_mime(path) or ""
        if "video" in mime:
            input_media.append(types.InputMediaVideo(media=path))
        elif "image" in mime:
            input_media.append(types.InputMediaPhoto(media=path))
        elif "audio" in mime:
            input_media.append(types.InputMediaAudio(media=path))
        else:
            input_media.append(types.InputMediaDocument(media=path))

    if input_media:
        input_media[0].caption = cap
    return input_media


class BaseDownloader(ABC):
    """Common download/upload workflow shared by concrete downloaders.

    Subclasses implement ``_setup_formats``, ``_download`` and ``_start``;
    this base class handles quota checks, progress messages, uploading to
    Telegram and caching of uploaded file ids in Redis.
    """

    def __init__(self, client: Types.Client, bot_msg: Types.Message, url: str):
        """Capture chat/user context and create a per-task temp directory.

        Args:
            client: Telegram client used to send files and chat actions.
            bot_msg: The bot's own status message; it is edited to show progress.
            url: The URL requested for download.
        """
        self._client = client
        self._url = url
        # chat id is the same for private chat
        self._chat_id = self._from_user = bot_msg.chat.id
        if bot_msg.chat.type in (enums.ChatType.GROUP, enums.ChatType.SUPERGROUP):
            # if in group, we need to find out who send the message
            self._from_user = bot_msg.reply_to_message.from_user.id
        self._id = bot_msg.id
        self._tempdir = tempfile.TemporaryDirectory(prefix="ytdl-")
        self._bot_msg: Types.Message = bot_msg
        self._redis = Redis()
        self._quality = get_quality_settings(self._chat_id)
        self._format = get_format_settings(self._chat_id)

    def __del__(self):
        """Remove the temp directory, if ``__init__`` got far enough to create it."""
        tempdir = getattr(self, "_tempdir", None)
        if tempdir is not None:
            tempdir.cleanup()

    def _record_usage(self):
        """Consume one unit of the requesting user's quota.

        Raises:
            Exception: If the combined free and paid quota is negative.
        """
        free, paid = get_free_quota(self._from_user), get_paid_quota(self._from_user)
        logging.info("User %s has %s free and %s paid quota", self._from_user, free, paid)
        if free + paid < 0:
            raise Exception("Usage limit exceeded")

        use_quota(self._from_user)

    @staticmethod
    def __remove_bash_color(text):
        """Strip the terminal colour escape fragments listed in the pattern from ``text``."""
        return re.sub(r"\u001b|\[0;94m|\u001b\[0m|\[0;32m|\[0m|\[0;33m", "", text)

    @staticmethod
    def __tqdm_progress(desc, total, finished, speed="", eta=""):
        """Render a textual progress bar by capturing tqdm output in memory.

        Args:
            desc: Heading shown above the bar.
            total: Total amount of work passed to tqdm.
            finished: Amount already done, used as tqdm's initial value.
            speed: Optional speed string; its line is left blank when falsy.
            eta: Optional ETA string; its line is left blank when falsy.

        Returns:
            The multi-line message text containing the bar and counters.
        """

        def more(title, initial):
            if initial:
                return f"{title} {initial}"
            else:
                return ""

        f = StringIO()
        tqdm(
            total=total,
            initial=finished,
            file=f,
            ascii=False,
            unit_scale=True,
            ncols=30,
            bar_format="{l_bar}{bar} |{n_fmt}/{total_fmt} ",
        )
        raw_output = f.getvalue()
        # bar_format yields "<percent>|<bar> |<n>/<total> ", so splitting on
        # "|" gives the bar at index 1 and the counters at index 2.
        tqdm_output = raw_output.split("|")
        progress = f"`[{tqdm_output[1]}]`"
        detail = tqdm_output[2].replace("[A", "")
        text = f"""
{desc}

{progress}
{detail}
{more("Speed:", speed)}
{more("ETA:", eta)}
    """
        f.close()
        return text

    def download_hook(self, d: dict):
        """Progress callback for the downloader; updates the status message.

        Args:
            d: Progress dictionary. Only entries with ``status == "downloading"``
                are handled; size, speed and ETA fields are read when present.

        Raises:
            Exception: If the reported total size exceeds ``TG_NORMAL_MAX_SIZE``.
        """
        if d["status"] == "downloading":
            downloaded = d.get("downloaded_bytes", 0)
            # Either size field may be missing or explicitly None.
            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0

            if total > TG_NORMAL_MAX_SIZE:
                msg = f"Your download file size {sizeof_fmt(total)} is too large for Telegram."
                raise Exception(msg)

            raw_speed = d.get("_speed_str", "N/A")
            # The "eta" fallback is not guaranteed to be a string (or present),
            # so normalise both values before running the regex over them.
            raw_eta = d.get("_eta_str", d.get("eta"))
            speed = self.__remove_bash_color("" if raw_speed is None else str(raw_speed))
            eta = self.__remove_bash_color("" if raw_eta is None else str(raw_eta))
            text = self.__tqdm_progress("Downloading...", total, downloaded, speed, eta)
            self.edit_text(text)

    def upload_hook(self, current, total):
        """Progress callback for uploads; updates the status message."""
        text = self.__tqdm_progress("Uploading...", total, current)
        self.edit_text(text)

    @debounce(5)
    def edit_text(self, text: str):
        """Replace the status message text.

        NOTE(review): wrapped in ``debounce(5)`` -- presumably this limits how
        often edits reach Telegram; confirm against ``engine.helper.debounce``.
        """
        self._bot_msg.edit_text(text)

    @abstractmethod
    def _setup_formats(self) -> list | None:
        """Return the formats to try for this download, or ``None``."""
        pass

    @abstractmethod
    def _download(self, formats) -> list:
        """Resolve the format and download it."""
        # responsible for get format and download it
        pass

    @property
    def _methods(self):
        """Map each supported upload type to the client method that sends it."""
        return {
            "document": self._client.send_document,
            "audio": self._client.send_audio,
            "video": self._client.send_video,
            "animation": self._client.send_animation,
            "photo": self._client.send_photo,
        }

    def send_something(self, *, chat_id, files, _type, caption=None, thumb=None, **kwargs):
        """Send ``files`` to ``chat_id`` as the given media type.

        Several files that are not cached file ids are sent together as one
        media group; otherwise only ``files[0]`` is sent with the client method
        matching ``_type``.

        Args:
            chat_id: Destination chat.
            files: File paths, or cached file ids when ``cache=True`` is passed.
            _type: One of the keys of ``_methods``.
            caption: Optional caption.
            thumb: Optional thumbnail; ignored for photos.
            **kwargs: Extra arguments forwarded to the client method. ``cache``
                is consumed here and not forwarded.

        Returns:
            The sent message (the first one for a media group), or ``None``
            when ``_type`` is not recognised.
        """
        self._client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_DOCUMENT)
        is_cache = kwargs.pop("cache", False)
        if len(files) > 1 and not is_cache:
            inputs = generate_input_media(files, caption)
            return self._client.send_media_group(chat_id, inputs)[0]

        sender = self._methods.get(_type)
        if sender is None:
            logging.error("Unknown _type encountered: %s", _type)
            return None

        # Each client method takes the file under a keyword named after the type.
        send_args = {
            "chat_id": chat_id,
            _type: files[0],
            "caption": caption,
            "progress": self.upload_hook,
            **kwargs,
        }
        if _type in ("video", "animation", "document", "audio") and thumb is not None:
            send_args["thumb"] = thumb

        return sender(**send_args)

    def get_metadata(self):
        """Probe the first file in the temp directory and build upload metadata.

        Returns:
            A dict with ``height``, ``width``, ``duration``, ``thumb`` (path of
            a generated thumbnail, or ``None`` if ffmpeg failed) and ``caption``.
            The numeric fields stay 0 when probing fails.
        """
        video_path = list(Path(self._tempdir.name).glob("*"))[0]
        filename = Path(video_path).name
        width = height = duration = 0
        try:
            video_streams = ffmpeg.probe(video_path, select_streams="v")
            for item in video_streams.get("streams", []):
                height = item["height"]
                width = item["width"]
            duration = int(float(video_streams["format"]["duration"]))
        except Exception as e:
            # Best effort: missing metadata must not block the upload.
            logging.error("Error while getting metadata: %s", e)
        try:
            thumb = Path(video_path).parent.joinpath(f"{uuid.uuid4().hex}-thunmnail.png").as_posix()
            # A thumbnail's width and height should not exceed 320 pixels.
            ffmpeg.input(video_path, ss=duration / 2).filter(
                "scale",
                "if(gt(iw,ih),300,-1)",  # landscape: width 300, height automatic
                "if(gt(iw,ih),-1,300)",  # otherwise: height 300, width automatic
            ).output(thumb, vframes=1).run()
        except ffmpeg.Error:
            thumb = None

        caption = f"{self._url}\n{filename}\n\nResolution: {width}x{height}\nDuration: {duration} seconds"
        return dict(height=height, width=width, duration=duration, thumb=thumb, caption=caption)

    def _upload(self, files=None, meta=None):
        """Upload files in the user's preferred format and cache the result.

        Args:
            files: Files (or cached file ids) to send; defaults to everything
                in the temp directory.
            meta: Metadata dict as produced by ``get_metadata``; computed when
                omitted.

        Returns:
            The sent message, or ``None`` when the format setting is unknown.

        Raises:
            ValueError: If the format is ``video`` and every send method failed.
        """
        if files is None:
            files = list(Path(self._tempdir.name).glob("*"))
        if meta is None:
            meta = self.get_metadata()

        success = SimpleNamespace(document=None, video=None, audio=None, animation=None, photo=None)
        if self._format == "document":
            logging.info("Sending as document for %s", self._url)
            success = self.send_something(
                chat_id=self._chat_id,
                files=files,
                _type="document",
                thumb=meta.get("thumb"),
                force_document=True,
                caption=meta.get("caption"),
            )
        elif self._format == "photo":
            logging.info("Sending as photo for %s", self._url)
            success = self.send_something(
                chat_id=self._chat_id,
                files=files,
                _type="photo",
                caption=meta.get("caption"),
            )
        elif self._format == "audio":
            logging.info("Sending as audio for %s", self._url)
            success = self.send_something(
                chat_id=self._chat_id,
                files=files,
                _type="audio",
                caption=meta.get("caption"),
            )
        elif self._format == "video":
            logging.info("Sending as video for %s", self._url)
            attempt_methods = ["video", "animation", "audio", "photo"]
            # Metadata keys removed before trying the given fallback method.
            unsupported_keys = {
                "photo": ("thumb", "duration", "height", "width"),
                "audio": ("height", "width"),
            }
            last_error = None
            for method in attempt_methods:
                dropped = unsupported_keys.get(method, ())
                current_meta = {k: v for k, v in meta.items() if k not in dropped}
                try:
                    success = self.send_something(
                        chat_id=self._chat_id, files=files, _type=method, **current_meta
                    )
                except Exception as e:
                    logging.error("Retry to send as %s, error: %s", method, e)
                    last_error = e
                else:
                    break
            else:
                # Every method failed; surface the last failure as the cause.
                raise ValueError("ERROR: For direct links, try again with `/direct`.") from last_error
        else:
            logging.error("Unknown upload format settings for %s", self._format)
            return

        video_key = self._calc_video_key()
        obj = success.document or success.video or success.audio or success.animation or success.photo
        file_id = getattr(obj, "file_id", None)
        if file_id is None:
            # Caching a null id would make every later cache hit fail to send.
            logging.warning("No file_id returned for %s, skip caching", self._url)
        else:
            mapping = {
                "file_id": json.dumps([file_id]),
                # "thumb" is a local temp path and "cache" a transient flag set
                # by start(); neither belongs in the persisted metadata.
                "meta": json.dumps(
                    {k: v for k, v in meta.items() if k not in ("thumb", "cache")},
                    ensure_ascii=False,
                ),
            }
            self._redis.add_cache(video_key, mapping)

        # change progress bar to done
        self._bot_msg.edit_text("✅ Success")
        return success

    def _get_video_cache(self):
        """Look up the cached upload for this URL/quality/format combination."""
        return self._redis.get_cache(self._calc_video_key())

    def _calc_video_key(self):
        """Return the MD5 hex digest of URL + quality + format, used as cache key."""
        h = hashlib.md5()
        h.update((self._url + self._quality + self._format).encode())
        key = h.hexdigest()
        return key

    @final
    def start(self):
        """Run the task: check quota, serve from cache or download, record usage."""
        check_quota(self._from_user)
        if cache := self._get_video_cache():
            logging.info("Cache hit for %s", self._url)
            meta, file_id = json.loads(cache["meta"]), json.loads(cache["file_id"])
            # Tells send_something() the "files" are file ids, not local paths.
            meta["cache"] = True
            self._upload(file_id, meta)
        else:
            self._start()
        self._record_usage()

    @abstractmethod
    def _start(self):
        """Perform the actual download-and-upload work for a cache miss."""
        pass