"""Telegram channel video indexer and streamer.

A user session (``bot``) crawls a Telegram channel into a local SQLite
database (with an FTS5 index on titles), and the FastAPI app exposes listing,
search, thumbnail and byte-range streaming endpoints plus an optional Bot API
based direct-download path and a single-page frontend.
"""

import asyncio
import json
import math
import os
import sqlite3
import urllib.parse
import urllib.request
from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager, contextmanager, suppress
from pathlib import Path

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pyrogram import Client, enums
from pyrogram.errors import FloodWait

load_dotenv()

# `or "0"` also covers API_ID being present but empty, which would make int() fail.
API_ID = int(os.getenv("API_ID") or "0")
API_HASH = os.getenv("API_HASH", "")
SESSION_STRING = os.getenv("SESSION_STRING", "")
BOT_TOKEN = os.getenv("BOT_TOKEN", "").strip()
BOT_API_BASE = os.getenv("BOT_API_BASE", "https://api.telegram.org").strip()
BOT_TARGET = os.getenv("BOT_TARGET", "").strip()

env_db_path = os.getenv("DB_PATH", "").strip()
if env_db_path:
    DB_PATH = env_db_path
else:
    DB_PATH = "/data/movies.db" if Path("/data").exists() else "movies.db"

# Pyrogram's stream_media counts its offset/limit in 1 MiB chunks; the range
# arithmetic in /stream relies on this matching that chunk size.
CHUNK_SIZE = 1024 * 1024

# Ensure DB directory exists
data_dir = Path(DB_PATH).parent
data_dir.mkdir(parents=True, exist_ok=True)

BATCH_SIZE = 100  # messages per GetHistory call
BATCH_DELAY = 1.5  # seconds between batches (avoids FloodWait)

FRONTEND_DIST = Path(os.getenv("FRONTEND_DIST", "frontend/dist")).resolve()
ASSETS_DIR = FRONTEND_DIST / "assets"

# File-name suffixes treated as video when a document has no video/* MIME type.
_VIDEO_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi", ".webm", ".m4v", ".ts", ".m2ts")


def load_channel_id(value: str) -> str | int:
    """Parse a chat identifier read from the environment.

    Numeric values (optionally negative, e.g. ``-100123``) become ``int``;
    anything else (e.g. a username) is returned stripped. Empty input gives "".
    """
    if not value:
        return ""
    cleaned = value.strip()
    if cleaned.lstrip("-").isdigit():
        return int(cleaned)
    return cleaned


CHANNEL_ID = load_channel_id(os.getenv("CHANNEL_ID", ""))

# User-account client: crawls history, streams media, forwards messages.
bot = Client(
    "tgstream",
    api_id=API_ID,
    api_hash=API_HASH,
    session_string=SESSION_STRING,
    no_updates=True,
)

# Optional Bot API client, only created when BOT_TOKEN is configured.
bot_client = Client(
    "tgstream_bot",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
    no_updates=True,
) if BOT_TOKEN else None


# ─────────────────────────── SQLite helpers ───────────────────────────────────

