media2url / main.py
no-name-here's picture
Update main.py
8df0016 verified
# Link Generator Bot
# Webhook-only | HuggingFace Spaces compatible
import os
import sys
import time
import logging
import asyncio
import traceback
from contextlib import asynccontextmanager
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, JSONResponse
from pyrogram import Client
from pyrogram.enums import ParseMode
from pyrogram.errors import FloodWait
from fileManager import FileManager
# ============================================================
# Logging
# ============================================================
# Start each run with a fresh log file. Any failure to delete it (most
# commonly: the file does not exist yet) is deliberately ignored.
try:
    os.remove("logs.txt")
except Exception:
    pass
# Root logger: INFO level, written both to a size-rotated "logs.txt"
# (5,000,000 bytes per file, 10 backups) and to the default stream handler.
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s - %(levelname)s] - %(name)s - %(message)s",
    datefmt="%d-%b-%y %I:%M:%S %p",
    handlers=[
        RotatingFileHandler("logs.txt", mode="w+", maxBytes=5_000_000, backupCount=10),
        logging.StreamHandler(),
    ],
)
# Only ERROR and above from the "pyrogram" logger reach the handlers.
logging.getLogger("pyrogram").setLevel(logging.ERROR)
def LOGGER(name: str) -> logging.Logger:
    """Return the logger registered under *name*.

    Thin convenience wrapper around ``logging.getLogger`` so call sites can
    write ``LOGGER(__name__).info(...)``.
    """
    logger = logging.getLogger(name)
    return logger
# ============================================================
# Config
# ============================================================
# Load variables from the optional env files. This is best effort: the
# required-variable check below is what actually decides whether we can run.
try:
    load_dotenv("config.env.local")
    load_dotenv("config.env")
except Exception as exc:
    print(f"[config] WARNING: could not load env files: {exc}")

# Echo the effective configuration; secrets are shown only as SET / NOT SET.
print(f"[config] API_ID = {os.getenv('API_ID', 'NOT SET')}")
print(f"[config] API_HASH = {'SET' if os.getenv('API_HASH') else 'NOT SET'}")
print(f"[config] BOT_TOKEN = {'SET' if os.getenv('BOT_TOKEN') else 'NOT SET'}")
print(f"[config] SPACE_URL = {os.getenv('SPACE_URL', 'NOT SET')}")

_required = ["BOT_TOKEN", "API_ID", "API_HASH", "SPACE_URL"]
# A whitespace-only value is as useless as a missing one (it becomes "" after
# the .strip() calls below), so treat it as missing too.
_missing = [v for v in _required if not (os.getenv(v) or "").strip()]
if _missing:
    print(f"ERROR: Missing required env vars: {', '.join(_missing)}")
    sys.exit(1)

try:
    API_ID = int(os.getenv("API_ID"))
except (TypeError, ValueError):
    print(f"ERROR: API_ID must be a plain integer, got: {os.getenv('API_ID')!r}")
    sys.exit(1)

API_HASH = os.getenv("API_HASH").strip()
BOT_TOKEN = os.getenv("BOT_TOKEN").strip()
SPACE_URL = os.getenv("SPACE_URL").strip()

# Optional setting: unset or blank means 0. A non-numeric value is reported
# the same way as a bad API_ID instead of crashing with a raw traceback.
_expire_raw = (os.getenv("FILE_EXPIRE_MINUTES") or "").strip() or "0"
try:
    FILE_EXPIRE_MINUTES = int(_expire_raw)
except ValueError:
    print(f"ERROR: FILE_EXPIRE_MINUTES must be a plain integer, got: {_expire_raw!r}")
    sys.exit(1)
