from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import Any

from app.llm_client import chat_completion_json, safe_json_loads
from app.schemas import AgentResult, Finding
from app.prompts import build_agent_prompt
from app.video_payload import VideoPayload

logger = logging.getLogger(__name__)


class BaseAgent(ABC):
    """Abstract base for agents that send a video plus a prompt to a chat model.

    Subclasses provide ``agent_name``; the name selects the prompt via
    ``build_agent_prompt`` and is recorded on the returned ``AgentResult``.
    """

    def __init__(self, model: str):
        """Store the model identifier passed to ``chat_completion_json``."""
        self.model = model

    @abstractmethod
    def agent_name(self) -> str:
        """Return the name identifying this agent."""
        raise NotImplementedError

    def build_messages(self, video_payload: VideoPayload) -> list[dict[str, Any]]:
        """Build the chat messages for one request.

        Args:
            video_payload: Supplies ``value`` (used as the video URL) and
                ``fps`` for the video content part.

        Returns:
            A list holding a single ``user`` message whose content is a
            ``video_url`` part followed by a ``text`` part containing the
            prompt built for this agent.
        """
        content = [
            {
                "type": "video_url",
                "video_url": {"url": video_payload.value},
                "fps": video_payload.fps,
            },
            {
                "type": "text",
                "text": build_agent_prompt(self.agent_name()),
            },
        ]
        return [{"role": "user", "content": content}]

    def run(self, client, video_payload: VideoPayload) -> AgentResult:
        """Query the model and convert its JSON reply into an ``AgentResult``.

        Args:
            client: Client object forwarded to ``chat_completion_json``.
            video_payload: Video input forwarded to ``build_messages``.

        Returns:
            An ``AgentResult`` carrying the parsed findings, the clip-level
            summary and the raw model text. If the reply cannot be parsed
            or a finding cannot be constructed, ``findings`` is empty and
            the summary is a fixed failure message; the failure is logged.

        Raises:
            Whatever ``chat_completion_json`` raises; the request itself is
            not guarded, only the parsing of its result.
        """
        messages = self.build_messages(video_payload)
        raw_text = chat_completion_json(
            client=client,
            model=self.model,
            messages=messages,
        )

        try:
            payload = safe_json_loads(raw_text)
            findings = [Finding(**f) for f in payload.get("findings", [])]
            clip_level_summary = payload.get("clip_level_summary", "")
        except Exception:
            # Best-effort boundary: a malformed reply must not abort the
            # caller, but the cause is logged (with traceback) instead of
            # being discarded so parse failures can be diagnosed.
            logger.exception(
                "Failed to parse model output for agent %s",
                self.agent_name(),
            )
            findings = []
            clip_level_summary = "模型输出解析失败!!!"

        # Fill in clip_start_sec / clip_end_sec. The whole video is sent as a
        # single input with no explicit segmentation, so both are set to 0 for
        # now. The authoritative timing is still each finding's own
        # start_sec / end_sec.
        return AgentResult(
            agent_name=self.agent_name(),
            clip_start_sec=0.0,
            clip_end_sec=0.0,
            findings=findings,
            clip_level_summary=clip_level_summary,
            raw_text=raw_text,
        )