# eeee / backend/main.py
# harii88's picture
# Update backend/main.py
# 954216d verified
import os
import time
import asyncio
import json
import re
import logging
from datetime import datetime, timedelta
from fastapi import FastAPI, HTTPException, Request, Depends, Query
from fastapi.responses import Response, FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import httpx
from backend.database import init_db, sync_sources, get_sources, upsert_recording, get_recording, list_recordings, cache_epg
from backend.adapters import get_adapter, start_local_stream_record
from backend.recorder import start_local_record, check_local_status, stream_file_range
from backend.sync import upload_to_dataset
from backend.config import load_sources, load_datasets, get_admin_credentials, generate_token, verify_token
# Configure the root logger once, at import time, with a timestamped format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Serve the frontend directory under /static.
app.mount("/static", StaticFiles(directory="/app/frontend", html=True), name="static")
# auto_error=False: a missing Authorization header yields None, so the handlers
# below decide for themselves how to respond.
security = HTTPBearer(auto_error=False)
# In-memory registry of local recordings started by this process, keyed by recording id.
active_local = {}
# Looked up by dataset repo name to obtain the token used for uploads.
DATASETS = load_datasets()
# In-memory token store: token -> {"username", "expires"}. Contents are lost on restart.
tokens_db = {}
ADMIN_USER, ADMIN_PASS = get_admin_credentials()
@app.on_event("startup")
async def startup():
    """Initialise the database and sync the configured sources at application start."""
    await init_db()
    configured_sources = load_sources()
    await sync_sources(configured_sources)
    logger.info("Application started")
@app.get("/")
async def root():
    """Serve the frontend entry page."""
    index_page = "/app/frontend/index.html"
    return FileResponse(index_page)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log any otherwise unhandled exception and answer with a JSON 500.

    The response ``detail`` includes the exception text.
    """
    exc_name = type(exc).__name__
    exc_text = str(exc)
    logger.error(f"Unhandled exception: {exc_name} - {exc_text}", exc_info=True)
    payload = {"detail": f"Internal server error: {exc_text}"}
    return JSONResponse(status_code=500, content=payload)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Log an ``HTTPException`` and render it as ``{"detail": ...}``.

    Any headers attached to the exception (for example ``WWW-Authenticate``)
    are forwarded to the client; without this they would be silently dropped.
    """
    logger.warning(f"HTTPException: {exc.status_code} - {exc.detail}")
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        # getattr keeps this safe for exception objects that carry no headers.
        headers=getattr(exc, "headers", None),
    )
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Dependency that authenticates a request by its bearer token.

    Returns:
        ``{"username": ADMIN_USER}`` when the token passes ``verify_token``.

    Raises:
        HTTPException: 401 when no credentials were sent or the token is rejected.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing token")
    presented_token = credentials.credentials
    token_ok = verify_token(presented_token, tokens_db)
    if token_ok:
        return {"username": ADMIN_USER}
    raise HTTPException(status_code=401, detail="Invalid or expired token")
@app.post("/api/login")
async def login(request: Request):
    """Exchange the admin username/password for a 24-hour bearer token.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        ``{"token": <str>, "expires_in": 86400}`` on success.

    Raises:
        HTTPException: 400 when the body is not a JSON object,
            401 when the credentials do not match.
    """
    # Function-scope import keeps this change self-contained.
    import hmac

    try:
        data = await request.json()
    except ValueError:
        # Covers JSON decode errors and undecodable bytes, which previously
        # surfaced as a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON body")

    def _matches(supplied, expected):
        # Non-string values never match. This also stops an empty body from
        # "matching" credentials that were left unset (None == None).
        if not isinstance(supplied, str) or not isinstance(expected, str):
            return False
        # Constant-time comparison; encode so non-ASCII text is accepted.
        return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))

    username = data.get("username")
    password = data.get("password")
    # Evaluate both comparisons unconditionally so timing does not reveal
    # which field was wrong.
    user_ok = _matches(username, ADMIN_USER)
    pass_ok = _matches(password, ADMIN_PASS)
    if user_ok and pass_ok:
        token = generate_token()
        tokens_db[token] = {"username": username, "expires": datetime.now() + timedelta(hours=24)}
        return {"token": token, "expires_in": 86400}
    raise HTTPException(status_code=401, detail="Invalid credentials")
