| import os |
| import sys |
| import torch |
| import logging |
| import tempfile |
| import traceback |
| import subprocess |
| import numpy as np |
| import cv2 |
| import gc |
| import shutil |
| import asyncio |
| import httpx |
| import re |
| from typing import List, Optional, Dict, Any |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Form, BackgroundTasks |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from contextlib import asynccontextmanager |
| from pathlib import Path |
| from pydub import AudioSegment |
| import librosa |
| from ruamel.yaml import YAML |
| from PIL import Image |
| from dotenv import load_dotenv |
|
|
| |
| from session_manager import session_manager |
| from orchestrator import orchestrator |
| from mission_engine import buffer_manager, mission_evaluator, ObservationEvent |
| from tactical_specialists import tactical_specialists |
| from cognitive_specialists import cognitive_specialist |
| import io |
| import base64 |
|
|
| |
class IdentifyRequest(BaseModel):
    """Request body accepted by the manual identification tool endpoint."""

    # Session the submitted crop belongs to.
    session_id: str
    # Base64-encoded image; the handler tolerates an optional prefix that
    # ends at the first comma (e.g. a data-URL header).
    image_b64: str
|
|
| |
# --- Filesystem layout -------------------------------------------------------
# Directory containing this module; every other path is derived from it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WAVCAP_DIR = os.path.abspath(
    os.path.join(BASE_DIR, "..", "..", "training_audio", "Wavcap")
)

# load_wavcap() imports `models.bart_captioning`; presumably that package lives
# inside the WavCap checkout, hence the sys.path entry (added only once).
if WAVCAP_DIR not in sys.path:
    sys.path.insert(0, WAVCAP_DIR)

# --- Environment -------------------------------------------------------------
load_dotenv()

# NOTE(review): these flags are set after the project modules above were
# imported; if any of them imports transformers/huggingface_hub at import time
# the flags may be read too late - confirm.
os.environ.update({"TRANSFORMERS_OFFLINE": "1", "HF_HUB_OFFLINE": "1"})

# --- Logging -----------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# --- Model globals (populated by load_wavcap) --------------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
clap_model = None
wavcap_config = None

# --- Per-session request bookkeeping -----------------------------------------
import threading

# Maps session_id -> number of requests registered so far for that session.
_request_counter: Dict[str, int] = {}
# Guards every read/write of _request_counter.
_request_lock = threading.Lock()
|
|
def _new_request(session_id: str) -> int:
    """Allocate the next request ID for *session_id*.

    IDs start at 1 and increase by one per call for a given session. The
    counter update happens under ``_request_lock``.
    """
    with _request_lock:
        request_id = _request_counter.get(session_id, 0) + 1
        _request_counter[session_id] = request_id
    return request_id
|
|
# A request is only treated as stale once more than this many newer requests
# have been registered for the same session.
PARALLEL_LEEWAY = 4


def _is_stale(session_id: str, request_id: int) -> bool:
    """Return True when *request_id* trails the session's latest request by
    more than ``PARALLEL_LEEWAY``."""
    with _request_lock:
        newest = _request_counter.get(session_id, 0)

    return newest - request_id > PARALLEL_LEEWAY
|
|
| |
# Most recent background specialist result, keyed by session_id.
_specialist_result_cache: Dict[str, Dict[str, Any]] = {}
# Mission prompt last seen per session; a change purges the cache entry.
_last_mission_prompt: Dict[str, str] = {}
# Allows a single specialist analysis to run at a time.
_specialist_semaphore = asyncio.Semaphore(1)
|
|
async def run_specialist_analysis(
    session_id: str,
    request_id: int,
    clean_mission_prompt: str,
    predictions: dict,
    yolo_detections: list,
    raw_image: Any,
    sampled_images: list,
    raw_audio: Any
):
    """Background task that runs the specialist models and caches the result.

    The result is stored in ``_specialist_result_cache[session_id]`` so a later
    ``/api/analyze`` call can return it. Nothing is returned and no exception
    escapes: failures are logged with their traceback.

    Args:
        session_id: Session whose cache entry is updated.
        request_id: ID from ``_new_request``; used for staleness checks.
        clean_mission_prompt: Sanitised mission prompt (may be ``None``).
        predictions: Raw captions produced by the perception phase.
        yolo_detections: Detection list forwarded to the orchestrator.
        raw_image: Representative image forwarded to the orchestrator.
        sampled_images: Sampled frames forwarded to the orchestrator.
        raw_audio: Audio tensor forwarded to the orchestrator.
    """
    async with _specialist_semaphore:
        try:
            # The task may have waited on the semaphore; skip if superseded.
            if _is_stale(session_id, request_id):
                return

            phased_result = orchestrator.process_phased(
                mission_prompt=clean_mission_prompt,
                raw_captions=predictions,
                detections=yolo_detections,
                session_id=session_id,
                raw_image=raw_image,
                raw_images=sampled_images,
                raw_audio=raw_audio,
                cancel_check=lambda: _is_stale(session_id, request_id),
            )

            # The reader calls .get() on the cached value, so never store
            # anything that is not a dict.
            if not isinstance(phased_result, dict):
                logger.info(
                    f"[BG-SPECIALIST] Request #{request_id} produced no cacheable result (Session: {session_id})"
                )
                return

            # If the mission prompt changed while this task was queued or
            # running, the cache was purged for the new prompt; writing this
            # result would resurrect findings for the old prompt.
            current_prompt = _last_mission_prompt.get(session_id, clean_mission_prompt)
            if current_prompt != clean_mission_prompt:
                logger.info(
                    f"[BG-SPECIALIST] Discarding result for request #{request_id}: mission prompt changed (Session: {session_id})"
                )
                return

            _specialist_result_cache[session_id] = phased_result
            logger.info(f"[BG-SPECIALIST] Finished analysis for request #{request_id} (Session: {session_id})")

        except Exception as e:
            # logger.exception keeps the traceback, which the plain error log lost.
            logger.exception(f"[BG-SPECIALIST] Error in background analysis: {e}")
