| |
| |
|
|
| import os |
| import sys |
| import time |
| import logging |
| import asyncio |
| import traceback |
| from contextlib import asynccontextmanager |
| from logging.handlers import RotatingFileHandler |
|
|
| from dotenv import load_dotenv |
| from fastapi import FastAPI, Request |
| from fastapi.responses import FileResponse, JSONResponse |
| from pyrogram import Client |
| from pyrogram.enums import ParseMode |
| from pyrogram.errors import FloodWait |
|
|
| from fileManager import FileManager |
|
|
| |
| |
| |
# Start every run without the previous run's log file. Any failure to delete
# it (for example, the file not existing yet) is deliberately ignored.
try:
    os.remove("logs.txt")
except Exception:
    pass

# Root logger: INFO and above, written both to a size-capped rotating file
# and to a stream handler.
# NOTE(review): RotatingFileHandler may ignore mode="w+" when maxBytes > 0 -
# confirm against the logging.handlers documentation.
logging.basicConfig(
    handlers=[
        RotatingFileHandler(
            "logs.txt",
            mode="w+",
            maxBytes=5_000_000,
            backupCount=10,
        ),
        logging.StreamHandler(),
    ],
    datefmt="%d-%b-%y %I:%M:%S %p",
    format="[%(asctime)s - %(levelname)s] - %(name)s - %(message)s",
    level=logging.INFO,
)
# Only records at ERROR or above are kept for the "pyrogram" logger.
logging.getLogger("pyrogram").setLevel(logging.ERROR)
|
|
def LOGGER(name: str) -> logging.Logger:
    """Return the logger registered under *name*.

    Thin convenience wrapper around ``logging.getLogger``.
    """
    named_logger = logging.getLogger(name)
    return named_logger
|
|
| |
| |
| |
# Load environment files in order: the local override file first, then the
# shared one. Any error raised while loading is ignored, and a failure on the
# first file also skips the second.
try:
    for _env_file in ("config.env.local", "config.env"):
        load_dotenv(_env_file)
except Exception:
    pass
|
|
# Echo the effective configuration at startup. API_HASH and BOT_TOKEN are
# reported only as SET / NOT SET, never by value.
print(f"[config] API_ID = {os.getenv('API_ID', 'NOT SET')}")
print(f"[config] API_HASH = {'SET' if os.getenv('API_HASH') else 'NOT SET'}")
print(f"[config] BOT_TOKEN = {'SET' if os.getenv('BOT_TOKEN') else 'NOT SET'}")
print(f"[config] SPACE_URL = {os.getenv('SPACE_URL', 'NOT SET')}")


_required = ["BOT_TOKEN", "API_ID", "API_HASH", "SPACE_URL"]
# A whitespace-only value counts as missing: it would pass a plain truthiness
# check but turn into an empty string once stripped below.
_missing = [v for v in _required if not (os.getenv(v) or "").strip()]
if _missing:
    print(f"ERROR: Missing required env vars: {', '.join(_missing)}")
    sys.exit(1)


try:
    API_ID = int(os.getenv("API_ID"))
except (TypeError, ValueError):
    print(f"ERROR: API_ID must be a plain integer, got: {os.getenv('API_ID')!r}")
    sys.exit(1)


API_HASH = os.getenv("API_HASH").strip()
BOT_TOKEN = os.getenv("BOT_TOKEN").strip()
SPACE_URL = os.getenv("SPACE_URL").strip()

# Optional setting: unset or empty means 0. A malformed value is reported the
# same way as a malformed API_ID instead of crashing with a traceback.
try:
    FILE_EXPIRE_MINUTES = int(os.getenv("FILE_EXPIRE_MINUTES") or "0")
except ValueError:
    print(
        "ERROR: FILE_EXPIRE_MINUTES must be a plain integer, got: "
        f"{os.getenv('FILE_EXPIRE_MINUTES')!r}"
    )
    sys.exit(1)
