understanding commited on
Commit
4a49747
·
verified ·
1 Parent(s): 1dee3f3

Update bot/handlers.py

Browse files
Files changed (1) hide show
  1. bot/handlers.py +489 -747
bot/handlers.py CHANGED
@@ -1,76 +1,43 @@
1
- # bot/handlers.py
 
 
 
 
 
 
 
2
  from __future__ import annotations
3
 
 
4
  import os
5
- import json
6
  import time
7
- import asyncio
8
  from dataclasses import dataclass
9
- from typing import Any, Dict, Optional, Tuple, Union
10
 
11
- import httpx
12
  from hydrogram import Client, filters
13
- from hydrogram.types import (
14
- Message,
15
- CallbackQuery,
16
- InlineKeyboardMarkup,
17
- InlineKeyboardButton,
18
- )
19
-
20
- from bot.config import Workers
21
- from bot.core.auth import require_allowed, is_owner_id
 
 
 
 
 
 
 
 
 
22
  from bot.core.progress import SpeedETA, human_bytes, human_eta
 
 
 
23
 
24
- # ✅ profile_add is ADMIN route => cf_worker1 (Pages, BOT_BACKEND_KEY)
25
- from bot.integrations.cf_worker1 import profile_add
26
-
27
- # ✅ HF/private routes => cf_worker2 (Pages, HF_API_KEY)
28
- from bot.integrations.cf_worker2 import (
29
- pick_profile,
30
- access_token,
31
- record_upload,
32
- allow_user,
33
- disallow_user,
34
- stats_today,
35
- )
36
-
37
- from bot.telegram.replies import safe_reply, safe_edit
38
- from bot.telegram.parse import extract_title_description
39
- from bot.telegram.media import download_to_temp
40
- from bot.temp.files import cleanup_file
41
-
42
- from bot.ui.keyboards import (
43
- main_menu_keyboard,
44
- auth_menu_keyboard,
45
- upload_confirm_keyboard,
46
- )
47
- from bot.ui.callbacks import (
48
- parse_cb,
49
- make,
50
- MENU_AUTH,
51
- MENU_PROFILES,
52
- MENU_HELP,
53
- MENU_SPEEDTEST,
54
- BACK,
55
- AUTH_JSON,
56
- AUTH_CI,
57
- UP_GO,
58
- UP_EDIT,
59
- UP_PRIV,
60
- UP_CANCEL,
61
- )
62
- from bot.ui.texts import (
63
- START_TEXT,
64
- HELP_TEXT,
65
- OWNER_ONLY,
66
- NOT_ALLOWED,
67
- CANCELLED,
68
- NEED_AUTH,
69
- )
70
-
71
- # =========================
72
- # In-memory state (per-user)
73
- # =========================
74
 
75
  @dataclass
76
  class PendingUpload:
@@ -79,345 +46,97 @@ class PendingUpload:
79
  file_name: str
80
  title: str
81
  description: str
82
- privacy: str # private | unlisted | public
83
- status_msg: Message
84
  via_link: bool = False
85
 
86
 
87
- _AWAIT_AUTH_MODE: Dict[int, str] = {} # uid -> "json"|"ci"
88
- _AWAIT_EDIT: Dict[int, bool] = {} # uid -> True when waiting for title/desc text
89
- _PENDING_UPLOAD: Dict[int, PendingUpload] = {} # uid -> pending upload
90
- _IN_PROGRESS: Dict[int, bool] = {} # uid -> upload is running
91
- _SPEED_COOLDOWN_UNTIL: Dict[int, float] = {} # uid -> unix time
92
-
93
- _PENDING_DELETE: Dict[int, str] = {} # uid -> profile_id pending delete confirmation
94
-
95
-
96
- # =========================
97
- # Helpers
98
- # =========================
99
-
100
- def _admin_ids_from_env() -> set[int]:
101
- raw = (os.getenv("ADMIN_IDS") or "").strip()
102
- out: set[int] = set()
103
- for part in raw.replace(";", ",").replace("|", ",").split(","):
104
- part = part.strip()
105
- if not part:
106
- continue
107
- try:
108
- out.add(int(part))
109
- except ValueError:
110
- continue
111
- return out
112
 
113
 
114
- _ADMIN_IDS = _admin_ids_from_env()
 
 
 
 
 
 
 
115
 
116
 
117
  def _is_admin_or_owner(uid: int) -> bool:
118
- return is_owner_id(uid) or (uid in _ADMIN_IDS)
119
-
120
-
121
- def _next_privacy(cur: str) -> str:
122
- cur = (cur or "").lower()
123
- if cur == "private":
124
- return "unlisted"
125
- if cur == "unlisted":
126
- return "public"
127
- return "private"
128
-
129
-
130
- def _media_and_filename(m: Message) -> Tuple[Optional[Any], str, int]:
131
- if getattr(m, "video", None):
132
- v = m.video
133
- name = (getattr(v, "file_name", None) or "video.mp4").strip() or "video.mp4"
134
- size = int(getattr(v, "file_size", 0) or 0)
135
- return v, name, size
136
-
137
- if getattr(m, "document", None):
138
- d = m.document
139
- name = (getattr(d, "file_name", None) or "file.bin").strip() or "file.bin"
140
- size = int(getattr(d, "file_size", 0) or 0)
141
- return d, name, size
142
-
143
- return None, "file.bin", 0
144
-
145
-
146
- def _render_preview(p: PendingUpload) -> str:
147
- size = 0
148
- if getattr(p.src_msg, "video", None):
149
- size = int(getattr(p.src_msg.video, "file_size", 0) or 0)
150
- elif getattr(p.src_msg, "document", None):
151
- size = int(getattr(p.src_msg.document, "file_size", 0) or 0)
152
-
153
- desc = p.description or ""
154
- if len(desc) > 500:
155
- desc = desc[:500] + "…"
156
-
157
- return (
158
- "✅ *Ready to upload*\n\n"
159
- f"**File:** `{p.file_name}`\n"
160
- f"**Size:** {human_bytes(size) if size else '—'}\n"
161
- f"**Privacy:** `{p.privacy}`\n\n"
162
- f"**Title:** {p.title or '—'}\n"
163
- f"**Description:** {desc or '—'}"
164
- )
165
-
166
-
167
- def parse_telegram_link(link: str) -> Tuple[Union[int, str], int]:
168
- s = (link or "").strip()
169
- if not s:
170
- raise ValueError("empty link")
171
-
172
- s = s.replace("https://", "").replace("http://", "")
173
- if s.startswith("t.me/"):
174
- s = s[len("t.me/") :]
175
-
176
- s = s.split("?", 1)[0].split("#", 1)[0]
177
- s = s.strip("/")
178
-
179
- parts = s.split("/")
180
- if len(parts) < 2:
181
- raise ValueError("link must include message id")
182
-
183
- if parts[0] == "c":
184
- if len(parts) < 3:
185
- raise ValueError("invalid /c/ link")
186
- internal_id = parts[1]
187
- msg_id = parts[2]
188
- if not internal_id.isdigit() or not msg_id.isdigit():
189
- raise ValueError("invalid numeric ids in /c/ link")
190
- chat_id = int(f"-100{internal_id}")
191
- return chat_id, int(msg_id)
192
-
193
- username = parts[0]
194
- msg_id = parts[1]
195
- if not msg_id.isdigit():
196
- raise ValueError("invalid message id")
197
- return username, int(msg_id)
198
 
199
 
200
  async def _ensure_allowed(m: Message) -> bool:
201
- uid = m.from_user.id if m.from_user else 0
202
- if not uid:
203
- await safe_reply(m, NOT_ALLOWED)
204
- return False
205
-
206
- # ✅ owner/admin must never get blocked by allowlist
207
  if _is_admin_or_owner(uid):
208
  return True
209
-
210
- if not await require_allowed(uid):
211
  await safe_reply(m, NOT_ALLOWED)
