YouTubeLoader / bot /handlers.py
understanding's picture
Update bot/handlers.py
5348659 verified
# PATH: bot/handlers.py
from __future__ import annotations
import asyncio
import json
import os
import re
import tempfile
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
import httpx
from hydrogram import Client, filters
from hydrogram.types import CallbackQuery, Message
from bot.config import Auth, Telegram, Workers
from bot.core.progress import SpeedETA, human_bytes, human_eta
from bot.core.settings import Settings
from bot.core.uptime import uptime_text
from bot.integrations.auth import allow_user, disallow_user, get_stats, is_allowed
from bot.integrations.cf_worker1 import profile_add, profile_check_auth, profile_delete
from bot.integrations.cf_worker2 import (
get_default_profile,
list_profiles,
record_upload,
set_default_profile,
)
from bot.telegram.files import cleanup_file
from bot.telegram.media import download_to_temp
from bot.telegram.replies import safe_edit, safe_reply
from bot.ui.callbacks import (
AUTH_CI,
AUTH_JSON,
BACK,
CANCEL,
MENU_AUTH,
MENU_HELP,
MENU_PROFILES,
MENU_SPEED,
NAME_CAPTION,
NAME_CUSTOM,
NAME_ORIGINAL,
UP_CANCEL,
UP_DEL,
UP_EDIT,
UP_GO,
UP_PRIV,
)
from bot.ui.keyboards import (
auth_menu_keyboard,
filename_keyboard,
main_menu_keyboard,
profiles_keyboard,
upload_confirm_keyboard,
)
from bot.ui.parse import parse_cb
from bot.ui.texts import CANCELLED, HELP_TEXT, NEED_AUTH, NOT_ALLOWED, OWNER_ONLY, START_TEXT
from bot.youtube.link_parser import parse_telegram_link
ChatRef = Union[int, str]
# ---- In-memory state ----
@dataclass
class PendingUpload:
    """One user's upload that is awaiting confirmation or being processed.

    The first four fields are required; the remaining ones default to an
    empty / "private" / not-yet-sent state and are filled in as the user
    walks through the title and privacy prompts.
    """

    # Message where the user triggered the command / sent the media.
    request_msg: Message
    # Message that actually carries the media (may come from the user session).
    src_msg: Message
    # Client used to download the media of ``src_msg``.
    downloader: Client
    file_name: str
    size_hint: int = 0

    # Candidate values offered to the user.
    caption_raw: str = ""
    title_from_filename: str = ""
    title_from_caption: str = ""
    desc_from_caption: str = ""

    # Values finally chosen for the upload.
    title: str = ""
    description: str = ""
    privacy: str = "private"

    # Message that is edited to show the preview and progress, once sent.
    status_msg: Optional[Message] = None
    via_link: bool = False
@dataclass
class EditState:
    """Snapshot of title, description and privacy taken when an edit starts."""

    title: str
    description: str
    privacy: str
# Per-user in-memory state. Every dict below is keyed by the Telegram user id.
# Upload that is awaiting confirmation or currently being processed.
_PENDING_UPLOAD: Dict[int, PendingUpload] = {}
# Users whose next text message is taken as a new title/description.
_AWAIT_EDIT: Dict[int, EditState] = {}
# Profile id waiting for a "YES" reply before it is deleted.
_PENDING_DELETE: Dict[int, str] = {}
# Set to True while _run_upload is active for the user.
_IN_PROGRESS: Dict[int, bool] = {}
# Upload task started from the upload-confirm button.
_UPLOAD_TASK: Dict[int, asyncio.Task] = {}
# Runner task started by /batch.
_BATCH_TASK: Dict[int, asyncio.Task] = {}
# Auth input state: which credential format the user's next message should carry.
# Value is "json" or "ci".
_AWAIT_AUTH: Dict[int, str] = {}
# Maximum number of messages a single batch range may expand to.
# Falls back to 80 when Settings has no (or a falsy) BATCH_MAX_RANGE.
_BATCH_MAX_RANGE = int(getattr(Settings, "BATCH_MAX_RANGE", 80) or 80)
def _norm_ids(xs: Any) -> set[int]:
    """Normalise a loosely-typed id setting into a set of ints.

    Accepts ``None``, a single int/float/str, or any iterable of those.
    A string may hold several ids separated by commas and/or whitespace.
    Entries that cannot be read as a number are skipped silently.

    Integer-looking strings are parsed with ``int`` directly so that large
    ids are not rounded by a detour through ``float``; the ``float`` path is
    kept only as a fallback for values such as ``"12.0"`` or ``"1e3"``.
    """
    out: set[int] = set()
    if not xs:
        return out

    if isinstance(xs, str):
        candidates = xs.replace(",", " ").split()
    elif isinstance(xs, (int, float)):
        candidates = [xs]
    else:
        try:
            candidates = list(xs)
        except TypeError:
            # Not iterable: treat it as one (probably unparsable) value.
            candidates = [xs]

    for v in candidates:
        try:
            s = str(v).strip()
        except Exception:
            continue
        if not s:
            continue
        try:
            out.add(int(s))
            continue
        except ValueError:
            pass
        try:
            out.add(int(float(s)))
        except (ValueError, OverflowError):
            # Not numeric, or NaN/inf: ignore this entry.
            continue
    return out
def _is_admin_or_owner(uid: int) -> bool:
    """Return True when *uid* is listed as an owner or an admin.

    Ids are collected from both ``Auth`` and ``Telegram`` config holders and
    normalised through ``_norm_ids`` so str/int mixes compare correctly.
    """
    sources = (
        (Auth, "OWNERS"),
        (Telegram, "OWNER_ID"),
        (Auth, "ADMINS"),
        (Telegram, "ADMIN_IDS"),
    )
    privileged: set[int] = set()
    for holder, attr in sources:
        privileged |= _norm_ids(getattr(holder, attr, None))
    return uid in privileged
