import os
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from pathlib import Path
import threading
import json as json_module


class MultiAgentLogger:
    """
    Logging system for conversation-style logs.

    Each session gets one plain-text file per calendar day inside
    ``logs_dir``, named ``session_<session_id>_<YYYYMMDD>.log``. Writes are
    serialized with a lock so entries from concurrent threads do not
    interleave.
    """

    # Prefix and glob pattern shared by every method that touches log files.
    _FILE_PREFIX = "session_"
    _FILE_GLOB = "session_*.log"

    # Display names used in log entries for the known agent identifiers.
    _AGENT_DISPLAY_NAMES = {
        "memory": "Memory Agent",
        "detector": "DeepForest Detector Agent",
        "visual": "Visual Agent",
        "ecology": "Ecology Agent",
    }

    def __init__(self, logs_dir: str = "logs"):
        """
        Initialize the multi-agent logger.

        Args:
            logs_dir: Directory to store log files. Missing parent
                directories are created as well.
        """
        self.logs_dir = Path(logs_dir)
        # parents=True so a nested path such as "var/run/logs" also works.
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()

        print(f"Logging initialized. Logs directory: {self.logs_dir.absolute()}")

    def _get_log_file_path(self, session_id: str) -> Path:
        """
        Get the log file path for a specific session.

        The path embeds today's local date, so a session that spans midnight
        is written to one file per day.

        Args:
            session_id: Unique session identifier

        Returns:
            Path object for the session's log file
        """
        date_str = datetime.now().strftime("%Y%m%d")
        filename = f"session_{session_id}_{date_str}.log"
        return self.logs_dir / filename

    @classmethod
    def _session_id_from_stem(cls, stem: str) -> str:
        """
        Recover the session ID from a log file stem.

        Strips the ``session_`` prefix and, when present, the trailing
        ``_YYYYMMDD`` date suffix. Everything in between is the session ID,
        so IDs that themselves contain underscores are returned intact.

        Args:
            stem: File name without the ``.log`` extension

        Returns:
            The session ID encoded in the file name
        """
        remainder = stem[len(cls._FILE_PREFIX):] if stem.startswith(cls._FILE_PREFIX) else stem
        session_id, separator, date_part = remainder.rpartition("_")
        if separator and len(date_part) == 8 and date_part.isdigit():
            return session_id
        # No recognizable date suffix: treat the whole remainder as the ID.
        return remainder

    def _find_latest_log_file(self, session_id: str) -> Optional[Path]:
        """
        Find the most recent existing log file for a session.

        Args:
            session_id: Session identifier

        Returns:
            Path of the newest log file for the session, or None if the
            session has no log files
        """
        candidates = [
            log_file
            for log_file in self.logs_dir.glob(self._FILE_GLOB)
            if self._session_id_from_stem(log_file.stem) == session_id
        ]
        if not candidates:
            return None
        # Files of one session differ only in the YYYYMMDD suffix, so the
        # lexicographically greatest name is also the most recent one.
        return max(candidates, key=lambda log_file: log_file.name)

    def _write_log_entry(self, session_id: str, agent_name: str, content: str) -> None:
        """
        Write a log entry to the session's log file.

        Failures are reported on stdout and otherwise ignored: logging is
        best-effort and must not break the caller.

        Args:
            session_id: Session identifier
            agent_name: Current agent in the process. The special values
                "SESSION_START" and "SESSION_EVENT" select the header and
                the label-free entry format respectively.
            content: Current agent response
        """
        with self._lock:
            log_file_path = self._get_log_file_path(session_id)
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            if agent_name == "SESSION_START":
                entry = f"=== SESSION {session_id} STARTED ===\n\n"
            elif agent_name == "SESSION_EVENT":
                entry = f"{timestamp} - {content}\n\n"
            else:
                entry = f"{timestamp} - {agent_name}: {content}\n\n"

            try:
                with open(log_file_path, 'a', encoding='utf-8') as f:
                    f.write(entry)
            except Exception as e:
                print(f"Error writing to log file {log_file_path}: {e}")

    def log_session_event(self, session_id: str, event_type: str, details: Optional[Dict[str, Any]] = None) -> None:
        """
        Log session lifecycle events (creation, image upload, clearing, etc.).

        Recognized event types are "session_created", "conversation_cleared"
        and "multi_agent_workflow_started"; any other type writes nothing.

        Args:
            session_id: Session identifier
            event_type: Type of session event
            details: Additional event details. Only read for
                "session_created", where "image_size" and "image_mode" are
                included in the entry.
        """
        if event_type == "session_created":
            self._write_log_entry(session_id, "SESSION_START", "")
            if details:
                image_size = details.get("image_size", "unknown")
                image_mode = details.get("image_mode", "unknown")
                message = f"Image uploaded: {image_size}, mode: {image_mode}"
            else:
                message = "Image uploaded: unknown"
            self._write_log_entry(session_id, "SESSION_EVENT", message)
        elif event_type == "conversation_cleared":
            self._write_log_entry(session_id, "SESSION_EVENT", "Conversation cleared")
        elif event_type == "multi_agent_workflow_started":
            self._write_log_entry(session_id, "SESSION_EVENT", "Multi-agent workflow started")

    def log_user_query(self, session_id: str, user_message: str, message_context: Optional[Dict[str, Any]] = None) -> None:
        """
        Log user queries and context.

        Args:
            session_id: Session identifier
            user_message: User's input message
            message_context: Additional context (conversation length, etc.).
                Accepted for interface compatibility; not written to the log.
        """
        self._write_log_entry(session_id, "USER", user_message)

    def log_agent_execution(
        self,
        session_id: str,
        agent_name: str,
        agent_input: str,
        agent_output: str,
        execution_time: float,
        additional_data: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log individual agent execution details.

        Args:
            session_id: Session identifier
            agent_name: Name of the agent (memory, detector, visual, ecology).
                Unknown names are title-cased for display.
            agent_input: Input provided to the agent (not written to the log)
            agent_output: Output generated by the agent
            execution_time: Time taken for agent execution in seconds
            additional_data: Agent-specific additional data (not written to
                the log)
        """
        display_name = self._AGENT_DISPLAY_NAMES.get(agent_name)
        if display_name is None:
            display_name = agent_name.title()

        label = f"{display_name} ({execution_time:.2f}s)"
        self._write_log_entry(session_id, label, agent_output)

    def log_tool_call(
        self,
        session_id: str,
        tool_name: str,
        tool_arguments: Dict[str, Any],
        tool_result: Dict[str, Any],
        execution_time: float,
        cache_hit: bool,
        reasoning: Optional[str] = None
    ) -> None:
        """
        Log tool calls, their results, and cache information.

        Args:
            session_id: Session identifier
            tool_name: Name of the tool that was called (not written to the
                log)
            tool_arguments: Arguments passed to the tool (not written to the
                log)
            tool_result: Result returned by the tool; "detection_summary" and
                "detections_list" are read from it
            execution_time: Time taken for tool execution
            cache_hit: Whether this was served from cache
            reasoning: AI's reasoning for this tool call (not written to the
                log)
        """
        if cache_hit:
            status = "Cache Hit (0.00s)"
        else:
            status = f"Cache Miss - Executed DeepForest detection ({execution_time:.2f}s)"

        content = f"{status}\n"
        content += f"Detection Summary: {tool_result.get('detection_summary', 'No summary')}\n"

        detections = tool_result.get('detections_list', [])
        if detections:
            content += f"Detection Data: {detections}"

        self._write_log_entry(session_id, "DeepForest Function execution", content)

    def log_error(self, session_id: str, error_type: str, error_message: str, context: Optional[Dict[str, Any]] = None) -> None:
        """
        Log errors in simple format.

        Args:
            session_id: Session identifier
            error_type: Type/category of error
            error_message: Error message
            context: Additional context about where the error occurred
                (not written to the log)
        """
        self._write_log_entry(session_id, "ERROR", f"{error_type}: {error_message}")

    def log_resolution_check(
        self,
        session_id: str,
        image_file_path: str,
        resolution_result: Dict[str, Any],
        execution_time: float
    ) -> None:
        """
        Log image resolution check results.

        Args:
            session_id: Session identifier
            image_file_path: Path to the image that was checked
            resolution_result: Results from simplified resolution check. The
                keys "is_suitable", "resolution_info", "is_georeferenced",
                "resolution_cm" and "warning" are read, all optional.
            execution_time: Time taken for resolution check
        """
        is_suitable = resolution_result.get("is_suitable", True)
        resolution_info = resolution_result.get("resolution_info", "No resolution info")
        is_georeferenced = resolution_result.get("is_georeferenced", False)
        resolution_cm = resolution_result.get("resolution_cm")
        warning = resolution_result.get("warning")

        lines = [
            f"Image Resolution Check ({execution_time:.3f}s)\n",
            f"File: {image_file_path}\n",
            f"Result: {'Suitable' if is_suitable else 'Insufficient'} for DeepForest\n",
            f"Details: {resolution_info}\n",
            f"Type: {'GeoTIFF' if is_georeferenced else 'Regular image'}\n",
        ]

        if resolution_cm is not None:
            lines.append(f"Resolution: {resolution_cm:.2f} cm/pixel\n")

        if warning:
            lines.append(f"Warning: {warning}\n")

        if not is_suitable:
            lines.append("Impact: DeepForest detection will be skipped due to insufficient resolution")
        elif warning:
            lines.append("Impact: DeepForest detection will proceed with noted warning")
        else:
            lines.append("Impact: Resolution suitable for DeepForest detection")

        self._write_log_entry(session_id, "Resolution Check", "".join(lines))

    def log_deepforest_skip(
        self,
        session_id: str,
        skip_reasons: List[str],
        resolution_result: Optional[Dict[str, Any]] = None,
        visual_result: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log when DeepForest detection is skipped and why.

        Args:
            session_id: Session identifier
            skip_reasons: List of reasons why DeepForest was skipped
            resolution_result: Resolution check results (optional)
            visual_result: Visual analysis results (optional)
        """
        content = "DeepForest Detection Skipped\n"
        content += f"Reasons: {', '.join(skip_reasons)}\n"

        # Matching is done on the space-joined, lower-cased reasons so the
        # phrases are found regardless of capitalization.
        reasons_text = ' '.join(skip_reasons).lower()

        # Add detailed reason breakdown
        if "insufficient resolution" in reasons_text and resolution_result:
            resolution_info = resolution_result.get("resolution_info", "No details")
            content += f"Resolution Details: {resolution_info}\n"

        if "poor image quality" in reasons_text and visual_result:
            quality_assessment = visual_result.get("image_quality_for_deepforest", "Unknown")
            content += f"Visual Quality Assessment: {quality_assessment}\n"

        content += "Impact: Analysis will rely on visual analysis only"

        self._write_log_entry(session_id, "DeepForest Skip Decision", content)

    def log_tile_analysis(self, session_id: str, tile_id: int, result: Dict[str, Any], execution_time: float) -> None:
        """
        Log individual tile analysis results.

        Args:
            session_id: Session identifier
            tile_id: Tile identifier
            result: Tile analysis result. The keys "coordinates",
                "additional_objects", "visual_analysis",
                "assigned_detections" and "error" are read, all optional.
            execution_time: Time taken for tile analysis
        """
        lines = [f"Tile {tile_id} Analysis ({execution_time:.2f}s)\n"]

        coordinates = result.get('coordinates', {})
        lines.append(
            f"Coordinates: x={coordinates.get('x', 0)}, y={coordinates.get('y', 0)}, "
            f"width={coordinates.get('width', 0)}, height={coordinates.get('height', 0)}\n"
        )

        additional_objects = result.get('additional_objects', [])
        if additional_objects:
            lines.append(f"Additional Objects: {len(additional_objects)} objects detected\n")
            for obj in additional_objects:
                label = obj.get('label', 'unknown')
                bbox = obj.get('bbox', 'no coordinates')
                lines.append(f"  - {label} at {bbox}\n")
        else:
            lines.append("Additional Objects: None detected\n")

        visual_analysis = result.get('visual_analysis', '')
        if visual_analysis:
            lines.append(f"Visual Analysis: {visual_analysis}\n")

        assigned_detections = result.get('assigned_detections', [])
        lines.append(f"Assigned DeepForest Detections: {len(assigned_detections)}\n")

        if 'error' in result:
            lines.append(f"Error: {result['error']}\n")

        self._write_log_entry(session_id, f"Tile {tile_id} Analysis", "".join(lines))

    def log_spatial_relationships(
        self,
        session_id: str,
        spatial_relationships: List[Dict[str, Any]],
        execution_time: float
    ) -> None:
        """Log spatial relationships analysis results.

        Args:
            session_id: The unique identifier for the current session.
            spatial_relationships: A list of dictionaries, where each dictionary
                contains details about an object's spatial relationships,
                including its grid region and intersecting objects. Entries
                missing "grid_region" are counted under "unknown"; entries
                missing "intersecting_objects" count as having no neighbors.
            execution_time: The time taken to perform the spatial relationships
                analysis, in seconds.
        """
        relationships_count = len(spatial_relationships)

        content = f"Spatial Relationships Analysis ({execution_time:.3f}s)\n"
        content += f"Analyzed {relationships_count} objects with confidence ≥ 0.3\n"

        # Group by regions. A logging call must not raise on a partial
        # record, hence .get() rather than direct indexing.
        by_region: Dict[Any, int] = {}
        with_neighbors = 0
        for rel in spatial_relationships:
            region = rel.get('grid_region', 'unknown')
            by_region[region] = by_region.get(region, 0) + 1
            if rel.get('intersecting_objects'):
                with_neighbors += 1

        content += f"Distribution by region: {by_region}\n"
        content += f"Objects with neighbors: {with_neighbors}\n"

        self._write_log_entry(session_id, "Spatial Relationships Analysis", content)

    def log_detection_narrative(
        self,
        session_id: str,
        detection_narrative: str,
        detections_count: int,
        execution_time: float
    ) -> None:
        """Log detection narrative generation.

        Args:
            session_id: The unique identifier for the current session.
            detection_narrative: The string containing the generated narrative.
            detections_count: The total number of detections used to generate
                the narrative.
            execution_time: The time taken for narrative generation, in seconds.
        """
        narrative_length = len(detection_narrative)

        content = f"Detection Narrative Generation ({execution_time:.3f}s)\n"
        content += f"Generated narrative for {detections_count} detections\n"
        content += f"Narrative length: {narrative_length} characters\n"
        content += f"Narrative content:\n{detection_narrative}"

        self._write_log_entry(session_id, "Detection Narrative", content)

    def log_visual_analysis_unified(
        self,
        session_id: str,
        analysis_type: str,
        visual_analysis: str,
        additional_objects_count: int,
        execution_time: float
    ) -> None:
        """Log unified visual analysis results.

        Args:
            session_id: The unique identifier for the current session.
            analysis_type: A string specifying the type of visual analysis
                performed (e.g., 'segmentation', 'classification').
            visual_analysis: The string containing the final analysis result.
            additional_objects_count: The number of objects detected beyond
                the initial set.
            execution_time: The time taken for the visual analysis, in seconds.
        """
        content = f"Visual Analysis - {analysis_type} ({execution_time:.3f}s)\n"
        content += f"Additional objects detected: {additional_objects_count}\n"
        content += f"Analysis: {visual_analysis}"

        self._write_log_entry(session_id, f"Visual Analysis ({analysis_type})", content)

    def get_session_log_summary(self, session_id: str) -> Dict[str, Any]:
        """
        Get a summary of all logged events for a session.

        Today's log file is preferred. If the session has not logged anything
        today, the most recent earlier file for the same session is used, so
        sessions reported by get_all_session_logs() can always be read back.

        Args:
            session_id: Session identifier

        Returns:
            Dictionary containing session log summary with the keys
            "session_id", "log_file" and "content_preview" (the full file
            text), or a dictionary with a single "error" key when no file
            exists or it cannot be read
        """
        log_file_path: Optional[Path] = self._get_log_file_path(session_id)

        if not log_file_path.exists():
            log_file_path = self._find_latest_log_file(session_id)
            if log_file_path is None:
                return {"error": f"No log file found for session {session_id}"}

        try:
            content = log_file_path.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            return {"error": f"Error reading log file: {str(e)}"}

        return {
            "session_id": session_id,
            "log_file": str(log_file_path),
            "content_preview": content
        }

    def get_all_session_logs(self) -> List[str]:
        """
        Get a list of all session IDs that have log files.

        Returns:
            Sorted list of unique session IDs with existing log files
        """
        session_ids = {
            self._session_id_from_stem(log_file.stem)
            for log_file in self.logs_dir.glob(self._FILE_GLOB)
        }
        return sorted(session_ids)

    def cleanup_old_logs(self, days_to_keep: int = 7) -> int:
        """
        Clean up log files older than specified days.

        Age is judged by each file's modification time. Files that cannot be
        inspected or removed are reported on stdout and skipped.

        Args:
            days_to_keep: Number of days of logs to retain

        Returns:
            Number of log files deleted
        """
        cutoff_time = time.time() - (days_to_keep * 24 * 60 * 60)
        deleted_count = 0

        for log_file in self.logs_dir.glob(self._FILE_GLOB):
            try:
                # stat() sits inside the try: the file may vanish between
                # the directory scan and this call.
                if log_file.stat().st_mtime < cutoff_time:
                    log_file.unlink()
                    deleted_count += 1
            except OSError as e:
                print(f"Error deleting old log file {log_file}: {e}")

        return deleted_count


multi_agent_logger = MultiAgentLogger()