File size: 19,714 Bytes
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb6e650
 
a2ca6f9
 
 
 
 
 
f89fa0b
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
624478a
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98a4ff6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb6e650
a2ca6f9
98a4ff6
bb6e650
98a4ff6
 
 
 
 
 
a2ca6f9
bb6e650
a2ca6f9
 
 
 
 
 
 
98a4ff6
a2ca6f9
 
 
 
 
98a4ff6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca6f9
 
 
98a4ff6
a2ca6f9
 
98a4ff6
 
a2ca6f9
 
 
 
bb6e650
 
a2ca6f9
 
 
 
 
bb6e650
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
624478a
a2ca6f9
 
 
 
 
624478a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0719ba5
624478a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2ca6f9
 
 
98a4ff6
a2ca6f9
 
 
 
 
 
98a4ff6
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98a4ff6
 
 
 
a2ca6f9
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
"""
Mission text parser — converts raw operator text into a validated MissionSpecification.

Primary entry point: parse_mission_text(raw_text, detector_key, video_path=None) -> MissionSpecification

Internal flow:
1. Fast-path regex check -> skip LLM if comma-separated labels
2. LLM extraction call (GPT-4o, temperature 0.0)
3. Deterministic validation pipeline
4. COCO vocabulary mapping for COCO-only detectors
5. Build RelevanceCriteria deterministically from mapped classes
6. Return validated MissionSpecification or raise MissionParseError
"""

import json
import logging
import re
from typing import List, Optional

from utils.openai_client import chat_completion, extract_content, get_api_key, OpenAIAPIError

from coco_classes import COCO_CLASSES, canonicalize_coco_name, coco_class_catalog
from utils.schemas import MissionSpecification, RelevanceCriteria

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Detector keys that only support the COCO class vocabulary. Requested classes
# for these detectors are mapped onto COCO names by _map_coco_classes.
_COCO_ONLY_DETECTORS = frozenset({"yolo11", "detr_resnet50"})


class MissionParseError(ValueError):
    """Signal that mission text could not be turned into a valid MissionSpecification.

    The ``warnings`` attribute carries the parse warnings supplied by the
    raiser; it is an empty list when none (or an empty list) were given.
    """

    def __init__(self, message: str, warnings: Optional[List[str]] = None):
        super().__init__(message)
        # A falsy argument (None or an empty list) becomes a fresh empty list.
        self.warnings = warnings if warnings else []


def _is_comma_separated_labels(text: str) -> bool:
    """Fast-path: detect simple comma-separated class labels (no LLM needed)."""
    # Match: word tokens separated by commas, each token <= 3 words
    pattern = r"^[\w\s]+(,\s*[\w\s]+)*$"
    if not re.match(pattern, text.strip()):
        return False
    tokens = [t.strip() for t in text.split(",") if t.strip()]
    return all(len(t.split()) <= 3 for t in tokens)


def _is_coco_only(detector_key: str) -> bool:
    """Return True when *detector_key* is listed in _COCO_ONLY_DETECTORS."""
    restricted_detectors = _COCO_ONLY_DETECTORS
    return detector_key in restricted_detectors


def _map_coco_classes(
    object_classes: List[str], detector_key: str
) -> tuple[List[str], List[str], List[str]]:
    """Translate requested classes into COCO names for COCO-only detectors.

    For any other detector the input list is handed back unchanged with no
    unmappable entries and no warnings.

    Returns:
        (mapped_classes, unmappable_classes, warnings) where mapped_classes
        holds each canonical COCO name once, in first-seen order.
    """
    if not _is_coco_only(detector_key):
        return object_classes, [], []

    # dict keys double as an insertion-ordered "already seen" set.
    canonical_names: dict = {}
    rejected: List[str] = []
    notes: List[str] = []

    for requested in object_classes:
        coco_name = canonicalize_coco_name(requested)

        if coco_name is None:
            rejected.append(requested)
            notes.append(
                f"'{requested}' is not in COCO vocabulary. Will not be detected by {detector_key}."
            )
            continue

        canonical_names.setdefault(coco_name, None)
        # Tell the operator whenever their wording was rewritten.
        if coco_name.lower() != requested.lower():
            notes.append(
                f"'{requested}' mapped to COCO class '{coco_name}'."
            )

    return list(canonical_names), rejected, notes