def get_db() -> sqlite3.Connection:
    """Open a new connection to DB_PATH whose rows support name access.

    The caller owns the connection and must close it; prefer ``db_session()``.
    """
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def db_session() -> Iterator[sqlite3.Connection]:
    """Yield a connection inside a transaction, then always close it.

    ``with sqlite3.Connection`` only commits or rolls back; it never closes the
    connection, so using ``get_db()`` directly as a context manager leaks one
    connection per call until garbage collection.
    """
    conn = get_db()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def init_db() -> None:
    """Create tables, the external-content FTS5 index and its sync triggers."""
    with db_session() as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS movies (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                duration TEXT,
                duration_raw INTEGER,
                size TEXT,
                size_raw INTEGER,
                width INTEGER,
                height INTEGER,
                mime TEXT,
                has_thumb INTEGER DEFAULT 0
            );
            CREATE VIRTUAL TABLE IF NOT EXISTS movies_fts USING fts5(title, content='movies', content_rowid='id');
            CREATE TRIGGER IF NOT EXISTS movies_ai AFTER INSERT ON movies BEGIN
                INSERT INTO movies_fts(rowid, title) VALUES (new.id, new.title);
            END;
            CREATE TRIGGER IF NOT EXISTS movies_ad AFTER DELETE ON movies BEGIN
                INSERT INTO movies_fts(movies_fts, rowid, title) VALUES ('delete', old.id, old.title);
            END;
            CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value TEXT
            );
            CREATE TABLE IF NOT EXISTS bot_cache (
                message_id INTEGER PRIMARY KEY,
                bot_message_id INTEGER,
                file_id TEXT
            );
        """)


def db_get_meta(key: str, default=None):
    """Return the stored string for ``key`` in ``meta``, or ``default``."""
    with db_session() as conn:
        row = conn.execute("SELECT value FROM meta WHERE key=?", (key,)).fetchone()
        return row["value"] if row else default


def db_set_meta(key: str, value: str) -> None:
    """Insert or overwrite ``key`` in ``meta``."""
    with db_session() as conn:
        conn.execute(
            "INSERT INTO meta(key,value) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, value),
        )


def db_clear_all(*, keep_bot_cache: bool = False) -> None:
    """Delete every indexed movie and all crawl metadata.

    Args:
        keep_bot_cache: keep the message_id -> bot file_id cache. Only safe when
            the channel is unchanged, because message ids are per-chat and a
            cache from another channel would resolve to the wrong files.
    """
    script = "DELETE FROM movies; DELETE FROM meta;"
    if not keep_bot_cache:
        script += " DELETE FROM bot_cache;"
    with db_session() as conn:
        conn.executescript(script)


def db_upsert_movies(rows: list[dict]) -> None:
    """Insert movie rows, silently skipping ids that already exist."""
    if not rows:
        return
    with db_session() as conn:
        conn.executemany(
            """
            INSERT OR IGNORE INTO movies
                (id, title, duration, duration_raw, size, size_raw, width, height, mime, has_thumb)
            VALUES
                (:id, :title, :duration, :duration_raw, :size, :size_raw, :width, :height, :mime, :has_thumb)
            """,
            rows,
        )


def db_count() -> int:
    """Return the number of indexed movies."""
    with db_session() as conn:
        return conn.execute("SELECT COUNT(*) FROM movies").fetchone()[0]


def db_max_movie_id() -> int:
    """Return the highest indexed message id, or 0 when the table is empty."""
    with db_session() as conn:
        return conn.execute("SELECT COALESCE(MAX(id), 0) FROM movies").fetchone()[0]


def db_list_movies(limit: int, offset_id: int) -> list[dict]:
    """List movies newest-first (by message id).

    Args:
        limit: maximum rows; ``<= 0`` returns every matching row.
        offset_id: when non-zero, only ids strictly below it (keyset paging).
    """
    sql = "SELECT * FROM movies"
    params: list[int] = []
    if offset_id:
        sql += " WHERE id < ?"
        params.append(offset_id)
    sql += " ORDER BY id DESC"
    if limit > 0:
        sql += " LIMIT ?"
        params.append(limit)
    with db_session() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [dict(r) for r in rows]


def _fts_query(text: str) -> str:
    """Turn free text into a literal FTS5 query.

    Each whitespace-separated term becomes a double-quoted FTS5 string (inner
    quotes doubled), and the terms are implicitly AND-ed. This keeps user input
    such as ``spider-man``, ``"``, ``a:b`` or a bare ``AND`` from being parsed
    as FTS5 syntax, which would otherwise raise ``OperationalError``.
    """
    return " ".join('"' + term.replace('"', '""') + '"' for term in text.split())


def db_search_movies(query: str, limit: int = 50) -> list[dict]:
    """Full-text search movie titles, best match first (FTS5 ``rank``)."""
    match = _fts_query(query)
    if not match:
        return []
    with db_session() as conn:
        try:
            rows = conn.execute(
                """
                SELECT m.* FROM movies m
                JOIN movies_fts f ON m.id = f.rowid
                WHERE movies_fts MATCH ?
                ORDER BY rank
                LIMIT ?
                """,
                (match, limit),
            ).fetchall()
        except sqlite3.OperationalError as exc:
            print(f"[SEARCH] FTS error: {exc}")
            return []
    return [dict(r) for r in rows]


# ─────────────────────────── formatting utils ─────────────────────────────────

def fmt_size(num_bytes: int | None) -> str:
    """Format a byte count with one decimal and a B/KB/MB/GB/TB unit (1024-based)."""
    if not num_bytes:
        return "0 B"
    size = float(num_bytes)
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if size < 1024 or unit == "TB":
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"


def fmt_dur(seconds: int | None) -> str:
    """Format seconds as ``M:SS`` or ``H:MM:SS``; ``None`` gives ``--:--``."""
    if seconds is None:
        return "--:--"
    seconds = int(seconds)
    h = seconds // 3600
    m = (seconds % 3600) // 60
    s = seconds % 60
    if h:
        return f"{h}:{m:02d}:{s:02d}"
    return f"{m}:{s:02d}"


def extract_vid(msg):
    """Return the message's video media, or ``None``.

    Native videos are used directly; documents qualify when their MIME type
    starts with ``video`` or their file name has a known video extension.
    """
    if msg.video:
        return msg.video
    if msg.document:
        mime = getattr(msg.document, "mime_type", None) or ""
        if mime.startswith("video"):
            return msg.document
        file_name = (getattr(msg.document, "file_name", "") or "").lower()
        if file_name.endswith(_VIDEO_EXTENSIONS):
            return msg.document
    return None


def bot_file_url(file_id: str) -> str | None:
    """Resolve a Bot API ``file_id`` to a direct download URL via ``getFile``.

    This performs blocking network I/O (urllib, 10 s timeout); call it through
    ``asyncio.to_thread`` from async code. Returns ``None`` when no bot token
    is configured or the lookup fails. NOTE: the returned URL embeds BOT_TOKEN.
    """
    if not BOT_TOKEN:
        return None
    query = urllib.parse.urlencode({"file_id": file_id})
    endpoint = f"{BOT_API_BASE}/bot{BOT_TOKEN}/getFile?{query}"
    try:
        with urllib.request.urlopen(endpoint, timeout=10) as resp:
            data = json.loads(resp.read().decode("utf-8"))
    except Exception as exc:
        print(f"[BOT] getFile error: {exc}")
        return None
    if not data.get("ok"):
        return None
    file_path = (data.get("result") or {}).get("file_path")
    if not file_path:
        return None
    return f"{BOT_API_BASE}/file/bot{BOT_TOKEN}/{file_path}"


def bot_cache_get(message_id: int) -> str | None:
    """Return the cached bot ``file_id`` for a channel message, if any."""
    with db_session() as conn:
        row = conn.execute(
            "SELECT file_id FROM bot_cache WHERE message_id=?",
            (message_id,),
        ).fetchone()
        return row["file_id"] if row else None


def bot_cache_set(message_id: int, bot_message_id: int, file_id: str) -> None:
    """Store (or replace) the bot ``file_id`` for a channel message."""
    with db_session() as conn:
        conn.execute(
            "INSERT OR REPLACE INTO bot_cache(message_id, bot_message_id, file_id) VALUES(?,?,?)",
            (message_id, bot_message_id, file_id),
        )


async def get_or_create_bot_file_id(message_id: int) -> str | None:
    """Return a bot-usable ``file_id`` for a channel video, creating it if needed.

    On a cache miss the user account forwards the message to BOT_TARGET and the
    bot client reads it back to obtain a ``file_id`` valid for the Bot API.
    Returns ``None`` when the bot flow is not configured or any step fails.
    """
    if not BOT_TOKEN or not BOT_TARGET or not bot_client:
        return None

    cached = bot_cache_get(message_id)
    if cached:
        return cached

    try:
        forwarded = await bot.forward_messages(BOT_TARGET, CHANNEL_ID, message_id)
        forwarded_id = forwarded.id
        # NOTE(review): assumes the forwarded message has the same id for the
        # bot as for the user account (true for channels/supergroups, not for
        # private chats) — confirm for the configured BOT_TARGET.
        bot_msg = await bot_client.get_messages(BOT_TARGET, forwarded_id)
        vid = extract_vid(bot_msg)
        if not vid:
            return None
        file_id = getattr(vid, "file_id", None)
        if not file_id:
            return None
        bot_cache_set(message_id, forwarded_id, file_id)
        return file_id
    except Exception as exc:
        print(f"[BOT] forward error: {exc}")
        return None


def msg_to_row(msg) -> dict | None:
    """Convert a channel message into a ``movies`` row, or ``None`` if not a video."""
    vid = extract_vid(msg)
    if not vid:
        return None
    # Fall back to a synthetic title for missing *or* whitespace-only captions.
    title = (msg.caption or "").strip() or f"Movie #{msg.id}"
    return {
        "id": msg.id,
        "title": title,
        "duration": fmt_dur(getattr(vid, "duration", None)),
        "duration_raw": getattr(vid, "duration", None),
        "size": fmt_size(getattr(vid, "file_size", None)),
        "size_raw": getattr(vid, "file_size", None),
        "width": getattr(vid, "width", None),
        "height": getattr(vid, "height", None),
        "mime": getattr(vid, "mime_type", None),
        "has_thumb": int(bool(getattr(vid, "thumbs", None))),
    }


# ─────────────────────────── background crawler ───────────────────────────────

_crawl_task: asyncio.Task | None = None
_crawl_status = {"running": False, "indexed": 0, "last_id": 0}


async def _fetch_batch(offset_id: int) -> list:
    """Fetch up to BATCH_SIZE history messages older than ``offset_id``.

    ``offset_id=0`` starts from the newest message. FloodWait and other errors
    are retried indefinitely (after the requested wait, or 10 s); task
    cancellation is not caught and propagates.
    """
    while True:
        try:
            return [
                msg
                async for msg in bot.get_chat_history(
                    CHANNEL_ID, limit=BATCH_SIZE, offset_id=offset_id
                )
            ]
        except FloodWait as exc:
            wait = exc.value + 2
            print(f"[CRAWLER] FloodWait — sleeping {wait}s")
            await asyncio.sleep(wait)
        except Exception as exc:
            print(f"[CRAWLER] Error: {exc}. Retrying in 10s.")
            await asyncio.sleep(10)


def _save_batch(batch: list) -> int:
    """Index the video messages in ``batch``; return how many were offered."""
    rows = [row for msg in batch if (row := msg_to_row(msg))]
    db_upsert_movies(rows)
    _crawl_status["indexed"] = db_count()
    return len(rows)


async def _crawl_new_messages() -> None:
    """Index messages posted after the newest id a previous crawl recorded.

    Walks history from the newest message downwards until it reaches ids at or
    below ``crawl_max_id``. Skipped on a first run (no ``crawl_max_id`` yet);
    the backfill records it instead.
    """
    known_max = int(db_get_meta("crawl_max_id", "0") or 0)
    if not known_max:
        return
    newest = known_max
    offset_id = 0
    while True:
        batch = await _fetch_batch(offset_id)
        fresh = [msg for msg in batch if msg.id > known_max]
        if fresh:
            saved = _save_batch(fresh)
            newest = max(newest, max(msg.id for msg in fresh))
            print(f"[CRAWLER] New messages: saved {saved} videos. DB total={db_count()}")
        # Stop once the page reaches already-crawled ids or history runs out.
        if len(fresh) < len(batch) or len(batch) < BATCH_SIZE:
            break
        offset_id = min(msg.id for msg in batch)
        await asyncio.sleep(BATCH_DELAY)
    # Recorded only after the pass completes, so an interrupted pass is redone.
    db_set_meta("crawl_max_id", str(newest))


async def _crawl_backfill() -> None:
    """Walk older history newest-to-oldest, resuming from ``crawl_min_id``."""
    offset_id = int(db_get_meta("crawl_min_id", "0") or 0)
    need_max_id = not db_get_meta("crawl_max_id")
    print(f"[CRAWLER] Starting. Resume offset_id={offset_id}. DB has {db_count()} movies.")

    while True:
        batch = await _fetch_batch(offset_id)
        if not batch:
            print("[CRAWLER] No more messages. Crawl complete.")
            break

        saved = _save_batch(batch)
        if need_max_id:
            # First page of a fresh crawl holds the newest messages; later
            # startups index anything above this id via _crawl_new_messages.
            db_set_meta("crawl_max_id", str(max(msg.id for msg in batch)))
            need_max_id = False

        min_id = min(msg.id for msg in batch)
        db_set_meta("crawl_min_id", str(min_id))
        _crawl_status["last_id"] = min_id
        print(f"[CRAWLER] Batch saved {saved} videos. DB total={db_count()}. Next offset_id={min_id}")

        if len(batch) < BATCH_SIZE:
            print("[CRAWLER] Last batch — history exhausted.")
            break

        offset_id = min_id
        await asyncio.sleep(BATCH_DELAY)


async def crawl_channel() -> None:
    """Index the channel into SQLite in the background.

    First picks up messages newer than the last recorded crawl, then
    continues the newest-to-oldest backfill (``offset_id`` pagination) from the
    saved resume point, so restarts neither re-crawl nor miss new posts.
    Cancellation is re-raised after the status flag is reset.
    """
    _crawl_status["running"] = True
    try:
        # Databases written before crawl_max_id existed: use the newest indexed
        # video as the boundary so new posts are still picked up.
        if not db_get_meta("crawl_max_id") and db_get_meta("crawl_min_id"):
            seed = db_max_movie_id()
            if seed:
                db_set_meta("crawl_max_id", str(seed))
        await _crawl_new_messages()
        await _crawl_backfill()
    except asyncio.CancelledError:
        print("[CRAWLER] Cancelled.")
        raise
    except Exception as exc:
        # Nothing awaits this task while it runs; log instead of letting the
        # exception disappear as "Task exception was never retrieved".
        print(f"[CRAWLER] Aborted: {exc}")
    finally:
        _crawl_status["running"] = False
        print(f"[CRAWLER] Finished. DB total: {db_count()}")


async def _stop_crawl() -> None:
    """Cancel the background crawl task, if running, and wait for it to end."""
    if _crawl_task and not _crawl_task.done():
        _crawl_task.cancel()
        with suppress(asyncio.CancelledError):
            await _crawl_task


# ─────────────────────────── app lifespan ─────────────────────────────────────

def _reset_if_channel_changed() -> None:
    """Wipe local data when CHANNEL_ID differs from the stored one, then record it."""
    stored_channel = db_get_meta("channel_id", "")
    current_channel = str(CHANNEL_ID)
    if stored_channel and stored_channel != current_channel:
        print("[INFO] Channel changed. Resetting local DB and crawl state.")
        db_clear_all()
    db_set_meta("channel_id", current_channel)


async def _warm_peer_cache() -> None:
    """Iterate all dialogs once so the session knows the peers it will access."""
    print("[INFO] Warming up peer cache...")
    try:
        async for _ in bot.get_dialogs():
            pass
        print("[INFO] Peer cache ready.")
    except Exception as e:
        print(f"[WARNING] Could not warm peer cache: {e}")


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Start the Telegram clients and crawler; tear them down on shutdown."""
    global _crawl_task
    init_db()
    await bot.start()
    try:
        if bot_client:
            await bot_client.start()
        try:
            # Reset DB if CHANNEL_ID changed to avoid mixing data.
            _reset_if_channel_changed()
            await _warm_peer_cache()
            # Background crawl — non-blocking, app is usable immediately
            _crawl_task = asyncio.create_task(crawl_channel())
            yield
        finally:
            await _stop_crawl()
            if bot_client:
                await bot_client.stop()
    finally:
        await bot.stop()