|
|
def _cancelled_response(session_id: str):
    """Build the placeholder ``FusionResponse`` for a superseded request.

    All content fields are empty; only the session ID, the superseded notice
    and a ``"NONE"`` threat level are filled in.
    """
    blank_fields = dict(
        audio_context="",
        visual_context="",
        video_timeline=[],
        recommended_actions=[],
        fusion_caption="",
        mission_findings_text="",
        mission_model_captions=[],
        mission_model_fusion="",
    )
    return FusionResponse(
        session_id=session_id,
        situational_report="Request superseded by newer prompt.",
        threat_level="NONE",
        **blank_fields,
    )
|
|
# Filler words ignored when deciding whether two captions say the same thing.
_CAPTION_FILLER_RE = re.compile(r'\b(a|an|the|is|are|was|were|some|any)\b')


def consolidate_temporal_narrative(captions: List[str]) -> str:
    """Merge per-frame captions into one chronological narrative.

    Consecutive captions that are equal after lower-casing, removing filler
    words and collapsing whitespace are reported once. Distinct captions are
    joined with ``" -> "`` in their original order.

    Args:
        captions: Frame captions in chronological order. Blank and non-string
            entries are skipped.

    Returns:
        The joined narrative, the single remaining caption, or
        ``"Observation active."`` when nothing usable was supplied.
    """
    if not captions:
        return "Observation active."

    def normalize(text: str) -> str:
        stripped = _CAPTION_FILLER_RE.sub('', text.lower())
        # Removing a word leaves its surrounding spaces behind; collapse them
        # so "dog is running" and "dog running" compare equal.
        return " ".join(stripped.split())

    unique_captions = []
    last_normalized = None
    for cap in captions:
        if not isinstance(cap, str):
            continue
        cap = cap.strip().rstrip(".")
        if not cap:
            continue

        current_normalized = normalize(cap)
        if current_normalized != last_normalized:
            unique_captions.append(cap)
            last_normalized = current_normalized

    if not unique_captions:
        return "Observation active."
    if len(unique_captions) == 1:
        return unique_captions[0]

    return " -> ".join(unique_captions)
|
|
def _error_response(session_id: str, error_msg: str):
    """Log *error_msg* and wrap it in a fully populated ``FusionResponse``.

    The response carries fixed diagnostic text plus the error message so the
    UI receives a complete payload instead of hanging.
    """
    logger.error(f"[API] Generating error response for session {session_id}: {error_msg}")

    diagnostic_caption = {"model": "System Diagnostic", "caption": error_msg}
    recovery_status = {
        "mission_status": "searching",
        "status_message": "Awaiting pipeline recovery...",
    }
    payload = dict(
        session_id=session_id,
        audio_context="Error during sensor analysis",
        visual_context="Pipeline interrupted",
        video_timeline=[],
        situational_report=f"System Alert: {error_msg}",
        recommended_actions=["Review system logs", "Check sensor connectivity"],
        threat_level="LOW",
        fusion_caption="Operational failure in sensing pipeline.",
        mission_findings_text=f"FAILURE: {error_msg}",
        mission_model_captions=[diagnostic_caption],
        mission_model_fusion="Error encountered during agentic dispatch.",
        prompt_type="mission",
        mission_result=recovery_status,
    )
    return FusionResponse(**payload)
|
|
| |
|
|
def load_wavcap():
    """Load the WavCap audio captioning model into the module globals.

    On success ``wavcap_config`` holds the parsed settings and ``clap_model``
    holds the model, moved to ``device`` and switched to eval mode. When the
    WavCap directory, its settings file or its Python package is missing the
    function logs a warning and returns, leaving ``clap_model`` as it was.
    Any other failure is logged as an error. Nothing is raised.
    """
    global clap_model, wavcap_config

    if not os.path.isdir(WAVCAP_DIR):
        logger.warning(f"WavCap directory not found at {WAVCAP_DIR} — skipping audio captioning. This is expected on cloud deployments.")
        return

    try:
        from models.bart_captioning import BartCaptionModel

        settings_path = os.path.join(WAVCAP_DIR, "settings", "settings.yaml")
        if not os.path.exists(settings_path):
            logger.warning(f"Wavcap settings not found at {settings_path}")
            return

        with open(settings_path, "r") as f:
            wavcap_config = YAML(typ='safe').load(f)

        # Absolute path, so it can be created before the working directory
        # changes; a failure here then cannot leave the process in WAVCAP_DIR.
        wavcap_cache = os.path.join(BASE_DIR, "mission_models", "AcousticIntelligence")
        os.makedirs(wavcap_cache, exist_ok=True)

        ckpt_paths = [
            os.path.join(BASE_DIR, "nerve_models", "wavcap", "best_model.pt"),
            os.path.join(BASE_DIR, "best_model.pt"),
            os.path.abspath(os.path.join(WAVCAP_DIR, "huggingface", "model.pth")),
        ]

        # NOTE(review): the model is built with the working directory set to
        # WAVCAP_DIR, presumably because it resolves relative paths - confirm.
        original_cwd = os.getcwd()
        os.chdir(WAVCAP_DIR)
        try:
            # Build into a local so a failure while loading weights does not
            # leave a half-initialised model in the global.
            model = BartCaptionModel(wavcap_config, cache_dir=wavcap_cache)

            loaded = False
            for path in ckpt_paths:
                if not os.path.exists(path):
                    continue
                logger.info(f"Loading Wavcap weights from {path}")
                # SECURITY NOTE: weights_only=False unpickles arbitrary
                # objects; only load checkpoints from trusted locations.
                checkpoint = torch.load(path, map_location=device, weights_only=False)
                if isinstance(checkpoint, dict) and 'model' in checkpoint:
                    state_dict = checkpoint['model']
                else:
                    state_dict = checkpoint
                model.load_state_dict(state_dict, strict=False)
                loaded = True
                break

            # Move to the inference device and use eval mode in both cases:
            # callers send tensors on `device` whenever clap_model is set.
            model = model.to(device)
            model.eval()
            clap_model = model

            if loaded:
                logger.info("Wavcap Audio Engine loaded successfully.")
            else:
                logger.warning("Wavcap model found but weights missing. Audio analysis may be poor.")
        finally:
            os.chdir(original_cwd)
    except ImportError as e:
        logger.warning(f"WavCap module not available — skipping: {e}")
    except Exception as e:
        logger.error(f"Failed to load Wavcap: {e}")
