# Link Generator Bot
# Webhook-only | HuggingFace Spaces compatible
"""Telegram bot that turns any uploaded media into a public download link.

Updates arrive through the FastAPI ``/webhook`` endpoint. Text commands are
answered directly in the webhook HTTP response; media messages are handed to a
background task that downloads the file through Pyrogram, stores it with
``FileManager`` and edits a status message with live progress.
"""

import os
import sys
import time
import math
import hmac
import logging
import asyncio
import traceback
from contextlib import asynccontextmanager, suppress
from logging.handlers import RotatingFileHandler
from typing import Optional

from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, JSONResponse
from pyrogram import Client
from pyrogram.enums import ParseMode
from pyrogram.errors import FloodWait

from fileManager import FileManager

# ============================================================
# Logging
# ============================================================
# Start every run with a fresh log file; a missing file is not an error.
with suppress(OSError):
    os.remove("logs.txt")

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s - %(levelname)s] - %(name)s - %(message)s",
    datefmt="%d-%b-%y %I:%M:%S %p",
    handlers=[
        RotatingFileHandler("logs.txt", mode="w+", maxBytes=5_000_000, backupCount=10),
        logging.StreamHandler(),
    ],
)
logging.getLogger("pyrogram").setLevel(logging.ERROR)


def LOGGER(name: str) -> logging.Logger:
    """Return the logger registered under *name*."""
    return logging.getLogger(name)


# ============================================================
# Config
# ============================================================
# Loading the env files is deliberately best-effort: real environment
# variables are validated right below, so a failure here must not be fatal.
try:
    load_dotenv("config.env.local")
    load_dotenv("config.env")
except Exception:
    pass

print(f"[config] API_ID = {os.getenv('API_ID', 'NOT SET')}")
print(f"[config] API_HASH = {'SET' if os.getenv('API_HASH') else 'NOT SET'}")
print(f"[config] BOT_TOKEN = {'SET' if os.getenv('BOT_TOKEN') else 'NOT SET'}")
print(f"[config] SPACE_URL = {os.getenv('SPACE_URL', 'NOT SET')}")

_required = ["BOT_TOKEN", "API_ID", "API_HASH", "SPACE_URL"]
# Whitespace-only values count as missing, because they are stripped below.
_missing = [v for v in _required if not (os.getenv(v) or "").strip()]
if _missing:
    print(f"ERROR: Missing required env vars: {', '.join(_missing)}")
    sys.exit(1)

try:
    API_ID = int(os.getenv("API_ID"))
except (TypeError, ValueError):
    print(f"ERROR: API_ID must be a plain integer, got: {os.getenv('API_ID')!r}")
    sys.exit(1)

API_HASH = os.getenv("API_HASH").strip()
BOT_TOKEN = os.getenv("BOT_TOKEN").strip()
# Drop trailing slashes so generated links never contain "//download/".
SPACE_URL = os.getenv("SPACE_URL").strip().rstrip("/")

try:
    # An empty value is treated like "0" (no expiry configured).
    FILE_EXPIRE_MINUTES = int(os.getenv("FILE_EXPIRE_MINUTES", "0") or 0)
except ValueError:
    print(
        "ERROR: FILE_EXPIRE_MINUTES must be a plain integer, got: "
        f"{os.getenv('FILE_EXPIRE_MINUTES')!r}"
    )
    sys.exit(1)

# Optional shared secret. When set, /webhook only accepts requests whose
# X-Telegram-Bot-Api-Secret-Token header matches it; the same value must be
# passed as `secret_token` when the webhook is registered with Telegram.
# Unset (the default) keeps the endpoint open, as before.
WEBHOOK_SECRET = (os.getenv("WEBHOOK_SECRET") or "").strip()

# ============================================================
# FileManager
# ============================================================
os.makedirs("fl", exist_ok=True)
# 0 means "no expiry configured"; a very large value is passed instead.
fm = FileManager(base_url=SPACE_URL, expires_minutes=FILE_EXPIRE_MINUTES or 999_999)
fm.start()

# ============================================================
# Pyrogram bot client
# ============================================================
bot = Client(
    "link_gen_bot",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
    workers=50,
    parse_mode=ParseMode.MARKDOWN,
    max_concurrent_transmissions=4,
    sleep_threshold=30,
)

# ============================================================
# Global state
# ============================================================
# Strong references to in-flight background tasks, so they are not garbage
# collected mid-run and can be counted by /stats and cancelled on shutdown.
RUNNING_TASKS: set = set()


