# PATH: bot/handlers.py from __future__ import annotations import asyncio import json import os import re import tempfile import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple, Union import httpx from hydrogram import Client, filters from hydrogram.types import CallbackQuery, Message from bot.config import Auth, Telegram, Workers from bot.core.progress import SpeedETA, human_bytes, human_eta from bot.core.settings import Settings from bot.core.uptime import uptime_text from bot.integrations.auth import allow_user, disallow_user, get_stats, is_allowed from bot.integrations.cf_worker1 import profile_add, profile_check_auth, profile_delete from bot.integrations.cf_worker2 import ( get_default_profile, list_profiles, record_upload, set_default_profile, ) from bot.telegram.files import cleanup_file from bot.telegram.media import download_to_temp from bot.telegram.replies import safe_edit, safe_reply from bot.ui.callbacks import ( AUTH_CI, AUTH_JSON, BACK, CANCEL, MENU_AUTH, MENU_HELP, MENU_PROFILES, MENU_SPEED, NAME_CAPTION, NAME_CUSTOM, NAME_ORIGINAL, UP_CANCEL, UP_DEL, UP_EDIT, UP_GO, UP_PRIV, ) from bot.ui.keyboards import ( auth_menu_keyboard, filename_keyboard, main_menu_keyboard, profiles_keyboard, upload_confirm_keyboard, ) from bot.ui.parse import parse_cb from bot.ui.texts import CANCELLED, HELP_TEXT, NEED_AUTH, NOT_ALLOWED, OWNER_ONLY, START_TEXT from bot.youtube.link_parser import parse_telegram_link ChatRef = Union[int, str] # ---- In-memory state ---- @dataclass class PendingUpload: request_msg: Message # where user triggered command / sent media src_msg: Message # message containing media to download (could be from user session) downloader: Client # which client downloads src_msg media file_name: str size_hint: int = 0 # candidates caption_raw: str = "" title_from_filename: str = "" title_from_caption: str = "" desc_from_caption: str = "" # chosen final title: str = "" description: str = "" privacy: str = "private" status_msg: Optional[Message] = None via_link: bool = False @dataclass class EditState: title: str description: str privacy: str _PENDING_UPLOAD: Dict[int, PendingUpload] = {} _AWAIT_EDIT: Dict[int, EditState] = {} _PENDING_DELETE: Dict[int, str] = {} _IN_PROGRESS: Dict[int, bool] = {} _UPLOAD_TASK: Dict[int, asyncio.Task] = {} _BATCH_TASK: Dict[int, asyncio.Task] = {} # βœ… Auth input state (fixes: auth buttons "no response") # value: "json" or "ci" _AWAIT_AUTH: Dict[int, str] = {} # Batch range cap (Settings may not define it yet) _BATCH_MAX_RANGE = int(getattr(Settings, "BATCH_MAX_RANGE", 80) or 80) def _norm_ids(xs: Any) -> set[int]: out: set[int] = set() if not xs: return out if isinstance(xs, (int, float, str)): xs = [xs] for v in list(xs): try: s = str(v).strip() if not s: continue out.add(int(float(s))) except Exception: continue return out def _is_admin_or_owner(uid: int) -> bool: # Accept both Auth.* and Telegram.* and handle str/int mixes safely. owners = _norm_ids(getattr(Auth, "OWNERS", None)) | _norm_ids(getattr(Telegram, "OWNER_ID", None)) admins = _norm_ids(getattr(Auth, "ADMINS", None)) | _norm_ids(getattr(Telegram, "ADMIN_IDS", None)) return uid in owners or uid in admins async def _ensure_allowed_uid(uid: int, reply_target: Message) -> bool: """ IMPORTANT: - For Message handlers: uid = m.from_user.id - For CallbackQuery handlers: uid = q.from_user.id (NOT q.message.from_user.id) """ if _is_admin_or_owner(uid): return True ok = await is_allowed(uid) if not ok: # try edit first (callbacks), fallback to reply if not await safe_edit(reply_target, NOT_ALLOWED, reply_markup=main_menu_keyboard()): await safe_reply(reply_target, NOT_ALLOWED, reply_markup=main_menu_keyboard()) return ok def _media_and_filename(m: Message) -> Tuple[Optional[Any], str, int]: if m.video: return m.video, (m.video.file_name or "video.mp4"), int(m.video.file_size or 0) if m.document: return m.document, (m.document.file_name or "file.bin"), int(m.document.file_size or 0) return None, "", 0 def _caption_text(m: Message) -> str: return (m.caption or "").strip() def _filename_title(file_name: str) -> str: base = os.path.splitext(file_name or "")[0].strip() return (base or "Untitled")[: Settings.MAX_TITLE] def _caption_title_desc(caption: str) -> Tuple[str, str]: caption = (caption or "").strip() if not caption: return "", "" parts = caption.splitlines() title = parts[0].strip()[: Settings.MAX_TITLE] desc = "\n".join([p.strip() for p in parts[1:]]).strip()[: Settings.MAX_DESC] return title, desc def _render_choice_prompt(p: PendingUpload) -> str: cap = p.caption_raw.strip() cap_show = cap if len(cap) <= 900 else (cap[:900] + "…") return ( "πŸ“ *Choose YouTube title/description source*\n\n" f"πŸ“„ *Filename*\n`{p.file_name}`\n" f"β†’ Title: *{p.title_from_filename or 'β€”'}*\n\n" f"πŸ“ *Caption*\n{cap_show if cap_show else 'β€”'}\n" f"β†’ Title: *{p.title_from_caption or 'β€”'}*\n\n" "Pick one option below:" ) def _render_preview(p: PendingUpload) -> str: cap = p.caption_raw.strip() cap_show = cap if len(cap) <= 400 else (cap[:400] + "…") desc = (p.description or "").strip() desc_show = desc if len(desc) <= 800 else (desc[:800] + "…") size_line = human_bytes(p.size_hint) if p.size_hint else "β€”" return ( "πŸ“¦ *Ready to upload*\n\n" f"*File:* `{p.file_name}`\n" f"*Size:* `{size_line}`\n" f"*Privacy:* `{p.privacy}`\n\n" f"*Caption:* {cap_show if cap_show else 'β€”'}\n\n" f"*Title:* {p.title or 'β€”'}\n" f"*Description:* {desc_show if desc_show else 'β€”'}" ) async def _start_pending_upload( *, uid: int, request_msg: Message, # where to show preview/progress src_msg: Message, # where media actually is downloader: Client, via_link: bool = False, ) -> None: media, file_name, size = _media_and_filename(src_msg) if not media: await safe_reply(request_msg, "❌ No video/document found in that message.") return caption = _caption_text(src_msg) t_fn = _filename_title(file_name) t_cap, d_cap = _caption_title_desc(caption) # Default choice: # - If caption exists, default title+desc from caption # - Else filename title if caption and t_cap: chosen_title = t_cap chosen_desc = d_cap else: chosen_title = t_fn chosen_desc = caption[: Settings.MAX_DESC] if caption else "" p = PendingUpload( request_msg=request_msg, src_msg=src_msg, downloader=downloader, file_name=file_name, size_hint=int(size or 0), caption_raw=caption, title_from_filename=t_fn, title_from_caption=t_cap, desc_from_caption=d_cap, title=chosen_title, description=chosen_desc, privacy="private", via_link=via_link, status_msg=None, ) _PENDING_UPLOAD[uid] = p # Create ONE status message in the request chat. Everything edits THIS message. if caption: st = await safe_reply(request_msg, _render_choice_prompt(p), reply_markup=filename_keyboard()) else: st = await safe_reply(request_msg, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) p.status_msg = st _PENDING_UPLOAD[uid] = p def _as_int(v: Any) -> int: if v is None: return 0 if isinstance(v, bool): return int(v) if isinstance(v, (int, float)): return int(v) if isinstance(v, str): try: return int(float(v.strip())) except Exception: return 0 if isinstance(v, dict): for k in ("current", "sent", "bytes", "done", "downloaded", "uploaded", "processed", "offset"): if k in v: try: return int(float(v.get(k) or 0)) except Exception: continue try: return int(float(next(iter(v.values())))) except Exception: return 0 return 0 async def _get_public_ip() -> Optional[str]: # No extra dependency / no extra file needed try: async with httpx.AsyncClient(timeout=10) as c: r = await c.get("https://api.ipify.org", params={"format": "text"}) if r.status_code < 400: ip = (r.text or "").strip() if ip and len(ip) <= 60: return ip except Exception: pass return None async def _run_upload(uid: int) -> Dict[str, Any]: """ Runs one upload for uid, editing p.status_msg. Returns: {"ok": True, "url": "..."} or {"ok": False, "err": "..."} """ if _IN_PROGRESS.get(uid): return {"ok": False, "err": "already_running"} pending = _PENDING_UPLOAD.get(uid) if not pending or not pending.status_msg: return {"ok": False, "err": "no_pending"} _IN_PROGRESS[uid] = True st = pending.status_msg file_path: Optional[str] = None try: # Ensure default profile exists prof = await get_default_profile(uid) if not (isinstance(prof, dict) and prof.get("ok") and prof.get("profile_id") and prof.get("access_token")): await safe_edit(st, NEED_AUTH, reply_markup=main_menu_keyboard()) return {"ok": False, "err": "not_authorized"} prof_id = str(prof["profile_id"]) access_token = str(prof["access_token"]) overall_start = time.time() # ---- Download with live speed ---- await safe_edit(st, "⬇️ Downloading…") dl_speed = SpeedETA() dl_last_ui = 0.0 dl_start = time.time() async def dl_progress(cur: Any, total: Any) -> None: nonlocal dl_last_ui now = time.time() if now - dl_last_ui < 0.8: return dl_last_ui = now cur_i = _as_int(cur) total_i = _as_int(total) if total_i <= 0: total_i = max(cur_i, 1) rate = dl_speed.update(cur_i, total_i) txt = ( "⬇️ *Downloading…*\n\n" f"`{human_bytes(cur_i)} / {human_bytes(total_i)}`\n" f"*Speed:* `{human_bytes(rate)}/s`\n" f"*ETA:* `{human_eta(dl_speed.eta_seconds)}`" ) await safe_edit(st, txt) file_path, _, _ = await download_to_temp(pending.downloader, pending.src_msg, progress_cb=dl_progress) dl_dur = max(0.001, time.time() - dl_start) try: file_bytes = int(os.path.getsize(file_path)) except Exception: file_bytes = 0 # ---- Upload with live speed ---- from bot.youtube.uploader import upload_video await safe_edit(st, "⬆️ Uploading…") ul_speed = SpeedETA() ul_last_ui = 0.0 ul_start = time.time() async def ul_progress(sent: Any, total: Any) -> None: nonlocal ul_last_ui now = time.time() if now - ul_last_ui < 0.8: return ul_last_ui = now sent_i = _as_int(sent) total_i = _as_int(total) if total_i <= 0: total_i = max(sent_i, 1) rate = ul_speed.update(sent_i, total_i) txt = ( "⬆️ *Uploading…*\n\n" f"`{human_bytes(sent_i)} / {human_bytes(total_i)}`\n" f"*Speed:* `{human_bytes(rate)}/s`\n" f"*ETA:* `{human_eta(ul_speed.eta_seconds)}`" ) await safe_edit(st, txt) up = await upload_video( access_token=access_token, file_path=file_path, title=pending.title, description=pending.description, privacy=pending.privacy, progress_cb=ul_progress, ) if not (isinstance(up, dict) and up.get("ok")): err = up.get("err", "upload_failed") if isinstance(up, dict) else "upload_failed" detail = up.get("detail") if isinstance(up, dict) else None msg = f"❌ Upload failed: `{err}`" if detail: msg += f"\n`{str(detail)[:280]}`" await safe_edit(st, msg, reply_markup=main_menu_keyboard()) return {"ok": False, "err": str(err), "detail": detail} url = str(up.get("url") or "") await record_upload(uid, prof_id) ul_dur = max(0.001, time.time() - ul_start) total_dur = max(0.001, time.time() - overall_start) dl_avg = (human_bytes(file_bytes / dl_dur) + "/s") if file_bytes and dl_dur else "β€”" ul_avg = (human_bytes(file_bytes / ul_dur) + "/s") if file_bytes and ul_dur else "β€”" done = "βœ… *Uploaded!*" if url: done += f"\n\n{url}" done += ( f"\n\n*Download:* `{dl_dur:.1f}s` β€’ avg `{dl_avg}`" f"\n*Upload:* `{ul_dur:.1f}s` β€’ avg `{ul_avg}`" f"\n*Total:* `{total_dur:.1f}s`" ) await safe_edit(st, done, reply_markup=main_menu_keyboard()) return {"ok": True, "url": url} except asyncio.CancelledError: try: await safe_edit(st, CANCELLED, reply_markup=main_menu_keyboard()) except Exception: pass return {"ok": False, "err": "cancelled"} except Exception as e: await safe_edit(st, f"❌ Error: `{str(e)[:220]}`", reply_markup=main_menu_keyboard()) return {"ok": False, "err": "exception", "detail": str(e)} finally: _PENDING_UPLOAD.pop(uid, None) _AWAIT_EDIT.pop(uid, None) _PENDING_DELETE.pop(uid, None) _AWAIT_AUTH.pop(uid, None) if file_path: try: cleanup_file(file_path) except Exception: pass _IN_PROGRESS.pop(uid, None) _UPLOAD_TASK.pop(uid, None) _BATCH_RANGE_RE = re.compile(r"(.*?)/(\d+)\s*-\s*(\d+)\s*$") def _parse_link_or_range(line: str) -> Tuple[ChatRef, int, int]: """ Accepts: https://t.me/c//<4012> https://t.me/c//<4012-4046> https://t.me//<10-22> Returns (chat_ref, start, end) """ s = (line or "").strip() if not s: raise ValueError("empty") m = _BATCH_RANGE_RE.match(s) if m: base = m.group(1).strip() a = int(m.group(2)) b = int(m.group(3)) if a <= 0 or b <= 0: raise ValueError("bad_range") if a > b: a, b = b, a single = f"{base}/{a}" chat_ref, _mid = parse_telegram_link(single) return chat_ref, a, b chat_ref, mid = parse_telegram_link(s) return chat_ref, int(mid), int(mid) def _extract_client_secrets_from_json(obj: dict) -> Tuple[str, str]: """ Accepts client_secret.json formats: {"installed": {"client_id": "...", "client_secret": "..."}} {"web": {"client_id": "...", "client_secret": "..."}} {"client_id": "...", "client_secret": "..."} Returns (client_id, client_secret) or raises. """ if not isinstance(obj, dict): raise ValueError("json_not_object") for k in ("installed", "web"): v = obj.get(k) if isinstance(v, dict) and v.get("client_id") and v.get("client_secret"): return str(v["client_id"]).strip(), str(v["client_secret"]).strip() if obj.get("client_id") and obj.get("client_secret"): return str(obj["client_id"]).strip(), str(obj["client_secret"]).strip() raise ValueError("missing_client_id_or_secret") async def _handle_auth_json_input(app: Client, m: Message) -> bool: """ Returns True if message was consumed as auth input. """ if not m.from_user: return False uid = m.from_user.id if _AWAIT_AUTH.get(uid) != "json": return False if not await _ensure_allowed_uid(uid, m): return True # document .json if m.document: name = (m.document.file_name or "").lower() if not name.endswith(".json"): await safe_reply(m, "❌ Please send a `.json` file (Google client_secret.json). Or paste JSON text.") return True fd, path = tempfile.mkstemp(prefix="yt_auth_", suffix=".json") os.close(fd) try: await app.download_media(message=m, file_name=path) raw = "" try: with open(path, "r", encoding="utf-8") as f: raw = f.read() except Exception: with open(path, "rb") as f: raw = f.read().decode("utf-8", errors="ignore") obj = json.loads(raw) client_id, client_secret = _extract_client_secrets_from_json(obj) out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label="main") if isinstance(out, dict) and out.get("ok"): _AWAIT_AUTH.pop(uid, None) await safe_reply( m, "βœ… Profile added.\n\nNow run /profiles β†’ *Login* β†’ *Set default*.", reply_markup=main_menu_keyboard(), ) else: err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed" await safe_reply(m, f"❌ Failed to add profile: `{err}`") return True except Exception as e: await safe_reply(m, f"❌ JSON parse/add failed: `{str(e)[:220]}`") return True finally: try: if os.path.exists(path): os.remove(path) except Exception: pass # pasted JSON txt = (m.text or "").strip() if txt.startswith("{") and txt.endswith("}"): try: obj = json.loads(txt) client_id, client_secret = _extract_client_secrets_from_json(obj) out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label="main") if isinstance(out, dict) and out.get("ok"): _AWAIT_AUTH.pop(uid, None) await safe_reply( m, "βœ… Profile added.\n\nNow run /profiles β†’ *Login* β†’ *Set default*.", reply_markup=main_menu_keyboard(), ) else: err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed" await safe_reply(m, f"❌ Failed to add profile: `{err}`") except Exception as e: await safe_reply(m, f"❌ JSON parse/add failed: `{str(e)[:220]}`") return True await safe_reply(m, "πŸ“„ Send the `.json` file or paste JSON text (client_secret.json).") return True async def _handle_auth_ci_input(m: Message) -> bool: """ Returns True if message was consumed as auth input. """ if not m.from_user: return False uid = m.from_user.id if _AWAIT_AUTH.get(uid) != "ci": return False if not await _ensure_allowed_uid(uid, m): return True txt = (m.text or "").strip() if not txt: await safe_reply(m, "Send:\n``\n``\n(Optional 3rd line: label)") return True lines = [ln.strip() for ln in txt.splitlines() if ln.strip()] if len(lines) == 1: # maybe space separated parts = [p for p in lines[0].split() if p.strip()] if len(parts) >= 2: client_id, client_secret = parts[0], parts[1] label = parts[2] if len(parts) >= 3 else "main" else: await safe_reply(m, "Send:\n``\n``\n(Optional 3rd line: label)") return True else: client_id = lines[0] client_secret = lines[1] if len(lines) >= 2 else "" label = lines[2] if len(lines) >= 3 else "main" out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label=label) if isinstance(out, dict) and out.get("ok"): _AWAIT_AUTH.pop(uid, None) await safe_reply( m, f"βœ… Profile added (label: `{label}`).\n\nNow run /profiles β†’ *Login* β†’ *Set default*.", reply_markup=main_menu_keyboard(), ) else: err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed" await safe_reply(m, f"❌ Failed to add profile: `{err}`") return True def setup_handlers(app: Client, user_app: Optional[Client]) -> None: # ---- basic ---- @app.on_message(filters.command(["start", "help"])) async def start_help_cmd(_, m: Message) -> None: if (m.text or "").strip().lower().startswith("/help"): await safe_reply(m, HELP_TEXT, reply_markup=main_menu_keyboard()) else: await safe_reply(m, START_TEXT, reply_markup=main_menu_keyboard()) @app.on_message(filters.command("speedtest")) async def speedtest_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not await _ensure_allowed_uid(uid, m): return from bot.core.speedtest import net_download_test, net_upload_test st = await safe_reply(m, "⏱ Running speed test…") dl = await net_download_test() ul = await net_upload_test() ip = await _get_public_ip() dl_bps = float((dl or {}).get("bps", 0) or 0) ul_bps = float((ul or {}).get("bps", 0) or 0) msg = ( "πŸ“Ά *Speed Test*\n\n" f"*Uptime:* `{uptime_text()}`\n" f"*Public IP:* `{ip or 'β€”'}`\n\n" f"*Download:* `{human_bytes(dl_bps)}/s`\n" f"*Upload:* `{human_bytes(ul_bps)}/s`" ) if st: await safe_edit(st, msg, reply_markup=main_menu_keyboard()) else: await safe_reply(m, msg, reply_markup=main_menu_keyboard()) @app.on_message(filters.command("cancel")) async def cancel_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 _AWAIT_AUTH.pop(uid, None) _AWAIT_EDIT.pop(uid, None) _PENDING_UPLOAD.pop(uid, None) _PENDING_DELETE.pop(uid, None) task = _UPLOAD_TASK.pop(uid, None) if task and not task.done(): task.cancel() btask = _BATCH_TASK.pop(uid, None) if btask and not btask.done(): btask.cancel() _IN_PROGRESS.pop(uid, None) await safe_reply(m, CANCELLED, reply_markup=main_menu_keyboard()) # ---- admin allowlist ---- @app.on_message(filters.command("allow")) async def allow_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not _is_admin_or_owner(uid): await safe_reply(m, OWNER_ONLY) return args = (m.text or "").split(maxsplit=1) if len(args) < 2: await safe_reply(m, "Usage: /allow ") return try: tid = int(args[1].strip()) except Exception: await safe_reply(m, "Invalid tg_id") return await allow_user(tid) await safe_reply(m, f"βœ… Allowed `{tid}`") @app.on_message(filters.command("disallow")) async def disallow_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not _is_admin_or_owner(uid): await safe_reply(m, OWNER_ONLY) return args = (m.text or "").split(maxsplit=1) if len(args) < 2: await safe_reply(m, "Usage: /disallow ") return try: tid = int(args[1].strip()) except Exception: await safe_reply(m, "Invalid tg_id") return await disallow_user(tid) await safe_reply(m, f"βœ… Disallowed `{tid}`") @app.on_message(filters.command("stats")) async def stats_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not _is_admin_or_owner(uid): await safe_reply(m, OWNER_ONLY) return st = await get_stats() if not isinstance(st, dict): await safe_reply(m, "❌ stats failed") return await safe_reply( m, "πŸ“Š *Today stats*\n\n" f"Allowed users: `{st.get('allowed_users', 0)}`\n" f"Profiles: `{st.get('profiles', 0)}`\n" f"Uploads today: `{st.get('uploads_today', 0)}`", ) @app.on_message(filters.command("diag")) async def diag_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not _is_admin_or_owner(uid): await safe_reply(m, OWNER_ONLY) return w1 = Workers.WORKER1_URL or "β€”" w2 = Workers.WORKER2_URL or "β€”" await safe_reply( m, "πŸ§ͺ *Diag*\n\n" f"Uptime: `{uptime_text()}`\n" f"WORKER1_URL: `{w1}`\n" f"WORKER2_URL: `{w2}`\n" f"Owners: `{len(_norm_ids(getattr(Auth, 'OWNERS', None)) | _norm_ids(getattr(Telegram, 'OWNER_ID', None)))}` " f"Admins: `{len(_norm_ids(getattr(Auth, 'ADMINS', None)) | _norm_ids(getattr(Telegram, 'ADMIN_IDS', None)))}`", ) # ---- auth/profile flow ---- @app.on_message(filters.command("auth")) async def auth_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not await _ensure_allowed_uid(uid, m): return _AWAIT_AUTH.pop(uid, None) await safe_reply(m, "πŸ” Add a profile:", reply_markup=auth_menu_keyboard()) @app.on_message(filters.command("profiles")) async def profiles_cmd(_, m: Message) -> None: uid = m.from_user.id if m.from_user else 0 if not await _ensure_allowed_uid(uid, m): return data = await list_profiles(uid, only_connected=False) if not (isinstance(data, dict) and data.get("ok")): await safe_reply(m, "❌ Failed to list profiles.") return profiles = data.get("profiles") or [] default_id = data.get("default_profile_id") or "" await safe_reply(m, "πŸ‘€ *Profiles*", reply_markup=profiles_keyboard(profiles, default_id)) # ---- upload from DM media / OR auth-json doc handling ---- @app.on_message(filters.private & (filters.video | filters.document)) async def media_in_dm(_, m: Message) -> None: if not m.from_user: return uid = m.from_user.id # βœ… If we're waiting for auth JSON, consume json file instead of treating it as "upload media" if await _handle_auth_json_input(app, m): return if not await _ensure_allowed_uid(uid, m): return if _IN_PROGRESS.get(uid): await safe_reply(m, "⏳ Upload already running. Use /cancel to stop.") return await _start_pending_upload(uid=uid, request_msg=m, src_msg=m, downloader=app, via_link=False) # ---- upload from link (admin/owner) ---- @app.on_message(filters.command(["yt", "dl", "archive"])) async def archive_cmd(_, m: Message) -> None: if not m.from_user: return uid = m.from_user.id if not await _ensure_allowed_uid(uid, m): return if not _is_admin_or_owner(uid): await safe_reply(m, OWNER_ONLY) return if user_app is None: await safe_reply(m, "❌ This mode requires user session (user_app missing).") return args = (m.text or "").split(maxsplit=1) if len(args) < 2: await safe_reply(m, "Usage: /archive ") return st = await safe_reply(m, "πŸ”Ž Fetching message…") if not st: return try: chat_ref, msg_id = parse_telegram_link(args[1].strip()) except Exception as e: await safe_edit(st, f"Bad link: `{str(e)[:160]}`") return try: src = await user_app.get_messages(chat_ref, msg_id) except Exception as e: await safe_edit(st, f"❌ Fetch failed: `{str(e)[:180]}`") return await safe_edit(st, "βœ… Message fetched. Preparing preview…") await _start_pending_upload(uid=uid, request_msg=m, src_msg=src, downloader=user_app, via_link=True) # ---- batch mode ---- @app.on_message(filters.command("batch")) async def batch_cmd(_, m: Message) -> None: if not m.from_user: return uid = m.from_user.id if not await _ensure_allowed_uid(uid, m): return if not _is_admin_or_owner(uid): await safe_reply(m, OWNER_ONLY) return if user_app is None: await safe_reply(m, "❌ Batch mode is not configured (user session missing).") return if _BATCH_TASK.get(uid) and not _BATCH_TASK[uid].done(): await safe_reply(m, "⏳ A batch is already running. Use /cancel to stop it.") return raw = (m.text or "") args = raw.split(maxsplit=1) if len(args) < 2: await safe_reply(m, "Send: /batch (one per line)\nOptional: /batch --continue ") return payload = args[1].strip() continue_on_fail = ("--continue" in payload) or ("-c" in payload) payload = payload.replace("--continue", "").replace("-c", "").strip() lines = [ln.strip() for ln in payload.splitlines() if ln.strip()] if not lines: await safe_reply(m, "No links found. Put one t.me link per line after /batch") return items: List[Tuple[ChatRef, int]] = [] for ln in lines: try: chat_ref, a, b = _parse_link_or_range(ln) except Exception: await safe_reply(m, f"❌ Bad link/range: `{ln}`") if not continue_on_fail: return continue count = (b - a + 1) if count > _BATCH_MAX_RANGE: await safe_reply(m, f"❌ Range too large ({count}). Max is {_BATCH_MAX_RANGE}.") if not continue_on_fail: return continue for mid in range(a, b + 1): items.append((chat_ref, mid)) if not items: await safe_reply(m, "No valid items to process.") return await safe_reply( m, f"🧾 Batch starting: {len(items)} item(s).\nMode: `{'continue' if continue_on_fail else 'stop_on_fail'}`", ) async def runner() -> None: batch_start = time.time() total = len(items) for i, (chat_ref, msg_id) in enumerate(items, 1): st = await safe_reply(m, f"πŸ”Ž Batch {i}/{total}: fetching message `{msg_id}`…") if not st: if not continue_on_fail: break continue try: src = await user_app.get_messages(chat_ref, msg_id) except Exception as e: await safe_edit(st, f"❌ Batch {i}/{total} fetch failed: `{str(e)[:180]}`") if not continue_on_fail: break continue media, _, _ = _media_and_filename(src) if not media: await safe_edit(st, f"⏭ Batch {i}/{total}: no media in that message. Skipped.") continue # Create pending upload targeting THIS chat (m), not source chat await _start_pending_upload(uid=uid, request_msg=m, src_msg=src, downloader=user_app, via_link=True) # Force progress to edit THIS batch line p = _PENDING_UPLOAD.get(uid) if p: p.status_msg = st _PENDING_UPLOAD[uid] = p await safe_edit(st, f"⏳ Batch {i}/{total}: starting upload…") out = await _run_upload(uid) if not out.get("ok") and not continue_on_fail: await safe_reply(m, f"πŸ›‘ Batch stopped (failed at {i}/{total}).") break else: await safe_edit(st, f"❌ Batch {i}/{total}: internal error (no pending).") if not continue_on_fail: break batch_dur = max(0.001, time.time() - batch_start) await safe_reply(m, f"βœ… Batch done in `{batch_dur:.1f}s`.", reply_markup=main_menu_keyboard()) t = asyncio.create_task(runner()) _BATCH_TASK[uid] = t # ---- callbacks ---- @app.on_callback_query() async def cb_handler(_, q: CallbackQuery) -> None: uid = q.from_user.id # βœ… Stop Telegram loading spinner try: await q.answer() except Exception: pass action, value = parse_cb(q.data or "") if action == "noop": return # Cancel from keyboards (used in filename keyboard, and we support it globally) if action == CANCEL: _AWAIT_AUTH.pop(uid, None) _AWAIT_EDIT.pop(uid, None) _PENDING_UPLOAD.pop(uid, None) _PENDING_DELETE.pop(uid, None) await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard()) return # Menus if action == MENU_HELP: await safe_edit(q.message, HELP_TEXT, reply_markup=main_menu_keyboard()) return if action == MENU_SPEED: if not await _ensure_allowed_uid(uid, q.message): return from bot.core.speedtest import net_download_test, net_upload_test await safe_edit(q.message, "⏱ Running speed test…") dl = await net_download_test() ul = await net_upload_test() ip = await _get_public_ip() dl_bps = float((dl or {}).get("bps", 0) or 0) ul_bps = float((ul or {}).get("bps", 0) or 0) await safe_edit( q.message, "πŸ“Ά *Speed Test*\n\n" f"*Uptime:* `{uptime_text()}`\n" f"*Public IP:* `{ip or 'β€”'}`\n\n" f"*Download:* `{human_bytes(dl_bps)}/s`\n" f"*Upload:* `{human_bytes(ul_bps)}/s`", reply_markup=main_menu_keyboard(), ) return if action == MENU_AUTH: if not await _ensure_allowed_uid(uid, q.message): return _AWAIT_AUTH.pop(uid, None) await safe_edit(q.message, "πŸ” Add a profile:", reply_markup=auth_menu_keyboard()) return if action == MENU_PROFILES: if not await _ensure_allowed_uid(uid, q.message): return data = await list_profiles(uid, only_connected=False) if not (isinstance(data, dict) and data.get("ok")): await safe_edit(q.message, "❌ Failed to list profiles.", reply_markup=main_menu_keyboard()) return profiles = data.get("profiles") or [] default_id = data.get("default_profile_id") or "" await safe_edit(q.message, "πŸ‘€ *Profiles*", reply_markup=profiles_keyboard(profiles, default_id)) return if action == BACK: _AWAIT_AUTH.pop(uid, None) await safe_edit(q.message, "🏠 Menu", reply_markup=main_menu_keyboard()) return # βœ… AUTH buttons (THIS is what was missing -> no response) if action == AUTH_JSON: if not await _ensure_allowed_uid(uid, q.message): return _AWAIT_AUTH[uid] = "json" await safe_edit( q.message, "πŸ“„ *Send JSON credentials*\n\n" "Send `client_secret.json` as a file OR paste the JSON text here.\n\n" "Use /cancel to stop.", reply_markup=auth_menu_keyboard(), ) return if action == AUTH_CI: if not await _ensure_allowed_uid(uid, q.message): return _AWAIT_AUTH[uid] = "ci" await safe_edit( q.message, "πŸ”‘ *Send Client ID + Secret*\n\n" "Send like this:\n" "`CLIENT_ID`\n" "`CLIENT_SECRET`\n" "(Optional 3rd line: label)\n\n" "Use /cancel to stop.", reply_markup=auth_menu_keyboard(), ) return # Filename/Caption choice if action in (NAME_ORIGINAL, NAME_CAPTION, NAME_CUSTOM): p = _PENDING_UPLOAD.get(uid) if not p: return if action == NAME_ORIGINAL: p.title = (p.title_from_filename or "Untitled")[: Settings.MAX_TITLE] p.description = (p.caption_raw or "")[: Settings.MAX_DESC] _PENDING_UPLOAD[uid] = p await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) return if action == NAME_CAPTION: if p.title_from_caption: p.title = p.title_from_caption[: Settings.MAX_TITLE] p.description = p.desc_from_caption[: Settings.MAX_DESC] else: p.title = (p.title_from_filename or "Untitled")[: Settings.MAX_TITLE] p.description = "" _PENDING_UPLOAD[uid] = p await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) return if action == NAME_CUSTOM: _AWAIT_EDIT[uid] = EditState(title=p.title, description=p.description, privacy=p.privacy) await safe_edit( q.message, "✍️ Send custom title + description like:\n\n" "Title line\n" "Description lines…\n\n" "(Send only 1 line to change title only)", ) return # Upload buttons if action == UP_PRIV: p = _PENDING_UPLOAD.get(uid) if not p: return cycle = {"private": "unlisted", "unlisted": "public", "public": "private"} p.privacy = cycle.get(p.privacy, "private") _PENDING_UPLOAD[uid] = p await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) return if action == UP_EDIT: p = _PENDING_UPLOAD.get(uid) if not p: return _AWAIT_EDIT[uid] = EditState(title=p.title, description=p.description, privacy=p.privacy) await safe_edit( q.message, "✍️ Send new title + description like:\n\n" "Title line\n" "Description lines…\n\n" "(Send only 1 line to change title only)", ) return if action in (UP_DEL, UP_CANCEL): _PENDING_UPLOAD.pop(uid, None) _AWAIT_EDIT.pop(uid, None) await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard()) return if action == UP_GO: p = _PENDING_UPLOAD.get(uid) if not p: return if _IN_PROGRESS.get(uid): await safe_edit( q.message, "⏳ Upload already running. Use /cancel to stop.", reply_markup=main_menu_keyboard(), ) return p.status_msg = q.message _PENDING_UPLOAD[uid] = p await safe_edit(q.message, "⏳ Starting upload…") t = asyncio.create_task(_run_upload(uid)) _UPLOAD_TASK[uid] = t return # Profile callbacks (from profiles_keyboard) if action == "pdel": if not await _ensure_allowed_uid(uid, q.message): return pid = value _PENDING_DELETE[uid] = pid await safe_edit(q.message, f"❓ Delete profile `{pid}`? Reply YES to confirm.") return if action == "pdef": if not await _ensure_allowed_uid(uid, q.message): return pid = value out = await set_default_profile(uid, pid) if isinstance(out, dict) and out.get("ok"): data = await list_profiles(uid, only_connected=False) profiles = (data or {}).get("profiles") or [] default_id = (data or {}).get("default_profile_id") or "" await safe_edit(q.message, "βœ… Default set.", reply_markup=profiles_keyboard(profiles, default_id)) else: await safe_edit(q.message, "❌ Failed to set default.", reply_markup=main_menu_keyboard()) return if action == "plog": if not await _ensure_allowed_uid(uid, q.message): return pid = value chk = await profile_check_auth(uid, pid) if isinstance(chk, dict) and chk.get("ok") and chk.get("login_url"): await safe_edit( q.message, f"πŸ”— Login URL:\n{chk.get('login_url')}\n\nAfter login, reopen /profiles.", reply_markup=main_menu_keyboard(), ) else: await safe_edit(q.message, "❌ Failed to create login URL.", reply_markup=main_menu_keyboard()) return # ---- text handler ---- @app.on_message(filters.text) async def text_anywhere(_, m: Message) -> None: if not m.from_user: return uid = m.from_user.id # βœ… Auth CI input if await _handle_auth_ci_input(m): return # βœ… Auth JSON pasted input (when user pastes text) if _AWAIT_AUTH.get(uid) == "json": if await _handle_auth_json_input(app, m): return # Confirm delete if uid in _PENDING_DELETE: if (m.text or "").strip().lower() == "yes": pid = _PENDING_DELETE.pop(uid) out = await profile_delete(uid, pid) if isinstance(out, dict) and out.get("ok"): await safe_reply(m, "βœ… Profile deleted.", reply_markup=main_menu_keyboard()) else: await safe_reply(m, "❌ Delete failed.", reply_markup=main_menu_keyboard()) else: _PENDING_DELETE.pop(uid, None) await safe_reply(m, "Cancelled.", reply_markup=main_menu_keyboard()) return # Edit title/desc if uid in _AWAIT_EDIT: _ = _AWAIT_EDIT.pop(uid) p = _PENDING_UPLOAD.get(uid) if not p: return txt = (m.text or "").strip() lines = txt.splitlines() if len(lines) == 1: p.title = lines[0].strip()[: Settings.MAX_TITLE] else: p.title = lines[0].strip()[: Settings.MAX_TITLE] p.description = "\n".join(lines[1:]).strip()[: Settings.MAX_DESC] _PENDING_UPLOAD[uid] = p if p.status_msg: await safe_edit(p.status_msg, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) else: await safe_reply(m, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy)) return