|
|
| |
|
|
| |
# Per-industry vocabulary: generic caption term -> domain-specific phrasing.
LENS_MAPPING = {
    "military": {
        "train": "armored supply transport", "car horn": "tactical signal alert", "car": "tactical ground vehicle",
        "truck": "heavy logistics transport", "engine": "military-grade vehicle engine", "fireworks": "active combat/artillery echoes",
        "dog": "service canine unit", "birds": "unidentified aerial biological signatures", "clapping": "sporadic rapid-fire echoes",
        "breathing": "tactical heavy respiration", "footsteps": "march/troop movement", "siren": "emergency tactical alert",
        "helicopter": "attack/recon helicopter", "airplane": "military aircraft", "clock alarm": "unit regroup signal",
        "speaking": "radio communication/vocal intercept", "man": "subject/target (male)", "woman": "subject/target (female)",
        "person": "field contact"
    },
    "maritime": {
        "train": "large vessel/ship engine", "car horn": "distant foghorn alert", "car": "on-shore support vehicle",
        "truck": "port logistics vehicle", "engine": "diesel propulsion system", "wind": "offshore gale wind",
        "waves": "heavy ocean swell", "sea waves": "rolling ocean waves", "water drops": "spray hitting the deck",
        "splash": "surface impact in open water", "foghorn": "automated maritime signal", "rain": "maritime precipitation",
        "birds": "coastal/sea bird activity", "speaking": "bridge/vessel communication"
    },
    "medical": {
        "breathing": "patient respiration", "coughing": "clinical coughing symptom", "sneezing": "acute sneezing event",
        "siren": "emergency ambulance signal", "crying baby": "obstetric/pediatrics context", "snoring": "sleep apnea/respiratory monitoring",
        "room": "medical ward/clinic", "clock alarm": "medication timer alert", "pouring water": "clinical fluid management",
        "man": "male patient", "woman": "female patient", "speaking": "clinical consultation/staff communication",
        "car": "emergency transport vehicle"
    }
}


def apply_industry_lens(text: str, industry: str) -> str:
    """Rewrite generic terms in *text* using the vocabulary of *industry*.

    Matching is case-insensitive and whole-word. Longer terms win over shorter
    ones that start at the same position ("car horn" before "car").

    All terms are replaced in a single pass over the original text, so a
    replacement is never rewritten again by another term. Substituting key by
    key turned "sea waves" into "rolling ocean heavy ocean swell".

    Args:
        text: Caption to rewrite.
        industry: Key of ``LENS_MAPPING``; case and surrounding whitespace are
            ignored. A falsy or unknown value returns *text* unchanged.

    Returns:
        The rewritten caption.
    """
    if not industry:
        return text

    lens = LENS_MAPPING.get(industry.lower().strip())
    if not lens:
        return text

    # Longest first so the alternation prefers multi-word terms.
    ordered_terms = sorted(lens, key=len, reverse=True)
    pattern = re.compile(
        r"\b(?:" + "|".join(re.escape(term) for term in ordered_terms) + r")\b",
        re.IGNORECASE,
    )
    by_lowercase = {term.lower(): phrase for term, phrase in lens.items()}

    return pattern.sub(lambda match: by_lowercase[match.group(0).lower()], text)
|
|
def fix_video_file(video_path: str):
    """Remux *video_path* with FFmpeg to restore missing headers/cues.

    The stream is copied (``-c copy``) into a sibling file named
    ``<name>_fixed<ext>``. This is mainly useful for browser-recorded webm.

    Args:
        video_path: Path of the video to repair.

    Returns:
        Path of the remuxed file when FFmpeg succeeded and the output is
        larger than 100 bytes, otherwise ``None``. Failures are logged, not
        raised.
    """
    # Build the output name from the split path. str.replace(ext, ...) broke
    # on paths without an extension, with an upper-case extension (output ==
    # input) or with the extension text appearing earlier in the path.
    root, ext = os.path.splitext(video_path)
    fixed_path = f"{root}_fixed{ext}"

    try:
        subprocess.run(
            ['ffmpeg', '-y', '-i', video_path, '-c', 'copy', '-metadata', 'title=Fixed', fixed_path],
            check=True,
            capture_output=True,
        )
        if os.path.exists(fixed_path) and os.path.getsize(fixed_path) > 100:
            return fixed_path
    except Exception as e:
        logger.warning(f"FFmpeg remux failed: {e}")

    # Do not leave an unusable or partial output file behind.
    try:
        if os.path.exists(fixed_path):
            os.unlink(fixed_path)
    except OSError as cleanup_err:
        logger.warning(f"Could not remove incomplete remux output {fixed_path}: {cleanup_err}")
    return None
