Spaces:
Paused
Paused
"""
Mission text parser — converts raw operator text into a validated MissionSpecification.

Public API:
    parse_mission_text(raw_text, detector_key, video_path=None) -> MissionSpecification
    build_broad_queries(detector_key, mission_spec) -> List[str]

Internal flow of parse_mission_text:
1. Fast-path regex check -> skip LLM if comma-separated labels
2. LLM extraction call (GPT-4o, temperature 0.0); when video_path is given,
   the first frame is attached for vision grounding
3. Deterministic validation pipeline
4. COCO vocabulary mapping for COCO-only detectors
5. Build RelevanceCriteria deterministically from mapped classes
6. Return validated MissionSpecification or raise MissionParseError
"""
| import json | |
| import logging | |
| import re | |
| from typing import List, Optional | |
| from utils.openai_client import chat_completion, extract_content, get_api_key, OpenAIAPIError | |
| from coco_classes import COCO_CLASSES, canonicalize_coco_name, coco_class_catalog | |
| from utils.schemas import MissionSpecification, RelevanceCriteria | |
# Logger named after this module.
logger = logging.getLogger(__name__)

# Keys of the detectors whose label space is limited to the COCO class vocabulary.
_COCO_ONLY_DETECTORS = frozenset(("yolo11", "detr_resnet50"))
class MissionParseError(ValueError):
    """Raised when mission text cannot be parsed into a valid MissionSpecification.

    Attributes:
        warnings: Parse warnings gathered before the failure; an empty list
            when none were supplied.
    """

    def __init__(self, message: str, warnings: Optional[List[str]] = None):
        super().__init__(message)
        # Copy the list so that later mutation by the caller cannot change
        # what this exception reports.
        self.warnings: List[str] = list(warnings) if warnings else []
# One or more comma-separated runs of word characters and whitespace.
# Whitespace after a comma is already covered by the character class, so no
# separate `\s*` is needed (it only made the match ambiguous).
_LABEL_LIST_RE = re.compile(r"[\w\s]+(?:,[\w\s]+)*")

# A token longer than this many words is treated as natural language, not a label.
_MAX_WORDS_PER_LABEL = 3


def _is_comma_separated_labels(text: str) -> bool:
    """Fast-path: detect simple comma-separated class labels (no LLM needed).

    Args:
        text: Operator mission text.

    Returns:
        True when the stripped text consists solely of comma-separated tokens
        made of word characters/whitespace and every non-blank token has at
        most three words; False otherwise (including empty text, empty
        segments such as ``"a,,b"`` and trailing commas).
    """
    stripped = text.strip()
    if not _LABEL_LIST_RE.fullmatch(stripped):
        return False
    labels = (part.strip() for part in stripped.split(","))
    return all(
        len(label.split()) <= _MAX_WORDS_PER_LABEL for label in labels if label
    )
def _is_coco_only(detector_key: str) -> bool:
    """Tell whether *detector_key* is listed in ``_COCO_ONLY_DETECTORS``."""
    restricted_to_coco = detector_key in _COCO_ONLY_DETECTORS
    return restricted_to_coco
def _map_coco_classes(
    object_classes: List[str], detector_key: str
) -> tuple[List[str], List[str], List[str]]:
    """Translate requested classes into COCO names for COCO-only detectors.

    Args:
        object_classes: Class labels requested by the operator or the LLM.
        detector_key: Detector model key.

    Returns:
        ``(mapped_classes, unmappable_classes, warnings)``. For a detector that
        is not COCO-only the input list is returned unchanged with two empty
        lists. Otherwise ``mapped_classes`` holds each canonical COCO name
        once, in first-seen order.
    """
    # Detectors outside the COCO-only set take the labels as they are.
    if not _is_coco_only(detector_key):
        return object_classes, [], []

    coco_names: List[str] = []
    rejected: List[str] = []
    notes: List[str] = []
    already_mapped = set()

    for requested in object_classes:
        coco_name = canonicalize_coco_name(requested)

        if coco_name is None:
            rejected.append(requested)
            notes.append(
                f"'{requested}' is not in COCO vocabulary. Will not be detected by {detector_key}."
            )
            continue

        if coco_name not in already_mapped:
            already_mapped.add(coco_name)
            coco_names.append(coco_name)

        # NOTE(review): the source's indentation was lost; this check is placed
        # at the same level as the dedup test above, so a rename is reported
        # for every remapped input, duplicates included — confirm.
        if coco_name.lower() != requested.lower():
            notes.append(f"'{requested}' mapped to COCO class '{coco_name}'.")

    return coco_names, rejected, notes
