Spaces:
Paused
Paused
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| import logging | |
# Hugging Face cache location: in containerized environments /.cache may
# not be writable, so fall back to /tmp unless HF_HOME was already provided.
if os.environ.get("HF_HOME") is None:
    os.environ["HF_HOME"] = "/tmp/huggingface"
    print(f"Set HF_HOME to {os.environ['HF_HOME']}")

# Some environments (like HF Spaces) may preset CUDA_VISIBLE_DEVICES to "0".
# Drop it so every GPU is visible. print() is used because the logging
# configuration might not be set yet at this point.
if os.environ.get("CUDA_VISIBLE_DEVICES") is None:
    print("CUDA_VISIBLE_DEVICES not set. All GPUs should be visible.")
else:
    print(f"Found CUDA_VISIBLE_DEVICES={os.environ['CUDA_VISIBLE_DEVICES']}. Unsetting it to enable all GPUs.")
    os.environ.pop("CUDA_VISIBLE_DEVICES")
| import torch | |
# One-line startup report of the torch build and the GPUs it can see.
# Best-effort only: a failure here is printed and never aborts startup.
try:
    print(
        "Startup Diagnostics: "
        f"Torch version {torch.__version__}, "
        f"CUDA available: {torch.cuda.is_available()}, "
        f"Device count: {torch.cuda.device_count()}"
    )
except Exception as exc:
    print(f"Startup Diagnostics Error: {exc}")
| import asyncio | |
| import shutil | |
| import tempfile | |
| import time | |
| import uuid | |
| from contextlib import asynccontextmanager | |
| from datetime import timedelta | |
| from pathlib import Path | |
| from typing import Optional | |
| import cv2 | |
| import numpy as np | |
| from fastapi import BackgroundTasks, FastAPI, File, Form, HTTPException, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| import uvicorn | |
| from inference import process_first_frame, run_inference, run_grounded_sam2_tracking | |
| from models.depth_estimators.model_loader import list_depth_estimators | |
| from jobs.background import process_video_async | |
| from jobs.models import JobInfo, JobStatus | |
| from jobs.streaming import get_stream, get_stream_event | |
| from jobs.storage import ( | |
| get_depth_output_path, | |
| get_first_frame_depth_path, | |
| get_first_frame_path, | |
| get_input_video_path, | |
| get_job_directory, | |
| get_job_storage, | |
| get_output_video_path, | |
| ) | |
| from utils.gpt_reasoning import estimate_threat_gpt | |
| from utils.threat_chat import chat_about_threats | |
| from utils.relevance import evaluate_relevance | |
| from utils.enrichment import run_enrichment | |
| from utils.schemas import AssessmentStatus | |
| from models.segmenters.model_loader import get_segmenter_detector | |
| from utils.mission_parser import parse_mission_text, build_broad_queries, MissionParseError | |
logging.basicConfig(level=logging.INFO)