# ============================================================
# FileManager
# ============================================================
# Make sure the "fl" directory exists before the file manager is created.
os.makedirs("fl", exist_ok=True)
# FILE_EXPIRE_MINUTES == 0 is falsy, so `or` substitutes 999_999 minutes
# (presumably meaning "effectively never expires" — confirm in fileManager).
fm = FileManager(base_url=SPACE_URL, expires_minutes=FILE_EXPIRE_MINUTES or 999_999)
# NOTE(review): start() presumably launches the manager's background
# expiry/cleanup work — confirm in fileManager.
fm.start()
# ============================================================
# Pyrogram bot client
# ============================================================
# Shared Pyrogram client, authenticated as a bot via BOT_TOKEN. It is only
# constructed here; it is started and stopped by the FastAPI lifespan handler.
bot = Client(
    "link_gen_bot",  # client/session name
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
    workers=50,
    parse_mode=ParseMode.MARKDOWN,  # default parse mode for messages sent through this client
    max_concurrent_transmissions=4,
    # NOTE(review): presumably FloodWaits up to 30 s are slept through by
    # Pyrogram itself and longer ones are raised — confirm against Pyrogram docs.
    sleep_threshold=30,
)
# ============================================================
# Global state
# ============================================================
# Tasks created through track_task(); each one removes itself when it finishes.
RUNNING_TASKS: set = set()


def track_task(coro):
    """Schedule *coro* on the running event loop and track it until it is done.

    The task is kept in ``RUNNING_TASKS`` (which also holds a strong reference
    to it) and is discarded from the set by a done-callback. Returns the task.
    """
    job = asyncio.create_task(coro)
    RUNNING_TASKS.add(job)
    job.add_done_callback(RUNNING_TASKS.discard)
    return job