app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount assets early for direct file serving
if FRONTEND_DIST.exists() and ASSETS_DIR.exists():
    app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")


def _resolve_inside(base: Path, relative: str) -> Path | None:
    """Resolve ``relative`` under ``base``; return it only if it is a file inside ``base``.

    Request paths arrive percent-decoded, so ``..`` segments (e.g. from
    ``%2e%2e``) must be rejected to prevent reading files outside ``base``.
    """
    root = base.resolve()
    try:
        candidate = (root / relative).resolve()
    except (OSError, RuntimeError, ValueError):
        return None
    if not candidate.is_relative_to(root) or not candidate.is_file():
        return None
    return candidate


# ─────────────────────────── routes ───────────────────────────────────────────

@app.get("/assets/{asset_path:path}")
async def assets_fallback(asset_path: str):
    """
    Fallback asset handler in case static mount is unavailable.
    """
    file_path = _resolve_inside(ASSETS_DIR, asset_path)
    if file_path is None:
        raise HTTPException(status_code=404, detail="Asset not found")
    return FileResponse(file_path)


@app.get("/movies")
async def movies(
    limit: int = Query(default=0, ge=0),
    offset_id: int = Query(default=0, ge=0),
):
    """
    Paginated list. Use next_offset_id from response as offset_id for next page.
    ``limit=0`` returns every remaining movie.
    """
    items = db_list_movies(limit=limit, offset_id=offset_id)
    next_offset = items[-1]["id"] if limit > 0 and len(items) == limit else None
    return {
        "items": items,
        "count": len(items),
        "next_offset_id": next_offset,
        "db_total": db_count(),
    }