# Suppress noisy external libraries by capping them at WARNING.
for _noisy_logger in ("httpx", "huggingface_hub", "transformers"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
del _noisy_logger

# GPT concurrency limiter — prevents thread exhaustion under load.
# The limit comes from GPT_CONCURRENCY_LIMIT and defaults to 4.
_GPT_SEMAPHORE = asyncio.Semaphore(int(os.getenv("GPT_CONCURRENCY_LIMIT", "4")))
def _mark_all_assessed(job_id: str, detections: list) -> None:
    """Flag every detection as ASSESSED and persist the list on the job.

    Shared by the two paths in ``_enrich_first_frame_gpt`` where no detection
    is left to enrich.
    """
    for det in detections:
        det["assessment_status"] = AssessmentStatus.ASSESSED
    get_job_storage().update(
        job_id,
        first_frame_detections=detections,
    )
    logging.info("All detections non-relevant for job %s; marked ASSESSED", job_id)


async def _enrich_first_frame_gpt(
    job_id: str,
    frame: np.ndarray,
    detections: list,
    enable_gpt: bool,
    mission_spec,
) -> None:
    """Fire-and-forget GPT enrichment for first-frame track cards.

    Intended to run concurrently with the video pipeline: the caller returns
    the first-frame preview immediately and the stored detections are updated
    once enrichment finishes.

    Args:
        job_id: Job whose stored first-frame detections are updated.
        frame: First video frame handed to ``run_enrichment``.
        detections: Detection dicts; mutated in place (relevance fields and
            ``assessment_status``).
        enable_gpt: When False the function returns without doing anything.
        mission_spec: Parsed mission, or None. Only ``parse_mode`` and
            ``relevance_criteria`` are read here.

    Any exception is logged and swallowed so a failed enrichment never
    surfaces as an unhandled task error.
    """
    if not enable_gpt or not detections:
        return
    try:
        # Non-LLM_EXTRACTED relevance filter runs BEFORE run_enrichment (FAST_PATH case)
        if mission_spec and mission_spec.parse_mode != "LLM_EXTRACTED":
            for d in detections:
                decision = evaluate_relevance(d, mission_spec.relevance_criteria)
                d["mission_relevant"] = decision.relevant
                d["relevance_reason"] = decision.reason
            if not any(d.get("mission_relevant", True) for d in detections):
                _mark_all_assessed(job_id, detections)
                return

        # run_enrichment is a blocking call, so it is pushed to a worker thread.
        gpt_results = await asyncio.to_thread(
            run_enrichment, 0, frame, detections, mission_spec,
            job_id=job_id,
        )
        logging.info("Background GPT enrichment complete for job %s", job_id)

        if not gpt_results:
            # All detections filtered as not relevant
            _mark_all_assessed(job_id, detections)
            return

        # Tag any remaining detections without an assessment status
        for det in detections:
            det.setdefault("assessment_status", AssessmentStatus.UNASSESSED)

        # Update stored job so frontend polls pick up GPT data
        get_job_storage().update(
            job_id,
            first_frame_detections=detections,
            first_frame_gpt_results=gpt_results,
        )
        logging.info("Updated first_frame_detections with GPT results for job %s", job_id)
    except Exception:
        logging.exception("Background GPT enrichment failed for job %s", job_id)
async def _periodic_cleanup() -> None:
    """Purge expired jobs from job storage every 10 minutes, forever.

    Jobs older than one hour are passed to ``cleanup_expired``. A failure in
    one cycle is logged and the loop keeps going; otherwise a single error
    would end the task and no further cleanup would ever run. Cancellation
    still stops the loop, because ``CancelledError`` is not an ``Exception``.
    """
    while True:
        await asyncio.sleep(600)
        try:
            get_job_storage().cleanup_expired(timedelta(hours=1))
        except Exception:
            logging.exception("Periodic job cleanup failed; retrying next cycle")
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: run the periodic job cleanup while the app is up.

    The cleanup task is started before the app begins serving and is
    cancelled, then awaited, on shutdown so it does not linger as a pending
    task. The decorator is required because ``FastAPI(lifespan=...)`` expects
    a callable that returns an async context manager.
    """
    cleanup_task = asyncio.create_task(_periodic_cleanup())
    try:
        yield
    finally:
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            # Expected: this is the cancellation requested just above.
            pass
# ASGI application; `lifespan` starts and stops the background cleanup task.
app = FastAPI(lifespan=lifespan, title="Video Object Detection")

# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): wildcard origins combined with credentials is very permissive —
# confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
| from fastapi import Request | |
async def add_no_cache_header(request: Request, call_next):
    """Disable browser caching for the frontend (important for HF Spaces updates).

    Passes the request on via ``call_next`` and, when the path is "/" or
    starts with "/laser", stamps no-cache headers on the response.

    NOTE(review): no registration of this function is visible in this part of
    the file; presumably it is installed as HTTP middleware — confirm.
    """
    response = await call_next(request)
    path = request.url.path
    # Covers every static file under the frontend mount plus the root page.
    if path.startswith("/laser") or path == "/":
        for header, value in (
            ("Cache-Control", "no-cache, no-store, must-revalidate"),
            ("Pragma", "no-cache"),
            ("Expires", "0"),
        ):
            response.headers[header] = value
    return response
# Optional: serve the LaserPerception frontend from this backend when a
# 'frontend' directory sits next to this file.
_FRONTEND_DIR = Path(__file__).parent / "frontend"
if _FRONTEND_DIR.exists():
    # The whole directory is mounted at /laser (legacy path).
    app.mount("/laser", StaticFiles(directory=_FRONTEND_DIR, html=True), name="laser")

# Detection modes accepted by the detect endpoints.
VALID_MODES = {"drone_detection", "object_detection", "segmentation"}
def _save_upload_to_tmp(upload: UploadFile) -> str:
    """Save an uploaded file to a fresh temporary file under /tmp.

    The upload is streamed to disk in chunks rather than read into memory in
    one piece, so large videos do not inflate the process's memory use.

    Args:
        upload: The uploaded file. Its filename only determines the suffix of
            the temp file; ".mp4" is used when it has none.

    Returns:
        Path of the newly written temp file. The caller owns its deletion.

    Raises:
        Whatever the underlying read or write raises. The partially written
        temp file is removed before the exception propagates.
    """
    suffix = Path(upload.filename or "upload.mp4").suffix or ".mp4"
    fd, path = tempfile.mkstemp(prefix="input_", suffix=suffix, dir="/tmp")
    try:
        # Write through the descriptor mkstemp returned instead of reopening.
        with os.fdopen(fd, "wb") as buffer:
            shutil.copyfileobj(upload.file, buffer)
    except Exception:
        _safe_delete(path)
        raise
    return path
def _save_upload_to_path(upload: UploadFile, path: Path) -> None:
    """Write an uploaded file to ``path``, creating parent directories.

    The upload is streamed in chunks instead of being read into memory whole,
    which matters for large video uploads.

    Args:
        upload: The uploaded file to copy from.
        path: Destination file; overwritten if it already exists.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as buffer:
        shutil.copyfileobj(upload.file, buffer)
def _safe_delete(path: str) -> None:
    """Remove ``path`` without ever raising.

    A file that is already gone is ignored silently; any other failure is
    logged with its traceback and then dropped.
    """
    try:
        os.unlink(path)
    except FileNotFoundError:
        # Nothing to clean up.
        pass
    except Exception:
        logging.exception("Failed to remove temporary file: %s", path)
def _schedule_cleanup(background_tasks: BackgroundTasks, path: str) -> None:
    """Queue deletion of ``path`` as a background task.

    The deletion goes through ``_safe_delete``, so it never raises.
    """
    background_tasks.add_task(_safe_delete, path)
def _default_queries_for_mode(mode: str) -> list[str]:
    """Return the default query labels used when the caller supplies none.

    "segmentation" and "drone_detection" each have a single generic label;
    every other mode gets a list of common object classes. A new list is
    built on every call, so callers may mutate the result.
    """
    single_label_modes = {
        "segmentation": ["object"],
        "drone_detection": ["drone"],
    }
    common_classes = [
        "person", "car", "truck", "motorcycle",
        "bicycle", "bus", "train", "airplane",
    ]
    return single_label_modes.get(mode, common_classes)
async def demo_page():
    """Redirect to the LaserPerception app's index page."""
    # The entry point is index.html inside the directory mounted at /laser.
    entry_url = "/laser/index.html"
    return RedirectResponse(url=entry_url)
async def detect_endpoint(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    mode: str = Form(...),
    queries: str = Form(""),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    enable_depth: bool = Form(False),
    enable_gpt: bool = Form(True),
):
    """
    Main detection endpoint: process an uploaded video and return the result.

    Args:
        background_tasks: Used to delete the temp input/output files after
            the response has been sent.
        video: Video file to process
        mode: Detection mode (object_detection, segmentation, drone_detection)
        queries: For segmentation, comma-separated labels. For the other
            modes, mission text handed to the mission parser; when empty the
            per-mode default labels are used.
        detector: Model to use (yolo11, detr_resnet50, grounding_dino)
        segmenter: Segmentation model to use (GSAM2-S/B/L, YSAM2-S/B/L)
        enable_depth: Whether to run legacy depth estimation (default: False)
        enable_gpt: Forwarded to ``run_inference`` (not used for segmentation).

    drone_detection uses the dedicated drone_yolo model.

    Returns:
        - For object_detection: Processed video with bounding boxes
        - For segmentation: Processed video with masks rendered
        - For drone_detection: Processed video with bounding boxes
        If inference fails with an unexpected error, a JSON body
        ``{"error": ...}`` with status 500 is returned instead.

    Raises:
        HTTPException: 400 for an invalid mode or missing video, 422 when the
            mission text cannot be parsed, 500 when the upload cannot be
            saved or processing raises ``ValueError``.
    """
    if mode not in VALID_MODES:
        raise HTTPException(
            status_code=400,
            # sorted() keeps the message stable; set order is arbitrary.
            detail=f"Invalid mode '{mode}'. Must be one of: {', '.join(sorted(VALID_MODES))}"
        )
    if video is None:
        raise HTTPException(status_code=400, detail="Video file is required.")

    # Save the uploaded video (same for every mode).
    try:
        input_path = _save_upload_to_tmp(video)
    except Exception:
        logging.exception("Failed to save uploaded file.")
        raise HTTPException(status_code=500, detail="Failed to save uploaded video.")
    finally:
        await video.close()

    # Reserve the output path.
    fd, output_path = tempfile.mkstemp(prefix="output_", suffix=".mp4", dir="/tmp")
    os.close(fd)

    def _discard_temp_files() -> None:
        _safe_delete(input_path)
        _safe_delete(output_path)

    if mode == "segmentation":
        query_list = [q.strip() for q in queries.split(",") if q.strip()] or ["object"]
        value_error_log = "Segmentation processing failed."
        generic_error_log = "Segmentation inference failed."
        download_name = "segmented.mp4"

        def _run_pipeline() -> str:
            return run_grounded_sam2_tracking(
                input_path,
                output_path,
                query_list,
                segmenter_name=segmenter,
                num_maskmem=7,
            )
    else:
        # Object detection or drone detection: parse queries with mission awareness.
        detector_name = "drone_yolo" if mode == "drone_detection" else detector
        if queries.strip():
            try:
                mission_spec = parse_mission_text(queries.strip(), detector_name, video_path=input_path)
                query_list = build_broad_queries(detector_name, mission_spec)
            except MissionParseError as e:
                # Temp files were already created; do not leave them behind.
                _discard_temp_files()
                raise HTTPException(status_code=422, detail=str(e))
            except Exception:
                _discard_temp_files()
                raise
        else:
            query_list = _default_queries_for_mode(mode)
        if mode == "drone_detection" and not query_list:
            query_list = ["drone"]

        # Determine depth estimator
        active_depth = "depth" if enable_depth else None
        value_error_log = "Video processing failed."
        generic_error_log = "Inference failed."
        download_name = "processed.mp4"

        def _run_pipeline() -> str:
            produced_path, _ = run_inference(
                input_path,
                output_path,
                query_list,
                detector_name=detector_name,
                depth_estimator_name=active_depth,
                depth_scale=25.0,
                enable_gpt=enable_gpt,
            )
            return produced_path

    # Inference is blocking; run it in a worker thread so the event loop
    # keeps serving other requests meanwhile.
    try:
        result_path = await asyncio.to_thread(_run_pipeline)
    except ValueError as exc:
        logging.exception(value_error_log)
        _discard_temp_files()
        raise HTTPException(status_code=500, detail=str(exc))
    except Exception as exc:
        logging.exception(generic_error_log)
        _discard_temp_files()
        return JSONResponse(status_code=500, content={"error": str(exc)})

    # Schedule cleanup for after the response has been streamed.
    _schedule_cleanup(background_tasks, input_path)
    _schedule_cleanup(background_tasks, result_path)
    if result_path != output_path:
        # The pipeline wrote elsewhere; the reserved temp file is still ours.
        _schedule_cleanup(background_tasks, output_path)

    return FileResponse(
        path=result_path,
        media_type="video/mp4",
        filename=download_name,
    )
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task can be garbage-collected before
# it finishes.
_BACKGROUND_TASKS: set = set()


def _spawn_background(coro) -> "asyncio.Task":
    """Start ``coro`` as a task and hold a reference until it completes."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


async def detect_async_endpoint(
    video: UploadFile = File(...),
    mode: str = Form(...),
    queries: str = Form(""),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    depth_estimator: str = Form("depth"),
    depth_scale: float = Form(25.0),
    enable_depth: bool = Form(False),
    enable_gpt: bool = Form(True),
    step: int = Form(7),
):
    """Create an asynchronous detection job and return its first-frame preview.

    The upload is stored in a per-job directory, the first frame is processed
    before responding, and the full video pipeline plus first-frame GPT
    enrichment continue as background tasks.

    Args:
        video: Video file to process.
        mode: One of ``VALID_MODES``.
        queries: Mission text. When empty, per-mode default labels are used
            and GPT is disabled (LEGACY mode).
        detector: Detector key; overridden by "drone_yolo" for drone_detection
            and by the segmenter's own detector for segmentation.
        segmenter: Segmenter key used in segmentation mode.
        depth_estimator: Must be one of ``list_depth_estimators()``; only
            active when ``enable_depth`` is true.
        depth_scale: Stored on the job as a float.
        enable_depth: Whether the job carries an active depth estimator.
        enable_gpt: Whether GPT enrichment runs (forced off in LEGACY mode).
        step: Stored on the job.

    Returns:
        Dict with the job id, status, first-frame detections, the job's
        resource URLs, the mission mode and, when mission text was parsed, a
        summary of the mission spec.

    Raises:
        HTTPException: 400 for an invalid mode, segmenter or depth estimator;
            422 when mission text cannot be parsed; 500 when the upload cannot
            be saved or first-frame processing fails. On failures after the
            upload was saved, the job directory is removed first.
    """
    _ttfs_t0 = time.perf_counter()

    if mode not in VALID_MODES:
        raise HTTPException(
            status_code=400,
            # sorted() keeps the message stable; set order is arbitrary.
            detail=f"Invalid mode '{mode}'. Must be one of: {', '.join(sorted(VALID_MODES))}",
        )
    if video is None:
        raise HTTPException(status_code=400, detail="Video file is required.")

    job_id = uuid.uuid4().hex
    job_dir = get_job_directory(job_id)
    input_path = get_input_video_path(job_id)
    output_path = get_output_video_path(job_id)
    first_frame_path = get_first_frame_path(job_id)
    depth_output_path = get_depth_output_path(job_id)
    first_frame_depth_path = get_first_frame_depth_path(job_id)

    try:
        _save_upload_to_path(video, input_path)
    except Exception:
        logging.exception("Failed to save uploaded file.")
        shutil.rmtree(job_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail="Failed to save uploaded video.")
    finally:
        await video.close()
    logging.info("[TTFS:%s] +%.1fs upload_saved", job_id, time.perf_counter() - _ttfs_t0)

    # From here on the job directory exists on disk but the job is not yet
    # registered, so any failure must remove the directory itself.
    try:
        # --- Mission-Driven Query Parsing ---
        mission_spec = None
        mission_mode = "LEGACY"
        detector_name = detector
        mission_detector = detector  # detector key used for mission query parsing
        if mode == "drone_detection":
            detector_name = "drone_yolo"
            mission_detector = "drone_yolo"
        elif mode == "segmentation":
            # Segmenter registry owns detector selection (GSAM2→GDINO, YSAM2→YOLO).
            # detector_name=None so the job doesn't forward it (avoids duplicate kwarg).
            try:
                mission_detector = get_segmenter_detector(segmenter)
            except ValueError as exc:
                raise HTTPException(status_code=400, detail=str(exc))
            detector_name = None

        if queries.strip():
            try:
                mission_spec = parse_mission_text(queries.strip(), mission_detector, video_path=str(input_path))
                query_list = build_broad_queries(mission_detector, mission_spec)
                mission_mode = "MISSION"
                logging.info(
                    "Mission parsed: mode=%s classes=%s broad_queries=%s domain=%s(%s)",
                    mission_mode, mission_spec.object_classes, query_list,
                    mission_spec.domain, mission_spec.domain_source,
                )
            except MissionParseError as e:
                raise HTTPException(
                    status_code=422,
                    detail=str(e),
                )
        else:
            # LEGACY mode: no mission context, use defaults, disable GPT
            query_list = _default_queries_for_mode(mode)
            enable_gpt = False
            mission_mode = "LEGACY"
            logging.info(
                "LEGACY mode: no mission text, defaults=%s, GPT disabled", query_list
            )
        logging.info("[TTFS:%s] +%.1fs mission_parsed", job_id, time.perf_counter() - _ttfs_t0)

        available_depth_estimators = set(list_depth_estimators())
        if depth_estimator not in available_depth_estimators:
            raise HTTPException(
                status_code=400,
                detail=(
                    f"Invalid depth estimator '{depth_estimator}'. "
                    f"Must be one of: {', '.join(sorted(available_depth_estimators))}"
                ),
            )
        # Determine active depth estimator (Legacy)
        active_depth = depth_estimator if enable_depth else None

        try:
            logging.info("[TTFS:%s] +%.1fs process_first_frame start", job_id, time.perf_counter() - _ttfs_t0)
            # Blocking model inference: run off the event loop.
            processed_frame, detections = await asyncio.to_thread(
                process_first_frame,
                str(input_path),
                query_list,
                mode=mode,
                detector_name=detector_name,
                segmenter_name=segmenter,
            )
            cv2.imwrite(str(first_frame_path), processed_frame)
            logging.info("[TTFS:%s] +%.1fs process_first_frame done", job_id, time.perf_counter() - _ttfs_t0)
            # GPT and depth are now handled in the async pipeline (enrichment thread)
            first_frame_gpt_results = None
        except Exception:
            logging.exception("First-frame processing failed.")
            raise HTTPException(status_code=500, detail="Failed to process first frame.")
    except Exception:
        shutil.rmtree(job_dir, ignore_errors=True)
        raise

    job = JobInfo(
        job_id=job_id,
        status=JobStatus.PROCESSING,
        mode=mode,
        queries=query_list,
        detector_name=detector_name,
        segmenter_name=segmenter,
        input_video_path=str(input_path),
        output_video_path=str(output_path),
        first_frame_path=str(first_frame_path),
        first_frame_detections=detections,
        depth_estimator_name=active_depth,
        depth_scale=float(depth_scale),
        depth_output_path=str(depth_output_path),
        first_frame_depth_path=str(first_frame_depth_path),
        enable_gpt=enable_gpt,
        mission_spec=mission_spec,
        mission_mode=mission_mode,
        first_frame_gpt_results=first_frame_gpt_results,
        step=step,
        ttfs_t0=_ttfs_t0,
    )
    get_job_storage().create(job)
    _spawn_background(process_video_async(job_id))

    # Fire-and-forget: enrich first-frame detections with GPT in background.
    # Runs for ALL modes including segmentation — first-frame detections from
    # process_first_frame() already have stable track IDs (T01, T02, ...) and
    # valid bboxes, so there's no reason to defer. The GSAM2 writer's
    # enrichment thread will see the cached results via first_frame_gpt_results
    # in JobStorage and skip the duplicate call on frame 0.
    _spawn_background(_enrich_first_frame_gpt(
        job_id, processed_frame, detections, enable_gpt, mission_spec,
    ))

    response_data = {
        "job_id": job_id,
        "first_frame_url": f"/detect/first-frame/{job_id}",
        "first_frame_depth_url": f"/detect/first-frame-depth/{job_id}",
        "status_url": f"/detect/status/{job_id}",
        "video_url": f"/detect/video/{job_id}",
        "depth_video_url": f"/detect/depth-video/{job_id}",
        "stream_url": f"/detect/stream/{job_id}",
        "status": job.status.value,
        "first_frame_detections": detections,
        "mission_mode": mission_mode,
    }
    if mission_spec:
        response_data["mission_spec"] = {
            "object_classes": mission_spec.object_classes,
            "mission_intent": mission_spec.mission_intent,
            "domain": mission_spec.domain,
            "domain_source": mission_spec.domain_source,
            "parse_confidence": mission_spec.parse_confidence,
            "parse_warnings": mission_spec.parse_warnings,
            "context_phrases": mission_spec.context_phrases,
            "stripped_modifiers": mission_spec.stripped_modifiers,
        }
    return response_data
async def detect_status(job_id: str):
    """Return the stored state of a job.

    The payload carries the job id, status, creation and completion
    timestamps (ISO format, completion may be None), the error field and the
    current first-frame detections.

    Raises:
        HTTPException: 404 when job storage has no entry for ``job_id``.
    """
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")

    finished_at = job.completed_at.isoformat() if job.completed_at else None
    payload = {
        "job_id": job.job_id,
        "status": job.status.value,
        "created_at": job.created_at.isoformat(),
        "completed_at": finished_at,
        "error": job.error,
        "first_frame_detections": job.first_frame_detections,
    }
    return payload
async def get_frame_tracks(job_id: str, frame_idx: int):
    """Retrieve detections (with tracking info) for a specific frame.

    Looks the frame up through ``jobs.storage.get_track_data`` and returns an
    empty list when that lookup yields nothing.
    """
    # Imported locally, as in the original implementation.
    from jobs.storage import get_track_data

    frame_tracks = get_track_data(job_id, frame_idx)
    if not frame_tracks:
        return []
    return frame_tracks
async def analyze_frame(
    image: UploadFile = File(...),
    detections: str = Form(...),
    job_id: str = Form(None),
):
    """Run GPT threat assessment on a single video frame.

    Args:
        image: Encoded image of the frame (anything ``cv2.imdecode`` accepts).
        detections: JSON list of detection objects for that frame.
        job_id: Optional job whose stored mission spec is used as context.

    Returns:
        The detection list, with GPT fields merged into every entry whose
        ``track_id`` (or ``id``) appears in the GPT results.

    Raises:
        HTTPException: 400 when ``detections`` is not valid JSON or not a
            list, or when the image cannot be decoded.
    """
    import json as json_module
    from utils.gpt_reasoning import encode_frame_to_b64

    # Reject malformed payloads with a 400 instead of an unhandled 500.
    try:
        dets = json_module.loads(detections)
    except json_module.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid detections JSON.")
    if not isinstance(dets, list):
        raise HTTPException(status_code=400, detail="Detections must be a list.")

    # Look up mission_spec from stored job (if available)
    mission_spec = None
    if job_id:
        job = get_job_storage().get(job_id)
        if job:
            mission_spec = job.mission_spec

    # Decode uploaded image
    image_bytes = await image.read()
    nparr = np.frombuffer(image_bytes, np.uint8)
    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if frame is None:
        raise HTTPException(status_code=400, detail="Invalid image")

    # Run GPT in thread pool (blocking OpenAI API call)
    frame_b64 = encode_frame_to_b64(frame)
    async with _GPT_SEMAPHORE:
        gpt_results = await asyncio.to_thread(
            estimate_threat_gpt,
            detections=dets,
            mission_spec=mission_spec,
            image_b64=frame_b64,
        )
    # Treat "no result" as an empty mapping so the merge below is a no-op.
    gpt_results = gpt_results or {}

    # Merge GPT results into detection records
    for d in dets:
        if not isinstance(d, dict):
            continue
        oid = d.get("track_id") or d.get("id")
        if oid and oid in gpt_results:
            payload = gpt_results[oid]
            d["gpt_raw"] = payload
            d["assessment_status"] = payload.get("assessment_status", "ASSESSED")
            d["threat_level_score"] = payload.get("threat_level_score", 0)
            d["threat_classification"] = payload.get("threat_classification", "Unknown")
            d["weapon_readiness"] = payload.get("weapon_readiness", "Unknown")
            d["gpt_description"] = payload.get("gpt_description")
            d["gpt_distance_m"] = payload.get("gpt_distance_m")
            d["gpt_direction"] = payload.get("gpt_direction")
    return dets
async def cancel_job(job_id: str):
    """Request cancellation of a job that is still processing.

    A job in any other state is left untouched and its current status is
    reported back.

    Raises:
        HTTPException: 404 when job storage has no entry for ``job_id``.
    """
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")

    if job.status == JobStatus.PROCESSING:
        get_job_storage().update(job_id, status=JobStatus.CANCELLED)
        return {
            "message": "Job cancellation requested",
            "status": "cancelled",
        }

    # Nothing to cancel: the job has already left the PROCESSING state.
    return {
        "message": f"Job already {job.status.value}",
        "status": job.status.value,
    }
async def detect_first_frame(job_id: str):
    """Serve the processed first-frame image of a job as a JPEG download.

    Raises:
        HTTPException: 404 when the job is unknown or its first-frame file
            does not exist on disk.
    """
    job = get_job_storage().get(job_id)
    frame_available = bool(job) and Path(job.first_frame_path).exists()
    if not frame_available:
        raise HTTPException(status_code=404, detail="First frame not found.")
    return FileResponse(
        path=job.first_frame_path,
        media_type="image/jpeg",
        filename="first_frame.jpg",
    )
async def detect_video(job_id: str):
    """Serve the processed video of a job once it is available.

    Returns:
        The MP4 file, or a 202 JSON body while the job is still processing.

    Raises:
        HTTPException: 404 for an unknown job or a missing output file, 500
            for a failed job, 410 for a cancelled job.
    """
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")

    status = job.status
    if status == JobStatus.FAILED:
        raise HTTPException(status_code=500, detail=f"Job failed: {job.error}")
    if status == JobStatus.CANCELLED:
        raise HTTPException(status_code=410, detail="Job was cancelled")
    if status == JobStatus.PROCESSING:
        still_running = {"detail": "Video still processing", "status": "processing"}
        return JSONResponse(status_code=202, content=still_running)

    video_path = job.output_video_path
    if not video_path or not Path(video_path).exists():
        raise HTTPException(status_code=404, detail="Video file not found.")
    return FileResponse(
        path=video_path,
        media_type="video/mp4",
        filename="processed.mp4",
    )
async def detect_depth_video(job_id: str):
    """Return depth estimation video.

    Returns:
        The depth MP4 file, or a 202 JSON body while the job is processing.

    Raises:
        HTTPException: 404 for an unknown job, a job without a depth output
            path (with the depth error included when the job recorded one),
            or a missing file; 500 for a failed job; 410 for a cancelled job.
    """
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")

    depth_path = job.depth_output_path
    if not depth_path:
        # Depth may have failed while the rest of the job succeeded.
        if job.partial_success and job.depth_error:
            raise HTTPException(status_code=404, detail=f"Depth unavailable: {job.depth_error}")
        raise HTTPException(status_code=404, detail="No depth video for this job.")

    status = job.status
    if status == JobStatus.FAILED:
        raise HTTPException(status_code=500, detail=f"Job failed: {job.error}")
    if status == JobStatus.CANCELLED:
        raise HTTPException(status_code=410, detail="Job was cancelled")
    if status == JobStatus.PROCESSING:
        still_running = {"detail": "Video still processing", "status": "processing"}
        return JSONResponse(status_code=202, content=still_running)

    if not Path(depth_path).exists():
        raise HTTPException(status_code=404, detail="Depth video file not found.")
    return FileResponse(
        path=depth_path,
        media_type="video/mp4",
        filename="depth.mp4",
    )
async def detect_first_frame_depth(job_id: str):
    """Return first frame depth visualization.

    Raises:
        HTTPException: 404 when the job is unknown, when it has no
            first-frame depth path (with the depth error included when the
            job recorded one), or when the file is missing on disk.
    """
    job = get_job_storage().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found or expired.")

    depth_frame_path = job.first_frame_depth_path
    if not depth_frame_path:
        # Surface the recorded depth failure when there is one.
        if job.partial_success and job.depth_error:
            raise HTTPException(status_code=404, detail=f"Depth unavailable: {job.depth_error}")
        raise HTTPException(status_code=404, detail="First frame depth not found.")

    if not Path(depth_frame_path).exists():
        raise HTTPException(status_code=404, detail="First frame depth file not found.")
    return FileResponse(
        path=depth_frame_path,
        media_type="image/jpeg",
        filename="first_frame_depth.jpg",
    )
async def stream_video(job_id: str):
    """MJPEG stream of the processing video (event-driven).

    Frames are taken from the job's stream queue, JPEG-encoded at quality 60
    in the default executor and emitted as ``multipart/x-mixed-replace``
    parts. Streaming starts once at least 5 frames are queued and ends when
    the job's stream is no longer registered.

    Errors inside the loop are logged and the loop carries on after a short
    pause, so a single bad frame does not end the stream.
    """
    import queue as queue_mod

    async def stream_generator():
        loop = asyncio.get_running_loop()
        buffered = False
        # TTFS instrumentation
        _first_yielded = False
        _buffer_wait_logged = False
        _job = get_job_storage().get(job_id)
        _stream_t0 = _job.ttfs_t0 if _job else None
        if _stream_t0:
            logging.info("[TTFS:%s] +%.1fs stream_subscribed", job_id, time.perf_counter() - _stream_t0)
        # Get or create the asyncio.Event for this stream (must be in async context)
        event = get_stream_event(job_id)
        while True:
            q = get_stream(job_id)
            if not q:
                break
            try:
                # Initial buffer: hold off until 5 frames are queued.
                if not buffered:
                    if not _buffer_wait_logged and _stream_t0:
                        logging.info("[TTFS:%s] +%.1fs stream_buffer_wait (qsize=%d)", job_id, time.perf_counter() - _stream_t0, q.qsize())
                        _buffer_wait_logged = True
                    if q.qsize() < 5:
                        await asyncio.sleep(0.1)
                        continue
                    buffered = True
                    if _stream_t0:
                        logging.info("[TTFS:%s] +%.1fs stream_buffer_ready", job_id, time.perf_counter() - _stream_t0)

                # Event-driven wait — replaces busy-wait polling
                if event is not None:
                    try:
                        await asyncio.wait_for(event.wait(), timeout=1.0)
                        event.clear()
                    except asyncio.TimeoutError:
                        # No new frame for a second: stop if the stream is gone.
                        if not get_stream(job_id):
                            return
                        continue
                else:
                    # Fallback if no event (shouldn't happen)
                    await asyncio.sleep(0.033)

                # Drain available frame (already pre-resized by publish_frame)
                try:
                    frame = q.get_nowait()
                except queue_mod.Empty:
                    continue

                # Encode in thread (frame already resized by publish_frame)
                encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
                success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
                if success:
                    if not _first_yielded:
                        _first_yielded = True
                        if _stream_t0:
                            logging.info("[TTFS:%s] +%.1fs first_yield_to_client", job_id, time.perf_counter() - _stream_t0)
                    yield (b'--frame\r\n'
                           b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
                # Simple pacer (~30fps)
                await asyncio.sleep(0.033)
            except Exception:
                # Previously swallowed silently; log so stream failures can be diagnosed.
                logging.exception("Stream loop error for job %s", job_id)
                await asyncio.sleep(0.1)

    return StreamingResponse(
        stream_generator(),
        media_type="multipart/x-mixed-replace; boundary=frame"
    )
async def reason_track(
    frame: UploadFile = File(...),
    tracks: str = Form(...)  # JSON string of tracks: [{"id": "T01", "bbox": [x,y,w,h], "label": "car"}, ...]
):
    """
    Reason about specific tracks in a frame using GPT.
    Returns distance and description for each object ID.

    Args:
        frame: Uploaded image of the frame; stored in a temp file for the
            duration of the call.
        tracks: JSON string describing the tracks to reason about.

    Returns:
        Whatever ``estimate_threat_gpt`` returns for the frame and tracks.

    Raises:
        HTTPException: 400 when ``tracks`` is not valid JSON, 500 when the
            frame cannot be saved or GPT reasoning fails.
    """
    import json

    # Validate the cheap input first so a bad request never touches the disk.
    try:
        track_list = json.loads(tracks)
    except json.JSONDecodeError:
        await frame.close()
        raise HTTPException(status_code=400, detail="Invalid tracks JSON")

    try:
        input_path = _save_upload_to_tmp(frame)
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to save uploaded frame")
    finally:
        await frame.close()

    # The GPT call is blocking, so it runs in a worker thread under the
    # shared concurrency limit. `finally` removes the temp file on every
    # exit path, including cancellation when the client disconnects.
    # NOTE(review): estimate_threat_gpt is called positionally here but with
    # detections=/mission_spec=/image_b64= keywords elsewhere in this file —
    # confirm this argument order matches its signature.
    try:
        async with _GPT_SEMAPHORE:
            results = await asyncio.to_thread(estimate_threat_gpt, input_path, track_list)
        logging.info("GPT Output for Video Track Update:\n%s", results)
    except Exception as e:
        logging.exception("GPT reasoning failed")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        _safe_delete(input_path)
    return results
async def chat_threat_endpoint(
    question: str = Form(...),
    detections: str = Form(...),  # JSON string of current detections
    mission_context: str = Form(""),  # Optional JSON string of mission spec
):
    """
    Answer a question about the currently detected threats using GPT.

    Args:
        question: User's question about the current threat situation.
        detections: JSON string of detection list with threat analysis data.
        mission_context: Optional JSON string of mission specification;
            ignored when it is not valid JSON.

    Returns:
        ``{"response": ...}`` holding the chat answer.

    Raises:
        HTTPException: 400 for an empty question or for detections that are
            not a valid JSON list; 500 when the chat call fails.
    """
    import json as json_module

    if not question.strip():
        raise HTTPException(status_code=400, detail="Question cannot be empty.")

    try:
        detection_list = json_module.loads(detections)
    except json_module.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid detections JSON.")
    if not isinstance(detection_list, list):
        raise HTTPException(status_code=400, detail="Detections must be a list.")

    # Mission context is optional; an unparsable value is treated as absent.
    mission_spec_dict = None
    if mission_context.strip():
        try:
            mission_spec_dict = json_module.loads(mission_context)
        except json_module.JSONDecodeError:
            mission_spec_dict = None

    # The chat call runs in a worker thread under the GPT concurrency limit.
    try:
        async with _GPT_SEMAPHORE:
            answer = await asyncio.to_thread(
                chat_about_threats, question, detection_list, mission_spec_dict
            )
    except Exception as e:
        logging.exception("Threat chat failed")
        raise HTTPException(status_code=500, detail=str(e))
    return {"response": answer}
async def benchmark_endpoint(
    video: UploadFile = File(...),
    queries: str = Form("person,car,truck"),
    segmenter: str = Form("GSAM2-L"),
    step: int = Form(60),
    num_maskmem: Optional[int] = Form(None),
):
    """Run instrumented GSAM2 pipeline and return latency breakdown JSON.

    This is a long-running synchronous request (may take minutes).
    Callers should set an appropriate HTTP timeout.

    Args:
        video: Uploaded video to benchmark.
        queries: Comma-separated object classes; blank entries are dropped.
        segmenter: Segmenter name forwarded as `segmenter_name`.
        step: Forwarded unchanged to `run_grounded_sam2_tracking`.
        num_maskmem: Forwarded unchanged; reported as 7 in the response
            when left as None.

    Returns:
        JSONResponse with `total_frames` and `fps` read from the output
        video, `num_gpus`, `num_maskmem`, and the `metrics` dict that was
        passed to the tracker as `_perf_metrics`.
    """
    import threading

    # A private scratch directory replaces tempfile.mktemp(), which is
    # deprecated because the returned name can be claimed by another process
    # before we open it. Removing the directory also cleans up both files.
    work_dir = tempfile.mkdtemp(prefix="bench_")
    input_path = os.path.join(work_dir, "bench_in.mp4")
    output_path = os.path.join(work_dir, "bench_out.mp4")

    try:
        # Save uploaded video to temp path
        with open(input_path, "wb") as f:
            shutil.copyfileobj(video.file, f)

        query_list = [q.strip() for q in queries.split(",") if q.strip()]

        # Passed to the tracker together with `lock`; all counters start at zero.
        metrics = {
            "end_to_end_ms": 0.0,
            "frame_extraction_ms": 0.0,
            "model_load_ms": 0.0,
            "init_state_ms": 0.0,
            "tracking_total_ms": 0.0,
            "gdino_total_ms": 0.0,
            "sam_image_total_ms": 0.0,
            "sam_video_total_ms": 0.0,
            "id_reconciliation_ms": 0.0,
            "render_total_ms": 0.0,
            "writer_total_ms": 0.0,
            "gpu_peak_mem_mb": 0.0,
        }
        lock = threading.Lock()

        await asyncio.to_thread(
            run_grounded_sam2_tracking,
            input_path,
            output_path,
            query_list,
            segmenter_name=segmenter,
            step=step,
            enable_gpt=False,
            _perf_metrics=metrics,
            _perf_lock=lock,
            num_maskmem=num_maskmem,
        )

        # Read frame count and fps from output video
        total_frames = 0
        fps = 0.0
        cap = cv2.VideoCapture(output_path)
        try:
            if cap.isOpened():
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        finally:
            # Always release the capture handle, opened or not.
            cap.release()

        num_gpus = torch.cuda.device_count()

        return JSONResponse({
            "total_frames": total_frames,
            "fps": fps,
            "num_gpus": num_gpus,
            "num_maskmem": num_maskmem if num_maskmem is not None else 7,
            "metrics": metrics,
        })
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
async def gpu_monitor_endpoint(duration: int = 180, interval: int = 1):
    """Stream nvidia-smi dmon output for the given duration.

    Usage: curl 'http://.../gpu-monitor?duration=180&interval=1'
    Run this in one terminal while /benchmark runs in another.

    Args:
        duration: Wall-clock seconds to keep streaming before the monitor
            process is terminated.
        interval: Sampling interval in seconds, passed to ``nvidia-smi dmon -d``.

    Returns:
        A plain-text StreamingResponse that relays the tool's output lines.

    Raises:
        HTTPException: 400 if `interval` is below 1; 503 if `nvidia-smi`
            is not found on PATH.
    """
    if interval < 1:
        raise HTTPException(status_code=400, detail="interval must be at least 1 second")
    # Fail before the response starts rather than midway through the stream.
    if shutil.which("nvidia-smi") is None:
        raise HTTPException(status_code=503, detail="nvidia-smi is not available on this host")

    async def _stream():
        # asyncio subprocess keeps the event loop free between samples; a
        # blocking Popen read here would stall every other request.
        proc = await asyncio.create_subprocess_exec(
            "nvidia-smi", "dmon", "-s", "u", "-d", str(interval),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
        )
        # Measure real elapsed time: counting output lines would over-count,
        # since the number of lines per sample is not fixed.
        deadline = time.monotonic() + duration
        try:
            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    raw = await asyncio.wait_for(proc.stdout.readline(), timeout=remaining)
                except asyncio.TimeoutError:
                    break
                if not raw:
                    # EOF: the monitor process exited on its own.
                    break
                yield raw.decode(errors="replace")
        finally:
            if proc.returncode is None:
                try:
                    proc.terminate()
                except ProcessLookupError:
                    # Process ended between the check and the signal.
                    pass
            await proc.wait()

    return StreamingResponse(_stream(), media_type="text/plain")
| # --------------------------------------------------------------------------- | |
| # Benchmark Profiler & Roofline Analysis Endpoints | |
| # --------------------------------------------------------------------------- | |
async def benchmark_hardware():
    """Return the host's hardware specs as JSON; no video upload is required.

    `get_hardware_info` is called on a worker thread and its dataclass
    result is converted to a plain dict for the response. The original
    author notes the lookup is cached -- presumably inside
    `utils.hardware_info`; confirm there.
    """
    from dataclasses import asdict

    from utils.hardware_info import get_hardware_info

    hardware_specs = await asyncio.to_thread(get_hardware_info)
    payload = asdict(hardware_specs)
    return JSONResponse(payload)
async def benchmark_profile(
    video: UploadFile = File(...),
    mode: str = Form("detection"),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    queries: str = Form("person,car,truck"),
    max_frames: int = Form(100),
    warmup_frames: int = Form(5),
    step: int = Form(60),
    num_maskmem: Optional[int] = Form(None),
):
    """Run profiled inference and return per-frame timing breakdown.

    Args:
        video: Video file to profile.
        mode: "detection" or "segmentation".
        detector: Detector key (for detection mode).
        segmenter: Segmenter key (for segmentation mode).
        queries: Comma-separated object classes.
        max_frames: Maximum frames to profile.
        warmup_frames: Warmup frames (detection only).
        step: Keyframe interval (segmentation only).
        num_maskmem: SAM2 memory frames (None = model default 7).

    Returns:
        JSONResponse containing the profiler's dataclass result as a dict,
        plus a `gsam2_metrics` entry when the result carries a truthy
        `_gsam2_metrics` attribute.

    Raises:
        HTTPException: 400 for an unknown `mode`; 500 if profiling fails.
    """
    import dataclasses
    from utils.profiler import run_profiled_detection, run_profiled_segmentation

    if mode not in ("detection", "segmentation"):
        raise HTTPException(status_code=400, detail="mode must be 'detection' or 'segmentation'")

    input_path = _save_upload_to_tmp(video)
    await video.close()

    query_list = [q.strip() for q in queries.split(",") if q.strip()]

    try:
        if mode == "detection":
            result = await asyncio.to_thread(
                run_profiled_detection,
                input_path, detector, query_list,
                max_frames=max_frames, warmup_frames=warmup_frames,
            )
        else:
            result = await asyncio.to_thread(
                run_profiled_segmentation,
                input_path, segmenter, query_list,
                max_frames=max_frames, step=step,
                num_maskmem=num_maskmem,
            )
    except Exception as exc:
        logging.exception("Profiling failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    finally:
        # The only cleanup site: `finally` runs on success and failure alike,
        # so a second delete in the except branch would be redundant.
        _safe_delete(input_path)

    # Serialize dataclass, handling any non-serializable fields
    out = dataclasses.asdict(result)

    # Include GSAM2 metrics if present
    gsam2 = getattr(result, "_gsam2_metrics", None)
    if gsam2:
        out["gsam2_metrics"] = gsam2

    return JSONResponse(out)
async def benchmark_analysis(
    video: UploadFile = File(...),
    mode: str = Form("detection"),
    detector: str = Form("yolo11"),
    segmenter: str = Form("GSAM2-L"),
    queries: str = Form("person,car,truck"),
    max_frames: int = Form(100),
    warmup_frames: int = Form(5),
    step: int = Form(60),
    num_maskmem: Optional[int] = Form(None),
):
    """Full roofline analysis: hardware + profiling + theoretical ceilings + bottleneck ID.

    Combines hardware extraction, profiled inference, and roofline model
    to identify bottlenecks and provide actionable recommendations.

    Args:
        video: Video file to analyse.
        mode: "detection" or "segmentation"; selects which profiler runs.
        detector: Detector key, used in detection mode.
        segmenter: Segmenter key, used in segmentation mode.
        queries: Comma-separated object classes; blank entries are dropped.
        max_frames: Forwarded to the selected profiler.
        warmup_frames: Forwarded to the detection profiler only.
        step: Forwarded to the segmentation profiler only.
        num_maskmem: Forwarded to the segmentation profiler only.

    Returns:
        JSONResponse with `hardware`, `profiling` and `roofline` entries,
        each a dataclass converted to a dict.

    Raises:
        HTTPException: 400 for an unknown `mode`; 500 if any analysis
            stage fails.
    """
    import dataclasses
    from utils.hardware_info import get_hardware_info
    from utils.profiler import run_profiled_detection, run_profiled_segmentation
    from utils.roofline import compute_roofline

    if mode not in ("detection", "segmentation"):
        raise HTTPException(status_code=400, detail="mode must be 'detection' or 'segmentation'")

    input_path = _save_upload_to_tmp(video)
    await video.close()

    query_list = [q.strip() for q in queries.split(",") if q.strip()]

    try:
        # Get hardware info (cached, fast)
        hardware = await asyncio.to_thread(get_hardware_info)

        # Run profiling
        if mode == "detection":
            profiling = await asyncio.to_thread(
                run_profiled_detection,
                input_path, detector, query_list,
                max_frames=max_frames, warmup_frames=warmup_frames,
            )
        else:
            profiling = await asyncio.to_thread(
                run_profiled_segmentation,
                input_path, segmenter, query_list,
                max_frames=max_frames, step=step,
                num_maskmem=num_maskmem,
            )

        # Compute roofline
        roofline = compute_roofline(hardware, profiling)
    except Exception as exc:
        logging.exception("Benchmark analysis failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    finally:
        # The only cleanup site: `finally` runs on success and failure alike,
        # so a second delete in the except branch would be redundant.
        _safe_delete(input_path)

    return JSONResponse({
        "hardware": dataclasses.asdict(hardware),
        "profiling": dataclasses.asdict(profiling),
        "roofline": dataclasses.asdict(roofline),
    })
if __name__ == "__main__":
    # Script entry point: serve the "app:app" ASGI application on all
    # interfaces, port 7860, with auto-reload disabled.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
    )