212
- return False
213
- return True
214
-
215
-
216
- # =========================
217
- # Worker calls (no integration dependency)
218
- # =========================
219
-
220
- _HTTP_TIMEOUT = httpx.Timeout(20.0, connect=10.0)
221
-
222
- async def _post_json(url: str, bearer: str, payload: dict) -> dict:
223
- headers = {"authorization": f"Bearer {bearer}"}
224
- try:
225
- async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as c:
226
- r = await c.post(url, headers=headers, json=payload)
227
- ct = (r.headers.get("content-type") or "").lower()
228
- if "application/json" in ct:
229
- j = r.json()
230
- return j if isinstance(j, dict) else {"ok": False, "err": "bad_json"}
231
- # Pages sometimes returns text (Unauthorized etc)
232
- return {"ok": False, "err": f"http_{r.status_code}", "detail": (r.text or "")[:200]}
233
- except Exception as e:
234
- return {"ok": False, "err": "network_error", "detail": str(e)[:200]}
235
-
236
-
237
- async def _hf_list_profiles(uid: int, only_connected: bool = False) -> dict:
238
- if not Workers.WORKER2_URL or not Workers.HF_API_KEY:
239
- return {"ok": False, "err": "WORKER2_URL/HF_API_KEY missing"}
240
- return await _post_json(
241
- f"{Workers.WORKER2_URL}/api/list_profiles",
242
- Workers.HF_API_KEY,
243
- {"tg_id": str(uid), "only_connected": bool(only_connected)},
244
- )
245
-
246
-
247
- async def _admin_profile_set_default(uid: int, profile_id: str) -> dict:
248
- if not Workers.WORKER1_URL or not Workers.BOT_BACKEND_KEY:
249
- return {"ok": False, "err": "WORKER1_URL/BOT_BACKEND_KEY missing"}
250
- return await _post_json(
251
- f"{Workers.WORKER1_URL}/api/profile/set_default",
252
- Workers.BOT_BACKEND_KEY,
253
- {"tg_id": str(uid), "profile_id": str(profile_id)},
254
- )
255
-
256
-
257
- async def _admin_profile_remove(uid: int, profile_id: str) -> dict:
258
- if not Workers.WORKER1_URL or not Workers.BOT_BACKEND_KEY:
259
- return {"ok": False, "err": "WORKER1_URL/BOT_BACKEND_KEY missing"}
260
- return await _post_json(
261
- f"{Workers.WORKER1_URL}/api/profile/remove",
262
- Workers.BOT_BACKEND_KEY,
263
- {"tg_id": str(uid), "profile_id": str(profile_id)},
264
- )
265
 
266
 
267
- async def _admin_profile_login_link(uid: int, profile_id: str, force: bool = False) -> dict:
268
- if not Workers.WORKER1_URL or not Workers.BOT_BACKEND_KEY:
269
- return {"ok": False, "err": "WORKER1_URL/BOT_BACKEND_KEY missing"}
270
- return await _post_json(
271
- f"{Workers.WORKER1_URL}/api/profile/login_link",
272
- Workers.BOT_BACKEND_KEY,
273
- {"tg_id": str(uid), "profile_id": str(profile_id), "ttl_sec": 600, "force": bool(force)},
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
  )
275
 
276
 
277
- # =========================
278
- # Profiles UI (fixed)
279
- # =========================
280
-
281
- def _is_connected_profile(p: dict) -> bool:
282
- return bool(p.get("has_refresh")) and bool(p.get("channel_id"))
283
-
284
-
285
- def _profiles_text(uid: int, pl: dict) -> str:
286
- default_id = pl.get("default_profile_id") or "—"
287
- day = pl.get("day") or ""
288
- profiles = pl.get("profiles") or []
289
- if not profiles:
290
- return (
291
- f"👤 Profiles for `{uid}`\n\n"
292
- "No profiles found.\n\n"
293
- "Use **Add Profile** (or /auth) to add credentials."
294
- )
295
-
296
- lines = [
297
- f"👤 Profiles for `{uid}`",
298
- f"Default: `{default_id}`",
299
- ]
300
- if day:
301
- lines.append(f"Day: `{day}`")
302
- lines.append("")
303
- lines.append("Legend: ✅ connected | ⚠️ not connected")
304
- lines.append("")
305
-
306
- for p in profiles:
307
- pid = p.get("profile_id") or ""
308
- label = (p.get("label") or pid[:6] or "profile").strip()
309
- mark = "⭐" if pid == default_id else " "
310
- conn = "✅" if _is_connected_profile(p) else "⚠️"
311
- ch = p.get("channel_title") or p.get("channel_id") or "—"
312
- used = p.get("used_today")
313
- used_txt = f" • used:{used}" if isinstance(used, int) else ""
314
- lines.append(f"{mark}{conn} **{label}** — `{pid}` — {ch}{used_txt}")
315
-
316
- lines.append("")
317
- lines.append("Tap buttons below to set default / login / delete.")
318
- return "\n".join(lines)
319
-
320
-
321
- def _profiles_markup(pl: dict) -> InlineKeyboardMarkup:
322
- profiles = pl.get("profiles") or []
323
- default_id = pl.get("default_profile_id") or ""
324
-
325
- rows: list[list[InlineKeyboardButton]] = []
326
-
327
- for p in profiles:
328
- pid = str(p.get("profile_id") or "")
329
- if not pid:
330
- continue
331
- connected = _is_connected_profile(p)
332
- is_def = (pid == default_id)
333
-
334
- # Row: Default + Login/Reauth + Delete
335
- btn_default = InlineKeyboardButton(
336
- "✅ Default" if is_def else "⭐ Set default",
337
- callback_data=make("pdef", pid) if not is_def else make("pref"),
338
- )
339
- btn_login = InlineKeyboardButton(
340
- "🔁 Re-auth" if connected else "🔑 Login",
341
- callback_data=make("preauth" if connected else "plog", pid),
342
- )
343
- btn_del = InlineKeyboardButton("🗑 Delete", callback_data=make("pdel", pid))
344
-
345
- rows.append([btn_default, btn_login, btn_del])
346
-
347
- # Footer
348
- rows.append(
349
- [
350
- InlineKeyboardButton("🔄 Refresh", callback_data=make("pref")),
351
- InlineKeyboardButton("➕ Add profile", callback_data=make(MENU_AUTH)),
352
- ]
353
- )
354
- rows.append([InlineKeyboardButton("⬅️ Back", callback_data=make(BACK))])
355
-
356
- return InlineKeyboardMarkup(rows)
357
-
358
-
359
- async def _show_profiles(msg: Message, uid: int, *, edit: bool) -> None:
360
- pl = await _hf_list_profiles(uid, only_connected=False)
361
- if not (isinstance(pl, dict) and pl.get("ok")):
362
- err = pl.get("err", "unknown") if isinstance(pl, dict) else "unknown"
363
- detail = pl.get("detail") if isinstance(pl, dict) else None
364
- txt = f"❌ Failed to load profiles: `{err}`"
365
- if detail:
366
- txt += f"\n`{str(detail)[:180]}`"
367
- if edit:
368
- await safe_edit(msg, txt, reply_markup=main_menu_keyboard())
369
- else:
370
- await safe_reply(msg, txt, reply_markup=main_menu_keyboard())
371
  return
372
 
373
- txt = _profiles_text(uid, pl)
374
- kb = _profiles_markup(pl)
375
- if edit:
376
- await safe_edit(msg, txt, reply_markup=kb)
377
- else:
378
- await safe_reply(msg, txt, reply_markup=kb)
379
-
380
-
381
- # =========================
382
- # Upload flow
383
- # =========================
384
-
385
- async def _start_pending_upload(
386
- *,
387
- chat_msg: Message,
388
- src_msg: Message,
389
- downloader: Client,
390
- privacy: str = "private",
391
- via_link: bool = False,
392
- ) -> None:
393
- uid = chat_msg.from_user.id
394
- _, file_name, _ = _media_and_filename(src_msg)
395
- title, desc = extract_title_description(src_msg, file_name)
396
-
397
- preview = PendingUpload(
398
- src_msg=src_msg,
399
- downloader=downloader,
400
- file_name=file_name,
401
- title=title,
402
- description=desc,
403
- privacy=privacy,
404
- status_msg=chat_msg,
405
- via_link=via_link,
406
- )
407
- status = await safe_reply(chat_msg, _render_preview(preview), reply_markup=upload_confirm_keyboard(privacy))
408
- if not status:
409
- return
410
 
411
- _PENDING_UPLOAD[uid] = PendingUpload(
412
- src_msg=src_msg,
413
  downloader=downloader,
414
  file_name=file_name,
415
  title=title,
416
  description=desc,
417
- privacy=privacy,
418
- status_msg=status,
419
  via_link=via_link,
 
420
  )
 
 
 
 
421
 
422
 
423
  async def _run_upload(uid: int) -> None:
@@ -426,71 +145,28 @@ async def _run_upload(uid: int) -> None:
426
  pending = _PENDING_UPLOAD.get(uid)
427
  if not pending:
428
  return
 
 
429
 
430
  _IN_PROGRESS[uid] = True
431
  st = pending.status_msg
432
- file_path = None
433
 
434
  try:
435
- # Use HF list (shows default + connection state)
436
- pl = await _hf_list_profiles(uid, only_connected=False)
437
- if not (isinstance(pl, dict) and pl.get("ok")):
438
- await safe_edit(st, "❌ Failed to fetch profiles. Try /profiles again.")
439
- return
440
-
441
- profiles = pl.get("profiles") or []
442
- if not profiles:
443
- await safe_edit(st, NEED_AUTH)
444
- return
445
-
446
- default_id = pl.get("default_profile_id") or ""
447
- default_profile = next((p for p in profiles if p.get("profile_id") == default_id), None)
448
-
449
- # Default must be connected; otherwise fallback to first connected and instruct user
450
- chosen = default_profile if default_profile and _is_connected_profile(default_profile) else None
451
- if not chosen:
452
- chosen = next((p for p in profiles if _is_connected_profile(p)), None)
453
-
454
- if not chosen:
455
- await safe_edit(
456
- st,
457
- "❌ No connected YouTube profile.\n\nOpen /profiles → tap **Login** to authorize a profile.",
458
- )
459
- return
460
-
461
- channel_id = chosen.get("channel_id")
462
- if not channel_id:
463
- await safe_edit(st, "❌ चुने हुए profile में channel_id missing है. /profiles में Re-auth करो.")
464
- return
465
-
466
- pick = await pick_profile(uid, channel_id)
467
- if not (isinstance(pick, dict) and pick.get("ok")):
468
- err = pick.get("err", "unknown") if isinstance(pick, dict) else "unknown"
469
- await safe_edit(st, f"❌ Profile pick failed: `{err}`")
470
- return
471
-
472
- prof_id = pick.get("profile_id")
473
- if not prof_id:
474
- await safe_edit(st, "❌ pick_profile returned no profile_id.")
475
- return
476
-
477
- tok = await access_token(uid, prof_id)
478
- if not (isinstance(tok, dict) and tok.get("ok") and tok.get("access_token")):
479
- err = tok.get("err", "token_failed") if isinstance(tok, dict) else "token_failed"
480
- await safe_edit(st, f"❌ Token failed: `{err}`")
481
  return