def _build_fast_path_spec(
    raw_text: str, object_classes: List[str], detector_key: str
) -> MissionSpecification:
    """Assemble the MissionSpecification for comma-separated input without an LLM call.

    Raises:
        MissionParseError: the detector is COCO-only and none of the labels
            map to a COCO class.
    """
    mapped, _unmappable, warnings = _map_coco_classes(object_classes, detector_key)
    coco_only = _is_coco_only(detector_key)

    if coco_only and not mapped:
        requested = ", ".join(object_classes)
        catalog = coco_class_catalog()
        raise MissionParseError(
            f"None of the requested objects ({requested}) match the "
            f"{detector_key} vocabulary. This detector supports: "
            f"{catalog}. "
            f"Use an open-vocabulary detector (Grounding DINO) or adjust your mission.",
            warnings=warnings,
        )

    if coco_only:
        final_classes = mapped
    else:
        final_classes = object_classes

    # Fast-path missions are plain detection requests: every listed class is
    # required and no confidence floor is applied.
    criteria = RelevanceCriteria(
        required_classes=final_classes,
        min_confidence=0.0,
    )

    return MissionSpecification(
        object_classes=final_classes,
        mission_intent="DETECT",
        domain="GENERIC",
        domain_source="INFERRED",
        relevance_criteria=criteria,
        context_phrases=[],
        stripped_modifiers=[],
        operator_text=raw_text,
        parse_mode="FAST_PATH",
        parse_confidence="HIGH",
        parse_warnings=warnings,
    )


# --- LLM Extraction ---

_SYSTEM_PROMPT = (
    "You are a mission text parser for an object detection system. Your ONLY job is to extract "
    "structured fields from operator mission text. You do NOT assess threats. You do NOT reason "
    "about tactics. You extract and classify.\n\n"
    "OUTPUT SCHEMA (strict JSON):\n"
    "{\n"
    '  "object_classes": ["string"],\n'
    '  "mission_intent": "ENUM",\n'
    '  "domain": "ENUM",\n'
    '  "context_phrases": ["string"],\n'
    '  "stripped_modifiers": ["string"],\n'
    '  "parse_confidence": "ENUM",\n'
    '  "parse_warnings": ["string"]\n'
    "}\n\n"
    "EXTRACTION RULES:\n\n"
    "1. OBJECT_CLASSES β€” What to extract:\n"
    "   - Extract nouns and noun phrases that refer to PHYSICAL, VISUALLY DETECTABLE objects.\n"
    "   - Keep visual descriptors that narrow the category: 'small boat', 'military vehicle', 'cargo ship'.\n"
    "   - Use singular form: 'vessels' -> 'vessel', 'people' -> 'person'.\n"
    "   - If the input is already comma-separated class labels (e.g., 'person, car, boat'),\n"
    "     use them directly without modification.\n\n"
    "2. OBJECT_CLASSES β€” What to strip:\n"
    "   - Remove threat/intent adjectives: 'hostile', 'suspicious', 'friendly', 'dangerous', 'enemy'.\n"
    "     -> Move these to stripped_modifiers.\n"
    "   - Remove action verbs: 'approaching', 'fleeing', 'attacking'.\n"
    "     -> Move the full phrase to context_phrases.\n"
    "   - Remove spatial/temporal phrases: 'from the east', 'near the harbor', 'at night'.\n"
    "     -> Move to context_phrases.\n"
    "   - Do NOT extract abstract concepts: 'threat', 'danger', 'hazard', 'risk' are not objects.\n\n"
    "3. MISSION_INTENT β€” Infer from verbs:\n"
    "   - 'detect', 'find', 'locate', 'spot', 'search for' -> DETECT\n"
    "   - 'classify', 'identify', 'determine type of' -> CLASSIFY\n"
    "   - 'track', 'follow', 'monitor movement of' -> TRACK\n"
    "   - 'assess threat', 'evaluate danger', 'threat assessment' -> ASSESS_THREAT\n"
    "   - 'monitor', 'watch', 'observe', 'surveil' -> MONITOR\n"
    "   - If no verb present (bare class list), default to DETECT.\n\n"
    "4. DOMAIN β€” Infer from contextual clues:\n"
    "   - Maritime vocabulary (vessel, ship, boat, harbor, naval, maritime, wake, sea) -> NAVAL\n"
    "   - Ground vocabulary (vehicle, convoy, checkpoint, road, building, infantry) -> GROUND\n"
    "   - Aerial vocabulary (aircraft, drone, UAV, airspace, altitude, flight) -> AERIAL\n"
    "   - Urban vocabulary (pedestrian, intersection, storefront, crowd, building) -> URBAN\n"
    "   - If no domain clues present -> GENERIC\n\n"
    "5. PARSE_CONFIDENCE:\n"
    "   - HIGH: Clear object classes extracted, domain identifiable.\n"
    "   - MEDIUM: Some ambiguity but reasonable extraction possible. Include warnings.\n"
    "   - LOW: Cannot extract meaningful object classes. Input is too abstract,\n"
    "     contradictory, or contains no visual object references.\n"
    "     Examples of LOW: 'keep us safe', 'do your job', 'analyze everything'.\n\n"
    "FORBIDDEN:\n"
    "- Do NOT infer object classes not implied by the text. If the text says 'boats',\n"
    "  do not add 'person' or 'vehicle' unless mentioned.\n"
    "- Do NOT add threat scores, engagement rules, or tactical recommendations.\n"
    "- Do NOT interpret what 'threat' or 'danger' means in terms of specific objects.\n"
    "  If the operator writes 'detect threats', set parse_confidence to LOW and warn:\n"
    "  \"'threats' is not a visual object class. Specify what objects to detect.\""
)

