Spaces:
Sleeping
Sleeping
| # Link Generator Bot | |
| # Webhook-only | HuggingFace Spaces compatible | |
| import os | |
| import sys | |
| import time | |
| import logging | |
| import asyncio | |
| import traceback | |
| from contextlib import asynccontextmanager | |
| from logging.handlers import RotatingFileHandler | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pyrogram import Client | |
| from pyrogram.enums import ParseMode | |
| from pyrogram.errors import FloodWait | |
| from fileManager import FileManager | |
| # ============================================================ | |
| # Logging | |
| # ============================================================ | |
| try: | |
| os.remove("logs.txt") | |
| except Exception: | |
| pass | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="[%(asctime)s - %(levelname)s] - %(name)s - %(message)s", | |
| datefmt="%d-%b-%y %I:%M:%S %p", | |
| handlers=[ | |
| RotatingFileHandler("logs.txt", mode="w+", maxBytes=5_000_000, backupCount=10), | |
| logging.StreamHandler(), | |
| ], | |
| ) | |
| logging.getLogger("pyrogram").setLevel(logging.ERROR) | |
| def LOGGER(name: str) -> logging.Logger: | |
| return logging.getLogger(name) | |
| # ============================================================ | |
| # Config | |
| # ============================================================ | |
| try: | |
| load_dotenv("config.env.local") | |
| load_dotenv("config.env") | |
| except Exception: | |
| pass | |
| print(f"[config] API_ID = {os.getenv('API_ID', 'NOT SET')}") | |
| print(f"[config] API_HASH = {'SET' if os.getenv('API_HASH') else 'NOT SET'}") | |
| print(f"[config] BOT_TOKEN = {'SET' if os.getenv('BOT_TOKEN') else 'NOT SET'}") | |
| print(f"[config] SPACE_URL = {os.getenv('SPACE_URL', 'NOT SET')}") | |
| _required = ["BOT_TOKEN", "API_ID", "API_HASH", "SPACE_URL"] | |
| _missing = [v for v in _required if not os.getenv(v)] | |
| if _missing: | |
| print(f"ERROR: Missing required env vars: {', '.join(_missing)}") | |
| sys.exit(1) | |
| try: | |
| API_ID = int(os.getenv("API_ID")) | |
| except (TypeError, ValueError): | |
| print(f"ERROR: API_ID must be a plain integer, got: {os.getenv('API_ID')!r}") | |
| sys.exit(1) | |
| API_HASH = os.getenv("API_HASH").strip() | |
| BOT_TOKEN = os.getenv("BOT_TOKEN").strip() | |
| SPACE_URL = os.getenv("SPACE_URL").strip() | |
| FILE_EXPIRE_MINUTES = int(os.getenv("FILE_EXPIRE_MINUTES", "0")) | |
| # ============================================================ | |
| # FileManager | |
| # ============================================================ | |
| os.makedirs("fl", exist_ok=True) | |
| fm = FileManager(base_url=SPACE_URL, expires_minutes=FILE_EXPIRE_MINUTES or 999_999) | |
| fm.start() | |
| # ============================================================ | |
| # Pyrogram bot client | |
| # ============================================================ | |
| bot = Client( | |
| "link_gen_bot", | |
| api_id=API_ID, | |
| api_hash=API_HASH, | |
| bot_token=BOT_TOKEN, | |
| workers=50, | |
| parse_mode=ParseMode.MARKDOWN, | |
| max_concurrent_transmissions=4, | |
| sleep_threshold=30, | |
| ) | |
| # ============================================================ | |
| # Global state | |
| # ============================================================ | |
| RUNNING_TASKS: set = set() | |
| def track_task(coro): | |
| task = asyncio.create_task(coro) | |
| RUNNING_TASKS.add(task) | |
| task.add_done_callback(RUNNING_TASKS.discard) | |
| return task | |
| # ============================================================ | |
| # UX helpers | |
| # ============================================================ | |
| def fmt_size(b: float) -> str: | |
| for u in ["B", "KB", "MB", "GB"]: | |
| if b < 1024: | |
| return f"{b:.1f} {u}" | |
| b /= 1024 | |
| return f"{b:.1f} TB" | |
| def fmt_time(seconds: float) -> str: | |
| if seconds <= 0 or seconds != seconds: # handle 0, negative, NaN | |
| return "β" | |
| seconds = int(seconds) | |
| if seconds < 60: | |
| return f"{seconds}s" | |
| m, s = divmod(seconds, 60) | |
| if m < 60: | |
| return f"{m}m {s}s" | |
| h, m = divmod(m, 60) | |
| return f"{h}h {m}m" | |
| def progress_bar(current: int, total: int, length: int = 10) -> str: | |
| if not total or total <= 0 or current <= 0: | |
| return "β" * length # empty bar instead of full when unknown | |
| pct = min(1.0, current / total) | |
| filled = int(length * pct) | |
| empty = length - filled | |
| return "β" * filled + "β" * empty | |
| def build_progress_text(phase: str, current: int, total: int, | |
| speed: float, elapsed: float, filename: str) -> str: | |
| pct = min(100, int(current * 100 / total)) if total > 0 else 0 | |
| bar = progress_bar(current, total) | |
| # ETA only makes sense when we know total and have speed | |
| if total > 0 and speed > 0 and current < total: | |
| eta = fmt_time((total - current) / speed) | |
| else: | |
| eta = "β" | |
| icons = {"download": "π₯", "upload": "π€"} | |
| icon = icons.get(phase, "βοΈ") | |
| label = "Downloading" if phase == "download" else "Saving" | |
| total_str = fmt_size(total) if total > 0 else "?" | |
| return ( | |
| f"{icon} **{label}:** `{filename}`\n\n" | |
| f"`{bar}` **{pct}%**\n\n" | |
| f"**Done:** `{fmt_size(current)}` / `{total_str}`\n" | |
| f"**Speed:** `{fmt_size(speed)}/s`\n" | |
| f"**ETA:** `{eta}`\n" | |
| f"**Elapsed:** `{fmt_time(elapsed)}`" | |
| ) | |
| # ============================================================ | |
| # Webhook response helpers | |
| # ============================================================ | |
| def _msg(chat_id: int, text: str, parse_mode: str = "Markdown", | |
| reply_markup: dict = None, disable_web_page_preview: bool = False) -> dict: | |
| payload = {"method": "sendMessage", "chat_id": chat_id, | |
| "text": text, "parse_mode": parse_mode} | |
| if reply_markup: | |
| payload["reply_markup"] = reply_markup | |
| if disable_web_page_preview: | |
| payload["disable_web_page_preview"] = True | |
| return payload | |
| # ============================================================ | |
| # Instant command handlers | |
| # ============================================================ | |
| def handle_start(chat_id: int) -> dict: | |
| return _msg(chat_id, | |
| "π **Welcome to Link Generator Bot!**\n\n" | |
| "Send me any file, photo, video, audio or document\n" | |
| "and I'll give you a **public shareable link** instantly.\n\n" | |
| "π Supports: documents, photos, videos, audio, voice, stickers\n" | |
| "π Links are permanent and accessible by anyone.\n\n" | |
| "Just send a file to get started!") | |
| def handle_help(chat_id: int) -> dict: | |
| return _msg(chat_id, | |
| "π‘ **Link Generator Bot Help**\n\n" | |
| "β€ Send any file/media β get a public link\n" | |
| "β€ `/start` β welcome message\n" | |
| "β€ `/help` β this message\n" | |
| "β€ `/stats` β bot stats\n\n" | |
| "**Supported types:**\n" | |
| "β’ Documents, PDFs, ZIPs\n" | |
| "β’ Photos\n" | |
| "β’ Videos\n" | |
| "β’ Audio / Voice messages\n" | |
| "β’ Stickers, Animations\n\n" | |
| "Just send the file β no commands needed!") | |
| def handle_stats(chat_id: int) -> dict: | |
| import shutil, psutil | |
| from time import time as _time | |
| total, used, free = shutil.disk_usage(".") | |
| proc = psutil.Process(os.getpid()) | |
| fl_count = len([f for f in os.listdir("fl") | |
| if not f.startswith("tmp_")]) if os.path.isdir("fl") else 0 | |
| active = len([t for t in RUNNING_TASKS if not t.done()]) | |
| return _msg(chat_id, | |
| "π **Bot Statistics**\n" | |
| "βββββββββββββββββββ\n" | |
| f"πΎ **Disk Used:** `{fmt_size(used)}` / `{fmt_size(total)}`\n" | |
| f"π **Disk Free:** `{fmt_size(free)}`\n" | |
| f"π§ **Memory:** `{round(proc.memory_info()[0]/1024**2)} MiB`\n" | |
| f"β‘ **CPU:** `{psutil.cpu_percent(interval=0.2)}%`\n" | |
| f"π **Stored Files:** `{fl_count}`\n" | |
| f"βοΈ **Active Jobs:** `{active}`") | |
| # ============================================================ | |
| # Extract file_id + filename + size from raw webhook message | |
| # ============================================================ | |
| MEDIA_TYPES = [ | |
| "document", "video", "audio", "voice", | |
| "photo", "animation", "sticker", "video_note" | |
| ] | |
| EXT_MAP = { | |
| "video": "mp4", "audio": "mp3", "voice": "ogg", | |
| "animation": "gif", "sticker": "webp", "video_note": "mp4" | |
| } | |
| def get_file_info(msg: dict) -> tuple: | |
| """Returns (file_id, filename, file_size) or (None, None, 0).""" | |
| for mtype in MEDIA_TYPES: | |
| if mtype not in msg: | |
| continue | |
| if mtype == "photo": | |
| photos = msg["photo"] | |
| obj = photos[-1] if isinstance(photos, list) else photos | |
| return obj["file_id"], f"{obj.get('file_unique_id', 'photo')}.jpg", obj.get("file_size", 0) | |
| obj = msg[mtype] | |
| fid = obj["file_id"] | |
| fname = obj.get("file_name") or obj.get("file_unique_id", mtype) | |
| if "." not in fname.split("/")[-1]: | |
| fname = f"{fname}.{EXT_MAP.get(mtype, 'bin')}" | |
| return fid, fname, obj.get("file_size", 0) | |
| return None, None, 0 | |
| # ============================================================ | |
| # Async media handler with live progress | |
| # ============================================================ | |
| async def handle_media(chat_id: int, reply_to_id: int, | |
| file_id: str, filename: str, file_size: int): | |
| short_name = filename if len(filename) <= 30 else filename[:27] + "..." | |
| # ββ Phase 0: initial status ββββββββββββββββββββββββββββββ | |
| status_msg = await bot.send_message( | |
| chat_id, | |
| f"π **Preparing to process your fileβ¦**\n\n" | |
| f"π `{short_name}`\n" | |
| f"π¦ Size: `{fmt_size(file_size) if file_size else 'unknown'}`", | |
| reply_to_message_id=reply_to_id, | |
| ) | |
| dl_start = time.time() | |
| last_edit = [0.0] # mutable for closure | |
| tmp_path = f"fl/tmp_{reply_to_id}_{filename}" | |
| # ββ Progress callback for Pyrogram download ββββββββββββββ | |
| async def on_progress(current: int, total: int): | |
| now = time.time() | |
| if now - last_edit[0] < 2.5: | |
| return | |
| last_edit[0] = now | |
| elapsed = now - dl_start | |
| speed = current / elapsed if elapsed > 0 else 0 | |
| # Pyrogram often passes total=0 β use file_size from webhook as fallback | |
| actual_total = total if total and total > 0 else file_size | |
| try: | |
| await status_msg.edit( | |
| build_progress_text("download", current, actual_total, | |
| speed, elapsed, short_name) | |
| ) | |
| except Exception: | |
| pass | |
| try: | |
| # ββ Phase 1: Download ββββββββββββββββββββββββββββββββ | |
| downloaded = None | |
| for attempt in range(2): | |
| try: | |
| downloaded = await bot.download_media( | |
| file_id, | |
| file_name=tmp_path, | |
| progress=on_progress, | |
| ) | |
| break | |
| except FloodWait as e: | |
| wait_s = int(getattr(e, "value", 0) or 0) | |
| LOGGER(__name__).warning(f"FloodWait: {wait_s}s") | |
| if attempt == 0 and wait_s > 0: | |
| await status_msg.edit( | |
| f"β³ **FloodWait β retrying in {wait_s}sβ¦**\n\n" | |
| f"π `{short_name}`" | |
| ) | |
| await asyncio.sleep(wait_s + 1) | |
| dl_start = time.time() | |
| continue | |
| raise | |
| if not downloaded or not os.path.exists(downloaded) \ | |
| or os.path.getsize(downloaded) == 0: | |
| await status_msg.edit( | |
| "β **Download Failed**\n\n" | |
| "Could not retrieve the file from Telegram.\n" | |
| "Please try again." | |
| ) | |
| try: os.remove(tmp_path) | |
| except: pass | |
| return | |
| actual_size = os.path.getsize(downloaded) | |
| dl_elapsed = time.time() - dl_start | |
| dl_speed = actual_size / dl_elapsed if dl_elapsed > 0 else 0 | |
| # ββ Phase 2: Saving ββββββββββββββββββββββββββββββββββ | |
| await status_msg.edit( | |
| f"π€ **Saving to serverβ¦**\n\n" | |
| f"π `{short_name}`\n" | |
| f"π¦ `{fmt_size(actual_size)}`\n\n" | |
| f"β¬οΈ Downloaded in `{fmt_time(dl_elapsed)}` " | |
| f"at `{fmt_size(dl_speed)}/s`" | |
| ) | |
| with open(downloaded, "rb") as f: | |
| file_data = f.read() | |
| os.remove(downloaded) | |
| entry = fm.save_file(file_data, filename) | |
| link = f"{SPACE_URL}/download/{entry['id']}" | |
| # ββ Phase 3: Done ββββββββββββββββββββββββββββββββββββ | |
| total_elapsed = time.time() - dl_start | |
| await status_msg.edit( | |
| f"β **Link Generated Successfully!**\n" | |
| f"βββββββββββββββββββ\n" | |
| f"π **Link:**\n`{link}`\n\n" | |
| f"π **File:** `{entry['filename']}`\n" | |
| f"π¦ **Size:** `{fmt_size(actual_size)}`\n" | |
| f"β‘ **Speed:** `{fmt_size(dl_speed)}/s`\n" | |
| f"β± **Time:** `{fmt_time(total_elapsed)}`\n" | |
| f"βββββββββββββββββββ\n" | |
| f"_Tap the link to copy β’ Accessible by anyone_", | |
| disable_web_page_preview=True, | |
| ) | |
| LOGGER(__name__).info(f"Generated link for {filename} ({fmt_size(actual_size)}): {link}") | |
| except Exception as e: | |
| LOGGER(__name__).error(traceback.format_exc()) | |
| try: | |
| await status_msg.edit( | |
| f"β **Something went wrong**\n\n" | |
| f"`{str(e)[:200]}`\n\n" | |
| f"Please try again or contact support." | |
| ) | |
| except Exception: | |
| pass | |
| # ============================================================ | |
| # FastAPI lifespan | |
| # ============================================================ | |
| async def lifespan(app: FastAPI): | |
| LOGGER(__name__).info("Starting bot clientβ¦") | |
| await bot.start() | |
| me = await bot.get_me() | |
| LOGGER(__name__).info(f"Bot ready: @{me.username} β waiting for webhook updates.") | |
| yield | |
| LOGGER(__name__).info("Shutting downβ¦") | |
| await bot.stop() | |
| app = FastAPI(lifespan=lifespan) | |
| # ============================================================ | |
| # File serving endpoints | |
| # ============================================================ | |
| async def root(): | |
| return {"status": "ok", "service": "Link Generator Bot"} | |
| async def download_file(file_id: str): | |
| entry = fm.get_entry_by_id(file_id) | |
| if not entry: | |
| return JSONResponse({"error": "File not found or expired"}, status_code=404) | |
| path = entry["path"] | |
| if not os.path.exists(path): | |
| return JSONResponse({"error": "File not found on disk"}, status_code=404) | |
| return FileResponse( | |
| path=path, | |
| filename=entry["filename"], | |
| media_type="application/octet-stream", | |
| ) | |
| # ============================================================ | |
| # Webhook endpoint | |
| # ============================================================ | |
| async def telegram_webhook(request: Request): | |
| update = await request.json() | |
| LOGGER(__name__).debug(f"Update: {update}") | |
| if "message" not in update: | |
| return JSONResponse({"status": "ok"}) | |
| msg = update["message"] | |
| chat_id = msg["chat"]["id"] | |
| text = msg.get("text", "").strip() | |
| msg_id = msg["message_id"] | |
| if text.startswith("/start"): | |
| return JSONResponse(handle_start(chat_id)) | |
| if text.startswith("/help"): | |
| return JSONResponse(handle_help(chat_id)) | |
| if text.startswith("/stats"): | |
| return JSONResponse(handle_stats(chat_id)) | |
| file_id, filename, file_size = get_file_info(msg) | |
| if file_id: | |
| track_task(handle_media(chat_id, msg_id, file_id, filename, file_size)) | |
| return JSONResponse({"status": "ok"}) | |
| if text and not text.startswith("/"): | |
| return JSONResponse(_msg(chat_id, | |
| "π **Please send a file, photo, video or audio.**\n" | |
| "I'll generate a public link for it!")) | |
| return JSONResponse({"status": "ok"}) |