"""FLAN-T5 Mission Interpreter — Phase 2 Reasoning Model.

Handles:
1. Caption Fusion: Combine multiple perception outputs into coherent reports.
2. Mission Interpreter: Convert abstract prompts into structured plans (fallback for rules).
3. Interactive Q&A: Answer questions using multi-modal context.

FLAN-T5 Small is ~80MB, extremely fast, and rock-solid on CPU/RAM.
"""

import logging
import os
import re
from typing import Any, Dict, List, Optional

import torch

logger = logging.getLogger(__name__)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# --- Intent parsing patterns -------------------------------------------------
# Field labels the model is asked to emit (see the template in interpret_mission).
_FIELD_LABELS = r"(?:Target|Expanded|Capabilities|Attributes)"


def _list_field_pattern(label: str) -> "re.Pattern[str]":
    """Build a pattern capturing a comma-separated field value.

    The value runs until the next field label, a newline, or the end of the
    text, so commas *inside* the value are kept for later splitting.
    """
    return re.compile(
        rf"{label}:\s*(.*?)\s*(?=,?\s*{_FIELD_LABELS}:|\n|$)",
        re.IGNORECASE,
    )


# The main target is a single value, so it ends at the first comma.
_TARGET_RE = re.compile(r"Target:\s*([^,\n]*)", re.IGNORECASE)
_EXPANDED_RE = _list_field_pattern("Expanded")
_CAPABILITIES_RE = _list_field_pattern("Capabilities")
_ATTRIBUTES_RE = re.compile(r"Attributes:\s*(.*)", re.IGNORECASE)

# Fallback extraction straight from the (lower-cased) user prompt. The article
# must be followed by whitespace, otherwise "a" would swallow the first letter
# of nouns such as "apple".
_PROMPT_TARGET_RE = re.compile(
    r"\b(?:find|identify|look for|detect|is there|if there is any)\s+"
    r"(?:(?:a|an|the|any)\s+)?([a-z\s\-]+)"
)

# --- Caption grammar patterns ------------------------------------------------
# Subjects likely to need an auxiliary 'is'.
_SUBJECTS = r"(man|woman|person|child|group|someone|something|bird|car|dog|cat|subject)"
# Verbs likely to appear in -ing form without an auxiliary.
_VERBS = r"(wearing|playing|holding|doing|walking|sitting|standing|chirping|moving|running|crying)"
# Same as _SUBJECTS minus 'someone'/'something', which never take an article.
_STANDALONE_SUBJECTS = r"(man|woman|person|child|group|bird|car|dog|cat|subject)"

_MISSING_AUX_RE = re.compile(
    rf"\b((?:(?:a|the|an)\s+)?){_SUBJECTS}\s+(?!(?:is|was|has\s+been)\s+){_VERBS}\b",
    re.IGNORECASE,
)
_BARE_SUBJECT_RE = re.compile(rf"^{_STANDALONE_SUBJECTS}\s+is\s+", re.IGNORECASE)

# --- Specialist text cleaning ------------------------------------------------
# Any finding containing one of these (case-insensitive) is treated as noise.
_NOISE_KEYWORDS = (
    "no distinctive", "no environmental", "nothing detected", "searching...",
    "no target objects detected", "no target", "no significant findings",
    "open-vocabulary scan:", "model unavailable", "awaiting",
    "intelligence synthesis pending", "scanning search",
)
_ID_TAG_RE = re.compile(r"\[ID:[^\]]+\]")
_CONFIDENCE_RE = re.compile(r"\(\d+%\)")
_WHITESPACE_RUN_RE = re.compile(r"\s{2,}")


def _split_csv(raw: str) -> List[str]:
    """Split a comma-separated string into stripped, non-empty items."""
    return [item.strip() for item in raw.split(",") if item.strip()]


