streamtg / backend /main.py
dragonxd1's picture
Detect video docs by file extension
14ed35b
import asyncio
import json
import math
import os
import sqlite3
import urllib.parse
import urllib.request
from pathlib import Path
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, RedirectResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pyrogram import Client
from pyrogram.errors import FloodWait
# Load variables via python-dotenv before the environment is read below.
load_dotenv()

# Telegram API credentials and the user-account session string.
API_ID = int(os.getenv("API_ID", "0"))
API_HASH = os.getenv("API_HASH", "")
SESSION_STRING = os.getenv("SESSION_STRING", "")

# Optional Bot API settings; each is an empty string when unset.
BOT_TOKEN = os.getenv("BOT_TOKEN", "").strip()
BOT_API_BASE = os.getenv("BOT_API_BASE", "https://api.telegram.org").strip()
BOT_TARGET = os.getenv("BOT_TARGET", "").strip()

# An explicit DB_PATH wins; otherwise prefer /data when that directory exists
# and fall back to a file in the current working directory.
env_db_path = os.getenv("DB_PATH", "").strip()
DB_PATH = env_db_path or ("/data/movies.db" if Path("/data").exists() else "movies.db")

# Size of one streamed part in bytes; /stream uses it to convert a byte offset
# into a part index.
CHUNK_SIZE = 1024 * 1024

# Ensure DB directory exists
data_dir = Path(DB_PATH).parent
data_dir.mkdir(parents=True, exist_ok=True)

BATCH_SIZE = 100  # messages per GetHistory call
BATCH_DELAY = 1.5  # seconds between batches (avoids FloodWait)

# Location of the built frontend and its static assets sub-directory.
FRONTEND_DIST = Path(os.getenv("FRONTEND_DIST", "frontend/dist")).resolve()
ASSETS_DIR = FRONTEND_DIST / "assets"
def load_channel_id(value: str) -> str | int:
if not value:
return ""
cleaned = value.strip()
if cleaned.lstrip("-").isdigit():
return int(cleaned)
return cleaned
# Channel to index: an int for numeric ids, otherwise the stripped string.
CHANNEL_ID = load_channel_id(os.getenv("CHANNEL_ID", ""))

# Client authenticated with the session string; used for history, search,
# downloads and forwarding throughout this module.
bot = Client(
    "tgstream",
    api_id=API_ID,
    api_hash=API_HASH,
    session_string=SESSION_STRING,
    no_updates=True,
)

# Second client authenticated with the bot token; only created when a token
# is configured, otherwise None.
if BOT_TOKEN:
    bot_client = Client(
        "tgstream_bot",
        api_id=API_ID,
        api_hash=API_HASH,
        bot_token=BOT_TOKEN,
        no_updates=True,
    )
else:
    bot_client = None
# ─────────────────────────── SQLite helpers ───────────────────────────────────
def get_db() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_PATH``.

    The connection is created with ``check_same_thread=False`` and returns
    rows as ``sqlite3.Row`` so columns can be read by name.
    """
    connection = sqlite3.connect(DB_PATH, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the tables, FTS index and triggers if they do not exist yet.

    Schema:
        movies      one row per indexed video message (id = message id).
        movies_fts  FTS5 index over ``movies.title`` (external content table).
        meta        key/value store for crawler state.
        bot_cache   maps a channel message id to the forwarded bot message
                    and its file_id.

    The two triggers mirror inserts and deletes on ``movies`` into
    ``movies_fts``; there is no update trigger.
    """
    conn = get_db()
    try:
        # ``with conn`` only commits/rolls back; it does not close the
        # connection, hence the explicit close in ``finally``.
        with conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS movies (
                    id INTEGER PRIMARY KEY,
                    title TEXT NOT NULL,
                    duration TEXT,
                    duration_raw INTEGER,
                    size TEXT,
                    size_raw INTEGER,
                    width INTEGER,
                    height INTEGER,
                    mime TEXT,
                    has_thumb INTEGER DEFAULT 0
                );
                CREATE VIRTUAL TABLE IF NOT EXISTS movies_fts
                    USING fts5(title, content='movies', content_rowid='id');
                CREATE TRIGGER IF NOT EXISTS movies_ai
                    AFTER INSERT ON movies BEGIN
                        INSERT INTO movies_fts(rowid, title) VALUES (new.id, new.title);
                    END;
                CREATE TRIGGER IF NOT EXISTS movies_ad
                    AFTER DELETE ON movies BEGIN
                        INSERT INTO movies_fts(movies_fts, rowid, title)
                        VALUES ('delete', old.id, old.title);
                    END;
                CREATE TABLE IF NOT EXISTS meta (
                    key TEXT PRIMARY KEY,
                    value TEXT
                );
                CREATE TABLE IF NOT EXISTS bot_cache (
                    message_id INTEGER PRIMARY KEY,
                    bot_message_id INTEGER,
                    file_id TEXT
                );
            """)
    finally:
        conn.close()
def db_get_meta(key: str, default=None):
    """Return the stored ``meta`` value for ``key``.

    Args:
        key: Name of the meta entry.
        default: Value returned when no row exists for ``key``.

    Returns:
        The stored value, or ``default`` when the key is absent.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT value FROM meta WHERE key=?", (key,)).fetchone()
        return row["value"] if row else default
    finally:
        # The sqlite3 context manager never closes the connection, so close
        # it explicitly instead of leaving it to garbage collection.
        conn.close()