|
|
| |
| |
| |
# "fl" holds the stored files as well as in-progress "tmp_" downloads.
os.makedirs("fl", exist_ok=True)

# A configured expiry of 0 (the default) is replaced by 999_999 minutes.
fm = FileManager(
    expires_minutes=FILE_EXPIRE_MINUTES if FILE_EXPIRE_MINUTES else 999_999,
    base_url=SPACE_URL,
)
fm.start()
|
|
| |
| |
| |
# Pyrogram client for the bot account, configured from the validated
# environment values. It is started and stopped by the FastAPI lifespan hook.
bot = Client(
    "link_gen_bot",
    bot_token=BOT_TOKEN,
    api_hash=API_HASH,
    api_id=API_ID,
    parse_mode=ParseMode.MARKDOWN,
    sleep_threshold=30,
    max_concurrent_transmissions=4,
    workers=50,
)
|
|
| |
| |
| |
# Tasks started through track_task() that have not completed yet.
RUNNING_TASKS: set = set()


def track_task(coro):
    """Schedule *coro* as an asyncio task and track it until it finishes.

    The task is kept in ``RUNNING_TASKS`` while it runs and is discarded from
    the set by a done-callback. Must be called while an event loop is running.

    Returns:
        The created ``asyncio.Task``.
    """
    scheduled = asyncio.create_task(coro)
    scheduled.add_done_callback(RUNNING_TASKS.discard)
    RUNNING_TASKS.add(scheduled)
    return scheduled
|
|
| |
| |
| |
def fmt_size(b: float) -> str:
    """Format a byte count as a human-readable size with one decimal place.

    The value is divided by 1024 until it drops below 1024, stepping through
    B, KB, MB and GB; anything still larger is reported in TB.
    """
    units = ("B", "KB", "MB", "GB")
    idx = 0
    while idx < len(units):
        if b < 1024:
            return f"{b:.1f} {units[idx]}"
        b /= 1024
        idx += 1
    return f"{b:.1f} TB"
|
|
def fmt_time(seconds: float) -> str:
    """Format a duration in seconds as a short human-readable string.

    Args:
        seconds: Duration in seconds; fractional parts are truncated.

    Returns:
        ``"<s>s"`` below one minute, ``"<m>m <s>s"`` below one hour and
        ``"<h>h <m>m"`` otherwise. Non-positive, NaN and infinite inputs all
        yield the placeholder glyph.
    """
    import math

    # NaN and +/-inf cannot be converted with int(): int(inf) raises
    # OverflowError, so they are treated like "no meaningful duration".
    if not math.isfinite(seconds) or seconds <= 0:
        return "β"
    seconds = int(seconds)
    if seconds < 60:
        return f"{seconds}s"
    m, s = divmod(seconds, 60)
    if m < 60:
        return f"{m}m {s}s"
    h, m = divmod(m, 60)
    return f"{h}h {m}m"
|
|
def progress_bar(current: int, total: int, length: int = 10) -> str:
    """Render a text progress bar that is *length* cells wide.

    When *total* is missing or non-positive, or *current* is non-positive,
    the bar is rendered with every cell empty. Otherwise the filled share is
    ``current / total`` capped at 1.0, rounded down to whole cells.
    """
    nothing_to_show = not total or total <= 0 or current <= 0
    if nothing_to_show:
        return "β" * length
    fraction = min(1.0, current / total)
    done_cells = int(length * fraction)
    return "β" * done_cells + "β" * (length - done_cells)