|
|
def get_video_frames_robust(video_path: str, max_frames=5):
    """Sample up to *max_frames* RGB frames from a video.

    First tries evenly spaced seeks using the reported frame count. If the
    container reports too few frames or seeking yields too few images, it
    falls back to reading sequentially and keeping roughly one frame every two
    seconds (at most 5000 frames are read).

    Args:
        video_path: Path of the video file.
        max_frames: Maximum number of frames to return.

    Returns:
        ``(images, timestamps)``: PIL images and their timestamps in seconds,
        in matching order. Both lists are empty when nothing could be read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 30

        sampled_images = []
        timestamps = []

        if total_frames > 5 and max_frames > 1:
            indices = [int(i * (total_frames - 1) / (max_frames - 1)) for i in range(max_frames)]
            for idx in indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if ret:
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    sampled_images.append(Image.fromarray(rgb))
                    timestamps.append(round(idx / fps, 1))

            # Accept the seek result when enough frames came back. The
            # threshold never exceeds max_frames, so max_frames=2 can succeed.
            if len(sampled_images) >= min(3, max_frames):
                return sampled_images, timestamps

        logger.info(f"[PIPELINE] Seeking failed or 0 frames reported ({total_frames}). Sequential fallback...")

        # Start over: frames from a partially failed seek pass would otherwise
        # be mixed with (and duplicated by) the sequential pass.
        sampled_images = []
        timestamps = []

        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        # Never let the interval reach 0 (fps below 0.5), which would make the
        # modulo below raise ZeroDivisionError.
        interval = max(1, int(fps * 2)) if fps > 0 else 15
        count = 0
        while len(sampled_images) < max_frames and count < 5000:
            ret, frame = cap.read()
            if not ret:
                break
            if count % interval == 0:
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                sampled_images.append(Image.fromarray(rgb))
                timestamps.append(round(count / fps, 1) if fps > 0 else count)
            count += 1

        return sampled_images, timestamps
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()
|
|
async def extract_and_save_media_context(file_path: str, session_id: str, is_video: bool):
    """Persist audio and frame context from an uploaded file into the session.

    Non-image files get their audio exported to ``audio/segment_0.wav`` inside
    the session folder (best effort). Images are saved as one frame at 0.0 s.
    Videos have up to 10 frames sampled and saved, with an FFmpeg remux as a
    fallback when no frame can be read.

    Args:
        file_path: Path of the uploaded media file.
        session_id: Session that receives the extracted context.
        is_video: Whether a non-image file should also be sampled for frames.

    All errors are logged and swallowed; the function never raises.
    """
    try:
        session_path = session_manager.get_session_path(session_id)
        audio_dir = os.path.join(session_path, "audio")

        ext = os.path.splitext(file_path)[1].lower()
        is_image = ext in ['.jpg', '.jpeg', '.png', '.webp']

        if not is_image:
            os.makedirs(audio_dir, exist_ok=True)
            try:
                audio = AudioSegment.from_file(file_path)
                audio.export(os.path.join(audio_dir, "segment_0.wav"), format="wav")
            except Exception as ae:
                logger.warning(f"Could not extract audio chunk from {file_path}: {ae}")

        if is_image:
            # Context manager closes the file handle once the frame is saved.
            with Image.open(file_path) as img:
                session_manager.save_frame(session_id, img, 0.0)
        elif is_video:
            sampled_images, timestamps = get_video_frames_robust(file_path, max_frames=10)

            if not sampled_images:
                fixed_path = fix_video_file(file_path)
                if fixed_path:
                    try:
                        sampled_images, timestamps = get_video_frames_robust(fixed_path, max_frames=10)
                    finally:
                        # Remove the remuxed copy even if extraction raised.
                        try:
                            os.unlink(fixed_path)
                        except OSError as cleanup_err:
                            logger.warning(f"Could not remove remuxed file {fixed_path}: {cleanup_err}")

            for img, ts in zip(sampled_images, timestamps):
                session_manager.save_frame(session_id, img, ts)
    except Exception as e:
        logger.error(f"Context extraction failed: {e}")
|
|
| |
|
|
class AudioResponse(BaseModel):
    """Response schema of the ``/api/process-audio`` endpoint."""

    # --- Always present ---
    session_id: str
    caption: str

    # --- Perception context ---
    audio_context: Optional[str] = None
    visual_context: Optional[str] = None
    intelligence_insight: Optional[str] = None

    # --- Guidance shown to the user ---
    advice: Optional[str] = None
    options: List[str] = []

    # --- Request fields echoed back ---
    industry_context: Optional[str] = None
    mission_prompt: Optional[str] = None

    # --- Mission / specialist output ---
    mission_result: Optional[Dict[str, Any]] = None
    smart_checkmate: Optional[Dict[str, Any]] = None
    mission_model_captions: Optional[List[Dict[str, Any]]] = None
    mission_model_fusion: Optional[str] = None
    fusion_caption: Optional[str] = None
|
|
class FusionResponse(BaseModel):
    """Response schema of the ``/api/analyze`` endpoint.

    Also used for the placeholder payloads built by ``_cancelled_response``
    and ``_error_response``.
    """

    # --- Required fields ---
    session_id: str
    audio_context: str
    visual_context: str
    # Entries built by the handler carry "start", "end" and "sentence" keys.
    video_timeline: List[dict]
    intelligence_insight: Optional[str] = None
    situational_report: str
    recommended_actions: List[str]
    threat_level: str

    # --- Mission state ---
    mission_result: Optional[Dict[str, Any]] = None
    mission_prompt: Optional[str] = None

    # --- Raw perception extras ---
    yolo_detections: Optional[List[Dict[str, Any]]] = None
    whisper_transcript: Optional[str] = None
    active_models: Optional[List[str]] = None

    # --- Fused / specialist output ---
    fusion_caption: Optional[str] = None
    mission_findings_text: Optional[str] = None
    prompt_type: Optional[str] = None
    mission_model_captions: Optional[List[Dict[str, Any]]] = None
    mission_model_fusion: Optional[str] = None
    smart_checkmate: Optional[Dict[str, Any]] = None
    cognitive_state: Optional[Dict[str, Any]] = None
|
|
class QueryRequest(BaseModel):
    """Request body of the ``/api/query`` endpoint."""

    # Session the question is asked against.
    session_id: str
    # Free-text question forwarded to the orchestrator.
    query: str
|
|
class QueryResponse(BaseModel):
    """Response schema of the ``/api/query`` endpoint."""

    # Answer text; the handler prefixes it with a version tag.
    answer: str
    evidence_frames: Optional[List[str]] = None
    options: List[str] = []
|
|
class SynthesizeRequest(BaseModel):
    """Request body of the ``/api/synthesize`` endpoint."""

    # Caption text per source, forwarded unchanged to the orchestrator.
    predictions: Dict[str, str]
    mission_prompt: Optional[str] = None
    session_id: Optional[str] = None
|
|
class SynthesizeResponse(BaseModel):
    """Response schema of the ``/api/synthesize`` endpoint."""

    situational_report: str
    recommended_actions: List[str]
    intelligence_insight: Optional[str] = None
|
|
| |
|
|
|
|
| |
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: warm models on startup, free memory on exit."""
    # ---- startup ----
    logger.info(f"Fusion Engine warming up (Device: {device})")
    load_wavcap()
    if orchestrator:
        orchestrator.warmup_all_specialists()
    logger.info("[STARTUP] All models warm. Fusion Engine ready for requests.")

    yield

    # ---- shutdown ----
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    logger.info("Fusion Engine shutdown.")
|
|
app = FastAPI(title="Unified Situational Intelligence", lifespan=lifespan)