def db_set_meta(key: str, value: str):
    """Insert or overwrite the ``meta`` entry ``key`` with ``value``."""
    conn = get_db()
    try:
        # ``with conn`` commits on success and rolls back on error; closing
        # is handled separately below.
        with conn:
            conn.execute(
                "INSERT INTO meta(key,value) VALUES(?,?) "
                "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                (key, value),
            )
    finally:
        conn.close()
def db_clear_all():
    """Delete every indexed movie, all crawler meta and the bot file cache.

    ``bot_cache`` is keyed by channel message id, so its entries are only
    valid for the channel they were created for. It is cleared together with
    the rest so a channel change cannot serve file_ids cached for the
    previous channel.
    """
    conn = get_db()
    try:
        conn.executescript(
            "DELETE FROM movies; DELETE FROM meta; DELETE FROM bot_cache;"
        )
    finally:
        # The connection is not closed by sqlite3's context manager; do it here.
        conn.close()
def db_upsert_movies(rows: list[dict]):
    """Insert movie rows, leaving rows whose id already exists untouched.

    Args:
        rows: Dicts with the keys ``id, title, duration, duration_raw, size,
            size_raw, width, height, mime, has_thumb``. An empty list is a
            no-op.
    """
    if not rows:
        return
    conn = get_db()
    try:
        # One transaction for the whole batch; committed on success.
        with conn:
            conn.executemany(
                """
                INSERT OR IGNORE INTO movies
                    (id, title, duration, duration_raw, size, size_raw, width, height, mime, has_thumb)
                VALUES
                    (:id, :title, :duration, :duration_raw, :size, :size_raw, :width, :height, :mime, :has_thumb)
                """,
                rows,
            )
    finally:
        # ``with conn`` does not close the connection.
        conn.close()
def db_count() -> int:
    """Return the number of rows in the ``movies`` table."""
    conn = get_db()
    try:
        return conn.execute("SELECT COUNT(*) FROM movies").fetchone()[0]
    finally:
        # Close explicitly; sqlite3's context manager would leave it open.
        conn.close()
def db_list_movies(limit: int, offset_id: int) -> list[dict]:
    """List movies ordered by id descending.

    Args:
        limit: Maximum number of rows; a value <= 0 means "no limit".
        offset_id: When truthy, only rows with ``id < offset_id`` are
            returned (keyset pagination); ``0`` starts from the highest id.

    Returns:
        The matching rows as plain dicts.
    """
    # Assemble the one query that the four original branches spelled out
    # separately. Only fixed SQL fragments are concatenated; values are bound.
    sql = "SELECT * FROM movies"
    params: list[int] = []
    if offset_id:
        sql += " WHERE id < ?"
        params.append(offset_id)
    sql += " ORDER BY id DESC"
    if limit > 0:
        sql += " LIMIT ?"
        params.append(limit)

    conn = get_db()
    try:
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]
    finally:
        # sqlite3's context manager does not close connections.
        conn.close()
