Spaces:
Runtime error
Runtime error
| # PATH: bot/handlers.py | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import tempfile | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Tuple, Union | |
| import httpx | |
| from hydrogram import Client, filters | |
| from hydrogram.types import CallbackQuery, Message | |
| from bot.config import Auth, Telegram, Workers | |
| from bot.core.progress import SpeedETA, human_bytes, human_eta | |
| from bot.core.settings import Settings | |
| from bot.core.uptime import uptime_text | |
| from bot.integrations.auth import allow_user, disallow_user, get_stats, is_allowed | |
| from bot.integrations.cf_worker1 import profile_add, profile_check_auth, profile_delete | |
| from bot.integrations.cf_worker2 import ( | |
| get_default_profile, | |
| list_profiles, | |
| record_upload, | |
| set_default_profile, | |
| ) | |
| from bot.telegram.files import cleanup_file | |
| from bot.telegram.media import download_to_temp | |
| from bot.telegram.replies import safe_edit, safe_reply | |
| from bot.ui.callbacks import ( | |
| AUTH_CI, | |
| AUTH_JSON, | |
| BACK, | |
| CANCEL, | |
| MENU_AUTH, | |
| MENU_HELP, | |
| MENU_PROFILES, | |
| MENU_SPEED, | |
| NAME_CAPTION, | |
| NAME_CUSTOM, | |
| NAME_ORIGINAL, | |
| UP_CANCEL, | |
| UP_DEL, | |
| UP_EDIT, | |
| UP_GO, | |
| UP_PRIV, | |
| ) | |
| from bot.ui.keyboards import ( | |
| auth_menu_keyboard, | |
| filename_keyboard, | |
| main_menu_keyboard, | |
| profiles_keyboard, | |
| upload_confirm_keyboard, | |
| ) | |
| from bot.ui.parse import parse_cb | |
| from bot.ui.texts import CANCELLED, HELP_TEXT, NEED_AUTH, NOT_ALLOWED, OWNER_ONLY, START_TEXT | |
| from bot.youtube.link_parser import parse_telegram_link | |
# Chat reference as returned by parse_telegram_link: either an int or a str.
ChatRef = Union[int, str]
# ---- In-memory state ----
@dataclass
class PendingUpload:
    """Per-user state of an upload that has been prepared but not finished.

    Instances are created with keyword arguments and then mutated in place
    (``status_msg`` is assigned after the status message has been sent), so
    this must be a regular, non-frozen dataclass.
    """

    request_msg: Message  # where user triggered command / sent media
    src_msg: Message  # message containing media to download (could be from user session)
    downloader: Client  # which client downloads src_msg media
    file_name: str
    size_hint: int = 0
    # candidates
    caption_raw: str = ""
    title_from_filename: str = ""
    title_from_caption: str = ""
    desc_from_caption: str = ""
    # chosen final
    title: str = ""
    description: str = ""
    privacy: str = "private"
    # The single message that preview/progress edits are applied to.
    status_msg: Optional[Message] = None
    via_link: bool = False
@dataclass
class EditState:
    """Title/description/privacy values held while a user is editing an upload."""

    title: str
    description: str
    privacy: str
# All of the mappings below are keyed by the Telegram user id (uid).
# Upload prepared for a user and waiting for confirmation / currently running.
_PENDING_UPLOAD: Dict[int, PendingUpload] = {}
# Users who are in the middle of editing upload metadata.
_AWAIT_EDIT: Dict[int, EditState] = {}
# Pending delete confirmations (presumably the value is a profile id — TODO confirm).
_PENDING_DELETE: Dict[int, str] = {}
# Set to True by _run_upload while an upload for that user is running.
_IN_PROGRESS: Dict[int, bool] = {}
# Running upload task per user, so /cancel can cancel it.
_UPLOAD_TASK: Dict[int, asyncio.Task] = {}
# Running batch task per user, so /cancel can cancel it.
_BATCH_TASK: Dict[int, asyncio.Task] = {}
# ✅ Auth input state (fixes: auth buttons "no response")
# value: "json" or "ci"
_AWAIT_AUTH: Dict[int, str] = {}
# Batch range cap (Settings may not define it yet)
_BATCH_MAX_RANGE = int(getattr(Settings, "BATCH_MAX_RANGE", 80) or 80)
def _norm_ids(xs: Any) -> set[int]:
    """Normalise a config value holding one or more ids into a set of ints.

    Accepts a single int/float/str, a string with several ids separated by
    commas, semicolons or whitespace, or any iterable of such values.
    Entries that cannot be parsed are skipped; falsy or non-iterable input
    yields an empty set. The function never raises.
    """
    out: set[int] = set()
    if not xs:
        return out
    if isinstance(xs, str):
        # Env-style settings often pack several ids into one string.
        candidates = re.split(r"[\s,;]+", xs)
    elif isinstance(xs, (int, float)):
        candidates = [xs]
    else:
        try:
            candidates = list(xs)
        except TypeError:
            # Not iterable (e.g. an arbitrary object): nothing to extract.
            return out
    for v in candidates:
        s = str(v).strip()
        if not s:
            continue
        try:
            # Exact integer parse first so large ids do not lose precision.
            out.add(int(s))
        except ValueError:
            try:
                # Fall back for float-looking values such as "12.0" or "1e3".
                out.add(int(float(s)))
            except (ValueError, OverflowError):
                continue
    return out
def _is_admin_or_owner(uid: int) -> bool:
    """Return True when uid is listed as an owner or admin in Auth or Telegram config."""
    # Accept both Auth.* and Telegram.* and handle str/int mixes safely.
    sources = (
        (Auth, "OWNERS"),
        (Telegram, "OWNER_ID"),
        (Auth, "ADMINS"),
        (Telegram, "ADMIN_IDS"),
    )
    privileged: set = set()
    for holder, attr in sources:
        privileged |= _norm_ids(getattr(holder, attr, None))
    return uid in privileged
async def _ensure_allowed_uid(uid: int, reply_target: Message) -> bool:
    """
    Check that uid may use the bot; tell the user via reply_target when not.

    IMPORTANT:
    - For Message handlers: uid = m.from_user.id
    - For CallbackQuery handlers: uid = q.from_user.id (NOT q.message.from_user.id)
    """
    if _is_admin_or_owner(uid):
        return True
    allowed = await is_allowed(uid)
    if allowed:
        return allowed
    # try edit first (callbacks), fallback to reply
    edited = await safe_edit(reply_target, NOT_ALLOWED, reply_markup=main_menu_keyboard())
    if not edited:
        await safe_reply(reply_target, NOT_ALLOWED, reply_markup=main_menu_keyboard())
    return allowed
