| import asyncio |
| import json |
| import math |
| import os |
| import sqlite3 |
| import urllib.parse |
| import urllib.request |
| from pathlib import Path |
| from contextlib import asynccontextmanager |
|
|
| from dotenv import load_dotenv |
| from fastapi import FastAPI, HTTPException, Query, Request |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse, RedirectResponse, Response, StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
| from pyrogram import Client |
| from pyrogram.errors import FloodWait |
|
|
# Read a local .env file (if any) into the environment before the lookups below.
load_dotenv()

# --- Telegram credentials for the session-string client --------------------
API_ID = int(os.getenv("API_ID", "0"))
API_HASH = os.getenv("API_HASH", "")
SESSION_STRING = os.getenv("SESSION_STRING", "")

# --- Optional Bot API settings (the bot features are skipped when unset) ----
BOT_TOKEN = os.getenv("BOT_TOKEN", "").strip()
BOT_API_BASE = os.getenv("BOT_API_BASE", "https://api.telegram.org").strip()
BOT_TARGET = os.getenv("BOT_TARGET", "").strip()

# --- Tunables ---------------------------------------------------------------
CHUNK_SIZE = 1024 * 1024  # bytes per part; /stream converts byte offsets into part indices with it
BATCH_SIZE = 100          # messages requested per history call by the crawler
BATCH_DELAY = 1.5         # seconds the crawler sleeps between batches

# --- Database location --------------------------------------------------------
# An explicit DB_PATH wins; otherwise use /data when that directory exists,
# falling back to a file in the working directory.
env_db_path = os.getenv("DB_PATH", "").strip()
if env_db_path:
    DB_PATH = env_db_path
elif Path("/data").exists():
    DB_PATH = "/data/movies.db"
else:
    DB_PATH = "movies.db"

# Make sure the directory that will hold the database file exists.
data_dir = Path(DB_PATH).parent
data_dir.mkdir(parents=True, exist_ok=True)

# --- Built frontend -----------------------------------------------------------
FRONTEND_DIST = Path(os.getenv("FRONTEND_DIST", "frontend/dist")).resolve()
ASSETS_DIR = FRONTEND_DIST / "assets"
|
|
|
|
| def load_channel_id(value: str) -> str | int: |
| if not value: |
| return "" |
| cleaned = value.strip() |
| if cleaned.lstrip("-").isdigit(): |
| return int(cleaned) |
| return cleaned |
|
|
|
|
# Channel to index: an int for numeric ids, otherwise the raw string.
CHANNEL_ID = load_channel_id(os.getenv("CHANNEL_ID", ""))

# Primary client, authenticated with SESSION_STRING.
bot = Client(
    "tgstream",
    api_id=API_ID,
    api_hash=API_HASH,
    session_string=SESSION_STRING,
    no_updates=True,
)

# Secondary client authenticated with BOT_TOKEN; stays None when no token is set.
if BOT_TOKEN:
    bot_client = Client(
        "tgstream_bot",
        api_id=API_ID,
        api_hash=API_HASH,
        bot_token=BOT_TOKEN,
        no_updates=True,
    )
else:
    bot_client = None