async def _ensure_allowed_uid(uid: int, reply_target: Message) -> bool:
    """Check that *uid* may use the bot; tell the user when it may not.

    IMPORTANT:
    - For Message handlers pass uid = m.from_user.id
    - For CallbackQuery handlers pass uid = q.from_user.id
      (NOT q.message.from_user.id)

    Admins and owners always pass. Everyone else is checked with
    ``is_allowed``; on refusal the NOT_ALLOWED text is shown on
    *reply_target*.
    """
    if _is_admin_or_owner(uid):
        return True

    allowed = await is_allowed(uid)
    if allowed:
        return allowed

    # Editing works for callback messages; otherwise fall back to a reply.
    edited = await safe_edit(reply_target, NOT_ALLOWED, reply_markup=main_menu_keyboard())
    if not edited:
        await safe_reply(reply_target, NOT_ALLOWED, reply_markup=main_menu_keyboard())
    return allowed
def _media_and_filename(m: Message) -> Tuple[Optional[Any], str, int]:
    """Return ``(media, file_name, size)`` for the video or document in *m*.

    A video takes precedence over a document. A missing file name is replaced
    by a generic one and a missing size by 0. ``(None, "", 0)`` is returned
    when the message carries neither.
    """
    for attr, fallback_name in (("video", "video.mp4"), ("document", "file.bin")):
        media = getattr(m, attr)
        if media:
            name = media.file_name or fallback_name
            size = int(media.file_size or 0)
            return media, name, size
    return None, "", 0
def _caption_text(m: Message) -> str:
    """Return the caption of *m* without surrounding whitespace ("" if none)."""
    caption = m.caption
    if not caption:
        return ""
    return caption.strip()
def _filename_title(file_name: str) -> str:
    """Derive a title from *file_name*: extension dropped, length capped.

    Falls back to "Untitled" when nothing is left after stripping.
    """
    stem, _ext = os.path.splitext(file_name or "")
    stem = stem.strip()
    if not stem:
        stem = "Untitled"
    return stem[: Settings.MAX_TITLE]
def _caption_title_desc(caption: str) -> Tuple[str, str]:
    """Split a caption into ``(title, description)``.

    The first line becomes the title and the remaining lines, each stripped,
    become the description. Both are capped with the limits from ``Settings``.
    An empty caption yields ``("", "")``.
    """
    text = (caption or "").strip()
    if not text:
        return "", ""

    first, *rest = text.splitlines()
    title = first.strip()[: Settings.MAX_TITLE]
    body = "\n".join(line.strip() for line in rest).strip()
    return title, body[: Settings.MAX_DESC]
def _render_choice_prompt(p: PendingUpload) -> str:
    """Build the prompt asking whether to take the title from filename or caption.

    The caption is shown trimmed and is cut to 900 characters plus an ellipsis.
    """
    caption = p.caption_raw.strip()
    if len(caption) > 900:
        caption = caption[:900] + "…"
    caption_block = caption or "—"
    filename_title = p.title_from_filename or "—"
    caption_title = p.title_from_caption or "—"

    pieces = [
        "📝 *Choose YouTube title/description source*\n\n",
        f"📄 *Filename*\n`{p.file_name}`\n",
        f"→ Title: *{filename_title}*\n\n",
        f"📝 *Caption*\n{caption_block}\n",
        f"→ Title: *{caption_title}*\n\n",
        "Pick one option below:",
    ]
    return "".join(pieces)
def _render_preview(p: PendingUpload) -> str:
    """Build the "ready to upload" summary shown before the user confirms.

    Caption and description are shortened to 400 and 800 characters (plus an
    ellipsis); empty values are rendered as a dash.
    """

    def _clip(text: str, limit: int) -> str:
        # Keep short text as-is; mark truncation with an ellipsis.
        return text if len(text) <= limit else text[:limit] + "…"

    caption = _clip(p.caption_raw.strip(), 400) or "—"
    description = _clip((p.description or "").strip(), 800) or "—"
    size_line = human_bytes(p.size_hint) if p.size_hint else "—"
    title = p.title or "—"

    pieces = [
        "📦 *Ready to upload*\n\n",
        f"*File:* `{p.file_name}`\n",
        f"*Size:* `{size_line}`\n",
        f"*Privacy:* `{p.privacy}`\n\n",
        f"*Caption:* {caption}\n\n",
        f"*Title:* {title}\n",
        f"*Description:* {description}",
    ]
    return "".join(pieces)
async def _start_pending_upload(
    *,
    uid: int,
    request_msg: Message,  # where to show preview/progress
    src_msg: Message,  # where the media actually is
    downloader: Client,
    via_link: bool = False,
) -> None:
    """Register a pending upload for *uid* and show its first prompt.

    Title/description candidates are derived from the file name and the
    caption of *src_msg*. With a usable caption title, the caption supplies
    the defaults; otherwise the file name gives the title and the raw caption
    (if any) the description. Exactly one status message is sent to the
    request chat and stored on the pending entry so later steps can edit it.
    """
    media, file_name, size = _media_and_filename(src_msg)
    if not media:
        await safe_reply(request_msg, "❌ No video/document found in that message.")
        return

    caption = _caption_text(src_msg)
    filename_title = _filename_title(file_name)
    caption_title, caption_desc = _caption_title_desc(caption)

    # Prefer the caption whenever it yields a title.
    use_caption = bool(caption and caption_title)
    if use_caption:
        default_title, default_desc = caption_title, caption_desc
    else:
        default_title = filename_title
        default_desc = caption[: Settings.MAX_DESC] if caption else ""

    pending = PendingUpload(
        request_msg=request_msg,
        src_msg=src_msg,
        downloader=downloader,
        file_name=file_name,
        size_hint=int(size or 0),
        caption_raw=caption,
        title_from_filename=filename_title,
        title_from_caption=caption_title,
        desc_from_caption=caption_desc,
        title=default_title,
        description=default_desc,
        privacy="private",
        via_link=via_link,
        status_msg=None,
    )
    _PENDING_UPLOAD[uid] = pending

    # With a caption the user first picks the title source; without one the
    # preview is shown right away.
    if caption:
        text, markup = _render_choice_prompt(pending), filename_keyboard()
    else:
        text, markup = _render_preview(pending), upload_confirm_keyboard(pending.privacy)

    pending.status_msg = await safe_reply(request_msg, text, reply_markup=markup)
    _PENDING_UPLOAD[uid] = pending