def track_task(coro):
    """Schedule *coro* as a task and track it in RUNNING_TASKS until done.

    Returns the created ``asyncio.Task``.
    """
    task = asyncio.create_task(coro)
    RUNNING_TASKS.add(task)
    task.add_done_callback(RUNNING_TASKS.discard)
    return task


# ============================================================
# UX helpers
# ============================================================
def fmt_size(b: float) -> str:
    """Format a byte count with one decimal and a B/KB/MB/GB/TB unit."""
    for u in ["B", "KB", "MB", "GB"]:
        if b < 1024:
            return f"{b:.1f} {u}"
        b /= 1024
    return f"{b:.1f} TB"


def fmt_time(seconds: float) -> str:
    """Format a duration as ``12s``, ``3m 4s`` or ``1h 2m``.

    Returns an em dash for zero, negative, NaN or infinite input.
    """
    # isfinite() rejects NaN and +/-inf; int(inf) would raise OverflowError.
    if not math.isfinite(seconds) or seconds <= 0:
        return "—"
    seconds = int(seconds)
    if seconds < 60:
        return f"{seconds}s"
    m, s = divmod(seconds, 60)
    if m < 60:
        return f"{m}m {s}s"
    h, m = divmod(m, 60)
    return f"{h}h {m}m"


def progress_bar(current: int, total: int, length: int = 10) -> str:
    """Render a *length*-character bar showing ``current / total``.

    An empty bar is returned when the total is unknown or nothing is done yet.
    """
    if not total or total <= 0 or current <= 0:
        return "░" * length  # empty bar instead of full when unknown
    pct = min(1.0, current / total)
    filled = int(length * pct)
    empty = length - filled
    return "▓" * filled + "░" * empty


def build_progress_text(phase: str, current: int, total: int,
                        speed: float, elapsed: float, filename: str) -> str:
    """Build the Markdown status text shown while a transfer is running.

    *phase* is ``"download"`` or ``"upload"`` (anything else gets a generic
    icon and the "Saving" label). *total* may be 0 when unknown, in which case
    the percentage is 0 and the total is displayed as ``?``.
    """
    pct = min(100, int(current * 100 / total)) if total > 0 else 0
    bar = progress_bar(current, total)

    # ETA only makes sense when we know total and have speed
    if total > 0 and speed > 0 and current < total:
        eta = fmt_time((total - current) / speed)
    else:
        eta = "—"

    icons = {"download": "📥", "upload": "📤"}
    icon = icons.get(phase, "⚙️")
    label = "Downloading" if phase == "download" else "Saving"
    total_str = fmt_size(total) if total > 0 else "?"

    return (
        f"{icon} **{label}:** `{filename}`\n\n"
        f"`{bar}` **{pct}%**\n\n"
        f"**Done:** `{fmt_size(current)}` / `{total_str}`\n"
        f"**Speed:** `{fmt_size(speed)}/s`\n"
        f"**ETA:** `{eta}`\n"
        f"**Elapsed:** `{fmt_time(elapsed)}`"
    )


# ============================================================
# Webhook response helpers
# ============================================================
def _msg(chat_id: int, text: str, parse_mode: str = "Markdown",
         reply_markup: Optional[dict] = None,
         disable_web_page_preview: bool = False) -> dict:
    """Build a ``sendMessage`` payload to return as the webhook response body.

    ``reply_markup`` and ``disable_web_page_preview`` are only included in the
    payload when they are truthy.
    """
    payload = {"method": "sendMessage", "chat_id": chat_id,
               "text": text, "parse_mode": parse_mode}
    if reply_markup:
        payload["reply_markup"] = reply_markup
    if disable_web_page_preview:
        payload["disable_web_page_preview"] = True
    return payload


# ============================================================
# Instant command handlers
# ============================================================
def handle_start(chat_id: int) -> dict:
    """Return the welcome message payload for ``/start``."""
    return _msg(chat_id,
        "👋 **Welcome to Link Generator Bot!**\n\n"
        "Send me any file, photo, video, audio or document\n"
        "and I'll give you a **public shareable link** instantly.\n\n"
        "📎 Supports: documents, photos, videos, audio, voice, stickers\n"
        "🔗 Links are permanent and accessible by anyone.\n\n"
        "Just send a file to get started!")