def _media_and_filename(m: Message) -> Tuple[Optional[Any], str, int]:
    """Return (media, file name, size) for the message's video or document.

    Video is preferred over document. A missing file name falls back to
    "video.mp4" / "file.bin"; a missing size becomes 0. When the message
    carries neither, (None, "", 0) is returned.
    """
    for attr, fallback_name in (("video", "video.mp4"), ("document", "file.bin")):
        media = getattr(m, attr)
        if media:
            return media, (media.file_name or fallback_name), int(media.file_size or 0)
    return None, "", 0
def _caption_text(m: Message) -> str:
    """Return the message caption with surrounding whitespace removed ("" when absent)."""
    caption = m.caption
    if not caption:
        return ""
    return caption.strip()
def _filename_title(file_name: str) -> str:
    """Derive a title from a file name: extension dropped, capped at Settings.MAX_TITLE."""
    stem, _ext = os.path.splitext(file_name or "")
    stem = stem.strip()
    if not stem:
        stem = "Untitled"
    return stem[: Settings.MAX_TITLE]
def _caption_title_desc(caption: str) -> Tuple[str, str]:
    """Split a caption into (title, description).

    The first line becomes the title, the remaining lines (each stripped)
    become the description. Both are capped by Settings.MAX_TITLE /
    Settings.MAX_DESC. An empty caption gives ("", "").
    """
    text = (caption or "").strip()
    if not text:
        return "", ""
    rows = text.splitlines()
    head = rows[0].strip()
    tail = "\n".join(row.strip() for row in rows[1:]).strip()
    return head[: Settings.MAX_TITLE], tail[: Settings.MAX_DESC]
def _render_choice_prompt(p: PendingUpload) -> str:
    """Build the prompt asking whether the title should come from the filename or the caption."""
    caption = p.caption_raw.strip()
    if len(caption) > 900:
        # Keep the prompt short; the full caption is still stored on p.
        caption = caption[:900] + "…"
    pieces = [
        "📝 *Choose YouTube title/description source*\n\n",
        f"📄 *Filename*\n`{p.file_name}`\n",
        f"→ Title: *{p.title_from_filename or '—'}*\n\n",
        f"📝 *Caption*\n{caption or '—'}\n",
        f"→ Title: *{p.title_from_caption or '—'}*\n\n",
        "Pick one option below:",
    ]
    return "".join(pieces)
def _render_preview(p: PendingUpload) -> str:
    """Build the "ready to upload" preview text for a pending upload."""

    def _clip(text: str, limit: int) -> str:
        # Shorten long text and mark the cut with an ellipsis.
        return text if len(text) <= limit else text[:limit] + "…"

    caption = _clip(p.caption_raw.strip(), 400)
    description = _clip((p.description or "").strip(), 800)
    size_line = human_bytes(p.size_hint) if p.size_hint else "—"
    pieces = [
        "📦 *Ready to upload*\n\n",
        f"*File:* `{p.file_name}`\n",
        f"*Size:* `{size_line}`\n",
        f"*Privacy:* `{p.privacy}`\n\n",
        f"*Caption:* {caption or '—'}\n\n",
        f"*Title:* {p.title or '—'}\n",
        f"*Description:* {description or '—'}",
    ]
    return "".join(pieces)
async def _start_pending_upload(
    *,
    uid: int,
    request_msg: Message,  # where to show preview/progress
    src_msg: Message,  # where media actually is
    downloader: Client,
    via_link: bool = False,
) -> None:
    """Register a pending upload for uid and send its single status message.

    When src_msg carries no video/document, the user is told so and nothing
    is registered. Otherwise a PendingUpload is stored in _PENDING_UPLOAD and
    either the title-source prompt (caption present) or the upload preview
    (no caption) is sent as a reply to request_msg.
    """
    media, file_name, size = _media_and_filename(src_msg)
    if not media:
        await safe_reply(request_msg, "❌ No video/document found in that message.")
        return

    caption = _caption_text(src_msg)
    filename_title = _filename_title(file_name)
    caption_title, caption_desc = _caption_title_desc(caption)

    # Default choice:
    # - If caption exists, default title+desc from caption
    # - Else filename title
    if caption and caption_title:
        title, description = caption_title, caption_desc
    else:
        title = filename_title
        description = caption[: Settings.MAX_DESC] if caption else ""

    pending = PendingUpload(
        request_msg=request_msg,
        src_msg=src_msg,
        downloader=downloader,
        file_name=file_name,
        size_hint=int(size or 0),
        caption_raw=caption,
        title_from_filename=filename_title,
        title_from_caption=caption_title,
        desc_from_caption=caption_desc,
        title=title,
        description=description,
        privacy="private",
        via_link=via_link,
        status_msg=None,
    )
    _PENDING_UPLOAD[uid] = pending

    # Create ONE status message in the request chat. Everything edits THIS message.
    if caption:
        text, keyboard = _render_choice_prompt(pending), filename_keyboard()
    else:
        text, keyboard = _render_preview(pending), upload_confirm_keyboard(pending.privacy)
    pending.status_msg = await safe_reply(request_msg, text, reply_markup=keyboard)
    _PENDING_UPLOAD[uid] = pending
def _as_int(v: Any) -> int:
    """Best-effort conversion of a progress value to int; never raises.

    Handles None, bools, ints, floats, numeric strings and dicts. For a dict
    the first well-known progress key with a convertible value wins; failing
    that, the dict's first value is tried. Anything that cannot be converted
    (including NaN and infinities) yields 0.
    """

    def _coerce(raw: Any) -> Optional[int]:
        # float() accepts ints, floats and numeric strings; int() rejects nan/inf.
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return None

    if v is None:
        return 0
    if isinstance(v, (bool, int)):
        return int(v)
    if isinstance(v, float):
        return _coerce(v) or 0
    if isinstance(v, str):
        return _coerce(v.strip()) or 0
    if isinstance(v, dict):
        for key in ("current", "sent", "bytes", "done", "downloaded", "uploaded", "processed", "offset"):
            if key in v:
                found = _coerce(v.get(key) or 0)
                if found is not None:
                    return found
        # No known key worked: fall back to whatever the first value is.
        for first in v.values():
            return _coerce(first) or 0
        return 0
    return 0