def db_search_movies(query: str, limit: int = 50) -> list[dict]:
    """Full-text search over movie titles, best matches first.

    Args:
        query: Search text. It is first passed to FTS5 unchanged, so valid
            FTS5 query syntax keeps working. If SQLite rejects it (e.g.
            unbalanced quotes or stray operators in user input), the search
            is retried with every whitespace-separated word quoted as a
            literal phrase.
        limit: Maximum number of rows to return.

    Returns:
        Matching ``movies`` rows as dicts, ordered by FTS rank.
    """
    sql = """
        SELECT m.* FROM movies m
        JOIN movies_fts f ON m.id = f.rowid
        WHERE movies_fts MATCH ?
        ORDER BY rank
        LIMIT ?
    """
    conn = get_db()
    try:
        try:
            rows = conn.execute(sql, (query, limit)).fetchall()
        except sqlite3.OperationalError:
            words = query.split()
            if not words:
                return []
            # Inside an FTS5 string a double quote is escaped by doubling it.
            literal = " ".join('"' + w.replace('"', '""') + '"' for w in words)
            rows = conn.execute(sql, (literal, limit)).fetchall()
        return [dict(r) for r in rows]
    finally:
        # The connection would otherwise stay open; ``with conn`` never
        # closes it.
        conn.close()
# ─────────────────────────── formatting utils ─────────────────────────────────
def fmt_size(num_bytes: int | None) -> str:
if not num_bytes:
return "0 B"
size = float(num_bytes)
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size < 1024 or unit == "TB":
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
def fmt_dur(seconds: int | None) -> str:
if seconds is None:
return "--:--"
seconds = int(seconds)
h = seconds // 3600
m = (seconds % 3600) // 60
s = seconds % 60
if h:
return f"{h}:{m:02d}:{s:02d}"
return f"{m}:{s:02d}"
def extract_vid(msg):
    """Return the video-like media object carried by ``msg``, if any.

    Preference order:
      1. ``msg.video`` when set.
      2. ``msg.document`` when its MIME type starts with ``video``.
      3. ``msg.document`` when its file name ends with a known video
         extension (case-insensitive).

    Returns ``None`` when none of these apply.
    """
    if msg.video:
        return msg.video

    doc = msg.document
    if not doc:
        return None

    video_suffixes = (
        ".mp4",
        ".mkv",
        ".mov",
        ".avi",
        ".webm",
        ".m4v",
        ".ts",
        ".m2ts",
    )
    mime_type = getattr(doc, "mime_type", None) or ""
    name = (getattr(doc, "file_name", "") or "").lower()
    if mime_type.startswith("video") or name.endswith(video_suffixes):
        return doc
    return None
def bot_file_url(file_id: str) -> str | None:
    """Resolve ``file_id`` to a direct download URL through Bot API ``getFile``.

    Returns ``None`` when no bot token is configured, when the API response
    is not ``ok`` or carries no ``file_path``, or when the request fails for
    any reason (the error is printed).

    Note: this performs a blocking HTTP request (10 s timeout), and the
    returned URL embeds the bot token.
    """
    if not BOT_TOKEN:
        return None

    encoded_id = urllib.parse.quote(file_id)
    endpoint = f"{BOT_API_BASE}/bot{BOT_TOKEN}/getFile?file_id={encoded_id}"
    try:
        with urllib.request.urlopen(endpoint, timeout=10) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        if payload.get("ok"):
            file_path = payload.get("result", {}).get("file_path")
            if file_path:
                return f"{BOT_API_BASE}/file/bot{BOT_TOKEN}/{file_path}"
        return None
    except Exception as exc:
        print(f"[BOT] getFile error: {exc}")
        return None
def bot_cache_get(message_id: int) -> str | None:
    """Return the cached bot ``file_id`` for a channel message, or ``None``."""
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT file_id FROM bot_cache WHERE message_id=?",
            (message_id,),
        ).fetchone()
        return row["file_id"] if row else None
    finally:
        # sqlite3's ``with conn`` leaves the connection open; close it here.
        conn.close()
