""" Threat Chat Module - GPT-powered Q&A about detected threats. """ import logging from typing import List, Dict, Any from utils.openai_client import chat_completion, extract_content, get_api_key, OpenAIAPIError from utils.gpt_reasoning import _DOMAIN_ROLES logger = logging.getLogger(__name__) def chat_about_threats( question: str, detections: List[Dict[str, Any]], mission_spec_dict: Dict[str, Any] = None, ) -> str: """ Answer user questions about detected threats using GPT. Args: question: User's question about the current threat situation. detections: List of detection dicts with gpt_raw threat analysis. mission_spec_dict: Optional dict of mission specification fields. Returns: GPT's response as a string. """ if not get_api_key(): logger.warning("OPENAI_API_KEY not set. Cannot process threat chat.") return "Error: OpenAI API key not configured." if not detections: return "No threats detected yet. Run detection first to analyze the scene." # Build threat context from detections threat_context = _build_threat_context(detections) # Domain-aware role selection domain = "GENERIC" if mission_spec_dict: domain = mission_spec_dict.get("domain", "GENERIC") role_label = _DOMAIN_ROLES.get(domain, _DOMAIN_ROLES["GENERIC"]) # Build mission context block (INV-8: mission context forwarded to LLM calls) mission_block = "" if mission_spec_dict: mission_block = "\nMISSION CONTEXT:\n" if mission_spec_dict.get("mission_intent"): mission_block += f"- Intent: {mission_spec_dict['mission_intent']}\n" if mission_spec_dict.get("domain"): mission_block += f"- Domain: {mission_spec_dict['domain']}\n" if mission_spec_dict.get("object_classes"): mission_block += f"- Target Classes: {', '.join(mission_spec_dict['object_classes'])}\n" if mission_spec_dict.get("context_phrases"): mission_block += f"- Situation: {'; '.join(mission_spec_dict['context_phrases'])}\n" mission_block += "\n" system_prompt = ( f"You are a {role_label} providing real-time threat analysis support. " "You have access to the current threat assessment data from optical surveillance. " "Answer questions concisely and tactically. Use military terminology where appropriate. " "If asked about engagement recommendations, always note that final decisions rest with the commanding officer.\n\n" f"{mission_block}" "CURRENT THREAT PICTURE:\n" f"{threat_context}\n\n" "Respond to the operator's question based on this threat data." ) payload = { "model": "gpt-4o", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": question} ], "max_tokens": 500, "temperature": 0.3, } try: resp_data = chat_completion(payload) content, _refusal = extract_content(resp_data) return content.strip() if content else "No response generated." except OpenAIAPIError as e: logger.error("OpenAI API error: %s", e) return f"API Error: {e}" except Exception as e: logger.error("Threat chat failed: %s", e) return f"Error processing question: {str(e)}" def _build_threat_context(detections: List[Dict[str, Any]]) -> str: """Build a text summary of all detected threats for GPT context.""" lines = [] for det in detections: obj_id = det.get("id", "Unknown") label = det.get("label", "object") # Extract GPT raw data if available gpt_raw = det.get("gpt_raw") or det.get("features") or {} # Universal schema fields (with fallbacks to legacy names) obj_type = gpt_raw.get("object_type") or gpt_raw.get("vessel_category", label) size = gpt_raw.get("size", "") threat_score = ( det.get("threat_level_score") or gpt_raw.get("threat_level") or gpt_raw.get("threat_level_score", "?") ) threat_class = ( det.get("threat_classification") or gpt_raw.get("threat_classification", "Unknown") ) weapons = gpt_raw.get("visible_weapons", []) weapon_ready = gpt_raw.get("weapon_readiness") or det.get("weapon_readiness", "Unknown") motion = gpt_raw.get("motion_status", "Unknown") range_est = gpt_raw.get("range_estimate") or gpt_raw.get("range_estimation_nm", "") bearing = gpt_raw.get("bearing") or gpt_raw.get("bearing_clock") or det.get("gpt_direction", "") intent = gpt_raw.get("tactical_intent", "") dynamic_features = gpt_raw.get("dynamic_features", []) # Build entry entry = f"[{obj_id}] {obj_type}" if size and size != "Unknown": entry += f" ({size})" entry += f"\n - Threat: {threat_class} (Score: {threat_score}/10)" if range_est: entry += f"\n - Range: {range_est}" if bearing and bearing != "Unknown": entry += f", Bearing: {bearing}" if motion and motion != "Unknown": entry += f"\n - Motion: {motion}" if weapons: entry += f"\n - Weapons: {', '.join(weapons) if isinstance(weapons, list) else weapons}" if weapon_ready and weapon_ready != "Unknown": entry += f" ({weapon_ready})" if intent: entry += f"\n - Assessed Intent: {intent}" # Append dynamic features for feat in dynamic_features: if isinstance(feat, dict): key = feat.get("key", "") value = feat.get("value", "") if key and value: entry += f"\n - {key}: {value}" lines.append(entry) return "\n\n".join(lines) if lines else "No threat data available."