|
|
def build_progress_text(phase: str, current: int, total: int,
                        speed: float, elapsed: float, filename: str) -> str:
    """Compose the Markdown status text shown while a transfer is running.

    Args:
        phase: ``"download"`` or ``"upload"``; selects the icon. Any phase
            other than ``"download"`` is labelled "Saving".
        current: Bytes transferred so far.
        total: Expected size in bytes; a value <= 0 means unknown.
        speed: Transfer rate in bytes per second.
        elapsed: Seconds since the transfer started.
        filename: Name displayed in the header line.

    Returns:
        The message text: header, bar with percentage, done/total, speed,
        ETA and elapsed time.
    """
    size_known = total > 0
    if size_known:
        pct = min(100, int(current * 100 / total))
        total_str = fmt_size(total)
    else:
        pct = 0
        total_str = "?"

    bar = progress_bar(current, total)

    # An ETA only makes sense with a known size, a positive speed and work
    # still remaining.
    eta = "β"
    if size_known and speed > 0 and current < total:
        eta = fmt_time((total - current) / speed)

    icon = {"download": "π₯", "upload": "π€"}.get(phase, "βοΈ")
    label = "Saving"
    if phase == "download":
        label = "Downloading"

    parts = [
        f"{icon} **{label}:** `{filename}`\n\n",
        f"`{bar}` **{pct}%**\n\n",
        f"**Done:** `{fmt_size(current)}` / `{total_str}`\n",
        f"**Speed:** `{fmt_size(speed)}/s`\n",
        f"**ETA:** `{eta}`\n",
        f"**Elapsed:** `{fmt_time(elapsed)}`",
    ]
    return "".join(parts)
|
|
| |
| |
| |
| def _msg(chat_id: int, text: str, parse_mode: str = "Markdown", |
| reply_markup: dict = None, disable_web_page_preview: bool = False) -> dict: |
| payload = {"method": "sendMessage", "chat_id": chat_id, |
| "text": text, "parse_mode": parse_mode} |
| if reply_markup: |
| payload["reply_markup"] = reply_markup |
| if disable_web_page_preview: |
| payload["disable_web_page_preview"] = True |
| return payload |
|
|
| |
| |
| |
def handle_start(chat_id: int) -> dict:
    """Return the ``sendMessage`` payload carrying the /start welcome text."""
    welcome_lines = (
        "π **Welcome to Link Generator Bot!**\n\n",
        "Send me any file, photo, video, audio or document\n",
        "and I'll give you a **public shareable link** instantly.\n\n",
        "π Supports: documents, photos, videos, audio, voice, stickers\n",
        "π Links are permanent and accessible by anyone.\n\n",
        "Just send a file to get started!",
    )
    return _msg(chat_id, "".join(welcome_lines))
|
|
def handle_help(chat_id: int) -> dict:
    """Return the ``sendMessage`` payload carrying the /help text."""
    help_lines = (
        "π‘ **Link Generator Bot Help**\n\n",
        "β€ Send any file/media β get a public link\n",
        "β€ `/start` β welcome message\n",
        "β€ `/help` β this message\n",
        "β€ `/stats` β bot stats\n\n",
        "**Supported types:**\n",
        "β’ Documents, PDFs, ZIPs\n",
        "β’ Photos\n",
        "β’ Videos\n",
        "β’ Audio / Voice messages\n",
        "β’ Stickers, Animations\n\n",
        "Just send the file β no commands needed!",
    )
    return _msg(chat_id, "".join(help_lines))
|
|
def handle_stats(chat_id: int) -> dict:
    """Return the ``sendMessage`` payload carrying the /stats report.

    The report covers disk usage of the working directory, this process's
    memory, CPU usage sampled over 0.2 s, the number of stored files in
    ``fl`` (names starting with ``tmp_`` are excluded) and the number of
    tracked tasks that are still running.
    """
    # Imported lazily so they are only loaded when /stats is requested.
    import shutil

    import psutil

    total, used, free = shutil.disk_usage(".")
    proc = psutil.Process(os.getpid())

    # Count lazily instead of building throwaway lists just to take len().
    if os.path.isdir("fl"):
        fl_count = sum(1 for f in os.listdir("fl") if not f.startswith("tmp_"))
    else:
        fl_count = 0
    active = sum(1 for t in RUNNING_TASKS if not t.done())

    return _msg(chat_id,
        "π **Bot Statistics**\n"
        "βββββββββββββββββββ\n"
        f"πΎ **Disk Used:** `{fmt_size(used)}` / `{fmt_size(total)}`\n"
        f"π **Disk Free:** `{fmt_size(free)}`\n"
        f"π§ **Memory:** `{round(proc.memory_info()[0]/1024**2)} MiB`\n"
        f"β‘ **CPU:** `{psutil.cpu_percent(interval=0.2)}%`\n"
        f"π **Stored Files:** `{fl_count}`\n"
        f"βοΈ **Active Jobs:** `{active}`")