async def _get_public_ip() -> Optional[str]:
    """Ask api.ipify.org for this host's public IP; return None on any failure."""
    # No extra dependency / no extra file needed
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get("https://api.ipify.org", params={"format": "text"})
            if resp.status_code >= 400:
                return None
            address = (resp.text or "").strip()
            # Reject empty or implausibly long bodies.
            return address if address and len(address) <= 60 else None
    except Exception:
        # Best effort only: the caller shows a placeholder when no IP is available.
        return None
async def _run_upload(uid: int) -> Dict[str, Any]:
    """
    Runs one upload for uid, editing p.status_msg.

    Steps: look up the user's default profile, download the source media to
    a temp file, upload it with upload_video, record the upload, then show a
    timing summary. Per-user state and the temp file are cleaned up in
    ``finally`` once the upload has actually started.

    Returns: {"ok": True, "url": "..."} or {"ok": False, "err": "..."}
    (upload failures and exceptions additionally carry a "detail" key).
    Cancellation is reported as {"ok": False, "err": "cancelled"}.
    """
    # These two early exits happen before the try block on purpose: they must
    # not clear the state of an upload that is already running.
    if _IN_PROGRESS.get(uid):
        return {"ok": False, "err": "already_running"}
    pending = _PENDING_UPLOAD.get(uid)
    if not pending or not pending.status_msg:
        return {"ok": False, "err": "no_pending"}
    _IN_PROGRESS[uid] = True
    st = pending.status_msg
    file_path: Optional[str] = None
    try:
        # Ensure default profile exists
        prof = await get_default_profile(uid)
        if not (isinstance(prof, dict) and prof.get("ok") and prof.get("profile_id") and prof.get("access_token")):
            await safe_edit(st, NEED_AUTH, reply_markup=main_menu_keyboard())
            return {"ok": False, "err": "not_authorized"}
        prof_id = str(prof["profile_id"])
        access_token = str(prof["access_token"])
        overall_start = time.time()

        # ---- Download with live speed ----
        await safe_edit(st, "⬇️ Downloading…")
        dl_speed = SpeedETA()
        dl_last_ui = 0.0
        dl_start = time.time()

        async def dl_progress(cur: Any, total: Any) -> None:
            nonlocal dl_last_ui
            now = time.time()
            # Throttle status edits to one per 0.8 s.
            if now - dl_last_ui < 0.8:
                return
            dl_last_ui = now
            cur_i = _as_int(cur)
            total_i = _as_int(total)
            if total_i <= 0:
                total_i = max(cur_i, 1)
            rate = dl_speed.update(cur_i, total_i)
            txt = (
                "⬇️ *Downloading…*\n\n"
                f"`{human_bytes(cur_i)} / {human_bytes(total_i)}`\n"
                f"*Speed:* `{human_bytes(rate)}/s`\n"
                f"*ETA:* `{human_eta(dl_speed.eta_seconds)}`"
            )
            await safe_edit(st, txt)

        file_path, _, _ = await download_to_temp(pending.downloader, pending.src_msg, progress_cb=dl_progress)
        dl_dur = max(0.001, time.time() - dl_start)
        try:
            file_bytes = int(os.path.getsize(file_path))
        except Exception:
            # Size is only used for the average-speed summary.
            file_bytes = 0

        # ---- Upload with live speed ----
        from bot.youtube.uploader import upload_video

        await safe_edit(st, "⬆️ Uploading…")
        ul_speed = SpeedETA()
        ul_last_ui = 0.0
        ul_start = time.time()

        async def ul_progress(sent: Any, total: Any) -> None:
            nonlocal ul_last_ui
            now = time.time()
            # Throttle status edits to one per 0.8 s.
            if now - ul_last_ui < 0.8:
                return
            ul_last_ui = now
            sent_i = _as_int(sent)
            total_i = _as_int(total)
            if total_i <= 0:
                total_i = max(sent_i, 1)
            rate = ul_speed.update(sent_i, total_i)
            txt = (
                "⬆️ *Uploading…*\n\n"
                f"`{human_bytes(sent_i)} / {human_bytes(total_i)}`\n"
                f"*Speed:* `{human_bytes(rate)}/s`\n"
                f"*ETA:* `{human_eta(ul_speed.eta_seconds)}`"
            )
            await safe_edit(st, txt)

        up = await upload_video(
            access_token=access_token,
            file_path=file_path,
            title=pending.title,
            description=pending.description,
            privacy=pending.privacy,
            progress_cb=ul_progress,
        )
        if not (isinstance(up, dict) and up.get("ok")):
            err = up.get("err", "upload_failed") if isinstance(up, dict) else "upload_failed"
            detail = up.get("detail") if isinstance(up, dict) else None
            msg = f"❌ Upload failed: `{err}`"
            if detail:
                msg += f"\n`{str(detail)[:280]}`"
            await safe_edit(st, msg, reply_markup=main_menu_keyboard())
            return {"ok": False, "err": str(err), "detail": detail}

        url = str(up.get("url") or "")
        # Stop the upload clock before bookkeeping so the average reflects
        # transfer time only.
        ul_dur = max(0.001, time.time() - ul_start)
        recorded = True
        try:
            await record_upload(uid, prof_id)
        except Exception:
            # The video is already uploaded; a stats failure must not turn
            # the result into an error or hide the URL from the user.
            recorded = False
        total_dur = max(0.001, time.time() - overall_start)
        dl_avg = (human_bytes(file_bytes / dl_dur) + "/s") if file_bytes and dl_dur else "—"
        ul_avg = (human_bytes(file_bytes / ul_dur) + "/s") if file_bytes and ul_dur else "—"
        done = "✅ *Uploaded!*"
        if url:
            done += f"\n\n{url}"
        done += (
            f"\n\n*Download:* `{dl_dur:.1f}s` • avg `{dl_avg}`"
            f"\n*Upload:* `{ul_dur:.1f}s` • avg `{ul_avg}`"
            f"\n*Total:* `{total_dur:.1f}s`"
        )
        if not recorded:
            done += "\n\n⚠️ Upload stats could not be recorded."
        await safe_edit(st, done, reply_markup=main_menu_keyboard())
        return {"ok": True, "url": url}
    except asyncio.CancelledError:
        try:
            await safe_edit(st, CANCELLED, reply_markup=main_menu_keyboard())
        except Exception:
            pass
        return {"ok": False, "err": "cancelled"}
    except Exception as e:
        await safe_edit(st, f"❌ Error: `{str(e)[:220]}`", reply_markup=main_menu_keyboard())
        return {"ok": False, "err": "exception", "detail": str(e)}
    finally:
        _PENDING_UPLOAD.pop(uid, None)
        _AWAIT_EDIT.pop(uid, None)
        _PENDING_DELETE.pop(uid, None)
        _AWAIT_AUTH.pop(uid, None)
        if file_path:
            try:
                cleanup_file(file_path)
            except Exception:
                pass
        _IN_PROGRESS.pop(uid, None)
        _UPLOAD_TASK.pop(uid, None)