def _as_int(v: Any) -> int:
    """Best-effort conversion of a progress value to ``int``; never raises.

    Handles ``None``, bools, ints, floats, numeric strings and dicts. For a
    dict the first well-known progress key holding a convertible value is
    used, then the first value of the dict. Anything that cannot be
    converted, including NaN and infinite floats, yields 0.
    """

    def _coerce(x: Any) -> Optional[int]:
        # Exact path for ints and integer strings (no float rounding).
        if isinstance(x, int):
            return int(x)
        if isinstance(x, str):
            x = x.strip()
            try:
                return int(x)
            except ValueError:
                pass
        try:
            return int(float(x))
        except (TypeError, ValueError, OverflowError):
            # Non-numeric input, NaN (ValueError) or +/-inf (OverflowError).
            return None

    if v is None:
        return 0
    if isinstance(v, (bool, int, float, str)):
        number = _coerce(v)
        return 0 if number is None else number
    if isinstance(v, dict):
        for k in ("current", "sent", "bytes", "done", "downloaded", "uploaded", "processed", "offset"):
            if k in v:
                number = _coerce(v.get(k) or 0)
                if number is not None:
                    return number
        try:
            first = next(iter(v.values()))
        except StopIteration:
            return 0
        number = _coerce(first)
        return 0 if number is None else number
    return 0
async def _get_public_ip() -> Optional[str]:
    """Ask api.ipify.org for this host's public IP.

    Returns the stripped response text, or ``None`` when the request fails,
    the status is 400 or above, or the body is empty or longer than 60
    characters. Uses only ``httpx``, so no extra dependency is needed.
    """
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get("https://api.ipify.org", params={"format": "text"})
            if resp.status_code >= 400:
                return None
            candidate = (resp.text or "").strip()
            if candidate and len(candidate) <= 60:
                return candidate
            return None
    except Exception:
        return None
def _make_progress_cb(status_msg: Message, heading: str):
    """Return an async ``(current, total)`` callback that edits *status_msg*.

    The callback shows *heading*, the transferred/total bytes, the current
    rate and the ETA. Edits are throttled to one per 0.8 seconds.
    """
    meter = SpeedETA()
    last_ui = 0.0

    async def _report(cur: Any, total: Any) -> None:
        nonlocal last_ui
        now = time.time()
        if now - last_ui < 0.8:
            return
        last_ui = now
        cur_i = _as_int(cur)
        total_i = _as_int(total)
        if total_i <= 0:
            # Unknown total: avoid a zero denominator in the meter.
            total_i = max(cur_i, 1)
        rate = meter.update(cur_i, total_i)
        txt = (
            f"{heading}\n\n"
            f"`{human_bytes(cur_i)} / {human_bytes(total_i)}`\n"
            f"*Speed:* `{human_bytes(rate)}/s`\n"
            f"*ETA:* `{human_eta(meter.eta_seconds)}`"
        )
        await safe_edit(status_msg, txt)

    return _report


async def _run_upload(uid: int) -> Dict[str, Any]:
    """
    Runs one upload for uid, editing p.status_msg.

    Returns: {"ok": True, "url": "..."} or {"ok": False, "err": "..."}

    Raises:
        asyncio.CancelledError: when the surrounding task is cancelled. The
        status message is switched to the cancelled text first and the
        cancellation is then re-raised, so an enclosing batch runner stops
        instead of moving on to its next item.

    Per-user state (pending upload, edit/delete/auth waits, progress flag and
    task handle) is cleared and the temporary file removed in every case.
    """
    if _IN_PROGRESS.get(uid):
        return {"ok": False, "err": "already_running"}
    pending = _PENDING_UPLOAD.get(uid)
    if not pending or not pending.status_msg:
        return {"ok": False, "err": "no_pending"}

    _IN_PROGRESS[uid] = True
    st = pending.status_msg
    file_path: Optional[str] = None
    try:
        # Ensure default profile exists
        prof = await get_default_profile(uid)
        if not (isinstance(prof, dict) and prof.get("ok") and prof.get("profile_id") and prof.get("access_token")):
            await safe_edit(st, NEED_AUTH, reply_markup=main_menu_keyboard())
            return {"ok": False, "err": "not_authorized"}
        prof_id = str(prof["profile_id"])
        access_token = str(prof["access_token"])
        overall_start = time.time()

        # ---- Download with live speed ----
        await safe_edit(st, "⬇️ Downloading…")
        dl_progress = _make_progress_cb(st, "⬇️ *Downloading…*")
        dl_start = time.time()
        file_path, _, _ = await download_to_temp(pending.downloader, pending.src_msg, progress_cb=dl_progress)
        dl_dur = max(0.001, time.time() - dl_start)
        try:
            file_bytes = int(os.path.getsize(file_path))
        except Exception:
            # Size is only used for the average-speed lines below.
            file_bytes = 0

        # ---- Upload with live speed ----
        from bot.youtube.uploader import upload_video

        await safe_edit(st, "⬆️ Uploading…")
        ul_progress = _make_progress_cb(st, "⬆️ *Uploading…*")
        ul_start = time.time()
        up = await upload_video(
            access_token=access_token,
            file_path=file_path,
            title=pending.title,
            description=pending.description,
            privacy=pending.privacy,
            progress_cb=ul_progress,
        )
        if not (isinstance(up, dict) and up.get("ok")):
            err = up.get("err", "upload_failed") if isinstance(up, dict) else "upload_failed"
            detail = up.get("detail") if isinstance(up, dict) else None
            msg = f"❌ Upload failed: `{err}`"
            if detail:
                msg += f"\n`{str(detail)[:280]}`"
            await safe_edit(st, msg, reply_markup=main_menu_keyboard())
            return {"ok": False, "err": str(err), "detail": detail}

        url = str(up.get("url") or "")
        await record_upload(uid, prof_id)
        ul_dur = max(0.001, time.time() - ul_start)
        total_dur = max(0.001, time.time() - overall_start)
        dl_avg = (human_bytes(file_bytes / dl_dur) + "/s") if file_bytes and dl_dur else "—"
        ul_avg = (human_bytes(file_bytes / ul_dur) + "/s") if file_bytes and ul_dur else "—"
        done = "✅ *Uploaded!*"
        if url:
            done += f"\n\n{url}"
        done += (
            f"\n\n*Download:* `{dl_dur:.1f}s` • avg `{dl_avg}`"
            f"\n*Upload:* `{ul_dur:.1f}s` • avg `{ul_avg}`"
            f"\n*Total:* `{total_dur:.1f}s`"
        )
        await safe_edit(st, done, reply_markup=main_menu_keyboard())
        return {"ok": True, "url": url}
    except asyncio.CancelledError:
        try:
            await safe_edit(st, CANCELLED, reply_markup=main_menu_keyboard())
        except Exception:
            # Best effort only: the status edit must not mask the cancellation.
            pass
        # Propagate the cancellation; swallowing it would let a cancelled
        # batch runner continue with its next item.
        raise
    except Exception as e:
        await safe_edit(st, f"❌ Error: `{str(e)[:220]}`", reply_markup=main_menu_keyboard())
        return {"ok": False, "err": "exception", "detail": str(e)}
    finally:
        _PENDING_UPLOAD.pop(uid, None)
        _AWAIT_EDIT.pop(uid, None)
        _PENDING_DELETE.pop(uid, None)
        _AWAIT_AUTH.pop(uid, None)
        if file_path:
            try:
                cleanup_file(file_path)
            except Exception:
                # Temp-file removal is best effort.
                pass
        _IN_PROGRESS.pop(uid, None)
        _UPLOAD_TASK.pop(uid, None)