def handle_help(chat_id: int) -> dict:
    """Return the usage message payload for ``/help``."""
    return _msg(chat_id,
        "💡 **Link Generator Bot Help**\n\n"
        "➤ Send any file/media → get a public link\n"
        "➤ `/start` – welcome message\n"
        "➤ `/help` – this message\n"
        "➤ `/stats` – bot stats\n\n"
        "**Supported types:**\n"
        "• Documents, PDFs, ZIPs\n"
        "• Photos\n"
        "• Videos\n"
        "• Audio / Voice messages\n"
        "• Stickers, Animations\n\n"
        "Just send the file — no commands needed!")


def handle_stats(chat_id: int) -> dict:
    """Return a payload with disk, memory, CPU, file and job statistics.

    ``psutil`` is imported lazily so the module itself loads without it.
    NOTE(review): ``cpu_percent(interval=0.2)`` blocks for 0.2s; this runs
    inside the async webhook handler, so the event loop stalls for that long.
    """
    import shutil
    import psutil

    total, used, free = shutil.disk_usage(".")
    proc = psutil.Process(os.getpid())
    # Files still being downloaded carry the "tmp_" prefix and are not counted.
    fl_count = (
        sum(1 for f in os.listdir("fl") if not f.startswith("tmp_"))
        if os.path.isdir("fl") else 0
    )
    active = sum(1 for t in RUNNING_TASKS if not t.done())
    return _msg(chat_id,
        "📊 **Bot Statistics**\n"
        "━━━━━━━━━━━━━━━━━━━\n"
        f"💾 **Disk Used:** `{fmt_size(used)}` / `{fmt_size(total)}`\n"
        f"🆓 **Disk Free:** `{fmt_size(free)}`\n"
        f"🧠 **Memory:** `{round(proc.memory_info().rss/1024**2)} MiB`\n"
        f"⚡ **CPU:** `{psutil.cpu_percent(interval=0.2)}%`\n"
        f"📁 **Stored Files:** `{fl_count}`\n"
        f"⚙️ **Active Jobs:** `{active}`")


# Exact command name -> handler, used by the webhook dispatcher.
_COMMANDS = {
    "/start": handle_start,
    "/help": handle_help,
    "/stats": handle_stats,
}

# ============================================================
# Extract file_id + filename + size from raw webhook message
# ============================================================
MEDIA_TYPES = [
    "document", "video", "audio", "voice",
    "photo", "animation", "sticker", "video_note"
]

EXT_MAP = {
    "video": "mp4", "audio": "mp3", "voice": "ogg",
    "animation": "gif", "sticker": "webp", "video_note": "mp4"
}


def _safe_filename(name, fallback: str) -> str:
    """Reduce a user-supplied file name to a bare, path-free component.

    Directory parts (``/`` or ``\\`` separated) and NUL bytes are removed so
    the result can be embedded in a path without escaping its directory.
    *fallback* is returned when nothing usable remains.
    """
    base = str(name).replace("\\", "/").rsplit("/", 1)[-1]
    base = base.replace("\x00", "").strip()
    return base if base not in ("", ".", "..") else fallback


def get_file_info(msg: dict) -> tuple:
    """Returns (file_id, filename, file_size) or (None, None, 0).

    The first entry of MEDIA_TYPES present in *msg* with a ``file_id`` wins.
    For photos the last element of the list is used. The file name is taken
    from ``file_name`` (falling back to ``file_unique_id``, then the media
    type), stripped of any directory part, and given a default extension from
    EXT_MAP (or ``bin``) when it has none.
    """
    for mtype in MEDIA_TYPES:
        obj = msg.get(mtype)
        if mtype == "photo" and isinstance(obj, list):
            obj = obj[-1] if obj else None
        # Skip absent or malformed entries instead of raising on them.
        if not isinstance(obj, dict) or not obj.get("file_id"):
            continue

        size = obj.get("file_size", 0)
        if mtype == "photo":
            stem = _safe_filename(obj.get("file_unique_id", "photo"), "photo")
            return obj["file_id"], f"{stem}.jpg", size

        raw_name = obj.get("file_name") or obj.get("file_unique_id", mtype)
        fname = _safe_filename(raw_name, mtype)
        if "." not in fname:
            fname = f"{fname}.{EXT_MAP.get(mtype, 'bin')}"
        return obj["file_id"], fname, size
    return None, None, 0


