Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import os | |
| import json | |
| import asyncio | |
| from typing import Optional | |
| from hydrogram import Client, filters | |
| from hydrogram.types import ( | |
| Message, | |
| InlineKeyboardMarkup, | |
| InlineKeyboardButton, | |
| CallbackQuery, | |
| ) | |
| from app.state import STATE, UploadJob | |
| from app.settings import ( | |
| ROTATE_AFTER_PER_PROFILE, | |
| DEFAULT_PRIVACY, | |
| DEFAULT_TITLE_MODE, | |
| YOUTUBE_CHUNK_SIZE, | |
| PROGRESS_EDIT_EVERY_SEC, | |
| MAX_CONCURRENT_UPLOADS, | |
| TMP_DIR, | |
| ) | |
| from app.progress import human_bytes, fmt_eta | |
| from app.youtube_api import youtube_resumable_upload, _clean_title | |
| from app.cf_api import CFClient | |
| from app.config import Config | |
| def _mkdir_tmp(): | |
| os.makedirs(TMP_DIR, exist_ok=True) | |
| def menu_kb(is_owner: bool): | |
| rows = [ | |
| [InlineKeyboardButton("📦 Accounts", callback_data="menu:accounts"), | |
| InlineKeyboardButton("➕ Add Account", callback_data="menu:addacc")], | |
| [InlineKeyboardButton("⚙️ Auto Mode", callback_data="menu:automode")], | |
| ] | |
| if is_owner: | |
| rows.append([InlineKeyboardButton("📊 Stats", callback_data="menu:stats")]) | |
| rows.append([InlineKeyboardButton("✅ Allow User", callback_data="menu:allow"), | |
| InlineKeyboardButton("⛔ Disallow User", callback_data="menu:disallow")]) | |
| return InlineKeyboardMarkup(rows) | |
| def upload_options_kb(u: UploadJob): | |
| # Compact stable format | |
| rows = [ | |
| [ | |
| InlineKeyboardButton(f"Privacy: {u.privacy}", callback_data=f"u:{u.upload_id}:privacy"), | |
| InlineKeyboardButton("Change", callback_data=f"u:{u.upload_id}:privacy_menu"), | |
| ], | |
| [ | |
| InlineKeyboardButton(f"Title: {u.title_mode}", callback_data=f"u:{u.upload_id}:title"), | |
| InlineKeyboardButton("Change", callback_data=f"u:{u.upload_id}:title_menu"), | |
| ], | |
| [ | |
| InlineKeyboardButton("▶️ Start Upload", callback_data=f"u:{u.upload_id}:start"), | |
| InlineKeyboardButton("✖️ Cancel", callback_data=f"u:{u.upload_id}:cancel"), | |
| ], | |
| ] | |
| return InlineKeyboardMarkup(rows) | |
| def privacy_kb(upload_id: str): | |
| return InlineKeyboardMarkup([ | |
| [InlineKeyboardButton("private", callback_data=f"u:{upload_id}:set_priv:private"), | |
| InlineKeyboardButton("unlisted", callback_data=f"u:{upload_id}:set_priv:unlisted"), | |
| InlineKeyboardButton("public", callback_data=f"u:{upload_id}:set_priv:public")], | |
| [InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")], | |
| ]) | |
| def title_kb(upload_id: str): | |
| return InlineKeyboardMarkup([ | |
| [InlineKeyboardButton("filename", callback_data=f"u:{upload_id}:set_title:filename"), | |
| InlineKeyboardButton("caption", callback_data=f"u:{upload_id}:set_title:caption")], | |
| [InlineKeyboardButton("custom", callback_data=f"u:{upload_id}:set_title:custom")], | |
| [InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")], | |
| ]) | |
| def accounts_kb(profiles: list, tg_id: str, default_profile_id: Optional[str], is_owner: bool): | |
| rows = [] | |
| for p in profiles: | |
| pid = p["profile_id"] | |
| label = p.get("label") or "profile" | |
| ch = p.get("channel_title") or p.get("channel_id") or "no-channel" | |
| ok = "✅" if p.get("has_refresh") else "⚠️" | |
| d = "⭐" if pid == default_profile_id else "" | |
| rows.append([InlineKeyboardButton(f"{ok}{d} {label} — {ch}", callback_data=f"acc:info:{pid}")]) | |
| if profiles: | |
| rows.append([InlineKeyboardButton("⭐ Set Default", callback_data="acc:setdef_menu")]) | |
| rows.append([InlineKeyboardButton("🗑 Remove Profile", callback_data="acc:remove_menu")]) | |
| rows.append([InlineKeyboardButton("⬅ Back", callback_data="menu:home")]) | |
| if is_owner: | |
| rows.append([InlineKeyboardButton("List Allowed (if available)", callback_data="acc:list_allowed")]) | |
| return InlineKeyboardMarkup(rows) | |
| def pick_profile_kb(profiles: list, action: str): | |
| # action: "setdef" or "remove" | |
| rows = [] | |
| for p in profiles: | |
| pid = p["profile_id"] | |
| label = p.get("label") or "profile" | |
| ch = p.get("channel_title") or p.get("channel_id") or "no-channel" | |
| ok = "✅" if p.get("has_refresh") else "⚠️" | |
| rows.append([InlineKeyboardButton(f"{ok} {label} — {ch}", callback_data=f"acc:{action}:{pid}")]) | |
| rows.append([InlineKeyboardButton("⬅ Back", callback_data="menu:accounts")]) | |
| return InlineKeyboardMarkup(rows) | |
| def is_owner(cfg: Config, user_id: int) -> bool: | |
| return user_id == cfg.OWNER_ID | |
| def _safe_title_from(job: UploadJob) -> str: | |
| if job.title_mode == "custom" and job.custom_title.strip(): | |
| return _clean_title(job.custom_title.strip()) | |
| if job.title_mode == "filename": | |
| base = os.path.splitext(job.file_name)[0] | |
| return _clean_title(base) | |
| # caption default | |
| if job.caption.strip(): | |
| # first line as title | |
| first = job.caption.strip().splitlines()[0] | |
| return _clean_title(first) | |
| base = os.path.splitext(job.file_name)[0] | |
| return _clean_title(base) | |
| async def register_handlers(app: Client, cfg: Config, cf: CFClient): | |
| # semaphore size from settings | |
| STATE.sem = asyncio.Semaphore(MAX_CONCURRENT_UPLOADS) | |
| async def start_cmd(_, m: Message): | |
| u = m.from_user.id | |
| kb = menu_kb(is_owner(cfg, u)) | |
| await m.reply_text( | |
| "✅ Bot online.\n\nSend a video to upload.\nUse buttons for accounts/settings.", | |
| reply_markup=kb | |
| ) | |
| async def cb(_, q: CallbackQuery): | |
| data = (q.data or "") | |
| u = q.from_user.id | |
| tg_id = str(u) | |
| if data.startswith("menu:"): | |
| act = data.split(":", 1)[1] | |
| if act == "home": | |
| await q.message.edit_text("Main menu:", reply_markup=menu_kb(is_owner(cfg, u))) | |
| await q.answer() | |
| return | |
| if act == "accounts": | |
| res = await cf.list_profiles_w2(tg_id) | |
| profiles = res.get("profiles", []) | |
| dpid = res.get("default_profile_id") | |
| await q.message.edit_text( | |
| f"📦 Accounts for {tg_id}\nDefault: {dpid}\n\n✅ authorized | ⚠️ not authorized", | |
| reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u)) | |
| ) | |
| await q.answer() | |
| return | |
| if act == "addacc": | |
| STATE.waiting_client_id[u] = True | |
| await q.message.edit_text( | |
| "➕ Add Account\n\nSend:\n1) Google OAuth JSON file (client_secret_*.json)\nOR\n2) client_id text (ends with apps.googleusercontent.com)", | |
| reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]]) | |
| ) | |
| await q.answer() | |
| return | |
| if act == "automode": | |
| cur = STATE.auto_mode.get(u, False) | |
| STATE.auto_mode[u] = not cur | |
| v = "ON ✅" if STATE.auto_mode[u] else "OFF ❌" | |
| await q.answer(f"Auto Mode: {v}", show_alert=True) | |
| # no edit required | |
| return | |
| if act == "stats": | |
| if not is_owner(cfg, u): | |
| await q.answer("Owner only", show_alert=True) | |
| return | |
| res = await cf.stats_today() | |
| if not res.get("ok"): | |
| await q.answer("stats failed", show_alert=True) | |
| return | |
| text = ( | |
| f"📊 Stats (UTC day {res.get('day')})\n" | |
| f"Uploads today total: {res.get('uploads_today_total')}\n" | |
| f"Active users today: {res.get('active_users_today')}\n\n" | |
| f"Errors last20:\n" | |
| ) | |
| errs = res.get("errors_last20") or [] | |
| if not errs: | |
| text += "— none —" | |
| else: | |
| for e in errs[:20]: | |
| text += f"- {e.get('where')} | {e.get('tg_id')} | {e.get('err')}\n" | |
| await q.message.edit_text(text, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]])) | |
| await q.answer() | |
| return | |
| if act == "allow": | |
| if not is_owner(cfg, u): | |
| await q.answer("Owner only", show_alert=True) | |
| return | |
| await q.message.edit_text( | |
| "✅ Allow User\n\nForward a message from that user.\nIf forward privacy hides id, then send numeric user_id.", | |
| reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]]) | |
| ) | |
| # mark mode using waiting_client_secret dict hack? better separate: | |
| STATE.waiting_client_secret[u] = "__ALLOW__" | |
| await q.answer() | |
| return | |
| if act == "disallow": | |
| if not is_owner(cfg, u): | |
| await q.answer("Owner only", show_alert=True) | |
| return | |
| await q.message.edit_text( | |
| "⛔ Disallow User\n\nForward a message from that user OR send numeric user_id.", | |
| reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data="menu:home")]]) | |
| ) | |
| STATE.waiting_client_secret[u] = "__DISALLOW__" | |
| await q.answer() | |
| return | |
| # accounts submenus | |
| if data == "acc:setdef_menu": | |
| res = await cf.list_profiles_w2(tg_id) | |
| profiles = res.get("profiles", []) | |
| await q.message.edit_text("Pick profile to set default:", reply_markup=pick_profile_kb(profiles, "setdef")) | |
| await q.answer() | |
| return | |
| if data == "acc:remove_menu": | |
| res = await cf.list_profiles_w2(tg_id) | |
| profiles = res.get("profiles", []) | |
| await q.message.edit_text("Pick profile to remove:", reply_markup=pick_profile_kb(profiles, "remove")) | |
| await q.answer() | |
| return | |
| if data.startswith("acc:setdef:"): | |
| pid = data.split(":", 2)[2] | |
| r = await cf.profile_set_default(tg_id, pid) | |
| await q.answer("Default set ✅" if r.get("ok") else f"Failed: {r.get('err')}", show_alert=True) | |
| # back to accounts | |
| res = await cf.list_profiles_w2(tg_id) | |
| profiles = res.get("profiles", []) | |
| dpid = res.get("default_profile_id") | |
| await q.message.edit_text("📦 Accounts", reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u))) | |
| return | |
| if data.startswith("acc:remove:"): | |
| pid = data.split(":", 2)[2] | |
| r = await cf.profile_remove(tg_id, pid) | |
| await q.answer("Removed ✅" if r.get("ok") else f"Failed: {r.get('err')}", show_alert=True) | |
| res = await cf.list_profiles_w2(tg_id) | |
| profiles = res.get("profiles", []) | |
| dpid = res.get("default_profile_id") | |
| await q.message.edit_text("📦 Accounts", reply_markup=accounts_kb(profiles, tg_id, dpid, is_owner(cfg, u))) | |
| return | |
| # upload callbacks | |
| if data.startswith("u:"): | |
| parts = data.split(":") | |
| upload_id = parts[1] | |
| act = parts[2] if len(parts) > 2 else "" | |
| job = STATE.uploads.get(upload_id) | |
| if not job: | |
| await q.answer("This upload expired/restarted.", show_alert=True) | |
| return | |
| if job.user_id != u: | |
| await q.answer("Not yours", show_alert=True) | |
| return | |
| if act == "privacy_menu": | |
| await q.message.edit_text("Choose privacy:", reply_markup=privacy_kb(upload_id)) | |
| await q.answer() | |
| return | |
| if act == "title_menu": | |
| await q.message.edit_text("Choose title mode:", reply_markup=title_kb(upload_id)) | |
| await q.answer() | |
| return | |
| if act == "set_priv": | |
| val = parts[3] | |
| job.privacy = val | |
| await q.message.edit_text( | |
| f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}", | |
| reply_markup=upload_options_kb(job) | |
| ) | |
| await q.answer("Saved ✅") | |
| return | |
| if act == "set_title": | |
| val = parts[3] | |
| job.title_mode = val | |
| if val == "custom": | |
| STATE.waiting_custom_title[u] = upload_id | |
| await q.message.edit_text("Send custom title text now (next message).", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("⬅ Back", callback_data=f"u:{upload_id}:back")]])) | |
| await q.answer() | |
| return | |
| await q.message.edit_text( | |
| f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}", | |
| reply_markup=upload_options_kb(job) | |
| ) | |
| await q.answer("Saved ✅") | |
| return | |
| if act == "back": | |
| await q.message.edit_text( | |
| f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}", | |
| reply_markup=upload_options_kb(job) | |
| ) | |
| await q.answer() | |
| return | |
| if act == "cancel": | |
| STATE.uploads.pop(upload_id, None) | |
| await q.message.edit_text("Cancelled ✅", reply_markup=menu_kb(is_owner(cfg, u))) | |
| await q.answer() | |
| return | |
| if act == "start": | |
| await q.answer("Starting upload…") | |
| asyncio.create_task(run_upload(app, cfg, cf, job)) | |
| return | |
| await q.answer() | |
| async def text_in(_, m: Message): | |
| u = m.from_user.id | |
| # custom title input | |
| if u in STATE.waiting_custom_title: | |
| upload_id = STATE.waiting_custom_title.pop(u) | |
| job = STATE.uploads.get(upload_id) | |
| if job: | |
| job.custom_title = m.text.strip() | |
| await m.reply_text( | |
| f"✅ Custom title saved.\nNow press Start Upload for file: {job.file_name}", | |
| reply_markup=upload_options_kb(job) | |
| ) | |
| return | |
| # owner allow/disallow mode | |
| if u in STATE.waiting_client_secret and STATE.waiting_client_secret[u] in ("__ALLOW__", "__DISALLOW__"): | |
| mode = STATE.waiting_client_secret.pop(u) | |
| target = (m.text or "").strip() | |
| # if numeric in text | |
| tg_id = None | |
| if target.isdigit(): | |
| tg_id = target | |
| else: | |
| await m.reply_text("Send numeric user_id or forward a message from user (if forward shows id).") | |
| return | |
| if mode == "__ALLOW__": | |
| r = await cf.allow_user(tg_id) | |
| await m.reply_text("✅ Allowed" if r.get("ok") else f"❌ Failed: {r.get('err')}", reply_markup=menu_kb(True)) | |
| return | |
| else: | |
| r = await cf.disallow_user(tg_id) | |
| await m.reply_text("⛔ Disallowed" if r.get("ok") else f"❌ Failed: {r.get('err')}", reply_markup=menu_kb(True)) | |
| return | |
| # add account flow (client_id then client_secret) | |
| if STATE.waiting_client_id.get(u): | |
| t = (m.text or "").strip() | |
| if "apps.googleusercontent.com" in t: | |
| STATE.waiting_client_id.pop(u, None) | |
| STATE.waiting_client_secret[u] = t | |
| await m.reply_text("Now send CLIENT_SECRET (starts with GOCSPX-...)") | |
| return | |
| await m.reply_text("Send client_id (apps.googleusercontent.com) or upload Google JSON file.") | |
| return | |
| if u in STATE.waiting_client_secret and STATE.waiting_client_secret[u] and not STATE.waiting_client_secret[u].startswith("__"): | |
| client_id = STATE.waiting_client_secret[u] | |
| client_secret = (m.text or "").strip() | |
| if not client_secret: | |
| await m.reply_text("Empty secret. Send again.") | |
| return | |
| STATE.waiting_client_secret.pop(u, None) | |
| tg_id = str(u) | |
| # create profile + login link | |
| r = await cf.profile_add(tg_id, client_id, client_secret, label="main", ttl_sec=600) | |
| if not r.get("ok"): | |
| await m.reply_text(f"❌ Failed to add profile: {r.get('err')}") | |
| return | |
| login_url = r.get("login_url") | |
| pid = r.get("profile_id") | |
| await m.reply_text( | |
| f"✅ Profile created: {pid}\n\nOpen login link to authorize:\n(If Telegram preview consumes ticket, just add again for now.)", | |
| reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔐 Login with Google", url=login_url)]]) | |
| ) | |
| return | |
| async def doc_in(_, m: Message): | |
| u = m.from_user.id | |
| if not STATE.waiting_client_id.get(u): | |
| return | |
| doc = m.document | |
| name = (doc.file_name or "").lower() | |
| if not name.endswith(".json"): | |
| await m.reply_text("Send Google OAuth JSON file (client_secret_*.json).") | |
| return | |
| _mkdir_tmp() | |
| path = os.path.join(TMP_DIR, f"cred_{u}.json") | |
| await m.download(file_name=path) | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| j = json.load(f) | |
| block = j.get("installed") or j.get("web") or {} | |
| client_id = block.get("client_id", "") | |
| client_secret = block.get("client_secret", "") | |
| if not client_id or not client_secret: | |
| raise ValueError("missing client_id/client_secret in json") | |
| except Exception as e: | |
| await m.reply_text(f"❌ Bad JSON: {e}") | |
| return | |
| finally: | |
| try: | |
| os.remove(path) | |
| except Exception: | |
| pass | |
| STATE.waiting_client_id.pop(u, None) | |
| tg_id = str(u) | |
| r = await cf.profile_add(tg_id, client_id, client_secret, label="main", ttl_sec=600) | |
| if not r.get("ok"): | |
| await m.reply_text(f"❌ Failed to add profile: {r.get('err')}") | |
| return | |
| await m.reply_text( | |
| f"✅ Profile created: {r.get('profile_id')}\nAuthorize now:", | |
| reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("🔐 Login with Google", url=r.get("login_url"))]]) | |
| ) | |
| async def video_in(_, m: Message): | |
| u = m.from_user.id | |
| tg_id = str(u) | |
| # owner always allowed | |
| if not is_owner(cfg, u): | |
| ok = await cf.is_allowed(tg_id) | |
| if not ok: | |
| await m.reply_text("⛔ You are not allowed. Ask owner to allow you.") | |
| return | |
| # Determine file | |
| file_id = None | |
| file_name = "video.mp4" | |
| file_type = "video" | |
| caption = (m.caption or "") | |
| if m.video: | |
| file_id = m.video.file_id | |
| file_name = m.video.file_name or "video.mp4" | |
| file_type = "video" | |
| elif m.document and (m.document.mime_type or "").startswith("video/"): | |
| file_id = m.document.file_id | |
| file_name = m.document.file_name or "video.mp4" | |
| file_type = "document" | |
| else: | |
| return | |
| # Create upload job | |
| upload_id = STATE.new_upload_id() | |
| job = UploadJob( | |
| upload_id=upload_id, | |
| user_id=u, | |
| chat_id=m.chat.id, | |
| src_msg_id=m.id, | |
| file_type=file_type, | |
| tg_file_id=file_id, | |
| file_name=file_name, | |
| caption=caption, | |
| privacy=DEFAULT_PRIVACY, | |
| title_mode=DEFAULT_TITLE_MODE, | |
| ) | |
| STATE.uploads[upload_id] = job | |
| # Auto mode: start immediately with defaults (bulk-friendly) | |
| auto = STATE.auto_mode.get(u, False) | |
| if auto: | |
| status = await m.reply_text( | |
| f"📤 Auto Mode ON\nUploading with defaults…\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}", | |
| ) | |
| job.status_msg_id = status.id | |
| asyncio.create_task(run_upload(app, cfg, cf, job)) | |
| return | |
| # Manual mode: show options | |
| msg = await m.reply_text( | |
| f"📤 Ready\nFile: {job.file_name}\nPrivacy: {job.privacy}\nTitle mode: {job.title_mode}", | |
| reply_markup=upload_options_kb(job) | |
| ) | |
| job.status_msg_id = msg.id | |
| async def run_upload(app: Client, cfg: Config, cf: CFClient, job: UploadJob): | |
| """ | |
| Upload pipeline: | |
| - check default profile/channel | |
| - pick rotated profile for same channel | |
| - get access token | |
| - download file | |
| - resumable upload with progress (speed/ETA) | |
| - record upload + cleanup | |
| """ | |
| tg_id = str(job.user_id) | |
| async with STATE.sem: | |
| _mkdir_tmp() | |
| local_path = os.path.join(TMP_DIR, f"{job.upload_id}_{job.file_name}") | |
| status_msg = None | |
| try: | |
| status_msg = await app.get_messages(job.chat_id, job.status_msg_id) if job.status_msg_id else None | |
| except Exception: | |
| status_msg = None | |
| def _edit(text: str): | |
| async def _do(): | |
| try: | |
| if status_msg: | |
| await status_msg.edit_text(text) | |
| except Exception: | |
| pass | |
| return _do() | |
| try: | |
| # 1) list profiles to find default + channel | |
| profs = await cf.list_profiles_w2(tg_id) | |
| if not profs.get("ok"): | |
| await _edit(f"❌ Worker2 list_profiles failed: {profs.get('err')}") | |
| return | |
| default_pid = profs.get("default_profile_id") | |
| profiles = profs.get("profiles") or [] | |
| default = next((p for p in profiles if p.get("profile_id") == default_pid), None) | |
| if not default_pid or not default: | |
| await _edit("⚠️ No default profile.\nGo: Accounts → set default.") | |
| return | |
| if not default.get("has_refresh"): | |
| await _edit("⚠️ Default profile not authorized.\nGo: Accounts → choose an authorized profile and set default.") | |
| return | |
| channel_id = default.get("channel_id") | |
| if not channel_id: | |
| await _edit("⚠️ Default profile has no channel_id.\nAuthorize again and then set default.") | |
| return | |
| # 2) pick rotated profile for SAME channel | |
| pick = await cf.pick_profile(tg_id, channel_id, ROTATE_AFTER_PER_PROFILE) | |
| if not pick.get("ok"): | |
| await _edit(f"❌ pick_profile failed: {pick.get('err')}") | |
| return | |
| profile_id = pick.get("profile_id") | |
| # 3) access token | |
| tok = await cf.access_token(tg_id, profile_id) | |
| if not tok.get("ok"): | |
| await cf.log_error(tg_id, profile_id, "access_token", tok.get("err", "token_failed")) | |
| await _edit(f"❌ access_token failed: {tok.get('err')}") | |
| return | |
| access_token = tok.get("access_token") | |
| # 4) download file with download progress | |
| await _edit("⬇️ Downloading from Telegram…") | |
| # hydrogram download (no progress hook reliably in all forks), | |
| # so we just download and then show upload progress. | |
| msg = await app.get_messages(job.chat_id, job.src_msg_id) | |
| await msg.download(file_name=local_path) | |
| size = os.path.getsize(local_path) | |
| # 5) upload to YouTube with progress edits | |
| title = _safe_title_from(job) | |
| desc = job.caption.strip() | |
| last_update = 0.0 | |
| async def progress_upload(done: int, total: int, speed: float, eta: float): | |
| nonlocal last_update | |
| import time | |
| now = time.time() | |
| if now - last_update < PROGRESS_EDIT_EVERY_SEC: | |
| return | |
| last_update = now | |
| pct = (done / max(1, total)) * 100.0 | |
| text = ( | |
| f"⬆️ Uploading to YouTube…\n" | |
| f"File: {job.file_name}\n" | |
| f"Title: {title}\n" | |
| f"Privacy: {job.privacy}\n\n" | |
| f"{pct:.1f}% ({human_bytes(done)}/{human_bytes(total)})\n" | |
| f"Speed: {human_bytes(speed)}/s\n" | |
| f"ETA: {fmt_eta(eta)}" | |
| ) | |
| await _edit(text) | |
| await _edit( | |
| f"⬆️ Starting YouTube upload…\n" | |
| f"File: {job.file_name} ({human_bytes(size)})\n" | |
| f"Title: {title}\n" | |
| f"Privacy: {job.privacy}" | |
| ) | |
| # First attempt | |
| http = None | |
| try: | |
| import httpx | |
| http = httpx.AsyncClient(timeout=None) | |
| res = await youtube_resumable_upload( | |
| access_token=access_token, | |
| file_path=local_path, | |
| title=title, | |
| description=desc, | |
| privacy=job.privacy, | |
| chunk_size=YOUTUBE_CHUNK_SIZE, | |
| progress_cb=lambda d, t, s, e: asyncio.create_task(progress_upload(d, t, s, e)), | |
| http=http, | |
| ) | |
| finally: | |
| if http: | |
| await http.aclose() | |
| # Retry once if token expired | |
| if not res.get("ok") and "401" in str(res.get("err", "")): | |
| tok2 = await cf.access_token(tg_id, profile_id) | |
| if tok2.get("ok"): | |
| access_token = tok2.get("access_token") | |
| http = None | |
| try: | |
| import httpx | |
| http = httpx.AsyncClient(timeout=None) | |
| res = await youtube_resumable_upload( | |
| access_token=access_token, | |
| file_path=local_path, | |
| title=title, | |
| description=desc, | |
| privacy=job.privacy, | |
| chunk_size=YOUTUBE_CHUNK_SIZE, | |
| progress_cb=lambda d, t, s, e: asyncio.create_task(progress_upload(d, t, s, e)), | |
| http=http, | |
| ) | |
| finally: | |
| if http: | |
| await http.aclose() | |
| if not res.get("ok"): | |
| err = res.get("err", "upload_failed") | |
| await cf.log_error(tg_id, profile_id, "youtube_upload", err) | |
| await _edit(f"❌ Upload failed: {err}") | |
| return | |
| video_id = res.get("video_id") | |
| link = f"https://youtu.be/{video_id}" if video_id else "no_link" | |
| # 6) record upload | |
| await cf.record_upload(tg_id, profile_id) | |
| # 7) final message (stable format) | |
| final_text = ( | |
| f"✅ Uploaded\n" | |
| f"Title: {title}\n" | |
| f"Privacy: {job.privacy}\n" | |
| f"Channel: {tok.get('channel_title') or tok.get('channel_id')}\n" | |
| f"Profile: {profile_id}\n" | |
| f"Link: {link}" | |
| ) | |
| await _edit(final_text) | |
| except Exception as e: | |
| try: | |
| await cf.log_error(tg_id, "", "run_upload_exception", str(e)[:200]) | |
| except Exception: | |
| pass | |
| if status_msg: | |
| try: | |
| await status_msg.edit_text(f"❌ Crash: {e}") | |
| except Exception: | |
| pass | |
| finally: | |
| # cleanup | |
| try: | |
| if os.path.exists(local_path): | |
| os.remove(local_path) | |
| except Exception: | |
| pass | |
| # drop job | |
| STATE.uploads.pop(job.upload_id, None) |