|
|
| |
|
|
def get_db() -> sqlite3.Connection:
    """Open a fresh SQLite connection to ``DB_PATH``.

    Returns:
        A connection created with ``check_same_thread=False`` whose rows are
        ``sqlite3.Row`` objects, so columns can be read by name. Closing the
        connection is left to the caller.
    """
    connection = sqlite3.connect(DB_PATH, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def init_db():
    """Create the database schema if it is not present yet.

    Creates the ``movies`` table, the ``movies_fts`` FTS5 index over
    ``movies.title`` together with the insert/delete triggers that keep it in
    step, the ``meta`` key/value table and the ``bot_cache`` table. Every
    statement uses ``IF NOT EXISTS``, so calling this repeatedly is harmless.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS movies (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL,
            duration TEXT,
            duration_raw INTEGER,
            size TEXT,
            size_raw INTEGER,
            width INTEGER,
            height INTEGER,
            mime TEXT,
            has_thumb INTEGER DEFAULT 0
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS movies_fts
        USING fts5(title, content='movies', content_rowid='id');

        CREATE TRIGGER IF NOT EXISTS movies_ai
        AFTER INSERT ON movies BEGIN
            INSERT INTO movies_fts(rowid, title) VALUES (new.id, new.title);
        END;

        CREATE TRIGGER IF NOT EXISTS movies_ad
        AFTER DELETE ON movies BEGIN
            INSERT INTO movies_fts(movies_fts, rowid, title)
            VALUES ('delete', old.id, old.title);
        END;

        CREATE TABLE IF NOT EXISTS meta (
            key TEXT PRIMARY KEY,
            value TEXT
        );

        CREATE TABLE IF NOT EXISTS bot_cache (
            message_id INTEGER PRIMARY KEY,
            bot_message_id INTEGER,
            file_id TEXT
        );
    """
    conn = get_db()
    try:
        # "with conn" only commits or rolls back; it does not close the
        # connection, hence the explicit close() below.
        with conn:
            conn.executescript(schema)
    finally:
        conn.close()
|
|
|
|
def db_get_meta(key: str, default=None):
    """Read one value from the ``meta`` key/value table.

    Args:
        key: Name of the entry to look up.
        default: Value returned when no row exists for ``key``.

    Returns:
        The stored value, or ``default`` when the key is absent.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT value FROM meta WHERE key=?", (key,)).fetchone()
        return row["value"] if row else default
    finally:
        # The sqlite3 connection context manager never closes the connection,
        # so release it explicitly.
        conn.close()
|
|
|
|
def db_set_meta(key: str, value: str):
    """Insert or overwrite one entry in the ``meta`` key/value table.

    Args:
        key: Name of the entry.
        value: Text to store; replaces any existing value for ``key``.
    """
    conn = get_db()
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO meta(key,value) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                (key, value),
            )
    finally:
        # "with conn" does not close the connection; do it explicitly.
        conn.close()
|
|
|
|
def db_clear_all():
    """Delete every indexed movie, all ``meta`` entries and the bot file cache.

    ``bot_cache`` is keyed by channel message id alone, so entries left over
    from a previously configured channel would hand out file ids belonging to
    unrelated messages; it is therefore wiped together with the index.
    """
    conn = get_db()
    try:
        with conn:
            conn.executescript(
                "DELETE FROM movies; DELETE FROM meta; DELETE FROM bot_cache;"
            )
    finally:
        # "with conn" does not close the connection; do it explicitly.
        conn.close()
|
|
|
|
def db_upsert_movies(rows: list[dict]):
    """Insert movie rows, skipping ids that are already stored.

    Despite the name this is ``INSERT OR IGNORE``: a row whose ``id`` already
    exists is left untouched rather than updated.

    Args:
        rows: Dicts providing the keys ``id``, ``title``, ``duration``,
            ``duration_raw``, ``size``, ``size_raw``, ``width``, ``height``,
            ``mime`` and ``has_thumb``. An empty list is a no-op.
    """
    if not rows:
        return
    insert_sql = """
        INSERT OR IGNORE INTO movies
            (id, title, duration, duration_raw, size, size_raw, width, height, mime, has_thumb)
        VALUES
            (:id, :title, :duration, :duration_raw, :size, :size_raw, :width, :height, :mime, :has_thumb)
    """
    conn = get_db()
    try:
        with conn:  # one transaction for the whole batch
            conn.executemany(insert_sql, rows)
    finally:
        # "with conn" does not close the connection; do it explicitly.
        conn.close()
|
|
|
|
def db_count() -> int:
    """Return the number of rows currently stored in ``movies``."""
    conn = get_db()
    try:
        return conn.execute("SELECT COUNT(*) FROM movies").fetchone()[0]
    finally:
        # The sqlite3 connection context manager never closes the connection,
        # so release it explicitly.
        conn.close()
|
|
|
|
def db_list_movies(limit: int, offset_id: int) -> list[dict]:
    """List movies ordered by descending id.

    Args:
        limit: Maximum number of rows; zero or a negative value means
            "no limit".
        offset_id: When non-zero, only rows with ``id < offset_id`` are
            returned (keyset pagination cursor).

    Returns:
        The matching rows as plain dicts.
    """
    # Assemble the single query from its optional parts instead of keeping
    # four near-identical statements in sync.
    sql = "SELECT * FROM movies"
    params: list[int] = []
    if offset_id:
        sql += " WHERE id < ?"
        params.append(offset_id)
    sql += " ORDER BY id DESC"
    if limit > 0:
        sql += " LIMIT ?"
        params.append(limit)

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
    finally:
        # The sqlite3 connection context manager never closes the connection,
        # so release it explicitly.
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
def _fts_literal_query(query: str) -> str:
    """Quote each whitespace-separated term so FTS5 reads it as plain text."""
    return " ".join('"' + term.replace('"', '""') + '"' for term in query.split())


def db_search_movies(query: str, limit: int = 50) -> list[dict]:
    """Full-text search over movie titles, best matches first.

    The query is first handed to FTS5 as written, so FTS5 query syntax keeps
    working. Free-form user text often is not a valid FTS5 expression (a stray
    quote, a hyphen, a bare operator); when SQLite rejects it, the search is
    retried with every term quoted as a literal instead of failing.

    Args:
        query: Search text.
        limit: Maximum number of rows to return.

    Returns:
        Matching ``movies`` rows as dicts, ordered by FTS rank.

    Raises:
        sqlite3.OperationalError: If the quoted retry fails as well.
    """
    sql = """
        SELECT m.* FROM movies m
        JOIN movies_fts f ON m.id = f.rowid
        WHERE movies_fts MATCH ?
        ORDER BY rank
        LIMIT ?
    """
    conn = get_db()
    try:
        try:
            rows = conn.execute(sql, (query, limit)).fetchall()
        except sqlite3.OperationalError:
            literal = _fts_literal_query(query)
            if not literal:
                return []
            rows = conn.execute(sql, (literal, limit)).fetchall()
    finally:
        # The sqlite3 connection context manager never closes the connection,
        # so release it explicitly.
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
| |
|
|
| def fmt_size(num_bytes: int | None) -> str: |
| if not num_bytes: |
| return "0 B" |
| size = float(num_bytes) |
| for unit in ["B", "KB", "MB", "GB", "TB"]: |
| if size < 1024 or unit == "TB": |
| return f"{size:.1f} {unit}" |
| size /= 1024 |
| return f"{size:.1f} TB" |
|
|
|
|
| def fmt_dur(seconds: int | None) -> str: |
| if seconds is None: |
| return "--:--" |
| seconds = int(seconds) |
| h = seconds // 3600 |
| m = (seconds % 3600) // 60 |
| s = seconds % 60 |
| if h: |
| return f"{h}:{m:02d}:{s:02d}" |
| return f"{m}:{s:02d}" |
|
|
|
|
# File-name suffixes that mark a document as a video when its MIME type does not.
_VIDEO_SUFFIXES = (
    ".mp4",
    ".mkv",
    ".mov",
    ".avi",
    ".webm",
    ".m4v",
    ".ts",
    ".m2ts",
)


def extract_vid(msg):
    """Return the video-like media object carried by a message, if any.

    Args:
        msg: A message object exposing ``video`` and ``document`` attributes,
            or ``None`` (for example when a lookup found nothing).

    Returns:
        ``msg.video`` when set; otherwise ``msg.document`` when its MIME type
        starts with ``"video"`` or its file name ends in a known video
        suffix; ``None`` in every other case, including a missing message.
    """
    if msg is None:
        return None

    video = getattr(msg, "video", None)
    if video:
        return video

    document = getattr(msg, "document", None)
    if not document:
        return None

    mime = getattr(document, "mime_type", None) or ""
    if mime.startswith("video"):
        return document

    # Some uploads arrive with a generic MIME type; fall back to the extension.
    file_name = (getattr(document, "file_name", "") or "").lower()
    if file_name.endswith(_VIDEO_SUFFIXES):
        return document
    return None
|
|
|
|
def bot_file_url(file_id: str) -> str | None:
    """Resolve a bot ``file_id`` to a download URL via the Bot API ``getFile`` call.

    This performs a blocking HTTP request with a 10 second timeout.

    Args:
        file_id: File identifier to look up.

    Returns:
        The download URL built from the reported ``file_path``, or ``None``
        when no bot token is configured, the API answer is not ``ok``, no
        path is reported, or the request fails for any reason (the error is
        printed).
    """
    if not BOT_TOKEN:
        return None

    quoted_id = urllib.parse.quote(file_id)
    endpoint = f"{BOT_API_BASE}/bot{BOT_TOKEN}/getFile?file_id={quoted_id}"
    try:
        with urllib.request.urlopen(endpoint, timeout=10) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        if payload.get("ok"):
            file_path = payload.get("result", {}).get("file_path")
            if file_path:
                # NOTE(review): this URL embeds BOT_TOKEN - treat it as a secret.
                return f"{BOT_API_BASE}/file/bot{BOT_TOKEN}/{file_path}"
        return None
    except Exception as exc:
        print(f"[BOT] getFile error: {exc}")
        return None
|
|
|
|
def bot_cache_get(message_id: int) -> str | None:
    """Look up the cached bot ``file_id`` for a channel message.

    Args:
        message_id: Id of the message in the source channel.

    Returns:
        The cached ``file_id``, or ``None`` when nothing is cached.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT file_id FROM bot_cache WHERE message_id=?",
            (message_id,),
        ).fetchone()
        return row["file_id"] if row else None
    finally:
        # The sqlite3 connection context manager never closes the connection,
        # so release it explicitly.
        conn.close()