# NOTE(review): a wildcard origin combined with allow_credentials=True lets any
# site make credentialed requests; confirm this is intended outside development.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
@app.post("/api/process-audio", response_model=AudioResponse)
async def process_audio(
    file: UploadFile = File(...),
    industry: Optional[str] = Form(None),
    mission_prompt: Optional[str] = Form(None),
    session_id: Optional[str] = Form(None)
):
    """Caption an uploaded audio clip and run the audio mission pipeline.

    The upload is written to a temporary file, saved as session context,
    captioned by the WavCap model and passed to
    ``orchestrator.process_audio_phased``.

    Args:
        file: Uploaded audio file.
        industry: Optional industry tag, echoed back as ``industry_context``.
        mission_prompt: Optional mission prompt forwarded to the orchestrator.
        session_id: Existing session to reuse; a session is resolved or
            created by the session manager.

    Returns:
        A dict matching ``AudioResponse``.

    Raises:
        HTTPException: 503 when the audio model is not loaded, 500 for any
            processing failure.
    """
    if not clap_model:
        raise HTTPException(status_code=503, detail="Audio Analysis Engine offline")

    logger.info(f"[PIPELINE] Audio file received: {file.filename}")
    session_id = session_manager.get_or_create_session(session_id)
    # Uploads may arrive without a filename; splitext(None) would raise.
    ext = os.path.splitext(file.filename or "")[1]

    temp_path = None
    try:
        # Created inside the try so the file is removed even if reading the
        # upload fails part-way.
        with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
            temp_path = tmp.name
            tmp.write(await file.read())

        await extract_and_save_media_context(temp_path, session_id, is_video=False)
        sr = wavcap_config["audio_args"]["sr"]
        waveform, _ = librosa.load(temp_path, sr=sr, mono=True)

        # Pad very short clips with silence up to one second (sr samples).
        min_samples = sr
        if len(waveform) < min_samples:
            logger.info(f"[PIPELINE] Audio too short ({len(waveform)} samples). Padding to {min_samples} samples.")
            padding = np.zeros(min_samples - len(waveform), dtype=waveform.dtype)
            waveform = np.concatenate([waveform, padding])

        wave_tensor = torch.from_numpy(waveform).to(device).unsqueeze(0)

        logger.info("[PIPELINE] Calling Wavcap audio model...")
        with torch.no_grad():
            captions = clap_model.generate(samples=wave_tensor, num_beams=3, max_length=30)
        caption = captions[0] if captions else "No sound detected."
        logger.info(f"[PIPELINE] Wavcap caption: '{caption}'")

        phased_result = orchestrator.process_audio_phased(
            mission_prompt=mission_prompt,
            caption=caption,
            raw_audio=waveform,
            session_id=session_id
        )

        return {
            "session_id": session_id,
            "caption": caption,
            "audio_context": caption,
            "visual_context": "Sensor inactive (Camera disconnected)",
            "intelligence_insight": None,
            "advice": phased_result.get("fusion_caption", ""),
            "options": ["Review logs", "Check sensors", "Stand by", "Clear session"],
            "industry_context": industry,
            "mission_prompt": mission_prompt,
            "mission_result": None,
            "smart_checkmate": phased_result.get("smart_checkmate"),
            "mission_model_captions": phased_result.get("mission_findings"),
            "mission_model_fusion": phased_result.get("mission_model_fusion"),
            "fusion_caption": phased_result.get("fusion_caption")
        }
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Audio processing error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
|
|
| |
|
|
class VisualEnhancer:
    """Image enhancement strategies selected by name."""

    @staticmethod
    def apply(img: Image.Image, strategy: str) -> Image.Image:
        """Return *img* processed by *strategy*.

        ``"enhance_brightness"`` scales brightness by 1.8.
        ``"super_resolution_zoom"`` crops the central half of each dimension
        and resizes it back to the original size with LANCZOS resampling.
        Any other strategy returns *img* unchanged.
        """
        from PIL import ImageEnhance

        if strategy == "enhance_brightness":
            logger.info("[ENHANCER] Applying Brightness Boost...")
            return ImageEnhance.Brightness(img).enhance(1.8)

        if strategy == "super_resolution_zoom":
            logger.info("[ENHANCER] Applying Digital Zoom (SR Placeholder)...")
            w, h = img.size
            centre_box = (w / 4, h / 4, 3 * w / 4, 3 * h / 4)
            zoomed = img.crop(centre_box)
            return zoomed.resize((w, h), Image.Resampling.LANCZOS)

        return img