_BATCH_RANGE_RE = re.compile(r"(.*?)/(\d+)\s*-\s*(\d+)\s*$")


def _parse_link_or_range(line: str) -> Tuple[ChatRef, int, int]:
    """
    Parse a single message link or a link whose last part is an id range.

    Accepts:
    https://t.me/c/<chat>/<4012>
    https://t.me/c/<chat>/<4012-4046>
    https://t.me/<user>/<10-22>
    Returns (chat_ref, start, end); start == end for a single link and a
    reversed range is returned in ascending order.
    Raises ValueError("empty") for blank input and ValueError("bad_range")
    when either end of the range is 0.
    """
    text = (line or "").strip()
    if not text:
        raise ValueError("empty")

    found = _BATCH_RANGE_RE.match(text)
    if found is None:
        # Plain single-message link.
        chat_ref, mid = parse_telegram_link(text)
        only = int(mid)
        return chat_ref, only, only

    base = found.group(1).strip()
    first, last = int(found.group(2)), int(found.group(3))
    if min(first, last) <= 0:
        raise ValueError("bad_range")
    first, last = sorted((first, last))
    # Resolve the chat by parsing the link of the first message in the range.
    chat_ref, _unused_mid = parse_telegram_link(f"{base}/{first}")
    return chat_ref, first, last
def _extract_client_secrets_from_json(obj: dict) -> Tuple[str, str]:
    """
    Accepts client_secret.json formats:
    {"installed": {"client_id": "...", "client_secret": "..."}}
    {"web": {"client_id": "...", "client_secret": "..."}}
    {"client_id": "...", "client_secret": "..."}
    Returns (client_id, client_secret), both stripped, or raises.

    Values are validated after stripping, so whitespace-only credentials are
    rejected instead of being returned as empty strings.

    Raises:
        ValueError("json_not_object"): obj is not a dict.
        ValueError("missing_client_id_or_secret"): no section holds both values.
    """
    if not isinstance(obj, dict):
        raise ValueError("json_not_object")
    # Nested sections are tried first, then the flat top-level layout.
    for section in (obj.get("installed"), obj.get("web"), obj):
        if not isinstance(section, dict):
            continue
        client_id = str(section.get("client_id") or "").strip()
        client_secret = str(section.get("client_secret") or "").strip()
        if client_id and client_secret:
            return client_id, client_secret
    raise ValueError("missing_client_id_or_secret")
async def _register_json_profile(uid: int, m: Message, raw: str) -> None:
    """Parse client_secret JSON text, add it as profile "main" and reply with the outcome."""
    try:
        obj = json.loads(raw)
        client_id, client_secret = _extract_client_secrets_from_json(obj)
        out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label="main")
        if isinstance(out, dict) and out.get("ok"):
            # Only a successful add ends the "waiting for JSON" state.
            _AWAIT_AUTH.pop(uid, None)
            await safe_reply(
                m,
                "✅ Profile added.\n\nNow run /profiles → *Login* → *Set default*.",
                reply_markup=main_menu_keyboard(),
            )
        else:
            err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed"
            await safe_reply(m, f"❌ Failed to add profile: `{err}`")
    except Exception as e:
        await safe_reply(m, f"❌ JSON parse/add failed: `{str(e)[:220]}`")


async def _handle_auth_json_input(app: Client, m: Message) -> bool:
    """
    Returns True if message was consumed as auth input.

    Only acts while the sender is in the "json" auth state. Accepts either a
    ``.json`` document or pasted JSON text; anything else gets a reminder of
    what to send (and is still consumed).
    """
    if not m.from_user:
        return False
    uid = m.from_user.id
    if _AWAIT_AUTH.get(uid) != "json":
        return False
    if not await _ensure_allowed_uid(uid, m):
        return True

    # document .json
    if m.document:
        name = (m.document.file_name or "").lower()
        if not name.endswith(".json"):
            await safe_reply(m, "❌ Please send a `.json` file (Google client_secret.json). Or paste JSON text.")
            return True
        fd, path = tempfile.mkstemp(prefix="yt_auth_", suffix=".json")
        os.close(fd)
        try:
            await app.download_media(message=m, file_name=path)
            with open(path, "rb") as f:
                # utf-8-sig drops a leading BOM, which json.loads would reject;
                # undecodable bytes are ignored.
                raw = f.read().decode("utf-8-sig", errors="ignore")
        except Exception as e:
            await safe_reply(m, f"❌ JSON parse/add failed: `{str(e)[:220]}`")
            return True
        finally:
            # The credentials file must not be left on disk.
            try:
                if os.path.exists(path):
                    os.remove(path)
            except OSError:
                pass
        await _register_json_profile(uid, m, raw)
        return True

    # pasted JSON
    txt = (m.text or "").strip()
    if txt.startswith("{") and txt.endswith("}"):
        await _register_json_profile(uid, m, txt)
        return True

    await safe_reply(m, "📄 Send the `.json` file or paste JSON text (client_secret.json).")
    return True
async def _handle_auth_ci_input(m: Message) -> bool:
    """
    Returns True if message was consumed as auth input.

    Only acts while the sender is in the "ci" auth state. Expects client id
    and secret on separate lines (optional third line: label), or on one
    line separated by whitespace.
    """
    sender = m.from_user
    if not sender:
        return False
    uid = sender.id
    if _AWAIT_AUTH.get(uid) != "ci":
        return False
    if not await _ensure_allowed_uid(uid, m):
        return True

    body = (m.text or "").strip()
    if not body:
        await safe_reply(m, "Send:\n`<CLIENT_ID>`\n`<CLIENT_SECRET>`\n(Optional 3rd line: label)")
        return True

    fields = [row.strip() for row in body.splitlines() if row.strip()]
    if len(fields) == 1:
        # maybe space separated
        fields = fields[0].split()
        if len(fields) < 2:
            await safe_reply(m, "Send:\n`<CLIENT_ID>`\n`<CLIENT_SECRET>`\n(Optional 3rd line: label)")
            return True
    client_id, client_secret = fields[0], fields[1]
    label = fields[2] if len(fields) >= 3 else "main"

    out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label=label)
    added = isinstance(out, dict) and out.get("ok")
    if not added:
        err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed"
        await safe_reply(m, f"❌ Failed to add profile: `{err}`")
        return True
    _AWAIT_AUTH.pop(uid, None)
    await safe_reply(
        m,
        f"✅ Profile added (label: `{label}`).\n\nNow run /profiles → *Login* → *Set default*.",
        reply_markup=main_menu_keyboard(),
    )
    return True