|
|
|
|
def bot_cache_set(message_id: int, bot_message_id: int, file_id: str) -> None:
    """Store (or replace) the bot-side identifiers for a channel message.

    Args:
        message_id: Id of the message in the source channel (primary key).
        bot_message_id: Id of the forwarded copy.
        file_id: Bot ``file_id`` extracted from the forwarded copy.
    """
    conn = get_db()
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT OR REPLACE INTO bot_cache(message_id, bot_message_id, file_id) VALUES(?,?,?)",
                (message_id, bot_message_id, file_id),
            )
    finally:
        # "with conn" does not close the connection; do it explicitly.
        conn.close()
|
|
|
|
async def get_or_create_bot_file_id(message_id: int) -> str | None:
    """Return a bot ``file_id`` for a channel message, forwarding it on a cache miss.

    On a miss the message is forwarded from ``CHANNEL_ID`` to ``BOT_TARGET``
    with the main client, read back through ``bot_client``, and the video's
    ``file_id`` is stored in ``bot_cache``.

    Args:
        message_id: Id of the message in the source channel.

    Returns:
        The ``file_id``, or ``None`` when the bot flow is not configured, the
        forwarded message carries no video/file id, or any step raises (the
        error is printed).
    """
    # NOTE(review): reading the forwarded copy by the same id assumes both
    # clients see identical message ids in BOT_TARGET - confirm for the
    # configured chat type.
    if not (BOT_TOKEN and BOT_TARGET and bot_client):
        return None

    cached_id = bot_cache_get(message_id)
    if cached_id:
        return cached_id

    try:
        forwarded = await bot.forward_messages(BOT_TARGET, CHANNEL_ID, message_id)
        forwarded_id = forwarded.id
        bot_side_msg = await bot_client.get_messages(BOT_TARGET, forwarded_id)
        media = extract_vid(bot_side_msg)
        new_file_id = getattr(media, "file_id", None) if media else None
        if not new_file_id:
            return None
        bot_cache_set(message_id, forwarded_id, new_file_id)
        return new_file_id
    except Exception as exc:
        print(f"[BOT] forward error: {exc}")
        return None