|
|
class AudioEnhancer:
    """Waveform enhancement strategies selected by name."""

    @staticmethod
    def apply(waveform: np.ndarray, sr: int, strategy: str) -> np.ndarray:
        """Return *waveform* processed by *strategy*.

        ``"boost_audio_gain"`` multiplies samples by 3.16 (about +10 dB).
        ``"noise_reduction"`` applies a 5-tap moving average.
        Any other strategy returns the input array itself. *sr* is accepted
        for interface symmetry and is not used.
        """
        if strategy == "boost_audio_gain":
            logger.info("[ENHANCER] Applying +10dB Gain Boost...")
            boosted = waveform * 3.16
            return boosted

        if strategy == "noise_reduction":
            logger.info("[ENHANCER] Applying Low-Pass Noise Filter...")
            kernel = np.ones(5)/5
            return np.convolve(waveform, kernel, mode='same')

        return waveform
|
|
| |
|
|
@app.get("/api/sessions")
async def list_sessions():
    """List every session folder with metadata for the session selector.

    Each entry has ``id``, ``label``, ``created_at``, ``last_updated`` and
    ``frame_count``. Entries are sorted by ``last_updated``, newest first. The
    label comes from ``label.txt`` when present, otherwise it is generated
    from the folder's creation time.
    """
    from datetime import datetime

    root = session_manager.base_dir
    if not os.path.exists(root):
        return {"sessions": []}

    entries = []
    for folder_name in os.listdir(root):
        session_dir = os.path.join(root, folder_name)
        if not os.path.isdir(session_dir):
            continue

        frames_dir = os.path.join(session_dir, "frames")
        frame_count = len(os.listdir(frames_dir)) if os.path.isdir(frames_dir) else 0

        created_ts = os.path.getctime(session_dir)
        modified_ts = os.path.getmtime(session_dir)

        label_file = os.path.join(session_dir, "label.txt")
        if os.path.exists(label_file):
            with open(label_file, "r") as fh:
                label = fh.read().strip()
        else:
            label = f"Session — {datetime.fromtimestamp(created_ts).strftime('%b %d, %I:%M %p')}"

        entries.append({
            "id": folder_name,
            "label": label,
            "created_at": created_ts,
            "last_updated": modified_ts,
            "frame_count": frame_count,
        })

    newest_first = sorted(entries, key=lambda entry: entry["last_updated"], reverse=True)
    return {"sessions": newest_first}
|
|
@app.post("/api/sessions/new")
async def create_new_session():
    """Create a new session, make it the active one and return its ID."""
    new_session_id = session_manager.create_session()
    session_manager.set_active_session(new_session_id)
    logger.info(f"[SESSION API] Created new session: {new_session_id}")
    return {"session_id": new_session_id}
|
|
@app.post("/api/sessions/select")
async def select_session(request: dict):
    """Make the session named in the request body the active session.

    Raises:
        HTTPException: 404 when the session manager rejects the ID.
    """
    sid = request.get("session_id", "")
    activated = session_manager.set_active_session(sid)
    if not activated:
        raise HTTPException(status_code=404, detail=f"Session {sid} not found")
    return {"status": "ok", "active_session": sid}
|
|
@app.post("/api/sessions/rename")
async def rename_session(request: dict):
    """Store a custom display label for a session.

    The label is written to ``label.txt`` inside the session folder.

    Args:
        request: JSON body with ``session_id`` and ``name``.

    Raises:
        HTTPException: 400 when ``name`` is missing, not a string or blank;
            404 when the session ID is invalid or its folder does not exist.
    """
    sid = request.get("session_id", "")
    raw_name = request.get("name", "")
    # A JSON null or number here used to raise AttributeError (HTTP 500).
    new_name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not new_name:
        raise HTTPException(status_code=400, detail="Name cannot be empty")

    # Session IDs are plain folder names. Reject empty or path-like values so
    # the label can never be written outside a session folder.
    if not isinstance(sid, str) or sid in ("", ".", "..") or "/" in sid or "\\" in sid:
        raise HTTPException(status_code=404, detail=f"Session {sid} not found")

    session_path = session_manager.get_session_path(sid)
    if not os.path.exists(session_path):
        raise HTTPException(status_code=404, detail=f"Session {sid} not found")

    label_path = os.path.join(session_path, "label.txt")
    with open(label_path, "w") as f:
        f.write(new_name)

    logger.info(f"[SESSION API] Renamed session {sid} to '{new_name}'")
    return {"status": "ok", "session_id": sid, "name": new_name}
|
|
@app.delete("/api/sessions/{session_id}")
async def delete_session(session_id: str):
    """Delete a session folder and everything in it.

    If the deleted session is the active one, the active session is cleared.

    Raises:
        HTTPException: 404 when the ID is invalid or its folder does not exist.
    """
    # The ID ends up in shutil.rmtree, so only accept a plain folder name.
    # Values such as ".." could otherwise resolve outside the sessions root.
    if session_id in ("", ".", "..") or "/" in session_id or "\\" in session_id:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    session_path = session_manager.get_session_path(session_id)
    if not os.path.exists(session_path):
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    if session_manager.active_session_id == session_id:
        session_manager.active_session_id = None

    shutil.rmtree(session_path)
    logger.info(f"[SESSION API] Deleted session {session_id}")
    return {"status": "ok", "deleted": session_id}
