Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import traceback | |
| import uuid | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from threading import Lock | |
| from typing import Any, Dict, Iterator, List, Optional, Tuple | |
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_DIR = SCRIPT_DIR.parent
# Put the project root (the parent of this script's directory) at the front of
# sys.path, presumably so project-local packages such as ``videomind`` (used as a
# fallback in _import_vision_helpers) can be imported. Added only once.
if not any(entry == str(PROJECT_DIR) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_DIR))
# Stage 1 (Collector): asks for a factual description of the clip that is
# conditioned on the question but explicitly must not answer it.
COLLECTOR_PROMPT = """You are the Collector for a public video reasoning demo.
Watch the short video and write a concise factual summary that helps answer the user question.
Focus on visible actions, objects, scene changes, and any obvious temporal order.
Do not answer the question directly."""
# Stage 2 (Planner): asks for a JSON decision; the pipeline reads back the keys
# ``use_grounder``, ``grounding_query`` and ``reason``.
PLANNER_PROMPT = """You are the Planner for a public video reasoning demo.
Decide whether the question needs a focused time span from the video.
Return valid JSON only:
{"use_grounder": true or false, "grounding_query": "short retrieval query", "reason": "short reason"}"""
# Stage 3 (Grounder): asks for a JSON time span in seconds; the reply is later
# clamped to the video duration and length-limited by normalize_span.
GROUNDER_PROMPT = """You are a lightweight textual Grounder.
Identify the most relevant time span in the video for the question.
Return valid JSON only:
{"start_sec": number, "end_sec": number, "reason": "short reason"}
Rules:
- Use seconds from the start of the video.
- The span must be short and useful.
- If unsure, choose a slightly broader span.
- Do not return markdown."""
# Stage 4 (Answerer): the final answer must be wrapped in <answer> tags so that
# extract_answer can pull it out of the free-form reply.
ANSWER_PROMPT = """You are the Answerer for a public video reasoning demo.
Use the video and the context summary to answer the user question.
Return a short explanation and put the final answer inside <answer>...</answer>."""
# Stage 5 (Reviewer): asks for JSON with a ``confidence`` label and a ``review``
# paragraph; both keys are read back when building the review summary.
REVIEW_PROMPT = """You are a lightweight Reviewer.
Judge whether the answer seems well supported by the visible video evidence.
Return valid JSON only:
{"confidence": "low" | "medium" | "high", "review": "one short paragraph"}"""
def extract_json_object(text: str) -> Dict[str, Any]:
    """Return the first JSON object embedded in a model reply, or ``{}``.

    The widest ``{...}`` span is tried first (this handles a single object wrapped
    in prose or code fences). If that span is not valid JSON, for example because
    the model appended a second object or trailing braces, every ``{`` is tried in
    turn with ``JSONDecoder.raw_decode``, which parses one object and ignores
    whatever follows it.

    Args:
        text: Raw generated text. ``None`` and ``""`` yield ``{}``.

    Returns:
        The decoded dict, or an empty dict when no JSON object can be parsed.
    """
    if not text:
        return {}
    match = re.search(r"(\{.*\})", text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            pass  # fall through to the incremental scan below
    decoder = json.JSONDecoder()
    for index, char in enumerate(text):
        if char != "{":
            continue
        try:
            candidate, _ = decoder.raw_decode(text, index)
        except json.JSONDecodeError:
            continue
        if isinstance(candidate, dict):
            return candidate
    return {}
def extract_answer(text: str) -> str:
    """Return the content of the first ``<answer>...</answer>`` pair in *text*.

    Surrounding whitespace inside the tags is dropped. When no tag pair exists,
    the whole text is returned stripped.
    """
    tagged = re.search(r"<answer>\s*(.*?)\s*</answer>", text, re.DOTALL)
    if tagged is None:
        return text.strip()
    return tagged.group(1).strip()
def format_messages_markdown(title: str, messages: List[Dict[str, Any]]) -> str:
    """Render *messages* as a Markdown heading followed by a fenced JSON block.

    Non-ASCII characters are kept as-is and the JSON is indented by two spaces.
    """
    body = json.dumps(messages, ensure_ascii=False, indent=2)
    return "\n".join([f"### {title}", "```json", body, "```"])
def probe_video(video_path: str) -> Dict[str, Any]:
    """Read basic metadata of a video file with PyAV.

    Returns a dict with ``duration_sec`` (rounded to 0.01 s), ``width`` and
    ``height`` of the first video stream (0 when absent), and ``size_mb`` of the
    file on disk.

    Some containers do not report a container-level duration. In that case the
    first video stream's own ``duration`` (expressed in its ``time_base`` units)
    is used instead, so the duration is not silently reported as 0. A 0 duration
    would bypass the duration limits and disable grounding.

    Raises:
        OSError: if the file cannot be stat'ed.
        Any PyAV error raised while opening/demuxing the file.
    """
    import av

    info = {
        "duration_sec": 0.0,
        "width": 0,
        "height": 0,
        "size_mb": round(os.path.getsize(video_path) / (1024 * 1024), 2),
    }
    with av.open(video_path) as container:
        # Container duration is expressed in microseconds (AV_TIME_BASE).
        duration_sec = (container.duration or 0) / 1_000_000.0
        video_stream = next((stream for stream in container.streams if stream.type == "video"), None)
        if video_stream is not None:
            if duration_sec <= 0 and video_stream.duration and video_stream.time_base:
                duration_sec = float(video_stream.duration * video_stream.time_base)
            info["width"] = int(video_stream.width or 0)
            info["height"] = int(video_stream.height or 0)
        info["duration_sec"] = round(duration_sec, 2)
    return info
def trim_video_ffmpeg(video_path: str, start: float, end: float, output_path: str) -> str:
    """Re-encode the ``[start, end]`` window of *video_path* into *output_path*.

    ``start`` is clamped at 0, and the clip length is computed from the clamped
    start with a 0.1 s minimum, so ffmpeg never receives an empty or negative
    window. ``-ss`` is an input option (fast seek) and ``-t`` an output duration,
    which avoids the version/placement-dependent meaning of ``-to``. Video is
    re-encoded with libx264 (veryfast, CRF 28) and audio with AAC; an existing
    output file is overwritten (``-y``).

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: if the ffmpeg executable is missing or exits non-zero
            (the message carries ffmpeg's stderr when available).
    """
    start_sec = max(float(start), 0.0)
    duration_sec = max(float(end) - start_sec, 0.1)
    cmd = [
        "ffmpeg",
        "-y",
        "-ss",
        f"{start_sec:.2f}",
        "-i",
        video_path,
        "-t",
        f"{duration_sec:.2f}",
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-crf",
        "28",
        "-c:a",
        "aac",
        output_path,
    ]
    try:
        # errors="replace": ffmpeg stderr may contain bytes that are not valid UTF-8.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except FileNotFoundError as exc:
        raise RuntimeError("ffmpeg executable not found on PATH") from exc
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip() or "ffmpeg trim failed")
    return output_path
