| """ |
| FootClip AI — FastAPI Backend |
| """ |
| import os |
| import uuid |
| import asyncio |
| import logging |
| from typing import Dict |
|
|
| from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks |
| from fastapi.responses import FileResponse |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| from models import ProcessRequest, JobStatus |
| from pipeline import run_pipeline, get_device, WORKSPACE, OUTPUT_DIR |
|
|
# In-memory job registry keyed by job id; it lives only in this process.
jobs: Dict[str, JobStatus] = {}

# Root logging configuration and the logger used throughout this module.
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("footclip")

# The ASGI application object that all routes below are registered on.
app = FastAPI(
    version="1.0.0",
    title="FootClip AI",
    description="Automatic football highlight generator using YOLOv8 + audio analysis",
)

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
|
@app.get("/")
def root():
    """Return a static liveness payload that also points at the docs route."""
    payload = {
        "status": "ok",
        "message": "FootClip AI is running",
        "docs": "/docs",
    }
    return payload
|
|
|
|
@app.get("/api/health")
async def health():
    """Report service status with the value of ``get_device()`` and the workspace path."""
    device = get_device()
    return {"status": "ok", "device": device, "workspace": WORKSPACE}
|
|
|
|
@app.post("/api/upload")
async def upload_video(file: UploadFile = File(...)):
    """Save an uploaded file under ``WORKSPACE/uploads`` and report where it went.

    The filename sent by the client is untrusted input. Only its final path
    component is kept, so names such as ``../../etc/passwd`` or an absolute
    path cannot place the file outside the upload directory. A missing or
    empty name falls back to ``"upload"``.

    Args:
        file: The multipart upload to persist.

    Returns:
        A dict with ``filename`` (the stored name, prefixed with eight random
        hex characters) and ``path`` (the full path it was written to).
    """
    upload_dir = os.path.join(WORKSPACE, "uploads")
    os.makedirs(upload_dir, exist_ok=True)

    # Normalise Windows separators so basename() also strips them on POSIX,
    # and drop NUL bytes, which open() rejects.
    raw_name = (file.filename or "").replace("\\", "/").replace("\x00", "")
    safe_name = os.path.basename(raw_name) or "upload"

    filename = f"{uuid.uuid4().hex[:8]}_{safe_name}"
    path = os.path.join(upload_dir, filename)
    with open(path, "wb") as f:
        # Copy in 1 MiB chunks so the whole upload is never held in memory.
        while chunk := await file.read(1024 * 1024):
            f.write(chunk)
    return {"filename": filename, "path": path}
|
|
|
|
@app.post("/api/process")
async def start_processing(config: ProcessRequest, background_tasks: BackgroundTasks):
    """Register a new job as queued and schedule ``_run_job`` as a background task.

    Returns a dict carrying the generated ``job_id`` and the ``"queued"`` status.
    """
    new_id = uuid.uuid4().hex[:12]
    record = JobStatus(job_id=new_id, status="queued", stage="queued")
    jobs[new_id] = record
    background_tasks.add_task(_run_job, config, record)
    return {"job_id": new_id, "status": "queued"}
|
|
def _run_job(config: ProcessRequest, job: JobStatus):
    """Run the pipeline for one job and record any failure on the job itself.

    The job is marked ``"processing"`` before the pipeline starts. If the
    pipeline raises, the job is marked ``"error"``, the exception text is
    stored in ``job.message``, and the failure is logged with its traceback.
    The exception is not re-raised.

    Args:
        config: The processing request passed through to ``run_pipeline``.
        job: The job record; it is handed to ``run_pipeline`` and mutated here.
    """
    job.status = "processing"
    try:
        # The progress callback is a deliberate no-op.
        run_pipeline(config, job, update_cb=lambda j: None)
    except Exception as e:
        job.status = "error"
        job.message = str(e)
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Job %s failed: %s", job.job_id, e)
|
|
|
|
@app.get("/api/status/{job_id}")
async def get_status(job_id: str):
    """Return the serialised state of one job, or respond 404 if it is unknown."""
    record = jobs.get(job_id)
    if record:
        return record.model_dump()
    raise HTTPException(404, "Job not found")
|
|
|
|
@app.get("/api/download/{job_id}")
async def download_result(job_id: str):
    """Send the rendered video of a finished job as an MP4 download.

    Args:
        job_id: Identifier of the job whose output is requested.

    Returns:
        A ``FileResponse`` for the file at the job's ``output_path``, offered
        to the client as ``footclip_<job_id>.mp4``.

    Raises:
        HTTPException: 404 if the job is unknown or its output file is
            missing; 400 if the job has not reached the ``"done"`` status.
    """
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    if job.status != "done":
        raise HTTPException(400, f"Job not done yet (status: {job.status})")

    # NOTE(review): assumes job.result is a dict or None; confirm against the
    # JobStatus model. A missing result or path is reported as a 404 rather
    # than surfacing as an unhandled error.
    result = job.result or {}
    output_path = result.get("output_path") or ""

    # isfile() also rejects directories, which exists() would let through.
    if not os.path.isfile(output_path):
        raise HTTPException(404, "Output file not found")
    return FileResponse(output_path, media_type="video/mp4", filename=f"footclip_{job_id}.mp4")
|
|
|
|
@app.get("/api/jobs")
async def list_jobs():
    """Return every tracked job, each serialised with ``model_dump``."""
    dumped = []
    for record in jobs.values():
        dumped.append(record.model_dump())
    return dumped
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # The PORT environment variable overrides the listening port; 7860 is the fallback.
    port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, port=port, host="0.0.0.0")
|
|