| """ |
| Personal Analysis API endpoints. |
| |
| Handles video upload + triggering the swiss basketball shot analysis pipeline |
| for individual (personal account) players. |
| |
| Does NOT touch or interfere with the team analysis pipeline. |
| """ |
import os
import json
import uuid
import logging
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, BackgroundTasks

from app.dependencies import require_personal_account, get_supabase
from app.services.supabase_client import SupabaseService
from app.models.video import VideoStatus, AnalysisMode

logger = logging.getLogger("personal_analysis_api")

router = APIRouter()

# Local working directory for uploaded inputs and rendered outputs.
_BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
PERSONAL_OUTPUT_DIR = os.path.join(_BASE_DIR, "uploads", "personal_output")
os.makedirs(PERSONAL_OUTPUT_DIR, exist_ok=True)

# In-memory job store. Per-process only: entries are lost on restart and not
# shared across workers, so Supabase remains the durable source of truth.
_job_cache: dict = {}

async def _save_job_to_db(supabase: SupabaseService, job: dict):
    """Persist the job record to Supabase. Best-effort only."""
    try:
        existing = await supabase.select("personal_analyses", filters={"job_id": job["job_id"]})
        if existing:
            await supabase.update("personal_analyses", existing[0]["id"], job)
        else:
            await supabase.insert("personal_analyses", {**job, "id": str(uuid.uuid4())})
    except Exception as e:
        logger.warning(f"Could not save job to DB: {e}")

async def _run_and_update(job_id: str, video_path: str, user_id: str, supabase: SupabaseService, shooting_arm: str = "right"):
    """Background task that runs the pipeline and updates the DB."""
    # Deferred import: keeps the heavy analysis dependencies out of app startup.
    from personal_analysis.pipeline import run_personal_analysis

    BUCKET = "personal-analysis-videos"

    _job_cache[job_id] = {"job_id": job_id, "status": "processing", "user_id": user_id}

    try:
        result = await run_personal_analysis(
            video_path=video_path,
            output_dir=PERSONAL_OUTPUT_DIR,
            job_id=job_id,
            shooting_arm=shooting_arm,
        )
    except Exception as pipeline_err:
        # Without this guard a pipeline crash would leave the job stuck in
        # "processing" forever.
        logger.error(f"[{job_id}] Pipeline failed: {pipeline_err}")
        result = {"job_id": job_id, "status": "failed", "error": str(pipeline_err)}

| if result.get("status") == "completed": |
| local_output = os.path.join(PERSONAL_OUTPUT_DIR, f"{job_id}_output.mp4") |
| if os.path.exists(local_output): |
| try: |
| storage_path = f"{user_id}/{job_id}_output.mp4" |
| |
| |
| await supabase.ensure_bucket(BUCKET, public=True) |
| |
| await supabase.upload_file_from_path( |
| bucket=BUCKET, |
| storage_path=storage_path, |
| local_path=local_output, |
| content_type="video/mp4", |
| ) |
| signed_url = await supabase.get_long_lived_url( |
| bucket=BUCKET, |
| storage_path=storage_path, |
| expires_in=60 * 60 * 24 * 7, |
| ) |
| if signed_url: |
| result["annotated_video_url"] = signed_url |
| logger.info(f"[{job_id}] Uploaded to Supabase Storage → {storage_path}") |
| |
| try: |
| os.remove(local_output) |
| |
| tmp = local_output.replace("_output.mp4", "_output_tmp.mp4") |
| if os.path.exists(tmp): |
| os.remove(tmp) |
| except Exception: |
| pass |
| except Exception as upload_err: |
| |
| logger.warning(f"[{job_id}] Supabase upload failed, using local URL: {upload_err}") |
| result["annotated_video_url"] = f"/personal-output/{job_id}_output.mp4" |
|
|
| _job_cache[job_id] = {**result, "user_id": user_id} |
|
|
| |
| await _save_job_to_db(supabase, { |
| "job_id": job_id, |
| "user_id": user_id, |
| "status": result.get("status", "completed"), |
| "results_json": result, |
| "created_at": datetime.utcnow().isoformat(), |
| }) |
|
|
| |
| if result.get("status") == "completed": |
| try: |
| |
| p_rows = await supabase.select("players", filters={"user_id": user_id}) |
| if p_rows: |
| player_id = p_rows[0]["id"] |
| ts = datetime.utcnow().isoformat() |
| |
| metrics_to_save = [ |
| ("shot_attempt", result.get("shots_total", 0)), |
| ("shot_made", result.get("shots_made", 0)), |
| ("distance_km", float(result.get("total_distance_meters", 0) or 0) / 1000.0), |
| ("avg_speed_kmh", result.get("avg_speed_kmh", 0)), |
| ("max_speed_kmh", result.get("max_speed_kmh", 0)), |
| ("dribble_count", result.get("dribble_count", 0)), |
| ("form_consistency", 100 if result.get("overall_verdict") == "GOOD FORM" else 60), |
| ] |
| |
| for m_type, val in metrics_to_save: |
| if val is not None: |
| await supabase.insert("analytics", { |
| "id": str(uuid.uuid4()), |
| "player_id": player_id, |
| "metric_type": m_type, |
| "value": float(val), |
| "timestamp": ts, |
| "video_id": job_id |
| }) |
| except Exception as ae: |
| logger.warning(f"Could not push to analytics table: {ae}") |
|
|
| |
    # Sync the final status onto the videos record so clients stop polling it.
    try:
        final_video_status = VideoStatus.COMPLETED.value if result.get("status") == "completed" else VideoStatus.FAILED.value
        await supabase.update("videos", job_id, {
            "status": final_video_status,
            "progress_percent": 100 if final_video_status == VideoStatus.COMPLETED.value else 0
        })
    except Exception as e:
        logger.warning(f"Could not update videos table status: {e}")

    # Clean up the raw uploaded input.
    try:
        if os.path.exists(video_path):
            os.remove(video_path)
    except Exception:
        pass
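
# Expected shape of `result` (only fields read above are listed; the exact keys
# come from the personal_analysis pipeline and may vary):
#   {"status": "completed", "shots_total": 12, "shots_made": 7,
#    "total_distance_meters": 340.0, "avg_speed_kmh": 6.2,
#    "max_speed_kmh": 14.8, "dribble_count": 58,
#    "overall_verdict": "GOOD FORM", "annotated_video_url": "https://..."}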

@router.post("/analysis/trigger")
async def trigger_analysis(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    shooting_arm: str = "right",
    current_user: dict = Depends(require_personal_account),
    supabase: SupabaseService = Depends(get_supabase),
):
    """
    Upload a personal training video and start shot analysis.
    Returns a job_id immediately; poll /analysis/{job_id} for results.
    """
    allowed_ext = {".mp4", ".avi", ".mov", ".mkv"}
    _, ext = os.path.splitext(video.filename or "video.mp4")
    if ext.lower() not in allowed_ext:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported video format '{ext}'. Allowed: {', '.join(sorted(allowed_ext))}"
        )

    job_id = str(uuid.uuid4())
    upload_path = os.path.join(PERSONAL_OUTPUT_DIR, f"{job_id}_input{ext}")

    # NOTE: the whole upload is buffered in memory before the size check, so
    # the 500 MB cap is enforced only after the read completes.
    content = await video.read()
    if len(content) > 500 * 1024 * 1024:
        raise HTTPException(status_code=413, detail="Video file too large (max 500 MB)")

    with open(upload_path, "wb") as f:
        f.write(content)

    try:
        video_record = {
            "id": job_id,
            "uploader_id": current_user["id"],
            "title": video.filename or f"Analysis {datetime.utcnow().strftime('%Y-%m-%d %H:%M')}",
            "description": f"Personal shot analysis (hand: {shooting_arm})",
            "analysis_mode": AnalysisMode.PERSONAL.value,
            "status": VideoStatus.PROCESSING.value,
            "storage_path": upload_path,
            "file_size_bytes": len(content),
            "created_at": datetime.utcnow().isoformat(),
        }
        await supabase.insert("videos", video_record)
    except Exception as e:
        logger.warning(f"Could not insert into videos table: {e}")

    user_id = current_user["id"]
    _job_cache[job_id] = {"job_id": job_id, "status": "processing", "user_id": user_id}

    background_tasks.add_task(
        _run_and_update, job_id, upload_path, user_id, supabase, shooting_arm
    )

    return {
        "job_id": job_id,
        "status": "processing",
        "message": f"Analysis started. Poll /player/analysis/{job_id} for results.",
    }
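
# Example request (illustrative; host, mount prefix, and auth scheme depend on
# the deployment):
#   curl -X POST https://<host>/player/analysis/trigger \
#        -H "Authorization: Bearer <token>" \
#        -F "video=@training_session.mp4" \
#        -F "shooting_arm=left"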

@router.get("/analysis/{job_id}")
async def get_analysis_result(
    job_id: str,
    current_user: dict = Depends(require_personal_account),
    supabase: SupabaseService = Depends(get_supabase),
):
    """
    Poll the status / results of a personal analysis job.
    Returns 'processing' until done, then the full results.
    """
    # Fast path: the job is still in this process's memory.
    if job_id in _job_cache:
        job = _job_cache[job_id]
        if job.get("user_id") != current_user["id"]:
            raise HTTPException(status_code=403, detail="Access denied")
        return job

    # Fall back to the persisted record (e.g. after a restart).
    try:
        rows = await supabase.select("personal_analyses", filters={"job_id": job_id})
        if rows:
            record = rows[0]
            if record.get("user_id") != current_user["id"]:
                raise HTTPException(status_code=403, detail="Access denied")

            # results_json may come back as a JSON string depending on the
            # client/column type; normalize it to a dict.
            results_json = record.get("results_json") or {}
            if isinstance(results_json, str):
                try:
                    results_json = json.loads(results_json)
                except Exception:
                    results_json = {}

            merged = {**record, **results_json}
            return merged
    except HTTPException:
        raise
    except Exception as e:
        logger.warning(f"DB lookup failed for job {job_id}: {e}")

    raise HTTPException(status_code=404, detail="Analysis job not found")
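
# Illustrative client-side polling (assumes httpx and the /player prefix used
# in the trigger response message):
#   import time, httpx
#   while True:
#       job = httpx.get(f"{base_url}/player/analysis/{job_id}", headers=auth).json()
#       if job.get("status") != "processing":
#           break
#       time.sleep(5)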

@router.get("/analysis")
async def list_my_analyses(
    current_user: dict = Depends(require_personal_account),
    supabase: SupabaseService = Depends(get_supabase),
):
    """
    List all past personal analysis jobs for the current player.
    """
    try:
        rows = await supabase.select(
            "personal_analyses",
            filters={"user_id": current_user["id"]},
            order_by="created_at",
            ascending=False
        )
        return rows or []
    except Exception as e:
        logger.warning(f"Could not fetch analyses: {e}")
        return []

@router.delete("/analysis/{job_id}", status_code=200)
async def delete_analysis(
    job_id: str,
    current_user: dict = Depends(require_personal_account),
    supabase: SupabaseService = Depends(get_supabase),
):
    """
    Delete a personal analysis job and its output files.
    """
    try:
        rows = await supabase.select("personal_analyses", filters={"job_id": job_id})
    except Exception:
        rows = []

    if not rows:
        raise HTTPException(status_code=404, detail="Analysis not found")

    record = rows[0]
    if record.get("user_id") != current_user["id"]:
        raise HTTPException(status_code=403, detail="Access denied")

    try:
        await supabase.delete("personal_analyses", record["id"])
    except Exception as e:
        logger.warning(f"Could not delete personal_analyses record: {e}")

    try:
        await supabase.delete("videos", job_id)
    except Exception:
        pass

    # Remove any local artifacts for this job. Note: an annotated copy uploaded
    # to Supabase Storage is not removed here.
    for suffix in ["_output.mp4", "_output_tmp.mp4", "_output.avi", "_report.txt"]:
        fpath = os.path.join(PERSONAL_OUTPUT_DIR, f"{job_id}{suffix}")
        try:
            if os.path.exists(fpath):
                os.remove(fpath)
        except Exception:
            pass

    _job_cache.pop(job_id, None)

    return {"message": "Analysis deleted successfully"}