Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| # coding: utf-8 | |
| # ytdlbot - helper.py | |
| import functools | |
| import logging | |
| import os | |
| import pathlib | |
| import re | |
| import subprocess | |
| import threading | |
| import time | |
| from http import HTTPStatus | |
| from io import StringIO | |
| import ffmpeg | |
| import ffpb | |
| import filetype | |
| import pyrogram | |
| import requests | |
| import yt_dlp | |
| from bs4 import BeautifulSoup | |
| from pyrogram import types | |
| from tqdm import tqdm | |
| from config import ( | |
| AUDIO_FORMAT, | |
| CAPTION_URL_LENGTH_LIMIT, | |
| ENABLE_ARIA2, | |
| TG_NORMAL_MAX_SIZE, | |
| ) | |
| from utils import shorten_url, sizeof_fmt | |
| def debounce(wait_seconds): | |
| """ | |
| Thread-safe debounce decorator for functions that take a message with chat.id and msg.id attributes. | |
| The function will only be called if it hasn't been called with the same chat.id and msg.id in the last 'wait_seconds'. | |
| """ | |
| def decorator(func): | |
| last_called = {} | |
| lock = threading.Lock() | |
| def wrapper(*args, **kwargs): | |
| nonlocal last_called | |
| now = time.time() | |
| # Assuming the first argument is the message object with chat.id and msg.id | |
| bot_msg = args[0]._bot_msg | |
| key = (bot_msg.chat.id, bot_msg.id) | |
| with lock: | |
| if key not in last_called or now - last_called[key] >= wait_seconds: | |
| last_called[key] = now | |
| return func(*args, **kwargs) | |
| return wrapper | |
| return decorator | |
| def get_caption(url, video_path): | |
| if isinstance(video_path, pathlib.Path): | |
| meta = get_metadata(video_path) | |
| file_name = video_path.name | |
| file_size = sizeof_fmt(os.stat(video_path).st_size) | |
| else: | |
| file_name = getattr(video_path, "file_name", "") | |
| file_size = sizeof_fmt(getattr(video_path, "file_size", (2 << 2) + ((2 << 2) + 1) + (2 << 5))) | |
| meta = dict( | |
| width=getattr(video_path, "width", 0), | |
| height=getattr(video_path, "height", 0), | |
| duration=getattr(video_path, "duration", 0), | |
| thumb=getattr(video_path, "thumb", None), | |
| ) | |
| # Shorten the URL if necessary | |
| try: | |
| if len(url) > CAPTION_URL_LENGTH_LIMIT: | |
| url_for_cap = shorten_url(url, CAPTION_URL_LENGTH_LIMIT) | |
| else: | |
| url_for_cap = url | |
| except Exception as e: | |
| logging.warning(f"Error shortening URL: {e}") | |
| url_for_cap = url | |
| cap = ( | |
| f"{file_name}\n\n{url_for_cap}\n\nInfo: {meta['width']}x{meta['height']} {file_size}\t" f"{meta['duration']}s\n" | |
| ) | |
| return cap | |
| def convert_audio_format(video_paths: list, bm): | |
| # 1. file is audio, default format | |
| # 2. file is video, default format | |
| # 3. non default format | |
| for path in video_paths: | |
| streams = ffmpeg.probe(path)["streams"] | |
| if AUDIO_FORMAT is None and len(streams) == 1 and streams[0]["codec_type"] == "audio": | |
| logging.info("%s is audio, default format, no need to convert", path) | |
| elif AUDIO_FORMAT is None and len(streams) >= 2: | |
| logging.info("%s is video, default format, need to extract audio", path) | |
| audio_stream = {"codec_name": "m4a"} | |
| for stream in streams: | |
| if stream["codec_type"] == "audio": | |
| audio_stream = stream | |
| break | |
| ext = audio_stream["codec_name"] | |
| new_path = path.with_suffix(f".{ext}") | |
| run_ffmpeg_progressbar(["ffmpeg", "-y", "-i", path, "-vn", "-acodec", "copy", new_path], bm) | |
| path.unlink() | |
| index = video_paths.index(path) | |
| video_paths[index] = new_path | |
| else: | |
| logging.info("Not default format, converting %s to %s", path, AUDIO_FORMAT) | |
| new_path = path.with_suffix(f".{AUDIO_FORMAT}") | |
| run_ffmpeg_progressbar(["ffmpeg", "-y", "-i", path, new_path], bm) | |
| path.unlink() | |
| index = video_paths.index(path) | |
| video_paths[index] = new_path | |
| def split_large_video(video_paths: list): | |
| original_video = None | |
| split = False | |
| for original_video in video_paths: | |
| size = os.stat(original_video).st_size | |
| if size > TG_NORMAL_MAX_SIZE: | |
| split = True | |
| logging.warning("file is too large %s, splitting...", size) | |
| subprocess.check_output(f"sh split-video.sh {original_video} {TG_NORMAL_MAX_SIZE * 0.95} ".split()) | |
| os.remove(original_video) | |
| if split and original_video: | |
| return [i for i in pathlib.Path(original_video).parent.glob("*")] | |