|
|
| |
| |
| |
# Message keys that may carry a file, in lookup priority order.
MEDIA_TYPES = [
    "document", "video", "audio", "voice",
    "photo", "animation", "sticker", "video_note"
]
# Extension appended when a media object's name has none; types not listed
# here fall back to "bin".
EXT_MAP = {
    "video": "mp4", "audio": "mp3", "voice": "ogg",
    "animation": "gif", "sticker": "webp", "video_note": "mp4"
}


def _safe_basename(name: str) -> str:
    """Return *name* stripped of directory components and NUL bytes.

    Returns an empty string when nothing usable remains (for example
    ``""``, ``"dir/"``, ``"."`` or ``".."``).
    """
    leaf = name.replace("\x00", "").replace("\\", "/").rsplit("/", 1)[-1]
    if not leaf.strip(". "):
        return ""
    return leaf


def get_file_info(msg: dict) -> tuple:
    """Extract the file carried by a webhook message dict.

    Args:
        msg: The ``message`` object of an update.

    Returns:
        ``(file_id, filename, file_size)`` for the first media type from
        ``MEDIA_TYPES`` present in *msg*, or ``(None, None, 0)`` when the
        message carries no file. The returned filename never contains
        directory components.

    Raises:
        KeyError: If the selected media object has no ``file_id``.
    """
    for mtype in MEDIA_TYPES:
        if mtype not in msg:
            continue
        obj = msg[mtype]
        if mtype == "photo":
            # A photo may arrive as a list of sizes; the last entry is used.
            if isinstance(obj, list):
                if not obj:
                    # An empty size list carries no file; try other types.
                    continue
                obj = obj[-1]
            return obj["file_id"], f"{obj.get('file_unique_id', 'photo')}.jpg", obj.get("file_size", 0)
        fid = obj["file_id"]
        # file_name is supplied by the sender and later ends up in filesystem
        # paths, so only its final path component is kept.
        fname = _safe_basename(obj.get("file_name") or "") or obj.get("file_unique_id", mtype)
        if "." not in fname:
            fname = f"{fname}.{EXT_MAP.get(mtype, 'bin')}"
        return fid, fname, obj.get("file_size", 0)
    return None, None, 0