def normalize_span(span: Dict[str, Any], duration_sec: float, max_window_sec: float) -> Optional[Tuple[float, float]]:
    """Turn a Grounder reply into a valid ``(start, end)`` window in seconds.

    Both bounds are clamped into ``[0, duration_sec]``. A window longer than
    ``max_window_sec`` keeps its start and has its end pulled in. Values are
    rounded to two decimals.

    Returns:
        ``(start, end)`` with ``end > start``, or ``None`` when the span is
        missing/non-numeric, the duration is unknown (``<= 0``), or the
        resulting window is empty.
    """
    try:
        raw_start = float(span.get("start_sec"))
        raw_end = float(span.get("end_sec"))
    except (TypeError, ValueError):
        return None
    if duration_sec <= 0:
        return None

    def _clamp(value: float) -> float:
        return max(0.0, min(value, duration_sec))

    lo, hi = _clamp(raw_start), _clamp(raw_end)
    if hi <= lo:
        return None
    if hi - lo > max_window_sec:
        hi = min(duration_sec, lo + max_window_sec)
        # Only reachable for a non-positive window limit.
        if hi <= lo:
            return None
    return round(lo, 2), round(hi, 2)
def build_demo_query(question: str, sample: Optional[Dict[str, Any]] = None) -> str:
    """Return the question text to send to the model, with options when applicable.

    The question is stripped. If *sample* is given, its ``problem`` text equals
    that question, and its ``problem_type`` is a multiple-choice type
    (``"multiple choice"`` or ``"emer_ov_mc"``), then the sample's ``options``
    are appended one per line after an ``Options:`` header.

    ``None`` values for ``question`` / ``problem`` are treated as empty strings,
    and options are stringified, so malformed samples do not raise.
    """
    full_question = (question or "").strip()
    if not sample:
        return full_question
    problem = str(sample.get("problem") or "").strip()
    if problem != full_question:
        return full_question
    problem_type = sample.get("problem_type", "free-form")
    if problem_type in {"multiple choice", "emer_ov_mc"}:
        options = sample.get("options") or []
        if options:
            full_question = f"{full_question}\nOptions:\n" + "\n".join(str(option) for option in options)
    return full_question
| def _import_torch(): | |
| import torch | |
| return torch | |
def _import_vision_helpers():
    """Return a ``process_vision_info`` callable.

    Prefers the implementation from ``qwen_vl_utils``. If that package cannot be
    imported, the project's ``videomind.dataset.utils`` copy is used instead; an
    ImportError from that second import propagates.
    """
    try:
        from qwen_vl_utils import process_vision_info as helper
    except ImportError:
        from videomind.dataset.utils import process_vision_info as helper
    return helper