@app.post("/api/logout")
async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Forget the presented bearer token, if any; always reports success."""
    if credentials:
        # pop with a default is a no-op for unknown tokens.
        tokens_db.pop(credentials.credentials, None)
    return {"status": "ok"}
@app.get("/api/sources")
async def get_sources_ep(current_user: dict = Depends(get_current_user)):
    """Return the sources as provided by ``get_sources`` (authenticated)."""
    sources = await get_sources()
    return sources
@app.get("/api/test-connection/{source_id}")
async def test_connection(source_id: str, current_user: dict = Depends(get_current_user)):
    """Issue a GET against a source's URL and report the HTTP status received.

    TLS certificate verification is disabled for this probe.

    Raises:
        HTTPException: 404 when the source id is unknown, 500 when the request fails.
    """
    src = None
    for candidate in await get_sources():
        if candidate["id"] == source_id:
            src = candidate
            break
    if not src:
        raise HTTPException(404, "Source not found")
    try:
        async with httpx.AsyncClient(timeout=10.0, verify=False) as client:
            target = src["url"]
            response = await client.get(target)
            outcome = {"status": "ok", "status_code": response.status_code, "url": target}
            return outcome
    except Exception as err:
        reason = str(err)
        logger.error(f"Connection test failed: {reason}")
        raise HTTPException(500, f"Connection failed: {reason}")
@app.get("/api/epg/{source_id}")
async def get_epg_ep(source_id: str, current_user: dict = Depends(get_current_user)):
    """Fetch the EPG for a source through its adapter, cache it, and return it.

    Raises:
        HTTPException: 404 when the source id is unknown, 500 when the fetch
            fails; an ``HTTPException`` raised while fetching is passed through.
    """
    all_sources = await get_sources()
    # filter() is lazy, so the scan stops at the first matching source.
    src = next(filter(lambda s: s["id"] == source_id, all_sources), None)
    if not src:
        raise HTTPException(404, "Source not found")

    src_type = src["type"]
    src_url = src["url"]
    logger.info(f"Fetching EPG from {src_type} at {src_url}")
    adapter = get_adapter(src_type, src_url, src.get("api_key", ""), source_id)
    try:
        programs = await adapter.get_epg()
        logger.info(f"EPG fetched: {len(programs)} programs")
        serialized = json.dumps(programs)
        await cache_epg(source_id, serialized, time.time())
        return programs
    except HTTPException:
        raise
    except Exception as err:
        reason = str(err)
        logger.error(f"EPG fetch failed: {reason}")
        raise HTTPException(500, f"Failed to fetch EPG: {reason}")
# Strong references to in-flight monitor tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_record_monitor_tasks = set()


def _track_monitor_task(task):
    """Hold a reference to *task* until it completes, then release it."""
    _record_monitor_tasks.add(task)
    task.add_done_callback(_record_monitor_tasks.discard)
    return task


@app.post("/api/record")
async def schedule_record(data: dict, current_user: dict = Depends(get_current_user)):
    """Schedule a recording, either locally or on the remote source.

    Request body keys:
        source_id, channel_id, start, end (required; start/end are numeric
        timestamps), original_name (default "program"), mode (default
        "remote"; "local" records in this process), dataset_repo (default "").

    Returns:
        ``{"status": "ok", "id": <recording id>}``.

    Raises:
        HTTPException: 400 for missing or non-numeric fields, 404 when the
            source id is unknown.
    """
    # Function-scope import keeps this change self-contained.
    import math

    # Validate up front so bad input is a client error, not a KeyError/ValueError 500.
    missing = [key for key in ("source_id", "channel_id", "start", "end") if key not in data]
    if missing:
        raise HTTPException(400, f"Missing required field(s): {', '.join(missing)}")
    try:
        start_ts = float(data["start"])
        end_ts = float(data["end"])
    except (TypeError, ValueError):
        raise HTTPException(400, "start and end must be numeric timestamps") from None
    # NaN/inf would later blow up in int(start_ts) when building the remote id.
    if not (math.isfinite(start_ts) and math.isfinite(end_ts)):
        raise HTTPException(400, "start and end must be numeric timestamps")

    source_id = data["source_id"]
    channel_id = data["channel_id"]
    original_name = data.get("original_name", "program")
    mode = data.get("mode", "remote")
    dataset_repo = data.get("dataset_repo", "")

    src = next((s for s in await get_sources() if s["id"] == source_id), None)
    if not src:
        raise HTTPException(404, "Source not found")

    if mode == "local":
        rec = await start_local_record(source_id, dataset_repo, original_name, channel_id, start_ts, end_ts)
        await upsert_recording(rec)
        active_local[rec["id"]] = rec
        _track_monitor_task(asyncio.create_task(monitor_local(rec["id"], dataset_repo)))
        return {"status": "ok", "id": rec["id"]}

    adapter = get_adapter(src["type"], src["url"], src.get("api_key", ""), source_id)
    remote_id = await adapter.schedule_record(channel_id, start_ts, end_ts, original_name)
    rec_id = f"remote_{source_id}_{remote_id}_{int(start_ts)}"
    rec = {
        "id": rec_id, "source_id": source_id, "dataset_repo": dataset_repo,
        "original_name": original_name, "md5_name": f"remote_{remote_id}.ts",
        "status": "scheduled", "start_time": start_ts, "end_time": end_ts,
        "file_path": "", "remote_url": "", "remote_id": remote_id,
    }
    await upsert_recording(rec)
    _track_monitor_task(asyncio.create_task(monitor_remote(rec_id, src, adapter, rec, dataset_repo)))
    return {"status": "ok", "id": rec_id}
@app.get("/api/recordings")
async def list_recordings_ep(current_user: dict = Depends(get_current_user)):
    """List stored recordings, overlaying the live status of local ones.

    For a recording still tracked in ``active_local`` the returned status is
    the one reported by ``check_local_status``. The first time such a
    recording is seen as completed, its ``end_time`` is stamped and the row is
    persisted. Later calls leave the stored ``end_time`` alone; previously
    every listing overwrote it with the current time and re-wrote the row.
    """
    recs = await list_recordings()
    for r in recs:
        live = active_local.get(r["id"])
        if live is None:
            continue
        stored_status = r["status"]
        u = check_local_status(live)
        r["status"] = u["status"]
        # NOTE(review): persisting "completed" here can race with monitor_local,
        # which stops polling once the stored status is no longer "recording".
        if u["status"] == "completed" and stored_status != "completed":
            r["end_time"] = time.time()
            await upsert_recording(r)
    return recs
@app.get("/api/stream/{rec_id}")
async def stream_ep(rec_id: str, request: Request, token: str = Query(None), credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Stream a recorded file, honouring an optional ``Range`` header.

    Authentication accepts either the ``token`` query parameter or a Bearer
    ``Authorization`` header. The endpoint used to depend on
    ``get_current_user``, which rejected header-less requests before this body
    ran and made the query-token fallback unreachable. The optional
    ``security`` dependency is used instead so both paths work.

    Raises:
        HTTPException: 401 for a missing/invalid token; 404 when the recording
            is unknown, is "scheduled"/"error", or has no file on disk.
    """
    header_token = credentials.credentials if credentials else ""
    auth_token = token or header_token
    if not auth_token or not verify_token(auth_token, tokens_db):
        raise HTTPException(401, "Invalid token")

    rec = await get_recording(rec_id)
    if not rec or rec["status"] in ("scheduled", "error"):
        raise HTTPException(404, "Not found")

    file_path = rec["file_path"]
    if not (file_path and os.path.exists(file_path)):
        raise HTTPException(404, "File not available")

    range_header = request.headers.get("range")
    chunk, length, start, end, size = await stream_file_range(file_path, range_header)
    headers = {"Accept-Ranges": "bytes", "Content-Type": "video/MP2T", "Content-Length": str(length)}
    if range_header:
        # Partial content: tell the client which byte window this is.
        headers["Content-Range"] = f"bytes {start}-{end}/{size}"
        return Response(content=chunk, status_code=206, headers=headers)
    return Response(content=chunk, status_code=200, headers=headers)