def _build_fast_path_spec(
    raw_text: str, object_classes: List[str], detector_key: str
) -> MissionSpecification:
    """Assemble the MissionSpecification for plain comma-separated labels.

    No LLM call is made on this path.

    Args:
        raw_text: Operator text, stored verbatim as ``operator_text``.
        object_classes: Labels split out of ``raw_text``.
        detector_key: Detector model key.

    Returns:
        A specification with intent ``DETECT``, domain ``GENERIC``, parse mode
        ``FAST_PATH`` and confidence ``HIGH``.

    Raises:
        MissionParseError: The detector is COCO-only and none of the labels
            map to a COCO class.
    """
    coco_only = _is_coco_only(detector_key)
    mapped, _unmappable, warnings = _map_coco_classes(object_classes, detector_key)

    if coco_only:
        if not mapped:
            requested = ', '.join(object_classes)
            raise MissionParseError(
                f"None of the requested objects ({requested}) match the "
                f"{detector_key} vocabulary. This detector supports: "
                f"{coco_class_catalog()}. "
                f"Use an open-vocabulary detector (Grounding DINO) or adjust your mission.",
                warnings=warnings,
            )
        final_classes = mapped
    else:
        final_classes = object_classes

    # Relevance is defined purely by the requested classes; no confidence floor.
    criteria = RelevanceCriteria(
        required_classes=final_classes,
        min_confidence=0.0,
    )
    return MissionSpecification(
        object_classes=final_classes,
        mission_intent="DETECT",
        domain="GENERIC",
        domain_source="INFERRED",
        relevance_criteria=criteria,
        context_phrases=[],
        stripped_modifiers=[],
        operator_text=raw_text,
        parse_mode="FAST_PATH",
        parse_confidence="HIGH",
        parse_warnings=warnings,
    )
# --- LLM Extraction ---

