"""
FLAN-T5 Mission Interpreter — Phase 2 Reasoning Model.

Handles:
1. Caption Fusion: combine multiple perception outputs into coherent reports.
2. Mission Interpreter: convert abstract prompts into structured plans (fallback for rules).
3. Interactive Q&A: answer questions using multi-modal context.

FLAN-T5 Small has ~80M parameters, is extremely fast, and rock-solid on CPU/RAM.
"""
import logging
import os
import re
from typing import Optional, List, Dict, Any

import torch
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Absolute directory containing this file; the model cache path is built relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
class ReasoningEngine:
    """Lazy-loading FLAN-T5 wrapper for caption fusion, mission parsing and Q&A.

    The model and tokenizer are loaded on first use (see ``load_model``) and
    placed on the CPU. ``fuse_captions``, ``_fix_grammar``,
    ``_clean_specialist_text`` and ``synthesize_mission_report`` are pure
    string processing and never touch the model.
    """

    # Prompts longer than this many tokens are truncated by the tokenizer.
    _MAX_INPUT_TOKENS = 450
    # Timelines longer than this many characters are cut before prompting.
    _MAX_TIMELINE_CHARS = 600

    # Substrings that mark a specialist message as status noise, not a finding.
    _NOISE_KEYWORDS = (
        "no distinctive", "no environmental", "nothing detected", "searching...",
        "no target objects detected", "no target", "no significant findings",
        "open-vocabulary scan:", "model unavailable", "awaiting",
        "intelligence synthesis pending", "scanning search",
    )

    # Caption grammar repair: "<subject> <verb-ing>" -> "<subject> is <verb-ing>".
    _SUBJECTS = r"(man|woman|person|child|group|someone|something|bird|car|dog|cat|subject)"
    _VERBS = r"(wearing|playing|holding|doing|walking|sitting|standing|chirping|moving|running|crying)"
    _MISSING_AUX_RE = re.compile(
        rf"\b((?:(?:a|the|an)\s+)?){_SUBJECTS}\s+{_VERBS}\b",
        re.IGNORECASE,
    )
    # Captions that start with a bare countable subject need a leading article.
    _BARE_SUBJECT_RE = re.compile(
        r"^(man|woman|person|child|group|bird|car|dog|cat|subject)\s+is\s+",
        re.IGNORECASE,
    )

    # Labels of the "Label: value" fields expected in the model's intent output.
    _FIELD_LABELS = r"(?:Target|Expanded|Capabilities|Attributes)"
    # Pulls a target phrase straight from the user prompt. The article is only
    # consumed when followed by whitespace, so "find apple" yields "apple"
    # rather than "pple".
    _PROMPT_TARGET_RE = re.compile(
        r"\b(?:find|identify|look for|detect|is there|if there is any)\s+"
        r"(?:(?:a|an|the|any)\s+)?([a-z\s\-]+)"
    )

    def __init__(self, model_name: str = "google/flan-t5-small"):
        """Record configuration only; the model is loaded lazily on first use."""
        self.model_name = model_name
        self._model = None
        self._tokenizer = None
        self._failed = False
        self.device = "cpu"

    def load_model(self) -> bool:
        """Load the tokenizer and model if they are not loaded yet.

        The local cache directory is tried first (``local_files_only=True``);
        if that raises, the same call is retried without the local-only
        restriction. After any failure the engine is marked as failed and
        later calls return ``False`` immediately without retrying.

        Returns:
            True when a model is loaded and ready, False otherwise.
        """
        if self._model is not None:
            return True
        if self._failed:
            return False

        try:
            from transformers import T5ForConditionalGeneration, T5Tokenizer

            logger.info("Loading Reasoning Engine (%s) from local sanctuary...", self.model_name)
            cache_dir = os.path.join(BASE_DIR, "mission_models", "CognitiveReasoning")

            try:
                tokenizer = T5Tokenizer.from_pretrained(
                    self.model_name, cache_dir=cache_dir, local_files_only=True
                )
                model = T5ForConditionalGeneration.from_pretrained(
                    self.model_name, cache_dir=cache_dir, local_files_only=True
                ).to(self.device)
            except Exception:
                # Deliberately broad: any failure of the local-only attempt
                # falls through to a regular load that may download the files.
                logger.info("[BOOTSTRAP] %s not found in sanctuary. Fetching from hub...", self.model_name)
                tokenizer = T5Tokenizer.from_pretrained(self.model_name, cache_dir=cache_dir)
                model = T5ForConditionalGeneration.from_pretrained(
                    self.model_name, cache_dir=cache_dir
                ).to(self.device)

            logger.info("Reasoning Engine ready in %s", cache_dir)
            model.eval()
            # Publish both objects together so a half-loaded state is never visible.
            self._tokenizer = tokenizer
            self._model = model
            logger.info("FLAN-T5 Small Mission Interpreter loaded successfully.")
            return True
        except Exception as e:
            logger.error("Failed to load FLAN-T5: %s", e)
            self._model = None
            self._tokenizer = None
            # Remember the failure so every later call does not retry the load.
            self._failed = True
            return False

    def is_available(self) -> bool:
        """Return True when a model is loaded and no load failure was recorded."""
        return self._model is not None and not self._failed

    def _generate(self, prompt: str, max_length: int = 256) -> str:
        """Run one generation pass and return the decoded text.

        Args:
            prompt: Full prompt text; truncated to ``_MAX_INPUT_TOKENS`` tokens.
            max_length: Passed to ``generate`` as ``max_new_tokens``.

        Returns:
            The decoded output, or a fixed fallback sentence when the model is
            unavailable, the output is shorter than 5 characters, or inference
            raises. Sampling is enabled, so repeated calls may differ.
        """
        if not self.load_model():
            return "Interpreter offline."

        try:
            inputs = self._tokenizer(
                prompt,
                return_tensors="pt",
                max_length=self._MAX_INPUT_TOKENS,
                truncation=True,
            ).to(self.device)
            with torch.no_grad():
                outputs = self._model.generate(
                    **inputs, max_new_tokens=max_length, do_sample=True, temperature=0.7
                )

            result = self._tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
            if len(result) < 5:
                return "The situational data is currently inconclusive or the target is maintaining a low signature."
            return result
        except Exception as e:
            logger.error("FLAN-T5 inference error: %s", e)
            return "Internal reasoning failure. Diagnostic logs required."

    def interpret_mission(self, prompt: str) -> Dict[str, Any]:
        """Convert an abstract mission prompt into a structured intent.

        Args:
            prompt: Free-text mission description from the user.

        Returns:
            A dict with keys ``capabilities`` (list of str), ``target`` (str or
            None), ``expanded_targets`` (list of lower-cased str) and
            ``attributes`` (dict of str to str). All fields are empty when the
            model cannot be loaded.
        """
        if not self.load_model():
            return {"capabilities": [], "target": None, "expanded_targets": [], "attributes": {}}

        template = (
            "Instruction: Expand the mission prompt into as many concrete, detectable target objects as possible.\n"
            "Example: 'threat' -> knife, gun, blood, mask, weapon\n"
            "Format: Target: <main_target>, Expanded: <related_items>, Capabilities: <caps>, Attributes: <attrs>\n"
            "Capabilities: object_detection, human_detection, audio_event_detection, speech_to_text\n\n"
            f"Prompt: {prompt}\n"
            "Intent:"
        )
        return self._parse_intent(self._generate(template), prompt)

    @classmethod
    def _extract_field(cls, label: str, text: str) -> Optional[str]:
        """Return the stripped value of ``label: value`` in ``text``, or None if absent.

        The value runs until the next known field label, a line break, or the
        end of the text, so comma-separated lists are captured whole.
        """
        match = re.search(
            rf"\b{label}:\s*(.*?)(?=[,\s]*\b{cls._FIELD_LABELS}\s*:|[\r\n]|\Z)",
            text,
            re.IGNORECASE,
        )
        return match.group(1).strip() if match else None

    @classmethod
    def _parse_intent(cls, result: str, prompt: str) -> Dict[str, Any]:
        """Parse raw model output into the intent dict, falling back to the prompt."""
        intent = {"capabilities": [], "target": None, "expanded_targets": [], "attributes": {}}
        try:
            target_raw = cls._extract_field("Target", result)
            if target_raw is not None:
                # The target is a single item: keep only the text before the first comma.
                target = target_raw.split(",")[0].strip()
                if target and target.lower() not in ("none", "null", "unknown"):
                    intent["target"] = target

            expanded_raw = cls._extract_field("Expanded", result)
            if expanded_raw:
                intent["expanded_targets"] = [
                    item.strip().lower() for item in expanded_raw.split(",") if item.strip()
                ]
            if not intent["expanded_targets"] and intent["target"]:
                intent["expanded_targets"] = [intent["target"].lower()]

            caps_raw = cls._extract_field("Capabilities", result)
            if caps_raw:
                intent["capabilities"] = [
                    cap.strip()
                    for cap in caps_raw.split(",")
                    if cap.strip() and cap.strip().lower() != "none"
                ]

            if not intent["target"]:
                # The model gave no usable target: read it from the prompt wording.
                fallback = cls._PROMPT_TARGET_RE.search(prompt.lower())
                fallback_target = fallback.group(1).strip() if fallback else ""
                if fallback_target:
                    intent["target"] = fallback_target
                    if not intent["expanded_targets"]:
                        intent["expanded_targets"] = [fallback_target]

            attrs_raw = cls._extract_field("Attributes", result)
            if attrs_raw:
                for pair in attrs_raw.split(","):
                    if "=" in pair:
                        key, value = pair.split("=", 1)
                        intent["attributes"][key.strip().lower()] = value.strip().lower()

            return intent
        except Exception as e:
            logger.warning("Failed to parse LLM Intent: %s. Raw: %s", e, result)
            if "color" in result.lower() and not intent["capabilities"]:
                intent["capabilities"] = ["color_identification"]
            return intent

    def _fix_grammar(self, text: str) -> str:
        """Insert the missing "is" into stiff captions such as "man walking".

        A caption that then starts with a bare countable subject ("dog is
        running") gets a leading article and a lower-cased subject ("A dog is
        running"). Empty or None input is returned unchanged.
        """
        if not text:
            return text

        fixed = self._MISSING_AUX_RE.sub(r"\1\2 is \3", text)
        if self._BARE_SUBJECT_RE.match(fixed):
            fixed = "A " + fixed[0].lower() + fixed[1:]
        return fixed

    def fuse_captions(self, perception_data: Dict[str, Any], mission_focus: Optional[str] = None) -> str:
        """Combine video, audio and speech captions into one statement.

        Args:
            perception_data: Mapping read for the keys ``video``, ``audio`` and
                ``speech``; missing, None or blank values count as absent.
            mission_focus: Currently unused.

        Returns:
            The fused caption with an upper-case first character, or
            "Observation active." when neither video nor audio text is present.
        """
        v_cap = self._fix_grammar((perception_data.get("video") or "").strip())
        a_cap = self._fix_grammar((perception_data.get("audio") or "").strip())
        s_cap = (perception_data.get("speech") or "").strip()

        if v_cap and a_cap:
            # When one caption already contains the other, keep only the longer one.
            if a_cap.lower() in v_cap.lower():
                combined = v_cap
            elif v_cap.lower() in a_cap.lower():
                combined = a_cap
            else:
                a_clean = a_cap[0].lower() + a_cap[1:]
                v_clean = v_cap[0].lower() + v_cap[1:]
                combined = f'A sound of "{a_clean}" — with a visual of "{v_clean}"'
        elif v_cap or a_cap:
            combined = v_cap or a_cap
        else:
            return "Observation active."

        if s_cap:
            combined = f"{combined} (Speech: {s_cap})"

        return combined[0].upper() + combined[1:]

    def _clean_specialist_text(self, text: str) -> str:
        """Strip technical markup from one specialist finding.

        Returns "" when the text contains any noise keyword. Otherwise removes
        the "Detection Inventory:" label, ``[ID:...]`` tags and ``(NN%)``
        scores, collapses the whitespace left behind, and trims trailing
        punctuation. Empty or None input is returned unchanged.
        """
        if not text:
            return text

        lowered = text.lower()
        if any(keyword in lowered for keyword in self._NOISE_KEYWORDS):
            return ""

        clean = text.replace("Detection Inventory:", "")
        clean = re.sub(r"\[ID:[^\]]+\]", "", clean)
        clean = re.sub(r"\(\d+%\)", "", clean)
        # Removing tags leaves runs of spaces behind; collapse them.
        clean = re.sub(r"\s{2,}", " ", clean).strip()
        return clean.rstrip(".;, ").strip()

    def synthesize_mission_report(self, perception_data: Dict[str, Any]) -> str:
        """Build the specialist narrative as " -> "-joined, cleaned segments.

        Uses ``perception_data["specialist_video"]`` when it is non-empty.
        Otherwise the narrative is built from ``mission_findings`` (each item's
        ``status`` or ``explanation``), de-duplicated in first-seen order.

        Returns:
            The cleaned narrative, or "Intelligence synthesis pending..." when
            no usable finding exists. A provided ``specialist_video`` that
            cleans down to nothing yields "".
        """
        spec_narrative = perception_data.get("specialist_video") or ""

        if not spec_narrative:
            findings_list = perception_data.get("mission_findings") or []
            cleaned_findings = []
            for finding in findings_list:
                raw = finding.get("status") or finding.get("explanation") or ""
                if raw and len(raw) > 2 and "unavailable" not in raw.lower():
                    cleaned = self._clean_specialist_text(raw)
                    if cleaned:
                        cleaned_findings.append(cleaned)

            if not cleaned_findings:
                return "Intelligence synthesis pending..."
            # dict.fromkeys de-duplicates while keeping a stable, first-seen order.
            spec_narrative = " -> ".join(dict.fromkeys(cleaned_findings))

        segments = [self._clean_specialist_text(s) for s in spec_narrative.split(" -> ")]
        return " -> ".join(s for s in segments if s)

    def interactive_query(self, user_query: str, vision_context: str = "", audio_context: str = "", timeline: str = "") -> str:
        """Answer a user question from visual, audio and timeline context.

        Args:
            user_query: The question to answer.
            vision_context: Text describing the visual evidence.
            audio_context: Text describing the audio evidence.
            timeline: Timeline text; only the first ``_MAX_TIMELINE_CHARS``
                characters are used. None is treated as empty.

        Returns:
            The generated answer, or "Interpreter offline." when the model
            cannot be loaded.
        """
        if not self.load_model():
            return "Interpreter offline."

        timeline = timeline or ""
        if len(timeline) > self._MAX_TIMELINE_CHARS:
            timeline = timeline[:self._MAX_TIMELINE_CHARS] + "...[truncated]"

        # NOTE(review): the whole prompt is later truncated to _MAX_INPUT_TOKENS
        # tokens, so very long vision/audio context can push the question out
        # of the model input — confirm whether those fields need a cap too.
        template = (
            "You are a professional forensic analyst providing a detailed intelligence report. "
            "Write a comprehensive paragraph about the scene, including: what objects are visible, "
            "what activities are occurring, the environment and conditions, and any notable details. "
            "Use professional, descriptive language. Be specific about colors, positions, and actions.\n\n"
            f"Visual evidence: {vision_context}\n"
            f"Audio evidence: {audio_context}\n"
            f"Timeline records: {timeline}\n\n"
            f"Question: {user_query}\n"
            "Detailed forensic report:"
        )

        return self._generate(template, max_length=250)
|
|
| |
# Shared module-level instance. Construction only stores configuration;
# the model itself is loaded lazily on first use.
reasoning_engine = ReasoningEngine()
|
|