BasicHfBot / bot /handlers.py
understanding's picture
Create bot/handlers.py
0eb342a verified
raw
history blame
28.8 kB
from __future__ import annotations
import os
import json
import asyncio
from typing import Optional
from hydrogram import Client, filters
from hydrogram.types import (
Message,
InlineKeyboardMarkup,
InlineKeyboardButton,
CallbackQuery,
)
from app.state import STATE, UploadJob
from app.settings import (
ROTATE_AFTER_PER_PROFILE,
DEFAULT_PRIVACY,
DEFAULT_TITLE_MODE,
YOUTUBE_CHUNK_SIZE,
PROGRESS_EDIT_EVERY_SEC,
MAX_CONCURRENT_UPLOADS,
TMP_DIR,
)
from app.progress import human_bytes, fmt_eta
from app.youtube_api import youtube_resumable_upload, _clean_title
from app.cf_api import CFClient
from app.config import Config
def _mkdir_tmp():
    """Create the temporary working directory ``TMP_DIR`` if it is missing."""
    target_dir = TMP_DIR
    # exist_ok=True makes repeated calls harmless.
    os.makedirs(target_dir, exist_ok=True)
def menu_kb(is_owner: bool):
    """Build the main-menu inline keyboard.

    Every user gets the Accounts / Add Account row and the Auto Mode row.
    When ``is_owner`` is true, a Stats row and an Allow / Disallow row are
    appended below them.
    """
    accounts_row = [
        InlineKeyboardButton("📦 Accounts", callback_data="menu:accounts"),
        InlineKeyboardButton("➕ Add Account", callback_data="menu:addacc"),
    ]
    auto_row = [
        InlineKeyboardButton("⚙️ Auto Mode", callback_data="menu:automode"),
    ]
    layout = [accounts_row, auto_row]

    if is_owner:
        stats_row = [
            InlineKeyboardButton("📊 Stats", callback_data="menu:stats"),
        ]
        access_row = [
            InlineKeyboardButton("✅ Allow User", callback_data="menu:allow"),
            InlineKeyboardButton("⛔ Disallow User", callback_data="menu:disallow"),
        ]
        layout.extend([stats_row, access_row])

    return InlineKeyboardMarkup(layout)
def upload_options_kb(u: UploadJob):
    """Build the per-upload options keyboard for job ``u``.

    Three rows: current privacy + "Change", current title mode + "Change",
    and Start Upload / Cancel. Every callback payload has the form
    ``u:<upload_id>:<action>``.
    """
    prefix = f"u:{u.upload_id}"

    def _btn(caption: str, action: str):
        # All buttons of this keyboard share the same "u:<id>:" prefix.
        return InlineKeyboardButton(caption, callback_data=f"{prefix}:{action}")

    privacy_row = [
        _btn(f"Privacy: {u.privacy}", "privacy"),
        _btn("Change", "privacy_menu"),
    ]
    title_row = [
        _btn(f"Title: {u.title_mode}", "title"),
        _btn("Change", "title_menu"),
    ]
    action_row = [
        _btn("▶️ Start Upload", "start"),
        _btn("✖️ Cancel", "cancel"),
    ]
    return InlineKeyboardMarkup([privacy_row, title_row, action_row])