# System prompt for the extraction call. The text is part of runtime behaviour:
# change it only deliberately.
# NOTE(review): whitespace inside these literals is reproduced as it appears in
# the reviewed copy of the file; confirm list-item indentation against the
# repository version.
_SYSTEM_PROMPT = (
    # Role and scope
    "You are a mission text parser for an object detection system. Your ONLY job is to extract "
    "structured fields from operator mission text. You do NOT assess threats. You do NOT reason "
    "about tactics. You extract and classify.\n\n"
    # Output contract
    "OUTPUT SCHEMA (strict JSON):\n"
    "{\n"
    ' "object_classes": ["string"],\n'
    ' "mission_intent": "ENUM",\n'
    ' "domain": "ENUM",\n'
    ' "context_phrases": ["string"],\n'
    ' "stripped_modifiers": ["string"],\n'
    ' "parse_confidence": "ENUM",\n'
    ' "parse_warnings": ["string"]\n'
    "}\n\n"
    "EXTRACTION RULES:\n\n"
    # Rule 1: what counts as an object class
    "1. OBJECT_CLASSES — What to extract:\n"
    " - Extract nouns and noun phrases that refer to PHYSICAL, VISUALLY DETECTABLE objects.\n"
    " - Keep visual descriptors that narrow the category: 'small boat', 'military vehicle', 'cargo ship'.\n"
    " - Use singular form: 'vessels' -> 'vessel', 'people' -> 'person'.\n"
    " - If the input is already comma-separated class labels (e.g., 'person, car, boat'),\n"
    " use them directly without modification.\n\n"
    # Rule 2: what must be moved out of object_classes
    "2. OBJECT_CLASSES — What to strip:\n"
    " - Remove threat/intent adjectives: 'hostile', 'suspicious', 'friendly', 'dangerous', 'enemy'.\n"
    " -> Move these to stripped_modifiers.\n"
    " - Remove action verbs: 'approaching', 'fleeing', 'attacking'.\n"
    " -> Move the full phrase to context_phrases.\n"
    " - Remove spatial/temporal phrases: 'from the east', 'near the harbor', 'at night'.\n"
    " -> Move to context_phrases.\n"
    " - Do NOT extract abstract concepts: 'threat', 'danger', 'hazard', 'risk' are not objects.\n\n"
    # Rule 3: intent enum
    "3. MISSION_INTENT — Infer from verbs:\n"
    " - 'detect', 'find', 'locate', 'spot', 'search for' -> DETECT\n"
    " - 'classify', 'identify', 'determine type of' -> CLASSIFY\n"
    " - 'track', 'follow', 'monitor movement of' -> TRACK\n"
    " - 'assess threat', 'evaluate danger', 'threat assessment' -> ASSESS_THREAT\n"
    " - 'monitor', 'watch', 'observe', 'surveil' -> MONITOR\n"
    " - If no verb present (bare class list), default to DETECT.\n\n"
    # Rule 4: domain enum
    "4. DOMAIN — Infer from contextual clues:\n"
    " - Maritime vocabulary (vessel, ship, boat, harbor, naval, maritime, wake, sea) -> NAVAL\n"
    " - Ground vocabulary (vehicle, convoy, checkpoint, road, building, infantry) -> GROUND\n"
    " - Aerial vocabulary (aircraft, drone, UAV, airspace, altitude, flight) -> AERIAL\n"
    " - Urban vocabulary (pedestrian, intersection, storefront, crowd, building) -> URBAN\n"
    " - If no domain clues present -> GENERIC\n\n"
    # Rule 5: self-reported confidence
    "5. PARSE_CONFIDENCE:\n"
    " - HIGH: Clear object classes extracted, domain identifiable.\n"
    " - MEDIUM: Some ambiguity but reasonable extraction possible. Include warnings.\n"
    " - LOW: Cannot extract meaningful object classes. Input is too abstract,\n"
    " contradictory, or contains no visual object references.\n"
    " Examples of LOW: 'keep us safe', 'do your job', 'analyze everything'.\n\n"
    # Hard prohibitions
    "FORBIDDEN:\n"
    "- Do NOT infer object classes not implied by the text. If the text says 'boats',\n"
    " do not add 'person' or 'vehicle' unless mentioned.\n"
    "- Do NOT add threat scores, engagement rules, or tactical recommendations.\n"
    "- Do NOT interpret what 'threat' or 'danger' means in terms of specific objects.\n"
    " If the operator writes 'detect threats', set parse_confidence to LOW and warn:\n"
    " \"'threats' is not a visual object class. Specify what objects to detect.\""
)
# Extra system-prompt section; appended to _SYSTEM_PROMPT by the extraction
# call when a first-frame image is attached to the request.
# NOTE(review): whitespace inside these literals is reproduced as it appears in
# the reviewed copy of the file; confirm continuation-line indentation.
_VISION_GROUNDING_ADDENDUM = (
    "\n\nVISION GROUNDING (when an image is provided):\n"
    "You may receive the first frame of the operator's video feed as an image.\n"
    "Use it to REFINE your object_classes extraction:\n\n"
    # 1. General terms may be refined into visible subcategories
    "1. If the operator uses a general term (e.g., 'vessels', 'vehicles'),\n"
    " inspect the image and add MORE SPECIFIC subcategories visible in the scene.\n"
    " Example: operator says 'detect vessels', image shows a speedboat and a cargo ship\n"
    " -> object_classes: ['vessel', 'speedboat', 'cargo ship']\n\n"
    # 2. Objects absent from the first frame are kept, with a warning
    "2. If the operator mentions objects NOT visible in the first frame,\n"
    " still include them (later frames may contain them), but add a\n"
    " parse_warning noting they were not visible in the first frame.\n\n"
    # 3. Domain confirmation
    "3. Use the image to CONFIRM or REFINE the domain. If the text is ambiguous\n"
    " but the image clearly shows open water, set domain to NAVAL.\n\n"
    # 4. No invented objects
    "4. Do NOT hallucinate objects. Only add specific subcategories if clearly\n"
    " identifiable. When uncertain, keep the general term.\n\n"
    # 5. Text rules remain authoritative
    "5. The same OUTPUT SCHEMA and all EXTRACTION RULES still apply.\n"
    " The image is supplementary context, not a replacement for the text.\n"
)
def _extract_and_encode_first_frame(video_path: Optional[str]) -> Optional[str]:
    """Return the video's first frame encoded by ``encode_frame_to_b64``, or None.

    Best-effort helper: any exception raised while importing the helpers,
    reading the frame or encoding it is logged as a warning and turned into
    ``None`` so the caller can fall back to text-only parsing.

    Args:
        video_path: Path to the input video; a falsy value short-circuits to None.

    Returns:
        The encoded frame (requested with ``quality=85``), or None on a
        missing path or any failure.
    """
    if not video_path:
        return None

    encoded: Optional[str] = None
    try:
        # Imported here, inside the guarded region, so that an import failure
        # is handled like any other failure.
        from inference import extract_first_frame
        from utils.gpt_reasoning import encode_frame_to_b64

        image, _rate, _width, _height = extract_first_frame(video_path)
        encoded = encode_frame_to_b64(image, quality=85)
    except Exception:
        logger.warning("Failed to extract/encode first frame for vision grounding", exc_info=True)
    return encoded
