import os import time import asyncio import json import re import logging from datetime import datetime, timedelta from fastapi import FastAPI, HTTPException, Request, Depends, Query from fastapi.responses import Response, FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import httpx from backend.database import init_db, sync_sources, get_sources, upsert_recording, get_recording, list_recordings, cache_epg from backend.adapters import get_adapter, start_local_stream_record from backend.recorder import start_local_record, check_local_status, stream_file_range from backend.sync import upload_to_dataset from backend.config import load_sources, load_datasets, get_admin_credentials, generate_token, verify_token logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = FastAPI() app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) app.mount("/static", StaticFiles(directory="/app/frontend", html=True), name="static") security = HTTPBearer(auto_error=False) active_local = {} DATASETS = load_datasets() tokens_db = {} ADMIN_USER, ADMIN_PASS = get_admin_credentials() @app.on_event("startup") async def startup(): await init_db() await sync_sources(load_sources()) logger.info("Application started") @app.get("/") async def root(): return FileResponse("/app/frontend/index.html") @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): logger.error(f"Unhandled exception: {type(exc).__name__} - {str(exc)}", exc_info=True) return JSONResponse(status_code=500, content={"detail": f"Internal server error: {str(exc)}"}) @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): logger.warning(f"HTTPException: {exc.status_code} - {exc.detail}") return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): if not credentials: raise HTTPException(status_code=401, detail="Missing token") if not verify_token(credentials.credentials, tokens_db): raise HTTPException(status_code=401, detail="Invalid or expired token") return {"username": ADMIN_USER} @app.post("/api/login") async def login(request: Request): data = await request.json() username = data.get("username") password = data.get("password") if username == ADMIN_USER and password == ADMIN_PASS: token = generate_token() tokens_db[token] = {"username": username, "expires": datetime.now() + timedelta(hours=24)} return {"token": token, "expires_in": 86400} raise HTTPException(status_code=401, detail="Invalid credentials") @app.post("/api/logout") async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)): if credentials and credentials.credentials in tokens_db: tokens_db.pop(credentials.credentials, None) return {"status": "ok"} @app.get("/api/sources") async def get_sources_ep(current_user: dict = Depends(get_current_user)): return await get_sources() @app.get("/api/test-connection/{source_id}") async def test_connection(source_id: str, current_user: dict = Depends(get_current_user)): src = next((s for s in await get_sources() if s["id"]==source_id), None) if not src: raise HTTPException(404, "Source not found") try: async with httpx.AsyncClient(timeout=10.0, verify=False) as client: res = await client.get(src["url"]) return {"status": "ok", "status_code": res.status_code, "url": src["url"]} except Exception as e: logger.error(f"Connection test failed: {str(e)}") raise HTTPException(500, f"Connection failed: {str(e)}") @app.get("/api/epg/{source_id}") async def get_epg_ep(source_id: str, current_user: dict = Depends(get_current_user)): src = next((s for s in await get_sources() if s["id"]==source_id), None) if not src: raise HTTPException(404, "Source not found") logger.info(f"Fetching EPG from {src['type']} at {src['url']}") adapter = get_adapter(src["type"], src["url"], src.get("api_key",""), source_id) try: data = await adapter.get_epg() logger.info(f"EPG fetched: {len(data)} programs") await cache_epg(source_id, json.dumps(data), time.time()) return data except HTTPException: raise except Exception as e: logger.error(f"EPG fetch failed: {str(e)}") raise HTTPException(500, f"Failed to fetch EPG: {str(e)}") @app.post("/api/record") async def schedule_record(data: dict, current_user: dict = Depends(get_current_user)): source_id = data["source_id"] original_name = data.get("original_name","program") channel_id = data["channel_id"] start_ts = float(data["start"]) end_ts = float(data["end"]) mode = data.get("mode","remote") dataset_repo = data.get("dataset_repo", "") src = next((s for s in await get_sources() if s["id"]==source_id), None) if not src: raise HTTPException(404, "Source not found") if mode == "local": rec = await start_local_record(source_id, dataset_repo, original_name, channel_id, start_ts, end_ts) await upsert_recording(rec) active_local[rec["id"]] = rec asyncio.create_task(monitor_local(rec["id"], dataset_repo)) return {"status": "ok", "id": rec["id"]} adapter = get_adapter(src["type"], src["url"], src.get("api_key",""), source_id) remote_id = await adapter.schedule_record(channel_id, start_ts, end_ts, original_name) rec_id = f"remote_{source_id}_{remote_id}_{int(start_ts)}" rec = { "id": rec_id, "source_id": source_id, "dataset_repo": dataset_repo, "original_name": original_name, "md5_name": f"remote_{remote_id}.ts", "status": "scheduled", "start_time": start_ts, "end_time": end_ts, "file_path": "", "remote_url": "", "remote_id": remote_id } await upsert_recording(rec) asyncio.create_task(monitor_remote(rec_id, src, adapter, rec, dataset_repo)) return {"status": "ok", "id": rec_id} @app.get("/api/recordings") async def list_recordings_ep(current_user: dict = Depends(get_current_user)): recs = await list_recordings() for r in recs: if r["id"] in active_local: u = check_local_status(active_local[r["id"]]) r["status"] = u["status"] if u["status"] == "completed": r["end_time"] = time.time() await upsert_recording(r) return recs @app.get("/api/stream/{rec_id}") async def stream_ep(rec_id: str, request: Request, token: str = Query(None), current_user: dict = Depends(get_current_user)): auth_token = token or (request.headers.get("authorization", "").replace("Bearer ", "")) if not auth_token or not verify_token(auth_token, tokens_db): raise HTTPException(401, "Invalid token") rec = await get_recording(rec_id) if not rec or rec["status"] in ("scheduled", "error"): raise HTTPException(404, "Not found") if rec["file_path"] and os.path.exists(rec["file_path"]): chunk, length, start, end, size = await stream_file_range(rec["file_path"], request.headers.get("range")) headers = {"Accept-Ranges": "bytes", "Content-Type": "video/MP2T", "Content-Length": str(length)} if request.headers.get("range"): headers["Content-Range"] = f"bytes {start}-{end}/{size}" return Response(content=chunk, status_code=206, headers=headers) return Response(content=chunk, status_code=200, headers=headers) raise HTTPException(404, "File not available") @app.delete("/api/recordings/{rec_id}") async def delete_recording_ep(rec_id: str, current_user: dict = Depends(get_current_user)): rec = await get_recording(rec_id) if rec and rec["file_path"] and os.path.exists(rec["file_path"]): try: if rec["id"] in active_local: active_local[rec["id"]]["process"].terminate() del active_local[rec["id"]] os.remove(rec["file_path"]) except: pass return {"status": "ok"} async def monitor_local(rec_id, dataset_repo): while True: await asyncio.sleep(5) rec = await get_recording(rec_id) if not rec or rec["status"] != "recording": break if rec_id in active_local: u = check_local_status(active_local[rec_id]) if u["status"] == "completed": rec["status"] = "completed" rec["end_time"] = time.time() await upsert_recording(rec) token = DATASETS.get(dataset_repo or os.getenv("DEFAULT_DATASET", "")) asyncio.create_task(upload_to_dataset(rec["file_path"], rec["md5_name"], rec["original_name"], dataset_repo, token)) break async def monitor_remote(rec_id, source, adapter, rec_data, dataset_repo): hf_token = DATASETS.get(dataset_repo or os.getenv("DEFAULT_DATASET", "")) for _ in range(1440): await asyncio.sleep(60) try: st = await adapter.get_record_status(rec_data["remote_id"]) if st.get("status") in ["completed","success","finished"] or st.get("is_completed") or not st.get("isRecording", True): rec_data["status"] = "completed" rec_data["end_time"] = time.time() await upsert_recording(rec_data) result = await adapter.wait_and_download( rec_data["remote_id"], rec_data["start_time"], rec_data["end_time"], rec_data["original_name"], dataset_repo, hf_token ) if result: rec_data["file_path"] = os.path.join("/app/recordings", rec_data["md5_name"]) await upsert_recording(rec_data) break except Exception as e: logger.error(f"Monitor error: {str(e)}") pass