_BATCH_RANGE_RE = re.compile(r"(.*?)/(\d+)\s*-\s*(\d+)\s*$")


def _parse_link_or_range(line: str) -> Tuple[ChatRef, int, int]:
    """Parse a single message link or a ``<start>-<end>`` range link.

    Accepts:
        https://t.me/c/<chat>/<4012>
        https://t.me/c/<chat>/<4012-4046>
        https://t.me/<user>/<10-22>

    Returns ``(chat_ref, start, end)``; for a single link start == end. A
    reversed range is put in ascending order.

    Raises:
        ValueError: "empty" for blank input, "bad_range" when either bound is
        not positive. Errors from ``parse_telegram_link`` pass through.
    """
    text = (line or "").strip()
    if not text:
        raise ValueError("empty")

    found = _BATCH_RANGE_RE.match(text)
    if found is None:
        chat_ref, mid = parse_telegram_link(text)
        return chat_ref, int(mid), int(mid)

    base = found.group(1).strip()
    first = int(found.group(2))
    last = int(found.group(3))
    if first <= 0 or last <= 0:
        raise ValueError("bad_range")
    low, high = (first, last) if first <= last else (last, first)

    # Resolve the chat by parsing the link to the range's first message.
    chat_ref, _mid = parse_telegram_link(f"{base}/{low}")
    return chat_ref, low, high
def _extract_client_secrets_from_json(obj: dict) -> Tuple[str, str]:
    """Pull ``(client_id, client_secret)`` out of a parsed client_secret.json.

    Supported layouts, tried in this order:
        {"installed": {"client_id": "...", "client_secret": "..."}}
        {"web": {"client_id": "...", "client_secret": "..."}}
        {"client_id": "...", "client_secret": "..."}

    Both values are returned as stripped strings.

    Raises:
        ValueError: "json_not_object" when *obj* is not a dict,
        "missing_client_id_or_secret" when no layout has both values.
    """
    if not isinstance(obj, dict):
        raise ValueError("json_not_object")

    sections = [obj.get("installed"), obj.get("web"), obj]
    for section in sections:
        if not isinstance(section, dict):
            continue
        client_id = section.get("client_id")
        client_secret = section.get("client_secret")
        if client_id and client_secret:
            return str(client_id).strip(), str(client_secret).strip()

    raise ValueError("missing_client_id_or_secret")
async def _add_profile_from_json_text(uid: int, m: Message, raw: str) -> None:
    """Parse *raw* as client_secret JSON, add the profile and report to *m*.

    A leading UTF-8 byte-order mark is dropped before parsing because
    ``json.loads`` rejects it. The "awaiting auth" state for *uid* is cleared
    only when the profile was added successfully, so the user can retry.
    """
    try:
        obj = json.loads((raw or "").lstrip("\ufeff"))
        client_id, client_secret = _extract_client_secrets_from_json(obj)
        out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label="main")
    except Exception as e:
        await safe_reply(m, f"❌ JSON parse/add failed: `{str(e)[:220]}`")
        return

    if isinstance(out, dict) and out.get("ok"):
        _AWAIT_AUTH.pop(uid, None)
        await safe_reply(
            m,
            "✅ Profile added.\n\nNow run /profiles → *Login* → *Set default*.",
            reply_markup=main_menu_keyboard(),
        )
        return

    err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed"
    await safe_reply(m, f"❌ Failed to add profile: `{err}`")


async def _handle_auth_json_input(app: Client, m: Message) -> bool:
    """
    Handle a message sent while the user is expected to provide JSON credentials.

    Accepts either a ``.json`` document (downloaded to a temporary file that
    is always removed afterwards) or JSON pasted as text.

    Returns True if message was consumed as auth input, False when the sender
    is unknown or is not in the "json" auth state.
    """
    if not m.from_user:
        return False
    uid = m.from_user.id
    if _AWAIT_AUTH.get(uid) != "json":
        return False
    if not await _ensure_allowed_uid(uid, m):
        return True

    # document .json
    if m.document:
        name = (m.document.file_name or "").lower()
        if not name.endswith(".json"):
            await safe_reply(m, "❌ Please send a `.json` file (Google client_secret.json). Or paste JSON text.")
            return True

        fd, path = tempfile.mkstemp(prefix="yt_auth_", suffix=".json")
        os.close(fd)
        try:
            await app.download_media(message=m, file_name=path)
            with open(path, "rb") as f:
                # utf-8-sig drops a BOM if present; undecodable bytes are ignored.
                raw = f.read().decode("utf-8-sig", errors="ignore")
        except Exception as e:
            await safe_reply(m, f"❌ JSON parse/add failed: `{str(e)[:220]}`")
            return True
        finally:
            try:
                if os.path.exists(path):
                    os.remove(path)
            except Exception:
                # Temp-file removal is best effort.
                pass

        await _add_profile_from_json_text(uid, m, raw)
        return True

    # pasted JSON
    txt = (m.text or "").strip()
    if txt.startswith("{") and txt.endswith("}"):
        await _add_profile_from_json_text(uid, m, txt)
        return True

    await safe_reply(m, "📄 Send the `.json` file or paste JSON text (client_secret.json).")
    return True