@dataclass
class RuntimeConfig:
    """Deployment settings for the demo runtime, normally built via ``from_env``.

    ``backend_mode`` selects the backend (``cpu_fallback``, ``local_gpu``,
    ``remote_api`` or ``disabled``). The ``cpu_*`` fields are the size/sampling
    limits of the CPU pipeline, ``local_*`` those of local GPU mode,
    ``max_new_tokens_*`` the per-stage generation budgets, and
    ``max_grounded_window_sec`` the longest clip the Grounder may select.

    Declared as a (mutable) dataclass: ``from_env`` constructs it with keyword
    arguments, and LocalGPUBackend temporarily rewrites the ``cpu_*`` fields.
    """

    model_id: str = "Qwen/Qwen2-VL-2B-Instruct"
    backend_mode: str = "cpu_fallback"
    artifact_root: str = "/tmp/intentbench_space_artifacts"
    local_device: str = "cpu"
    remote_inference_url: str = ""
    remote_api_key: str = ""
    cpu_max_duration_sec: float = 8.0
    cpu_max_frames: int = 8
    cpu_fps: float = 1.0
    cpu_max_pixels: int = 128 * 28 * 28
    cpu_max_size_mb: float = 80.0
    cpu_max_edge: int = 1400
    local_max_duration_sec: float = 30.0
    local_max_frames: int = 24
    local_fps: float = 1.0
    local_max_pixels: int = 256 * 28 * 28
    max_new_tokens_collector: int = 96
    max_new_tokens_planner: int = 64
    max_new_tokens_grounder: int = 48
    max_new_tokens_answer: int = 96
    max_new_tokens_review: int = 64
    max_grounded_window_sec: float = 12.0

    @classmethod
    def from_env(cls) -> "RuntimeConfig":
        """Build a config from environment variables, using the defaults above when unset.

        Raises:
            ValueError: if a numeric variable is set to a non-numeric string.
        """
        return cls(
            model_id=os.environ.get("MODEL_ID", "Qwen/Qwen2-VL-2B-Instruct"),
            backend_mode=os.environ.get("INFERENCE_BACKEND", "cpu_fallback"),
            artifact_root=os.environ.get("INTENTBENCH_DEMO_ARTIFACT_ROOT", "/tmp/intentbench_space_artifacts"),
            local_device=os.environ.get("LOCAL_DEVICE", "cpu"),
            remote_inference_url=os.environ.get("REMOTE_INFERENCE_URL", ""),
            remote_api_key=os.environ.get("REMOTE_API_KEY", ""),
            cpu_max_duration_sec=float(os.environ.get("CPU_MAX_DURATION_SEC", "8")),
            cpu_max_frames=int(os.environ.get("CPU_MAX_FRAMES", "8")),
            cpu_fps=float(os.environ.get("CPU_FPS", "1.0")),
            cpu_max_pixels=int(os.environ.get("CPU_MAX_PIXELS", str(128 * 28 * 28))),
            cpu_max_size_mb=float(os.environ.get("CPU_MAX_SIZE_MB", "80")),
            cpu_max_edge=int(os.environ.get("CPU_MAX_EDGE", "1400")),
            local_max_duration_sec=float(os.environ.get("LOCAL_MAX_DURATION_SEC", "30")),
            local_max_frames=int(os.environ.get("LOCAL_MAX_FRAMES", "24")),
            local_fps=float(os.environ.get("LOCAL_FPS", "1.0")),
            local_max_pixels=int(os.environ.get("LOCAL_MAX_PIXELS", str(256 * 28 * 28))),
            max_new_tokens_collector=int(os.environ.get("MAX_NEW_TOKENS_COLLECTOR", "96")),
            max_new_tokens_planner=int(os.environ.get("MAX_NEW_TOKENS_PLANNER", "64")),
            max_new_tokens_grounder=int(os.environ.get("MAX_NEW_TOKENS_GROUNDER", "48")),
            max_new_tokens_answer=int(os.environ.get("MAX_NEW_TOKENS_ANSWER", "96")),
            max_new_tokens_review=int(os.environ.get("MAX_NEW_TOKENS_REVIEW", "64")),
            max_grounded_window_sec=float(os.environ.get("MAX_GROUNDED_WINDOW_SEC", "12")),
        )

    def backend_label(self) -> str:
        """Human-readable name of ``backend_mode``; unknown modes are returned verbatim."""
        mapping = {
            "cpu_fallback": "CPU fallback",
            "local_gpu": "Local GPU",
            "remote_api": "Remote API",
            "disabled": "Disabled",
        }
        return mapping.get(self.backend_mode, self.backend_mode)

    def cpu_limits_text(self) -> str:
        """One-line summary of the CPU fallback limits."""
        return (
            f"max {self.cpu_max_duration_sec:.0f}s video, "
            f"{self.cpu_fps:.1f} fps, "
            f"{self.cpu_max_frames} frames, "
            f"{self.cpu_max_size_mb:.0f} MB, "
            f"edge <= {self.cpu_max_edge}px"
        )
class InferenceBackend:
    """Common interface for inference backends.

    Subclasses override ``describe`` and ``run_pipeline``. This base class has no
    model behind it: its ``run_pipeline`` yields a single ``error`` event.
    """

    mode = "disabled"

    def __init__(self, config: RuntimeConfig):
        self.config = config

    def describe(self) -> Dict[str, str]:
        """Return the ``mode`` / ``title`` / ``message`` description of this backend."""
        title = self.config.backend_label()
        return dict(mode=self.mode, title=title, message="Inference is not configured.")

    def run_pipeline(
        self,
        video_path: str,
        question: str,
        grounder_mode: str,
        sample: Optional[Dict[str, Any]] = None,
    ) -> Iterator[Dict[str, Any]]:
        """Yield pipeline events; the base implementation only reports unavailability."""
        unavailable = dict(
            stage="error",
            message="Inference backend is unavailable.",
            backend_mode=self.mode,
            traceback="",
        )
        yield unavailable