def bot_cache_set(message_id: int, bot_message_id: int, file_id: str) -> None:
    """Store (or replace) the bot-side message id and file_id for a message.

    Args:
        message_id: Id of the message in the source channel (primary key).
        bot_message_id: Id of the forwarded copy.
        file_id: File identifier taken from the forwarded copy.
    """
    conn = get_db()
    try:
        # Commit on success / roll back on error.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO bot_cache(message_id, bot_message_id, file_id) VALUES(?,?,?)",
                (message_id, bot_message_id, file_id),
            )
    finally:
        # The context manager above does not close the connection.
        conn.close()
async def get_or_create_bot_file_id(message_id: int) -> str | None:
    """Return a bot-side ``file_id`` for a channel message, caching the result.

    On a cache miss the message is forwarded from ``CHANNEL_ID`` to
    ``BOT_TARGET`` with the user client, re-read through the bot client, and
    the file_id of its video is stored in ``bot_cache``.

    Returns ``None`` when the bot is not fully configured, when the forwarded
    message has no video / file_id, or when any step raises (the error is
    printed).
    """
    bot_ready = bool(BOT_TOKEN) and bool(BOT_TARGET) and bool(bot_client)
    if not bot_ready:
        return None

    known_file_id = bot_cache_get(message_id)
    if known_file_id:
        return known_file_id

    try:
        forwarded = await bot.forward_messages(BOT_TARGET, CHANNEL_ID, message_id)
        forwarded_id = forwarded.id
        # Re-fetch the forwarded copy through the bot client.
        # NOTE(review): assumes the forwarded message has the same id for both
        # clients in BOT_TARGET - confirm for the configured chat type.
        bot_view = await bot_client.get_messages(BOT_TARGET, forwarded_id)
        media = extract_vid(bot_view)
        file_id = getattr(media, "file_id", None) if media else None
        if not file_id:
            return None
        bot_cache_set(message_id, forwarded_id, file_id)
        return file_id
    except Exception as exc:
        print(f"[BOT] forward error: {exc}")
        return None
def msg_to_row(msg) -> dict | None:
    """Convert a message into a ``movies`` table row.

    Args:
        msg: Message object exposing ``id``, ``caption`` and the media
            attributes read by ``extract_vid``.

    Returns:
        A dict with the ``movies`` columns, or ``None`` when the message
        carries no video.
    """
    vid = extract_vid(msg)
    if not vid:
        return None
    # Strip before applying the fallback: a whitespace-only caption is truthy
    # and previously produced an empty title.
    title = (msg.caption or "").strip() or f"Movie #{msg.id}"
    duration = getattr(vid, "duration", None)
    file_size = getattr(vid, "file_size", None)
    return {
        "id": msg.id,
        "title": title,
        "duration": fmt_dur(duration),
        "duration_raw": duration,
        "size": fmt_size(file_size),
        "size_raw": file_size,
        "width": getattr(vid, "width", None),
        "height": getattr(vid, "height", None),
        "mime": getattr(vid, "mime_type", None),
        "has_thumb": int(bool(getattr(vid, "thumbs", None))),
    }
# ─────────────────────────── background crawler ───────────────────────────────
# Handle of the background crawl task; None until one has been started.
_crawl_task: asyncio.Task | None = None
# Progress snapshot exposed through /status.
_crawl_status = {"running": False, "indexed": 0, "last_id": 0}


