""" Mission text parser — converts raw operator text into a validated MissionSpecification. Single public function: parse_mission_text(raw_text, detector_key) -> MissionSpecification Internal flow: 1. Fast-path regex check -> skip LLM if comma-separated labels 2. LLM extraction call (GPT-4o, temperature 0.0) 3. Deterministic validation pipeline 4. COCO vocabulary mapping for COCO-only detectors 5. Build RelevanceCriteria deterministically from mapped classes 6. Return validated MissionSpecification or raise MissionParseError """ import json import logging import re from typing import List, Optional from utils.openai_client import chat_completion, extract_content, get_api_key, OpenAIAPIError from coco_classes import COCO_CLASSES, canonicalize_coco_name, coco_class_catalog from utils.schemas import MissionSpecification, RelevanceCriteria logger = logging.getLogger(__name__) # Detectors that only support COCO class vocabulary _COCO_ONLY_DETECTORS = frozenset({"yolo11", "detr_resnet50"}) class MissionParseError(ValueError): """Raised when mission text cannot be parsed into a valid MissionSpecification.""" def __init__(self, message: str, warnings: Optional[List[str]] = None): self.warnings = warnings or [] super().__init__(message) def _is_comma_separated_labels(text: str) -> bool: """Fast-path: detect simple comma-separated class labels (no LLM needed).""" # Match: word tokens separated by commas, each token <= 3 words pattern = r"^[\w\s]+(,\s*[\w\s]+)*$" if not re.match(pattern, text.strip()): return False tokens = [t.strip() for t in text.split(",") if t.strip()] return all(len(t.split()) <= 3 for t in tokens) def _is_coco_only(detector_key: str) -> bool: return detector_key in _COCO_ONLY_DETECTORS def _map_coco_classes( object_classes: List[str], detector_key: str ) -> tuple[List[str], List[str], List[str]]: """Map object classes to COCO vocabulary for COCO-only detectors. Returns: (mapped_classes, unmappable_classes, warnings) """ if not _is_coco_only(detector_key): return object_classes, [], [] mapped = [] unmappable = [] warnings = [] seen = set() for cls in object_classes: canonical = canonicalize_coco_name(cls) if canonical is not None: if canonical not in seen: mapped.append(canonical) seen.add(canonical) if canonical.lower() != cls.lower(): warnings.append( f"'{cls}' mapped to COCO class '{canonical}'." ) else: unmappable.append(cls) warnings.append( f"'{cls}' is not in COCO vocabulary. Will not be detected by {detector_key}." ) return mapped, unmappable, warnings def _build_fast_path_spec( raw_text: str, object_classes: List[str], detector_key: str ) -> MissionSpecification: """Build MissionSpecification for simple comma-separated input (no LLM call).""" mapped, unmappable, warnings = _map_coco_classes(object_classes, detector_key) if _is_coco_only(detector_key) and not mapped: raise MissionParseError( f"None of the requested objects ({', '.join(object_classes)}) match the " f"{detector_key} vocabulary. This detector supports: " f"{coco_class_catalog()}. " f"Use an open-vocabulary detector (Grounding DINO) or adjust your mission.", warnings=warnings, ) final_classes = mapped if _is_coco_only(detector_key) else object_classes return MissionSpecification( object_classes=final_classes, mission_intent="DETECT", domain="GENERIC", domain_source="INFERRED", relevance_criteria=RelevanceCriteria( required_classes=final_classes, min_confidence=0.0, ), context_phrases=[], stripped_modifiers=[], operator_text=raw_text, parse_mode="FAST_PATH", parse_confidence="HIGH", parse_warnings=warnings, ) # --- LLM Extraction --- _SYSTEM_PROMPT = ( "You are a mission text parser for an object detection system. Your ONLY job is to extract " "structured fields from operator mission text. You do NOT assess threats. You do NOT reason " "about tactics. You extract and classify.\n\n" "OUTPUT SCHEMA (strict JSON):\n" "{\n" ' "object_classes": ["string"],\n' ' "mission_intent": "ENUM",\n' ' "domain": "ENUM",\n' ' "context_phrases": ["string"],\n' ' "stripped_modifiers": ["string"],\n' ' "parse_confidence": "ENUM",\n' ' "parse_warnings": ["string"]\n' "}\n\n" "EXTRACTION RULES:\n\n" "1. OBJECT_CLASSES — What to extract:\n" " - Extract nouns and noun phrases that refer to PHYSICAL, VISUALLY DETECTABLE objects.\n" " - Keep visual descriptors that narrow the category: 'small boat', 'military vehicle', 'cargo ship'.\n" " - Use singular form: 'vessels' -> 'vessel', 'people' -> 'person'.\n" " - If the input is already comma-separated class labels (e.g., 'person, car, boat'),\n" " use them directly without modification.\n\n" "2. OBJECT_CLASSES — What to strip:\n" " - Remove threat/intent adjectives: 'hostile', 'suspicious', 'friendly', 'dangerous', 'enemy'.\n" " -> Move these to stripped_modifiers.\n" " - Remove action verbs: 'approaching', 'fleeing', 'attacking'.\n" " -> Move the full phrase to context_phrases.\n" " - Remove spatial/temporal phrases: 'from the east', 'near the harbor', 'at night'.\n" " -> Move to context_phrases.\n" " - Do NOT extract abstract concepts: 'threat', 'danger', 'hazard', 'risk' are not objects.\n\n" "3. MISSION_INTENT — Infer from verbs:\n" " - 'detect', 'find', 'locate', 'spot', 'search for' -> DETECT\n" " - 'classify', 'identify', 'determine type of' -> CLASSIFY\n" " - 'track', 'follow', 'monitor movement of' -> TRACK\n" " - 'assess threat', 'evaluate danger', 'threat assessment' -> ASSESS_THREAT\n" " - 'monitor', 'watch', 'observe', 'surveil' -> MONITOR\n" " - If no verb present (bare class list), default to DETECT.\n\n" "4. DOMAIN — Infer from contextual clues:\n" " - Maritime vocabulary (vessel, ship, boat, harbor, naval, maritime, wake, sea) -> NAVAL\n" " - Ground vocabulary (vehicle, convoy, checkpoint, road, building, infantry) -> GROUND\n" " - Aerial vocabulary (aircraft, drone, UAV, airspace, altitude, flight) -> AERIAL\n" " - Urban vocabulary (pedestrian, intersection, storefront, crowd, building) -> URBAN\n" " - If no domain clues present -> GENERIC\n\n" "5. PARSE_CONFIDENCE:\n" " - HIGH: Clear object classes extracted, domain identifiable.\n" " - MEDIUM: Some ambiguity but reasonable extraction possible. Include warnings.\n" " - LOW: Cannot extract meaningful object classes. Input is too abstract,\n" " contradictory, or contains no visual object references.\n" " Examples of LOW: 'keep us safe', 'do your job', 'analyze everything'.\n\n" "FORBIDDEN:\n" "- Do NOT infer object classes not implied by the text. If the text says 'boats',\n" " do not add 'person' or 'vehicle' unless mentioned.\n" "- Do NOT add threat scores, engagement rules, or tactical recommendations.\n" "- Do NOT interpret what 'threat' or 'danger' means in terms of specific objects.\n" " If the operator writes 'detect threats', set parse_confidence to LOW and warn:\n" " \"'threats' is not a visual object class. Specify what objects to detect.\"" ) _VISION_GROUNDING_ADDENDUM = ( "\n\nVISION GROUNDING (when an image is provided):\n" "You may receive the first frame of the operator's video feed as an image.\n" "Use it to REFINE your object_classes extraction:\n\n" "1. If the operator uses a general term (e.g., 'vessels', 'vehicles'),\n" " inspect the image and add MORE SPECIFIC subcategories visible in the scene.\n" " Example: operator says 'detect vessels', image shows a speedboat and a cargo ship\n" " -> object_classes: ['vessel', 'speedboat', 'cargo ship']\n\n" "2. If the operator mentions objects NOT visible in the first frame,\n" " still include them (later frames may contain them), but add a\n" " parse_warning noting they were not visible in the first frame.\n\n" "3. Use the image to CONFIRM or REFINE the domain. If the text is ambiguous\n" " but the image clearly shows open water, set domain to NAVAL.\n\n" "4. Do NOT hallucinate objects. Only add specific subcategories if clearly\n" " identifiable. When uncertain, keep the general term.\n\n" "5. The same OUTPUT SCHEMA and all EXTRACTION RULES still apply.\n" " The image is supplementary context, not a replacement for the text.\n" ) def _extract_and_encode_first_frame(video_path: Optional[str]) -> Optional[str]: """Extract the first frame from a video and return it as a base64-encoded JPEG. Never raises — returns None on any failure so the caller can fall back to text-only parsing. """ if not video_path: return None try: from inference import extract_first_frame from utils.gpt_reasoning import encode_frame_to_b64 frame, _fps, _w, _h = extract_first_frame(video_path) return encode_frame_to_b64(frame, quality=85) except Exception: logger.warning("Failed to extract/encode first frame for vision grounding", exc_info=True) return None def _call_extraction_llm(raw_text: str, detector_key: str, first_frame_b64: Optional[str] = None) -> dict: """Call GPT-4o to extract structured mission fields from natural language.""" if not get_api_key(): raise MissionParseError( "OPENAI_API_KEY not set. Cannot parse natural language mission text. " "Use comma-separated class labels instead (e.g., 'person, car, boat')." ) detector_type = "COCO_ONLY" if _is_coco_only(detector_key) else "OPEN_VOCAB" user_prompt_text = ( f'OPERATOR MISSION TEXT:\n"{raw_text}"\n\n' f"DETECTOR TYPE: {detector_type}\n\n" "Extract the structured mission specification from the above text." ) # Build system prompt (append vision addendum when image is available) system_content = _SYSTEM_PROMPT if first_frame_b64: system_content = _SYSTEM_PROMPT + _VISION_GROUNDING_ADDENDUM # Build user message: mixed content array when image is available, plain string otherwise if first_frame_b64: user_message = { "role": "user", "content": [ {"type": "text", "text": user_prompt_text}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{first_frame_b64}", "detail": "low", }, }, ], } else: user_message = {"role": "user", "content": user_prompt_text} max_tokens = 700 if first_frame_b64 else 500 timeout_s = 45 if first_frame_b64 else 30 payload = { "model": "gpt-4o", "temperature": 0.0, "max_tokens": max_tokens, "response_format": {"type": "json_object"}, "messages": [ {"role": "system", "content": system_content}, user_message, ], } try: resp_data = chat_completion(payload, timeout=timeout_s) content, _refusal = extract_content(resp_data) if not content: raise MissionParseError("GPT returned empty content during mission parsing.") return json.loads(content) except OpenAIAPIError as e: raise MissionParseError(f"Mission parsing API call failed: {e}") except json.JSONDecodeError: raise MissionParseError( "GPT returned invalid JSON. Please rephrase your mission." ) def _validate_and_build( llm_output: dict, raw_text: str, detector_key: str ) -> MissionSpecification: """Deterministic validation pipeline (Section 7.3 decision tree).""" # Step 2: Extract fields with defaults object_classes = llm_output.get("object_classes", []) mission_intent = llm_output.get("mission_intent", "DETECT") domain = llm_output.get("domain", "GENERIC") context_phrases = llm_output.get("context_phrases", []) stripped_modifiers = llm_output.get("stripped_modifiers", []) parse_confidence = llm_output.get("parse_confidence", "LOW") parse_warnings = llm_output.get("parse_warnings", []) # Validate enum values valid_intents = {"DETECT", "CLASSIFY", "TRACK", "ASSESS_THREAT", "MONITOR"} if mission_intent not in valid_intents: mission_intent = "DETECT" parse_warnings.append(f"Invalid mission_intent '{llm_output.get('mission_intent')}', defaulted to DETECT.") valid_domains = {"NAVAL", "GROUND", "AERIAL", "URBAN", "GENERIC"} if domain not in valid_domains: domain = "GENERIC" parse_warnings.append(f"Invalid domain '{llm_output.get('domain')}', defaulted to GENERIC.") valid_confidence = {"HIGH", "MEDIUM", "LOW"} if parse_confidence not in valid_confidence: parse_confidence = "LOW" # Step 3: Parse confidence check if parse_confidence == "LOW": warnings_str = "; ".join(parse_warnings) if parse_warnings else "No details" raise MissionParseError( f"Could not extract object classes from mission text. " f"Warnings: {warnings_str}. " f"Please specify concrete objects to detect (e.g., 'vessel, small boat').", warnings=parse_warnings, ) # Validate object_classes is non-empty if not object_classes: raise MissionParseError( "Mission text produced no detectable object classes. " "Please specify concrete objects to detect.", warnings=parse_warnings, ) # Filter out empty strings object_classes = [c.strip() for c in object_classes if c and c.strip()] if not object_classes: raise MissionParseError( "All extracted object classes were empty after cleanup.", warnings=parse_warnings, ) # Step 4: COCO vocabulary mapping mapped, unmappable, coco_warnings = _map_coco_classes(object_classes, detector_key) parse_warnings.extend(coco_warnings) if _is_coco_only(detector_key): if not mapped: raise MissionParseError( f"None of the requested objects ({', '.join(object_classes)}) match the " f"{detector_key} vocabulary. " f"This detector supports: {coco_class_catalog()}. " f"Use an open-vocabulary detector (Grounding DINO) or adjust your mission.", warnings=parse_warnings, ) final_classes = mapped else: final_classes = object_classes # Step 5: Build RelevanceCriteria deterministically relevance_criteria = RelevanceCriteria( required_classes=final_classes, min_confidence=0.0, ) # Step 6: Construct MissionSpecification return MissionSpecification( object_classes=final_classes, mission_intent=mission_intent, domain=domain, domain_source="INFERRED", relevance_criteria=relevance_criteria, # INVARIANT INV-13: context_phrases are forwarded to LLM reasoning layers # (GPT threat assessment, threat chat) as situational context ONLY. # They must NEVER be used in evaluate_relevance(), prioritization, # or any deterministic filtering/sorting logic. context_phrases=context_phrases, stripped_modifiers=stripped_modifiers, operator_text=raw_text, parse_mode="LLM_EXTRACTED", parse_confidence=parse_confidence, parse_warnings=parse_warnings, ) _DOMAIN_BROAD_CATEGORIES: dict[str, List[str]] = { "NAVAL": ["vessel", "ship", "boat", "buoy", "person"], "AERIAL": ["aircraft", "helicopter", "drone", "airplane"], "GROUND": ["vehicle", "car", "truck", "person", "building"], "URBAN": ["person", "vehicle", "car", "bicycle"], "GENERIC": ["object"], } def build_broad_queries( detector_key: str, mission_spec: MissionSpecification ) -> List[str]: """Build broad detector queries for LLM post-filter mode. For FAST_PATH: return object_classes directly (unchanged behavior). For COCO detectors (LLM_EXTRACTED): return ALL 80 COCO classes. For open-vocab detectors (LLM_EXTRACTED): return LLM-extracted classes PLUS broad domain categories to maximize recall. """ if mission_spec.parse_mode == "FAST_PATH": return mission_spec.object_classes # LLM_EXTRACTED path: detect broadly if _is_coco_only(detector_key): # COCO detectors ignore queries anyway (DETR detects all 80; # YOLO11 falls back to all if no matches). Send everything. return list(COCO_CLASSES) # Open-vocab detector (e.g. Grounding DINO): # Combine LLM-extracted classes with domain-specific broad categories broad = list(mission_spec.object_classes) domain_extras = _DOMAIN_BROAD_CATEGORIES.get( mission_spec.domain, _DOMAIN_BROAD_CATEGORIES["GENERIC"] ) seen = {c.lower() for c in broad} for cat in domain_extras: if cat.lower() not in seen: broad.append(cat) seen.add(cat.lower()) logger.info("Broad queries for %s: %s", detector_key, broad) return broad def parse_mission_text( raw_text: str, detector_key: str, video_path: Optional[str] = None, ) -> MissionSpecification: """Parse raw mission text into a validated MissionSpecification. Args: raw_text: Verbatim mission text from the operator. detector_key: Detector model key (determines COCO vocabulary constraints). video_path: Optional path to input video; first frame used for vision grounding. Returns: Validated MissionSpecification. Raises: MissionParseError: If mission text cannot produce a valid specification. """ if not raw_text or not raw_text.strip(): raise MissionParseError( "Mission text is empty. Specify objects to detect or use the default queries." ) raw_text = raw_text.strip() # Fast-path: simple comma-separated labels -> skip LLM if _is_comma_separated_labels(raw_text): object_classes = [t.strip() for t in raw_text.split(",") if t.strip()] logger.info( "Mission fast-path: comma-separated labels %s", object_classes ) return _build_fast_path_spec(raw_text, object_classes, detector_key) # LLM path: natural language mission text logger.info("Mission LLM-path: extracting from natural language") first_frame_b64 = _extract_and_encode_first_frame(video_path) if first_frame_b64: logger.info("Vision grounding: first frame encoded for LLM call") llm_output = _call_extraction_llm(raw_text, detector_key, first_frame_b64=first_frame_b64) logger.info("Mission LLM extraction result: %s", llm_output) mission_spec = _validate_and_build(llm_output, raw_text, detector_key) logger.info( "Mission parsed: classes=%s intent=%s domain=%s(%s) confidence=%s", mission_spec.object_classes, mission_spec.mission_intent, mission_spec.domain, mission_spec.domain_source, mission_spec.parse_confidence, ) return mission_spec