# Extra system-prompt section appended to _SYSTEM_PROMPT by _call_extraction_llm
# when a first-frame image accompanies the request. It tells the model how to
# use the image to refine object classes and domain.
_VISION_GROUNDING_ADDENDUM = (
    "\n\nVISION GROUNDING (when an image is provided):\n"
    "You may receive the first frame of the operator's video feed as an image.\n"
    "Use it to REFINE your object_classes extraction:\n\n"
    "1. If the operator uses a general term (e.g., 'vessels', 'vehicles'),\n"
    "   inspect the image and add MORE SPECIFIC subcategories visible in the scene.\n"
    "   Example: operator says 'detect vessels', image shows a speedboat and a cargo ship\n"
    "   -> object_classes: ['vessel', 'speedboat', 'cargo ship']\n\n"
    "2. If the operator mentions objects NOT visible in the first frame,\n"
    "   still include them (later frames may contain them), but add a\n"
    "   parse_warning noting they were not visible in the first frame.\n\n"
    "3. Use the image to CONFIRM or REFINE the domain. If the text is ambiguous\n"
    "   but the image clearly shows open water, set domain to NAVAL.\n\n"
    "4. Do NOT hallucinate objects. Only add specific subcategories if clearly\n"
    "   identifiable. When uncertain, keep the general term.\n\n"
    "5. The same OUTPUT SCHEMA and all EXTRACTION RULES still apply.\n"
    "   The image is supplementary context, not a replacement for the text.\n"
)


def _extract_and_encode_first_frame(video_path: Optional[str]) -> Optional[str]:
    """Extract the first frame from a video and return it as a base64-encoded JPEG.

    Never raises β€” returns None on any failure so the caller can fall back
    to text-only parsing.
    """
    if not video_path:
        return None
    try:
        from inference import extract_first_frame
        from utils.gpt_reasoning import encode_frame_to_b64

        frame, _fps, _w, _h = extract_first_frame(video_path)
        return encode_frame_to_b64(frame, quality=85)
    except Exception:
        logger.warning("Failed to extract/encode first frame for vision grounding", exc_info=True)
        return None