class ReasoningEngine:
    """Lazy-loading FLAN-T5 wrapper for mission interpretation, caption fusion and Q&A."""

    def __init__(self, model_name: str = "google/flan-t5-small"):
        """Initialize with lazy loading; no model is loaded until first use."""
        self.model_name = model_name
        self._model: Any = None
        self._tokenizer: Any = None
        self._failed = False
        self.device = "cpu"  # T5-Small is ~80MB, rock-solid on CPU/RAM

    def load_model(self) -> bool:
        """Load FLAN-T5 Small (Lightweight).

        Tries the local cache first and falls back to a hub download.

        Returns:
            True if the model is ready, False if loading failed. A failure
            does not latch: the next call attempts the load again.
        """
        if self._model is not None or self._failed:
            return self._model is not None

        try:
            from transformers import T5ForConditionalGeneration, T5Tokenizer

            logger.info(f"Loading Reasoning Engine ({self.model_name}) from local sanctuary...")

            # Local sanctuary for Cognitive Reasoning
            cache_dir = os.path.join(BASE_DIR, "mission_models", "CognitiveReasoning")

            # --- Offline First Loading Strategy ---
            # Load into locals and publish both together at the end, so a
            # failure part-way never leaves a model without its tokenizer.
            try:
                # 1. Attempt local-only load first
                tokenizer = T5Tokenizer.from_pretrained(
                    self.model_name, cache_dir=cache_dir, local_files_only=True
                )
                model = T5ForConditionalGeneration.from_pretrained(
                    self.model_name, cache_dir=cache_dir, local_files_only=True
                ).to(self.device)
            except Exception:
                # 2. Fallback to online download if missing locally
                logger.info(f"[BOOTSTRAP] {self.model_name} not found in sanctuary. Fetching from hub...")
                tokenizer = T5Tokenizer.from_pretrained(self.model_name, cache_dir=cache_dir)
                model = T5ForConditionalGeneration.from_pretrained(
                    self.model_name, cache_dir=cache_dir
                ).to(self.device)

            logger.info(f"Reasoning Engine ready in {cache_dir}")
            model.eval()
            self._tokenizer = tokenizer
            self._model = model
            logger.info("FLAN-T5 Small Mission Interpreter loaded successfully.")
            return True
        except Exception as e:
            logger.error(f"Failed to load FLAN-T5: {e}")
            self._failed = False  # Allow retry
            return False

    def is_available(self) -> bool:
        """Check if the interpreter is loaded."""
        return self._model is not None and not self._failed

    def _generate(self, prompt: str, max_length: int = 256) -> str:
        """Helper to run inference with hardened fallbacks.

        Args:
            prompt: Full text prompt handed to the model.
            max_length: Maximum number of new tokens to generate.

        Returns:
            The decoded model output, or a fixed status sentence when the
            model is unavailable, inference raises, or the output is shorter
            than 5 characters.
        """
        if not self.load_model():
            return "Interpreter offline."

        try:
            # Truncate to 450 tokens to stay within Flan-T5's 512 limit
            inputs = self._tokenizer(
                prompt, return_tensors="pt", max_length=450, truncation=True
            ).to(self.device)
            with torch.no_grad():
                outputs = self._model.generate(
                    **inputs, max_new_tokens=max_length, do_sample=True, temperature=0.7
                )
            result = self._tokenizer.decode(outputs[0], skip_special_tokens=True).strip()

            # HARDENED FALLBACK: If model is silent or near-silent, provide status
            if not result or len(result) < 5:
                return "The situational data is currently inconclusive or the target is maintaining a low signature."
            return result
        except Exception as e:
            logger.error(f"FLAN-T5 inference error: {e}")
            return "Internal reasoning failure. Diagnostic logs required."

    @staticmethod
    def _parse_intent(result: str, prompt: str, intent: Dict[str, Any]) -> None:
        """Fill *intent* in place from the model output, falling back to the prompt."""
        # Extract Main Target
        target_match = _TARGET_RE.search(result)
        if target_match:
            target = target_match.group(1).strip()
            if not target or target.lower() in ("none", "null", "unknown"):
                target = None
            intent["target"] = target

        # Extract Expanded Targets (the whole comma-separated list)
        expanded_match = _EXPANDED_RE.search(result)
        if expanded_match:
            intent["expanded_targets"] = [t.lower() for t in _split_csv(expanded_match.group(1))]

        # If no expanded targets found, use the main target as the base
        if not intent["expanded_targets"] and intent["target"]:
            intent["expanded_targets"] = [intent["target"].lower()]

        # Extract Capabilities
        cap_match = _CAPABILITIES_RE.search(result)
        if cap_match:
            intent["capabilities"] = [
                c for c in _split_csv(cap_match.group(1)) if c.lower() != "none"
            ]

        # Post-processing Fallback: pull the target straight from the prompt
        if not intent["target"]:
            fallback_match = _PROMPT_TARGET_RE.search(prompt.lower())
            if fallback_match:
                fallback_target = fallback_match.group(1).strip()
                if fallback_target:
                    intent["target"] = fallback_target
                    if not intent["expanded_targets"]:
                        intent["expanded_targets"] = [fallback_target]

        # Extract Attributes (key=value pairs, rest of the line)
        attr_block = _ATTRIBUTES_RE.search(result)
        if attr_block:
            for pair in attr_block.group(1).split(","):
                if "=" in pair:
                    key, value = pair.split("=", 1)
                    intent["attributes"][key.strip().lower()] = value.strip().lower()

    def interpret_mission(self, prompt: str) -> Dict[str, Any]:
        """Convert abstract prompt into structured intent with Expanded Keyword Support.

        Turns "threat" into ["knife", "gun", "balaclava", etc.]

        Args:
            prompt: Free-text mission prompt.

        Returns:
            A dict with keys ``capabilities`` (list of str), ``target``
            (str or None), ``expanded_targets`` (list of lower-cased str) and
            ``attributes`` (dict of lower-cased key/value strings). All fields
            are empty/None when the model cannot be loaded.
        """
        intent: Dict[str, Any] = {
            "capabilities": [],
            "target": None,
            "expanded_targets": [],
            "attributes": {},
        }
        if not self.load_model():
            return intent

        template = (
            "Instruction: Expand the mission prompt into as many concrete, detectable target objects as possible.\n"
            "Example: 'threat' -> knife, gun, blood, mask, weapon\n"
            "Format: Target: , Expanded: , Capabilities: , Attributes: \n"
            "Capabilities: object_detection, human_detection, audio_event_detection, speech_to_text\n\n"
            f"Prompt: {prompt}\n"
            "Intent:"
        )

        result = self._generate(template)

        # Robust Parsing
        try:
            self._parse_intent(result, prompt, intent)
        except Exception as e:
            logger.warning(f"Failed to parse LLM Intent: {e}. Raw: {result}")
            # Heuristic fallback: if the model just says "color", assume it's color_identification
            if "color" in result.lower() and not intent["capabilities"]:
                intent["capabilities"] = ["color_identification"]
        return intent

    def _fix_grammar(self, text: str) -> str:
        """Fixes common 'stiff' AI captions by injecting missing auxiliary verbs.

        "man wearing a hat" becomes "A man is wearing a hat". Falsy input is
        returned unchanged.
        """
        if not text:
            return text

        # 1. Match "(article) Subject Verb-ing" and inject "is" between them
        def inject_is(match: "re.Match[str]") -> str:
            prefix = match.group(1) or ""
            return f"{prefix}{match.group(2)} is {match.group(3)}"

        fixed = _MISSING_AUX_RE.sub(inject_is, text)

        # 2. Add "A " if it starts with a bare subject (not 'someone'/'something')
        if _BARE_SUBJECT_RE.match(fixed):
            fixed = "A " + fixed

        return fixed

    def fuse_captions(self, perception_data: Dict[str, Any], mission_focus: Optional[str] = None) -> str:
        """Base Fusion pass: Combines video and audio into a structured statement.

        Template: "A sound of [audio] with a visual of [video]"
        Used for 'INTEGRATED FUSION CAPTION' / SITUATIONAL ASSESSMENT.

        Args:
            perception_data: Mapping read for the optional keys ``video``,
                ``audio`` and ``speech``.
            mission_focus: Accepted for interface compatibility; not used here.

        Returns:
            The fused caption, or "Observation active." when neither a video
            nor an audio caption is present.
        """
        v_cap = self._fix_grammar(perception_data.get("video") or "")
        a_cap = self._fix_grammar(perception_data.get("audio") or "")
        s_cap = perception_data.get("speech") or ""

        if v_cap and a_cap:
            if a_cap.lower() in v_cap.lower():
                # Audio caption adds nothing beyond the video caption
                combined = v_cap
            elif v_cap.lower() in a_cap.lower():
                combined = a_cap
            else:
                # Structured multimodal template — reads naturally.
                # Lower-case the first letter of each sub-caption for clean fusion.
                a_clean = a_cap[0].lower() + a_cap[1:]
                v_clean = v_cap[0].lower() + v_cap[1:]
                combined = f'A sound of "{a_clean}" — with a visual of "{v_clean}"'
        elif v_cap:
            combined = v_cap
        elif a_cap:
            combined = a_cap
        else:
            return "Observation active."

        if s_cap:
            combined = f"{combined} (Speech: {s_cap})"

        # Ensure first letter is capitalized
        return combined[0].upper() + combined[1:] if combined else "Observation active."

    def _clean_specialist_text(self, text: str) -> str:
        """Centralized high-fidelity cleaning for specialist findings.

        Returns "" for text containing a noise keyword; otherwise strips the
        "Detection Inventory:" label, ``[ID:...]`` tags, ``(NN%)`` confidence
        scores, leftover whitespace runs and trailing punctuation.
        """
        if not text:
            return text

        # --- NOISE SUPPRESSION ---
        lower_text = text.lower()
        if any(keyword in lower_text for keyword in _NOISE_KEYWORDS):
            return ""

        # 1. Strip internal labels
        clean = text.replace("Detection Inventory:", "").strip()
        # 2. Strip IDs [ID:...]
        clean = _ID_TAG_RE.sub("", clean).strip()
        # 3. Strip Confidence Scores (85%)
        clean = _CONFIDENCE_RE.sub("", clean).strip()
        # 4. Collapse the gaps left behind by the removals above
        clean = _WHITESPACE_RUN_RE.sub(" ", clean)
        # 5. Strip trailing punctuation and common artifacts
        return clean.rstrip(".;, ").strip()

    def synthesize_mission_report(self, perception_data: Dict[str, Any]) -> str:
        """Specialist Fusion pass: Returns the accumulated specialist narrative history.

        Independent of the literal base narrative. Technical jargon is purged
        from both history and fresh data.

        Args:
            perception_data: Mapping read for ``specialist_video`` (a
                " -> "-separated narrative) and, when that is empty,
                ``mission_findings`` (a list of dicts with ``status`` or
                ``explanation``).

        Returns:
            Cleaned segments joined by " -> ". When there is no history and no
            usable finding, "Intelligence synthesis pending...".
        """
        # Retrieval of the Recurrent Specialist Narrative created in Orchestrator
        spec_narrative = perception_data.get("specialist_video") or ""

        if not spec_narrative:
            # Fallback to current findings if no history is yet established
            findings_list = perception_data.get("mission_findings") or []
            findings = [f.get("status") or f.get("explanation") or "" for f in findings_list]

            cleaned_findings = [
                self._clean_specialist_text(f)
                for f in findings
                if f and len(f) > 2 and "unavailable" not in f.lower()
            ]
            # Drop findings that were pure noise, then de-duplicate while
            # keeping first-seen order so the report is deterministic.
            unique_findings = list(dict.fromkeys(c for c in cleaned_findings if c))

            if not unique_findings:
                return "Intelligence synthesis pending..."
            spec_narrative = " -> ".join(unique_findings)

        # Final safety purge (In case persistent memory in orchestrator is still "dirty")
        segments = [self._clean_specialist_text(s) for s in spec_narrative.split(" -> ")]
        return " -> ".join(s for s in segments if s)

    def interactive_query(self, user_query: str, vision_context: str = "", audio_context: str = "", timeline: str = "") -> str:
        """Interactive Q&A using multimodal timeline and sensory context.

        Args:
            user_query: The question to answer.
            vision_context: Visual evidence text placed in the prompt.
            audio_context: Audio evidence text placed in the prompt.
            timeline: Timeline records; truncated to 600 characters.

        Returns:
            The generated answer, or "Interpreter offline." if the model
            cannot be loaded.
        """
        if not self.load_model():
            return "Interpreter offline."

        # Treat None like an empty context so len() and the prompt stay sane
        vision_context = vision_context or ""
        audio_context = audio_context or ""
        timeline = timeline or ""

        # Truncate long timelines to avoid exceeding token limits
        max_timeline_chars = 600
        if len(timeline) > max_timeline_chars:
            timeline = timeline[:max_timeline_chars] + "...[truncated]"

        template = (
            "You are a professional forensic analyst providing a detailed intelligence report. "
            "Write a comprehensive paragraph about the scene, including: what objects are visible, "
            "what activities are occurring, the environment and conditions, and any notable details. "
            "Use professional, descriptive language. Be specific about colors, positions, and actions.\n\n"
            f"Visual evidence: {vision_context}\n"
            f"Audio evidence: {audio_context}\n"
            f"Timeline records: {timeline}\n\n"
            f"Question: {user_query}\n"
            "Detailed forensic report:"
        )

        return self._generate(template, max_length=250)


# Singleton
reasoning_engine = ReasoningEngine()