def privacy_kb(upload_id: str):
    """Build the privacy picker for the upload identified by ``upload_id``.

    One row with the three privacy levels (payload
    ``u:<upload_id>:set_priv:<level>``) followed by a Back row.
    """
    level_row = [
        InlineKeyboardButton(level, callback_data=f"u:{upload_id}:set_priv:{level}")
        for level in ("private", "unlisted", "public")
    ]
    back_row = [InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")]
    return InlineKeyboardMarkup([level_row, back_row])
def title_kb(upload_id: str):
    """Build the title-mode picker for the upload identified by ``upload_id``.

    Rows: filename / caption, then custom, then Back. Mode buttons carry the
    payload ``u:<upload_id>:set_title:<mode>``.
    """

    def _mode_btn(mode: str):
        return InlineKeyboardButton(mode, callback_data=f"u:{upload_id}:set_title:{mode}")

    keyboard = [
        [_mode_btn("filename"), _mode_btn("caption")],
        [_mode_btn("custom")],
        [InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")],
    ]
    return InlineKeyboardMarkup(keyboard)
def accounts_kb(profiles: list, tg_id: str, default_profile_id: Optional[str], is_owner: bool):
    """Build the accounts overview keyboard.

    Args:
        profiles: Profile dicts; each must contain ``profile_id`` and may
            contain ``label``, ``channel_title``, ``channel_id`` and
            ``has_refresh``.
        tg_id: Unused here; kept so existing callers keep working.
        default_profile_id: Id of the default profile (marked with a star),
            or ``None``.
        is_owner: When true, an extra "List Allowed" row is appended.

    Returns:
        An ``InlineKeyboardMarkup`` with one row per profile, the
        Set Default / Remove rows (only when there is at least one profile),
        a Back row and, for the owner, the List Allowed row.
    """
    rows = []
    for p in profiles:
        pid = p["profile_id"]
        label = p.get("label") or "profile"
        ch = p.get("channel_title") or p.get("channel_id") or "no-channel"
        ok = "✅" if p.get("has_refresh") else "⚠️"
        d = "⭐" if pid == default_profile_id else ""
        # Separate label and channel; previously they were rendered fused
        # together (e.g. "mainMy Channel").
        rows.append([InlineKeyboardButton(f"{ok}{d} {label} | {ch}", callback_data=f"acc:info:{pid}")])

    if profiles:
        rows.append([InlineKeyboardButton("⭐ Set Default", callback_data="acc:setdef_menu")])
        rows.append([InlineKeyboardButton("🗑 Remove Profile", callback_data="acc:remove_menu")])

    rows.append([InlineKeyboardButton("⬅ Back", callback_data="menu:home")])
    if is_owner:
        rows.append([InlineKeyboardButton("List Allowed (if available)", callback_data="acc:list_allowed")])
    return InlineKeyboardMarkup(rows)
def pick_profile_kb(profiles: list, action: str):
    """Build a keyboard for picking one profile for ``action``.

    Args:
        profiles: Profile dicts; each must contain ``profile_id`` and may
            contain ``label``, ``channel_title``, ``channel_id`` and
            ``has_refresh``.
        action: ``"setdef"`` or ``"remove"``; embedded in each button's
            payload as ``acc:<action>:<profile_id>``.

    Returns:
        An ``InlineKeyboardMarkup`` with one row per profile plus a Back row
        that returns to the accounts menu.
    """
    rows = []
    for p in profiles:
        pid = p["profile_id"]
        label = p.get("label") or "profile"
        ch = p.get("channel_title") or p.get("channel_id") or "no-channel"
        ok = "✅" if p.get("has_refresh") else "⚠️"
        # Separate label and channel; previously they were rendered fused
        # together (e.g. "mainMy Channel").
        rows.append([InlineKeyboardButton(f"{ok} {label} | {ch}", callback_data=f"acc:{action}:{pid}")])
    rows.append([InlineKeyboardButton("⬅ Back", callback_data="menu:accounts")])
    return InlineKeyboardMarkup(rows)
def is_owner(cfg: Config, user_id: int) -> bool:
    """Return whether ``user_id`` equals the owner id stored in ``cfg.OWNER_ID``."""
    owner_id = cfg.OWNER_ID
    return user_id == owner_id
def _safe_title_from(job: UploadJob) -> str:
    """Derive the video title for ``job`` and pass it through ``_clean_title``.

    Resolution order:
      * ``custom`` mode with a non-blank custom title -> that title;
      * any mode other than ``filename`` with a non-blank caption -> the
        first line of the caption;
      * otherwise -> the file name without its extension.
    """
    mode = job.title_mode

    if mode == "custom":
        custom = job.custom_title.strip()
        if custom:
            return _clean_title(custom)

    if mode != "filename":
        caption = job.caption.strip()
        if caption:
            # Only the first caption line is used as the title.
            return _clean_title(caption.splitlines()[0])

    stem, _ext = os.path.splitext(job.file_name)
    return _clean_title(stem)
async def register_handlers(app: Client, cfg: Config, cf: CFClient):
    """Register all message and callback-query handlers on ``app``.

    Args:
        app: The hydrogram client the handlers are attached to.
        cfg: Bot configuration; ``cfg.OWNER_ID`` decides owner-only features.
        cf: Backend client used for profiles, allow-list and stats calls.

    Side effects:
        Replaces ``STATE.sem`` with a semaphore sized by
        ``MAX_CONCURRENT_UPLOADS``.

    Fixes compared with the previous version:
      * ``doc_in`` no longer swallows documents when the user is not in the
        add-account flow, so videos sent as documents reach ``video_in``.
      * Upload tasks are kept referenced while they run and a job cannot be
        started twice by pressing Start repeatedly.
      * The allow/disallow flow accepts a forwarded message, as its prompt
        promises, in addition to a numeric id.
      * ``set_priv`` / ``set_title`` payloads are validated before use.
    """
    # semaphore size from settings
    STATE.sem = asyncio.Semaphore(MAX_CONCURRENT_UPLOADS)

    # upload_id -> running task. Holding the task here keeps a strong
    # reference (the event loop itself only keeps weak ones) and doubles as
    # the "already started" guard.
    running_uploads: dict = {}

    privacy_choices = ("private", "unlisted", "public")
    title_choices = ("filename", "caption", "custom")

    def _spawn_upload(job: UploadJob) -> bool:
        """Start ``run_upload`` for ``job``; return False if it already runs."""
        if job.upload_id in running_uploads:
            return False
        task = asyncio.create_task(run_upload(app, cfg, cf, job))
        running_uploads[job.upload_id] = task
        task.add_done_callback(lambda _t, uid=job.upload_id: running_uploads.pop(uid, None))
        return True

    def _back_kb(target: str) -> InlineKeyboardMarkup:
        """Single "Back" button pointing at callback payload ``target``."""
        return InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data=target)]])

    def _ready_text(job: UploadJob) -> str:
        """Summary shown above the upload options keyboard."""
        return f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}"

    @app.on_message(filters.command("start"))
    async def start_cmd(_, m: Message):
        u = m.from_user.id
        kb = menu_kb(is_owner(cfg, u))
        await m.reply_text(
            "✅ Bot online.\n\nSend a video to upload.\nUse buttons for accounts/settings.",
            reply_markup=kb
        )

    @app.on_callback_query()
    async def cb(_, q: CallbackQuery):
        data = (q.data or "")
        u = q.from_user.id
        tg_id = str(u)

        if data.startswith("menu:"):
            act = data.split(":", 1)[1]

            if act == "home":
                await q.message.edit_text("Main menu:", reply_markup=menu_kb(is_owner(cfg, u)))
                await q.answer()
                return

            if act == "accounts":
                res = await cf.list_profiles_w2(tg_id)
                profiles = res.get("profiles", [])
                dpid = res.get("default_profile_id")
                await q.message.edit_text(
                    f"📦 Accounts for {tg_id}\nDefault: {dpid}\n\n✅ authorized | ⚠️ not authorized",
                    reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u))
                )
                await q.answer()
                return

            if act == "addacc":
                STATE.waiting_client_id[u] = True
                await q.message.edit_text(
                    "➕ Add Account\n\nSend:\n1) Google OAuth JSON file (client_secret_*.json)\nOR\n2) client_id text (ends with apps.googleusercontent.com)",
                    reply_markup=_back_kb("menu:home")
                )
                await q.answer()
                return

            if act == "automode":
                cur = STATE.auto_mode.get(u, False)
                STATE.auto_mode[u] = not cur
                v = "ON ✅" if STATE.auto_mode[u] else "OFF ❌"
                await q.answer(f"Auto Mode: {v}", show_alert=True)
                # no edit required
                return

            if act == "stats":
                if not is_owner(cfg, u):
                    await q.answer("Owner only", show_alert=True)
                    return
                res = await cf.stats_today()
                if not res.get("ok"):
                    await q.answer("stats failed", show_alert=True)
                    return
                text = (
                    f"📊 Stats (UTC day {res.get('day')})\n"
                    f"Uploads today total: {res.get('uploads_today_total')}\n"
                    f"Active users today: {res.get('active_users_today')}\n\n"
                    f"Errors last20:\n"
                )
                errs = res.get("errors_last20") or []
                if not errs:
                    text += "— none —"
                else:
                    text += "".join(
                        f"- {e.get('where')} | {e.get('tg_id')} | {e.get('err')}\n"
                        for e in errs[:20]
                    )
                await q.message.edit_text(text, reply_markup=_back_kb("menu:home"))
                await q.answer()
                return

            if act == "allow":
                if not is_owner(cfg, u):
                    await q.answer("Owner only", show_alert=True)
                    return
                await q.message.edit_text(
                    "✅ Allow User\n\nForward a message from that user.\nIf forward privacy hides id, then send numeric user_id.",
                    reply_markup=_back_kb("menu:home")
                )
                # HACK: the owner's pending mode is stored in the
                # waiting_client_secret dict using a "__" sentinel value.
                STATE.waiting_client_secret[u] = "__ALLOW__"
                await q.answer()
                return

            if act == "disallow":
                if not is_owner(cfg, u):
                    await q.answer("Owner only", show_alert=True)
                    return
                await q.message.edit_text(
                    "⛔ Disallow User\n\nForward a message from that user OR send numeric user_id.",
                    reply_markup=_back_kb("menu:home")
                )
                STATE.waiting_client_secret[u] = "__DISALLOW__"
                await q.answer()
                return

        # accounts submenus
        if data == "acc:setdef_menu":
            res = await cf.list_profiles_w2(tg_id)
            profiles = res.get("profiles", [])
            await q.message.edit_text("Pick profile to set default:", reply_markup=pick_profile_kb(profiles, "setdef"))
            await q.answer()
            return

        if data == "acc:remove_menu":
            res = await cf.list_profiles_w2(tg_id)
            profiles = res.get("profiles", [])
            await q.message.edit_text("Pick profile to remove:", reply_markup=pick_profile_kb(profiles, "remove"))
            await q.answer()
            return

        if data.startswith("acc:setdef:"):
            pid = data.split(":", 2)[2]
            r = await cf.profile_set_default(tg_id, pid)
            await q.answer("Default set ✅" if r.get("ok") else f"Failed: {r.get('err')}", show_alert=True)
            # back to accounts
            res = await cf.list_profiles_w2(tg_id)
            profiles = res.get("profiles", [])
            dpid = res.get("default_profile_id")
            await q.message.edit_text("📦 Accounts", reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u)))
            return

        if data.startswith("acc:remove:"):
            pid = data.split(":", 2)[2]
            r = await cf.profile_remove(tg_id, pid)
            await q.answer("Removed ✅" if r.get("ok") else f"Failed: {r.get('err')}", show_alert=True)
            res = await cf.list_profiles_w2(tg_id)
            profiles = res.get("profiles", [])
            dpid = res.get("default_profile_id")
            await q.message.edit_text("📦 Accounts", reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u)))
            return

        # upload callbacks
        if data.startswith("u:"):
            parts = data.split(":")
            upload_id = parts[1]
            act = parts[2] if len(parts) > 2 else ""
            # Optional 4th field; guarded so a malformed payload cannot
            # raise IndexError.
            val = parts[3] if len(parts) > 3 else ""

            job = STATE.uploads.get(upload_id)
            if not job:
                await q.answer("This upload expired/restarted.", show_alert=True)
                return
            if job.user_id != u:
                await q.answer("Not yours", show_alert=True)
                return

            if act == "privacy_menu":
                await q.message.edit_text("Choose privacy:", reply_markup=privacy_kb(upload_id))
                await q.answer()
                return

            if act == "title_menu":
                await q.message.edit_text("Choose title mode:", reply_markup=title_kb(upload_id))
                await q.answer()
                return

            if act == "set_priv":
                if val not in privacy_choices:
                    await q.answer("Invalid option", show_alert=True)
                    return
                job.privacy = val
                await q.message.edit_text(_ready_text(job), reply_markup=upload_options_kb(job))
                await q.answer("Saved ✅")
                return

            if act == "set_title":
                if val not in title_choices:
                    await q.answer("Invalid option", show_alert=True)
                    return
                job.title_mode = val
                if val == "custom":
                    STATE.waiting_custom_title[u] = upload_id
                    await q.message.edit_text(
                        "Send custom title text now (next message).",
                        reply_markup=_back_kb(f"u:{upload_id}:back")
                    )
                    await q.answer()
                    return
                await q.message.edit_text(_ready_text(job), reply_markup=upload_options_kb(job))
                await q.answer("Saved ✅")
                return

            if act == "back":
                await q.message.edit_text(_ready_text(job), reply_markup=upload_options_kb(job))
                await q.answer()
                return

            if act == "cancel":
                STATE.uploads.pop(upload_id, None)
                await q.message.edit_text("Cancelled ✅", reply_markup=menu_kb(is_owner(cfg, u)))
                await q.answer()
                return

            if act == "start":
                # A second tap on Start must not launch a second upload.
                if not _spawn_upload(job):
                    await q.answer("Upload already in progress.", show_alert=True)
                    return
                await q.answer("Starting upload…")
                return

        await q.answer()

    @app.on_message(filters.text & filters.private)
    async def text_in(_, m: Message):
        u = m.from_user.id

        # custom title input
        if u in STATE.waiting_custom_title:
            upload_id = STATE.waiting_custom_title.pop(u)
            job = STATE.uploads.get(upload_id)
            if job:
                job.custom_title = m.text.strip()
                await m.reply_text(
                    f"✅ Custom title saved.\nNow press Start Upload for file: {job.file_name}",
                    reply_markup=upload_options_kb(job)
                )
            return

        # owner allow/disallow mode
        if u in STATE.waiting_client_secret and STATE.waiting_client_secret[u] in ("__ALLOW__", "__DISALLOW__"):
            mode = STATE.waiting_client_secret.pop(u)
            target = (m.text or "").strip()

            # Prefer the original sender of a forwarded message (absent when
            # the user hides it via forward privacy); otherwise expect a
            # numeric id in the text. Only forwarded *text* messages reach
            # this handler because of its filter.
            fwd_user = getattr(m, "forward_from", None)
            if fwd_user is not None:
                tg_id = str(fwd_user.id)
            elif target.isdigit():
                tg_id = target
            else:
                await m.reply_text("Send numeric user_id or forward a message from user (if forward shows id).")
                return

            if mode == "__ALLOW__":
                r = await cf.allow_user(tg_id)
                await m.reply_text("✅ Allowed" if r.get("ok") else f"❌ Failed: {r.get('err')}", reply_markup=menu_kb(True))
            else:
                r = await cf.disallow_user(tg_id)
                await m.reply_text("⛔ Disallowed" if r.get("ok") else f"❌ Failed: {r.get('err')}", reply_markup=menu_kb(True))
            return

        # add account flow (client_id then client_secret)
        if STATE.waiting_client_id.get(u):
            t = (m.text or "").strip()
            if "apps.googleusercontent.com" in t:
                STATE.waiting_client_id.pop(u, None)
                # The client_id is parked in waiting_client_secret until the
                # secret arrives in the next message.
                STATE.waiting_client_secret[u] = t
                await m.reply_text("Now send CLIENT_SECRET (starts with GOCSPX-...)")
                return
            await m.reply_text("Send client_id (apps.googleusercontent.com) or upload Google JSON file.")
            return

        if u in STATE.waiting_client_secret and STATE.waiting_client_secret[u] and not STATE.waiting_client_secret[u].startswith("__"):
            client_id = STATE.waiting_client_secret[u]
            client_secret = (m.text or "").strip()
            if not client_secret:
                await m.reply_text("Empty secret. Send again.")
                return
            STATE.waiting_client_secret.pop(u, None)
            tg_id = str(u)
            # create profile + login link
            r = await cf.profile_add(tg_id, client_id, client_secret, label="main", ttl_sec=600)
            if not r.get("ok"):
                await m.reply_text(f"❌ Failed to add profile: {r.get('err')}")
                return
            login_url = r.get("login_url")
            pid = r.get("profile_id")
            await m.reply_text(
                f"✅ Profile created: {pid}\n\nOpen login link to authorize:\n(If Telegram preview consumes ticket, just add again for now.)",
                reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔐 Login with Google", url=login_url)]])
            )
            return

    @app.on_message(filters.document & filters.private)
    async def doc_in(_, m: Message):
        u = m.from_user.id
        if not STATE.waiting_client_id.get(u):
            # This handler is registered before video_in and matches every
            # private document. Within one handler group only the first
            # matching handler runs, so hand the update on; otherwise videos
            # sent as documents would never be uploaded.
            m.continue_propagation()

        doc = m.document
        name = (doc.file_name or "").lower()
        if not name.endswith(".json"):
            await m.reply_text("Send Google OAuth JSON file (client_secret_*.json).")
            return

        _mkdir_tmp()
        path = os.path.join(TMP_DIR, f"cred_{u}.json")
        await m.download(file_name=path)
        try:
            with open(path, "r", encoding="utf-8") as f:
                j = json.load(f)
            block = j.get("installed") or j.get("web") or {}
            client_id = block.get("client_id", "")
            client_secret = block.get("client_secret", "")
            if not client_id or not client_secret:
                raise ValueError("missing client_id/client_secret in json")
        except Exception as e:
            await m.reply_text(f"❌ Bad JSON: {e}")
            return
        finally:
            # The credentials file must not linger on disk; removal is
            # best-effort.
            try:
                os.remove(path)
            except Exception:
                pass

        STATE.waiting_client_id.pop(u, None)
        tg_id = str(u)
        r = await cf.profile_add(tg_id, client_id, client_secret, label="main", ttl_sec=600)
        if not r.get("ok"):
            await m.reply_text(f"❌ Failed to add profile: {r.get('err')}")
            return
        await m.reply_text(
            f"✅ Profile created: {r.get('profile_id')}\nAuthorize now:",
            reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔐 Login with Google", url=r.get("login_url"))]])
        )

    @app.on_message((filters.video | filters.document) & filters.private)
    async def video_in(_, m: Message):
        u = m.from_user.id
        tg_id = str(u)

        # owner always allowed
        if not is_owner(cfg, u):
            ok = await cf.is_allowed(tg_id)
            if not ok:
                await m.reply_text("⛔ You are not allowed. Ask owner to allow you.")
                return

        # Determine file
        caption = (m.caption or "")
        if m.video:
            file_id = m.video.file_id
            file_name = m.video.file_name or "video.mp4"
            file_type = "video"
        elif m.document and (m.document.mime_type or "").startswith("video/"):
            file_id = m.document.file_id
            file_name = m.document.file_name or "video.mp4"
            file_type = "document"
        else:
            # Non-video documents are ignored.
            return

        # Create upload job
        upload_id = STATE.new_upload_id()
        job = UploadJob(
            upload_id=upload_id,
            user_id=u,
            chat_id=m.chat.id,
            src_msg_id=m.id,
            file_type=file_type,
            tg_file_id=file_id,
            file_name=file_name,
            caption=caption,
            privacy=DEFAULT_PRIVACY,
            title_mode=DEFAULT_TITLE_MODE,
        )
        STATE.uploads[upload_id] = job

        # Auto mode: start immediately with defaults (bulk-friendly)
        if STATE.auto_mode.get(u, False):
            status = await m.reply_text(
                f"📤 Auto Mode ON\nUploading with defaults…\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}",
            )
            job.status_msg_id = status.id
            _spawn_upload(job)
            return

        # Manual mode: show options
        msg = await m.reply_text(_ready_text(job), reply_markup=upload_options_kb(job))
        job.status_msg_id = msg.id