|
|
| |
| |
| |
def _discard_file(path) -> None:
    """Best-effort delete of *path*; a falsy path or any OS error is ignored."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        pass


async def handle_media(chat_id: int, reply_to_id: int,
                       file_id: str, filename: str, file_size: int):
    """Download a Telegram file, store it and reply with its public link.

    A status message is sent as a reply to the user's message and edited as
    the job progresses: throttled download progress, a "saving" notice and
    finally either the generated link or an error description.

    Args:
        chat_id: Chat to report to.
        reply_to_id: Id of the user's message; also part of the temp name.
        file_id: Telegram file id to download.
        filename: Name announced for the file (treated as untrusted).
        file_size: Announced size in bytes, 0 if unknown.

    Errors raised after the status message exists are logged and reported in
    that message instead of propagating. The temporary download is removed
    on every exit path.
    """
    log = LOGGER(__name__)
    short_name = filename if len(filename) <= 30 else filename[:27] + "..."

    status_msg = await bot.send_message(
        chat_id,
        f"π **Preparing to process your fileβ¦**\n\n"
        f"π `{short_name}`\n"
        f"π¦ Size: `{fmt_size(file_size) if file_size else 'unknown'}`",
        reply_to_message_id=reply_to_id,
    )

    dl_start = time.time()
    last_edit = [0.0]
    # The name comes from the sender; keep only its final path component so
    # the temporary file cannot be placed outside the "fl" directory.
    safe_name = os.path.basename(filename.replace("\\", "/")) or "file"
    tmp_path = f"fl/tmp_{reply_to_id}_{safe_name}"
    downloaded = None

    async def on_progress(current: int, total: int):
        """Refresh the status message, at most once every 2.5 seconds."""
        now = time.time()
        if now - last_edit[0] < 2.5:
            return
        last_edit[0] = now
        elapsed = now - dl_start
        speed = current / elapsed if elapsed > 0 else 0
        # Fall back to the announced size when the callback has no total.
        actual_total = total if total and total > 0 else file_size
        try:
            await status_msg.edit(
                build_progress_text("download", current, actual_total,
                                    speed, elapsed, short_name)
            )
        except Exception:
            # A failed progress edit is cosmetic and must not abort the
            # download itself.
            pass

    try:
        # One retry is allowed when the download is rejected with FloodWait.
        for attempt in range(2):
            try:
                downloaded = await bot.download_media(
                    file_id,
                    file_name=tmp_path,
                    progress=on_progress,
                )
                break
            except FloodWait as e:
                wait_s = int(getattr(e, "value", 0) or 0)
                log.warning(f"FloodWait: {wait_s}s")
                if attempt == 0 and wait_s > 0:
                    await status_msg.edit(
                        f"β³ **FloodWait β retrying in {wait_s}sβ¦**\n\n"
                        f"π `{short_name}`"
                    )
                    await asyncio.sleep(wait_s + 1)
                    # Restart the clock so speed and ETA ignore the wait.
                    dl_start = time.time()
                    continue
                raise

        if not downloaded or not os.path.exists(downloaded) \
                or os.path.getsize(downloaded) == 0:
            await status_msg.edit(
                "β **Download Failed**\n\n"
                "Could not retrieve the file from Telegram.\n"
                "Please try again."
            )
            return

        actual_size = os.path.getsize(downloaded)
        dl_elapsed = time.time() - dl_start
        dl_speed = actual_size / dl_elapsed if dl_elapsed > 0 else 0

        await status_msg.edit(
            f"π€ **Saving to serverβ¦**\n\n"
            f"π `{short_name}`\n"
            f"π¦ `{fmt_size(actual_size)}`\n\n"
            f"β¬οΈ Downloaded in `{fmt_time(dl_elapsed)}` "
            f"at `{fmt_size(dl_speed)}/s`"
        )

        with open(downloaded, "rb") as f:
            file_data = f.read()
        # Drop the temporary copy before the data is stored.
        _discard_file(downloaded)

        entry = fm.save_file(file_data, safe_name)
        # Avoid "//download" when SPACE_URL was configured with a trailing slash.
        link = f"{SPACE_URL.rstrip('/')}/download/{entry['id']}"

        total_elapsed = time.time() - dl_start
        # NOTE(review): the first literal below was split across two lines in
        # the source (a syntax error); the leading glyph is a reconstruction -
        # confirm against the original file.
        await status_msg.edit(
            f"✅ **Link Generated Successfully!**\n"
            f"βββββββββββββββββββ\n"
            f"π **Link:**\n`{link}`\n\n"
            f"π **File:** `{entry['filename']}`\n"
            f"π¦ **Size:** `{fmt_size(actual_size)}`\n"
            f"β‘ **Speed:** `{fmt_size(dl_speed)}/s`\n"
            f"β± **Time:** `{fmt_time(total_elapsed)}`\n"
            f"βββββββββββββββββββ\n"
            f"_Tap the link to copy β’ Accessible by anyone_",
            disable_web_page_preview=True,
        )
        log.info(f"Generated link for {filename} ({fmt_size(actual_size)}): {link}")

    except Exception as e:
        log.error(traceback.format_exc())
        try:
            await status_msg.edit(
                f"β **Something went wrong**\n\n"
                f"`{str(e)[:200]}`\n\n"
                f"Please try again or contact support."
            )
        except Exception:
            # The failure is already logged; nothing more can be reported.
            pass
    finally:
        # Never leave a partial or unprocessed download behind, whichever
        # path led out of the function.
        _discard_file(downloaded)
        _discard_file(tmp_path)
|
|
| |
| |
| |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the bot client for the lifetime of the FastAPI application.

    The client is started before the application begins serving and stopped
    when it shuts down. The stop also runs when startup fails after the
    client was started or when the application exits with an error.
    """
    log = LOGGER(__name__)
    log.info("Starting bot clientβ¦")
    await bot.start()
    try:
        me = await bot.get_me()
        log.info(f"Bot ready: @{me.username} β waiting for webhook updates.")
        yield
    finally:
        # Without try/finally an exception after bot.start() would leave the
        # client running.
        log.info("Shutting downβ¦")
        await bot.stop()