482
 
483
- access = tok["access_token"]
 
484
 
485
- await safe_edit(st, "⬇️ Downloading media…")
486
- file_path, _, _ = await download_to_temp(pending.downloader, pending.src_msg)
487
-
488
- from bot.youtube.uploader import upload_video # local import
489
- speed = SpeedETA()
490
- last_ui = 0.0
491
- start_t = time.time()
492
 
493
- # ✅ Fix: uploader sometimes passes dict/str instead of int → prevent float(dict) crash
 
494
  def _as_int(v: Any) -> int:
495
  if v is None:
496
  return 0
@@ -504,40 +180,82 @@ async def _run_upload(uid: int) -> None:
504
  except Exception:
505
  return 0
506
  if isinstance(v, dict):
507
- for k in ("sent", "uploaded", "current", "bytes_sent", "bytes", "done", "value", "progress"):
508
  if k in v:
509
- return _as_int(v.get(k))
510
- if len(v) == 1:
511
- try:
512
- return _as_int(next(iter(v.values())))
513
- except Exception:
514
- return 0
515
- return 0
 
 
516
  return 0
517
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
518
  async def progress_cb(sent: Any, total: Any) -> None:
519
- nonlocal last_ui
520
  now = time.time()
521
- if now - last_ui < 0.8:
522
  return
523
- last_ui = now
524
 
525
  sent_i = _as_int(sent)
526
  total_i = _as_int(total)
527
  if total_i <= 0:
528
  total_i = max(sent_i, 1)
529
 
530
- rate = speed.update(sent_i, total_i)
531
  txt = (
532
- "⬆️ Uploading…\n\n"
533
- f"{human_bytes(sent_i)} / {human_bytes(total_i)}\n"
534
- f"Speed: {human_bytes(rate)}/s\n"
535
- f"ETA: {human_eta(speed.eta_seconds)}"
536
  )
537
  await safe_edit(st, txt)
538
 
