Spaces:
Runtime error
Runtime error
| # PATH: bot/telegram/media.py | |
| import os | |
| import tempfile | |
| from typing import Callable, Awaitable, Optional, Tuple | |
| from hydrogram import Client | |
| from hydrogram.types import Message | |
| ProgressCB = Callable[[int, int], Awaitable[None]] | |
| async def download_to_temp( | |
| app: Client, | |
| m: Message, | |
| progress_cb: Optional[ProgressCB] = None, | |
| ) -> Tuple[str, int, str]: | |
| """ | |
| Download telegram video/document to a temp path. | |
| IMPORTANT: | |
| - mkstemp creates an EMPTY FILE | |
| - some libs will NOT overwrite it and will save to a different path | |
| - so we delete the empty file first, and we MUST capture the returned path | |
| Returns: (path, size, original_file_name) | |
| """ | |
| media = m.video or m.document | |
| if not media: | |
| return "", 0, "" | |
| file_name = getattr(media, "file_name", None) or "video.mp4" | |
| suffix = "." + file_name.rsplit(".", 1)[1] if "." in file_name else "" | |
| fd, path = tempfile.mkstemp(prefix="studytube_", suffix=suffix) | |
| os.close(fd) | |
| # ✅ Critical: remove the empty file so download_media can safely create it | |
| try: | |
| if os.path.exists(path): | |
| os.remove(path) | |
| except Exception: | |
| pass | |
| saved_path = "" | |
| try: | |
| # ✅ Critical: capture returned path (some libs save elsewhere) | |
| saved_path = await app.download_media(message=m, file_name=path, progress=progress_cb) | |
| except TypeError: | |
| # older signature | |
| saved_path = await app.download_media(message=m, file_name=path) | |
| real_path = saved_path if isinstance(saved_path, str) and saved_path else path | |
| if not os.path.exists(real_path): | |
| return "", 0, file_name | |
| try: | |
| size = int(os.path.getsize(real_path)) | |
| except Exception: | |
| size = 0 | |
| expected = int(getattr(media, "file_size", 0) or 0) | |
| # If telegram didn't provide expected size, still reject 0-byte downloads | |
| if size <= 0: | |
| try: | |
| os.remove(real_path) | |
| except Exception: | |
| pass | |
| raise RuntimeError("download_zero_bytes") | |
| if expected and expected != size: | |
| try: | |
| os.remove(real_path) | |
| except Exception: | |
| pass | |
| raise RuntimeError(f"download_size_mismatch expected={expected} got={size}") | |
| return real_path, size, file_name |