def _call_extraction_llm(raw_text: str, detector_key: str, first_frame_b64: Optional[str] = None) -> dict:
    """Call GPT-4o to extract structured mission fields from natural language.

    Args:
        raw_text: Operator mission text, embedded verbatim in the user prompt.
        detector_key: Detector model key; only used to tell the model whether
            the detector is ``COCO_ONLY`` or ``OPEN_VOCAB``.
        first_frame_b64: Optional base64 JPEG of the first video frame. When
            given, the vision addendum is appended to the system prompt, the
            image is attached to the user message, and the token and timeout
            budgets are raised.

    Returns:
        The model's reply decoded from JSON; always a dict.

    Raises:
        MissionParseError: No API key is configured, the API call fails, the
            reply is empty, is not valid JSON, or is JSON but not an object.
    """
    if not get_api_key():
        raise MissionParseError(
            "OPENAI_API_KEY not set. Cannot parse natural language mission text. "
            "Use comma-separated class labels instead (e.g., 'person, car, boat')."
        )

    detector_type = "COCO_ONLY" if _is_coco_only(detector_key) else "OPEN_VOCAB"
    user_prompt_text = (
        f'OPERATOR MISSION TEXT:\n"{raw_text}"\n\n'
        f"DETECTOR TYPE: {detector_type}\n\n"
        "Extract the structured mission specification from the above text."
    )

    if first_frame_b64:
        # Vision-grounded request: extended system prompt and a mixed
        # text + image content array.
        system_content = _SYSTEM_PROMPT + _VISION_GROUNDING_ADDENDUM
        user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": user_prompt_text},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{first_frame_b64}",
                        "detail": "low",
                    },
                },
            ],
        }
        max_tokens, timeout_s = 700, 45
    else:
        # Text-only request: plain string content.
        system_content = _SYSTEM_PROMPT
        user_message = {"role": "user", "content": user_prompt_text}
        max_tokens, timeout_s = 500, 30

    payload = {
        "model": "gpt-4o",
        "temperature": 0.0,
        "max_tokens": max_tokens,
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_content},
            user_message,
        ],
    }

    try:
        resp_data = chat_completion(payload, timeout=timeout_s)
        content, _refusal = extract_content(resp_data)
    except OpenAIAPIError as e:
        # Chain explicitly so the transport error stays attached as __cause__.
        raise MissionParseError(f"Mission parsing API call failed: {e}") from e

    if not content:
        raise MissionParseError("GPT returned empty content during mission parsing.")

    try:
        parsed = json.loads(content)
    except json.JSONDecodeError as e:
        raise MissionParseError(
            "GPT returned invalid JSON. Please rephrase your mission."
        ) from e

    # Valid JSON can still be a list, string or number; downstream validation
    # reads fields with .get(), so anything but an object is rejected here.
    if not isinstance(parsed, dict):
        raise MissionParseError(
            "GPT returned JSON that is not an object. Please rephrase your mission."
        )
    return parsed