async def crawl_channel():
    """
    Incrementally crawl the channel history in batches of ``BATCH_SIZE``.

    Each batch is fetched with ``offset_id`` pagination and the smallest
    message id of the batch becomes the next ``offset_id``, so the crawl
    walks from the newest messages toward older ones. The current offset is
    persisted under the ``crawl_min_id`` meta key so a restart resumes from
    where the previous run stopped.

    FloodWait errors pause for the requested time (+2 s); any other error is
    retried after 10 s. Cancellation is logged and re-raised, and
    ``_crawl_status["running"]`` is always reset on exit.

    NOTE(review): a resumed crawl only continues below ``crawl_min_id``;
    messages posted after an earlier run appear not to be picked up until
    the crawl is restarted from scratch - confirm whether that is intended.
    """
    _crawl_status["running"] = True
    try:
        # Resume from last known min message id
        saved_min_id = int(db_get_meta("crawl_min_id", "0"))
        offset_id = saved_min_id if saved_min_id else 0
        total = db_count()
        # Refresh the snapshot so /status is not stale after a restart.
        _crawl_status["indexed"] = total
        _crawl_status["last_id"] = offset_id
        print(f"[CRAWLER] Starting. Resume offset_id={offset_id}. DB has {total} movies.")

        while True:
            try:
                batch = [
                    msg
                    async for msg in bot.get_chat_history(
                        CHANNEL_ID, limit=BATCH_SIZE, offset_id=offset_id
                    )
                ]
                if not batch:
                    print("[CRAWLER] No more messages. Crawl complete.")
                    break

                rows = [r for msg in batch if (r := msg_to_row(msg))]
                db_upsert_movies(rows)

                min_id = min(m.id for m in batch)
                db_set_meta("crawl_min_id", str(min_id))
                total = db_count()
                _crawl_status["indexed"] = total
                _crawl_status["last_id"] = min_id
                print(f"[CRAWLER] Batch saved {len(rows)} videos. DB total={total}. Next offset_id={min_id}")

                if len(batch) < BATCH_SIZE:
                    print("[CRAWLER] Last batch — history exhausted.")
                    break

                offset_id = min_id
                await asyncio.sleep(BATCH_DELAY)
            except FloodWait as exc:
                wait = exc.value + 2
                print(f"[CRAWLER] FloodWait — sleeping {wait}s")
                await asyncio.sleep(wait)
            except Exception as e:
                print(f"[CRAWLER] Error: {e}. Retrying in 10s.")
                await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Handled at this level so a cancel that arrives during one of the
        # retry sleeps above is covered too; both callers that cancel this
        # task already catch CancelledError.
        print("[CRAWLER] Cancelled.")
        raise
    finally:
        # Runs on every exit path, so the flag can no longer stay True after
        # a cancellation or an unexpected startup error.
        _crawl_status["running"] = False
        print(f"[CRAWLER] Finished. DB total: {db_count()}")
