import time import json import torch import gc from typing import Dict, List, Any, Optional, Generator from deepforest_agent.agents.memory_agent import MemoryAgent from deepforest_agent.agents.deepforest_detector_agent import DeepForestDetectorAgent from deepforest_agent.agents.visual_analysis_agent import VisualAnalysisAgent from deepforest_agent.agents.ecology_analysis_agent import EcologyAnalysisAgent from deepforest_agent.utils.state_manager import session_state_manager from deepforest_agent.utils.cache_utils import tool_call_cache from deepforest_agent.utils.image_utils import check_image_resolution_for_deepforest from deepforest_agent.utils.logging_utils import multi_agent_logger from deepforest_agent.utils.detection_narrative_generator import DetectionNarrativeGenerator class AgentOrchestrator: """ Orchestrates the multi-agent workflow with memory context + visual contexts + DeepForest detection context + ecological synthesis. """ def __init__(self): """Initialize the Agent Orchestrator.""" self.memory_agent = MemoryAgent() self.detector_agent = DeepForestDetectorAgent() self.visual_agent = VisualAnalysisAgent() self.ecology_agent = EcologyAnalysisAgent() self.execution_stats = { "total_runs": 0, "successful_runs": 0, "average_execution_time": 0.0, "memory_direct_answers": 0, "deepforest_skipped": 0 } def _log_gpu_memory(self, session_id: str, stage: str, agent_name: str): """ Log current GPU memory usage. Args: session_id (str): Unique identifier for the user session being processed stage (str): Workflow stage identifier (e.g., "before", "after", "cleanup") agent_name (str): Name of the agent being monitored (e.g., "Visual Analysis", "DeepForest Detection", "Memory Agent") """ if torch.cuda.is_available(): allocated_gb = torch.cuda.memory_allocated() / 1024**3 cached_gb = torch.cuda.memory_reserved() / 1024**3 multi_agent_logger.log_agent_execution( session_id=session_id, agent_name=f"gpu_memory_{stage}", agent_input=f"{agent_name} - {stage}", agent_output=f"GPU Memory - Allocated: {allocated_gb:.2f} GB, Cached: {cached_gb:.2f} GB", execution_time=0.0 ) print(f"Session {session_id} - {agent_name} {stage}: GPU Memory - Allocated: {allocated_gb:.2f} GB, Cached: {cached_gb:.2f} GB") def cleanup_all_agents(self): """Cleanup models to manage memory.""" print("Orchestrator cleanup:") gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() torch.cuda.ipc_collect() print(f"Final GPU memory after orchestrator cleanup: {torch.cuda.memory_allocated() / 1024**3:.2f} GB") def _aggressive_gpu_cleanup(self, session_id: str, stage: str): """ Perform aggressive GPU memory cleanup. Args: session_id (str): Unique identifier for the user session stage (str): Workflow stage identifier for logging context """ if torch.cuda.is_available(): for i in range(3): gc.collect() torch.cuda.empty_cache() torch.cuda.ipc_collect() torch.cuda.synchronize() try: torch.cuda.reset_peak_memory_stats() torch.cuda.reset_accumulated_memory_stats() except: pass allocated = torch.cuda.memory_allocated() / 1024**3 cached = torch.cuda.memory_reserved() / 1024**3 print(f"Session {session_id} - {stage} aggressive cleanup: {allocated:.2f} GB allocated, {cached:.2f} GB cached") def _format_detection_data_for_monitor(self, detection_narrative: str, detections_list: Optional[List[Dict[str, Any]]] = None) -> str: """ Format detection data for monitor display. Args: detection_narrative: Generated detection context from DeepForest Data detections_list: Full DeepForest detection data Returns: Formatted detection data for monitor """ monitor_parts = [] if detections_list: monitor_parts.append("=== DEEPFOREST DETECTIONS ===") monitor_parts.append(json.dumps(detections_list, indent=2)) monitor_parts.append("") if detection_narrative: monitor_parts.append("=== DETECTION NARRATIVE ===") monitor_parts.append(detection_narrative) return "\n".join(monitor_parts) if monitor_parts else "No detection data available" def _get_cached_detection_narrative(self, tool_cache_id: str) -> Optional[str]: """ Retrieve detection narrative using tool cache ID from the tool_call_cache. Args: tool_cache_id: Tool cache identifier Returns: Detection context from DeepForest Data if found, None otherwise """ try: print(f"Looking up cached detection narrative for tool_cache_id: {tool_cache_id}") # Handle multiple cache IDs cache_ids = [id.strip() for id in tool_cache_id.split(",")] if tool_cache_id else [] all_narratives = [] for cache_id in cache_ids: if cache_id in tool_call_cache.cache_data: cached_entry = tool_call_cache.cache_data[cache_id] cached_result = cached_entry.get("result", {}) tool_name = cached_entry.get("tool_name", "unknown") tool_arguments = cached_entry.get("arguments", {}) # Get all possible arguments including defaults from Config from deepforest_agent.conf.config import Config all_arguments = Config.DEEPFOREST_DEFAULTS.copy() all_arguments.update(tool_arguments) # Format tool call info with all arguments args_str = ", ".join([f"{k}={v}" for k, v in all_arguments.items()]) # Check if we have detections_list to generate narrative from detections_list = cached_result.get("detections_list", []) if detections_list: print(f"Found {len(detections_list)} cached detections for cache ID {cache_id}") # Get image dimensions for narrative generation try: session_keys = list(session_state_manager._sessions.keys()) if session_keys: current_image = session_state_manager.get(session_keys[0], "current_image") if current_image: image_width, image_height = current_image.size else: image_width, image_height = 0, 0 else: image_width, image_height = 0, 0 except: image_width, image_height = 0, 0 # Generate fresh narrative from cached detection data narrative_generator = DetectionNarrativeGenerator(image_width, image_height) cached_detection_narrative = narrative_generator.generate_comprehensive_narrative(detections_list) # Format with proper tool cache ID structure formatted_narrative = f"**TOOL CACHE ID:** {cache_id}\nDeepForest tool run with arguments ({args_str}) and got the below narratives:\nDETECTION NARRATIVE:\n{cached_detection_narrative}" all_narratives.append(formatted_narrative) else: detection_summary = cached_result.get("detection_summary", "") if detection_summary: formatted_summary = f"**TOOL CACHE ID:** {cache_id}\nDeepForest tool run with arguments ({args_str}) and got the below narratives:\nDETECTION NARRATIVE:\n{detection_summary}" all_narratives.append(formatted_summary) if all_narratives: print(f"Generated {len(all_narratives)} cached detection narratives") return "\n\n".join(all_narratives) print(f"No cached data found for tool_cache_id(s): {tool_cache_id}") return None except Exception as e: print(f"Error retrieving cached detection narrative for {tool_cache_id}: {e}") return None def process_user_message_streaming( self, user_message: str, conversation_history: List[Dict[str, Any]], session_id: str ) -> Generator[Dict[str, Any], None, None]: """ Orchestrate the multi-agent workflow with memory context and detection narrative flow. Args: user_message: Current user message/query to be processed conversation_history: Full conversation history session_id: Unique session identifier for this user's workflow Yields: Dict[str, Any]: Progress updates during processing """ start_time = time.perf_counter() self.execution_stats["total_runs"] += 1 print(f"Session {session_id} - Query: {user_message}") print(f"Session {session_id} - Conversation history length: {len(conversation_history)}") agent_results = {} execution_summary = { "agents_executed": [], "execution_order": [], "timings": {}, "status": "in_progress", "session_id": session_id, "workflow_type": "memory_narrative_flow", "memory_provided_direct_answer": False, "deepforest_executed": False } memory_context = "" visual_context = "" detection_narrative = "" memory_tool_cache_id = None current_tool_cache_id = None try: if not session_state_manager.session_exists(session_id): raise ValueError(f"Session {session_id} not found") session_state_manager.set_processing_state(session_id, True) session_state_manager.reset_cancellation(session_id) yield { "stage": "memory", "message": "Analyzing conversation memory and context...", "type": "progress" } if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") print(f"\nSTEP 1: Memory Agent Processing (Session {session_id})") self._log_gpu_memory(session_id, "before", "Memory Agent") memory_start = time.perf_counter() memory_result = self.memory_agent.process_conversation_history_structured( conversation_history=conversation_history, latest_message=user_message, session_id=session_id ) memory_time = time.perf_counter() - memory_start self._log_gpu_memory(session_id, "after", "Memory Agent") self._aggressive_gpu_cleanup(session_id, "after_memory_agent") execution_summary["timings"]["memory_agent"] = memory_time execution_summary["agents_executed"].append("memory") execution_summary["execution_order"].append("memory") agent_results["memory"] = memory_result # Extract memory context and tool cache ID memory_context = memory_result.get("relevant_context", "No memory context available") tool_cache_id = memory_result.get("tool_cache_id") print(f"Session {session_id} - Memory Agent: Completed in {memory_time:.2f}s") print(f"Session {session_id} - Memory Has Answer: {memory_result['answer_present']}") print(f"Session {session_id} - Tool Cache ID: {tool_cache_id}") if memory_result["answer_present"]: print(f"Session {session_id} - Memory has direct answer - using cached data for synthesis") self.execution_stats["memory_direct_answers"] += 1 execution_summary["memory_provided_direct_answer"] = True # Get cached detection narrative if available cached_detection_narrative = "" if tool_cache_id: cached_detection_narrative = self._get_cached_detection_narrative(tool_cache_id) or "" yield { "stage": "ecology", "message": "Using memory context and cached detection narrative for synthesis...", "type": "progress" } if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") print(f"\nSTEP 2 (MEMORY PATH): Ecology Agent with Memory Context (Session {session_id})") self._log_gpu_memory(session_id, "before", "Ecology Agent (Memory Path)") ecology_start = time.perf_counter() # Prepare comprehensive context comprehensive_context = self._prepare_comprehensive_context( memory_context=memory_context, visual_context="", detection_narrative=cached_detection_narrative, tool_cache_id=tool_cache_id ) final_response = "" for token_result in self.ecology_agent.synthesize_analysis_streaming( user_message=user_message, memory_context=comprehensive_context, cached_json=None, current_json=None, session_id=session_id ): if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") final_response += token_result["token"] yield { "stage": "ecology_streaming", "message": final_response, "type": "streaming", "is_complete": token_result["is_complete"] } if token_result["is_complete"]: ecology_time = time.perf_counter() - ecology_start self._log_gpu_memory(session_id, "after", "Ecology Agent (Memory Path)") execution_summary["timings"]["ecology_agent"] = ecology_time execution_summary["agents_executed"].append("ecology") execution_summary["execution_order"].append("ecology") agent_results["ecology"] = {"final_response": final_response} print(f"Session {session_id} - Ecology (Memory Path): Completed in {ecology_time:.2f}s") break total_time = time.perf_counter() - start_time execution_summary["timings"]["total"] = total_time execution_summary["status"] = "completed_via_memory" detection_data_monitor = self._format_detection_data_for_monitor( detection_narrative=cached_detection_narrative ) yield { "stage": "complete", "message": final_response, "type": "final", "detection_data": detection_data_monitor, "agent_results": agent_results, "execution_summary": execution_summary, "execution_time": total_time, "status": "success" } return else: for result in self._execute_full_pipeline_with_narrative_flow( user_message=user_message, conversation_history=conversation_history, session_id=session_id, memory_context=memory_context, memory_tool_cache_id=memory_result.get("tool_cache_id"), start_time=start_time ): yield result if result["type"] == "final": return except Exception as e: error_msg = f"Orchestrator error (Session {session_id}): {str(e)}" print(f"ORCHESTRATOR ERROR: {error_msg}") try: self._aggressive_gpu_cleanup(session_id, "emergency") except Exception as cleanup_error: print(f"Emergency cleanup error: {cleanup_error}") partial_time = time.perf_counter() - start_time execution_summary["timings"]["total"] = partial_time execution_summary["status"] = "error" execution_summary["error"] = error_msg fallback_response = self._create_fallback_response( user_message=user_message, agent_results=agent_results, error=error_msg, session_id=session_id ) yield { "stage": "error", "message": fallback_response, "type": "final", "detection_data": "Error occurred - no detection data available", "agent_results": agent_results, "execution_summary": execution_summary, "execution_time": partial_time, "status": "error", "error": error_msg } finally: session_state_manager.set_processing_state(session_id, False) def _execute_full_pipeline_with_narrative_flow( self, user_message: str, conversation_history: List[Dict[str, Any]], session_id: str, memory_context: str, memory_tool_cache_id: Optional[str], start_time: float ) -> Generator[Dict[str, Any], None, None]: """ Execute the complete pipeline using memory context, visual contexts, and detection narratives. Args: user_message: Current user query conversation_history: Complete conversation context session_id: Unique session identifier memory_context: Context from memory agent memory_tool_cache_id (Optional[str]): Cache identifier from memory agent start_time: Start time for total execution calculation Yields: Dict[str, Any]: Progress updates during processing containing: - stage (str): Current workflow stage ("visual_analysis", "detector", etc.) - message (str): Human-readable progress message - type (str): Update type ("progress", "streaming", "final") - Additional stage-specific data (detection_data, agent_results, etc.) """ agent_results = {} execution_summary = { "agents_executed": [], "execution_order": [], "timings": {}, "status": "in_progress", "session_id": session_id, "workflow_type": "Full Pipeline with Narrative Flow", "memory_provided_direct_answer": False, "deepforest_executed": False } visual_context = "" detection_narrative = "" yield {"stage": "visual_analysis", "message": "Analyzing image with unified full/tiled approach...", "type": "progress"} if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") print(f"\nSTEP 1: Visual Analysis (Session {session_id})") self._log_gpu_memory(session_id, "before", "Visual Analysis") visual_start = time.perf_counter() # Unified visual analysis visual_analysis_result = self.visual_agent.analyze_full_image( user_message=user_message, session_id=session_id ) visual_time = time.perf_counter() - visual_start self._log_gpu_memory(session_id, "after", "Visual Analysis") self._aggressive_gpu_cleanup(session_id, "after_visual_analysis") execution_summary["timings"]["visual_analysis"] = visual_time execution_summary["agents_executed"].append("visual_analysis") execution_summary["execution_order"].append("visual_analysis") agent_results["visual_analysis"] = visual_analysis_result # Extract visual context visual_context = visual_analysis_result.get("visual_analysis", "No visual analysis available") print(f"Session {session_id} - Visual Analysis: {visual_analysis_result.get('status')}") print(f"Session {session_id} - Analysis Type: {visual_analysis_result.get('analysis_type')}") yield {"stage": "resolution_check", "message": "Checking image resolution for DeepForest suitability...", "type": "progress"} if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") print(f"\nSTEP 2: Resolution Check (Session {session_id})") resolution_start = time.perf_counter() image_file_path = session_state_manager.get(session_id, "image_file_path") resolution_result = None if image_file_path: resolution_result = check_image_resolution_for_deepforest(image_file_path) resolution_time = time.perf_counter() - resolution_start multi_agent_logger.log_resolution_check( session_id=session_id, image_file_path=image_file_path, resolution_result=resolution_result, execution_time=resolution_time ) else: resolution_result = { "is_suitable": True, "resolution_info": "No file path available for resolution check", "error": None } resolution_time = time.perf_counter() - resolution_start execution_summary["timings"]["resolution_check"] = resolution_time execution_summary["agents_executed"].append("resolution_check") execution_summary["execution_order"].append("resolution_check") agent_results["resolution_check"] = resolution_result # Determine if DeepForest should run detection_result = None image_quality_good = visual_analysis_result.get("image_quality_for_deepforest", "No").lower() == "yes" resolution_suitable = resolution_result.get("is_suitable", True) if resolution_suitable and image_quality_good: yield {"stage": "detector", "message": "Quality and resolution good - executing DeepForest detection with narrative generation...", "type": "progress"} if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") print(f"\nSTEP 3: DeepForest Detection with R-tree and Narrative (Session {session_id})") self._log_gpu_memory(session_id, "before", "DeepForest Detection") detector_start = time.perf_counter() visual_objects = visual_analysis_result.get("deepforest_objects_present", []) try: detection_result = self.detector_agent.execute_detection_with_context( user_message=user_message, session_id=session_id, visual_objects_detected=visual_objects, memory_context=memory_context ) detector_time = time.perf_counter() - detector_start self._log_gpu_memory(session_id, "after", "DeepForest Detection") self._aggressive_gpu_cleanup(session_id, "after_deepforest_detection") execution_summary["timings"]["detector_agent"] = detector_time execution_summary["agents_executed"].append("detector") execution_summary["execution_order"].append("detector") execution_summary["deepforest_executed"] = True agent_results["detector"] = detection_result # Extract detection narrative and tool cache ID from current run current_detection_narrative = detection_result.get("detection_narrative", "No detection narrative available") # Combine cached narratives from memory with current detection narrative combined_narratives = [] # Add cached narratives from memory's tool cache IDs (if any) if memory_tool_cache_id: cached_narrative = self._get_cached_detection_narrative(memory_tool_cache_id) if cached_narrative: combined_narratives.append(cached_narrative) # Add current detection narratives for ALL tool results tool_results = detection_result.get("tool_results", []) if tool_results: for tool_result in tool_results: cache_key = tool_result.get("cache_key") tool_arguments = tool_result.get("tool_arguments", {}) if cache_key and tool_arguments: # Get all possible arguments including defaults from Config from deepforest_agent.conf.config import Config all_arguments = Config.DEEPFOREST_DEFAULTS.copy() all_arguments.update(tool_arguments) # Format tool call info with all arguments args_str = ", ".join([f"{k}={v}" for k, v in all_arguments.items()]) formatted_current = f"**TOOL CACHE ID:** {cache_key}\nDeepForest tool run with arguments ({args_str}) and got the below narratives:\nDETECTION NARRATIVE:\n{current_detection_narrative}" combined_narratives.append(formatted_current) # If no tool results but we have narrative, add it without formatting if not tool_results and current_detection_narrative and current_detection_narrative != "No detection narrative available": combined_narratives.append(current_detection_narrative) # Combine all narratives detection_narrative = "\n\n".join(combined_narratives) if combined_narratives else "No detection narrative available" print(f"Session {session_id} - DeepForest Detection completed with narrative") except Exception as detector_error: print(f"Session {session_id} - DeepForest Detection FAILED: {detector_error}") detection_result = None detection_narrative = f"DeepForest detection failed: {str(detector_error)}" else: skip_reasons = [] if not resolution_suitable: skip_reasons.append("insufficient resolution") if not image_quality_good: skip_reasons.append("poor image quality") print(f"Session {session_id} - Skipping DeepForest detection: {', '.join(skip_reasons)}") execution_summary["deepforest_executed"] = False execution_summary["deepforest_skip_reason"] = ", ".join(skip_reasons) detection_narrative = f"DeepForest detection was skipped due to: {', '.join(skip_reasons)}" yield {"stage": "ecology", "message": "Synthesizing ecological insights from all contexts...", "type": "progress"} if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") print(f"\nSTEP 4: Ecology Analysis with Comprehensive Context (Session {session_id})") self._log_gpu_memory(session_id, "before", "Ecology Analysis") ecology_start = time.perf_counter() # Prepare comprehensive context for ecology agent comprehensive_context = self._prepare_comprehensive_context( memory_context=memory_context, visual_context=visual_context, detection_narrative=detection_narrative, tool_cache_id=memory_tool_cache_id ) final_response = "" try: for token_result in self.ecology_agent.synthesize_analysis_streaming( user_message=user_message, memory_context=comprehensive_context, cached_json=None, current_json=None, session_id=session_id ): if session_state_manager.is_cancelled(session_id): raise Exception("Processing cancelled by user") final_response += token_result["token"] yield { "stage": "ecology_streaming", "message": final_response, "type": "streaming", "is_complete": token_result["is_complete"] } if token_result["is_complete"]: break except Exception as ecology_error: print(f"Session {session_id} - Ecology streaming error: {ecology_error}") if not final_response: final_response = f"Ecology analysis failed: {str(ecology_error)}" finally: ecology_time = time.perf_counter() - ecology_start self._log_gpu_memory(session_id, "after", "Ecology Analysis") self._aggressive_gpu_cleanup(session_id, "after_ecology_analysis") execution_summary["timings"]["ecology_agent"] = ecology_time execution_summary["agents_executed"].append("ecology") execution_summary["execution_order"].append("ecology") agent_results["ecology"] = {"final_response": final_response} # Store context data for memory agent's next turn current_turn = len(session_state_manager.get(session_id, "conversation_history", [])) // 2 + 1 all_tool_cache_ids = [] if memory_tool_cache_id: all_tool_cache_ids.extend([id.strip() for id in memory_tool_cache_id.split(",")]) # Add all current tool cache IDs tool_results = detection_result.get("tool_results", []) if detection_result else [] for tool_result in tool_results: cache_key = tool_result.get("cache_key") if cache_key: all_tool_cache_ids.append(cache_key) combined_tool_cache_id = ", ".join(all_tool_cache_ids) if all_tool_cache_ids else None self.memory_agent.store_turn_context( session_id=session_id, turn_number=current_turn, visual_context=visual_context, detection_narrative=detection_narrative, tool_cache_id=combined_tool_cache_id ) # Final result total_time = time.perf_counter() - start_time execution_summary["timings"]["total"] = total_time execution_summary["status"] = "completed_narrative_flow" detection_data_monitor = self._format_detection_data_for_monitor( detection_narrative=detection_narrative, detections_list=detection_result.get("detections_list", []) if detection_result else None ) print(f"Session {session_id} - NARRATIVE FLOW WORKFLOW COMPLETED") yield { "stage": "complete", "message": final_response, "type": "final", "detection_data": detection_data_monitor, "agent_results": agent_results, "execution_summary": execution_summary, "execution_time": total_time, "status": "success" } def _prepare_comprehensive_context( self, memory_context: str, visual_context: str, detection_narrative: str, tool_cache_id: Optional[str] ) -> str: """ Prepare comprehensive context combining all data sources with better formatting. Args: memory_context: Context from memory agent visual_context: Visual analysis context detection_narrative: R-tree based detection narrative tool_cache_id: Tool cache reference if available Returns: Combined context string for ecology agent """ context_parts = [] # Memory context section if memory_context and memory_context != "No memory context available": context_parts.append("--- START OF MEMORY CONTEXT ---") context_parts.append(memory_context) context_parts.append("--- END OF MEMORY CONTEXT ---") context_parts.append("") # Tool cache reference if tool_cache_id: context_parts.append(f"**TOOL CACHE ID:** {tool_cache_id}") context_parts.append("") # Detection narrative section if detection_narrative and detection_narrative not in ["No detection analysis available", ""]: context_parts.append("--- START OF DETECTION ANALYSIS ---") context_parts.append(detection_narrative) context_parts.append("--- END OF DETECTION ANALYSIS ---") context_parts.append("") # Visual context section if visual_context and visual_context != "No visual analysis available": context_parts.append("--- START OF VISUAL ANALYSIS ---") context_parts.append(visual_context) context_parts.append("There may be information that are not clear or accurate in this visual analysis. So make sure to mention that this analysis is provided by a visual analysis agent and it may not be very accurate as there is no confidence score associated with it. You can only provide this analysis seperately in a different section and inform the user that you are not very confident about this analysis.") context_parts.append("--- END OF VISUAL ANALYSIS ---") context_parts.append("") # If we have very little context, provide a meaningful message if not context_parts or len("".join(context_parts)) < 50: return "No comprehensive context available for this query. Please provide more information or try a different approach." result_context = "\n".join(context_parts) print(f"Prepared comprehensive context ({len(result_context)} characters)") print(f"Context preview: {result_context[:200]}...") return result_context def _create_fallback_response( self, user_message: str, agent_results: Dict[str, Any], error: str, session_id: str ) -> str: """Create a fallback response when the orchestrator encounters errors.""" response_parts = [] response_parts.append(f"I encountered some processing issues but can provide analysis based on available data:") response_parts.append("") memory_result = agent_results.get("memory", {}) if memory_result and memory_result.get("relevant_context"): response_parts.append(f"**Memory Context**: {memory_result['relevant_context']}") response_parts.append("") visual_result = agent_results.get("visual_analysis", {}) if visual_result and visual_result.get("visual_analysis"): response_parts.append(f"**Visual Analysis**: {visual_result['visual_analysis']}") response_parts.append("") detector_result = agent_results.get("detector", {}) if detector_result and detector_result.get("detection_narrative"): response_parts.append(f"**Detection Results**: {detector_result['detection_narrative']}") response_parts.append("") response_parts.append(f"Note: Workflow was interrupted ({error}). Please try your query again for full results.") return "\n".join(response_parts)