def _call_extraction_llm(raw_text: str, detector_key: str, first_frame_b64: Optional[str] = None) -> dict:
    """Call GPT-4o to extract structured mission fields from natural language.

    Args:
        raw_text: Operator mission text, embedded verbatim in the user prompt.
        detector_key: Detector model key; reported to the model as COCO_ONLY
            or OPEN_VOCAB.
        first_frame_b64: Optional base64 JPEG of the first video frame. When
            given, the image is attached to the user message and the vision
            grounding addendum is appended to the system prompt.

    Returns:
        The decoded JSON object produced by the model (not yet validated).

    Raises:
        MissionParseError: If no API key is configured, the API call fails,
            the response is empty, or the content is not a JSON object.
    """
    if not get_api_key():
        raise MissionParseError(
            "OPENAI_API_KEY not set. Cannot parse natural language mission text. "
            "Use comma-separated class labels instead (e.g., 'person, car, boat')."
        )

    detector_type = "COCO_ONLY" if _is_coco_only(detector_key) else "OPEN_VOCAB"

    user_prompt_text = (
        f'OPERATOR MISSION TEXT:\n"{raw_text}"\n\n'
        f"DETECTOR TYPE: {detector_type}\n\n"
        "Extract the structured mission specification from the above text."
    )

    # Build system prompt (append vision addendum when image is available)
    system_content = _SYSTEM_PROMPT
    if first_frame_b64:
        system_content = _SYSTEM_PROMPT + _VISION_GROUNDING_ADDENDUM

    # Build user message: mixed content array when image is available, plain string otherwise
    if first_frame_b64:
        user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": user_prompt_text},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{first_frame_b64}",
                        "detail": "low",
                    },
                },
            ],
        }
    else:
        user_message = {"role": "user", "content": user_prompt_text}

    # Requests that carry an image get a larger token budget and a longer timeout.
    max_tokens = 700 if first_frame_b64 else 500
    timeout_s = 45 if first_frame_b64 else 30

    payload = {
        "model": "gpt-4o",
        "temperature": 0.0,
        "max_tokens": max_tokens,
        "response_format": {"type": "json_object"},
        "messages": [
            {"role": "system", "content": system_content},
            user_message,
        ],
    }

    try:
        resp_data = chat_completion(payload, timeout=timeout_s)
        content, _refusal = extract_content(resp_data)
        if not content:
            raise MissionParseError("GPT returned empty content during mission parsing.")

        parsed = json.loads(content)

    except OpenAIAPIError as e:
        # Chain the cause so the underlying API failure stays in the traceback.
        raise MissionParseError(f"Mission parsing API call failed: {e}") from e
    except json.JSONDecodeError as e:
        raise MissionParseError(
            "GPT returned invalid JSON. Please rephrase your mission."
        ) from e

    # Callers read fields with dict.get(); reject any other JSON value
    # (list, string, number, null) as a parse failure here.
    if not isinstance(parsed, dict):
        raise MissionParseError(
            "GPT returned invalid JSON. Please rephrase your mission."
        )
    return parsed


def _coerce_str_list(value: object) -> List[str]:
    """Return *value* as a new list holding only its string items.

    A lone string becomes a one-item list; anything that is not a string,
    list or tuple (e.g. None, a number, a dict) becomes an empty list.
    """
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str)]
    return []