@app.get("/search")
async def search(
    q: str = Query(default=""),
    limit: int = Query(default=50, ge=1, le=200),
    live: bool = Query(default=False),
):
    """
    Search movies.
      ?q=term        → SQLite FTS (instant, from DB)
      ?q=term&live=1 → also queries Telegram for uncrawled messages
    An empty query returns the newest movies.
    """
    q = q.strip()
    if not q:
        items = db_list_movies(limit=limit, offset_id=0)
        return {"items": items, "count": len(items), "source": "db"}

    results = db_search_movies(q, limit=limit)

    if live and len(results) < limit:
        seen_ids = {r["id"] for r in results}
        try:
            # Pyrogram v2 expects an enums.MessagesFilter here, not a string.
            async for msg in bot.search_messages(
                CHANNEL_ID,
                query=q,
                filter=enums.MessagesFilter.VIDEO,
                limit=min(20, limit - len(results)),
            ):
                if msg.id in seen_ids:
                    continue
                row = msg_to_row(msg)
                if row:
                    results.append(row)
                    seen_ids.add(msg.id)
        except Exception as e:
            print(f"[SEARCH] Live search error: {e}")

    return {"items": results, "count": len(results), "source": "fts+live" if live else "fts"}


@app.get("/status")
async def status():
    """Crawl progress and DB stats."""
    return {
        "crawl_running": _crawl_status["running"],
        "db_total": db_count(),
        "crawl_last_id": _crawl_status["last_id"],
        "crawl_resume_point": db_get_meta("crawl_min_id", "0"),
    }