|
|
@app.post("/api/analyze", response_model=FusionResponse)
async def analyze_multimodal(
    background_tasks: BackgroundTasks,
    audio_file: Optional[UploadFile] = File(None),
    video_file: Optional[UploadFile] = File(None),
    mission_prompt: Optional[str] = Form(None),
    session_id: Optional[str] = Form(None)
):
    """Run fast perception on the uploads and schedule specialist analysis.

    Flow:
      1. Sanitise the mission prompt; purge the specialist cache if it changed.
      2. Caption the image or sampled video frames, then caption the audio
         (with a transcript when the caption mentions speech or talking).
      3. Compute the cognitive state.
      4. Queue ``run_specialist_analysis`` as a background task.
      5. Respond with the fresh perception plus whatever specialist result is
         currently cached for the session.

    A request that becomes stale between phases gets ``_cancelled_response``.

    Args:
        background_tasks: FastAPI background task queue.
        audio_file: Optional audio upload.
        video_file: Optional video or image upload (image types are detected
            by extension).
        mission_prompt: Optional mission prompt; values such as "none",
            "null" or "undefined" are treated as absent.
        session_id: Existing session to reuse.

    Raises:
        HTTPException: 500 for any failure in the pipeline.
    """
    clean_mission_prompt = None
    if mission_prompt and str(mission_prompt).lower().strip() not in ["", "none", "undefined", "null"]:
        clean_mission_prompt = mission_prompt

    logger.info(f"[API] /api/analyze received mission_prompt='{clean_mission_prompt}' (raw: '{mission_prompt}') session_id='{session_id}'")

    session_id = session_manager.get_or_create_session(session_id)

    # A new prompt invalidates specialist findings computed for the old one.
    if clean_mission_prompt != _last_mission_prompt.get(session_id):
        _specialist_result_cache.pop(session_id, None)
        _last_mission_prompt[session_id] = clean_mission_prompt
        logger.info(f"[MISSION PURGE] Prompt changed for session {session_id}. Specialist cache cleared.")

    request_id = _new_request(session_id)
    logger.info(f"[API] Request #{request_id} for session {session_id}")

    video_path = None
    audio_path = None
    is_image = False

    try:
        # ---- Persist uploads to temp files (removed in `finally`) ----
        if video_file:
            # Uploads may arrive without a filename.
            ext = os.path.splitext(video_file.filename or "")[1].lower()
            is_image = ext in ['.jpg', '.jpeg', '.png', '.webp']
            with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
                video_path = tmp.name
                tmp.write(await video_file.read())

        if audio_file:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                audio_path = tmp.name
                tmp.write(await audio_file.read())

        predictions = {}
        video_timeline = []
        yolo_detections = []
        whisper_transcript = ""
        sampled_images = []

        # The newest active strategy, if any, selects the enhancer to apply.
        ws = session_manager.get_world_state(session_id)
        current_strategy = ws.active_strategies[-1] if ws.active_strategies else None

        if _is_stale(session_id, request_id):
            logger.info(f"[API] Request #{request_id} cancelled (superseded before Phase 1)")
            return _cancelled_response(session_id)

        # ---- Visual perception ----
        if video_path:
            if is_image:
                img = Image.open(video_path).convert("RGB")
                if current_strategy:
                    img = VisualEnhancer.apply(img, current_strategy)

                caption = orchestrator.describe_image(img)
                predictions["video"] = caption
                video_timeline = [{"start": 0, "end": 1.0, "sentence": caption}]
            else:
                sampled_images, timestamps = get_video_frames_robust(video_path)
                if sampled_images:
                    if current_strategy:
                        sampled_images = [VisualEnhancer.apply(img, current_strategy) for img in sampled_images]

                    captions = orchestrator.describe_frames(sampled_images)
                    for ts, cap_text in zip(timestamps, captions):
                        video_timeline.append({"start": ts, "end": ts + 2.0, "sentence": cap_text})

                    predictions["video"] = consolidate_temporal_narrative(captions)

        if _is_stale(session_id, request_id):
            logger.info(f"[API] Request #{request_id} cancelled (superseded before audio)")
            return _cancelled_response(session_id)

        # ---- Audio perception ----
        if audio_path and clap_model:
            sr = wavcap_config["audio_args"]["sr"]
            waveform, _ = librosa.load(audio_path, sr=sr, mono=True)

            if current_strategy:
                waveform = AudioEnhancer.apply(waveform, sr, current_strategy)

            # Pad clips shorter than one second with silence.
            if len(waveform) < sr:
                waveform = np.concatenate([waveform, np.zeros(sr - len(waveform))])
            wave_tensor = torch.from_numpy(waveform).to(device).float().unsqueeze(0)
            with torch.no_grad():
                captions_a = clap_model.generate(samples=wave_tensor, num_beams=3, max_length=30)
            predictions["audio"] = captions_a[0] if captions_a else "Acoustic signatures detected."
            if "speech" in predictions["audio"].lower() or "talking" in predictions["audio"].lower():
                whisper_transcript = orchestrator.transcribe_audio(audio_path)
                predictions["speech"] = whisper_transcript

        # ---- Cognitive state ----
        cognitive_state = cognitive_specialist.analyze_perception(
            video_path=video_path,
            audio_path=audio_path,
            text_context=clean_mission_prompt
        )

        # ---- Raw (un-enhanced) inputs for the specialists ----
        raw_image = None
        if video_path:
            if is_image:
                raw_image = Image.open(video_path).convert("RGB")
            elif sampled_images:
                raw_image = sampled_images[0]
            else:
                # First extraction returned nothing; try once more.
                sampled_images, _ = get_video_frames_robust(video_path, max_frames=5)
                raw_image = sampled_images[0] if sampled_images else None

        raw_audio = None
        if audio_path and clap_model:
            sr = wavcap_config["audio_args"]["sr"]
            waveform, _ = librosa.load(audio_path, sr=sr, mono=True)
            if len(waveform) < sr:
                waveform = np.concatenate([waveform, np.zeros(sr - len(waveform))])
            raw_audio = torch.from_numpy(waveform).to(device).float().unsqueeze(0)

        if _is_stale(session_id, request_id):
            logger.info(f"[API] Request #{request_id} cancelled (superseded before Phase 2/3)")
            return _cancelled_response(session_id)

        # ---- Heavy specialists run after the response is sent ----
        background_tasks.add_task(
            run_specialist_analysis,
            session_id=session_id,
            request_id=request_id,
            clean_mission_prompt=clean_mission_prompt,
            predictions=predictions,
            yolo_detections=yolo_detections,
            raw_image=raw_image,
            sampled_images=sampled_images,
            raw_audio=raw_audio
        )

        # Whatever an earlier background run cached for this session.
        cached_result = _specialist_result_cache.get(session_id, {})

        final_audio = predictions.get("audio", "N/A")
        final_visual = predictions.get("video", "N/A")

        mission_findings_text = ""
        mission_model_captions = []
        if cached_result.get("mission_findings"):
            findings_parts = []
            for f in cached_result["mission_findings"]:
                model_name = f.get('model', 'unknown')
                output = f.get('explanation') or f.get('status') or "No significant findings."
                display_name = model_name.replace("_", " ")
                mission_model_captions.append({"model": display_name, "caption": str(output)})
                findings_parts.append(f"{display_name}: {output}")
            mission_findings_text = "\n".join(findings_parts)

        smart_checkmate_result = None
        if clean_mission_prompt and mission_model_captions:
            all_captions_for_checkmate = list(mission_model_captions)
            all_captions_for_checkmate.append({"model": "audio perception", "caption": str(final_audio)})
            all_captions_for_checkmate.append({"model": "visual perception", "caption": str(final_visual)})

            smart_checkmate_result = mission_evaluator.caption_checkmate(
                mission_prompt=clean_mission_prompt,
                specialist_captions=all_captions_for_checkmate
            )

        return FusionResponse(
            session_id=session_id,
            audio_context=str(final_audio),
            visual_context=str(final_visual),
            video_timeline=video_timeline,
            situational_report=cached_result.get("fusion_caption") or "Observing field...",
            recommended_actions=cached_result.get("recommended_actions") or ["Processing mission intelligence..."],
            threat_level=cached_result.get("threat_level") or "MODERATE",
            mission_prompt=clean_mission_prompt,
            mission_result=cached_result.get("mission_status"),
            yolo_detections=yolo_detections,
            whisper_transcript=whisper_transcript,
            active_models=cached_result.get("plan", {}).get("capabilities", []),
            fusion_caption=cached_result.get("fusion_caption") or "Neural Engine processing...",
            mission_findings_text=mission_findings_text,
            prompt_type=cached_result.get("plan", {}).get("prompt_type", "query"),
            mission_model_captions=mission_model_captions,
            mission_model_fusion=cached_result.get("mission_model_fusion") or "",
            smart_checkmate=smart_checkmate_result,
            cognitive_state=cognitive_state
        )

    except Exception as e:
        logger.error(f"Fusion error: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e) or "Internal Multi-modal Fusion Error")
    finally:
        # The uploads were never removed before, so every request leaked two
        # temp files. The background task receives decoded images/tensors, not
        # these paths. NOTE(review): assumes cognitive_specialist does not keep
        # the paths for later use - confirm.
        for tmp_path in (video_path, audio_path):
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except OSError as cleanup_err:
                    logger.warning(f"[API] Could not remove temp file {tmp_path}: {cleanup_err}")
|
|
|
|
@app.post("/api/tools/identify")
async def identify_target(req: IdentifyRequest):
    """Identify the content of a user-provided image crop.

    Args:
        req: Request carrying a base64 image, optionally prefixed with a
            header that ends at the first comma (e.g. a data URL).

    Returns:
        ``{"status": "success", "identification": ..., "timestamp": ...}`` on
        success, or ``{"status": "error", "message": ...}`` on failure. The
        endpoint does not raise.
    """
    # `time` is not imported at module level; without this import the success
    # path always raised NameError and every call returned the error payload.
    import time

    try:
        # Drop everything up to the first comma when a header is present.
        if "," in req.image_b64:
            encoded = req.image_b64.split(",", 1)[1]
        else:
            encoded = req.image_b64
        image_data = base64.b64decode(encoded)
        image = Image.open(io.BytesIO(image_data)).convert("RGB")

        identification = tactical_specialists.identify_region(image)

        return {
            "status": "success",
            "identification": identification,
            "timestamp": time.time()
        }
    except Exception as e:
        logger.error(f"[API] Identification tool error: {e}")
        return {"status": "error", "message": str(e)}
|
|
@app.post("/api/query", response_model=QueryResponse)
async def query_intelligence(request: QueryRequest):
    """Answer a free-text question about a session via the orchestrator.

    A dict result is mapped onto ``QueryResponse`` with a ``(v2.0-ACTIVE)``
    prefix; any other result is stringified with a ``(v2.0-FALLBACK)`` prefix.

    Raises:
        HTTPException: 503 when the orchestrator is unavailable, 500 when the
            query fails.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Intelligence Engine offline")

    try:
        res_data = orchestrator.query(request.session_id, request.query)

        if not isinstance(res_data, dict):
            return QueryResponse(answer=f"(v2.0-FALLBACK) {res_data}")

        return QueryResponse(
            answer=f"(v2.0-ACTIVE) {res_data.get('answer', '')}",
            options=res_data.get("options", []),
            evidence_frames=res_data.get("evidence_frames"),
        )
    except Exception as e:
        logger.error(f"Query error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
@app.post("/api/synthesize", response_model=SynthesizeResponse)
async def synthesize_intelligence(request: SynthesizeRequest):
    """Fuse caller-supplied predictions into a situational report.

    Raises:
        HTTPException: 503 when the orchestrator is unavailable, 500 when
            synthesis fails.
    """
    if not orchestrator:
        raise HTTPException(status_code=503, detail="Intelligence Engine offline")

    try:
        logger.info(f"[PIPELINE] Synthesizing multimodal results for session {request.session_id}...")
        fusion = orchestrator.synthesize_fusion(
            mission_prompt=request.mission_prompt,
            predictions=request.predictions,
            session_id=request.session_id,
        )
        report = fusion.get("situational_report", "Analysis complete.")
        actions = fusion.get("recommended_actions", [])
        insight = fusion.get("intelligence_insight")
        return SynthesizeResponse(
            situational_report=report,
            recommended_actions=actions,
            intelligence_insight=insight,
        )
    except Exception as e:
        logger.error(f"Synthesis error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
# Direct execution: serve the app with uvicorn on all interfaces, port 8002.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)
|
|