# Closed value sets accepted for the enum-like fields of the LLM output.
_VALID_INTENTS = frozenset({"DETECT", "CLASSIFY", "TRACK", "ASSESS_THREAT", "MONITOR"})
_VALID_DOMAINS = frozenset({"NAVAL", "GROUND", "AERIAL", "URBAN", "GENERIC"})
_VALID_CONFIDENCE = frozenset({"HIGH", "MEDIUM", "LOW"})


def _as_str_list(value: object) -> List[str]:
    """Coerce an LLM-supplied field into a new list of stripped, non-empty strings."""
    if isinstance(value, str):
        # A bare string is one entry, not a sequence of characters.
        value = [value]
    if not isinstance(value, (list, tuple)):
        return []
    return [item.strip() for item in value if isinstance(item, str) and item.strip()]


def _as_enum(value: object, allowed: frozenset) -> Optional[str]:
    """Return *value* upper-cased and stripped if it is in *allowed*, else None."""
    if not isinstance(value, str):
        return None
    candidate = value.strip().upper()
    return candidate if candidate in allowed else None


def _validate_and_build(
    llm_output: dict, raw_text: str, detector_key: str
) -> MissionSpecification:
    """Deterministic validation pipeline (Section 7.3 decision tree).

    Every field of ``llm_output`` is treated as untrusted: list fields are
    coerced to lists of strings, enum fields are normalised case-insensitively
    and replaced by a default (with a warning) when invalid. ``llm_output``
    itself is never mutated.

    Args:
        llm_output: Decoded JSON object returned by the extraction LLM.
        raw_text: Operator text, stored verbatim as ``operator_text``.
        detector_key: Detector model key (controls COCO vocabulary mapping).

    Returns:
        A MissionSpecification with parse mode ``LLM_EXTRACTED``.

    Raises:
        MissionParseError: ``llm_output`` is not a dict, the parse confidence
            is LOW (or invalid), no usable object classes remain, or none of
            them map to the vocabulary of a COCO-only detector.
    """
    if not isinstance(llm_output, dict):
        raise MissionParseError(
            "LLM extraction output is not a JSON object. Please rephrase your mission."
        )

    # Step 2: Extract fields with defaults. Lists are rebuilt so that appending
    # warnings below does not modify the caller's data.
    raw_classes = llm_output.get("object_classes")
    context_phrases = _as_str_list(llm_output.get("context_phrases"))
    stripped_modifiers = _as_str_list(llm_output.get("stripped_modifiers"))
    parse_warnings = _as_str_list(llm_output.get("parse_warnings"))

    # Validate enum values
    raw_intent = llm_output.get("mission_intent", "DETECT")
    mission_intent = _as_enum(raw_intent, _VALID_INTENTS)
    if mission_intent is None:
        mission_intent = "DETECT"
        parse_warnings.append(f"Invalid mission_intent '{raw_intent}', defaulted to DETECT.")

    raw_domain = llm_output.get("domain", "GENERIC")
    domain = _as_enum(raw_domain, _VALID_DOMAINS)
    if domain is None:
        domain = "GENERIC"
        parse_warnings.append(f"Invalid domain '{raw_domain}', defaulted to GENERIC.")

    raw_confidence = llm_output.get("parse_confidence", "LOW")
    parse_confidence = _as_enum(raw_confidence, _VALID_CONFIDENCE)
    if parse_confidence is None:
        parse_confidence = "LOW"
        parse_warnings.append(f"Invalid parse_confidence '{raw_confidence}', treated as LOW.")

    # Step 3: Parse confidence check
    if parse_confidence == "LOW":
        warnings_str = "; ".join(parse_warnings) if parse_warnings else "No details"
        raise MissionParseError(
            f"Could not extract object classes from mission text. "
            f"Warnings: {warnings_str}. "
            f"Please specify concrete objects to detect (e.g., 'vessel, small boat').",
            warnings=parse_warnings,
        )

    # Validate object_classes is non-empty
    if not raw_classes:
        raise MissionParseError(
            "Mission text produced no detectable object classes. "
            "Please specify concrete objects to detect.",
            warnings=parse_warnings,
        )

    # Drop blank and non-string entries
    object_classes = _as_str_list(raw_classes)
    if not object_classes:
        raise MissionParseError(
            "All extracted object classes were empty after cleanup.",
            warnings=parse_warnings,
        )

    # Step 4: COCO vocabulary mapping
    mapped, _unmappable, coco_warnings = _map_coco_classes(object_classes, detector_key)
    parse_warnings.extend(coco_warnings)
    if _is_coco_only(detector_key):
        if not mapped:
            raise MissionParseError(
                f"None of the requested objects ({', '.join(object_classes)}) match the "
                f"{detector_key} vocabulary. "
                f"This detector supports: {coco_class_catalog()}. "
                f"Use an open-vocabulary detector (Grounding DINO) or adjust your mission.",
                warnings=parse_warnings,
            )
        final_classes = mapped
    else:
        final_classes = object_classes

    # Step 5: Build RelevanceCriteria deterministically
    relevance_criteria = RelevanceCriteria(
        required_classes=final_classes,
        min_confidence=0.0,
    )

    # Step 6: Construct MissionSpecification
    return MissionSpecification(
        object_classes=final_classes,
        mission_intent=mission_intent,
        domain=domain,
        domain_source="INFERRED",
        relevance_criteria=relevance_criteria,
        # INVARIANT INV-13: context_phrases are forwarded to LLM reasoning layers
        # (GPT threat assessment, threat chat) as situational context ONLY.
        # They must NEVER be used in evaluate_relevance(), prioritization,
        # or any deterministic filtering/sorting logic.
        context_phrases=context_phrases,
        stripped_modifiers=stripped_modifiers,
        operator_text=raw_text,
        parse_mode="LLM_EXTRACTED",
        parse_confidence=parse_confidence,
        parse_warnings=parse_warnings,
    )