def _validate_and_build(
    llm_output: dict, raw_text: str, detector_key: str
) -> MissionSpecification:
    """Deterministic validation pipeline (Section 7.3 decision tree).

    The LLM output is treated as untrusted: list fields are copied and
    type-checked, and enum fields that are missing, wrongly typed or outside
    the allowed values fall back to safe defaults.

    Args:
        llm_output: Decoded JSON object returned by the extraction LLM.
            It is not modified.
        raw_text: Verbatim operator text, stored on the specification.
        detector_key: Detector model key (determines COCO vocabulary mapping).

    Returns:
        A MissionSpecification with parse_mode "LLM_EXTRACTED".

    Raises:
        MissionParseError: If the output is not a JSON object, parse
            confidence is LOW, no usable object classes remain, or none of
            the classes map to the vocabulary of a COCO-only detector.
    """
    if not isinstance(llm_output, dict):
        raise MissionParseError(
            "Mission parsing returned a non-object JSON value. "
            "Please rephrase your mission."
        )

    # Step 2: Extract fields with defaults. List fields are copied so that
    # appending warnings below never mutates the caller's llm_output.
    object_classes = _coerce_str_list(llm_output.get("object_classes", []))
    mission_intent = llm_output.get("mission_intent", "DETECT")
    domain = llm_output.get("domain", "GENERIC")
    context_phrases = _coerce_str_list(llm_output.get("context_phrases", []))
    stripped_modifiers = _coerce_str_list(llm_output.get("stripped_modifiers", []))
    parse_confidence = llm_output.get("parse_confidence", "LOW")
    parse_warnings = _coerce_str_list(llm_output.get("parse_warnings", []))

    # Validate enum values. The isinstance guard keeps unhashable values
    # (lists, dicts) from raising TypeError in the set membership test.
    valid_intents = {"DETECT", "CLASSIFY", "TRACK", "ASSESS_THREAT", "MONITOR"}
    if not isinstance(mission_intent, str) or mission_intent not in valid_intents:
        parse_warnings.append(f"Invalid mission_intent '{mission_intent}', defaulted to DETECT.")
        mission_intent = "DETECT"

    valid_domains = {"NAVAL", "GROUND", "AERIAL", "URBAN", "GENERIC"}
    if not isinstance(domain, str) or domain not in valid_domains:
        parse_warnings.append(f"Invalid domain '{domain}', defaulted to GENERIC.")
        domain = "GENERIC"

    valid_confidence = {"HIGH", "MEDIUM", "LOW"}
    if not isinstance(parse_confidence, str) or parse_confidence not in valid_confidence:
        parse_confidence = "LOW"

    # Step 3: Parse confidence check
    if parse_confidence == "LOW":
        warnings_str = "; ".join(parse_warnings) if parse_warnings else "No details"
        raise MissionParseError(
            f"Could not extract object classes from mission text. "
            f"Warnings: {warnings_str}. "
            f"Please specify concrete objects to detect (e.g., 'vessel, small boat').",
            warnings=parse_warnings,
        )

    # Validate object_classes is non-empty
    if not object_classes:
        raise MissionParseError(
            "Mission text produced no detectable object classes. "
            "Please specify concrete objects to detect.",
            warnings=parse_warnings,
        )

    # Filter out empty strings
    object_classes = [c.strip() for c in object_classes if c.strip()]
    if not object_classes:
        raise MissionParseError(
            "All extracted object classes were empty after cleanup.",
            warnings=parse_warnings,
        )

    # Step 4: COCO vocabulary mapping
    mapped, _unmappable, coco_warnings = _map_coco_classes(object_classes, detector_key)
    parse_warnings.extend(coco_warnings)

    if _is_coco_only(detector_key):
        if not mapped:
            raise MissionParseError(
                f"None of the requested objects ({', '.join(object_classes)}) match the "
                f"{detector_key} vocabulary. "
                f"This detector supports: {coco_class_catalog()}. "
                f"Use an open-vocabulary detector (Grounding DINO) or adjust your mission.",
                warnings=parse_warnings,
            )
        final_classes = mapped
    else:
        final_classes = object_classes

    # Step 5: Build RelevanceCriteria deterministically. It gets its own copy
    # of the class list so it does not share a list object with the spec.
    relevance_criteria = RelevanceCriteria(
        required_classes=list(final_classes),
        min_confidence=0.0,
    )

    # Step 6: Construct MissionSpecification
    return MissionSpecification(
        object_classes=final_classes,
        mission_intent=mission_intent,
        domain=domain,
        domain_source="INFERRED",
        relevance_criteria=relevance_criteria,
        # INVARIANT INV-13: context_phrases are forwarded to LLM reasoning layers
        # (GPT threat assessment, threat chat) as situational context ONLY.
        # They must NEVER be used in evaluate_relevance(), prioritization,
        # or any deterministic filtering/sorting logic.
        context_phrases=context_phrases,
        stripped_modifiers=stripped_modifiers,
        operator_text=raw_text,
        parse_mode="LLM_EXTRACTED",
        parse_confidence=parse_confidence,
        parse_warnings=parse_warnings,
    )


