Spaces:
Running
Running
import asyncio
import hmac
import logging
import os
import shutil
import tempfile

from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form

from services.gpu_gateway_service import GPU_INTERNAL_SECRET
from services.database_service import update_video_status, execute_query
from services.storage_service import get_local_highlight_path
from services.progress_service import get_latest_progress, set_progress_sync
from orchestrator.analysis_orchestrator import AnalysisOrchestrator
from orchestrator.rendering_orchestrator import RenderingOrchestrator
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router for the internal endpoints; every route is mounted under /api/internal.
# NOTE(review): no `@router.<method>(...)` decorators are visible on the
# handlers in this view of the file — confirm they are registered somewhere.
router = APIRouter(prefix="/api/internal", tags=["internal"])

# Background analysis tasks keyed by the "gpu-"-prefixed video id. An entry is
# added when a video is submitted and removed when its analysis coroutine ends.
_internal_tasks: dict = {}
def _verify_secret(secret: str) -> None:
    """Reject the request unless *secret* matches the configured internal secret.

    Args:
        secret: Shared secret supplied by the caller.

    Raises:
        HTTPException: 403 when no secret is configured (the internal API is
            then treated as disabled) or when *secret* does not match.
    """
    expected = GPU_INTERNAL_SECRET
    if not expected:
        raise HTTPException(status_code=403, detail="Internal API disabled (no secret configured)")

    # Compare in constant time so response latency does not reveal how much of
    # the secret matched. Both sides are encoded to bytes because
    # hmac.compare_digest rejects non-ASCII str arguments.
    supplied = (secret or "").encode("utf-8")
    if not hmac.compare_digest(supplied, expected.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid secret")
async def internal_submit(
    video: UploadFile = File(...),
    video_id: str = Form(...),
    mode: str = Form("balanced"),
    user_id: str = Form("gpu-gateway"),
    secret: str = Form(""),
    person_query: str = Form(""),
):
    """Accept an uploaded video and start its analysis in the background.

    The upload is written to a fresh temporary directory and handed to
    ``_run_internal_analysis``, which is scheduled as an asyncio task and
    recorded in ``_internal_tasks``.

    Args:
        video: The uploaded video file.
        video_id: Caller-side id; the task is tracked as ``gpu-<video_id>``.
        mode: Analysis mode forwarded to the analysis coroutine.
        user_id: Accepted for interface compatibility; not used by this handler.
        secret: Shared secret checked by ``_verify_secret``.
        person_query: Optional query forwarded to the analysis coroutine.

    Returns:
        A dict with the prefixed ``video_id`` and ``status`` set to ``"queued"``.

    Raises:
        HTTPException: 403 when the secret check fails.
    """
    _verify_secret(secret)

    tmp_dir = tempfile.mkdtemp()
    ext = os.path.splitext(video.filename or "video.mp4")[1] or ".mp4"
    video_path = os.path.join(tmp_dir, f"original{ext}")

    # Stream the upload to disk in fixed-size chunks rather than loading the
    # whole video into memory at once.
    chunk_size = 1024 * 1024
    size = 0
    try:
        with open(video_path, "wb") as f:
            while True:
                chunk = await video.read(chunk_size)
                if not chunk:
                    break
                f.write(chunk)
                size += len(chunk)
    except BaseException:
        # Nothing else knows about tmp_dir yet, so remove it here before
        # propagating (BaseException also covers request cancellation).
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise

    logger.info(f"[Internal] Received video for analysis: video_id={video_id}, mode={mode}, person_query='{person_query}', size={size} bytes")

    gpu_video_id = f"gpu-{video_id}"
    task = asyncio.create_task(_run_internal_analysis(gpu_video_id, video_path, mode, person_query))
    # Keep a reference so the task can be inspected by the progress endpoint.
    _internal_tasks[gpu_video_id] = task
    return {"video_id": gpu_video_id, "status": "queued"}
async def internal_progress(video_id: str, secret: str = Query("")):
    """Report the progress of an analysis task.

    A stored progress record from ``get_latest_progress`` takes precedence.
    Without one, the state is derived from the task registered in
    ``_internal_tasks``, if any.

    Args:
        video_id: Id to look up, as returned by the submit endpoint.
        secret: Shared secret checked by ``_verify_secret``.

    Returns:
        The stored progress record, or a dict with ``stage``, ``progress`` and
        ``message`` keys describing the task state.

    Raises:
        HTTPException: 403 when the secret check fails.
    """
    _verify_secret(secret)

    progress = get_latest_progress(video_id)
    if progress:
        return progress

    task = _internal_tasks.get(video_id)
    if task is None:
        return {"stage": "unknown", "progress": 0, "message": "未找到任务"}
    if not task.done():
        return {"stage": "processing", "progress": 0.5, "message": "处理中..."}

    # Task.exception() raises CancelledError for a cancelled task, so that
    # case has to be checked before asking for the exception.
    if task.cancelled():
        return {"stage": "error", "progress": 0, "message": "任务已取消"}

    error = task.exception()
    if error:
        return {"stage": "error", "progress": 0, "message": str(error)}
    return {"stage": "completed", "progress": 1.0, "message": "分析完成"}
async def internal_results(video_id: str, secret: str = Query("")):
    """Return the stored highlights and the rendered files for a video.

    Args:
        video_id: Id whose results directory and database row are looked up.
        secret: Shared secret checked by ``_verify_secret``.

    Returns:
        A dict with ``highlights`` (parsed from the ``highlights_json``
        column, or an empty list when it is missing or unreadable) and
        ``available_formats`` (the entry names in the results directory).

    Raises:
        HTTPException: 403 when the secret check fails; 404 when no results
            directory exists for *video_id*.
    """
    _verify_secret(secret)

    highlights_root = os.path.abspath(
        os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "uploads", "highlights")
    )
    highlights_dir = os.path.abspath(os.path.join(highlights_root, video_id))

    # video_id comes from the request: refuse anything that resolves outside
    # the highlights root (e.g. via "..") and anything that is not a directory.
    inside_root = highlights_dir.startswith(highlights_root + os.sep)
    if not inside_root or not os.path.isdir(highlights_dir):
        raise HTTPException(status_code=404, detail="Results not found")

    highlights_data = []
    try:
        rows = await execute_query("SELECT highlights_json FROM videos WHERE video_id = ?", (video_id,))
        if rows and rows[0].get("highlights_json"):
            import json

            highlights_data = json.loads(rows[0]["highlights_json"])
    except Exception as e:
        # Best effort: the file listing is still useful without the metadata.
        logger.warning(f"Failed to load highlights for {video_id}: {e}")

    return {
        "highlights": highlights_data,
        "available_formats": os.listdir(highlights_dir),
    }