# ============================================================
# Async media handler with live progress
# ============================================================
async def handle_media(chat_id: int, reply_to_id: int,
                       file_id: str, filename: str, file_size: int):
    """Download a Telegram file, store it and report the public link.

    Sends a status message as a reply to *reply_to_id* and edits it through
    the phases (preparing, downloading, saving, done/failed). All failures are
    reported to the user and logged; nothing is raised to the caller except
    task cancellation. Temporary files are removed on every exit path.
    """
    # The name originates from the sender; never let it steer the path.
    filename = _safe_filename(filename, "file")
    short_name = filename if len(filename) <= 30 else filename[:27] + "..."

    # ── Phase 0: initial status ──────────────────────────────
    try:
        status_msg = await bot.send_message(
            chat_id,
            f"🔍 **Preparing to process your file…**\n\n"
            f"📄 `{short_name}`\n"
            f"📦 Size: `{fmt_size(file_size) if file_size else 'unknown'}`",
            reply_to_message_id=reply_to_id,
        )
    except Exception:
        # Without a status message there is nothing to edit; log and give up
        # rather than leaving an unretrieved exception on the task.
        LOGGER(__name__).error(traceback.format_exc())
        return

    dl_start = time.monotonic()
    last_edit = [0.0]  # mutable for closure
    # message_id is only unique per chat, so the chat id is part of the name.
    tmp_path = f"fl/tmp_{chat_id}_{reply_to_id}_{filename}"
    downloaded = None

    # ── Progress callback for Pyrogram download ──────────────
    async def on_progress(current: int, total: int):
        now = time.monotonic()
        # Throttle edits to one every 2.5s.
        if now - last_edit[0] < 2.5:
            return
        last_edit[0] = now
        elapsed = now - dl_start
        speed = current / elapsed if elapsed > 0 else 0
        # Pyrogram often passes total=0 — use file_size from webhook as fallback
        actual_total = total if total and total > 0 else file_size
        try:
            await status_msg.edit(
                build_progress_text("download", current, actual_total,
                                    speed, elapsed, short_name)
            )
        except Exception as exc:
            # A failed progress edit must never abort the download.
            LOGGER(__name__).debug(f"Progress edit failed: {exc}")

    try:
        # ── Phase 1: Download ────────────────────────────────
        for attempt in range(2):
            try:
                downloaded = await bot.download_media(
                    file_id,
                    file_name=tmp_path,
                    progress=on_progress,
                )
                break
            except FloodWait as e:
                wait_s = int(getattr(e, "value", 0) or 0)
                LOGGER(__name__).warning(f"FloodWait: {wait_s}s")
                # Retry once after the requested wait; otherwise give up.
                if attempt == 0 and wait_s > 0:
                    await status_msg.edit(
                        f"⏳ **FloodWait — retrying in {wait_s}s…**\n\n"
                        f"📄 `{short_name}`"
                    )
                    await asyncio.sleep(wait_s + 1)
                    dl_start = time.monotonic()
                    continue
                raise

        if not downloaded or not os.path.exists(downloaded) \
                or os.path.getsize(downloaded) == 0:
            await status_msg.edit(
                "❌ **Download Failed**\n\n"
                "Could not retrieve the file from Telegram.\n"
                "Please try again."
            )
            return

        actual_size = os.path.getsize(downloaded)
        dl_elapsed = time.monotonic() - dl_start
        dl_speed = actual_size / dl_elapsed if dl_elapsed > 0 else 0

        # ── Phase 2: Saving ──────────────────────────────────
        await status_msg.edit(
            f"📤 **Saving to server…**\n\n"
            f"📄 `{short_name}`\n"
            f"📦 `{fmt_size(actual_size)}`\n\n"
            f"⬇️ Downloaded in `{fmt_time(dl_elapsed)}` "
            f"at `{fmt_size(dl_speed)}/s`"
        )

        with open(downloaded, "rb") as f:
            file_data = f.read()
        os.remove(downloaded)

        entry = fm.save_file(file_data, filename)
        link = f"{SPACE_URL}/download/{entry['id']}"

        # ── Phase 3: Done ────────────────────────────────────
        total_elapsed = time.monotonic() - dl_start
        await status_msg.edit(
            f"✅ **Link Generated Successfully!**\n"
            f"━━━━━━━━━━━━━━━━━━━\n"
            f"🔗 **Link:**\n`{link}`\n\n"
            f"📄 **File:** `{entry['filename']}`\n"
            f"📦 **Size:** `{fmt_size(actual_size)}`\n"
            f"⚡ **Speed:** `{fmt_size(dl_speed)}/s`\n"
            f"⏱ **Time:** `{fmt_time(total_elapsed)}`\n"
            f"━━━━━━━━━━━━━━━━━━━\n"
            f"_Tap the link to copy • Accessible by anyone_",
            disable_web_page_preview=True,
        )
        LOGGER(__name__).info(
            f"Generated link for {filename} ({fmt_size(actual_size)}): {link}"
        )

    except Exception as e:
        LOGGER(__name__).error(traceback.format_exc())
        try:
            await status_msg.edit(
                f"❌ **Something went wrong**\n\n"
                f"`{str(e)[:200]}`\n\n"
                f"Please try again or contact support."
            )
        except Exception as exc:
            LOGGER(__name__).debug(f"Could not report failure to user: {exc}")

    finally:
        # Runs on success, failure and cancellation. download_media returns
        # the path it actually wrote, which may differ from tmp_path, so both
        # are tried; files that are already gone are ignored.
        for leftover in (downloaded, tmp_path):
            if leftover:
                with suppress(OSError):
                    os.remove(leftover)