@app.delete("/api/recordings/{rec_id}")
async def delete_recording_ep(rec_id: str, current_user: dict = Depends(get_current_user)):
    """Stop a local recording (if active) and remove its file from disk.

    Best-effort: failures are logged rather than raised, and the endpoint
    always answers ``{"status": "ok"}``. The stored recording row itself is
    not touched here.
    """
    rec = await get_recording(rec_id)

    # Stop the recorder even when no file exists yet, and do it independently
    # of the file removal so one failure cannot skip the other.
    live = active_local.pop(rec_id, None)
    if live is not None:
        try:
            live["process"].terminate()
        except Exception as err:
            # e.g. the process has already exited, or the entry has no process.
            logger.warning(f"Could not terminate recorder for {rec_id}: {err}")

    file_path = rec["file_path"] if rec else ""
    if file_path:
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass  # already gone: nothing to delete
        except OSError as err:
            logger.warning(f"Could not remove {file_path}: {err}")
    return {"status": "ok"}
async def monitor_local(rec_id, dataset_repo):
    """Poll a local recording every 5 seconds until it completes, then upload it.

    The loop ends when the stored recording disappears, when the recording is
    no longer tracked in ``active_local`` (e.g. it was deleted), when the
    stored status is neither "recording" nor "completed", or after the upload
    has been attempted.

    Args:
        rec_id: Id of the recording to watch.
        dataset_repo: Target dataset repo; when empty, the token is looked up
            under the ``DEFAULT_DATASET`` environment variable.
    """
    while True:
        await asyncio.sleep(5)
        rec = await get_recording(rec_id)
        if not rec:
            break
        live = active_local.get(rec_id)
        if live is None:
            # Deleted or already finalised: stop instead of polling forever.
            break
        # "completed" is accepted because the listing endpoint may persist that
        # status first; the upload below must still happen in that case.
        if rec["status"] not in ("recording", "completed"):
            break
        if check_local_status(live)["status"] != "completed":
            continue

        if rec["status"] != "completed":
            rec["status"] = "completed"
            rec["end_time"] = time.time()
            await upsert_recording(rec)
        # Finished: stop tracking so later listings don't re-evaluate it.
        active_local.pop(rec_id, None)

        token = DATASETS.get(dataset_repo or os.getenv("DEFAULT_DATASET", ""))
        try:
            # Awaited directly (this coroutine already runs in the background)
            # rather than spawned as an unreferenced task.
            await upload_to_dataset(rec["file_path"], rec["md5_name"], rec["original_name"], dataset_repo, token)
        except Exception:
            logger.exception(f"Upload failed for recording {rec_id}")
        break