class DisabledBackend(InferenceBackend):
    """UI-only backend: streams a status notice and a canned result, never runs a model."""

    mode = "disabled"

    def describe(self) -> Dict[str, str]:
        return dict(
            mode=self.mode,
            title="UI-only mode",
            message="The public page is online, but real inference is disabled until GPU grant or a remote backend is connected.",
        )

    def run_pipeline(self, video_path: str, question: str, grounder_mode: str, sample: Optional[Dict[str, Any]] = None):
        """Yield a ``status`` event followed by a ``done`` event; all inputs are ignored."""
        notice = self.describe()["message"]
        yield dict(
            stage="status",
            message=notice,
            backend_mode=self.mode,
            warnings=["Switch to remote_api or local_gpu to enable real inference."],
        )
        yield dict(
            stage="done",
            message="UI-only mode complete.",
            backend_mode=self.mode,
            final_answer="Inference is currently unavailable on this deployment.",
            review_summary="No model execution happened.",
        )
class RemoteAPIBackend(InferenceBackend):
    """Backend that forwards the whole pipeline to an external HTTP inference service.

    The video is uploaded as multipart form data together with the question,
    the grounder mode and the JSON-encoded sample. The reply is read as one JSON
    object whose per-stage fields (``collector_summary``, ``planner_decision``,
    ``grounder_span``, ``final_answer``, ``review_summary`` and their ``*_raw`` /
    ``*_prompt`` companions) are re-emitted as the same stage events the local
    backends produce.
    """

    mode = "remote_api"

    def describe(self) -> Dict[str, str]:
        if not self.config.remote_inference_url:
            return {
                "mode": self.mode,
                "title": "Remote API",
                "message": "Remote mode is selected, but REMOTE_INFERENCE_URL is not configured.",
            }
        return {
            "mode": self.mode,
            "title": "Remote API",
            "message": "The Space UI is live and forwards inference to an external GPU service.",
        }

    def _post_request(
        self,
        video_path: str,
        question: str,
        grounder_mode: str,
        sample: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Upload the request to the remote service and return its decoded JSON object.

        Raises:
            ImportError: if ``requests`` is not installed.
            OSError: if the video cannot be opened.
            ValueError: if the reply is not JSON or is JSON but not an object.
            requests.RequestException: on connection, timeout or HTTP-status errors.
        """
        import requests

        headers = {}
        if self.config.remote_api_key:
            headers["Authorization"] = f"Bearer {self.config.remote_api_key}"
        with open(video_path, "rb") as handle:
            response = requests.post(
                self.config.remote_inference_url,
                headers=headers,
                data={
                    "question": question,
                    "grounder_mode": grounder_mode,
                    "sample": json.dumps(sample or {}, ensure_ascii=False),
                },
                files={"video": (Path(video_path).name, handle, "video/mp4")},
                timeout=600,
            )
        response.raise_for_status()
        payload = response.json()
        if not isinstance(payload, dict):
            raise ValueError(f"Remote inference returned {type(payload).__name__}, expected a JSON object.")
        return payload

    def run_pipeline(self, video_path: str, question: str, grounder_mode: str, sample: Optional[Dict[str, Any]] = None):
        """Stream stage events reconstructed from one remote call.

        Any failure of the request (missing ``requests``, unreadable video,
        network/HTTP error, malformed JSON) is reported as a single ``error``
        event with a traceback. This matches the local backends instead of
        raising out of the generator.
        """
        if not self.config.remote_inference_url:
            yield {
                "stage": "error",
                "message": "REMOTE_INFERENCE_URL is missing.",
                "backend_mode": self.mode,
                "traceback": "",
            }
            return
        yield {
            "stage": "status",
            "message": "Sending request to remote inference service...",
            "backend_mode": self.mode,
        }
        try:
            payload = self._post_request(video_path, question, grounder_mode, sample)
        except Exception as exc:
            yield {
                "stage": "error",
                "message": f"{type(exc).__name__}: {exc}",
                "backend_mode": self.mode,
                "traceback": traceback.format_exc(),
            }
            return
        if payload.get("collector_summary"):
            yield {
                "stage": "collector",
                "message": "Collector finished via remote API.",
                "backend_mode": self.mode,
                "collector_summary": payload.get("collector_summary", ""),
                "collector_raw": payload.get("collector_raw", payload.get("collector_summary", "")),
                "raw_prompt": payload.get("collector_prompt", ""),
            }
        if payload.get("planner_decision"):
            yield {
                "stage": "planner",
                "message": "Planner finished via remote API.",
                "backend_mode": self.mode,
                "planner_decision": payload.get("planner_decision", ""),
                "planner_raw": payload.get("planner_raw", payload.get("planner_decision", "")),
                "grounder_span_text": payload.get("grounder_span", ""),
                "raw_prompt": payload.get("planner_prompt", ""),
            }
        if payload.get("grounder_span"):
            yield {
                "stage": "grounder",
                "message": "Grounder finished via remote API.",
                "backend_mode": self.mode,
                "grounder_span_text": payload.get("grounder_span", ""),
                "grounder_raw": payload.get("grounder_raw", payload.get("grounder_span", "")),
                "grounded_video": payload.get("highlight_clip_path"),
                "raw_prompt": payload.get("grounder_prompt", ""),
            }
        yield {
            "stage": "answer",
            "message": "Answer received from remote API.",
            "backend_mode": self.mode,
            "final_answer": payload.get("final_answer", ""),
            "answer_raw": payload.get("answer_raw", payload.get("final_answer", "")),
            "raw_prompt": payload.get("answer_prompt", ""),
        }
        yield {
            "stage": "review",
            "message": "Review received from remote API.",
            "backend_mode": self.mode,
            "review_summary": payload.get("review_summary", ""),
            "review_raw": payload.get("review_raw", payload.get("review_summary", "")),
            "raw_prompt": payload.get("review_prompt", ""),
        }
        yield {
            "stage": "done",
            "message": "Remote inference completed.",
            "backend_mode": self.mode,
            "final_answer": payload.get("final_answer", ""),
            "review_summary": payload.get("review_summary", ""),
            "grounded_video": payload.get("highlight_clip_path"),
        }
class LocalQwenVLBackend(InferenceBackend):
    """Shared machinery for running pipeline stages on one local Qwen2-VL model.

    torch, the vision helper, the model and its processor are loaded lazily by
    ``ensure_loaded``. Each ``_run_*`` helper builds a one-turn video chat
    message, runs greedy generation, and returns the decoded text. The Planner,
    Grounder and Reviewer helpers also return the JSON object parsed from it.
    This class does not override ``run_pipeline``; the subclasses in this file
    orchestrate the stages.
    """

    mode = "local"

    def __init__(self, config: RuntimeConfig):
        super().__init__(config)
        # Heavy objects are populated by ensure_loaded() on first use.
        self.model = None
        self.processor = None
        self.process_vision_info = None
        self.torch = None
        self.device = config.local_device
        self.dtype = None
        # _load_lock guards lazy loading; _run_lock is taken by CPUFallbackBackend
        # to serialize whole pipeline runs on this instance.
        self._load_lock = Lock()
        self._run_lock = Lock()
        self.artifact_root = Path(config.artifact_root)
        # Side effect at construction time: the artifact root is created eagerly.
        self.artifact_root.mkdir(parents=True, exist_ok=True)

    def describe(self) -> Dict[str, str]:
        """Describe this backend; the title comes from the configured backend mode."""
        return {
            "mode": self.mode,
            "title": self.config.backend_label(),
            "message": f"Single-model Qwen2-VL pipeline on {self.device}.",
        }

    def ensure_loaded(self) -> None:
        """Load torch, the vision helper, the model and the processor exactly once.

        Uses a check / lock / re-check sequence so that concurrent callers load
        only once. The model runs in bfloat16 when ``self.device`` starts with
        ``"cuda"`` and CUDA is available. Otherwise ``self.device`` is rewritten
        to ``"cpu"`` and float32 is used.
        """
        if self.model is not None and self.processor is not None:
            return
        with self._load_lock:
            # Re-check: another thread may have finished loading while we waited.
            if self.model is not None and self.processor is not None:
                return
            self.torch = _import_torch()
            from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
            self.process_vision_info = _import_vision_helpers()
            if self.device.startswith("cuda") and self.torch.cuda.is_available():
                self.dtype = self.torch.bfloat16
            else:
                # Silent fallback: a CUDA device requested without CUDA ends up on CPU.
                self.device = "cpu"
                self.dtype = self.torch.float32
            kwargs: Dict[str, Any] = {
                "torch_dtype": self.dtype,
                "low_cpu_mem_usage": True,
            }
            self.model = Qwen2VLForConditionalGeneration.from_pretrained(self.config.model_id, **kwargs)
            self.model = self.model.to(self.device)
            self.model.eval()
            # The processor is assigned last; the fast-path check above requires both.
            self.processor = AutoProcessor.from_pretrained(self.config.model_id)

    def _make_run_dir(self) -> Path:
        """Create and return a fresh ``run_<8 hex chars>`` directory under the artifact root."""
        run_dir = self.artifact_root / f"run_{uuid.uuid4().hex[:8]}"
        run_dir.mkdir(parents=True, exist_ok=True)
        return run_dir

    def _processor_inputs(
        self,
        messages: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Turn chat *messages* into model-ready inputs on ``self.device``.

        Applies the chat template (with the generation prompt) and extracts
        image/video inputs via ``process_vision_info``. Every value that has a
        ``.to`` method is moved to ``self.device``. Floating-point values are
        also cast to ``self.dtype``, presumably the pixel/video tensors, so they
        match the model's dtype.
        """
        chat_text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = self.process_vision_info(messages)
        inputs = self.processor(
            text=[chat_text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        # Values are replaced in place while iterating; the key set does not change.
        for key, value in inputs.items():
            if hasattr(value, "to"):
                inputs[key] = value.to(self.device)
                if getattr(inputs[key], "is_floating_point", lambda: False)():
                    inputs[key] = inputs[key].to(self.dtype)
        return inputs

    def _generate(
        self,
        messages: List[Dict[str, Any]],
        max_new_tokens: int,
    ) -> str:
        """Run greedy decoding (``do_sample=False``) and return only the new text.

        The prompt tokens are sliced off the first (only) sequence before
        decoding, and special tokens are skipped.
        """
        inputs = self._processor_inputs(messages)
        with self.torch.inference_mode():
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,
            )
        generated = output_ids[0][inputs["input_ids"].size(1):]
        return self.processor.decode(generated, skip_special_tokens=True)

    def _video_messages(
        self,
        video_path: str,
        prompt: str,
        fps: float,
        max_frames: int,
        max_pixels: int,
    ) -> List[Dict[str, Any]]:
        """Build a single user turn containing the video item followed by the text prompt.

        ``fps``, ``max_frames`` and ``max_pixels`` are stored on the video item.
        They are presumably consumed by ``process_vision_info`` to control frame
        sampling and resolution; confirm against the helper in use.
        """
        return [
            {
                "role": "user",
                "content": [
                    {
                        "type": "video",
                        "video": video_path,
                        "fps": fps,
                        "max_frames": max_frames,
                        "max_pixels": max_pixels,
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ]

    def _run_collector(self, video_path: str, question: str, fps: float, max_frames: int, max_pixels: int):
        """Run the Collector stage; returns ``(summary_text, messages)``."""
        prompt = f"{COLLECTOR_PROMPT}\n\nUser question: {question}"
        messages = self._video_messages(video_path, prompt, fps, max_frames, max_pixels)
        output = self._generate(messages, self.config.max_new_tokens_collector)
        return output, messages

    def _run_planner(
        self,
        video_path: str,
        question: str,
        collector_summary: str,
        fps: float,
        max_frames: int,
        max_pixels: int,
    ):
        """Run the Planner stage; returns ``(parsed_json_or_empty_dict, raw_text, messages)``."""
        prompt = (
            f"{PLANNER_PROMPT}\n\n"
            f"User question: {question}\n\n"
            f"Collector summary:\n{collector_summary}"
        )
        messages = self._video_messages(video_path, prompt, fps, max_frames, max_pixels)
        output = self._generate(messages, self.config.max_new_tokens_planner)
        return extract_json_object(output), output, messages

    def _run_grounder(
        self,
        video_path: str,
        question: str,
        grounding_query: str,
        fps: float,
        max_frames: int,
        max_pixels: int,
    ):
        """Run the Grounder stage; returns ``(parsed_json_or_empty_dict, raw_text, messages)``.

        An empty ``grounding_query`` falls back to the question in the prompt.
        """
        prompt = (
            f"{GROUNDER_PROMPT}\n\n"
            f"User question: {question}\n"
            f"Grounding query: {grounding_query or question}"
        )
        messages = self._video_messages(video_path, prompt, fps, max_frames, max_pixels)
        output = self._generate(messages, self.config.max_new_tokens_grounder)
        return extract_json_object(output), output, messages

    def _run_answer(
        self,
        video_path: str,
        question: str,
        collector_summary: str,
        fps: float,
        max_frames: int,
        max_pixels: int,
    ):
        """Run the Answerer stage; returns ``(raw_text, messages)``.

        NOTE: build_demo_query is called without a sample here, so it only strips
        the question. Callers that want multiple-choice options in the prompt must
        pass an already-expanded question.
        """
        prompt = (
            f"{ANSWER_PROMPT}\n\n"
            f"User question: {build_demo_query(question)}\n\n"
            f"Collector summary:\n{collector_summary}"
        )
        messages = self._video_messages(video_path, prompt, fps, max_frames, max_pixels)
        output = self._generate(messages, self.config.max_new_tokens_answer)
        return output, messages

    def _run_review(
        self,
        video_path: str,
        question: str,
        answer_text: str,
        fps: float,
        max_frames: int,
        max_pixels: int,
    ):
        """Run the Reviewer stage; returns ``(parsed_json_or_empty_dict, raw_text, messages)``."""
        prompt = (
            f"{REVIEW_PROMPT}\n\n"
            f"User question: {question}\n\n"
            f"Candidate answer:\n{answer_text}"
        )
        messages = self._video_messages(video_path, prompt, fps, max_frames, max_pixels)
        output = self._generate(messages, self.config.max_new_tokens_review)
        return extract_json_object(output), output, messages
class CPUFallbackBackend(LocalQwenVLBackend):
    """Run Collector -> Planner -> (Grounder) -> Answerer -> Reviewer on the local model.

    Every stage is a separate generation on the same Qwen2-VL model, with frames
    sampled according to the config's ``cpu_*`` limits. Clips exceeding the CPU
    limits are rejected before the model is loaded.
    """

    mode = "cpu_fallback"

    def describe(self) -> Dict[str, str]:
        return {
            "mode": self.mode,
            "title": "CPU fallback",
            "message": f"Very short clips only. Limits: {self.config.cpu_limits_text()}",
        }

    def _check_limits(self, video_info: Dict[str, Any]) -> List[str]:
        """Return human-readable reasons why *video_info* exceeds the CPU limits (empty if none)."""
        issues = []
        if video_info["duration_sec"] > self.config.cpu_max_duration_sec:
            issues.append(
                f"Video is {video_info['duration_sec']:.2f}s; CPU fallback only supports up to {self.config.cpu_max_duration_sec:.0f}s."
            )
        if video_info["size_mb"] > self.config.cpu_max_size_mb:
            issues.append(
                f"Video is {video_info['size_mb']:.1f} MB; CPU fallback only supports up to {self.config.cpu_max_size_mb:.0f} MB."
            )
        if max(video_info["width"], video_info["height"]) > self.config.cpu_max_edge:
            issues.append(
                f"Video edge is {max(video_info['width'], video_info['height'])} px; CPU fallback only supports up to {self.config.cpu_max_edge}px."
            )
        return issues

    def run_pipeline(
        self,
        video_path: str,
        question: str,
        grounder_mode: str,
        sample: Optional[Dict[str, Any]] = None,
    ) -> Iterator[Dict[str, Any]]:
        """Stream progress events for one question about one video.

        Every event carries ``stage``, ``message`` and ``backend_mode``. On
        success the stages are ``status``, ``collector``, ``planner``,
        ``grounder``, ``answer``, ``review`` and ``done``. Any failure (including
        exceeded limits) ends the stream with an ``error`` event carrying a
        ``traceback`` field.

        Args:
            video_path: Local path of the clip.
            question: The user question.
            grounder_mode: ``"Always On"`` / ``"Off"`` force the Grounder on or
                off; any other value (e.g. ``"Auto"``) follows the Planner.
            sample: Optional benchmark sample. When its ``problem`` matches the
                question, multiple-choice options are added to the Answerer
                prompt via build_demo_query.
        """
        if not video_path:
            yield {"stage": "error", "message": "A video is required.", "backend_mode": self.mode, "traceback": ""}
            return
        # `not question` also rejects None, which previously crashed on .strip().
        if not question or not question.strip():
            yield {"stage": "error", "message": "A question is required.", "backend_mode": self.mode, "traceback": ""}
            return
        with self._run_lock:
            try:
                # Snapshot the sampling limits once so every stage of this run uses the same values.
                fps = self.config.cpu_fps
                max_frames = self.config.cpu_max_frames
                max_pixels = self.config.cpu_max_pixels

                info = probe_video(video_path)
                issues = self._check_limits(info)
                yield {
                    "stage": "status",
                    "message": f"CPU fallback mode active. Video: {info['duration_sec']:.2f}s, {info['width']}x{info['height']}, {info['size_mb']} MB.",
                    "backend_mode": self.mode,
                    "warnings": issues,
                }
                if issues:
                    yield {
                        "stage": "error",
                        "message": "CPU fallback limits exceeded.",
                        "backend_mode": self.mode,
                        "traceback": "\n".join(issues),
                    }
                    return
                self.ensure_loaded()

                collector_summary, collector_messages = self._run_collector(
                    video_path, question, fps, max_frames, max_pixels
                )
                yield {
                    "stage": "collector",
                    "message": "Collector finished.",
                    "backend_mode": self.mode,
                    "collector_summary": collector_summary,
                    "collector_raw": collector_summary,
                    "raw_prompt": format_messages_markdown("Collector Input", collector_messages),
                }

                planner_json, planner_raw, planner_messages = self._run_planner(
                    video_path, question, collector_summary, fps, max_frames, max_pixels
                )
                use_grounder = bool(planner_json.get("use_grounder"))
                if grounder_mode == "Always On":
                    use_grounder = True
                elif grounder_mode == "Off":
                    use_grounder = False
                grounding_query = str(planner_json.get("grounding_query", "")).strip()
                planner_text = (
                    f"Use Grounder: {use_grounder}\n"
                    f"Grounding Query: {grounding_query or '-'}\n"
                    f"Reason: {planner_json.get('reason', '-')}"
                )
                yield {
                    "stage": "planner",
                    "message": "Planner finished.",
                    "backend_mode": self.mode,
                    "planner_decision": planner_text,
                    "planner_raw": planner_raw,
                    "raw_prompt": format_messages_markdown("Planner Input", planner_messages),
                }

                generation_video = video_path
                grounded_span_text = "Grounder skipped."
                grounded_video = None
                if use_grounder:
                    span_json, grounder_raw, grounder_messages = self._run_grounder(
                        video_path, question, grounding_query or question, fps, max_frames, max_pixels
                    )
                    normalized = normalize_span(span_json, info["duration_sec"], self.config.max_grounded_window_sec)
                    if normalized:
                        span_label = (
                            f"[{normalized[0]:.2f}s, {normalized[1]:.2f}s] "
                            f"- {span_json.get('reason', 'no reason provided')}"
                        )
                        # The artifact directory is only needed when a clip is written.
                        clip_path = str(self._make_run_dir() / "highlight.mp4")
                        try:
                            trim_video_ffmpeg(video_path, normalized[0], normalized[1], clip_path)
                        except (RuntimeError, OSError) as exc:
                            # Grounding is an optional refinement: keep going on the full video.
                            grounded_span_text = f"{span_label}\n\nClip extraction failed ({exc}); using the full video."
                        else:
                            grounded_video = clip_path
                            generation_video = clip_path
                            grounded_span_text = span_label
                    else:
                        grounded_span_text = f"Grounder returned an invalid span.\n\nRaw:\n{grounder_raw}"
                    yield {
                        "stage": "grounder",
                        "message": "Grounder finished.",
                        "backend_mode": self.mode,
                        "grounder_span_text": grounded_span_text,
                        "grounder_raw": grounder_raw,
                        "grounded_video": grounded_video,
                        "raw_prompt": format_messages_markdown("Grounder Input", grounder_messages),
                    }
                else:
                    yield {
                        "stage": "grounder",
                        "message": "Grounder skipped.",
                        "backend_mode": self.mode,
                        "grounder_span_text": grounded_span_text,
                        "grounder_raw": grounded_span_text,
                        "grounded_video": None,
                    }

                # Expand the question with the sample's options (if any) before answering;
                # _run_answer itself has no access to the sample.
                answer_question = build_demo_query(question, sample)
                answer_raw, answer_messages = self._run_answer(
                    generation_video, answer_question, collector_summary, fps, max_frames, max_pixels
                )
                final_answer = extract_answer(answer_raw)
                yield {
                    "stage": "answer",
                    "message": "Answer finished.",
                    "backend_mode": self.mode,
                    "final_answer": final_answer,
                    "answer_raw": answer_raw,
                    "grounded_video": grounded_video,
                    "raw_prompt": format_messages_markdown("Answer Input", answer_messages),
                }

                review_json, review_raw, review_messages = self._run_review(
                    generation_video, question, answer_raw, fps, max_frames, max_pixels
                )
                review_summary = (
                    f"Confidence: {review_json.get('confidence', 'unknown')}\n\n"
                    f"{review_json.get('review', review_raw)}"
                )
                yield {
                    "stage": "review",
                    "message": "Review finished.",
                    "backend_mode": self.mode,
                    "review_summary": review_summary,
                    "review_raw": review_raw,
                    "raw_prompt": format_messages_markdown("Review Input", review_messages),
                }
                yield {
                    "stage": "done",
                    "message": "CPU fallback pipeline completed.",
                    "backend_mode": self.mode,
                    "final_answer": final_answer,
                    "review_summary": review_summary,
                    "grounded_video": grounded_video,
                }
            except Exception as exc:
                yield {
                    "stage": "error",
                    "message": f"{type(exc).__name__}: {exc}",
                    "backend_mode": self.mode,
                    "traceback": traceback.format_exc(),
                }
class LocalGPUBackend(CPUFallbackBackend):
    """Reuse the CPUFallbackBackend pipeline with the ``local_*`` sampling limits.

    The parent pipeline reads ``cpu_fps`` / ``cpu_max_frames`` / ``cpu_max_pixels``
    from the config. For the duration of a run those fields are temporarily
    overwritten with the ``local_*`` values and restored afterwards. The config
    object may be shared with other backends, so the swap is serialized by a
    dedicated lock. Overlapping runs therefore cannot capture each other's
    overridden values as the "originals" and leave the CPU limits permanently
    changed.
    """

    mode = "local_gpu"

    def __init__(self, config: RuntimeConfig):
        super().__init__(config)
        self.device = config.local_device
        self._config_swap_lock = Lock()

    def describe(self) -> Dict[str, str]:
        return {
            "mode": self.mode,
            "title": "Local GPU",
            "message": "Local GPU mode is enabled for the single-model Qwen2-VL pipeline.",
        }

    def _check_limits(self, video_info: Dict[str, Any]) -> List[str]:
        """Only the duration is limited in local mode (``local_max_duration_sec``)."""
        issues = []
        if video_info["duration_sec"] > self.config.local_max_duration_sec:
            issues.append(
                f"Video is {video_info['duration_sec']:.2f}s; local mode supports up to {self.config.local_max_duration_sec:.0f}s by default."
            )
        return issues

    def run_pipeline(self, video_path: str, question: str, grounder_mode: str, sample: Optional[Dict[str, Any]] = None):
        """Run the parent pipeline with local limits, relabelling every event as ``local_gpu``."""
        with self._config_swap_lock:
            old_fps = self.config.cpu_fps
            old_frames = self.config.cpu_max_frames
            old_pixels = self.config.cpu_max_pixels
            self.config.cpu_fps = self.config.local_fps
            self.config.cpu_max_frames = self.config.local_max_frames
            self.config.cpu_max_pixels = self.config.local_max_pixels
            try:
                for event in super().run_pipeline(video_path, question, grounder_mode, sample):
                    event["backend_mode"] = self.mode
                    yield event
            finally:
                self.config.cpu_fps = old_fps
                self.config.cpu_max_frames = old_frames
                self.config.cpu_max_pixels = old_pixels
class IntentBenchDemoRuntime:
    """Facade owning one instance of each backend and routing calls by ``config.backend_mode``.

    An unknown ``backend_mode`` falls back to the disabled (UI-only) backend.
    """

    def __init__(self, config: RuntimeConfig):
        self.config = config
        backend_types = (
            ("disabled", DisabledBackend),
            ("remote_api", RemoteAPIBackend),
            ("cpu_fallback", CPUFallbackBackend),
            ("local_gpu", LocalGPUBackend),
        )
        self.backends = {name: factory(config) for name, factory in backend_types}

    def _active_backend(self) -> InferenceBackend:
        """Backend selected by the current ``backend_mode`` (read on every call)."""
        fallback = self.backends["disabled"]
        return self.backends.get(self.config.backend_mode, fallback)

    def shutdown(self) -> None:
        """No-op hook; nothing is released here."""
        return None

    def describe_backend(self) -> Dict[str, str]:
        return self._active_backend().describe()

    def run_pipeline(
        self,
        video_path: str,
        question: str,
        grounder_mode: str = "Auto",
        sample: Optional[Dict[str, Any]] = None,
    ) -> Iterator[Dict[str, Any]]:
        """Return the active backend's event generator (nothing runs until it is iterated)."""
        return self._active_backend().run_pipeline(video_path, question, grounder_mode, sample)