# ============================================================
# FastAPI lifespan
# ============================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the Pyrogram client with the app and stop it on shutdown.

    Background media tasks still running at shutdown are cancelled and
    awaited before the client is stopped.
    """
    LOGGER(__name__).info("Starting bot client…")
    await bot.start()
    try:
        me = await bot.get_me()
        LOGGER(__name__).info(f"Bot ready: @{me.username} — waiting for webhook updates.")
        yield
    finally:
        LOGGER(__name__).info("Shutting down…")
        pending = [t for t in RUNNING_TASKS if not t.done()]
        for t in pending:
            t.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        await bot.stop()


app = FastAPI(lifespan=lifespan)


# ============================================================
# File serving endpoints
# ============================================================
@app.get("/")
async def root():
    """Health-check endpoint."""
    return {"status": "ok", "service": "Link Generator Bot"}


@app.get("/download/{file_id}")
async def download_file(file_id: str):
    """Serve a stored file as an attachment, or 404 if unknown or missing."""
    entry = fm.get_entry_by_id(file_id)
    if not entry:
        return JSONResponse({"error": "File not found or expired"}, status_code=404)
    path = entry["path"]
    if not os.path.exists(path):
        return JSONResponse({"error": "File not found on disk"}, status_code=404)
    return FileResponse(
        path=path,
        filename=entry["filename"],
        media_type="application/octet-stream",
    )


# ============================================================
# Webhook endpoint
# ============================================================
@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Handle one Telegram update.

    Commands are answered in the HTTP response itself; media is processed in
    a background task so the webhook returns immediately. Updates without a
    usable ``message`` are acknowledged and ignored.
    """
    if WEBHOOK_SECRET:
        supplied = request.headers.get("X-Telegram-Bot-Api-Secret-Token", "")
        # Compare as bytes: compare_digest rejects non-ASCII str input.
        if not hmac.compare_digest(supplied.encode("utf-8"),
                                   WEBHOOK_SECRET.encode("utf-8")):
            return JSONResponse({"error": "forbidden"}, status_code=403)

    try:
        update = await request.json()
    except ValueError:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    LOGGER(__name__).debug(f"Update: {update}")

    msg = update.get("message") if isinstance(update, dict) else None
    if not isinstance(msg, dict):
        return JSONResponse({"status": "ok"})

    chat = msg.get("chat")
    chat_id = chat.get("id") if isinstance(chat, dict) else None
    msg_id = msg.get("message_id")
    if chat_id is None or msg_id is None:
        # Malformed update: acknowledge it so Telegram does not keep retrying.
        return JSONResponse({"status": "ok"})

    text = (msg.get("text") or "").strip()

    if text.startswith("/"):
        # "/start@MyBot payload" -> "/start"; match the command name exactly.
        command = text.split(maxsplit=1)[0].split("@", 1)[0].lower()
        handler = _COMMANDS.get(command)
        if handler:
            try:
                return JSONResponse(handler(chat_id))
            except Exception:
                # A 5xx would make Telegram redeliver the same update.
                LOGGER(__name__).error(traceback.format_exc())
                return JSONResponse({"status": "ok"})

    file_id, filename, file_size = get_file_info(msg)
    if file_id:
        track_task(handle_media(chat_id, msg_id, file_id, filename, file_size))
        return JSONResponse({"status": "ok"})

    if text and not text.startswith("/"):
        return JSONResponse(_msg(chat_id,
            "📎 **Please send a file, photo, video or audio.**\n"
            "I'll generate a public link for it!"))

    return JSONResponse({"status": "ok"})