"""Internal GPU-gateway endpoints.

All routes live under ``/api/internal`` and are guarded by the shared secret
``GPU_INTERNAL_SECRET``. A submitted video is written to a temporary
directory, analysed (and rendered when highlights are found) in a background
asyncio task, and the temporary directory is removed when that task ends.
"""
import asyncio
import hmac
import json
import logging
import os
import shutil
import tempfile

from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form

from services.gpu_gateway_service import GPU_INTERNAL_SECRET
from services.database_service import update_video_status, execute_query
from services.storage_service import get_local_highlight_path
from services.progress_service import get_latest_progress, set_progress_sync
from orchestrator.analysis_orchestrator import AnalysisOrchestrator
from orchestrator.rendering_orchestrator import RenderingOrchestrator

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/internal", tags=["internal"])

# Background analysis tasks keyed by the "gpu-"-prefixed video id. Holding the
# task object here keeps a strong reference to it while it is running; each
# task removes its own entry when it finishes.
_internal_tasks = {}

# Uploads are copied to disk in pieces of this size instead of being read
# into memory in one go.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _is_safe_path_component(value: str) -> bool:
    """Return True when *value* can be used as a single path segment.

    Rejects empty values, ``.``/``..``, and anything containing a path
    separator or NUL byte, so that a caller-supplied identifier cannot make
    a joined path point outside its intended parent directory.
    """
    if not value or value in (".", ".."):
        return False
    return not any(ch in value for ch in ("/", "\\", "\x00"))


def _verify_secret(secret: str):
    """Validate the caller-supplied shared secret.

    Raises:
        HTTPException: 403 when no secret is configured (the internal API is
            treated as disabled) or when *secret* does not match it.
    """
    expected = GPU_INTERNAL_SECRET
    if not expected:
        raise HTTPException(status_code=403, detail="Internal API disabled (no secret configured)")
    supplied = secret if isinstance(secret, str) else ""
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Bytes are compared because compare_digest() only
    # accepts ASCII-only str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), str(expected).encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid secret")


