| import os |
| import time |
| import asyncio |
| import json |
| import re |
| import logging |
| from datetime import datetime, timedelta |
| from fastapi import FastAPI, HTTPException, Request, Depends, Query |
| from fastapi.responses import Response, FileResponse, JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| import httpx |
| from backend.database import init_db, sync_sources, get_sources, upsert_recording, get_recording, list_recordings, cache_epg |
| from backend.adapters import get_adapter, start_local_stream_record |
| from backend.recorder import start_local_record, check_local_status, stream_file_range |
| from backend.sync import upload_to_dataset |
| from backend.config import load_sources, load_datasets, get_admin_credentials, generate_token, verify_token |
|
|
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
| app = FastAPI() |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) |
| app.mount("/static", StaticFiles(directory="/app/frontend", html=True), name="static") |
|
|
| security = HTTPBearer(auto_error=False) |
| active_local = {} |
| DATASETS = load_datasets() |
| tokens_db = {} |
| ADMIN_USER, ADMIN_PASS = get_admin_credentials() |
|
|
| @app.on_event("startup") |
| async def startup(): |
| await init_db() |
| await sync_sources(load_sources()) |
| logger.info("Application started") |
|
|
| @app.get("/") |
| async def root(): |
| return FileResponse("/app/frontend/index.html") |
|
|
| @app.exception_handler(Exception) |
| async def global_exception_handler(request: Request, exc: Exception): |
| logger.error(f"Unhandled exception: {type(exc).__name__} - {str(exc)}", exc_info=True) |
| return JSONResponse(status_code=500, content={"detail": f"Internal server error: {str(exc)}"}) |
|
|
| @app.exception_handler(HTTPException) |
| async def http_exception_handler(request: Request, exc: HTTPException): |
| logger.warning(f"HTTPException: {exc.status_code} - {exc.detail}") |
| return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) |
|
|
| async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): |
| if not credentials: |
| raise HTTPException(status_code=401, detail="Missing token") |
| if not verify_token(credentials.credentials, tokens_db): |
| raise HTTPException(status_code=401, detail="Invalid or expired token") |
| return {"username": ADMIN_USER} |
|
|
| @app.post("/api/login") |
| async def login(request: Request): |
| data = await request.json() |
| username = data.get("username") |
| password = data.get("password") |
| if username == ADMIN_USER and password == ADMIN_PASS: |
| token = generate_token() |
| tokens_db[token] = {"username": username, "expires": datetime.now() + timedelta(hours=24)} |
| return {"token": token, "expires_in": 86400} |
| raise HTTPException(status_code=401, detail="Invalid credentials") |
|
|
| @app.post("/api/logout") |
| async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)): |
| if credentials and credentials.credentials in tokens_db: |
| tokens_db.pop(credentials.credentials, None) |
| return {"status": "ok"} |
|
|
| @app.get("/api/sources") |
| async def get_sources_ep(current_user: dict = Depends(get_current_user)): |
| return await get_sources() |
|
|
| @app.get("/api/test-connection/{source_id}") |
| async def test_connection(source_id: str, current_user: dict = Depends(get_current_user)): |
| src = next((s for s in await get_sources() if s["id"]==source_id), None) |
| if not src: |
| raise HTTPException(404, "Source not found") |
| try: |
| async with httpx.AsyncClient(timeout=10.0, verify=False) as client: |
| res = await client.get(src["url"]) |
| return {"status": "ok", "status_code": res.status_code, "url": src["url"]} |
| except Exception as e: |
| logger.error(f"Connection test failed: {str(e)}") |
| raise HTTPException(500, f"Connection failed: {str(e)}") |
|
|
| @app.get("/api/epg/{source_id}") |
| async def get_epg_ep(source_id: str, current_user: dict = Depends(get_current_user)): |
| src = next((s for s in await get_sources() if s["id"]==source_id), None) |
| if not src: |
| raise HTTPException(404, "Source not found") |
| logger.info(f"Fetching EPG from {src['type']} at {src['url']}") |
| adapter = get_adapter(src["type"], src["url"], src.get("api_key",""), source_id) |
| try: |
| data = await adapter.get_epg() |
| logger.info(f"EPG fetched: {len(data)} programs") |
| await cache_epg(source_id, json.dumps(data), time.time()) |
| return data |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.error(f"EPG fetch failed: {str(e)}") |
| raise HTTPException(500, f"Failed to fetch EPG: {str(e)}") |
|
|
| @app.post("/api/record") |
| async def schedule_record(data: dict, current_user: dict = Depends(get_current_user)): |
| source_id = data["source_id"] |
| original_name = data.get("original_name","program") |
| channel_id = data["channel_id"] |
| start_ts = float(data["start"]) |
| end_ts = float(data["end"]) |
| mode = data.get("mode","remote") |
| dataset_repo = data.get("dataset_repo", "") |
| src = next((s for s in await get_sources() if s["id"]==source_id), None) |
| if not src: raise HTTPException(404, "Source not found") |
| |
| if mode == "local": |
| rec = await start_local_record(source_id, dataset_repo, original_name, channel_id, start_ts, end_ts) |
| await upsert_recording(rec) |
| active_local[rec["id"]] = rec |
| asyncio.create_task(monitor_local(rec["id"], dataset_repo)) |
| return {"status": "ok", "id": rec["id"]} |
| |
| adapter = get_adapter(src["type"], src["url"], src.get("api_key",""), source_id) |
| remote_id = await adapter.schedule_record(channel_id, start_ts, end_ts, original_name) |
| rec_id = f"remote_{source_id}_{remote_id}_{int(start_ts)}" |
| rec = { |
| "id": rec_id, "source_id": source_id, "dataset_repo": dataset_repo, |
| "original_name": original_name, "md5_name": f"remote_{remote_id}.ts", |
| "status": "scheduled", "start_time": start_ts, "end_time": end_ts, |
| "file_path": "", "remote_url": "", "remote_id": remote_id |
| } |
| await upsert_recording(rec) |
| asyncio.create_task(monitor_remote(rec_id, src, adapter, rec, dataset_repo)) |
| return {"status": "ok", "id": rec_id} |
|
|
| @app.get("/api/recordings") |
| async def list_recordings_ep(current_user: dict = Depends(get_current_user)): |
| recs = await list_recordings() |
| for r in recs: |
| if r["id"] in active_local: |
| u = check_local_status(active_local[r["id"]]) |
| r["status"] = u["status"] |
| if u["status"] == "completed": |
| r["end_time"] = time.time() |
| await upsert_recording(r) |
| return recs |
|
|
| @app.get("/api/stream/{rec_id}") |
| async def stream_ep(rec_id: str, request: Request, token: str = Query(None), current_user: dict = Depends(get_current_user)): |
| auth_token = token or (request.headers.get("authorization", "").replace("Bearer ", "")) |
| if not auth_token or not verify_token(auth_token, tokens_db): |
| raise HTTPException(401, "Invalid token") |
| rec = await get_recording(rec_id) |
| if not rec or rec["status"] in ("scheduled", "error"): raise HTTPException(404, "Not found") |
| if rec["file_path"] and os.path.exists(rec["file_path"]): |
| chunk, length, start, end, size = await stream_file_range(rec["file_path"], request.headers.get("range")) |
| headers = {"Accept-Ranges": "bytes", "Content-Type": "video/MP2T", "Content-Length": str(length)} |
| if request.headers.get("range"): |
| headers["Content-Range"] = f"bytes {start}-{end}/{size}" |
| return Response(content=chunk, status_code=206, headers=headers) |
| return Response(content=chunk, status_code=200, headers=headers) |
| raise HTTPException(404, "File not available") |
|
|
| @app.delete("/api/recordings/{rec_id}") |
| async def delete_recording_ep(rec_id: str, current_user: dict = Depends(get_current_user)): |
| rec = await get_recording(rec_id) |
| if rec and rec["file_path"] and os.path.exists(rec["file_path"]): |
| try: |
| if rec["id"] in active_local: |
| active_local[rec["id"]]["process"].terminate() |
| del active_local[rec["id"]] |
| os.remove(rec["file_path"]) |
| except: pass |
| return {"status": "ok"} |
|
|
| async def monitor_local(rec_id, dataset_repo): |
| while True: |
| await asyncio.sleep(5) |
| rec = await get_recording(rec_id) |
| if not rec or rec["status"] != "recording": break |
| if rec_id in active_local: |
| u = check_local_status(active_local[rec_id]) |
| if u["status"] == "completed": |
| rec["status"] = "completed" |
| rec["end_time"] = time.time() |
| await upsert_recording(rec) |
| token = DATASETS.get(dataset_repo or os.getenv("DEFAULT_DATASET", "")) |
| asyncio.create_task(upload_to_dataset(rec["file_path"], rec["md5_name"], rec["original_name"], dataset_repo, token)) |
| break |
|
|
| async def monitor_remote(rec_id, source, adapter, rec_data, dataset_repo): |
| hf_token = DATASETS.get(dataset_repo or os.getenv("DEFAULT_DATASET", "")) |
| for _ in range(1440): |
| await asyncio.sleep(60) |
| try: |
| st = await adapter.get_record_status(rec_data["remote_id"]) |
| if st.get("status") in ["completed","success","finished"] or st.get("is_completed") or not st.get("isRecording", True): |
| rec_data["status"] = "completed" |
| rec_data["end_time"] = time.time() |
| await upsert_recording(rec_data) |
| result = await adapter.wait_and_download( |
| rec_data["remote_id"], rec_data["start_time"], rec_data["end_time"], |
| rec_data["original_name"], dataset_repo, hf_token |
| ) |
| if result: |
| rec_data["file_path"] = os.path.join("/app/recordings", rec_data["md5_name"]) |
| await upsert_recording(rec_data) |
| break |
| except Exception as e: |
| logger.error(f"Monitor error: {str(e)}") |
| pass |