@app.post("/crawl/restart")
async def restart_crawl():
    """Wipe DB and restart crawl from scratch."""
    global _crawl_task
    await _stop_crawl()
    # Same channel, so bot file_ids stay valid; keeping them avoids re-forwarding.
    db_clear_all(keep_bot_cache=True)
    db_set_meta("channel_id", str(CHANNEL_ID))
    _crawl_task = asyncio.create_task(crawl_channel())
    return {"message": "Crawl restarted from scratch."}


@app.get("/thumb/{message_id}")
async def thumb(message_id: int):
    """Return the first thumbnail of a channel video as JPEG bytes."""
    msg = await bot.get_messages(CHANNEL_ID, message_id)
    vid = extract_vid(msg)
    if not vid or not getattr(vid, "thumbs", None):
        raise HTTPException(status_code=404, detail="Thumbnail not found")
    thumb_item = vid.thumbs[0]
    data = await bot.download_media(thumb_item, in_memory=True)
    if hasattr(data, "getvalue"):
        data = data.getvalue()
    if not data:
        raise HTTPException(status_code=502, detail="Thumbnail download failed")
    return Response(content=data, media_type="image/jpeg")


def parse_range(range_header: str | None, total_size: int) -> tuple[int, int]:
    """Parse an HTTP ``Range`` header into an inclusive ``(start, end)`` pair.

    Missing, non-``bytes`` or malformed headers yield the whole file
    ``(0, total_size - 1)``. Only the first range of a multi-range request is
    honoured. ``end`` is clamped to the file; a ``start`` beyond the file is
    returned unchanged so the caller can answer 416.
    """
    full = (0, total_size - 1)
    if not range_header or not range_header.startswith("bytes="):
        return full
    first_range = range_header[len("bytes="):].split(",", 1)[0]
    start_str, sep, end_str = first_range.partition("-")
    if not sep:
        return full
    start_str, end_str = start_str.strip(), end_str.strip()
    try:
        if not start_str:
            # Suffix form "bytes=-N": the last N bytes.
            suffix = int(end_str)
            if suffix < 0:
                return full
            return max(total_size - suffix, 0), total_size - 1
        start = int(start_str)
        end = int(end_str) if end_str else total_size - 1
    except ValueError:
        return full
    if start < 0 or end < start:
        return full
    return start, min(end, total_size - 1)