# Generic query terms per mission domain. build_broad_queries appends these to
# the extracted classes for open-vocabulary detectors; "GENERIC" is the
# fallback entry for domains not listed here.
_DOMAIN_BROAD_CATEGORIES: dict[str, List[str]] = dict(
    NAVAL=["vessel", "ship", "boat", "buoy", "person"],
    AERIAL=["aircraft", "helicopter", "drone", "airplane"],
    GROUND=["vehicle", "car", "truck", "person", "building"],
    URBAN=["person", "vehicle", "car", "bicycle"],
    GENERIC=["object"],
)
def build_broad_queries(
    detector_key: str, mission_spec: MissionSpecification
) -> List[str]:
    """Build broad detector queries for LLM post-filter mode.

    For FAST_PATH: return the spec's object_classes (as a copy).
    For COCO detectors (LLM_EXTRACTED): return all entries of COCO_CLASSES.
    For open-vocab detectors (LLM_EXTRACTED): return LLM-extracted classes
    PLUS broad domain categories to maximize recall.

    Args:
        detector_key: Detector model key.
        mission_spec: Parsed mission; ``parse_mode``, ``object_classes`` and
            ``domain`` are read.

    Returns:
        A new list of query strings; the caller may modify it without
        affecting ``mission_spec``.
    """
    if mission_spec.parse_mode == "FAST_PATH":
        # Return a copy, like the other branches do, so callers cannot alter
        # the specification's own list through the result.
        return list(mission_spec.object_classes)

    # LLM_EXTRACTED path: detect broadly
    if _is_coco_only(detector_key):
        # Per the original author's note, COCO detectors ignore queries anyway
        # (DETR detects all 80; YOLO11 falls back to all if no matches), so
        # the full vocabulary is sent.
        return list(COCO_CLASSES)

    # Open-vocab detector (e.g. Grounding DINO):
    # combine LLM-extracted classes with domain-specific broad categories.
    broad = list(mission_spec.object_classes)
    domain_extras = _DOMAIN_BROAD_CATEGORIES.get(
        mission_spec.domain, _DOMAIN_BROAD_CATEGORIES["GENERIC"]
    )
    # Case-insensitive membership so an extra is not added when the extracted
    # classes already contain it under different casing.
    seen = {c.lower() for c in broad}
    for cat in domain_extras:
        key = cat.lower()
        if key not in seen:
            broad.append(cat)
            seen.add(key)

    logger.info("Broad queries for %s: %s", detector_key, broad)
    return broad