| def setup_handlers(app: Client, user_app: Optional[Client]) -> None: | |
| # ---- basic ---- | |
async def start_help_cmd(_, m: Message) -> None:
    """Reply with the help text for /help, otherwise with the start text."""
    wants_help = (m.text or "").strip().lower().startswith("/help")
    text = HELP_TEXT if wants_help else START_TEXT
    await safe_reply(m, text, reply_markup=main_menu_keyboard())
async def speedtest_cmd(_, m: Message) -> None:
    """Run the download/upload speed tests and report them with uptime and public IP."""
    sender = m.from_user
    uid = sender.id if sender else 0
    if not await _ensure_allowed_uid(uid, m):
        return
    from bot.core.speedtest import net_download_test, net_upload_test

    status = await safe_reply(m, "⏱ Running speed test…")
    down = await net_download_test()
    up = await net_upload_test()
    ip = await _get_public_ip()
    down_bps = float((down or {}).get("bps", 0) or 0)
    up_bps = float((up or {}).get("bps", 0) or 0)
    report = "".join(
        [
            "📶 *Speed Test*\n\n",
            f"*Uptime:* `{uptime_text()}`\n",
            f"*Public IP:* `{ip or '—'}`\n\n",
            f"*Download:* `{human_bytes(down_bps)}/s`\n",
            f"*Upload:* `{human_bytes(up_bps)}/s`",
        ]
    )
    # Prefer editing the "running" message; fall back to a fresh reply.
    if not status:
        await safe_reply(m, report, reply_markup=main_menu_keyboard())
        return
    await safe_edit(status, report, reply_markup=main_menu_keyboard())
async def cancel_cmd(_, m: Message) -> None:
    """Drop all per-user state, cancel any running upload/batch task and confirm."""
    sender = m.from_user
    uid = sender.id if sender else 0
    for state in (_AWAIT_AUTH, _AWAIT_EDIT, _PENDING_UPLOAD, _PENDING_DELETE):
        state.pop(uid, None)
    for registry in (_UPLOAD_TASK, _BATCH_TASK):
        running = registry.pop(uid, None)
        if running and not running.done():
            running.cancel()
    _IN_PROGRESS.pop(uid, None)
    await safe_reply(m, CANCELLED, reply_markup=main_menu_keyboard())
| # ---- admin allowlist ---- | |
async def allow_cmd(_, m: Message) -> None:
    """Admin/owner command: add the given Telegram id to the allowlist."""
    sender = m.from_user
    uid = sender.id if sender else 0
    if not _is_admin_or_owner(uid):
        await safe_reply(m, OWNER_ONLY)
        return
    pieces = (m.text or "").split(None, 1)
    if len(pieces) != 2:
        await safe_reply(m, "Usage: /allow <tg_id>")
        return
    try:
        target = int(pieces[1].strip())
    except Exception:
        await safe_reply(m, "Invalid tg_id")
        return
    await allow_user(target)
    await safe_reply(m, f"✅ Allowed `{target}`")
async def disallow_cmd(_, m: Message) -> None:
    """Admin/owner command: remove the given Telegram id from the allowlist."""
    sender = m.from_user
    uid = sender.id if sender else 0
    if not _is_admin_or_owner(uid):
        await safe_reply(m, OWNER_ONLY)
        return
    pieces = (m.text or "").split(None, 1)
    if len(pieces) != 2:
        await safe_reply(m, "Usage: /disallow <tg_id>")
        return
    try:
        target = int(pieces[1].strip())
    except Exception:
        await safe_reply(m, "Invalid tg_id")
        return
    await disallow_user(target)
    await safe_reply(m, f"✅ Disallowed `{target}`")
async def stats_cmd(_, m: Message) -> None:
    """Admin/owner command: show allowed-user, profile and upload counters from get_stats."""
    sender = m.from_user
    uid = sender.id if sender else 0
    if not _is_admin_or_owner(uid):
        await safe_reply(m, OWNER_ONLY)
        return
    stats = await get_stats()
    if not isinstance(stats, dict):
        await safe_reply(m, "❌ stats failed")
        return
    report = "".join(
        [
            "📊 *Today stats*\n\n",
            f"Allowed users: `{stats.get('allowed_users', 0)}`\n",
            f"Profiles: `{stats.get('profiles', 0)}`\n",
            f"Uploads today: `{stats.get('uploads_today', 0)}`",
        ]
    )
    await safe_reply(m, report)
async def diag_cmd(_, m: Message) -> None:
    """Admin/owner command: show uptime, worker URLs and owner/admin counts."""
    sender = m.from_user
    uid = sender.id if sender else 0
    if not _is_admin_or_owner(uid):
        await safe_reply(m, OWNER_ONLY)
        return
    worker1 = Workers.WORKER1_URL or "—"
    worker2 = Workers.WORKER2_URL or "—"
    uptime = uptime_text()
    owner_ids = _norm_ids(getattr(Auth, "OWNERS", None)) | _norm_ids(getattr(Telegram, "OWNER_ID", None))
    admin_ids = _norm_ids(getattr(Auth, "ADMINS", None)) | _norm_ids(getattr(Telegram, "ADMIN_IDS", None))
    report = "".join(
        [
            "🧪 *Diag*\n\n",
            f"Uptime: `{uptime}`\n",
            f"WORKER1_URL: `{worker1}`\n",
            f"WORKER2_URL: `{worker2}`\n",
            f"Owners: `{len(owner_ids)}` ",
            f"Admins: `{len(admin_ids)}`",
        ]
    )
    await safe_reply(m, report)
| # ---- auth/profile flow ---- | |
async def auth_cmd(_, m: Message) -> None:
    """Show the add-profile menu, resetting any half-finished auth input."""
    sender = m.from_user
    uid = sender.id if sender else 0
    allowed = await _ensure_allowed_uid(uid, m)
    if not allowed:
        return
    _AWAIT_AUTH.pop(uid, None)
    await safe_reply(m, "🔐 Add a profile:", reply_markup=auth_menu_keyboard())