@app.get("/stream/{message_id}")
async def stream(message_id: int, request: Request):
    """Stream a channel video, honouring byte ranges (206) for seeking."""
    msg = await bot.get_messages(CHANNEL_ID, message_id)
    vid = extract_vid(msg)
    if not vid:
        raise HTTPException(status_code=404, detail="Video not found")

    size = getattr(vid, "file_size", 0) or 0
    range_header = request.headers.get("range")
    start, end = parse_range(range_header, size)
    if start >= size:
        raise HTTPException(
            status_code=416,
            detail="Range not satisfiable",
            headers={"Content-Range": f"bytes */{size}"},
        )

    length = end - start + 1
    # stream_media addresses whole CHUNK_SIZE chunks: start at the chunk that
    # contains `start`, then drop the leading bytes before it.
    first_part = start // CHUNK_SIZE
    skip_bytes = start % CHUNK_SIZE
    parts_needed = math.ceil((skip_bytes + length) / CHUNK_SIZE)

    async def streamer():
        to_skip = skip_bytes
        remaining = length
        async for chunk in bot.stream_media(msg, offset=first_part, limit=parts_needed):
            if to_skip:
                if len(chunk) <= to_skip:
                    to_skip -= len(chunk)
                    continue
                chunk = chunk[to_skip:]
                to_skip = 0
            if len(chunk) > remaining:
                chunk = chunk[:remaining]
            remaining -= len(chunk)
            if chunk:
                yield chunk
            if remaining <= 0:
                break

    mime = getattr(vid, "mime_type", None) or "video/mp4"
    headers = {
        "Accept-Ranges": "bytes",
        "Content-Length": str(length),
        "Content-Type": mime,
    }
    # 206 + Content-Range only in answer to a byte-range request; otherwise 200.
    status_code = 200
    if range_header and range_header.startswith("bytes="):
        status_code = 206
        headers["Content-Range"] = f"bytes {start}-{end}/{size}"
    return StreamingResponse(streamer(), status_code=status_code, headers=headers, media_type=mime)


