basketball-highlight / api /internal.py
hiro1997's picture
Upload api/internal.py with huggingface_hub
5d29e48 verified
Raw
History Blame Contribute Delete
5.45 kB
import os
import asyncio
import logging
import tempfile
import shutil
from fastapi import APIRouter, HTTPException, Query, UploadFile, File, Form
from services.gpu_gateway_service import GPU_INTERNAL_SECRET
from services.database_service import update_video_status, execute_query
from services.storage_service import get_local_highlight_path
from services.progress_service import get_latest_progress, set_progress_sync
from orchestrator.analysis_orchestrator import AnalysisOrchestrator
from orchestrator.rendering_orchestrator import RenderingOrchestrator
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/internal", tags=["internal"])
_internal_tasks = {}
def _verify_secret(secret: str):
expected = GPU_INTERNAL_SECRET
if not expected:
raise HTTPException(status_code=403, detail="Internal API disabled (no secret configured)")
if secret != expected:
raise HTTPException(status_code=403, detail="Invalid secret")
@router.post("/submit")
async def internal_submit(
video: UploadFile = File(...),
video_id: str = Form(...),
mode: str = Form("balanced"),
user_id: str = Form("gpu-gateway"),
secret: str = Form(""),
person_query: str = Form(""),
):
_verify_secret(secret)
tmp_dir = tempfile.mkdtemp()
ext = os.path.splitext(video.filename or "video.mp4")[1] or ".mp4"
video_path = os.path.join(tmp_dir, f"original{ext}")
with open(video_path, "wb") as f:
content = await video.read()
f.write(content)
logger.info(f"[Internal] Received video for analysis: video_id={video_id}, mode={mode}, person_query='{person_query}', size={len(content)} bytes")
gpu_video_id = f"gpu-{video_id}"
task = asyncio.create_task(_run_internal_analysis(gpu_video_id, video_path, mode, person_query))
_internal_tasks[gpu_video_id] = task
return {"video_id": gpu_video_id, "status": "queued"}
@router.get("/progress/{video_id}")
async def internal_progress(video_id: str, secret: str = Query("")):
_verify_secret(secret)
progress = get_latest_progress(video_id)
if progress:
return progress
if video_id in _internal_tasks:
task = _internal_tasks[video_id]
if task.done():
if task.exception():
return {"stage": "error", "progress": 0, "message": str(task.exception())}
return {"stage": "completed", "progress": 1.0, "message": "分析完成"}
return {"stage": "processing", "progress": 0.5, "message": "处理中..."}
return {"stage": "unknown", "progress": 0, "message": "未找到任务"}
@router.get("/results/{video_id}")
async def internal_results(video_id: str, secret: str = Query("")):
_verify_secret(secret)
highlights_dir = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "data", "uploads", "highlights", video_id
)
if not os.path.exists(highlights_dir):
raise HTTPException(status_code=404, detail="Results not found")
highlights_data = []
try:
video = await execute_query("SELECT highlights_json FROM videos WHERE video_id = ?", (video_id,))
if video and video[0].get("highlights_json"):
import json
highlights_data = json.loads(video[0]["highlights_json"])
except Exception as e:
logger.warning(f"Failed to load highlights for {video_id}: {e}")
available_formats = []
if os.path.exists(highlights_dir):
for f in os.listdir(highlights_dir):
available_formats.append(f)
return {
"highlights": highlights_data,
"available_formats": available_formats,
}
@router.get("/download/{video_id}/{format_type}")
async def internal_download(video_id: str, format_type: str, secret: str = Query("")):
_verify_secret(secret)
from services.range_response import RangeFileResponse
if format_type == "gif":
path = get_local_highlight_path(video_id, "gif")
else:
path = get_local_highlight_path(video_id, format_type)
if not path or not os.path.exists(path):
raise HTTPException(status_code=404, detail=f"File not found: {format_type}")
return RangeFileResponse(path)
async def _run_internal_analysis(gpu_video_id: str, video_path: str, mode: str, person_query: str = ""):
try:
set_progress_sync(gpu_video_id, "coarse", 0.0, "GPU 后端开始粗筛分析...")
analysis = AnalysisOrchestrator()
highlights = await analysis.analyze(video_path, gpu_video_id, mode, person_query=person_query)
if highlights:
rendering = RenderingOrchestrator()
await rendering.render(video_path, gpu_video_id, highlights)
set_progress_sync(gpu_video_id, "completed", 1.0, "GPU 加速分析完成!")
await update_video_status(gpu_video_id, "completed")
else:
set_progress_sync(gpu_video_id, "completed", 1.0, "分析完成,未发现高光片段")
await update_video_status(gpu_video_id, "completed")
except Exception as e:
logger.error(f"[Internal] Analysis failed for {gpu_video_id}: {e}", exc_info=True)
set_progress_sync(gpu_video_id, "error", 0, f"分析失败: {str(e)}")
finally:
_internal_tasks.pop(gpu_video_id, None)
if os.path.exists(video_path):
try:
shutil.rmtree(os.path.dirname(video_path))
except Exception:
pass