async def monitor_remote(rec_id, source, adapter, rec_data, dataset_repo):
    """Poll a remote recording once a minute (up to 1440 times) and fetch it when done.

    When the adapter reports completion, the record is marked "completed" and
    ``adapter.wait_and_download`` is called; on a truthy result the local file
    path is stored. Errors within one poll are logged and the next poll
    retries. If every poll is used up without a successful pass, the record is
    marked "error" instead of being left in its last state indefinitely.

    Args:
        rec_id: Id of the stored recording (not used directly in this function).
        source: Source the recording belongs to (not used directly in this function).
        adapter: Adapter providing ``get_record_status`` and ``wait_and_download``.
        rec_data: Recording dict; mutated in place and persisted.
        dataset_repo: Target dataset repo; when empty, the token is looked up
            under the ``DEFAULT_DATASET`` environment variable.
    """
    hf_token = DATASETS.get(dataset_repo or os.getenv("DEFAULT_DATASET", ""))
    for _ in range(1440):
        await asyncio.sleep(60)
        try:
            st = await adapter.get_record_status(rec_data["remote_id"])
            finished = (
                st.get("status") in ["completed", "success", "finished"]
                or st.get("is_completed")
                or not st.get("isRecording", True)
            )
            if not finished:
                continue
            rec_data["status"] = "completed"
            rec_data["end_time"] = time.time()
            await upsert_recording(rec_data)
            result = await adapter.wait_and_download(
                rec_data["remote_id"], rec_data["start_time"], rec_data["end_time"],
                rec_data["original_name"], dataset_repo, hf_token
            )
            if result:
                rec_data["file_path"] = os.path.join("/app/recordings", rec_data["md5_name"])
                await upsert_recording(rec_data)
            break
        except Exception as e:
            logger.error(f"Monitor error: {str(e)}", exc_info=True)
    else:
        # No poll ever finished cleanly: surface that as an error state.
        rec_data["status"] = "error"
        try:
            await upsert_recording(rec_data)
        except Exception:
            logger.exception(f"Could not mark recording {rec_id} as error")