# Extra broad query terms per mission domain. build_broad_queries appends
# these to the LLM-extracted classes for open-vocabulary detectors; a domain
# missing from this table falls back to the "GENERIC" entry.
_DOMAIN_BROAD_CATEGORIES: dict[str, List[str]] = {
    "NAVAL": ["vessel", "ship", "boat", "buoy", "person"],
    "AERIAL": ["aircraft", "helicopter", "drone", "airplane"],
    "GROUND": ["vehicle", "car", "truck", "person", "building"],
    "URBAN": ["person", "vehicle", "car", "bicycle"],
    "GENERIC": ["object"],
}


def build_broad_queries(
    detector_key: str, mission_spec: MissionSpecification
) -> List[str]:
    """Build broad detector queries for LLM post-filter mode.

    For FAST_PATH: return the spec's object_classes (same contents as before).
    For COCO detectors (LLM_EXTRACTED): return all COCO classes.
    For open-vocab detectors (LLM_EXTRACTED): return LLM-extracted classes
    PLUS broad domain categories to maximize recall.

    Every branch returns a fresh list, so callers may modify the result
    without altering ``mission_spec``.

    Args:
        detector_key: Detector model key.
        mission_spec: Parsed mission; parse_mode, object_classes and domain
            are read.

    Returns:
        The list of query strings to hand to the detector.
    """
    if mission_spec.parse_mode == "FAST_PATH":
        # Copy rather than hand out the spec's own list: the other branches
        # already return new lists, and a caller appending to the result must
        # not change the mission's object classes.
        return list(mission_spec.object_classes)

    # LLM_EXTRACTED path: detect broadly
    if _is_coco_only(detector_key):
        # COCO detectors ignore queries anyway (DETR detects all 80;
        # YOLO11 falls back to all if no matches). Send everything.
        return list(COCO_CLASSES)

    # Open-vocab detector (e.g. Grounding DINO):
    # Combine LLM-extracted classes with domain-specific broad categories
    broad = list(mission_spec.object_classes)
    domain_extras = _DOMAIN_BROAD_CATEGORIES.get(
        mission_spec.domain, _DOMAIN_BROAD_CATEGORIES["GENERIC"]
    )
    # Case-insensitive de-duplication of the added domain categories.
    seen = {c.lower() for c in broad}
    for cat in domain_extras:
        if cat.lower() not in seen:
            broad.append(cat)
            seen.add(cat.lower())

    logger.info("Broad queries for %s: %s", detector_key, broad)
    return broad


def parse_mission_text(
    raw_text: str,
    detector_key: str,
    video_path: Optional[str] = None,
) -> MissionSpecification:
    """Turn raw operator mission text into a validated MissionSpecification.

    Plain comma-separated label lists are handled without an LLM call; any
    other text goes through LLM extraction followed by deterministic
    validation.

    Args:
        raw_text: Verbatim mission text from the operator.
        detector_key: Detector model key (determines COCO vocabulary constraints).
        video_path: Optional path to input video; first frame used for vision grounding.

    Returns:
        Validated MissionSpecification.

    Raises:
        MissionParseError: If mission text cannot produce a valid specification.
    """
    mission_text = raw_text.strip() if raw_text else ""
    if not mission_text:
        raise MissionParseError(
            "Mission text is empty. Specify objects to detect or use the default queries."
        )

    # Fast path: a bare label list needs no LLM.
    if _is_comma_separated_labels(mission_text):
        labels = list(filter(None, map(str.strip, mission_text.split(","))))
        logger.info(
            "Mission fast-path: comma-separated labels %s", labels
        )
        return _build_fast_path_spec(mission_text, labels, detector_key)

    # LLM path: natural-language text, optionally grounded on the first frame.
    logger.info("Mission LLM-path: extracting from natural language")
    frame_b64 = _extract_and_encode_first_frame(video_path)
    if frame_b64:
        logger.info("Vision grounding: first frame encoded for LLM call")
    extracted = _call_extraction_llm(mission_text, detector_key, first_frame_b64=frame_b64)
    logger.info("Mission LLM extraction result: %s", extracted)

    spec = _validate_and_build(extracted, mission_text, detector_key)
    logger.info(
        "Mission parsed: classes=%s intent=%s domain=%s(%s) confidence=%s",
        spec.object_classes,
        spec.mission_intent,
        spec.domain,
        spec.domain_source,
        spec.parse_confidence,
    )
    return spec