async def _handle_auth_ci_input(m: Message) -> bool:
    """
    Handle a message sent while the user is expected to provide client id + secret.

    The values may come on separate lines or, on a single line, separated by
    whitespace. An optional third value is used as the profile label
    (default "main").

    Returns True if message was consumed as auth input, False when the sender
    is unknown or is not in the "ci" auth state.
    """
    if not m.from_user:
        return False
    uid = m.from_user.id
    if _AWAIT_AUTH.get(uid) != "ci":
        return False
    if not await _ensure_allowed_uid(uid, m):
        return True

    usage = "Send:\n`<CLIENT_ID>`\n`<CLIENT_SECRET>`\n(Optional 3rd line: label)"
    text = (m.text or "").strip()
    if not text:
        await safe_reply(m, usage)
        return True

    rows = [row.strip() for row in text.splitlines() if row.strip()]
    # A single line may carry the values separated by spaces.
    fields = rows[0].split() if len(rows) == 1 else rows
    if len(fields) < 2:
        await safe_reply(m, usage)
        return True

    client_id, client_secret = fields[0], fields[1]
    label = fields[2] if len(fields) >= 3 else "main"

    out = await profile_add(uid, client_id=client_id, client_secret=client_secret, label=label)
    added = isinstance(out, dict) and out.get("ok")
    if not added:
        err = (out or {}).get("err") if isinstance(out, dict) else "profile_add_failed"
        await safe_reply(m, f"❌ Failed to add profile: `{err}`")
        return True

    _AWAIT_AUTH.pop(uid, None)
    await safe_reply(
        m,
        f"✅ Profile added (label: `{label}`).\n\nNow run /profiles → *Login* → *Set default*.",
        reply_markup=main_menu_keyboard(),
    )
    return True
_BATCH_CONTINUE_FLAGS = ("--continue", "-c")


def _split_batch_flags(payload: str) -> Tuple[bool, List[str]]:
    """Separate the continue flag from the link lines of a /batch payload.

    A flag is recognised only as a whole whitespace-delimited token, so link
    text that merely contains "-c" is neither treated as a flag nor altered.

    Returns ``(continue_on_fail, lines)`` where *lines* are the non-empty
    lines with the flag tokens removed.
    """
    continue_on_fail = False
    lines: List[str] = []
    for raw_line in (payload or "").splitlines():
        kept: List[str] = []
        for token in raw_line.split():
            if token in _BATCH_CONTINUE_FLAGS:
                continue_on_fail = True
            else:
                kept.append(token)
        if kept:
            lines.append(" ".join(kept))
    return continue_on_fail, lines