@router.post("/submit")
async def internal_submit(
    video: UploadFile = File(...),
    video_id: str = Form(...),
    mode: str = Form("balanced"),
    user_id: str = Form("gpu-gateway"),
    secret: str = Form(""),
    person_query: str = Form(""),
):
    """Accept a video upload and queue it for background analysis.

    The upload is stored as ``original<ext>`` inside a fresh temporary
    directory, and `_run_internal_analysis` is scheduled as an asyncio task
    registered under ``gpu-<video_id>``. ``user_id`` is accepted as a form
    field but is not used by this handler.

    Returns:
        ``{"video_id": "gpu-<video_id>", "status": "queued"}``.

    Raises:
        HTTPException: 403 for a bad/missing secret; 400 when *video_id* is
            not usable as a single path segment.
    """
    _verify_secret(secret)
    # The resulting job id is later used as a URL path parameter and as a
    # directory name by the results endpoint, so refuse ids that could never
    # be addressed safely.
    if not _is_safe_path_component(video_id):
        raise HTTPException(status_code=400, detail="Invalid video_id")

    tmp_dir = tempfile.mkdtemp()
    try:
        ext = os.path.splitext(video.filename or "video.mp4")[1] or ".mp4"
        video_path = os.path.join(tmp_dir, f"original{ext}")
        total_bytes = 0
        with open(video_path, "wb") as f:
            # Stream the upload so a large video is never held fully in memory.
            while True:
                chunk = await video.read(_UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                f.write(chunk)
                total_bytes += len(chunk)
    except BaseException:
        # Nothing owns the temp directory yet (the cleanup in
        # _run_internal_analysis only runs once the task exists), so remove
        # it here on any failure, including request cancellation.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise

    logger.info(f"[Internal] Received video for analysis: video_id={video_id}, mode={mode}, person_query='{person_query}', size={total_bytes} bytes")
    gpu_video_id = f"gpu-{video_id}"
    task = asyncio.create_task(_run_internal_analysis(gpu_video_id, video_path, mode, person_query))
    _internal_tasks[gpu_video_id] = task
    return {"video_id": gpu_video_id, "status": "queued"}


@router.get("/progress/{video_id}")
async def internal_progress(video_id: str, secret: str = Query("")):
    """Report progress for a submitted job.

    The value from `get_latest_progress` is returned as-is when it is truthy.
    Otherwise the answer is derived from the state of the registered
    background task, or an ``unknown`` stage when no task is registered.

    Raises:
        HTTPException: 403 for a bad/missing secret.
    """
    _verify_secret(secret)
    progress = get_latest_progress(video_id)
    if progress:
        return progress

    task = _internal_tasks.get(video_id)
    if task is None:
        return {"stage": "unknown", "progress": 0, "message": "未找到任务"}
    if not task.done():
        return {"stage": "processing", "progress": 0.5, "message": "处理中..."}
    # Task.exception() raises CancelledError for a cancelled task, so
    # cancellation has to be checked before asking for the exception.
    if task.cancelled():
        return {"stage": "error", "progress": 0, "message": "任务已取消"}
    error = task.exception()
    if error is not None:
        return {"stage": "error", "progress": 0, "message": str(error)}
    return {"stage": "completed", "progress": 1.0, "message": "分析完成"}


@router.get("/results/{video_id}")
async def internal_results(video_id: str, secret: str = Query("")):
    """Return stored highlight metadata and the rendered files for a job.

    Returns:
        A dict with ``highlights`` (decoded from the ``highlights_json``
        column of the ``videos`` row, or ``[]`` when absent or unreadable)
        and ``available_formats`` (sorted entry names found in the job's
        highlights directory).

    Raises:
        HTTPException: 403 for a bad/missing secret; 404 when *video_id* is
            not a safe path segment or its highlights directory is missing.
    """
    _verify_secret(secret)
    # video_id is joined into a filesystem path below; refuse values such as
    # ".." that would resolve outside the highlights directory.
    if not _is_safe_path_component(video_id):
        raise HTTPException(status_code=404, detail="Results not found")

    highlights_dir = os.path.join(
        os.path.dirname(os.path.dirname(__file__)),
        "data", "uploads", "highlights", video_id
    )
    if not os.path.isdir(highlights_dir):
        raise HTTPException(status_code=404, detail="Results not found")

    highlights_data = []
    try:
        rows = await execute_query("SELECT highlights_json FROM videos WHERE video_id = ?", (video_id,))
        if rows and rows[0].get("highlights_json"):
            highlights_data = json.loads(rows[0]["highlights_json"])
    except Exception as e:
        # Best effort: a metadata failure must not hide the file listing.
        logger.warning(f"Failed to load highlights for {video_id}: {e}")

    # Sorted so repeated calls give a stable order (os.listdir order is
    # arbitrary).
    available_formats = sorted(os.listdir(highlights_dir))
    return {
        "highlights": highlights_data,
        "available_formats": available_formats,
    }


@router.get("/download/{video_id}/{format_type}")
async def internal_download(video_id: str, format_type: str, secret: str = Query("")):
    """Serve one rendered highlight file through `RangeFileResponse`.

    Raises:
        HTTPException: 403 for a bad/missing secret; 404 when either path
            parameter is not a safe path segment, or when
            `get_local_highlight_path` yields no existing file.
    """
    _verify_secret(secret)
    from services.range_response import RangeFileResponse

    if not (_is_safe_path_component(video_id) and _is_safe_path_component(format_type)):
        raise HTTPException(status_code=404, detail=f"File not found: {format_type}")

    # The lookup is the same call for every format, "gif" included.
    path = get_local_highlight_path(video_id, format_type)
    if not path or not os.path.exists(path):
        raise HTTPException(status_code=404, detail=f"File not found: {format_type}")
    return RangeFileResponse(path)


async def _run_internal_analysis(gpu_video_id: str, video_path: str, mode: str, person_query: str = ""):
    """Run analysis (and rendering, if highlights were found) for one upload.

    Progress is published through `set_progress_sync`; on success the video
    status is set to ``"completed"``. Any exception is logged and published
    as an ``error`` progress entry instead of propagating. In every case the
    task's registry entry is dropped and the directory containing
    *video_path* is deleted.
    """
    try:
        set_progress_sync(gpu_video_id, "coarse", 0.0, "GPU 后端开始粗筛分析...")
        analysis = AnalysisOrchestrator()
        highlights = await analysis.analyze(video_path, gpu_video_id, mode, person_query=person_query)
        if highlights:
            rendering = RenderingOrchestrator()
            await rendering.render(video_path, gpu_video_id, highlights)
            done_message = "GPU 加速分析完成!"
        else:
            done_message = "分析完成,未发现高光片段"
        set_progress_sync(gpu_video_id, "completed", 1.0, done_message)
        await update_video_status(gpu_video_id, "completed")
    except Exception as e:
        # NOTE(review): the database status is not updated on failure; only
        # the progress entry records the error. Confirm whether a failure
        # status should be written as well.
        logger.error(f"[Internal] Analysis failed for {gpu_video_id}: {e}", exc_info=True)
        set_progress_sync(gpu_video_id, "error", 0, f"分析失败: {str(e)}")
    finally:
        _internal_tasks.pop(gpu_video_id, None)
        # Remove the per-upload temp directory even when the video file itself
        # is no longer there; keying the cleanup on the file would leak the
        # directory in that case.
        tmp_dir = os.path.dirname(video_path)
        if tmp_dir and os.path.isdir(tmp_dir):
            try:
                shutil.rmtree(tmp_dir)
            except Exception as cleanup_error:
                # Cleanup stays best-effort, but the failure is now visible.
                logger.warning(f"[Internal] Failed to remove temp dir {tmp_dir}: {cleanup_error}")