@app.get("/stream-bot/{message_id}")
async def stream_bot(message_id: int):
    """Redirect to a Bot API direct download URL for a channel video."""
    if not BOT_TOKEN:
        raise HTTPException(status_code=404, detail="Bot token not configured")
    file_id = await get_or_create_bot_file_id(message_id)
    if not file_id:
        raise HTTPException(status_code=502, detail="Bot getFile failed")
    # bot_file_url does blocking HTTP; keep it off the event loop.
    direct_url = await asyncio.to_thread(bot_file_url, file_id)
    if not direct_url:
        raise HTTPException(status_code=502, detail="Bot getFile failed")
    # NOTE(security): this Location header exposes BOT_TOKEN to the client;
    # consider proxying the download instead of redirecting.
    return RedirectResponse(direct_url)


@app.get("/bot-debug/{message_id}")
async def bot_debug(message_id: int):
    """Return debug info for bot forward/getFile flow (bypasses the cache)."""
    if not BOT_TOKEN:
        return {"ok": False, "error": "BOT_TOKEN not configured"}
    if not BOT_TARGET:
        return {"ok": False, "error": "BOT_TARGET not configured"}
    if not bot_client:
        return {"ok": False, "error": "bot_client not initialized"}

    result = {
        "ok": False,
        "message_id": message_id,
        "bot_target": BOT_TARGET,
    }
    try:
        forwarded = await bot.forward_messages(BOT_TARGET, CHANNEL_ID, message_id)
        forwarded_id = forwarded.id
        result["forwarded_id"] = forwarded_id

        bot_msg = await bot_client.get_messages(BOT_TARGET, forwarded_id)
        vid = extract_vid(bot_msg)
        if not vid:
            result["error"] = "No video in forwarded message"
            return result
        file_id = getattr(vid, "file_id", None)
        if not file_id:
            result["error"] = "file_id missing"
            return result
        result["file_id"] = file_id

        direct_url = await asyncio.to_thread(bot_file_url, file_id)
        # Never echo the bot token to (unauthenticated, CORS-open) clients.
        result["direct_url"] = direct_url.replace(BOT_TOKEN, "<redacted>") if direct_url else None
        result["ok"] = bool(direct_url)
        if not direct_url:
            result["error"] = "getFile failed"
        return result
    except Exception as exc:
        result["error"] = str(exc)
        return result


# ─────────────────────────── SPA fallback (must be last) ─────────────────────

# First path segments owned by the API: unknown paths under them get a 404
# instead of silently serving index.html.
_API_PREFIXES = frozenset(
    {"movies", "search", "status", "thumb", "stream", "stream-bot", "bot-debug", "crawl"}
)

if FRONTEND_DIST.exists():
    @app.get("/")
    async def index():
        return FileResponse(FRONTEND_DIST / "index.html", headers={"Cache-Control": "no-store"})

    @app.get("/{full_path:path}")
    async def spa_fallback(full_path: str):
        # Reject API paths that don't exist
        if full_path.split("/", 1)[0] in _API_PREFIXES:
            raise HTTPException(status_code=404, detail="Not found")
        file_path = _resolve_inside(FRONTEND_DIST, full_path)
        if file_path is not None:
            return FileResponse(file_path)
        return FileResponse(FRONTEND_DIST / "index.html", headers={"Cache-Control": "no-store"})