# Toun / reasoning_engine.py
# babaTEEpe's picture
# Upload 17 files
# 513d6d1 verified
"""
FLAN-T5 Mission Interpreter — Phase 2 Reasoning Model.
Handles:
1. Caption Fusion: Combine multiple perception outputs into coherent reports.
2. Mission Interpreter: Convert abstract prompts into structured plans (fallback for rules).
3. Interactive Q&A: Answer questions using multi-modal context.
FLAN-T5 Small (~80M parameters) is lightweight, extremely fast, and runs reliably on CPU/RAM.
"""
import os
import logging
import torch
import re
from typing import Optional, List, Dict, Any
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(name=__name__)

# Absolute directory containing this file; the model cache directory is
# resolved relative to it.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
class ReasoningEngine:
    """Lazy-loading FLAN-T5 wrapper for mission interpretation, caption fusion and Q&A.

    Constructing an instance only stores configuration. The tokenizer and
    model are loaded on first use by ``load_model``.
    """

    # Field labels the interpreter prompt asks the model to emit.
    _INTENT_LABELS = ("Target", "Expanded", "Capabilities", "Attributes")

    # Extracts a target phrase directly from a (lower-cased) user prompt.
    # The article must be followed by whitespace, otherwise "a" would swallow
    # the first letter of words such as "apple" or of the article "an".
    _PROMPT_TARGET_RE = re.compile(
        r"(?:find|identify|look for|detect|is there|if there is any)\s+"
        r"(?:(?:a|an|the|any)\s+)?([a-z\s\-]+)"
    )

    # Subjects likely to need an auxiliary "is".
    _SUBJECTS = r"(man|woman|person|child|group|someone|something|bird|car|dog|cat|subject)"
    # Verbs likely to appear in -ing form without an auxiliary.
    _VERBS = r"(wearing|playing|holding|doing|walking|sitting|standing|chirping|moving|running|crying)"
    # "<optional article><subject> <verb-ing>" with nothing in between.
    _MISSING_AUX_RE = re.compile(
        rf"\b((?:(?:a|the|an)\s+)?){_SUBJECTS}\s+{_VERBS}\b", re.IGNORECASE
    )
    # Caption that starts with a bare subject ('someone'/'something' excluded).
    _BARE_SUBJECT_RE = re.compile(
        r"^(man|woman|person|child|group|bird|car|dog|cat|subject)\s+is\s+",
        re.IGNORECASE,
    )

    # Any of these substrings marks a specialist finding as pure noise.
    _NOISE_KEYWORDS = (
        "no distinctive", "no environmental", "nothing detected", "searching...",
        "no target objects detected", "no target", "no significant findings",
        "open-vocabulary scan:", "model unavailable", "awaiting",
        "intelligence synthesis pending", "scanning search",
    )

    def __init__(self, model_name: str = "google/flan-t5-small"):
        """Store configuration only; the model is loaded lazily.

        Args:
            model_name: Hugging Face model identifier passed to ``from_pretrained``.
        """
        self.model_name = model_name
        self._model = None
        self._tokenizer = None
        # NOTE: nothing currently sets this to True, so failed loads are
        # retried on every call to load_model().
        self._failed = False
        self.device = "cpu"  # T5-Small (~80M parameters) runs comfortably on CPU

    def load_model(self) -> bool:
        """Load the FLAN-T5 tokenizer and model if they are not loaded yet.

        Tries the local cache directory first and falls back to downloading
        from the hub. Never raises: failures are logged.

        Returns:
            True when a model is available, False otherwise.
        """
        if self._model is not None or self._failed:
            return self._model is not None
        try:
            from transformers import T5ForConditionalGeneration, T5Tokenizer

            logger.info("Loading Reasoning Engine (%s) from local sanctuary...", self.model_name)
            # Local cache directory for the reasoning model.
            cache_dir = os.path.join(BASE_DIR, "mission_models", "CognitiveReasoning")

            def _fetch(**extra):
                tokenizer = T5Tokenizer.from_pretrained(
                    self.model_name, cache_dir=cache_dir, **extra
                )
                model = T5ForConditionalGeneration.from_pretrained(
                    self.model_name, cache_dir=cache_dir, **extra
                ).to(self.device)
                return tokenizer, model

            # --- Offline-first loading strategy ---
            try:
                # 1. Attempt local-only load first.
                tokenizer, model = _fetch(local_files_only=True)
            except Exception:
                # 2. Fall back to an online download if missing locally.
                logger.info(
                    "[BOOTSTRAP] %s not found in sanctuary. Fetching from hub...",
                    self.model_name,
                )
                tokenizer, model = _fetch()
                logger.info("Reasoning Engine ready in %s", cache_dir)

            model.eval()
            # Publish only after everything succeeded so a half-loaded engine
            # is never reported as available.
            self._tokenizer, self._model = tokenizer, model
            logger.info("FLAN-T5 Small Mission Interpreter loaded successfully.")
            return True
        except Exception as e:
            logger.error("Failed to load FLAN-T5: %s", e)
            self._failed = False  # Allow retry
            return False

    def is_available(self) -> bool:
        """Return True when the model is loaded and not flagged as failed."""
        return self._model is not None and not self._failed

    def _generate(self, prompt: str, max_length: int = 256) -> str:
        """Run inference and always return a string (never raises).

        Args:
            prompt: Full prompt text; tokenized with truncation at 450 tokens.
            max_length: Passed to ``generate`` as ``max_new_tokens``.

        Returns:
            The decoded model output, or a fixed fallback sentence when the
            model is unavailable, the output is shorter than 5 characters, or
            inference raised.
        """
        if not self.load_model():
            return "Interpreter offline."
        try:
            # Truncate to 450 tokens to stay within Flan-T5's 512 limit.
            inputs = self._tokenizer(
                prompt, return_tensors="pt", max_length=450, truncation=True
            ).to(self.device)
            with torch.no_grad():
                outputs = self._model.generate(
                    **inputs, max_new_tokens=max_length, do_sample=True, temperature=0.7
                )
            result = self._tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
            # Hardened fallback: an empty or near-empty answer is replaced by a status line.
            if len(result) < 5:
                return "The situational data is currently inconclusive or the target is maintaining a low signature."
            return result
        except Exception as e:
            logger.error("FLAN-T5 inference error: %s", e)
            return "Internal reasoning failure. Diagnostic logs required."

    @classmethod
    def _extract_field(cls, text: str, label: str) -> Optional[str]:
        """Return the value following ``label:`` up to the next label or end of line.

        Returns None when the label does not occur in ``text``.
        """
        others = "|".join(name for name in cls._INTENT_LABELS if name != label)
        pattern = rf"{label}:\s*(.*?)\s*(?=,?\s*(?:{others}):|$)"
        match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
        return match.group(1).strip() if match else None

    def interpret_mission(self, prompt: str) -> Dict[str, Any]:
        """Convert an abstract prompt into a structured intent.

        The model is asked to expand the prompt into concrete target objects
        (e.g. "threat" -> knife, gun, mask) using the format
        ``Target: ..., Expanded: ..., Capabilities: ..., Attributes: ...``.

        Returns:
            Dict with keys ``target`` (str or None), ``expanded_targets``
            (lower-cased list), ``capabilities`` (list) and ``attributes``
            (lower-cased key/value dict). All empty when the model is unavailable.
        """
        intent: Dict[str, Any] = {
            "capabilities": [], "target": None, "expanded_targets": [], "attributes": {}
        }
        if not self.load_model():
            return intent

        template = (
            "Instruction: Expand the mission prompt into as many concrete, detectable target objects as possible.\n"
            "Example: 'threat' -> knife, gun, blood, mask, weapon\n"
            "Format: Target: <main_target>, Expanded: <related_items>, Capabilities: <caps>, Attributes: <attrs>\n"
            "Capabilities: object_detection, human_detection, audio_event_detection, speech_to_text\n\n"
            f"Prompt: {prompt}\n"
            "Intent:"
        )
        result = self._generate(template)

        try:
            # Main target: a single item, so keep only the first comma-separated token.
            target_field = self._extract_field(result, "Target")
            if target_field:
                target = target_field.split(",", 1)[0].strip()
                if target and target.lower() not in ("none", "null", "unknown"):
                    intent["target"] = target

            # Expanded targets: a comma-separated list, so the whole field is
            # captured before splitting.
            expanded_field = self._extract_field(result, "Expanded")
            if expanded_field:
                intent["expanded_targets"] = [
                    item.strip().lower() for item in expanded_field.split(",") if item.strip()
                ]
            # If no expanded targets were found, use the main target as the base.
            if not intent["expanded_targets"] and intent["target"]:
                intent["expanded_targets"] = [intent["target"].lower()]

            # Capabilities: comma-separated list, "none" entries dropped.
            cap_field = self._extract_field(result, "Capabilities")
            if cap_field:
                intent["capabilities"] = [
                    cap.strip() for cap in cap_field.split(",")
                    if cap.strip() and cap.strip().lower() != "none"
                ]

            # Fallback: pull the target straight from the user's prompt.
            if not intent["target"]:
                fallback_match = self._PROMPT_TARGET_RE.search(prompt.lower())
                if fallback_match:
                    intent["target"] = fallback_match.group(1).strip()
                    if not intent["expanded_targets"]:
                        intent["expanded_targets"] = [intent["target"]]

            # Attributes: comma-separated key=value pairs.
            attr_field = self._extract_field(result, "Attributes")
            if attr_field:
                for pair in attr_field.split(","):
                    if "=" in pair:
                        key, value = pair.split("=", 1)
                        intent["attributes"][key.strip().lower()] = value.strip().lower()
            return intent
        except Exception as e:
            logger.warning("Failed to parse LLM Intent: %s. Raw: %s", e, result)
            # Heuristic fallback: if the model just says "color", assume color_identification.
            if "color" in result.lower() and not intent["capabilities"]:
                intent["capabilities"] = ["color_identification"]
            return intent

    def _fix_grammar(self, text: str) -> str:
        """Fix 'stiff' captions by injecting a missing auxiliary verb.

        "man walking" becomes "A man is walking". Falsy input is returned unchanged.
        """
        if not text:
            return text
        # 1. "<subject> <verb-ing>" -> "<subject> is <verb-ing>". A subject
        #    already followed by "is"/"was" cannot match, because the verb has
        #    to come directly after the subject.
        fixed = self._MISSING_AUX_RE.sub(r"\1\2 is \3", text)
        # 2. Prefix "A " when the caption starts with a bare subject.
        if self._BARE_SUBJECT_RE.match(fixed):
            fixed = "A " + fixed
        return fixed

    def fuse_captions(self, perception_data: Dict[str, Any], mission_focus: Optional[str] = None) -> str:
        """Combine video, audio and speech captions into one statement.

        Template when both captions exist and neither contains the other:
        ``A sound of "<audio>" — with a visual of "<video>"``.

        Args:
            perception_data: Mapping read for the keys ``video``, ``audio``
                and ``speech``; missing or falsy values are treated as empty.
            mission_focus: Currently unused; kept for interface compatibility.

        Returns:
            The fused caption with a capitalized first letter, or
            "Observation active." when there is neither video nor audio text.
        """
        v_cap = self._fix_grammar(perception_data.get("video") or "")
        a_cap = self._fix_grammar(perception_data.get("audio") or "")
        s_cap = perception_data.get("speech") or ""

        if v_cap and a_cap:
            if a_cap.lower() in v_cap.lower():
                combined = v_cap
            elif v_cap.lower() in a_cap.lower():
                combined = a_cap
            else:
                # Lower-case the first letter of each sub-caption so they read
                # naturally inside the quoted template.
                a_clean = a_cap[0].lower() + a_cap[1:]
                v_clean = v_cap[0].lower() + v_cap[1:]
                combined = f'A sound of "{a_clean}" — with a visual of "{v_clean}"'
        elif v_cap:
            combined = v_cap
        elif a_cap:
            combined = a_cap
        else:
            return "Observation active."

        if s_cap:
            combined = f"{combined} (Speech: {s_cap})"
        # Ensure the first letter is capitalized.
        return combined[0].upper() + combined[1:] if combined else "Observation active."

    def _clean_specialist_text(self, text: str) -> str:
        """Strip internal labels, IDs and confidence scores from a finding.

        Returns an empty string when the text contains any noise keyword;
        falsy input is returned unchanged.
        """
        if not text:
            return text
        # --- Noise suppression ---
        lower_text = text.lower()
        if any(keyword in lower_text for keyword in self._NOISE_KEYWORDS):
            return ""
        # 1. Strip internal labels.
        clean = text.replace("Detection Inventory:", "").strip()
        # 2. Strip IDs of the form [ID:...].
        clean = re.sub(r'\[ID:[^\]]+\]', '', clean).strip()
        # 3. Strip confidence scores such as (85%).
        clean = re.sub(r'\(\d+%\)', '', clean).strip()
        # 4. Strip trailing punctuation and common artifacts.
        return clean.rstrip(".;, ").strip()

    def synthesize_mission_report(self, perception_data: Dict[str, Any]) -> str:
        """Return the cleaned specialist narrative as ``a -> b -> c``.

        Uses ``perception_data["specialist_video"]`` when present. Otherwise
        builds the narrative from ``mission_findings`` (each entry's ``status``
        or ``explanation``), de-duplicated in first-seen order.

        Returns:
            The cleaned narrative, or "Intelligence synthesis pending..." when
            there is no stored narrative and no usable finding.
        """
        spec_narrative = perception_data.get("specialist_video") or ""
        if not spec_narrative:
            # Fall back to current findings if no history is established yet.
            cleaned_findings = []
            for finding in perception_data.get("mission_findings") or []:
                text = finding.get("status") or finding.get("explanation") or ""
                if len(text) > 2 and "unavailable" not in text.lower():
                    cleaned = self._clean_specialist_text(text)
                    if cleaned:  # pure-noise findings do not count as findings
                        cleaned_findings.append(cleaned)
            if not cleaned_findings:
                return "Intelligence synthesis pending..."
            # dict.fromkeys de-duplicates while keeping a stable order.
            spec_narrative = " -> ".join(dict.fromkeys(cleaned_findings))

        # Final safety purge, in case the stored narrative still holds raw text.
        segments = (self._clean_specialist_text(s) for s in spec_narrative.split(" -> "))
        return " -> ".join(s for s in segments if s)

    def interactive_query(self, user_query: str, vision_context: str = "", audio_context: str = "", timeline: str = "") -> str:
        """Answer a question using visual, audio and timeline context.

        Args:
            user_query: The question to answer.
            vision_context: Visual evidence text inserted into the prompt.
            audio_context: Audio evidence text inserted into the prompt.
            timeline: Timeline text; truncated to 600 characters.

        Returns:
            The model's answer, or "Interpreter offline." when unavailable.
        """
        if not self.load_model():
            return "Interpreter offline."
        # Truncate long timelines to avoid exceeding token limits.
        max_timeline_chars = 600
        timeline = timeline or ""
        if len(timeline) > max_timeline_chars:
            timeline = timeline[:max_timeline_chars] + "...[truncated]"
        template = (
            "You are a professional forensic analyst providing a detailed intelligence report. "
            "Write a comprehensive paragraph about the scene, including: what objects are visible, "
            "what activities are occurring, the environment and conditions, and any notable details. "
            "Use professional, descriptive language. Be specific about colors, positions, and actions.\n\n"
            f"Visual evidence: {vision_context}\n"
            f"Audio evidence: {audio_context}\n"
            f"Timeline records: {timeline}\n\n"
            f"Question: {user_query}\n"
            "Detailed forensic report:"
        )
        return self._generate(template, max_length=250)
# Module-level singleton instance. Construction only stores configuration;
# the FLAN-T5 tokenizer and model are loaded lazily by load_model().
reasoning_engine = ReasoningEngine()