app = FastAPI(lifespan=lifespan)
|
|
| |
| |
| |
@app.get("/")
async def root():
    """Return a static status payload identifying the service."""
    payload = {"status": "ok", "service": "Link Generator Bot"}
    return payload
|
|
@app.get("/download/{file_id}")
async def download_file(file_id: str):
    """Serve the stored file registered under *file_id*.

    Responds with a 404 JSON error when the file manager has no entry for
    the id or when the entry's path no longer exists on disk; otherwise the
    file is sent as an attachment under its stored filename.
    """
    entry = fm.get_entry_by_id(file_id)

    failure = None
    if not entry:
        failure = "File not found or expired"
    elif not os.path.exists(entry["path"]):
        failure = "File not found on disk"

    if failure is not None:
        return JSONResponse({"error": failure}, status_code=404)

    return FileResponse(
        path=entry["path"],
        filename=entry["filename"],
        media_type="application/octet-stream",
    )
|
|
| |
| |
| |
@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Handle one Telegram webhook update.

    Commands (/start, /help, /stats) are answered directly in the webhook
    response. Messages carrying a file start a background ``handle_media``
    job. Plain text gets a hint, and everything else is acknowledged with
    ``{"status": "ok"}``.

    Structurally unexpected updates are acknowledged rather than allowed to
    raise: an error response would make Telegram deliver the same update
    again. A body that is not JSON at all is rejected with HTTP 400.
    """
    log = LOGGER(__name__)
    ack = {"status": "ok"}

    try:
        update = await request.json()
    except ValueError:
        # Covers JSON syntax errors and undecodable bodies.
        log.warning("Rejected webhook call with a non-JSON body")
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    log.debug(f"Update: {update}")

    msg = update.get("message") if isinstance(update, dict) else None
    if not isinstance(msg, dict):
        return JSONResponse(ack)

    chat = msg.get("chat")
    chat_id = chat.get("id") if isinstance(chat, dict) else None
    msg_id = msg.get("message_id")
    if chat_id is None or msg_id is None:
        log.warning("Ignoring message without chat id or message id")
        return JSONResponse(ack)

    raw_text = msg.get("text")
    text = raw_text.strip() if isinstance(raw_text, str) else ""

    if text.startswith("/start"):
        return JSONResponse(handle_start(chat_id))
    if text.startswith("/help"):
        return JSONResponse(handle_help(chat_id))
    if text.startswith("/stats"):
        return JSONResponse(handle_stats(chat_id))

    try:
        file_id, filename, file_size = get_file_info(msg)
    except (KeyError, IndexError, TypeError, AttributeError):
        # The media object did not have the expected shape.
        log.warning("Ignoring message with a malformed media object")
        return JSONResponse(ack)

    if file_id:
        # The transfer runs in the background so the webhook returns at once.
        track_task(handle_media(chat_id, msg_id, file_id, filename, file_size))
        return JSONResponse(ack)

    if text and not text.startswith("/"):
        return JSONResponse(_msg(chat_id,
            "π **Please send a file, photo, video or audio.**\n"
            "I'll generate a public link for it!"))

    return JSONResponse(ack)