|
|
|
|
def msg_to_row(msg) -> dict | None:
    """Convert a channel message into a ``movies`` table row.

    Args:
        msg: Message object; must expose ``id`` and ``caption`` in addition
            to whatever ``extract_vid`` inspects.

    Returns:
        A dict with the columns of ``movies``, or ``None`` when the message
        carries no video.
    """
    vid = extract_vid(msg)
    if not vid:
        return None

    # Strip before falling back, so a whitespace-only caption gets the
    # placeholder instead of an empty title.
    title = (msg.caption or "").strip() or f"Movie #{msg.id}"
    duration = getattr(vid, "duration", None)
    file_size = getattr(vid, "file_size", None)
    return {
        "id": msg.id,
        "title": title,
        "duration": fmt_dur(duration),
        "duration_raw": duration,
        "size": fmt_size(file_size),
        "size_raw": file_size,
        "width": getattr(vid, "width", None),
        "height": getattr(vid, "height", None),
        "mime": getattr(vid, "mime_type", None),
        "has_thumb": int(bool(getattr(vid, "thumbs", None))),
    }
|
|
|
|
| |
|
|
# Handle of the background crawl task; None until one has been scheduled.
_crawl_task: asyncio.Task | None = None

# Progress snapshot updated by crawl_channel() and reported by /status.
_crawl_status = {
    "running": False,
    "indexed": 0,
    "last_id": 0,
}
|
|
|
|
async def crawl_channel():
    """Index the channel history in batches, resuming from saved progress.

    Each pass requests up to ``BATCH_SIZE`` messages starting at
    ``offset_id``, stores the video messages, and then continues from the
    smallest message id seen - i.e. the crawl walks towards older messages.
    That id is persisted under the ``crawl_min_id`` meta key after every
    batch, so a restart resumes where the previous run stopped.

    NOTE(review): because a resumed crawl always continues below
    ``crawl_min_id``, messages posted after the history has been fully
    crawled do not appear to be picked up until the index is reset - confirm
    whether that is intended.

    ``FloodWait`` errors are honoured by sleeping for the requested time (plus
    two seconds); any other error is logged and retried after ten seconds.

    Raises:
        asyncio.CancelledError: When the task is cancelled. The ``running``
            flag is cleared first.
    """
    _crawl_status["running"] = True
    try:
        # "or 0" also covers a row whose stored value is NULL.
        offset_id = int(db_get_meta("crawl_min_id", "0") or 0)
        print(f"[CRAWLER] Starting. Resume offset_id={offset_id}. DB has {db_count()} movies.")

        while True:
            try:
                batch = [
                    msg
                    async for msg in bot.get_chat_history(
                        CHANNEL_ID, limit=BATCH_SIZE, offset_id=offset_id
                    )
                ]
                if not batch:
                    print("[CRAWLER] No more messages. Crawl complete.")
                    break

                rows = [r for msg in batch if (r := msg_to_row(msg))]
                db_upsert_movies(rows)

                # Persist the cursor only after the batch itself is stored.
                min_id = min(m.id for m in batch)
                db_set_meta("crawl_min_id", str(min_id))

                total = db_count()
                _crawl_status["indexed"] = total
                _crawl_status["last_id"] = min_id
                print(f"[CRAWLER] Batch saved {len(rows)} videos. DB total={total}. Next offset_id={min_id}")

                if len(batch) < BATCH_SIZE:
                    print("[CRAWLER] Last batch β history exhausted.")
                    break

                offset_id = min_id
                await asyncio.sleep(BATCH_DELAY)

            except FloodWait as exc:
                wait = exc.value + 2
                print(f"[CRAWLER] FloodWait β sleeping {wait}s")
                await asyncio.sleep(wait)

            except asyncio.CancelledError:
                print("[CRAWLER] Cancelled.")
                # Propagate: a cancellation that arrives during one of the
                # sleeps in the handlers around this one escapes this try
                # anyway, so every cancellation now ends the task the same way.
                raise

            except Exception as e:
                print(f"[CRAWLER] Error: {e}. Retrying in 10s.")
                await asyncio.sleep(10)
    finally:
        # Runs on normal completion, cancellation and unexpected failure
        # alike, so /status never reports a dead crawler as running.
        _crawl_status["running"] = False
        print(f"[CRAWLER] Finished. DB total: {db_count()}")