def parse_mission_text(
    raw_text: str,
    detector_key: str,
    video_path: Optional[str] = None,
) -> MissionSpecification:
    """Turn raw operator text into a validated MissionSpecification.

    Text made only of short comma-separated labels is handled without an LLM
    call; everything else goes through LLM extraction followed by
    deterministic validation.

    Args:
        raw_text: Verbatim mission text from the operator.
        detector_key: Detector model key (determines COCO vocabulary constraints).
        video_path: Optional path to the input video; its first frame is used
            for vision grounding on the LLM path only.

    Returns:
        Validated MissionSpecification.

    Raises:
        MissionParseError: The text is empty or cannot produce a valid
            specification.
    """
    cleaned = raw_text.strip() if raw_text else ""
    if not cleaned:
        raise MissionParseError(
            "Mission text is empty. Specify objects to detect or use the default queries."
        )
    raw_text = cleaned

    # Fast-path: simple comma-separated labels -> skip LLM
    if _is_comma_separated_labels(raw_text):
        pieces = (piece.strip() for piece in raw_text.split(","))
        object_classes = [piece for piece in pieces if piece]
        logger.info(
            "Mission fast-path: comma-separated labels %s", object_classes
        )
        return _build_fast_path_spec(raw_text, object_classes, detector_key)

    # LLM path: natural language mission text
    logger.info("Mission LLM-path: extracting from natural language")
    frame_b64 = _extract_and_encode_first_frame(video_path)
    if frame_b64:
        logger.info("Vision grounding: first frame encoded for LLM call")

    extracted = _call_extraction_llm(raw_text, detector_key, first_frame_b64=frame_b64)
    logger.info("Mission LLM extraction result: %s", extracted)

    spec = _validate_and_build(extracted, raw_text, detector_key)
    logger.info(
        "Mission parsed: classes=%s intent=%s domain=%s(%s) confidence=%s",
        spec.object_classes,
        spec.mission_intent,
        spec.domain,
        spec.domain_source,
        spec.parse_confidence,
    )
    return spec