# ============================================================
# UX helpers
# ============================================================
def fmt_size(b: float) -> str:
    """Format a byte count with one decimal and a binary-scaled unit (B..TB).

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    idx = 0
    # `not b < 1024` (rather than `b >= 1024`) keeps NaN walking up to TB.
    while idx < len(units) - 1 and not b < 1024:
        b /= 1024
        idx += 1
    return f"{b:.1f} {units[idx]}"
def fmt_time(seconds: float) -> str:
    """Format a duration as ``"42s"``, ``"3m 5s"`` or ``"2h 7m"``.

    Fractions of a second are truncated. Values that cannot be shown as a
    duration (zero, negative, NaN, infinite) yield the placeholder dash.
    """
    # One chained comparison rejects 0, negatives, NaN (all comparisons are
    # False) and +inf, which would otherwise make int() raise OverflowError.
    if not 0 < seconds < float("inf"):
        return "β€”"
    seconds = int(seconds)
    if seconds < 60:
        return f"{seconds}s"
    m, s = divmod(seconds, 60)
    if m < 60:
        return f"{m}m {s}s"
    h, m = divmod(m, 60)
    return f"{h}h {m}m"
def progress_bar(current: int, total: int, length: int = 10) -> str:
    """Render a fixed-width text bar for ``current / total``.

    An unknown or non-positive total, or no progress yet, gives an all-empty
    bar. Progress beyond the total is capped at a full bar.
    """
    done_cell = "β–“"
    todo_cell = "β–‘"
    nothing_to_show = (not total) or total <= 0 or current <= 0
    if nothing_to_show:
        return todo_cell * length
    ratio = min(1.0, current / total)
    done = int(length * ratio)
    return done_cell * done + todo_cell * (length - done)
def build_progress_text(phase: str, current: int, total: int,
                        speed: float, elapsed: float, filename: str) -> str:
    """Compose the Markdown status text shown while a transfer is running.

    *phase* selects the icon and label ("download" -> "Downloading", anything
    else -> "Saving"). A non-positive *total* is rendered as "?" with 0 %.
    """
    total_known = total > 0
    if total_known:
        pct = min(100, int(current * 100 / total))
    else:
        pct = 0
    bar = progress_bar(current, total)

    # An ETA needs a known total, a positive speed and remaining work.
    if total_known and speed > 0 and current < total:
        eta = fmt_time((total - current) / speed)
    else:
        eta = "β€”"

    if phase == "download":
        icon, label = "πŸ“₯", "Downloading"
    elif phase == "upload":
        icon, label = "πŸ“€", "Saving"
    else:
        icon, label = "βš™οΈ", "Saving"

    total_str = fmt_size(total) if total_known else "?"
    lines = [
        f"{icon} **{label}:** `{filename}`",
        "",
        f"`{bar}` **{pct}%**",
        "",
        f"**Done:** `{fmt_size(current)}` / `{total_str}`",
        f"**Speed:** `{fmt_size(speed)}/s`",
        f"**ETA:** `{eta}`",
        f"**Elapsed:** `{fmt_time(elapsed)}`",
    ]
    return "\n".join(lines)
# ============================================================
# Webhook response helpers
# ============================================================
def _msg(chat_id: int, text: str, parse_mode: str = "Markdown",
reply_markup: dict = None, disable_web_page_preview: bool = False) -> dict:
payload = {"method": "sendMessage", "chat_id": chat_id,
"text": text, "parse_mode": parse_mode}
if reply_markup:
payload["reply_markup"] = reply_markup
if disable_web_page_preview:
payload["disable_web_page_preview"] = True
return payload
# ============================================================
# Instant command handlers
# ============================================================
def handle_start(chat_id: int) -> dict:
    """Build the webhook reply for ``/start``: the welcome message."""
    welcome = (
        "πŸ‘‹ **Welcome to Link Generator Bot!**\n\n"
        "Send me any file, photo, video, audio or document\n"
        "and I'll give you a **public shareable link** instantly.\n\n"
        "πŸ“Ž Supports: documents, photos, videos, audio, voice, stickers\n"
        "πŸ”— Links are permanent and accessible by anyone.\n\n"
        "Just send a file to get started!"
    )
    return _msg(chat_id, welcome)
def handle_help(chat_id: int) -> dict:
    """Build the webhook reply for ``/help``: commands and supported media."""
    help_text = (
        "πŸ’‘ **Link Generator Bot Help**\n\n"
        "➀ Send any file/media β†’ get a public link\n"
        "➀ `/start` – welcome message\n"
        "➀ `/help` – this message\n"
        "➀ `/stats` – bot stats\n\n"
        "**Supported types:**\n"
        "β€’ Documents, PDFs, ZIPs\n"
        "β€’ Photos\n"
        "β€’ Videos\n"
        "β€’ Audio / Voice messages\n"
        "β€’ Stickers, Animations\n\n"
        "Just send the file β€” no commands needed!"
    )
    return _msg(chat_id, help_text)
def handle_stats(chat_id: int) -> dict:
    """Build the webhook reply for ``/stats``.

    Reports disk usage of the working directory, this process's resident
    memory, system CPU usage, the number of stored files in ``fl`` (temporary
    ``tmp_*`` files excluded) and the number of unfinished tracked jobs.
    """
    # Imported lazily so these modules are only loaded when stats are requested.
    import shutil
    import psutil

    total, used, free = shutil.disk_usage(".")
    proc = psutil.Process(os.getpid())

    # Listing directly and handling the error avoids the check-then-use race
    # of isdir() + listdir(); a missing "fl" directory simply counts as empty.
    try:
        fl_count = sum(1 for f in os.listdir("fl") if not f.startswith("tmp_"))
    except OSError:
        fl_count = 0
    active = sum(1 for t in RUNNING_TASKS if not t.done())

    # NOTE(review): cpu_percent(interval=0.2) blocks for about 0.2 s while it
    # samples; this function is called synchronously from the webhook handler.
    return _msg(chat_id,
                "πŸ“Š **Bot Statistics**\n"
                "━━━━━━━━━━━━━━━━━━━\n"
                f"πŸ’Ύ **Disk Used:** `{fmt_size(used)}` / `{fmt_size(total)}`\n"
                f"πŸ†“ **Disk Free:** `{fmt_size(free)}`\n"
                f"🧠 **Memory:** `{round(proc.memory_info().rss / 1024**2)} MiB`\n"
                f"⚑ **CPU:** `{psutil.cpu_percent(interval=0.2)}%`\n"
                f"πŸ“ **Stored Files:** `{fl_count}`\n"
                f"βš™οΈ **Active Jobs:** `{active}`")
# ============================================================
# Extract file_id + filename + size from raw webhook message
# ============================================================
# Message fields that may carry a downloadable file, in lookup priority order.
MEDIA_TYPES = [
    "document", "video", "audio", "voice",
    "photo", "animation", "sticker", "video_note"
]
# Extension appended when a media object's name has none ("bin" otherwise).
EXT_MAP = {
    "video": "mp4", "audio": "mp3", "voice": "ogg",
    "animation": "gif", "sticker": "webp", "video_note": "mp4"
}


def _safe_filename(raw, fallback: str) -> str:
    """Reduce *raw* to its last path component so it cannot escape a directory."""
    leaf = str(raw).replace("\\", "/").rsplit("/", 1)[-1]
    leaf = leaf.replace("\x00", "").strip()
    # "", "." and ".." are not usable file names.
    if not leaf.strip("."):
        return fallback
    return leaf


def get_file_info(msg: dict) -> tuple:
    """Return ``(file_id, filename, file_size)`` for the first media in *msg*.

    *msg* is a raw webhook message dict. Media fields are checked in
    ``MEDIA_TYPES`` order. For photos the last entry of the size list is used
    and the name is ``<file_unique_id>.jpg``. Other media use ``file_name``
    (falling back to ``file_unique_id``, then the media type) with a default
    extension from ``EXT_MAP`` when the name has none.

    The file name comes from the sender, so it is stripped down to its final
    path component before being returned; callers embed it in filesystem
    paths. Returns ``(None, None, 0)`` when the message carries no media.
    """
    for mtype in MEDIA_TYPES:
        if mtype not in msg:
            continue
        if mtype == "photo":
            photos = msg["photo"]
            if isinstance(photos, list):
                if not photos:
                    # Malformed/empty size list: nothing to download here.
                    continue
                obj = photos[-1]
            else:
                obj = photos
            stem = _safe_filename(obj.get("file_unique_id", "photo"), "photo")
            return obj["file_id"], f"{stem}.jpg", obj.get("file_size", 0)
        obj = msg[mtype]
        fid = obj["file_id"]
        fname = obj.get("file_name") or obj.get("file_unique_id", mtype)
        fname = _safe_filename(fname, mtype)
        if "." not in fname:
            fname = f"{fname}.{EXT_MAP.get(mtype, 'bin')}"
        return fid, fname, obj.get("file_size", 0)
    return None, None, 0
# ============================================================
# Async media handler with live progress
# ============================================================
async def handle_media(chat_id: int, reply_to_id: int,
                       file_id: str, filename: str, file_size: int):
    """Download a Telegram file, store it via ``fm`` and reply with its link.

    A status message is sent as a reply to *reply_to_id* and edited in place
    through the phases (preparing, downloading with live progress, saving,
    done/failed). *file_size* is the size announced in the webhook update and
    is used when the download callback reports no total.

    Failures inside the download/save phases are logged and reported to the
    user rather than raised. Any temporary file is removed on every exit path.
    """
    short_name = filename if len(filename) <= 30 else filename[:27] + "..."

    # -- Phase 0: initial status --------------------------------------
    status_msg = await bot.send_message(
        chat_id,
        f"πŸ” **Preparing to process your file…**\n\n"
        f"πŸ“„ `{short_name}`\n"
        f"πŸ“¦ Size: `{fmt_size(file_size) if file_size else 'unknown'}`",
        reply_to_message_id=reply_to_id,
    )

    dl_start = time.time()
    last_edit = 0.0
    # Only the last path component goes into the temp path, so a crafted
    # name cannot point the download outside the "fl" directory.
    tmp_leaf = filename.replace("\\", "/").rsplit("/", 1)[-1] or "file"
    tmp_path = f"fl/tmp_{reply_to_id}_{tmp_leaf}"
    downloaded = None

    # -- Progress callback for the Pyrogram download ------------------
    async def on_progress(current: int, total: int):
        nonlocal last_edit
        now = time.time()
        # Throttle edits to one every 2.5 s.
        if now - last_edit < 2.5:
            return
        last_edit = now
        elapsed = now - dl_start
        speed = current / elapsed if elapsed > 0 else 0
        # Fall back to the size from the webhook update when no total is given.
        actual_total = total if total and total > 0 else file_size
        try:
            await status_msg.edit(
                build_progress_text("download", current, actual_total,
                                    speed, elapsed, short_name)
            )
        except Exception:
            # A failed progress edit must never abort the download itself.
            pass

    try:
        # -- Phase 1: download (one retry after a FloodWait) ----------
        for attempt in range(2):
            try:
                downloaded = await bot.download_media(
                    file_id,
                    file_name=tmp_path,
                    progress=on_progress,
                )
                break
            except FloodWait as e:
                wait_s = int(getattr(e, "value", 0) or 0)
                LOGGER(__name__).warning(f"FloodWait: {wait_s}s")
                if attempt == 0 and wait_s > 0:
                    await status_msg.edit(
                        f"⏳ **FloodWait β€” retrying in {wait_s}s…**\n\n"
                        f"πŸ“„ `{short_name}`"
                    )
                    await asyncio.sleep(wait_s + 1)
                    # Restart the clock so speed/ETA ignore the wait.
                    dl_start = time.time()
                    continue
                raise

        if not downloaded or not os.path.exists(downloaded) \
                or os.path.getsize(downloaded) == 0:
            await status_msg.edit(
                "❌ **Download Failed**\n\n"
                "Could not retrieve the file from Telegram.\n"
                "Please try again."
            )
            return

        actual_size = os.path.getsize(downloaded)
        dl_elapsed = time.time() - dl_start
        dl_speed = actual_size / dl_elapsed if dl_elapsed > 0 else 0

        # -- Phase 2: saving ------------------------------------------
        await status_msg.edit(
            f"πŸ“€ **Saving to server…**\n\n"
            f"πŸ“„ `{short_name}`\n"
            f"πŸ“¦ `{fmt_size(actual_size)}`\n\n"
            f"⬇️ Downloaded in `{fmt_time(dl_elapsed)}` "
            f"at `{fmt_size(dl_speed)}/s`"
        )
        with open(downloaded, "rb") as f:
            file_data = f.read()
        # Free the disk space before the file manager writes its own copy.
        os.remove(downloaded)
        entry = fm.save_file(file_data, filename)
        link = f"{SPACE_URL}/download/{entry['id']}"

        # -- Phase 3: done --------------------------------------------
        total_elapsed = time.time() - dl_start
        await status_msg.edit(
            f"βœ… **Link Generated Successfully!**\n"
            f"━━━━━━━━━━━━━━━━━━━\n"
            f"πŸ”— **Link:**\n`{link}`\n\n"
            f"πŸ“„ **File:** `{entry['filename']}`\n"
            f"πŸ“¦ **Size:** `{fmt_size(actual_size)}`\n"
            f"⚑ **Speed:** `{fmt_size(dl_speed)}/s`\n"
            f"⏱ **Time:** `{fmt_time(total_elapsed)}`\n"
            f"━━━━━━━━━━━━━━━━━━━\n"
            f"_Tap the link to copy β€’ Accessible by anyone_",
            disable_web_page_preview=True,
        )
        LOGGER(__name__).info(f"Generated link for {filename} ({fmt_size(actual_size)}): {link}")
    except Exception as e:
        LOGGER(__name__).error(traceback.format_exc())
        try:
            await status_msg.edit(
                f"❌ **Something went wrong**\n\n"
                f"`{str(e)[:200]}`\n\n"
                f"Please try again or contact support."
            )
        except Exception:
            # The user-facing error report is best effort; the traceback is
            # already in the log.
            pass
    finally:
        # Runs on success, failure and cancellation alike, so a partial or
        # unsaved download never stays behind in "fl".
        for leftover in {tmp_path, downloaded}:
            if not leftover:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass
# ============================================================
# FastAPI lifespan
# ============================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the bot client for the lifetime of the FastAPI app.

    On shutdown, media jobs that are still running are cancelled and awaited
    before the client is stopped, so none of them keeps using a stopped
    client. The client is stopped even if the startup or serving phase raises.
    """
    LOGGER(__name__).info("Starting bot client…")
    await bot.start()
    try:
        me = await bot.get_me()
        LOGGER(__name__).info(f"Bot ready: @{me.username} β€” waiting for webhook updates.")
        yield
    finally:
        LOGGER(__name__).info("Shutting down…")
        pending = [t for t in RUNNING_TASKS if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            # return_exceptions=True: a job's CancelledError or failure must
            # not prevent the client from being stopped.
            await asyncio.gather(*pending, return_exceptions=True)
        await bot.stop()


app = FastAPI(lifespan=lifespan)
# ============================================================
# File serving endpoints
# ============================================================
@app.get("/")
async def root():
    """Health-check endpoint: report that the service is up."""
    health = {"status": "ok", "service": "Link Generator Bot"}
    return health
@app.get("/download/{file_id}")
async def download_file(file_id: str):
    """Serve a stored file as an attachment, or a 404 JSON error.

    The entry is looked up through ``fm.get_entry_by_id``. A 404 is returned
    when there is no such entry or when its path is not a regular file.
    """
    entry = fm.get_entry_by_id(file_id)
    if not entry:
        return JSONResponse({"error": "File not found or expired"}, status_code=404)

    path = entry["path"]
    # isfile() rather than exists(): a directory at that path would pass an
    # existence check and then fail inside FileResponse.
    if not os.path.isfile(path):
        return JSONResponse({"error": "File not found on disk"}, status_code=404)

    return FileResponse(
        path=path,
        filename=entry["filename"],
        media_type="application/octet-stream",
    )
# ============================================================
# Webhook endpoint
# ============================================================
@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Handle one Telegram webhook update.

    Commands (``/start``, ``/help``, ``/stats``) are answered directly in the
    HTTP response body. Media messages start a background job and are
    acknowledged immediately. Plain text gets a short hint. Everything else,
    including updates without a usable ``message``, is acknowledged and
    ignored. A body that is not valid JSON gets a 400.
    """
    ack = {"status": "ok"}

    try:
        update = await request.json()
    except ValueError:
        # Covers JSON decode errors and undecodable bodies.
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    LOGGER(__name__).debug(f"Update: {update}")

    msg = update.get("message") if isinstance(update, dict) else None
    if not isinstance(msg, dict):
        return JSONResponse(ack)

    try:
        chat_id = msg["chat"]["id"]
        msg_id = msg["message_id"]
    except (KeyError, TypeError):
        # Malformed message: acknowledge it instead of failing with a 500.
        return JSONResponse(ack)
    text = (msg.get("text") or "").strip()

    if text.startswith("/"):
        # Match the exact command word ("/start", "/start payload" or
        # "/start@BotName"), not just a prefix, so "/statsfoo" is not a command.
        command = text.split(maxsplit=1)[0].split("@", 1)[0]
        handlers = {
            "/start": handle_start,
            "/help": handle_help,
            "/stats": handle_stats,
        }
        handler = handlers.get(command)
        if handler is not None:
            return JSONResponse(handler(chat_id))

    file_id, filename, file_size = get_file_info(msg)
    if file_id:
        # The download runs in the background; the webhook returns right away.
        track_task(handle_media(chat_id, msg_id, file_id, filename, file_size))
        return JSONResponse(ack)

    if text and not text.startswith("/"):
        return JSONResponse(_msg(chat_id,
                                 "πŸ“Ž **Please send a file, photo, video or audio.**\n"
                                 "I'll generate a public link for it!"))
    return JSONResponse(ack)