|
|
|
|
| |
|
|
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: start clients and the crawler, stop them on exit.

    Startup creates the schema, starts the Telegram client(s), wipes the local
    index when the configured channel differs from the one recorded in
    ``meta``, iterates the dialog list once (logged as warming the peer
    cache) and schedules the background crawl. Shutdown cancels the crawl
    task and stops the clients.
    """
    global _crawl_task

    # ---- startup ----
    init_db()
    await bot.start()
    if bot_client:
        await bot_client.start()

    # A different channel invalidates everything that was indexed before.
    previous_channel = db_get_meta("channel_id", "")
    active_channel = str(CHANNEL_ID)
    if previous_channel and previous_channel != active_channel:
        print("[INFO] Channel changed. Resetting local DB and crawl state.")
        db_clear_all()
    db_set_meta("channel_id", active_channel)

    print("[INFO] Warming up peer cache...")
    try:
        async for _dialog in bot.get_dialogs():
            continue
        print("[INFO] Peer cache ready.")
    except Exception as warm_err:
        print(f"[WARNING] Could not warm peer cache: {warm_err}")

    _crawl_task = asyncio.create_task(crawl_channel())

    try:
        yield
    finally:
        # ---- shutdown ----
        pending = _crawl_task
        if pending and not pending.done():
            pending.cancel()
            try:
                await pending
            except asyncio.CancelledError:
                pass
        if bot_client:
            await bot_client.stop()
        await bot.stop()
|
|
|
|
# The ASGI application; startup and shutdown are handled by lifespan().
app = FastAPI(lifespan=lifespan)

# Allow any origin, method and header, without credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=False,
)

# Serve the built frontend's assets directly when the bundle is on disk.
if FRONTEND_DIST.exists():
    if ASSETS_DIR.exists():
        app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
|
|
|
|
| |
|
|
@app.get("/assets/{asset_path:path}")
async def assets_fallback(asset_path: str):
    """Serve a file from ``ASSETS_DIR`` when the static mount is unavailable.

    Args:
        asset_path: Path below ``/assets/`` taken from the request URL.

    Raises:
        HTTPException: 404 when the path does not name a regular file inside
            ``ASSETS_DIR``.
    """
    # asset_path is attacker-controlled: ".." segments, or an absolute path
    # (which makes pathlib discard the left-hand side of "/"), would escape
    # the assets directory. Resolve first, then require containment.
    assets_root = ASSETS_DIR.resolve()
    try:
        candidate = (assets_root / asset_path).resolve()
    except (OSError, ValueError):
        raise HTTPException(status_code=404, detail="Asset not found") from None
    if not candidate.is_relative_to(assets_root) or not candidate.is_file():
        raise HTTPException(status_code=404, detail="Asset not found")
    return FileResponse(candidate)
|
|
@app.get("/movies")
async def movies(
    limit: int = Query(default=0, ge=0),
    offset_id: int = Query(default=0),
):
    """
    Paginated movie list, newest id first.

    Pass the ``next_offset_id`` of a response as ``offset_id`` to fetch the
    following page. ``next_offset_id`` is ``None`` when no limit was given or
    the page came back shorter than ``limit``.
    """
    page = db_list_movies(limit=limit, offset_id=offset_id)

    # Only a completely filled page can have a successor.
    page_is_full = limit > 0 and len(page) == limit
    cursor = page[-1]["id"] if page_is_full else None

    return {
        "items": page,
        "count": len(page),
        "next_offset_id": cursor,
        "db_total": db_count(),
    }
|
|
|
|
@app.get("/search")
async def search(
    q: str = Query(default=""),
    limit: int = Query(default=50, le=200),
    live: bool = Query(default=False),
):
    """
    Search movies.

    ?q=term         -> SQLite FTS lookup, answered from the local DB
    ?q=term&live=1  -> additionally asks Telegram for matching video messages
                       that are not in the local results yet

    An empty query returns the plain listing instead. Live results are
    appended after the FTS hits, and the combined list never exceeds
    ``limit``. Errors from the live lookup are logged and otherwise ignored.
    """
    q = q.strip()
    if not q:
        return {"items": db_list_movies(limit=limit, offset_id=0), "source": "db"}

    results = db_search_movies(q, limit=limit)

    if live and len(results) < limit:
        seen_ids = {r["id"] for r in results}
        try:
            # Pyrogram 2 expects a MessagesFilter member here; a plain string
            # makes the call fail. Imported locally so that a missing enums
            # module degrades to the logged error below.
            from pyrogram import enums

            async for msg in bot.search_messages(
                CHANNEL_ID,
                query=q,
                filter=enums.MessagesFilter.VIDEO,
                limit=20,
            ):
                if msg.id in seen_ids:
                    continue
                row = msg_to_row(msg)
                if not row:
                    continue
                results.append(row)
                seen_ids.add(msg.id)
                if len(results) >= limit:
                    break  # honour the caller's limit for the merged list
        except Exception as e:
            print(f"[SEARCH] Live search error: {e}")

    return {"items": results, "count": len(results), "source": "fts+live" if live else "fts"}
|
|
|
|
@app.get("/status")
async def status():
    """Report crawler state together with the size of the local index."""
    is_running = _crawl_status["running"]
    total_rows = db_count()
    last_seen_id = _crawl_status["last_id"]
    resume_point = db_get_meta("crawl_min_id", "0")
    return {
        "crawl_running": is_running,
        "db_total": total_rows,
        "crawl_last_id": last_seen_id,
        "crawl_resume_point": resume_point,
    }
|
|
|
|
@app.post("/crawl/restart")
async def restart_crawl():
    """Stop any running crawl, wipe the local index and start crawling again."""
    global _crawl_task

    # Let the current crawl wind down before touching the database.
    active = _crawl_task
    if active and not active.done():
        active.cancel()
        try:
            await active
        except asyncio.CancelledError:
            pass

    db_clear_all()
    db_set_meta("channel_id", str(CHANNEL_ID))

    _crawl_task = asyncio.create_task(crawl_channel())
    return {"message": "Crawl restarted from scratch."}
|
|
|
|
@app.get("/thumb/{message_id}")
async def thumb(message_id: int):
    """Return the first thumbnail of a channel video as a JPEG response.

    Args:
        message_id: Id of the message in ``CHANNEL_ID``.

    Raises:
        HTTPException: 404 when the message is missing, carries no video, has
            no thumbnails, or the thumbnail download yields no data.
    """
    msg = await bot.get_messages(CHANNEL_ID, message_id)
    # Guard the lookup result so a missing message becomes a 404, not a 500.
    vid = extract_vid(msg) if msg is not None else None
    thumbs = getattr(vid, "thumbs", None) if vid else None
    if not thumbs:
        raise HTTPException(status_code=404, detail="Thumbnail not found")

    data = await bot.download_media(thumbs[0], in_memory=True)
    if hasattr(data, "getvalue"):
        data = data.getvalue()
    if not data:
        # Do not answer 200 with an empty image when the download came back empty.
        raise HTTPException(status_code=404, detail="Thumbnail not found")
    return Response(content=data, media_type="image/jpeg")
|
|
|
|
@app.get("/stream/{message_id}")
async def stream(message_id: int, request: Request):
    """Stream a channel video, honouring a single HTTP byte range.

    The requested byte window is mapped onto ``CHUNK_SIZE`` parts: streaming
    starts at the part containing the first byte, the leading bytes of that
    part are dropped, and output stops once the window has been sent.

    Args:
        message_id: Id of the message in ``CHANNEL_ID``.
        request: Incoming request; only its ``Range`` header is read.

    Returns:
        ``206 Partial Content`` with ``Content-Range`` when the request
        carries a ``Range`` header, ``200 OK`` with the whole file otherwise.

    Raises:
        HTTPException: 404 when the message has no video; 416 when the range
            starts at or beyond the end of the file.
    """
    msg = await bot.get_messages(CHANNEL_ID, message_id)
    # Guard the lookup result so a missing message becomes a 404, not a 500.
    vid = extract_vid(msg) if msg is not None else None
    if not vid:
        raise HTTPException(status_code=404, detail="Video not found")

    size = getattr(vid, "file_size", 0) or 0
    range_header = request.headers.get("range")
    start, end = parse_range(range_header, size)
    if start >= size:
        raise HTTPException(
            status_code=416,
            detail="Range not satisfiable",
            # Tell the client the real length so it can retry sensibly.
            headers={"Content-Range": f"bytes */{size}"},
        )

    length = end - start + 1
    first_part = start // CHUNK_SIZE
    lead_skip = start % CHUNK_SIZE
    # Integer ceiling division: number of parts covering skip + payload.
    parts_needed = (lead_skip + length + CHUNK_SIZE - 1) // CHUNK_SIZE

    async def streamer():
        to_skip = lead_skip
        remaining = length
        async for chunk in bot.stream_media(msg, offset=first_part, limit=parts_needed):
            if to_skip:
                # Drop bytes that precede the requested start offset.
                if len(chunk) <= to_skip:
                    to_skip -= len(chunk)
                    continue
                chunk = chunk[to_skip:]
                to_skip = 0
            if len(chunk) > remaining:
                chunk = chunk[:remaining]
            remaining -= len(chunk)
            if chunk:
                yield chunk
            if remaining <= 0:
                break

    mime = getattr(vid, "mime_type", None) or "video/mp4"
    headers = {
        "Accept-Ranges": "bytes",
        "Content-Length": str(length),
        "Content-Type": mime,
    }
    # 206 and Content-Range are only valid as the answer to a range request.
    if range_header:
        status_code = 206
        headers["Content-Range"] = f"bytes {start}-{end}/{size}"
    else:
        status_code = 200
    return StreamingResponse(streamer(), status_code=status_code, headers=headers, media_type=mime)
|
|
|
|
@app.get("/stream-bot/{message_id}")
async def stream_bot(message_id: int):
    """Redirect to a Bot API download URL for a channel video.

    Args:
        message_id: Id of the message in ``CHANNEL_ID``.

    Raises:
        HTTPException: 404 when no bot token is configured; 502 when either
            obtaining a bot ``file_id`` or the ``getFile`` lookup fails.
    """
    if not BOT_TOKEN:
        raise HTTPException(status_code=404, detail="Bot token not configured")

    file_id = await get_or_create_bot_file_id(message_id)
    if not file_id:
        # This step is the forward/lookup, not getFile - say so.
        raise HTTPException(status_code=502, detail="Bot forward failed")

    # bot_file_url() does blocking urllib I/O; run it in a worker thread so
    # the event loop keeps serving other requests meanwhile.
    direct_url = await asyncio.to_thread(bot_file_url, file_id)
    if not direct_url:
        raise HTTPException(status_code=502, detail="Bot getFile failed")

    # NOTE(review): the redirect target embeds BOT_TOKEN, so every client that
    # follows it learns the token. Consider proxying the download instead.
    return RedirectResponse(direct_url)
|
|
|
|
@app.get("/bot-debug/{message_id}")
async def bot_debug(message_id: int):
    """Run the bot forward/getFile flow once and report each intermediate value.

    Unlike ``get_or_create_bot_file_id`` this never consults or updates
    ``bot_cache``: every call forwards the message again.

    Args:
        message_id: Id of the message in ``CHANNEL_ID``.

    Returns:
        A dict with ``ok`` plus whichever of ``forwarded_id``, ``file_id``,
        ``direct_url`` and ``error`` were determined before the flow stopped.
    """
    if not BOT_TOKEN:
        return {"ok": False, "error": "BOT_TOKEN not configured"}
    if not BOT_TARGET:
        return {"ok": False, "error": "BOT_TARGET not configured"}
    if not bot_client:
        return {"ok": False, "error": "bot_client not initialized"}

    result = {
        "ok": False,
        "message_id": message_id,
        "bot_target": BOT_TARGET,
    }

    try:
        forwarded = await bot.forward_messages(BOT_TARGET, CHANNEL_ID, message_id)
        forwarded_id = forwarded.id
        result["forwarded_id"] = forwarded_id

        bot_msg = await bot_client.get_messages(BOT_TARGET, forwarded_id)
        vid = extract_vid(bot_msg)
        if not vid:
            result["error"] = "No video in forwarded message"
            return result
        file_id = getattr(vid, "file_id", None)
        if not file_id:
            result["error"] = "file_id missing"
            return result

        result["file_id"] = file_id
        # bot_file_url() does blocking urllib I/O; keep it off the event loop.
        direct_url = await asyncio.to_thread(bot_file_url, file_id)
        # NOTE(review): direct_url embeds BOT_TOKEN and is returned to the
        # caller as-is; restrict access to this endpoint or redact the token.
        result["direct_url"] = direct_url
        result["ok"] = bool(direct_url)
        if not direct_url:
            result["error"] = "getFile failed"
        return result
    except Exception as exc:
        result["error"] = str(exc)
        return result
|
|
|
|
| def parse_range(range_header: str | None, total_size: int) -> tuple[int, int]: |
| if not range_header or not range_header.startswith("bytes="): |
| return 0, total_size - 1 |
| value = range_header.replace("bytes=", "") |
| if "-" not in value: |
| return 0, total_size - 1 |
| start_str, end_str = value.split("-", 1) |
| if start_str == "": |
| suffix = int(end_str) |
| start = max(total_size - suffix, 0) |
| end = total_size - 1 |
| return start, end |
| start = int(start_str) |
| end = int(end_str) if end_str else total_size - 1 |
| if start < 0 or end < start: |
| return 0, total_size - 1 |
| return start, min(end, total_size - 1) |
|
|
|
|
| |
|
|
# Frontend routes are only registered when a built bundle exists.
if FRONTEND_DIST.exists():
    @app.get("/")
    async def index():
        """Serve the single-page app's entry document, never cached."""
        return FileResponse(FRONTEND_DIST / "index.html", headers={"Cache-Control": "no-store"})

    @app.get("/{full_path:path}")
    async def spa_fallback(full_path: str):
        """Serve a bundled file if one matches, else the SPA entry document.

        Paths beginning with an API route name get a 404 rather than the
        HTML shell, so API clients never receive a page where they expect
        JSON.
        """
        if full_path.startswith(("movies", "search", "status", "thumb", "stream", "crawl")):
            raise HTTPException(status_code=404, detail="Not found")

        # full_path is attacker-controlled: ".." segments, or an absolute path
        # (which makes pathlib discard the left-hand side of "/"), would
        # otherwise expose any readable file. FRONTEND_DIST is already
        # resolved, so containment is checked on the resolved candidate.
        try:
            candidate = (FRONTEND_DIST / full_path).resolve()
        except (OSError, ValueError):
            candidate = None
        if (
            candidate is not None
            and candidate.is_relative_to(FRONTEND_DIST)
            and candidate.is_file()
        ):
            return FileResponse(candidate)
        return FileResponse(FRONTEND_DIST / "index.html", headers={"Cache-Control": "no-store"})