Spaces:
Paused
Paused
| """ | |
| Threat Chat Module - GPT-powered Q&A about detected threats. | |
| """ | |
| import logging | |
| from typing import List, Dict, Any | |
| from utils.openai_client import chat_completion, extract_content, get_api_key, OpenAIAPIError | |
| from utils.gpt_reasoning import _DOMAIN_ROLES | |
| logger = logging.getLogger(__name__) | |
| def chat_about_threats( | |
| question: str, | |
| detections: List[Dict[str, Any]], | |
| mission_spec_dict: Dict[str, Any] = None, | |
| ) -> str: | |
| """ | |
| Answer user questions about detected threats using GPT. | |
| Args: | |
| question: User's question about the current threat situation. | |
| detections: List of detection dicts with gpt_raw threat analysis. | |
| mission_spec_dict: Optional dict of mission specification fields. | |
| Returns: | |
| GPT's response as a string. | |
| """ | |
| if not get_api_key(): | |
| logger.warning("OPENAI_API_KEY not set. Cannot process threat chat.") | |
| return "Error: OpenAI API key not configured." | |
| if not detections: | |
| return "No threats detected yet. Run detection first to analyze the scene." | |
| # Build threat context from detections | |
| threat_context = _build_threat_context(detections) | |
| # Domain-aware role selection | |
| domain = "GENERIC" | |
| if mission_spec_dict: | |
| domain = mission_spec_dict.get("domain", "GENERIC") | |
| role_label = _DOMAIN_ROLES.get(domain, _DOMAIN_ROLES["GENERIC"]) | |
| # Build mission context block (INV-8: mission context forwarded to LLM calls) | |
| mission_block = "" | |
| if mission_spec_dict: | |
| mission_block = "\nMISSION CONTEXT:\n" | |
| if mission_spec_dict.get("mission_intent"): | |
| mission_block += f"- Intent: {mission_spec_dict['mission_intent']}\n" | |
| if mission_spec_dict.get("domain"): | |
| mission_block += f"- Domain: {mission_spec_dict['domain']}\n" | |
| if mission_spec_dict.get("object_classes"): | |
| mission_block += f"- Target Classes: {', '.join(mission_spec_dict['object_classes'])}\n" | |
| if mission_spec_dict.get("context_phrases"): | |
| mission_block += f"- Situation: {'; '.join(mission_spec_dict['context_phrases'])}\n" | |
| mission_block += "\n" | |
| system_prompt = ( | |
| f"You are a {role_label} providing real-time threat analysis support. " | |
| "You have access to the current threat assessment data from optical surveillance. " | |
| "Answer questions concisely and tactically. Use military terminology where appropriate. " | |
| "If asked about engagement recommendations, always note that final decisions rest with the commanding officer.\n\n" | |
| f"{mission_block}" | |
| "CURRENT THREAT PICTURE:\n" | |
| f"{threat_context}\n\n" | |
| "Respond to the operator's question based on this threat data." | |
| ) | |
| payload = { | |
| "model": "gpt-4o", | |
| "messages": [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": question} | |
| ], | |
| "max_tokens": 500, | |
| "temperature": 0.3, | |
| } | |
| try: | |
| resp_data = chat_completion(payload) | |
| content, _refusal = extract_content(resp_data) | |
| return content.strip() if content else "No response generated." | |
| except OpenAIAPIError as e: | |
| logger.error("OpenAI API error: %s", e) | |
| return f"API Error: {e}" | |
| except Exception as e: | |
| logger.error("Threat chat failed: %s", e) | |
| return f"Error processing question: {str(e)}" | |
| def _build_threat_context(detections: List[Dict[str, Any]]) -> str: | |
| """Build a text summary of all detected threats for GPT context.""" | |
| lines = [] | |
| for det in detections: | |
| obj_id = det.get("id", "Unknown") | |
| label = det.get("label", "object") | |
| # Extract GPT raw data if available | |
| gpt_raw = det.get("gpt_raw") or det.get("features") or {} | |
| # Universal schema fields (with fallbacks to legacy names) | |
| obj_type = gpt_raw.get("object_type") or gpt_raw.get("vessel_category", label) | |
| size = gpt_raw.get("size", "") | |
| threat_score = ( | |
| det.get("threat_level_score") | |
| or gpt_raw.get("threat_level") | |
| or gpt_raw.get("threat_level_score", "?") | |
| ) | |
| threat_class = ( | |
| det.get("threat_classification") | |
| or gpt_raw.get("threat_classification", "Unknown") | |
| ) | |
| weapons = gpt_raw.get("visible_weapons", []) | |
| weapon_ready = gpt_raw.get("weapon_readiness") or det.get("weapon_readiness", "Unknown") | |
| motion = gpt_raw.get("motion_status", "Unknown") | |
| range_est = gpt_raw.get("range_estimate") or gpt_raw.get("range_estimation_nm", "") | |
| bearing = gpt_raw.get("bearing") or gpt_raw.get("bearing_clock") or det.get("gpt_direction", "") | |
| intent = gpt_raw.get("tactical_intent", "") | |
| dynamic_features = gpt_raw.get("dynamic_features", []) | |
| # Build entry | |
| entry = f"[{obj_id}] {obj_type}" | |
| if size and size != "Unknown": | |
| entry += f" ({size})" | |
| entry += f"\n - Threat: {threat_class} (Score: {threat_score}/10)" | |
| if range_est: | |
| entry += f"\n - Range: {range_est}" | |
| if bearing and bearing != "Unknown": | |
| entry += f", Bearing: {bearing}" | |
| if motion and motion != "Unknown": | |
| entry += f"\n - Motion: {motion}" | |
| if weapons: | |
| entry += f"\n - Weapons: {', '.join(weapons) if isinstance(weapons, list) else weapons}" | |
| if weapon_ready and weapon_ready != "Unknown": | |
| entry += f" ({weapon_ready})" | |
| if intent: | |
| entry += f"\n - Assessed Intent: {intent}" | |
| # Append dynamic features | |
| for feat in dynamic_features: | |
| if isinstance(feat, dict): | |
| key = feat.get("key", "") | |
| value = feat.get("value", "") | |
| if key and value: | |
| entry += f"\n - {key}: {value}" | |
| lines.append(entry) | |
| return "\n\n".join(lines) if lines else "No threat data available." | |