539
  up = await upload_video(
540
- access_token=access,
541
  file_path=file_path,
542
  title=pending.title,
543
  description=pending.description,
@@ -547,46 +265,66 @@ async def _run_upload(uid: int) -> None:
547
 
548
  if not (isinstance(up, dict) and up.get("ok")):
549
  err = up.get("err", "upload_failed") if isinstance(up, dict) else "upload_failed"
550
- await safe_edit(st, f" Upload failed: `{err}`")
 
 
 
 
 
 
 
 
 
 
551
  return
552
 
553
- url = up.get("url") or up.get("video_url") or up.get("watch_url") or ""
 
554
  await record_upload(uid, prof_id)
555
 
556
- dur = max(1.0, time.time() - start_t)
 
 
 
 
 
557
  done = "✅ Uploaded!"
558
  if url:
559
  done += f"\n\n{url}"
560
- done += f"\n\nTime: {dur:.1f}s"
 
 
 
 
561
  await safe_edit(st, done)
562
 
563
- _PENDING_UPLOAD.pop(uid, None)
564
- _AWAIT_EDIT.pop(uid, None)
565
-
 
 
 
566
  except Exception as e:
567
  await safe_edit(st, f"❌ Error: `{str(e)[:180]}`")
568
  finally:
 
 
569
  if file_path:
570
  try:
571
  cleanup_file(file_path)
572
  except Exception:
573
  pass
574
  _IN_PROGRESS.pop(uid, None)
 
575
 
576
 
577
- # =========================
578
- # Handlers
579
- # =========================
580
-
581
- def setup_handlers(app: Client, user_app: Optional[Client] = None) -> None:
582
- @app.on_message(filters.command("start") & filters.private)
583
  async def start_cmd(_, m: Message) -> None:
 
584
  await safe_reply(m, START_TEXT, reply_markup=main_menu_keyboard())
585
 
586
- @app.on_message(filters.command("help") & filters.private)
587
- async def help_cmd(_, m: Message) -> None:
588
- await safe_reply(m, HELP_TEXT, reply_markup=main_menu_keyboard())
589
-
590
  @app.on_message(filters.command("cancel") & filters.private)
591
  async def cancel_cmd(_, m: Message) -> None:
592
  uid = m.from_user.id
@@ -594,38 +332,38 @@ def setup_handlers(app: Client, user_app: Optional[Client] = None) -> None:
594
  _AWAIT_EDIT.pop(uid, None)
595
  _PENDING_UPLOAD.pop(uid, None)
596
  _PENDING_DELETE.pop(uid, None)
 
 
 
 
 
 
 
 
 
 
 
 
597
  await safe_reply(m, CANCELLED, reply_markup=main_menu_keyboard())
598
 
599
- # diag (owner/admin) - fixes “command not working” perception
600
- @app.on_message(filters.command("diag") & filters.private)
601
- async def diag_cmd(_, m: Message) -> None:
602
- uid = m.from_user.id
603
- if not _is_admin_or_owner(uid):
604
- await safe_reply(m, OWNER_ONLY)
605
- return
606
- hf = await _hf_list_profiles(uid, only_connected=False)
607
- txt = {
608
- "ok": True,
609
- "worker1_url": Workers.WORKER1_URL,
610
- "worker2_url": Workers.WORKER2_URL,
611
- "hf_list_profiles_ok": bool(isinstance(hf, dict) and hf.get("ok")),
612
- "hf_err": hf.get("err") if isinstance(hf, dict) else "bad_resp",
613
- }
614
- await safe_reply(m, json.dumps(txt, indent=2)[:3900], reply_markup=main_menu_keyboard())
615
-
616
- # owner/admin
617
  @app.on_message(filters.command("allow") & filters.private)
618
  async def allow_cmd(_, m: Message) -> None:
619
  uid = m.from_user.id
620
  if not _is_admin_or_owner(uid):
621
  await safe_reply(m, OWNER_ONLY)
622
  return
623
- parts = (m.text or "").split()
624
- if len(parts) < 2 or not parts[1].isdigit():
625
  await safe_reply(m, "Usage: /allow <tg_id>")
626
  return
627
- r = await allow_user(int(parts[1]))
628
- await safe_reply(m, json.dumps(r, indent=2)[:3900])
 
 
 
 
 
629
 
630
  @app.on_message(filters.command("disallow") & filters.private)
631
  async def disallow_cmd(_, m: Message) -> None:
@@ -633,12 +371,17 @@ def setup_handlers(app: Client, user_app: Optional[Client] = None) -> None:
633
  if not _is_admin_or_owner(uid):
634
  await safe_reply(m, OWNER_ONLY)
635
  return
636
- parts = (m.text or "").split()
637
- if len(parts) < 2 or not parts[1].isdigit():
638
  await safe_reply(m, "Usage: /disallow <tg_id>")
639
  return
640
- r = await disallow_user(int(parts[1]))
641
- await safe_reply(m, json.dumps(r, indent=2)[:3900])
 
 
 
 
 
642
 
643
  @app.on_message(filters.command("stats") & filters.private)
644
  async def stats_cmd(_, m: Message) -> None:
@@ -646,347 +389,346 @@ def setup_handlers(app: Client, user_app: Optional[Client] = None) -> None:
646
  if not _is_admin_or_owner(uid):
647
  await safe_reply(m, OWNER_ONLY)
648
  return
649
- r = await stats_today()
650
- await safe_reply(m, json.dumps(r, indent=2)[:3900])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
651
 
652
- # profiles/auth
 
 
 
 
 
 
 
 
 
 
 
 
653
  @app.on_message(filters.command("auth") & filters.private)
654
  async def auth_cmd(_, m: Message) -> None:
655
  if not await _ensure_allowed(m):
656
  return
657
- await safe_reply(m, "Choose auth mode:", reply_markup=auth_menu_keyboard())
 
 
658
 
659
  @app.on_message(filters.command("profiles") & filters.private)
660
  async def profiles_cmd(_, m: Message) -> None:
661
  if not await _ensure_allowed(m):
662
  return
663
  uid = m.from_user.id
664
- await _show_profiles(m, uid, edit=False)
 
 
 
 
 
 
665
 
666
- # incoming media
667
- @app.on_message((filters.video | filters.document) & filters.private)
668
- async def incoming_media(_, m: Message) -> None:
669
  if not await _ensure_allowed(m):
670
  return
671
  uid = m.from_user.id
672
  if _IN_PROGRESS.get(uid):
673
- await safe_reply(m, "⏳ Upload already running. Use /cancel to reset.")
674
- return
675
- media, _, _ = _media_and_filename(m)
676
- if not media:
677
- await safe_reply(m, "Send a video or a document file.")
678
  return
679
- await _start_pending_upload(chat_msg=m, src_msg=m, downloader=app, via_link=False)
680
 
681
- # link/archive mode (owner/admin only)
682
  @app.on_message(filters.command(["yt", "dl", "archive"]) & filters.private)
683
  async def archive_cmd(_, m: Message) -> None:
 
684
  if not await _ensure_allowed(m):
685
  return
686
- uid = m.from_user.id
687
  if not _is_admin_or_owner(uid):
688
  await safe_reply(m, OWNER_ONLY)
689
  return
 
 
 
690
  args = (m.text or "").split(maxsplit=1)
691
  if len(args) < 2:
692
- await safe_reply(m, "Send: /archive <t.me message link>")
693
- return
694
- if user_app is None:
695
- await safe_reply(m, "❌ Link mode is not configured (user session missing).")
696
  return
697
-
698
  try:
699
  chat_ref, msg_id = parse_telegram_link(args[1].strip())
700
  except Exception as e:
701
- await safe_reply(m, f"Bad link: {e}")
 
 
 
702
  return
703
-
704
- await safe_reply(m, "🔎 Fetching message…")
705
  try:
706
  src = await user_app.get_messages(chat_ref, msg_id)
707
  except Exception as e:
708
- await safe_reply(m, f"❌ Cannot fetch: `{str(e)[:180]}`")
709
  return
 
 
710
 
711
- media, _, _ = _media_and_filename(src)
712
- if not media:
713
- await safe_reply(m, "❌ That message has no video/document.")
 
714
  return
715
-
716
- await _start_pending_upload(chat_msg=m, src_msg=src, downloader=user_app, via_link=True)
717
-
718
- # callbacks
719
- @app.on_callback_query(filters.private)
720
- async def cb_handler(_, q: CallbackQuery) -> None:
721
- uid = q.from_user.id
722
- action, value = parse_cb(q.data or "")
723
-
724
- try:
725
- await q.answer()
726
- except Exception:
727
- pass
728
-
729
- if action == BACK:
730
- await safe_edit(q.message, START_TEXT, reply_markup=main_menu_keyboard())
731
  return
732
 
733
- if action == MENU_HELP:
734
- await safe_edit(q.message, HELP_TEXT, reply_markup=main_menu_keyboard())
 
735
  return
736
 
737
- if action == MENU_AUTH:
738
- if not await require_allowed(uid) and not _is_admin_or_owner(uid):
739
- await safe_edit(q.message, NOT_ALLOWED, reply_markup=main_menu_keyboard())
740
- return
741
- await safe_edit(q.message, "Choose auth mode:", reply_markup=auth_menu_keyboard())
742
  return
743
 
744
- if action == MENU_PROFILES:
745
- if not await require_allowed(uid) and not _is_admin_or_owner(uid):
746
- await safe_edit(q.message, NOT_ALLOWED, reply_markup=main_menu_keyboard())
747
- return
748
- await _show_profiles(q.message, uid, edit=True)
749
  return
750
 
751
- if action == MENU_SPEEDTEST:
752
- now = time.time()
753
- until = _SPEED_COOLDOWN_UNTIL.get(uid, 0.0)
754
- if now < until:
755
- await safe_edit(q.message, f"⏳ Try again in {int(until - now)}s.", reply_markup=main_menu_keyboard())
756
- return
757
- _SPEED_COOLDOWN_UNTIL[uid] = now + 60
 
 
 
758
 
759
- from bot.core.speedtest import ping_ms, net_download_test, net_upload_test, disk_total_free
 
 
760
 
761
- await safe_edit(q.message, "🧪 Running speed test…")
762
- try:
763
- p = await ping_ms()
764
- dl = await net_download_test()
765
- ul = await net_upload_test()
766
- disk = disk_total_free()
767
-
768
- dl_mbps = (float(dl.get("bps", 0) or 0) * 8.0) / 1e6 if isinstance(dl, dict) else 0.0
769
- ul_mbps = (float(ul.get("bps", 0) or 0) * 8.0) / 1e6 if isinstance(ul, dict) else 0.0
770
- total = int(disk.get("total", 0) or 0) if isinstance(disk, dict) else 0
771
- free = int(disk.get("free", 0) or 0) if isinstance(disk, dict) else 0
772
-
773
- txt = (
774
- "📶 *Speed test*\n\n"
775
- f"Ping: `{int(p)} ms`\n"
776
- f"Download: `{dl_mbps:.2f} Mbps`\n"
777
- f"Upload: `{ul_mbps:.2f} Mbps`\n"
778
- f"Disk: `{human_bytes(free)} free / {human_bytes(total)} total`"
 
 
 
 
779
  )
780
- except Exception as e:
781
- txt = f"❌ Speed test error: `{str(e)[:180]}`"
782
- await safe_edit(q.message, txt, reply_markup=main_menu_keyboard())
783
- return
784
 
785
- # ===== Profiles actions (fixed) =====
786
- if action == "pref":
787
- await _show_profiles(q.message, uid, edit=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
788
  return
789
 
790
- if action == "pdef":
791
- pid = (value or "").strip()
792
- if not pid:
793
  return
794
- r = await _admin_profile_set_default(uid, pid)
795
- if isinstance(r, dict) and r.get("ok"):
796
- await _show_profiles(q.message, uid, edit=True)
797
- else:
798
- err = r.get("err", "unknown") if isinstance(r, dict) else "unknown"
799
- await safe_edit(q.message, f"❌ Set default failed: `{err}`", reply_markup=main_menu_keyboard())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
800
  return
801
 
802
- if action == "pdel":
803
- pid = (value or "").strip()
804
- if not pid:
805
  return
806
- _PENDING_DELETE[uid] = pid
807
- kb = InlineKeyboardMarkup(
808
- [
809
- [
810
- InlineKeyboardButton("✅ Yes, delete", callback_data=make("pdelok", pid)),
811
- InlineKeyboardButton("❌ Cancel", callback_data=make("pref")),
812
- ]
813
- ]
814
- )
815
- await safe_edit(q.message, f"⚠️ Delete profile `{pid}` ?", reply_markup=kb)
816
  return
817
 
818
- if action == "pdelok":
819
- pid = (value or "").strip()
820
- if not pid:
821
  return
822
- if _PENDING_DELETE.get(uid) != pid:
823
- await _show_profiles(q.message, uid, edit=True)
 
824
  return
825
- _PENDING_DELETE.pop(uid, None)
826
- r = await _admin_profile_remove(uid, pid)
827
- if isinstance(r, dict) and r.get("ok"):
828
- await _show_profiles(q.message, uid, edit=True)
829
- else:
830
- err = r.get("err", "unknown") if isinstance(r, dict) else "unknown"
831
- await safe_edit(q.message, f"❌ Delete failed: `{err}`", reply_markup=main_menu_keyboard())
832
  return
833
 
834
- if action in ("plog", "preauth"):
835
- pid = (value or "").strip()
836
- if not pid:
 
837
  return
838
- force = (action == "preauth")
839
- r = await _admin_profile_login_link(uid, pid, force=force)
840
- if not (isinstance(r, dict) and r.get("ok") and r.get("login_url")):
841
- err = r.get("err", "unknown") if isinstance(r, dict) else "unknown"
842
- await safe_edit(q.message, f"❌ Login link failed: `{err}`", reply_markup=main_menu_keyboard())
843
- return
844
- url = str(r.get("login_url"))
845
- kb = InlineKeyboardMarkup(
846
- [
847
- [InlineKeyboardButton("🌐 Open Google Login", url=url)],
848
- [InlineKeyboardButton("🔄 Back to profiles", callback_data=make(MENU_PROFILES))],
849
- ]
850
- )
851
- msg = "🔑 Open this link to authorize your YouTube channel."
852
- if force:
853
- msg = "🔁 Re-auth link generated (fresh consent). Open this link:"
854
- await safe_edit(q.message, msg, reply_markup=kb)
855
  return
856
 
857
- # ===== Auth mode selection =====
858
- if action == AUTH_JSON:
859
- _AWAIT_AUTH_MODE[uid] = "json"
 
 
860
  await safe_edit(
861
  q.message,
862
- "Paste credentials JSON:\n`{\"client_id\":\"...\",\"client_secret\":\"...\",\"label\":\"optional\"}`",
863
- reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Back", callback_data=make(BACK))]]),
 
 
864
  )
865
  return
866
 
867
- if action == AUTH_CI:
868
- _AWAIT_AUTH_MODE[uid] = "ci"
869
- await safe_edit(
870
- q.message,
871
- "Send in 2–3 lines:\n`client_id`\n`client_secret`\n(optional 3rd line = label)",
872
- reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Back", callback_data=make(BACK))]]),
873
- )
874
  return
875
 
876
- # ===== Upload controls =====
877
- if action in (UP_GO, UP_EDIT, UP_PRIV, UP_CANCEL):
878
- pending = _PENDING_UPLOAD.get(uid)
879
- if not pending:
880
- await safe_edit(q.message, "No pending upload. Send a video/document first.", reply_markup=main_menu_keyboard())
881
  return
882
-
883
- pending.status_msg = q.message
884
- _PENDING_UPLOAD[uid] = pending
885
-
886
- if action == UP_CANCEL:
887
- _PENDING_UPLOAD.pop(uid, None)
888
- _AWAIT_EDIT.pop(uid, None)
889
- await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard())
890
  return
 
 
 
 
 
 
891
 
892
- if action == UP_PRIV:
893
- pending.privacy = _next_privacy(pending.privacy)
894
- _PENDING_UPLOAD[uid] = pending
895
- await safe_edit(q.message, _render_preview(pending), reply_markup=upload_confirm_keyboard(pending.privacy))
896
  return
 
 
 
 
897
 
898
- if action == UP_EDIT:
899
- _AWAIT_EDIT[uid] = True
900
- await safe_reply(
901
- q.message,
902
- "✏️ Send new title + optional description.\n\nFormats:\n1) `Title | Description`\n2) `Title` then blank line then description",
903
- )
904
  return
 
 
 
 
 
 
 
 
 
 
905
 
906
- if action == UP_GO:
907
- await safe_edit(q.message, "⏳ Starting upload…")
908
- asyncio.create_task(_run_upload(uid))
909
  return
910
-
911
- # text router (auth + edit)
912
- @app.on_message(filters.text & filters.private)
913
- async def text_router(_, m: Message) -> None:
914
- uid = m.from_user.id if m.from_user else 0
915
- if not uid:
 
 
 
 
916
  return
917
 
918
- mode = _AWAIT_AUTH_MODE.get(uid)
919
- if mode:
920
- if not await _ensure_allowed(m):
921
- _AWAIT_AUTH_MODE.pop(uid, None)
922
- return
923
 
924
- text = (m.text or "").strip()
925
- try:
926
- if mode == "json":
927
- obj = json.loads(text)
928
- client_id = str(obj.get("client_id", "")).strip()
929
- client_secret = str(obj.get("client_secret", "")).strip()
930
- label = str(obj.get("label", "")).strip()[:40] if obj.get("label") else ""
931
  else:
932
- lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
933
- client_id = lines[0] if len(lines) >= 1 else ""
934
- client_secret = lines[1] if len(lines) >= 2 else ""
935
- label = lines[2][:40] if len(lines) >= 3 else ""
936
-
937
- if not client_id or not client_secret:
938
- raise ValueError("missing client_id/client_secret")
939
-
940
- _AWAIT_AUTH_MODE.pop(uid, None)
941
-
942
- resp = await profile_add(uid, client_id, client_secret, label)
943
- if not (isinstance(resp, dict) and resp.get("ok")):
944
- err = resp.get("err", "unknown") if isinstance(resp, dict) else "unknown"
945
- await safe_reply(m, f"❌ profile_add failed: `{err}`")
946
- return
947
-
948
- login_url = resp.get("login_url")
949
- if not login_url:
950
- # fallback: tell user to open Profiles and tap Login
951
- await safe_reply(m, "✅ Profile added. Now open /profiles and tap **Login** to authorize.")
952
- return
953
-
954
- await safe_reply(m, f"✅ Profile added.\n\nOpen this link to authorize:\n{login_url}")
955
- await safe_reply(m, "After authorizing, open /profiles and set default.", reply_markup=main_menu_keyboard())
956
-
957
- except Exception as e:
958
- await safe_reply(m, f"❌ Parse error: `{str(e)[:180]}`\n\nTry /auth again.")
959
  return
960
 
961
- if _AWAIT_EDIT.get(uid):
962
- pending = _PENDING_UPLOAD.get(uid)
963
- if not pending:
964
- _AWAIT_EDIT.pop(uid, None)
 
965
  return
966
-
967
- raw = (m.text or "").strip()
968
- if not raw or raw.startswith("/"):
969
- return
970
-
971
- title = raw
972
- desc = pending.description or ""
973
-
974
- if "\n\n" in raw:
975
- a, b = raw.split("\n\n", 1)
976
- title = a.strip()
977
- desc = b.strip()
978
- elif "|" in raw:
979
- a, b = raw.split("|", 1)
980
- title = a.strip()
981
- desc = b.strip()
982
-
983
- title = (title or pending.title or "video").strip()[:95]
984
- desc = (desc or "").strip()[:4900]
985
-
986
- pending.title = title
987
- pending.description = desc
988
- _PENDING_UPLOAD[uid] = pending
989
- _AWAIT_EDIT.pop(uid, None)
990
-
991
- await safe_edit(pending.status_msg, _render_preview(pending), reply_markup=upload_confirm_keyboard(pending.privacy))
992
  return
 
1
+ # FILE: bot/handlers.py
2
+ # NOTE: Updated:
3
+ # - Fix float(dict) crash by using SpeedETA that returns float and safe int conversions
4
+ # - Show download speed/ETA + upload speed/ETA + total time summary
5
+ # - Fix upload_video return-type mismatch (now dict)
6
+ # - /cancel actually cancels running upload and batch tasks
7
+ # - Add /batch (admin/owner only)
8
+
9
  from __future__ import annotations
10
 
11
+ import asyncio
12
  import os
 
13
  import time
 
14
  from dataclasses import dataclass
15
+ from typing import Any, Dict, Optional, Tuple
16
 
 
17
  from hydrogram import Client, filters
18
+ from hydrogram.types import CallbackQuery, InlineKeyboardMarkup, Message
19
+
20
+ from bot.config import Auth, Workers
21
+ from bot.integrations.auth import (allow_user, disallow_user, get_stats,
22
+ is_allowed)
23
+ from bot.integrations.cf_worker1 import (profile_add, profile_check_auth,
24
+ profile_delete)
25
+ from bot.integrations.cf_worker2 import (get_default_profile, list_profiles,
26
+ record_upload, set_default_profile)
27
+ from bot.telegram.files import cleanup_file
28
+ from bot.telegram.media import download_to_temp
29
+ from bot.telegram.replies import safe_edit, safe_reply
30
+ from bot.ui.callbacks import (MENU_AUTH, MENU_HELP, MENU_PROFILES, MENU_SPEED,
31
+ UP_DEL, UP_EDIT, UP_GO, UP_PRIV)
32
+ from bot.ui.keyboards import (auth_menu_keyboard, main_menu_keyboard,
33
+ profiles_keyboard, upload_confirm_keyboard)
34
+ from bot.ui.parse import parse_cb
35
+ from bot.ui.texts import CANCELLED, HELP_TEXT, NEED_AUTH, NOT_ALLOWED, OWNER_ONLY
36
  from bot.core.progress import SpeedETA, human_bytes, human_eta
37
+ from bot.core.settings import Settings
38
+ from bot.core.uptime import uptime_text
39
+ from bot.youtube.link_parser import parse_telegram_link
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  @dataclass
43
  class PendingUpload:
 
46
  file_name: str
47
  title: str
48
  description: str
49
+ privacy: str = "private"
50
+ status_msg: Optional[Message] = None
51
  via_link: bool = False
52
 
53
 
54
+ @dataclass
55
+ class EditState:
56
+ title: str
57
+ description: str
58
+ privacy: str
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
 
60
 
61
+ # In-memory state
62
+ _PENDING_UPLOAD: Dict[int, PendingUpload] = {} # uid -> pending upload
63
+ _AWAIT_EDIT: Dict[int, EditState] = {} # uid -> editing title/desc
64
+ _AWAIT_AUTH_MODE: Dict[int, str] = {} # uid -> "oauth" | "token"
65
+ _PENDING_DELETE: Dict[int, str] = {} # uid -> profile_id to delete
66
+ _IN_PROGRESS: Dict[int, bool] = {} # uid -> upload is running
67
+ _UPLOAD_TASK: Dict[int, asyncio.Task] = {} # uid -> running upload task (so /cancel can stop it)
68
+ _BATCH_TASK: Dict[int, asyncio.Task] = {} # uid -> running batch task
69
 
70
 
71
  def _is_admin_or_owner(uid: int) -> bool:
72
+ return uid in set(Auth.OWNERS) or uid in set(Auth.ADMINS)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
 
74
 
75
  async def _ensure_allowed(m: Message) -> bool:
76
+ uid = m.from_user.id
 
 
 
 
 
77
  if _is_admin_or_owner(uid):
78
  return True
79
+ ok = await is_allowed(uid)
80
+ if not ok:
81
  await safe_reply(m, NOT_ALLOWED)
82
+ return ok
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
 
85
+ def _media_and_filename(m: Message) -> Tuple[Optional[Any], str, int]:
86
+ if m.video:
87
+ return m.video, (m.video.file_name or "video.mp4"), (m.video.file_size or 0)
88
+ if m.document:
89
+ return m.document, (m.document.file_name or "file.bin"), (m.document.file_size or 0)
90
+ return None, "", 0
91
+
92
+
93
+ def extract_title_description(src: Message, file_name: str) -> Tuple[str, str]:
94
+ # Prefer caption text
95
+ caption = (src.caption or "").strip()
96
+ if caption:
97
+ # title = first line, desc = remaining lines
98
+ parts = caption.splitlines()
99
+ title = parts[0].strip()
100
+ desc = "\n".join([p.strip() for p in parts[1:]]).strip()
101
+ return title[:Settings.MAX_TITLE], desc[:Settings.MAX_DESC]
102
+ # fallback to filename
103
+ base = os.path.splitext(file_name)[0].strip()
104
+ title = (base or "Untitled")[:Settings.MAX_TITLE]
105
+ return title, ""
106
+
107
+
108
+ def _render_preview(p: PendingUpload, size: int) -> str:
109
+ return (
110
+ "📦 *Ready to upload*\\n\\n"
111
+ f"*File:* `{p.file_name}`\\n"
112
+ f"*Size:* {human_bytes(size)}\\n"
113
+ f"*Privacy:* `{p.privacy}`\\n\\n"
114
+ f"*Title:* {p.title}\\n"
115
+ f"*Description:* {(p.description[:600] + '…') if len(p.description) > 600 else (p.description or '—')}"
116
  )
117
 
118
 
119
+ async def _start_pending_upload(uid: int, src: Message, downloader: Client, via_link: bool = False) -> None:
120
+ media, file_name, size = _media_and_filename(src)
121
+ if not media:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  return
123
 
124
+ title, desc = extract_title_description(src, file_name)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
 
126
+ p = PendingUpload(
127
+ src_msg=src,
128
  downloader=downloader,
129
  file_name=file_name,
130
  title=title,
131
  description=desc,
132
+ privacy="private",
 
133
  via_link=via_link,
134
+ status_msg=None,
135
  )
136
+ _PENDING_UPLOAD[uid] = p
137
+
138
+ txt = _render_preview(p, size)
139
+ await safe_reply(src, txt, reply_markup=upload_confirm_keyboard(p.privacy))
140
 
141
 
142
  async def _run_upload(uid: int) -> None:
 
145
  pending = _PENDING_UPLOAD.get(uid)
146
  if not pending:
147
  return
148
+ if not pending.status_msg:
149
+ return
150
 
151
  _IN_PROGRESS[uid] = True
152
  st = pending.status_msg
153
+ file_path: Optional[str] = None
154
 
155
  try:
156
+ # Ensure there is a default profile
157
+ prof = await get_default_profile(uid)
158
+ if not (isinstance(prof, dict) and prof.get("ok") and prof.get("profile_id") and prof.get("access_token")):
159
+ await safe_edit(st, NEED_AUTH, reply_markup=main_menu_keyboard())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
  return
161
 
162
+ prof_id = str(prof["profile_id"])
163
+ access_token = str(prof["access_token"])
164
 
165
+ # Track total time (download + upload)
166
+ overall_start = time.time()
 
 
 
 
 
167
 
168
+ # Safety: Telegram/hydrogram callbacks sometimes pass dict/None.
169
+ # Convert to int to avoid int(dict)/float(dict) crashes.
170
  def _as_int(v: Any) -> int:
171
  if v is None:
172
  return 0
 
180
  except Exception:
181
  return 0
182
  if isinstance(v, dict):
183
+ for k in ("current", "sent", "bytes", "done", "downloaded", "uploaded", "processed"):
184
  if k in v:
185
+ try:
186
+ return int(float(v.get(k) or 0))
187
+ except Exception:
188
+ continue
189
+ # common pattern: {"current":..., "total":...}
190
+ try:
191
+ return int(float(next(iter(v.values()))))
192
+ except Exception:
193
+ return 0
194
  return 0
195
 
196
+ # ---- Download (TG -> temp) with live speed ----
197
+ await safe_edit(st, "⬇️ Downloading…")
198
+ dl_speed = SpeedETA()
199
+ dl_last_ui = 0.0
200
+ dl_start = time.time()
201
+
202
+ async def dl_progress(cur: Any, total: Any) -> None:
203
+ nonlocal dl_last_ui
204
+ now = time.time()
205
+ if now - dl_last_ui < 0.8:
206
+ return
207
+ dl_last_ui = now
208
+
209
+ cur_i = _as_int(cur)
210
+ total_i = _as_int(total)
211
+ if total_i <= 0:
212
+ total_i = max(cur_i, 1)
213
+
214
+ rate = dl_speed.update(cur_i, total_i)
215
+ txt = (
216
+ "⬇️ Downloading…\\n\\n"
217
+ f"{human_bytes(cur_i)} / {human_bytes(total_i)}\\n"
218
+ f"Speed: {human_bytes(rate)}/s\\n"
219
+ f"ETA: {human_eta(dl_speed.eta_seconds)}"
220
+ )
221
+ await safe_edit(st, txt)
222
+
223
+ file_path, _, _ = await download_to_temp(pending.downloader, pending.src_msg, progress_cb=dl_progress)
224
+ dl_dur = max(0.001, time.time() - dl_start)
225
+ try:
226
+ file_bytes = int(os.path.getsize(file_path))
227
+ except Exception:
228
+ file_bytes = 0
229
+
230
+ # ---- Upload (temp -> YouTube) with live speed ----
231
+ from bot.youtube.uploader import upload_video # local import
232
+ ul_speed = SpeedETA()
233
+ ul_last_ui = 0.0
234
+ ul_start = time.time()
235
+
236
  async def progress_cb(sent: Any, total: Any) -> None:
237
+ nonlocal ul_last_ui
238
  now = time.time()
239
+ if now - ul_last_ui < 0.8:
240
  return
241
+ ul_last_ui = now
242
 
243
  sent_i = _as_int(sent)
244
  total_i = _as_int(total)
245
  if total_i <= 0:
246
  total_i = max(sent_i, 1)
247
 
248
+ rate = ul_speed.update(sent_i, total_i)
249
  txt = (
250
+ "⬆️ Uploading…\\n\\n"
251
+ f"{human_bytes(sent_i)} / {human_bytes(total_i)}\\n"
252
+ f"Speed: {human_bytes(rate)}/s\\n"
253
+ f"ETA: {human_eta(ul_speed.eta_seconds)}"
254
  )
255
  await safe_edit(st, txt)
256
 
257
  up = await upload_video(
258
+ access_token=access_token,
259
  file_path=file_path,
260
  title=pending.title,
261
  description=pending.description,
 
265
 
266
  if not (isinstance(up, dict) and up.get("ok")):
267
  err = up.get("err", "upload_failed") if isinstance(up, dict) else "upload_failed"
268
+ detail = up.get("detail") if isinstance(up, dict) else None
269
+
270
+ msg = f"❌ Upload failed: `{err}`"
271
+ if detail:
272
+ d = str(detail)
273
+ msg += f"\n`{d[:280]}`"
274
+
275
+ if "uploadLimitExceeded" in d or "quotaExceeded" in d:
276
+ msg += "\n\nℹ️ This looks like a YouTube daily upload/quota limit. Try another profile or wait and retry later."
277
+
278
+ await safe_edit(st, msg)
279
  return
280
 
281
+ url = up.get("url") or ""
282
+
283
  await record_upload(uid, prof_id)
284
 
285
+ ul_dur = max(0.001, time.time() - ul_start)
286
+ total_dur = max(0.001, time.time() - overall_start)
287
+
288
+ dl_avg = (human_bytes(file_bytes / dl_dur) + "/s") if file_bytes and dl_dur else "—"
289
+ ul_avg = (human_bytes(file_bytes / ul_dur) + "/s") if file_bytes and ul_dur else "—"
290
+
291
  done = "✅ Uploaded!"
292
  if url:
293
  done += f"\n\n{url}"
294
+ done += (
295
+ f"\n\nDownload: {dl_dur:.1f}s • avg {dl_avg}"
296
+ f"\nUpload: {ul_dur:.1f}s • avg {ul_avg}"
297
+ f"\nTotal: {total_dur:.1f}s"
298
+ )
299
  await safe_edit(st, done)
300
 
301
+ except asyncio.CancelledError:
302
+ # /cancel pressed
303
+ try:
304
+ await safe_edit(st, CANCELLED, reply_markup=main_menu_keyboard())
305
+ except Exception:
306
+ pass
307
  except Exception as e:
308
  await safe_edit(st, f"❌ Error: `{str(e)[:180]}`")
309
  finally:
310
+ _PENDING_UPLOAD.pop(uid, None)
311
+ _AWAIT_EDIT.pop(uid, None)
312
  if file_path:
313
  try:
314
  cleanup_file(file_path)
315
  except Exception:
316
  pass
317
  _IN_PROGRESS.pop(uid, None)
318
+ _UPLOAD_TASK.pop(uid, None)
319
 
320
 
321
+ def setup_handlers(app: Client, user_app: Optional[Client]) -> None:
322
+ # ---- basic ----
323
+ @app.on_message(filters.command(["start", "help"]) & filters.private)
 
 
 
324
  async def start_cmd(_, m: Message) -> None:
325
+ from bot.ui.texts import START_TEXT
326
  await safe_reply(m, START_TEXT, reply_markup=main_menu_keyboard())
327
 
 
 
 
 
328
  @app.on_message(filters.command("cancel") & filters.private)
329
  async def cancel_cmd(_, m: Message) -> None:
330
  uid = m.from_user.id
 
332
  _AWAIT_EDIT.pop(uid, None)
333
  _PENDING_UPLOAD.pop(uid, None)
334
  _PENDING_DELETE.pop(uid, None)
335
+
336
+ # stop running upload task (if any)
337
+ task = _UPLOAD_TASK.pop(uid, None)
338
+ if task and not task.done():
339
+ task.cancel()
340
+
341
+ btask = _BATCH_TASK.pop(uid, None)
342
+ if btask and not btask.done():
343
+ btask.cancel()
344
+
345
+ _IN_PROGRESS.pop(uid, None)
346
+
347
  await safe_reply(m, CANCELLED, reply_markup=main_menu_keyboard())
348
 
349
+ # ---- admin allowlist ----
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
350
  @app.on_message(filters.command("allow") & filters.private)
351
  async def allow_cmd(_, m: Message) -> None:
352
  uid = m.from_user.id
353
  if not _is_admin_or_owner(uid):
354
  await safe_reply(m, OWNER_ONLY)
355
  return
356
+ args = (m.text or "").split(maxsplit=1)
357
+ if len(args) < 2:
358
  await safe_reply(m, "Usage: /allow <tg_id>")
359
  return
360
+ try:
361
+ tid = int(args[1].strip())
362
+ except Exception:
363
+ await safe_reply(m, "Invalid tg_id")
364
+ return
365
+ await allow_user(tid)
366
+ await safe_reply(m, f"✅ Allowed `{tid}`")
367
 
368
  @app.on_message(filters.command("disallow") & filters.private)
369
  async def disallow_cmd(_, m: Message) -> None:
 
371
  if not _is_admin_or_owner(uid):
372
  await safe_reply(m, OWNER_ONLY)
373
  return
374
+ args = (m.text or "").split(maxsplit=1)
375
+ if len(args) < 2:
376
  await safe_reply(m, "Usage: /disallow <tg_id>")
377
  return
378
+ try:
379
+ tid = int(args[1].strip())
380
+ except Exception:
381
+ await safe_reply(m, "Invalid tg_id")
382
+ return
383
+ await disallow_user(tid)
384
+ await safe_reply(m, f"✅ Disallowed `{tid}`")
385
 
386
  @app.on_message(filters.command("stats") & filters.private)
387
  async def stats_cmd(_, m: Message) -> None:
 
389
  if not _is_admin_or_owner(uid):
390
  await safe_reply(m, OWNER_ONLY)
391
  return
392
+ st = await get_stats()
393
+ if not isinstance(st, dict):
394
+ await safe_reply(m, "❌ stats failed")
395
+ return
396
+ await safe_reply(
397
+ m,
398
+ "📊 *Today stats*\n\n"
399
+ f"Allowed users: `{st.get('allowed_users', 0)}`\n"
400
+ f"Profiles: `{st.get('profiles', 0)}`\n"
401
+ f"Uploads today: `{st.get('uploads_today', 0)}`",
402
+ )
403
+
404
+ @app.on_message(filters.command("diag") & filters.private)
405
+ async def diag_cmd(_, m: Message) -> None:
406
+ uid = m.from_user.id
407
+ if not _is_admin_or_owner(uid):
408
+ await safe_reply(m, OWNER_ONLY)
409
+ return
410
 
411
+ w1 = Workers.WORKER1_URL or "—"
412
+ w2 = Workers.WORKER2_URL or "—"
413
+
414
+ await safe_reply(
415
+ m,
416
+ "🧪 *Diag*\n\n"
417
+ f"Uptime: `{uptime_text()}`\n"
418
+ f"WORKER1_URL: `{w1}`\n"
419
+ f"WORKER2_URL: `{w2}`\n"
420
+ f"Owners: `{len(Auth.OWNERS)}` Admins: `{len(Auth.ADMINS)}`",
421
+ )
422
+
423
+ # ---- auth/profile flow ----
424
  @app.on_message(filters.command("auth") & filters.private)
425
  async def auth_cmd(_, m: Message) -> None:
426
  if not await _ensure_allowed(m):
427
  return
428
+ uid = m.from_user.id
429
+ _AWAIT_AUTH_MODE[uid] = "oauth"
430
+ await safe_reply(m, "🔐 Add a profile:", reply_markup=auth_menu_keyboard())
431
 
432
  @app.on_message(filters.command("profiles") & filters.private)
433
  async def profiles_cmd(_, m: Message) -> None:
434
  if not await _ensure_allowed(m):
435
  return
436
  uid = m.from_user.id
437
+ data = await list_profiles(uid, only_connected=False)
438
+ if not (isinstance(data, dict) and data.get("ok")):
439
+ await safe_reply(m, "❌ Failed to list profiles.")
440
+ return
441
+ profiles = data.get("profiles") or []
442
+ default_id = data.get("default_profile_id") or ""
443
+ await safe_reply(m, "👤 *Profiles*", reply_markup=profiles_keyboard(profiles, default_id))
444
 
445
+ # ---- upload from DM media ----
446
+ @app.on_message(filters.private & (filters.video | filters.document))
447
+ async def media_in_dm(_, m: Message) -> None:
448
  if not await _ensure_allowed(m):
449
  return
450
  uid = m.from_user.id
451
  if _IN_PROGRESS.get(uid):
452
+ await safe_reply(m, "⏳ Upload already running. Use /cancel to stop.")
 
 
 
 
453
  return
454
+ await _start_pending_upload(uid, m, app, via_link=False)
455
 
456
+ # ---- upload from link (admin/owner) ----
457
  @app.on_message(filters.command(["yt", "dl", "archive"]) & filters.private)
458
  async def archive_cmd(_, m: Message) -> None:
459
+ uid = m.from_user.id
460
  if not await _ensure_allowed(m):
461
  return
 
462
  if not _is_admin_or_owner(uid):
463
  await safe_reply(m, OWNER_ONLY)
464
  return
465
+ if user_app is None:
466
+ await safe_reply(m, "❌ This mode requires user session (user_app missing).")
467
+ return
468
  args = (m.text or "").split(maxsplit=1)
469
  if len(args) < 2:
470
+ await safe_reply(m, "Usage: /archive <t.me message link>")
 
 
 
471
  return
 
472
  try:
473
  chat_ref, msg_id = parse_telegram_link(args[1].strip())
474
  except Exception as e:
475
+ await safe_reply(m, f"Bad link: {e}")
476
+ return
477
+ st = await safe_reply(m, "🔎 Fetching message…")
478
+ if not st:
479
  return
 
 
480
  try:
481
  src = await user_app.get_messages(chat_ref, msg_id)
482
  except Exception as e:
483
+ await safe_edit(st, f"❌ Fetch failed: `{str(e)[:180]}`")
484
  return
485
+ await safe_edit(st, "✅ Message fetched. Preparing preview…")
486
+ await _start_pending_upload(uid, src, user_app, via_link=True)
487
 
488
+ # batch mode (owner/admin only): /batch <t.me links...> (one per line)
489
+ @app.on_message(filters.command("batch") & filters.private)
490
+ async def batch_cmd(_, m: Message) -> None:
491
+ if not await _ensure_allowed(m):
492
  return
493
+ uid = m.from_user.id
494
+ if not _is_admin_or_owner(uid):
495
+ await safe_reply(m, OWNER_ONLY)
496
+ return
497
+ if user_app is None:
498
+ await safe_reply(m, "❌ Batch mode is not configured (user session missing).")
 
 
 
 
 
 
 
 
 
 
499
  return
500
 
501
+ args = (m.text or "").split(maxsplit=1)
502
+ if len(args) < 2:
503
+ await safe_reply(m, "Send: /batch <t.me message links> (one per line)")
504
  return
505
 
506
+ links = [ln.strip() for ln in args[1].splitlines() if ln.strip()]
507
+ if not links:
508
+ await safe_reply(m, "No links found. Put one t.me link per line after /batch")
 
 
509
  return
510
 
511
+ if _BATCH_TASK.get(uid) and not _BATCH_TASK[uid].done():
512
+ await safe_reply(m, "⏳ A batch is already running. Use /cancel to stop it.")
 
 
 
513
  return
514
 
515
+ await safe_reply(m, f"🧾 Batch starting: {len(links)} item(s).")
516
+
517
+ async def runner() -> None:
518
+ batch_start = time.time()
519
+ for i, link in enumerate(links, 1):
520
+ try:
521
+ chat_ref, msg_id = parse_telegram_link(link)
522
+ except Exception as e:
523
+ await safe_reply(m, f"❌ {i}/{len(links)} bad link: {e}")
524
+ continue
525
 
526
+ st = await safe_reply(m, f"🔎 Batch {i}/{len(links)}: fetching…")
527
+ if not st:
528
+ continue
529
 
530
+ try:
531
+ src = await user_app.get_messages(chat_ref, msg_id)
532
+ except Exception as e:
533
+ await safe_edit(st, f"❌ Batch {i}/{len(links)} fetch failed: `{str(e)[:180]}`")
534
+ continue
535
+
536
+ media, file_name, _ = _media_and_filename(src)
537
+ if not media:
538
+ await safe_edit(st, f"❌ Batch {i}/{len(links)}: no video/document in that message.")
539
+ continue
540
+
541
+ title, desc = extract_title_description(src, file_name)
542
+
543
+ _PENDING_UPLOAD[uid] = PendingUpload(
544
+ src_msg=src,
545
+ downloader=user_app,
546
+ file_name=file_name,
547
+ title=title,
548
+ description=desc,
549
+ privacy="private",
550
+ status_msg=st,
551
+ via_link=True,
552
  )
 
 
 
 
553
 
554
+ await safe_edit(st, f"⏳ Batch {i}/{len(links)}: starting upload…")
555
+ await _run_upload(uid)
556
+
557
+ batch_dur = max(0.001, time.time() - batch_start)
558
+ await safe_reply(m, f"✅ Batch done in {batch_dur:.1f}s.")
559
+
560
+ t = asyncio.create_task(runner())
561
+ _BATCH_TASK[uid] = t
562
+
563
+ # callbacks
564
+ @app.on_callback_query(filters.private)
565
+ async def cb_handler(_, q: CallbackQuery) -> None:
566
+ uid = q.from_user.id
567
+ action, value = parse_cb(q.data or "")
568
+ # Menus
569
+ if action == MENU_HELP:
570
+ await safe_edit(q.message, HELP_TEXT, reply_markup=main_menu_keyboard())
571
  return
572
 
573
+ if action == MENU_SPEED:
574
+ if not await _ensure_allowed(q.message):
 
575
  return
576
+ from bot.core.speedtest import net_download_test, net_upload_test
577
+ await safe_edit(q.message, "⏱ Running speed test…")
578
+ dl = await net_download_test()
579
+ ul = await net_upload_test()
580
+ try:
581
+ dl_bps = float(dl.get("bps", 0) or 0)
582
+ except Exception:
583
+ dl_bps = 0.0
584
+ try:
585
+ ul_bps = float(ul.get("bps", 0) or 0)
586
+ except Exception:
587
+ ul_bps = 0.0
588
+
589
+ await safe_edit(
590
+ q.message,
591
+ "📶 *Speed Test*\n\n"
592
+ f"Download: `{human_bytes(dl_bps)}/s`\n"
593
+ f"Upload: `{human_bytes(ul_bps)}/s`",
594
+ reply_markup=main_menu_keyboard(),
595
+ )
596
  return
597
 
598
+ if action == MENU_AUTH:
599
+ if not await _ensure_allowed(q.message):
 
600
  return
601
+ await safe_edit(q.message, "🔐 Add a profile:", reply_markup=auth_menu_keyboard())
 
 
 
 
 
 
 
 
 
602
  return
603
 
604
+ if action == MENU_PROFILES:
605
+ if not await _ensure_allowed(q.message):
 
606
  return
607
+ data = await list_profiles(uid, only_connected=False)
608
+ if not (isinstance(data, dict) and data.get("ok")):
609
+ await safe_edit(q.message, "❌ Failed to list profiles.", reply_markup=main_menu_keyboard())
610
  return
611
+ profiles = data.get("profiles") or []
612
+ default_id = data.get("default_profile_id") or ""
613
+ await safe_edit(q.message, "👤 *Profiles*", reply_markup=profiles_keyboard(profiles, default_id))
 
 
 
 
614
  return
615
 
616
+ # Upload buttons
617
+ if action == UP_PRIV:
618
+ p = _PENDING_UPLOAD.get(uid)
619
+ if not p:
620
  return
621
+ cycle = {"private": "unlisted", "unlisted": "public", "public": "private"}
622
+ p.privacy = cycle.get(p.privacy, "private")
623
+ _PENDING_UPLOAD[uid] = p
624
+ media, _, size = _media_and_filename(p.src_msg)
625
+ await safe_edit(q.message, _render_preview(p, size), reply_markup=upload_confirm_keyboard(p.privacy))
 
 
 
 
 
 
 
 
 
 
 
 
626
  return
627
 
628
+ if action == UP_EDIT:
629
+ p = _PENDING_UPLOAD.get(uid)
630
+ if not p:
631
+ return
632
+ _AWAIT_EDIT[uid] = EditState(title=p.title, description=p.description, privacy=p.privacy)
633
  await safe_edit(
634
  q.message,
635
+ "✍️ Send new title + description like this:\n\n"
636
+ "Title line\n"
637
+ "Description lines…\n\n"
638
+ "(Send only 1 line to change title only)",
639
  )
640
  return
641
 
642
+ if action == UP_DEL:
643
+ _PENDING_UPLOAD.pop(uid, None)
644
+ await safe_edit(q.message, CANCELLED, reply_markup=main_menu_keyboard())
 
 
 
 
645
  return
646
 
647
+ if action == UP_GO:
648
+ p = _PENDING_UPLOAD.get(uid)
649
+ if not p:
 
 
650
  return
651
+ if _IN_PROGRESS.get(uid):
652
+ await safe_edit(q.message, "⏳ Upload already running. Use /cancel to stop.", reply_markup=main_menu_keyboard())
 
 
 
 
 
 
653
  return
654
+ p.status_msg = q.message
655
+ _PENDING_UPLOAD[uid] = p
656
+ await safe_edit(q.message, "⏳ Starting upload…")
657
+ t = asyncio.create_task(_run_upload(uid))
658
+ _UPLOAD_TASK[uid] = t
659
+ return
660
 
661
+ # Profile callbacks (from profiles_keyboard)
662
+ if action == "pdel":
663
+ if not await _ensure_allowed(q.message):
 
664
  return
665
+ pid = value
666
+ _PENDING_DELETE[uid] = pid
667
+ await safe_edit(q.message, f"❓ Delete profile `{pid}`? Reply YES to confirm.")
668
+ return
669
 
670
+ if action == "pdef":
671
+ if not await _ensure_allowed(q.message):
 
 
 
 
672
  return
673
+ pid = value
674
+ out = await set_default_profile(uid, pid)
675
+ if isinstance(out, dict) and out.get("ok"):
676
+ data = await list_profiles(uid, only_connected=False)
677
+ profiles = (data or {}).get("profiles") or []
678
+ default_id = (data or {}).get("default_profile_id") or ""
679
+ await safe_edit(q.message, "✅ Default set.", reply_markup=profiles_keyboard(profiles, default_id))
680
+ else:
681
+ await safe_edit(q.message, "❌ Failed to set default.", reply_markup=main_menu_keyboard())
682
+ return
683
 
684
+ if action == "plog":
685
+ if not await _ensure_allowed(q.message):
 
686
  return
687
+ pid = value
688
+ chk = await profile_check_auth(uid, pid)
689
+ if isinstance(chk, dict) and chk.get("ok") and chk.get("login_url"):
690
+ await safe_edit(
691
+ q.message,
692
+ f"🔗 Login URL:\n{chk.get('login_url')}\n\nAfter login, reopen /profiles.",
693
+ reply_markup=main_menu_keyboard(),
694
+ )
695
+ else:
696
+ await safe_edit(q.message, "❌ Failed to create login URL.", reply_markup=main_menu_keyboard())
697
  return
698
 
699
+ # ---- edit text handler ----
700
+ @app.on_message(filters.private & filters.text)
701
+ async def text_in_dm(_, m: Message) -> None:
702
+ uid = m.from_user.id
 
703
 
704
+ # Confirm delete
705
+ if uid in _PENDING_DELETE:
706
+ if (m.text or "").strip().lower() == "yes":
707
+ pid = _PENDING_DELETE.pop(uid)
708
+ out = await profile_delete(uid, pid)
709
+ if isinstance(out, dict) and out.get("ok"):
710
+ await safe_reply(m, " Profile deleted.", reply_markup=main_menu_keyboard())
711
  else:
712
+ await safe_reply(m, "❌ Delete failed.", reply_markup=main_menu_keyboard())
713
+ else:
714
+ _PENDING_DELETE.pop(uid, None)
715
+ await safe_reply(m, "Cancelled.", reply_markup=main_menu_keyboard())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
716
  return
717
 
718
+ # Edit title/desc
719
+ if uid in _AWAIT_EDIT:
720
+ st = _AWAIT_EDIT.pop(uid)
721
+ p = _PENDING_UPLOAD.get(uid)
722
+ if not p:
723
  return
724
+ txt = (m.text or "").strip()
725
+ lines = txt.splitlines()
726
+ if len(lines) == 1:
727
+ p.title = lines[0].strip()[:Settings.MAX_TITLE]
728
+ else:
729
+ p.title = lines[0].strip()[:Settings.MAX_TITLE]
730
+ p.description = "\n".join(lines[1:]).strip()[:Settings.MAX_DESC]
731
+ _PENDING_UPLOAD[uid] = p
732
+ media, _, size = _media_and_filename(p.src_msg)
733
+ await safe_reply(m, _render_preview(p, size), reply_markup=upload_confirm_keyboard(p.privacy))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
734
  return