def setup_handlers(app: Client, user_app: Optional[Client]) -> None:
    """Register every command, media, callback and text handler on *app*.

    *user_app* is the optional user-session client; the link-based commands
    (/yt, /dl, /archive and /batch) refuse to run without it.
    """

    async def _speedtest_report() -> str:
        # Run the download/upload tests and render the shared result text.
        from bot.core.speedtest import net_download_test, net_upload_test

        dl = await net_download_test()
        ul = await net_upload_test()
        ip = await _get_public_ip()
        dl_bps = float((dl or {}).get("bps", 0) or 0)
        ul_bps = float((ul or {}).get("bps", 0) or 0)
        return (
            "📶 *Speed Test*\n\n"
            f"*Uptime:* `{uptime_text()}`\n"
            f"*Public IP:* `{ip or '—'}`\n\n"
            f"*Download:* `{human_bytes(dl_bps)}/s`\n"
            f"*Upload:* `{human_bytes(ul_bps)}/s`"
        )

    # ---- basic ----
    @app.on_message(filters.command(["start", "help"]))
    async def start_help_cmd(_, m: Message) -> None:
        if (m.text or "").strip().lower().startswith("/help"):
            await safe_reply(m, HELP_TEXT, reply_markup=main_menu_keyboard())
        else:
            await safe_reply(m, START_TEXT, reply_markup=main_menu_keyboard())

    @app.on_message(filters.command("speedtest"))
    async def speedtest_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not await _ensure_allowed_uid(uid, m):
            return
        st = await safe_reply(m, "⏱ Running speed test…")
        msg = await _speedtest_report()
        if st:
            await safe_edit(st, msg, reply_markup=main_menu_keyboard())
        else:
            await safe_reply(m, msg, reply_markup=main_menu_keyboard())

    @app.on_message(filters.command("cancel"))
    async def cancel_cmd(_, m: Message) -> None:
        # Drop every pending state of the user and stop running tasks.
        uid = m.from_user.id if m.from_user else 0
        _AWAIT_AUTH.pop(uid, None)
        _AWAIT_EDIT.pop(uid, None)
        _PENDING_UPLOAD.pop(uid, None)
        _PENDING_DELETE.pop(uid, None)
        task = _UPLOAD_TASK.pop(uid, None)
        if task and not task.done():
            task.cancel()
        btask = _BATCH_TASK.pop(uid, None)
        if btask and not btask.done():
            btask.cancel()
        _IN_PROGRESS.pop(uid, None)
        await safe_reply(m, CANCELLED, reply_markup=main_menu_keyboard())

    # ---- admin allowlist ----
    @app.on_message(filters.command("allow"))
    async def allow_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not _is_admin_or_owner(uid):
            await safe_reply(m, OWNER_ONLY)
            return
        args = (m.text or "").split(maxsplit=1)
        if len(args) < 2:
            await safe_reply(m, "Usage: /allow <tg_id>")
            return
        try:
            tid = int(args[1].strip())
        except Exception:
            await safe_reply(m, "Invalid tg_id")
            return
        await allow_user(tid)
        await safe_reply(m, f"✅ Allowed `{tid}`")

    @app.on_message(filters.command("disallow"))
    async def disallow_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not _is_admin_or_owner(uid):
            await safe_reply(m, OWNER_ONLY)
            return
        args = (m.text or "").split(maxsplit=1)
        if len(args) < 2:
            await safe_reply(m, "Usage: /disallow <tg_id>")
            return
        try:
            tid = int(args[1].strip())
        except Exception:
            await safe_reply(m, "Invalid tg_id")
            return
        await disallow_user(tid)
        await safe_reply(m, f"✅ Disallowed `{tid}`")

    @app.on_message(filters.command("stats"))
    async def stats_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not _is_admin_or_owner(uid):
            await safe_reply(m, OWNER_ONLY)
            return
        st = await get_stats()
        if not isinstance(st, dict):
            await safe_reply(m, "❌ stats failed")
            return
        await safe_reply(
            m,
            "📊 *Today stats*\n\n"
            f"Allowed users: `{st.get('allowed_users', 0)}`\n"
            f"Profiles: `{st.get('profiles', 0)}`\n"
            f"Uploads today: `{st.get('uploads_today', 0)}`",
        )

    @app.on_message(filters.command("diag"))
    async def diag_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not _is_admin_or_owner(uid):
            await safe_reply(m, OWNER_ONLY)
            return
        w1 = Workers.WORKER1_URL or "—"
        w2 = Workers.WORKER2_URL or "—"
        owners = _norm_ids(getattr(Auth, "OWNERS", None)) | _norm_ids(getattr(Telegram, "OWNER_ID", None))
        admins = _norm_ids(getattr(Auth, "ADMINS", None)) | _norm_ids(getattr(Telegram, "ADMIN_IDS", None))
        await safe_reply(
            m,
            "🧪 *Diag*\n\n"
            f"Uptime: `{uptime_text()}`\n"
            f"WORKER1_URL: `{w1}`\n"
            f"WORKER2_URL: `{w2}`\n"
            f"Owners: `{len(owners)}` "
            f"Admins: `{len(admins)}`",
        )

    # ---- auth/profile flow ----
    @app.on_message(filters.command("auth"))
    async def auth_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not await _ensure_allowed_uid(uid, m):
            return
        _AWAIT_AUTH.pop(uid, None)
        await safe_reply(m, "🔐 Add a profile:", reply_markup=auth_menu_keyboard())

    @app.on_message(filters.command("profiles"))
    async def profiles_cmd(_, m: Message) -> None:
        uid = m.from_user.id if m.from_user else 0
        if not await _ensure_allowed_uid(uid, m):
            return
        data = await list_profiles(uid, only_connected=False)
        if not (isinstance(data, dict) and data.get("ok")):
            await safe_reply(m, "❌ Failed to list profiles.")
            return
        profiles = data.get("profiles") or []
        default_id = data.get("default_profile_id") or ""
        await safe_reply(m, "👤 *Profiles*", reply_markup=profiles_keyboard(profiles, default_id))

    # ---- upload from DM media / OR auth-json doc handling ----
    @app.on_message(filters.private & (filters.video | filters.document))
    async def media_in_dm(_, m: Message) -> None:
        if not m.from_user:
            return
        uid = m.from_user.id
        # While auth JSON is awaited, a document is credentials, not upload media.
        if await _handle_auth_json_input(app, m):
            return
        if not await _ensure_allowed_uid(uid, m):
            return
        if _IN_PROGRESS.get(uid):
            await safe_reply(m, "⏳ Upload already running. Use /cancel to stop.")
            return
        await _start_pending_upload(uid=uid, request_msg=m, src_msg=m, downloader=app, via_link=False)

    # ---- upload from link (admin/owner) ----
    @app.on_message(filters.command(["yt", "dl", "archive"]))
    async def archive_cmd(_, m: Message) -> None:
        if not m.from_user:
            return
        uid = m.from_user.id
        if not await _ensure_allowed_uid(uid, m):
            return
        if not _is_admin_or_owner(uid):
            await safe_reply(m, OWNER_ONLY)
            return
        if user_app is None:
            await safe_reply(m, "❌ This mode requires user session (user_app missing).")
            return
        args = (m.text or "").split(maxsplit=1)
        if len(args) < 2:
            await safe_reply(m, "Usage: /archive <t.me message link>")
            return
        st = await safe_reply(m, "🔎 Fetching message…")
        if not st:
            return
        try:
            chat_ref, msg_id = parse_telegram_link(args[1].strip())
        except Exception as e:
            await safe_edit(st, f"Bad link: `{str(e)[:160]}`")
            return
        try:
            src = await user_app.get_messages(chat_ref, msg_id)
        except Exception as e:
            await safe_edit(st, f"❌ Fetch failed: `{str(e)[:180]}`")
            return
        await safe_edit(st, "✅ Message fetched. Preparing preview…")
        await _start_pending_upload(uid=uid, request_msg=m, src_msg=src, downloader=user_app, via_link=True)

    # ---- batch mode ----
    @app.on_message(filters.command("batch"))
    async def batch_cmd(_, m: Message) -> None:
        if not m.from_user:
            return
        uid = m.from_user.id
        if not await _ensure_allowed_uid(uid, m):
            return
        if not _is_admin_or_owner(uid):
            await safe_reply(m, OWNER_ONLY)
            return
        if user_app is None:
            await safe_reply(m, "❌ Batch mode is not configured (user session missing).")
            return
        if _BATCH_TASK.get(uid) and not _BATCH_TASK[uid].done():
            await safe_reply(m, "⏳ A batch is already running. Use /cancel to stop it.")
            return

        raw = (m.text or "")
        args = raw.split(maxsplit=1)
        if len(args) < 2:
            await safe_reply(m, "Send: /batch <t.me links> (one per line)\nOptional: /batch --continue <links...>")
            return

        # Flags are matched as whole tokens so links are never modified.
        continue_on_fail, lines = _split_batch_flags(args[1])
        if not lines:
            await safe_reply(m, "No links found. Put one t.me link per line after /batch")
            return

        items: List[Tuple[ChatRef, int]] = []
        for ln in lines:
            try:
                chat_ref, a, b = _parse_link_or_range(ln)
            except Exception:
                await safe_reply(m, f"❌ Bad link/range: `{ln}`")
                if not continue_on_fail:
                    return
                continue
            count = (b - a + 1)
            if count > _BATCH_MAX_RANGE:
                await safe_reply(m, f"❌ Range too large ({count}). Max is {_BATCH_MAX_RANGE}.")
                if not continue_on_fail:
                    return
                continue
            for mid in range(a, b + 1):
                items.append((chat_ref, mid))

        if not items:
            await safe_reply(m, "No valid items to process.")
            return

        await safe_reply(
            m,
            f"🧾 Batch starting: {len(items)} item(s).\nMode: `{'continue' if continue_on_fail else 'stop_on_fail'}`",
        )

        async def runner() -> None:
            # Process the items one after another, one status line per item.
            batch_start = time.time()
            total = len(items)
            for i, (chat_ref, msg_id) in enumerate(items, 1):
                st = await safe_reply(m, f"🔎 Batch {i}/{total}: fetching message `{msg_id}`…")
                if not st:
                    if not continue_on_fail:
                        break
                    continue
                try:
                    src = await user_app.get_messages(chat_ref, msg_id)
                except Exception as e:
                    await safe_edit(st, f"❌ Batch {i}/{total} fetch failed: `{str(e)[:180]}`")
                    if not continue_on_fail:
                        break
                    continue
                media, _, _ = _media_and_filename(src)
                if not media:
                    await safe_edit(st, f"⏭ Batch {i}/{total}: no media in that message. Skipped.")
                    continue

                # Create pending upload targeting THIS chat (m), not source chat
                await _start_pending_upload(uid=uid, request_msg=m, src_msg=src, downloader=user_app, via_link=True)

                # Force progress to edit THIS batch line
                p = _PENDING_UPLOAD.get(uid)
                if p:
                    p.status_msg = st
                    _PENDING_UPLOAD[uid] = p
                    await safe_edit(st, f"⏳ Batch {i}/{total}: starting upload…")
                    out = await _run_upload(uid)
                    if not out.get("ok") and not continue_on_fail:
                        await safe_reply(m, f"🛑 Batch stopped (failed at {i}/{total}).")
                        break
                else:
                    await safe_edit(st, f"❌ Batch {i}/{total}: internal error (no pending).")
                    if not continue_on_fail:
                        break

            batch_dur = max(0.001, time.time() - batch_start)
            await safe_reply(m, f"✅ Batch done in `{batch_dur:.1f}s`.", reply_markup=main_menu_keyboard())

        t = asyncio.create_task(runner())
        _BATCH_TASK[uid] = t

    # ---- callbacks ----
    @app.on_callback_query()
    async def cb_handler(_, q: CallbackQuery) -> None:
        uid = q.from_user.id

        # Stop Telegram's loading spinner; failure to answer is not fatal.
        try:
            await q.answer()
        except Exception:
            pass

        action, value = parse_cb(q.data or "")
        if action == "noop":
            return

        # Cancel from keyboards (used in filename keyboard, and we support it globally)
        if action == CANCEL:
            _AWAIT_AUTH.pop(uid, None)
            _AWAIT_EDIT.pop(uid, None)
            _PENDING_UPLOAD.pop(uid, None)
            _PENDING_DELETE.pop(uid, None)
            await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard())
            return

        # Menus
        if action == MENU_HELP:
            await safe_edit(q.message, HELP_TEXT, reply_markup=main_menu_keyboard())
            return

        if action == MENU_SPEED:
            if not await _ensure_allowed_uid(uid, q.message):
                return
            await safe_edit(q.message, "⏱ Running speed test…")
            report = await _speedtest_report()
            await safe_edit(q.message, report, reply_markup=main_menu_keyboard())
            return

        if action == MENU_AUTH:
            if not await _ensure_allowed_uid(uid, q.message):
                return
            _AWAIT_AUTH.pop(uid, None)
            await safe_edit(q.message, "🔐 Add a profile:", reply_markup=auth_menu_keyboard())
            return

        if action == MENU_PROFILES:
            if not await _ensure_allowed_uid(uid, q.message):
                return
            data = await list_profiles(uid, only_connected=False)
            if not (isinstance(data, dict) and data.get("ok")):
                await safe_edit(q.message, "❌ Failed to list profiles.", reply_markup=main_menu_keyboard())
                return
            profiles = data.get("profiles") or []
            default_id = data.get("default_profile_id") or ""
            await safe_edit(q.message, "👤 *Profiles*", reply_markup=profiles_keyboard(profiles, default_id))
            return

        if action == BACK:
            _AWAIT_AUTH.pop(uid, None)
            await safe_edit(q.message, "🏠 Menu", reply_markup=main_menu_keyboard())
            return

        # Auth buttons: remember which credential format the next message carries.
        if action == AUTH_JSON:
            if not await _ensure_allowed_uid(uid, q.message):
                return
            _AWAIT_AUTH[uid] = "json"
            await safe_edit(
                q.message,
                "📄 *Send JSON credentials*\n\n"
                "Send `client_secret.json` as a file OR paste the JSON text here.\n\n"
                "Use /cancel to stop.",
                reply_markup=auth_menu_keyboard(),
            )
            return

        if action == AUTH_CI:
            if not await _ensure_allowed_uid(uid, q.message):
                return
            _AWAIT_AUTH[uid] = "ci"
            await safe_edit(
                q.message,
                "🔑 *Send Client ID + Secret*\n\n"
                "Send like this:\n"
                "`CLIENT_ID`\n"
                "`CLIENT_SECRET`\n"
                "(Optional 3rd line: label)\n\n"
                "Use /cancel to stop.",
                reply_markup=auth_menu_keyboard(),
            )
            return

        # Filename/Caption choice
        if action in (NAME_ORIGINAL, NAME_CAPTION, NAME_CUSTOM):
            p = _PENDING_UPLOAD.get(uid)
            if not p:
                return

            if action == NAME_ORIGINAL:
                p.title = (p.title_from_filename or "Untitled")[: Settings.MAX_TITLE]
                p.description = (p.caption_raw or "")[: Settings.MAX_DESC]
                _PENDING_UPLOAD[uid] = p
                await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy))
                return

            if action == NAME_CAPTION:
                if p.title_from_caption:
                    p.title = p.title_from_caption[: Settings.MAX_TITLE]
                    p.description = p.desc_from_caption[: Settings.MAX_DESC]
                else:
                    p.title = (p.title_from_filename or "Untitled")[: Settings.MAX_TITLE]
                    p.description = ""
                _PENDING_UPLOAD[uid] = p
                await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy))
                return

            if action == NAME_CUSTOM:
                _AWAIT_EDIT[uid] = EditState(title=p.title, description=p.description, privacy=p.privacy)
                await safe_edit(
                    q.message,
                    "✍️ Send custom title + description like:\n\n"
                    "Title line\n"
                    "Description lines…\n\n"
                    "(Send only 1 line to change title only)",
                )
                return

        # Upload buttons
        if action == UP_PRIV:
            p = _PENDING_UPLOAD.get(uid)
            if not p:
                return
            cycle = {"private": "unlisted", "unlisted": "public", "public": "private"}
            p.privacy = cycle.get(p.privacy, "private")
            _PENDING_UPLOAD[uid] = p
            await safe_edit(q.message, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy))
            return

        if action == UP_EDIT:
            p = _PENDING_UPLOAD.get(uid)
            if not p:
                return
            _AWAIT_EDIT[uid] = EditState(title=p.title, description=p.description, privacy=p.privacy)
            await safe_edit(
                q.message,
                "✍️ Send new title + description like:\n\n"
                "Title line\n"
                "Description lines…\n\n"
                "(Send only 1 line to change title only)",
            )
            return

        if action in (UP_DEL, UP_CANCEL):
            _PENDING_UPLOAD.pop(uid, None)
            _AWAIT_EDIT.pop(uid, None)
            await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard())
            return

        if action == UP_GO:
            p = _PENDING_UPLOAD.get(uid)
            if not p:
                return
            if _IN_PROGRESS.get(uid):
                await safe_edit(
                    q.message,
                    "⏳ Upload already running. Use /cancel to stop.",
                    reply_markup=main_menu_keyboard(),
                )
                return
            p.status_msg = q.message
            _PENDING_UPLOAD[uid] = p
            await safe_edit(q.message, "⏳ Starting upload…")
            t = asyncio.create_task(_run_upload(uid))
            _UPLOAD_TASK[uid] = t
            return

        # Profile callbacks (from profiles_keyboard)
        if action == "pdel":
            if not await _ensure_allowed_uid(uid, q.message):
                return
            pid = value
            _PENDING_DELETE[uid] = pid
            await safe_edit(q.message, f"❓ Delete profile `{pid}`? Reply YES to confirm.")
            return

        if action == "pdef":
            if not await _ensure_allowed_uid(uid, q.message):
                return
            pid = value
            out = await set_default_profile(uid, pid)
            if isinstance(out, dict) and out.get("ok"):
                data = await list_profiles(uid, only_connected=False)
                profiles = (data or {}).get("profiles") or []
                default_id = (data or {}).get("default_profile_id") or ""
                await safe_edit(q.message, "✅ Default set.", reply_markup=profiles_keyboard(profiles, default_id))
            else:
                await safe_edit(q.message, "❌ Failed to set default.", reply_markup=main_menu_keyboard())
            return

        if action == "plog":
            if not await _ensure_allowed_uid(uid, q.message):
                return
            pid = value
            chk = await profile_check_auth(uid, pid)
            if isinstance(chk, dict) and chk.get("ok") and chk.get("login_url"):
                await safe_edit(
                    q.message,
                    f"🔗 Login URL:\n{chk.get('login_url')}\n\nAfter login, reopen /profiles.",
                    reply_markup=main_menu_keyboard(),
                )
            else:
                await safe_edit(q.message, "❌ Failed to create login URL.", reply_markup=main_menu_keyboard())
            return

    # ---- text handler ----
    @app.on_message(filters.text)
    async def text_anywhere(_, m: Message) -> None:
        if not m.from_user:
            return
        uid = m.from_user.id

        # Auth CI input
        if await _handle_auth_ci_input(m):
            return

        # Auth JSON pasted input (when user pastes text)
        if _AWAIT_AUTH.get(uid) == "json":
            if await _handle_auth_json_input(app, m):
                return

        # Confirm delete
        if uid in _PENDING_DELETE:
            if (m.text or "").strip().lower() == "yes":
                pid = _PENDING_DELETE.pop(uid)
                out = await profile_delete(uid, pid)
                if isinstance(out, dict) and out.get("ok"):
                    await safe_reply(m, "✅ Profile deleted.", reply_markup=main_menu_keyboard())
                else:
                    await safe_reply(m, "❌ Delete failed.", reply_markup=main_menu_keyboard())
            else:
                _PENDING_DELETE.pop(uid, None)
                await safe_reply(m, "Cancelled.", reply_markup=main_menu_keyboard())
            return

        # Edit title/desc
        if uid in _AWAIT_EDIT:
            _AWAIT_EDIT.pop(uid, None)
            p = _PENDING_UPLOAD.get(uid)
            if not p:
                return
            lines = (m.text or "").strip().splitlines()
            if lines:
                # First line is the title; further lines replace the description.
                p.title = lines[0].strip()[: Settings.MAX_TITLE]
                if len(lines) > 1:
                    p.description = "\n".join(lines[1:]).strip()[: Settings.MAX_DESC]
            _PENDING_UPLOAD[uid] = p
            if p.status_msg:
                await safe_edit(p.status_msg, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy))
            else:
                await safe_reply(m, _render_preview(p), reply_markup=upload_confirm_keyboard(p.privacy))
            return