async def profiles_cmd(_, m: Message) -> None:
    """List the user's profiles with a keyboard that marks the default one."""
    sender = m.from_user
    uid = sender.id if sender else 0
    if not await _ensure_allowed_uid(uid, m):
        return
    listing = await list_profiles(uid, only_connected=False)
    listed_ok = isinstance(listing, dict) and listing.get("ok")
    if not listed_ok:
        await safe_reply(m, "❌ Failed to list profiles.")
        return
    keyboard = profiles_keyboard(
        listing.get("profiles") or [],
        listing.get("default_profile_id") or "",
    )
    await safe_reply(m, "👤 *Profiles*", reply_markup=keyboard)
| # ---- upload from DM media / OR auth-json doc handling ---- | |
async def media_in_dm(_, m: Message) -> None:
    """Handle media sent to the bot: auth JSON when awaited, otherwise a new pending upload."""
    sender = m.from_user
    if not sender:
        return
    uid = sender.id
    # ✅ If we're waiting for auth JSON, consume json file instead of treating it as "upload media"
    # (short-circuit keeps the order: auth input first, then the allow check)
    if await _handle_auth_json_input(app, m) or not await _ensure_allowed_uid(uid, m):
        return
    if _IN_PROGRESS.get(uid):
        await safe_reply(m, "⏳ Upload already running. Use /cancel to stop.")
        return
    await _start_pending_upload(
        uid=uid,
        request_msg=m,
        src_msg=m,
        downloader=app,
        via_link=False,
    )
| # ---- upload from link (admin/owner) ---- | |
async def archive_cmd(_, m: Message) -> None:
    """Admin/owner command: fetch a t.me message via the user session and queue it for upload.

    Requires user_app. Refuses to start while an upload for the same user is
    running, because that upload clears the user's pending state when it
    finishes and would silently discard the new one.
    """
    if not m.from_user:
        return
    uid = m.from_user.id
    if not await _ensure_allowed_uid(uid, m):
        return
    if not _is_admin_or_owner(uid):
        await safe_reply(m, OWNER_ONLY)
        return
    if user_app is None:
        await safe_reply(m, "❌ This mode requires user session (user_app missing).")
        return
    # Same guard as the direct-media handler.
    if _IN_PROGRESS.get(uid):
        await safe_reply(m, "⏳ Upload already running. Use /cancel to stop.")
        return
    args = (m.text or "").split(maxsplit=1)
    if len(args) < 2:
        await safe_reply(m, "Usage: /archive <t.me message link>")
        return
    st = await safe_reply(m, "🔎 Fetching message…")
    if not st:
        return
    try:
        chat_ref, msg_id = parse_telegram_link(args[1].strip())
    except Exception as e:
        await safe_edit(st, f"Bad link: `{str(e)[:160]}`")
        return
    try:
        src = await user_app.get_messages(chat_ref, msg_id)
    except Exception as e:
        await safe_edit(st, f"❌ Fetch failed: `{str(e)[:180]}`")
        return
    await safe_edit(st, "✅ Message fetched. Preparing preview…")
    await _start_pending_upload(uid=uid, request_msg=m, src_msg=src, downloader=user_app, via_link=True)
| # ---- batch mode ---- | |
async def batch_cmd(_, m: Message) -> None:
    """Admin/owner command: upload many t.me messages (single links or id ranges) in sequence.

    Each line after /batch is one link or range. The standalone flags
    ``--continue`` / ``-c`` switch from stop-on-first-failure to
    continue-on-failure. The work runs in a background task stored in
    _BATCH_TASK so that /cancel can stop it.
    """
    if not m.from_user:
        return
    uid = m.from_user.id
    if not await _ensure_allowed_uid(uid, m):
        return
    if not _is_admin_or_owner(uid):
        await safe_reply(m, OWNER_ONLY)
        return
    if user_app is None:
        await safe_reply(m, "❌ Batch mode is not configured (user session missing).")
        return
    if _BATCH_TASK.get(uid) and not _BATCH_TASK[uid].done():
        await safe_reply(m, "⏳ A batch is already running. Use /cancel to stop it.")
        return
    raw = (m.text or "")
    args = raw.split(maxsplit=1)
    if len(args) < 2:
        await safe_reply(m, "Send: /batch <t.me links> (one per line)\nOptional: /batch --continue <links...>")
        return
    payload = args[1].strip()
    # Match the flags only as whole whitespace-delimited tokens, so a "-c"
    # that happens to occur inside a link is neither read as a flag nor removed.
    flag_re = re.compile(r"(?<!\S)(?:--continue|-c)(?!\S)")
    continue_on_fail = flag_re.search(payload) is not None
    payload = flag_re.sub("", payload).strip()
    lines = [ln.strip() for ln in payload.splitlines() if ln.strip()]
    if not lines:
        await safe_reply(m, "No links found. Put one t.me link per line after /batch")
        return

    items: List[Tuple[ChatRef, int]] = []
    for ln in lines:
        try:
            chat_ref, a, b = _parse_link_or_range(ln)
        except Exception:
            await safe_reply(m, f"❌ Bad link/range: `{ln}`")
            if not continue_on_fail:
                return
            continue
        count = (b - a + 1)
        if count > _BATCH_MAX_RANGE:
            await safe_reply(m, f"❌ Range too large ({count}). Max is {_BATCH_MAX_RANGE}.")
            if not continue_on_fail:
                return
            continue
        for mid in range(a, b + 1):
            items.append((chat_ref, mid))
    if not items:
        await safe_reply(m, "No valid items to process.")
        return
    await safe_reply(
        m,
        f"🧾 Batch starting: {len(items)} item(s).\nMode: `{'continue' if continue_on_fail else 'stop_on_fail'}`",
    )

    async def runner() -> None:
        batch_start = time.time()
        total = len(items)
        for i, (chat_ref, msg_id) in enumerate(items, 1):
            st = await safe_reply(m, f"🔎 Batch {i}/{total}: fetching message `{msg_id}`…")
            if not st:
                if not continue_on_fail:
                    break
                continue
            try:
                src = await user_app.get_messages(chat_ref, msg_id)
            except Exception as e:
                await safe_edit(st, f"❌ Batch {i}/{total} fetch failed: `{str(e)[:180]}`")
                if not continue_on_fail:
                    break
                continue
            media, _, _ = _media_and_filename(src)
            if not media:
                await safe_edit(st, f"⏭ Batch {i}/{total}: no media in that message. Skipped.")
                continue
            # Create pending upload targeting THIS chat (m), not source chat
            await _start_pending_upload(uid=uid, request_msg=m, src_msg=src, downloader=user_app, via_link=True)
            # Force progress to edit THIS batch line
            p = _PENDING_UPLOAD.get(uid)
            if p:
                p.status_msg = st
                _PENDING_UPLOAD[uid] = p
                await safe_edit(st, f"⏳ Batch {i}/{total}: starting upload…")
                out = await _run_upload(uid)
                if out.get("err") == "cancelled":
                    # _run_upload swallows the task cancellation and reports it
                    # this way; without this check a /cancel in continue mode
                    # would only skip the current item.
                    return
                if not out.get("ok") and not continue_on_fail:
                    await safe_reply(m, f"🛑 Batch stopped (failed at {i}/{total}).")
                    break
            else:
                await safe_edit(st, f"❌ Batch {i}/{total}: internal error (no pending).")
                if not continue_on_fail:
                    break
        batch_dur = max(0.001, time.time() - batch_start)
        await safe_reply(m, f"✅ Batch done in `{batch_dur:.1f}s`.", reply_markup=main_menu_keyboard())

    t = asyncio.create_task(runner())
    _BATCH_TASK[uid] = t