async def run_upload(app: Client, cfg: Config, cf: CFClient, job: UploadJob):
    """
    Upload pipeline:
    - check default profile/channel
    - pick rotated profile for same channel
    - get access token
    - download file
    - resumable upload with progress (speed/ETA)
    - record upload + cleanup

    Args:
        app: hydrogram client used to fetch and download the source message.
        cfg: Bot configuration (not read here; kept for interface stability).
        cf: Backend client for profiles, tokens, error logging and counters.
        job: The upload to process. It is removed from ``STATE.uploads`` when
            this coroutine finishes, whatever the outcome.

    Progress and results are reported by editing the job's status message;
    failures never propagate to the caller (cancellation excepted).

    Fixes compared with the previous version:
      * ``job.file_name`` is user-controlled and is now reduced to its final
        path component before being joined into the temp path.
      * The path returned by the download is used, and a failed download is
        reported explicitly.
      * Progress-edit tasks are tracked and drained before the final status
        edit, so a late progress edit cannot overwrite the result.
      * The duplicated upload attempt is a single helper; throttling uses a
        monotonic clock; the crash log carries the profile id when known.
    """
    tg_id = str(job.user_id)
    async with STATE.sem:
        _mkdir_tmp()
        # Keep only the last path component (either separator style) so the
        # file cannot be written outside TMP_DIR.
        safe_name = os.path.basename((job.file_name or "").replace("\\", "/")) or "video.mp4"
        local_path = os.path.join(TMP_DIR, f"{job.upload_id}_{safe_name}")
        profile_id = ""

        status_msg = None
        try:
            status_msg = await app.get_messages(job.chat_id, job.status_msg_id) if job.status_msg_id else None
        except Exception:
            status_msg = None

        async def _edit(text: str):
            # Best-effort: a failed status edit must not abort the upload.
            try:
                if status_msg:
                    await status_msg.edit_text(text)
            except Exception:
                pass

        try:
            # 1) list profiles to find default + channel
            profs = await cf.list_profiles_w2(tg_id)
            if not profs.get("ok"):
                await _edit(f"❌ Worker2 list_profiles failed: {profs.get('err')}")
                return

            default_pid = profs.get("default_profile_id")
            profiles = profs.get("profiles") or []
            default = next((p for p in profiles if p.get("profile_id") == default_pid), None)
            if not default_pid or not default:
                await _edit("⚠️ No default profile.\nGo: Accounts → set default.")
                return
            if not default.get("has_refresh"):
                await _edit("⚠️ Default profile not authorized.\nGo: Accounts → choose an authorized profile and set default.")
                return
            channel_id = default.get("channel_id")
            if not channel_id:
                await _edit("⚠️ Default profile has no channel_id.\nAuthorize again and then set default.")
                return

            # 2) pick rotated profile for SAME channel
            pick = await cf.pick_profile(tg_id, channel_id, ROTATE_AFTER_PER_PROFILE)
            if not pick.get("ok"):
                await _edit(f"❌ pick_profile failed: {pick.get('err')}")
                return
            profile_id = pick.get("profile_id")

            # 3) access token
            tok = await cf.access_token(tg_id, profile_id)
            if not tok.get("ok"):
                await cf.log_error(tg_id, profile_id, "access_token", tok.get("err", "token_failed"))
                await _edit(f"❌ access_token failed: {tok.get('err')}")
                return
            access_token = tok.get("access_token")

            # 4) download file (no download progress: the hook is not
            # reliable across forks, so only upload progress is shown)
            await _edit("⬇️ Downloading from Telegram…")
            msg = await app.get_messages(job.chat_id, job.src_msg_id)
            downloaded = await msg.download(file_name=local_path)
            if isinstance(downloaded, str) and downloaded:
                # Trust the path the client reports; it may have resolved a
                # relative TMP_DIR differently than the current directory.
                local_path = downloaded
            if not os.path.isfile(local_path):
                await _edit("❌ Download from Telegram failed.")
                return
            size = os.path.getsize(local_path)

            # 5) upload to YouTube with progress edits
            title = _safe_title_from(job)
            desc = job.caption.strip()

            import time

            # -inf guarantees the very first progress callback is shown.
            last_update = float("-inf")

            async def progress_upload(done: int, total: int, speed: float, eta: float):
                nonlocal last_update
                now = time.monotonic()
                if now - last_update < PROGRESS_EDIT_EVERY_SEC:
                    return
                last_update = now
                pct = (done / max(1, total)) * 100.0
                text = (
                    f"⬆️ Uploading to YouTube…\n"
                    f"File: {job.file_name}\n"
                    f"Title: {title}\n"
                    f"Privacy: {job.privacy}\n\n"
                    f"{pct:.1f}% ({human_bytes(done)}/{human_bytes(total)})\n"
                    f"Speed: {human_bytes(speed)}/s\n"
                    f"ETA: {fmt_eta(eta)}"
                )
                await _edit(text)

            # Strong references to scheduled progress edits; also lets us
            # wait for them before writing the final status.
            progress_tasks: set = set()

            def _on_progress(done, total, speed, eta):
                task = asyncio.create_task(progress_upload(done, total, speed, eta))
                progress_tasks.add(task)
                task.add_done_callback(progress_tasks.discard)
                return task

            async def _attempt(token):
                """Run one resumable upload with a fresh HTTP client."""
                import httpx

                async with httpx.AsyncClient(timeout=None) as http:
                    return await youtube_resumable_upload(
                        access_token=token,
                        file_path=local_path,
                        title=title,
                        description=desc,
                        privacy=job.privacy,
                        chunk_size=YOUTUBE_CHUNK_SIZE,
                        progress_cb=_on_progress,
                        http=http,
                    )

            await _edit(
                f"⬆️ Starting YouTube upload…\n"
                f"File: {job.file_name} ({human_bytes(size)})\n"
                f"Title: {title}\n"
                f"Privacy: {job.privacy}"
            )

            # First attempt
            res = await _attempt(access_token)

            # Retry once if token expired
            if not res.get("ok") and "401" in str(res.get("err", "")):
                tok2 = await cf.access_token(tg_id, profile_id)
                if tok2.get("ok"):
                    access_token = tok2.get("access_token")
                    res = await _attempt(access_token)

            # Let outstanding progress edits finish so none of them lands
            # after (and overwrites) the final status below.
            if progress_tasks:
                await asyncio.gather(*progress_tasks, return_exceptions=True)

            if not res.get("ok"):
                err = res.get("err", "upload_failed")
                await cf.log_error(tg_id, profile_id, "youtube_upload", err)
                await _edit(f"❌ Upload failed: {err}")
                return

            video_id = res.get("video_id")
            link = f"https://youtu.be/{video_id}" if video_id else "no_link"

            # 6) record upload
            await cf.record_upload(tg_id, profile_id)

            # 7) final message (stable format)
            final_text = (
                f"✅ Uploaded\n"
                f"Title: {title}\n"
                f"Privacy: {job.privacy}\n"
                f"Channel: {tok.get('channel_title') or tok.get('channel_id')}\n"
                f"Profile: {profile_id}\n"
                f"Link: {link}"
            )
            await _edit(final_text)

        except Exception as e:
            # Top-level boundary of a background task: log remotely and tell
            # the user, but never let the exception escape.
            try:
                await cf.log_error(tg_id, profile_id or "", "run_upload_exception", str(e)[:200])
            except Exception:
                pass
            await _edit(f"❌ Crash: {e}")
        finally:
            # cleanup
            try:
                if os.path.exists(local_path):
                    os.remove(local_path)
            except Exception:
                pass
            # drop job
            STATE.uploads.pop(job.upload_id, None)