async def internal_download(video_id: str, format_type: str, secret: str = Query("")):
    """Serve one rendered highlight file for a video.

    Args:
        video_id: Id of the video whose highlight is requested.
        format_type: Format name passed to ``get_local_highlight_path``.
        secret: Shared secret checked by ``_verify_secret``.

    Returns:
        A ``RangeFileResponse`` for the resolved file.

    Raises:
        HTTPException: 403 when the secret check fails; 404 when no file is
            found for the requested format.
    """
    _verify_secret(secret)

    # Imported at call time, as in the original code.
    from services.range_response import RangeFileResponse

    # The "gif" format needs no special case: it resolves through the same
    # lookup as every other format.
    path = get_local_highlight_path(video_id, format_type)
    if not path or not os.path.isfile(path):
        raise HTTPException(status_code=404, detail=f"File not found: {format_type}")
    return RangeFileResponse(path)
async def _run_internal_analysis(gpu_video_id: str, video_path: str, mode: str, person_query: str = ""):
    """Analyse a video, render its highlights and record the outcome.

    Runs as a background task. Failures are logged and published as an
    ``"error"`` progress record rather than raised. When the coroutine ends,
    the task is removed from ``_internal_tasks`` and the directory holding
    *video_path* is deleted.

    Args:
        gpu_video_id: Prefixed id used for progress records and status updates.
        video_path: Path of the uploaded video. Its parent directory is
            removed afterwards, so it must be a directory dedicated to this
            upload.
        mode: Analysis mode passed to ``AnalysisOrchestrator.analyze``.
        person_query: Optional query passed to ``AnalysisOrchestrator.analyze``.
    """
    try:
        set_progress_sync(gpu_video_id, "coarse", 0.0, "GPU 后端开始粗筛分析...")

        highlights = await AnalysisOrchestrator().analyze(
            video_path, gpu_video_id, mode, person_query=person_query
        )
        if highlights:
            await RenderingOrchestrator().render(video_path, gpu_video_id, highlights)
            done_message = "GPU 加速分析完成!"
        else:
            done_message = "分析完成,未发现高光片段"

        set_progress_sync(gpu_video_id, "completed", 1.0, done_message)
        await update_video_status(gpu_video_id, "completed")
    except Exception as e:
        logger.error(f"[Internal] Analysis failed for {gpu_video_id}: {e}", exc_info=True)
        set_progress_sync(gpu_video_id, "error", 0, f"分析失败: {str(e)}")
        # NOTE(review): the database status is not updated on failure —
        # confirm whether that is intended.
    finally:
        _internal_tasks.pop(gpu_video_id, None)

        # Key the cleanup on the directory itself: the video file may already
        # have been moved or deleted, and the directory must still go.
        tmp_dir = os.path.dirname(video_path)
        if os.path.isdir(tmp_dir):
            try:
                shutil.rmtree(tmp_dir)
            except OSError as cleanup_error:
                # Cleanup stays best-effort, but a leaked directory is logged.
                logger.warning(f"[Internal] Failed to remove temp dir {tmp_dir}: {cleanup_error}")