| # ---- callbacks ---- | |
async def cb_handler(_, q: CallbackQuery) -> None:
    """Dispatch an inline-keyboard button press to the matching action.

    The callback payload is decoded with ``parse_cb`` into ``(action, value)``.
    Every recognised action edits ``q.message`` in place and returns; an
    unrecognised action falls through and does nothing.

    Args:
        _: First positional argument supplied by the caller (unused here).
        q: The callback query produced by the button press.
    """
    uid = q.from_user.id

    # Acknowledge first so Telegram stops the button's loading spinner.
    # Deliberately best-effort: a failed acknowledgement must not prevent the
    # action itself from running.
    try:
        await q.answer()
    except Exception:
        pass

    action, value = parse_cb(q.data or "")
    if action == "noop":
        return

    async def show_preview(p: "PendingUpload") -> None:
        # Re-render the upload preview together with its confirm keyboard.
        await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy))

    async def prompt_for_edit(p: "PendingUpload", kind: str) -> None:
        # Arm the "awaiting edit" state, then ask the user for a title/description.
        # `kind` is the single word that differs between the two prompts.
        _AWAIT_EDIT[uid] = EditState(title=p.title, description=p.description, privacy=p.privacy)
        await safe_edit(
            q.message,
            f"✍️ Send {kind} title + description like:\n\n"
            "Title line\n"
            "Description lines…\n\n"
            "(Send only 1 line to change title only)",
        )

    # ---- Global cancel (filename keyboard and anywhere else it appears) ----
    if action == CANCEL:
        _AWAIT_AUTH.pop(uid, None)
        _AWAIT_EDIT.pop(uid, None)
        _PENDING_UPLOAD.pop(uid, None)
        _PENDING_DELETE.pop(uid, None)
        await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard())
        return

    # ---- Menus ----
    if action == MENU_HELP:
        await safe_edit(q.message, HELP_TEXT, reply_markup=main_menu_keyboard())
        return

    if action == MENU_SPEED:
        if not await _ensure_allowed_uid(uid, q.message):
            return
        # Imported lazily, exactly as the original did.
        from bot.core.speedtest import net_download_test, net_upload_test

        await safe_edit(q.message, "⏱ Running speed test…")
        dl = await net_download_test()
        ul = await net_upload_test()
        ip = await _get_public_ip()
        # Tolerate a missing result or a missing/None "bps" entry by reporting 0.
        dl_bps = float((dl or {}).get("bps", 0) or 0)
        ul_bps = float((ul or {}).get("bps", 0) or 0)
        await safe_edit(
            q.message,
            "📶 *Speed Test*\n\n"
            f"*Uptime:* `{uptime_text()}`\n"
            f"*Public IP:* `{ip or '—'}`\n\n"
            f"*Download:* `{human_bytes(dl_bps)}/s`\n"
            f"*Upload:* `{human_bytes(ul_bps)}/s`",
            reply_markup=main_menu_keyboard(),
        )
        return

    if action == MENU_AUTH:
        if not await _ensure_allowed_uid(uid, q.message):
            return
        _AWAIT_AUTH.pop(uid, None)
        await safe_edit(q.message, "🔐 Add a profile:", reply_markup=auth_menu_keyboard())
        return

    if action == MENU_PROFILES:
        if not await _ensure_allowed_uid(uid, q.message):
            return
        data = await list_profiles(uid, only_connected=False)
        if not (isinstance(data, dict) and data.get("ok")):
            await safe_edit(q.message, "❌ Failed to list profiles.", reply_markup=main_menu_keyboard())
            return
        profiles = data.get("profiles") or []
        default_id = data.get("default_profile_id") or ""
        await safe_edit(q.message, "👤 *Profiles*", reply_markup=profiles_keyboard(profiles, default_id))
        return

    if action == BACK:
        _AWAIT_AUTH.pop(uid, None)
        await safe_edit(q.message, "🏠 Menu", reply_markup=main_menu_keyboard())
        return

    # ---- Auth buttons: record which credential format the next message carries ----
    if action == AUTH_JSON:
        if not await _ensure_allowed_uid(uid, q.message):
            return
        _AWAIT_AUTH[uid] = "json"
        await safe_edit(
            q.message,
            "📄 *Send JSON credentials*\n\n"
            "Send `client_secret.json` as a file OR paste the JSON text here.\n\n"
            "Use /cancel to stop.",
            reply_markup=auth_menu_keyboard(),
        )
        return

    if action == AUTH_CI:
        if not await _ensure_allowed_uid(uid, q.message):
            return
        _AWAIT_AUTH[uid] = "ci"
        await safe_edit(
            q.message,
            "🔑 *Send Client ID + Secret*\n\n"
            "Send like this:\n"
            "`CLIENT_ID`\n"
            "`CLIENT_SECRET`\n"
            "(Optional 3rd line: label)\n\n"
            "Use /cancel to stop.",
            reply_markup=auth_menu_keyboard(),
        )
        return

    # ---- Filename / caption choice for the pending upload ----
    if action in (NAME_ORIGINAL, NAME_CAPTION, NAME_CUSTOM):
        p = _PENDING_UPLOAD.get(uid)
        if not p:
            return

        if action == NAME_ORIGINAL:
            p.title = (p.title_from_filename or "Untitled")[: Settings.MAX_TITLE]
            p.description = (p.caption_raw or "")[: Settings.MAX_DESC]
            _PENDING_UPLOAD[uid] = p
            await show_preview(p)
            return

        if action == NAME_CAPTION:
            if p.title_from_caption:
                p.title = p.title_from_caption[: Settings.MAX_TITLE]
                p.description = p.desc_from_caption[: Settings.MAX_DESC]
            else:
                # No caption-derived title: fall back to the filename, empty description.
                p.title = (p.title_from_filename or "Untitled")[: Settings.MAX_TITLE]
                p.description = ""
            _PENDING_UPLOAD[uid] = p
            await show_preview(p)
            return

        # Remaining case: NAME_CUSTOM.
        await prompt_for_edit(p, "custom")
        return

    # ---- Upload confirm keyboard ----
    if action == UP_PRIV:
        p = _PENDING_UPLOAD.get(uid)
        if not p:
            return
        # Rotate private -> unlisted -> public -> private; unknown values reset to private.
        cycle = {"private": "unlisted", "unlisted": "public", "public": "private"}
        p.privacy = cycle.get(p.privacy, "private")
        _PENDING_UPLOAD[uid] = p
        await show_preview(p)
        return

    if action == UP_EDIT:
        p = _PENDING_UPLOAD.get(uid)
        if not p:
            return
        await prompt_for_edit(p, "new")
        return

    if action in (UP_DEL, UP_CANCEL):
        _PENDING_UPLOAD.pop(uid, None)
        _AWAIT_EDIT.pop(uid, None)
        await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard())
        return

    if action == UP_GO:
        p = _PENDING_UPLOAD.get(uid)
        if not p:
            return
        if _IN_PROGRESS.get(uid):
            await safe_edit(
                q.message,
                "⏳ Upload already running. Use /cancel to stop.",
                reply_markup=main_menu_keyboard(),
            )
            return
        # Record the pressed message as the pending upload's status message.
        p.status_msg = q.message
        _PENDING_UPLOAD[uid] = p
        await safe_edit(q.message, "⏳ Starting upload…")
        # Keep a reference to the task in the per-user registry.
        _UPLOAD_TASK[uid] = asyncio.create_task(_run_upload(uid))
        return

    # ---- Profile callbacks (from profiles_keyboard); `value` is the profile id ----
    if action in ("pdel", "pdef", "plog"):
        if not await _ensure_allowed_uid(uid, q.message):
            return
        pid = value
        if not pid:
            # A malformed payload must never arm a delete / default / login
            # request for an empty profile id.
            await safe_edit(q.message, "❌ Missing profile id.", reply_markup=main_menu_keyboard())
            return

        if action == "pdel":
            # Deletion is two-step: remember the id, then wait for a textual YES.
            _PENDING_DELETE[uid] = pid
            await safe_edit(q.message, f"❓ Delete profile `{pid}`? Reply YES to confirm.")
            return

        if action == "pdef":
            out = await set_default_profile(uid, pid)
            if not (isinstance(out, dict) and out.get("ok")):
                await safe_edit(q.message, "❌ Failed to set default.", reply_markup=main_menu_keyboard())
                return
            # Validate the refreshed listing the same way MENU_PROFILES does, so
            # a non-dict or failed reply cannot crash the handler or render an
            # empty profiles keyboard.
            data = await list_profiles(uid, only_connected=False)
            if isinstance(data, dict) and data.get("ok"):
                markup = profiles_keyboard(
                    data.get("profiles") or [],
                    data.get("default_profile_id") or "",
                )
            else:
                markup = main_menu_keyboard()
            await safe_edit(q.message, "✅ Default set.", reply_markup=markup)
            return

        # Remaining case: "plog" — fetch a login URL for the profile.
        chk = await profile_check_auth(uid, pid)
        if isinstance(chk, dict) and chk.get("ok") and chk.get("login_url"):
            await safe_edit(
                q.message,
                f"🔗 Login URL:\n{chk.get('login_url')}\n\nAfter login, reopen /profiles.",
                reply_markup=main_menu_keyboard(),
            )
        else:
            await safe_edit(q.message, "❌ Failed to create login URL.", reply_markup=main_menu_keyboard())
        return