# ─────────────────────────── app lifespan ─────────────────────────────────────
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Start the clients and the background crawler; tear them down on exit.

    Startup: initialise the schema, start the user client (and the bot
    client when configured), wipe the local index if ``CHANNEL_ID`` differs
    from the stored one, iterate the dialog list once, then launch the crawl
    task.

    Shutdown: cancel the crawl task and stop both clients. The nested
    ``try/finally`` blocks make sure clients that were started are stopped
    even when a later startup step raises.
    """
    global _crawl_task
    init_db()
    await bot.start()
    try:
        if bot_client:
            await bot_client.start()
        try:
            # Reset DB if CHANNEL_ID changed to avoid mixing data.
            stored_channel = db_get_meta("channel_id", "")
            current_channel = str(CHANNEL_ID)
            if stored_channel and stored_channel != current_channel:
                print("[INFO] Channel changed. Resetting local DB and crawl state.")
                db_clear_all()
            db_set_meta("channel_id", current_channel)

            print("[INFO] Warming up peer cache...")
            try:
                # Iterating the dialogs is the whole point; the items are unused.
                async for _dialog in bot.get_dialogs():
                    pass
                print("[INFO] Peer cache ready.")
            except Exception as e:
                print(f"[WARNING] Could not warm peer cache: {e}")

            # Background crawl — non-blocking, app is usable immediately
            _crawl_task = asyncio.create_task(crawl_channel())
            yield
        finally:
            if _crawl_task and not _crawl_task.done():
                _crawl_task.cancel()
                try:
                    await _crawl_task
                except asyncio.CancelledError:
                    pass
            if bot_client:
                await bot_client.stop()
    finally:
        await bot.stop()
# Application object; startup/shutdown are handled by ``lifespan``.
app = FastAPI(lifespan=lifespan)

# CORS: any origin, method and header is accepted; credentials are not.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=False,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount assets early for direct file serving
if all(directory.exists() for directory in (FRONTEND_DIST, ASSETS_DIR)):
    app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
# ─────────────────────────── routes ───────────────────────────────────────────
@app.get("/assets/{asset_path:path}")
async def assets_fallback(asset_path: str):
    """
    Fallback asset handler in case static mount is unavailable.

    Only files located inside ``ASSETS_DIR`` are served. The requested path
    is resolved first so ``..`` segments or an absolute ``asset_path`` (which
    ``Path.__truediv__`` would let replace the base directory) cannot escape
    it.

    Raises:
        HTTPException: 404 when the path is outside ``ASSETS_DIR``, invalid,
            or not a regular file.
    """
    assets_root = ASSETS_DIR.resolve()
    try:
        file_path = (assets_root / asset_path).resolve()
    except (OSError, ValueError):
        # e.g. embedded NUL byte or a path the OS refuses to resolve.
        raise HTTPException(status_code=404, detail="Asset not found")
    if not file_path.is_relative_to(assets_root) or not file_path.is_file():
        raise HTTPException(status_code=404, detail="Asset not found")
    return FileResponse(file_path)
@app.get("/movies")
async def movies(
    limit: int = Query(default=0, ge=0),
    offset_id: int = Query(default=0),
):
    """
    Paginated movie list, newest id first.

    ``limit=0`` returns every row. When a positive ``limit`` is given and the
    page came back full, ``next_offset_id`` holds the id to pass as
    ``offset_id`` for the following page; otherwise it is ``None``.
    """
    page = db_list_movies(limit=limit, offset_id=offset_id)
    page_is_full = limit > 0 and len(page) == limit
    return {
        "items": page,
        "count": len(page),
        "next_offset_id": page[-1]["id"] if page_is_full else None,
        "db_total": db_count(),
    }
@app.get("/search")
async def search(
    q: str = Query(default=""),
    limit: int = Query(default=50, le=200),
    live: bool = Query(default=False),
):
    """
    Search movies.

    ?q=term          → SQLite FTS (instant, from DB)
    ?q=term&live=1   → also queries Telegram for uncrawled messages

    An empty ``q`` returns the newest ``limit`` movies from the DB. Live
    results are appended after the FTS hits, skipping ids already present,
    and the combined list never exceeds ``limit``. Live-search failures are
    logged and otherwise ignored.
    """
    q = q.strip()
    if not q:
        items = db_list_movies(limit=limit, offset_id=0)
        # "count" added so both response shapes expose the same keys.
        return {"items": items, "count": len(items), "source": "db"}

    results = db_search_movies(q, limit=limit)
    if live and len(results) < limit:
        # Pyrogram 2.x expects an enum member for ``filter``; the plain string
        # made every live search fail inside the ``except`` below. Fall back
        # to the string form when the enums module is unavailable.
        try:
            from pyrogram.enums import MessagesFilter

            video_filter = MessagesFilter.VIDEO
        except ImportError:
            video_filter = "video"

        seen_ids = {r["id"] for r in results}
        try:
            async for msg in bot.search_messages(CHANNEL_ID, query=q, filter=video_filter, limit=20):
                if msg.id in seen_ids:
                    continue
                row = msg_to_row(msg)
                if not row:
                    continue
                results.append(row)
                seen_ids.add(msg.id)
                if len(results) >= limit:
                    # Respect the caller's limit for the merged result too.
                    break
        except Exception as e:
            print(f"[SEARCH] Live search error: {e}")
    return {"items": results, "count": len(results), "source": "fts+live" if live else "fts"}
@app.get("/status")
async def status():
    """Report crawler state together with the current DB row count."""
    snapshot = _crawl_status
    resume_point = db_get_meta("crawl_min_id", "0")
    return {
        "crawl_running": snapshot["running"],
        "db_total": db_count(),
        "crawl_last_id": snapshot["last_id"],
        "crawl_resume_point": resume_point,
    }
@app.post("/crawl/restart")
async def restart_crawl():
    """Stop any running crawl, wipe the local DB and start a fresh crawl."""
    global _crawl_task

    previous = _crawl_task
    if previous and not previous.done():
        previous.cancel()
        try:
            # Wait until the old task has actually stopped before wiping data.
            await previous
        except asyncio.CancelledError:
            pass

    db_clear_all()
    # db_clear_all() also removes the stored channel id, so record it again.
    db_set_meta("channel_id", str(CHANNEL_ID))
    _crawl_task = asyncio.create_task(crawl_channel())
    return {"message": "Crawl restarted from scratch."}
@app.get("/thumb/{message_id}")
async def thumb(message_id: int):
    """Serve the first thumbnail of the video in ``message_id`` as JPEG.

    Raises:
        HTTPException: 404 when the message has no video, the video has no
            thumbnails, or the thumbnail download yields no data.
    """
    msg = await bot.get_messages(CHANNEL_ID, message_id)
    vid = extract_vid(msg)
    if not vid or not getattr(vid, "thumbs", None):
        raise HTTPException(status_code=404, detail="Thumbnail not found")
    thumb_item = vid.thumbs[0]
    data = await bot.download_media(thumb_item, in_memory=True)
    if data is None:
        # A failed download returns None; previously that was sent as a
        # 200 response with an empty body.
        raise HTTPException(status_code=404, detail="Thumbnail not found")
    if hasattr(data, "getvalue"):
        # In-memory downloads come back as a file-like object.
        data = data.getvalue()
    return Response(content=data, media_type="image/jpeg")
@app.get("/stream/{message_id}")
async def stream(message_id: int, request: Request):
    """Stream the video of ``message_id``, honouring HTTP Range requests.

    Without a ``Range`` header the whole file is sent with status 200. With
    one, the requested slice is sent with status 206 and a ``Content-Range``
    header. A malformed ``Range`` header is ignored.

    Raises:
        HTTPException: 404 when the message has no video; 416 when the
            requested start lies at or beyond the end of the file (this
            includes files whose size is reported as 0).
    """
    msg = await bot.get_messages(CHANNEL_ID, message_id)
    vid = extract_vid(msg)
    if not vid:
        raise HTTPException(status_code=404, detail="Video not found")

    size = getattr(vid, "file_size", 0) or 0
    range_header = request.headers.get("range")
    try:
        start, end = parse_range(range_header, size)
    except ValueError:
        # Unparseable header: serve the full file instead of failing with 500.
        range_header = None
        start, end = 0, size - 1
    if start >= size:
        raise HTTPException(
            status_code=416,
            detail="Range not satisfiable",
            headers={"Content-Range": f"bytes */{size}"},
        )

    length = end - start + 1
    # The media is fetched in CHUNK_SIZE parts: find the part containing
    # ``start``, how many bytes of it precede ``start``, and how many parts
    # cover the requested span.
    first_part = start // CHUNK_SIZE
    skip_bytes = start % CHUNK_SIZE
    parts_needed = math.ceil((skip_bytes + length) / CHUNK_SIZE)

    async def streamer():
        to_skip = skip_bytes
        remaining = length
        async for chunk in bot.stream_media(msg, offset=first_part, limit=parts_needed):
            if to_skip:
                if len(chunk) <= to_skip:
                    to_skip -= len(chunk)
                    continue
                chunk = chunk[to_skip:]
                to_skip = 0
            if remaining <= 0:
                break
            if len(chunk) > remaining:
                # Trim the tail so exactly ``length`` bytes are sent.
                chunk = chunk[:remaining]
            remaining -= len(chunk)
            if chunk:
                yield chunk
            if remaining <= 0:
                break

    mime = getattr(vid, "mime_type", None) or "video/mp4"
    headers = {
        "Accept-Ranges": "bytes",
        "Content-Length": str(length),
        "Content-Type": mime,
    }
    if range_header:
        status_code = 206
        headers["Content-Range"] = f"bytes {start}-{end}/{size}"
    else:
        # 206 / Content-Range are only meaningful as a reply to a range request.
        status_code = 200
    return StreamingResponse(streamer(), status_code=status_code, headers=headers, media_type=mime)
@app.get("/stream-bot/{message_id}")
async def stream_bot(message_id: int):
    """Redirect to a direct Bot API download URL for ``message_id``.

    Raises:
        HTTPException: 404 when no bot token is configured; 502 when the
            message could not be forwarded/resolved to a file_id or when
            ``getFile`` did not return a URL.
    """
    if not BOT_TOKEN:
        raise HTTPException(status_code=404, detail="Bot token not configured")
    file_id = await get_or_create_bot_file_id(message_id)
    if not file_id:
        # This step is the forward/lookup, not getFile.
        raise HTTPException(status_code=502, detail="Bot forward failed")
    # bot_file_url() does a blocking urllib request; run it in a worker thread
    # so the event loop keeps serving other requests meanwhile.
    direct_url = await asyncio.to_thread(bot_file_url, file_id)
    if not direct_url:
        raise HTTPException(status_code=502, detail="Bot getFile failed")
    # SECURITY NOTE(review): direct_url contains BOT_TOKEN, so this redirect
    # discloses the bot token to every client. Consider proxying the download
    # instead of redirecting.
    return RedirectResponse(direct_url)
@app.get("/bot-debug/{message_id}")
async def bot_debug(message_id: int):
    """Return debug info for bot forward/getFile flow.

    Runs the forward -> bot lookup -> getFile steps without using the cache
    and reports how far they got. The result always has ``ok``; on failure
    it also has ``error``.

    SECURITY NOTE(review): ``direct_url`` in the response embeds BOT_TOKEN;
    this endpoint should not be reachable by untrusted clients.
    """
    if not BOT_TOKEN:
        return {"ok": False, "error": "BOT_TOKEN not configured"}
    if not BOT_TARGET:
        return {"ok": False, "error": "BOT_TARGET not configured"}
    if not bot_client:
        return {"ok": False, "error": "bot_client not initialized"}

    result = {
        "ok": False,
        "message_id": message_id,
        "bot_target": BOT_TARGET,
    }
    try:
        forwarded = await bot.forward_messages(BOT_TARGET, CHANNEL_ID, message_id)
        forwarded_id = forwarded.id
        result["forwarded_id"] = forwarded_id

        bot_msg = await bot_client.get_messages(BOT_TARGET, forwarded_id)
        vid = extract_vid(bot_msg)
        if not vid:
            result["error"] = "No video in forwarded message"
            return result

        file_id = getattr(vid, "file_id", None)
        if not file_id:
            result["error"] = "file_id missing"
            return result
        result["file_id"] = file_id

        # Blocking HTTP call: keep it off the event loop.
        direct_url = await asyncio.to_thread(bot_file_url, file_id)
        result["direct_url"] = direct_url
        result["ok"] = bool(direct_url)
        if not direct_url:
            result["error"] = "getFile failed"
        return result
    except Exception as exc:
        result["error"] = str(exc)
        return result
def parse_range(range_header: str | None, total_size: int) -> tuple[int, int]:
if not range_header or not range_header.startswith("bytes="):
return 0, total_size - 1
value = range_header.replace("bytes=", "")
if "-" not in value:
return 0, total_size - 1
start_str, end_str = value.split("-", 1)
if start_str == "":
suffix = int(end_str)
start = max(total_size - suffix, 0)
end = total_size - 1
return start, end
start = int(start_str)
end = int(end_str) if end_str else total_size - 1
if start < 0 or end < start:
return 0, total_size - 1
return start, min(end, total_size - 1)
# ─────────────────────────── SPA fallback (must be last) ─────────────────────
if FRONTEND_DIST.exists():

    @app.get("/")
    async def index():
        """Serve the SPA entry point; never cached so new builds show up."""
        return FileResponse(FRONTEND_DIST / "index.html", headers={"Cache-Control": "no-store"})

    @app.get("/{full_path:path}")
    async def spa_fallback(full_path: str):
        """Serve a built frontend file, or ``index.html`` for client routes.

        Paths that start with an API prefix get a 404 instead of the SPA
        shell. Only files that resolve to a location inside
        ``FRONTEND_DIST`` are served; anything else falls back to
        ``index.html``.
        """
        # Reject API paths that don't exist
        api_prefixes = ("movies", "search", "status", "thumb", "stream", "crawl", "bot-debug")
        if full_path.startswith(api_prefixes):
            raise HTTPException(status_code=404, detail="Not found")

        # Resolve before checking: ``..`` segments or an absolute
        # ``full_path`` (e.g. a request for ``//etc/passwd``) would otherwise
        # point outside the frontend directory and expose arbitrary files.
        try:
            file_path = (FRONTEND_DIST / full_path).resolve()
        except (OSError, ValueError):
            file_path = None
        if (
            file_path is not None
            and file_path.is_relative_to(FRONTEND_DIST)
            and file_path.is_file()
        ):
            return FileResponse(file_path)
        return FileResponse(FRONTEND_DIST / "index.html", headers={"Cache-Control": "no-store"})