| # ---- text handler ---- | |
| async def text_anywhere(_, m: Message) -> None: | |
| if not m.from_user: | |
| return | |
| uid = m.from_user.id | |
| # ✅ Auth CI input | |
| if await _handle_auth_ci_input(m): | |
| return | |
| # ✅ Auth JSON pasted input (when user pastes text) | |
| if _AWAIT_AUTH.get(uid) == "json": | |
| if await _handle_auth_json_input(app, m): | |
| return | |
| # Confirm delete | |
| if uid in _PENDING_DELETE: | |
| if (m.text or "").strip().lower() == "yes": | |
| pid = _PENDING_DELETE.pop(uid) | |
| out = await profile_delete(uid, pid) | |
| if isinstance(out, dict) and out.get("ok"): | |
| await safe_reply(m, "✅ Profile deleted.", reply_markup=main_menu_keyboard()) | |
| else: | |
| await safe_reply(m, "❌ Delete failed.", reply_markup=main_menu_keyboard()) | |
| else: | |
| _PENDING_DELETE.pop(uid, None) | |
| await safe_reply(m, "Cancelled.", reply_markup=main_menu_keyboard()) | |
| return | |
| # Edit title/desc | |
| if uid in _AWAIT_EDIT: | |
| _ = _AWAIT_EDIT.pop(uid) | |
| p = _PENDING_UPLOAD.get(uid) | |
| if not p: | |
| return | |
| txt = (m.text or "").strip() | |
| lines = txt.splitlines() | |
| if len(lines) == 1: | |
| p.title = lines[0].strip()[: Settings.MAX_TITLE] | |
| else: | |
| p.title = lines[0].strip()[: Settings.MAX_TITLE] | |
| p.description = "\n".join(lines[1:]).strip()[: Settings.MAX_DESC] | |
| _PENDING_UPLOAD[uid] = p | |
| if p.status_msg: | |
| await safe_edit(p.status_msg, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) | |
| else: | |
| await safe_reply(m, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) | |
| return |