# NOTE(review): the original lines here ("Spaces:", "No application file" x2)
# appear to be scraped web-page chrome, not part of the Python source —
# confirm against the upstream repository and delete.
| import os | |
| import time | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, Optional, List | |
| from pathlib import Path | |
| import threading | |
| import json as json_module | |
class MultiAgentLogger:
    """
    Logging system for conversation-style logs.

    Each session is written to its own plain-text file named
    ``session_<session_id>_<YYYYMMDD>.log`` inside ``logs_dir``.  All file
    writes are serialized through a single ``threading.Lock`` so concurrent
    agents can log into the same session file safely.
    """

    def __init__(self, logs_dir: str = "logs"):
        """
        Initialize the multi-agent logger.

        Args:
            logs_dir: Directory to store log files
        """
        self.logs_dir = Path(logs_dir)
        # parents=True so a nested path such as "var/app/logs" also works;
        # exist_ok=True makes repeated construction idempotent.
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        print(f"Logging initialized. Logs directory: {self.logs_dir.absolute()}")

    def _get_log_file_path(self, session_id: str) -> Path:
        """
        Get the log file path for a specific session.

        Note: the path embeds the *current* date, so entries for a session
        that spans midnight end up in a new file.

        Args:
            session_id: Unique session identifier

        Returns:
            Path object for the session's log file
        """
        date_str = datetime.now().strftime("%Y%m%d")
        filename = f"session_{session_id}_{date_str}.log"
        return self.logs_dir / filename

    def _write_log_entry(self, session_id: str, agent_name: str, content: str) -> None:
        """
        Write a log entry to the session's log file.

        The two pseudo-agent names "SESSION_START" and "SESSION_EVENT" select
        alternate line formats; any other name produces a
        ``<timestamp> - <agent>: <content>`` line.  Write errors are reported
        to stdout rather than raised, so logging never breaks the caller.

        Args:
            session_id: Session identifier
            agent_name: Current agent in the process
            content: Current agent response
        """
        with self._lock:
            log_file_path = self._get_log_file_path(session_id)
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            try:
                with open(log_file_path, 'a', encoding='utf-8') as f:
                    if agent_name == "SESSION_START":
                        f.write(f"=== SESSION {session_id} STARTED ===\n\n")
                    elif agent_name == "SESSION_EVENT":
                        f.write(f"{timestamp} - {content}\n\n")
                    else:
                        f.write(f"{timestamp} - {agent_name}: {content}\n\n")
                    f.flush()
            except Exception as e:
                # Best-effort logging: never propagate I/O failures to callers.
                print(f"Error writing to log file {log_file_path}: {e}")

    def log_session_event(self, session_id: str, event_type: str, details: Optional[Dict[str, Any]] = None) -> None:
        """
        Log session lifecycle events (creation, image upload, clearing, etc.).

        Unrecognized event types are silently ignored.

        Args:
            session_id: Session identifier
            event_type: Type of session event
            details: Additional event details
        """
        if event_type == "session_created":
            self._write_log_entry(session_id, "SESSION_START", "")
            if details:
                image_size = details.get("image_size", "unknown")
                image_mode = details.get("image_mode", "unknown")
                self._write_log_entry(session_id, "SESSION_EVENT", f"Image uploaded: {image_size}, mode: {image_mode}")
            else:
                self._write_log_entry(session_id, "SESSION_EVENT", "Image uploaded: unknown")
        elif event_type == "conversation_cleared":
            self._write_log_entry(session_id, "SESSION_EVENT", "Conversation cleared")
        elif event_type == "multi_agent_workflow_started":
            self._write_log_entry(session_id, "SESSION_EVENT", "Multi-agent workflow started")

    def log_user_query(self, session_id: str, user_message: str, message_context: Optional[Dict[str, Any]] = None) -> None:
        """
        Log user queries and context.

        Args:
            session_id: Session identifier
            user_message: User's input message
            message_context: Additional context (conversation length, etc.)
                Currently accepted for interface compatibility but not
                written to the log.
        """
        self._write_log_entry(session_id, "USER", user_message)

    def log_agent_execution(
        self,
        session_id: str,
        agent_name: str,
        agent_input: str,
        agent_output: str,
        execution_time: float,
        additional_data: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log individual agent execution details.

        Args:
            session_id: Session identifier
            agent_name: Name of the agent (memory, detector, visual, ecology)
            agent_input: Input provided to the agent (not logged currently)
            agent_output: Output generated by the agent
            execution_time: Time taken for agent execution in seconds
            additional_data: Agent-specific additional data (not logged currently)
        """
        # Map known agent keys to display names; fall back to title-case.
        display_names = {
            "memory": "Memory Agent",
            "detector": "DeepForest Detector Agent",
            "visual": "Visual Agent",
            "ecology": "Ecology Agent",
        }
        formatted_name = display_names.get(agent_name, agent_name.title())
        formatted_name_with_time = f"{formatted_name} ({execution_time:.2f}s)"
        self._write_log_entry(session_id, formatted_name_with_time, agent_output)

    def log_tool_call(
        self,
        session_id: str,
        tool_name: str,
        tool_arguments: Dict[str, Any],
        tool_result: Dict[str, Any],
        execution_time: float,
        cache_hit: bool,
        reasoning: Optional[str] = None
    ) -> None:
        """
        Log tool calls, their results, and cache information.

        Args:
            session_id: Session identifier
            tool_name: Name of the tool that was called (not logged currently)
            tool_arguments: Arguments passed to the tool (not logged currently)
            tool_result: Result returned by the tool; ``detection_summary``
                and ``detections_list`` keys are written to the log
            execution_time: Time taken for tool execution
            cache_hit: Whether this was served from cache
            reasoning: AI's reasoning for this tool call (not logged currently)
        """
        if cache_hit:
            status = "Cache Hit (0.00s)"
        else:
            status = f"Cache Miss - Executed DeepForest detection ({execution_time:.2f}s)"
        content = f"{status}\n"
        content += f"Detection Summary: {tool_result.get('detection_summary', 'No summary')}\n"
        detections = tool_result.get('detections_list', [])
        if detections:
            content += f"Detection Data: {detections}"
        self._write_log_entry(session_id, "DeepForest Function execution", content)

    def log_error(self, session_id: str, error_type: str, error_message: str, context: Optional[Dict[str, Any]] = None) -> None:
        """
        Log errors in simple format.

        Args:
            session_id: Session identifier
            error_type: Type/category of error
            error_message: Error message
            context: Additional context about where the error occurred
                (accepted for interface compatibility but not logged)
        """
        self._write_log_entry(session_id, "ERROR", f"{error_type}: {error_message}")

    def log_resolution_check(
        self,
        session_id: str,
        image_file_path: str,
        resolution_result: Dict[str, Any],
        execution_time: float
    ) -> None:
        """
        Log image resolution check results.

        Args:
            session_id: Session identifier
            image_file_path: Path to the image that was checked
            resolution_result: Results from simplified resolution check;
                reads ``is_suitable``, ``resolution_info``,
                ``is_georeferenced``, ``resolution_cm`` and ``warning``
            execution_time: Time taken for resolution check
        """
        is_suitable = resolution_result.get("is_suitable", True)
        resolution_info = resolution_result.get("resolution_info", "No resolution info")
        is_georeferenced = resolution_result.get("is_georeferenced", False)
        resolution_cm = resolution_result.get("resolution_cm")
        warning = resolution_result.get("warning")
        content = f"Image Resolution Check ({execution_time:.3f}s)\n"
        content += f"File: {image_file_path}\n"
        content += f"Result: {'Suitable' if is_suitable else 'Insufficient'} for DeepForest\n"
        content += f"Details: {resolution_info}\n"
        content += f"Type: {'GeoTIFF' if is_georeferenced else 'Regular image'}\n"
        if resolution_cm is not None:
            content += f"Resolution: {resolution_cm:.2f} cm/pixel\n"
        if warning:
            content += f"Warning: {warning}\n"
        # Summarize the practical consequence for the downstream pipeline.
        if not is_suitable:
            content += "Impact: DeepForest detection will be skipped due to insufficient resolution"
        elif warning:
            content += "Impact: DeepForest detection will proceed with noted warning"
        else:
            content += "Impact: Resolution suitable for DeepForest detection"
        self._write_log_entry(session_id, "Resolution Check", content)

    def log_deepforest_skip(
        self,
        session_id: str,
        skip_reasons: List[str],
        resolution_result: Optional[Dict[str, Any]] = None,
        visual_result: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Log when DeepForest detection is skipped and why.

        Args:
            session_id: Session identifier
            skip_reasons: List of reasons why DeepForest was skipped
            resolution_result: Resolution check results (optional)
            visual_result: Visual analysis results (optional)
        """
        content = "DeepForest Detection Skipped\n"
        content += f"Reasons: {', '.join(skip_reasons)}\n"
        # Add detailed reason breakdown; match substrings case-insensitively
        # against the combined reason text (computed once).
        reasons_text = ' '.join(skip_reasons).lower()
        if "insufficient resolution" in reasons_text:
            if resolution_result:
                resolution_info = resolution_result.get("resolution_info", "No details")
                content += f"Resolution Details: {resolution_info}\n"
        if "poor image quality" in reasons_text:
            if visual_result:
                quality_assessment = visual_result.get("image_quality_for_deepforest", "Unknown")
                content += f"Visual Quality Assessment: {quality_assessment}\n"
        content += "Impact: Analysis will rely on visual analysis only"
        self._write_log_entry(session_id, "DeepForest Skip Decision", content)

    def log_tile_analysis(self, session_id: str, tile_id: int, result: Dict[str, Any], execution_time: float) -> None:
        """
        Log individual tile analysis results.

        Args:
            session_id: Session identifier
            tile_id: Tile identifier
            result: Tile analysis result; reads ``coordinates``,
                ``additional_objects``, ``visual_analysis``,
                ``assigned_detections`` and ``error``
            execution_time: Time taken for tile analysis
        """
        content = f"Tile {tile_id} Analysis ({execution_time:.2f}s)\n"
        coordinates = result.get('coordinates', {})
        content += f"Coordinates: x={coordinates.get('x', 0)}, y={coordinates.get('y', 0)}, "
        content += f"width={coordinates.get('width', 0)}, height={coordinates.get('height', 0)}\n"
        additional_objects = result.get('additional_objects', [])
        if additional_objects:
            content += f"Additional Objects: {len(additional_objects)} objects detected\n"
            for obj in additional_objects:
                label = obj.get('label', 'unknown')
                bbox = obj.get('bbox', 'no coordinates')
                content += f"  - {label} at {bbox}\n"
        else:
            content += "Additional Objects: None detected\n"
        visual_analysis = result.get('visual_analysis', '')
        if visual_analysis:
            content += f"Visual Analysis: {visual_analysis}\n"
        assigned_detections = result.get('assigned_detections', [])
        content += f"Assigned DeepForest Detections: {len(assigned_detections)}\n"
        if 'error' in result:
            content += f"Error: {result['error']}\n"
        self._write_log_entry(session_id, f"Tile {tile_id} Analysis", content)

    def log_spatial_relationships(
        self,
        session_id: str,
        spatial_relationships: List[Dict[str, Any]],
        execution_time: float
    ) -> None:
        """Log spatial relationships analysis results.

        Args:
            session_id: The unique identifier for the current session.
            spatial_relationships: A list of dictionaries, where each
                dictionary contains details about an object's spatial
                relationships; each entry must have a ``grid_region`` key
                and an ``intersecting_objects`` key.
            execution_time: The time taken to perform the spatial
                relationships analysis, in seconds.
        """
        relationships_count = len(spatial_relationships)
        content = f"Spatial Relationships Analysis ({execution_time:.3f}s)\n"
        content += f"Analyzed {relationships_count} objects with confidence ≥ 0.3\n"
        # Group by regions
        by_region = {}
        for rel in spatial_relationships:
            region = rel['grid_region']
            by_region[region] = by_region.get(region, 0) + 1
        content += f"Distribution by region: {dict(by_region)}\n"
        content += f"Objects with neighbors: {sum(1 for r in spatial_relationships if r['intersecting_objects'])}\n"
        self._write_log_entry(session_id, "Spatial Relationships Analysis", content)

    def log_detection_narrative(
        self,
        session_id: str,
        detection_narrative: str,
        detections_count: int,
        execution_time: float
    ) -> None:
        """Log detection narrative generation.

        Args:
            session_id: The unique identifier for the current session.
            detection_narrative: The string containing the generated narrative.
            detections_count: The total number of detections used to
                generate the narrative.
            execution_time: The time taken for narrative generation, in seconds.
        """
        narrative_length = len(detection_narrative)
        content = f"Detection Narrative Generation ({execution_time:.3f}s)\n"
        content += f"Generated narrative for {detections_count} detections\n"
        content += f"Narrative length: {narrative_length} characters\n"
        content += f"Narrative content:\n{detection_narrative}"
        self._write_log_entry(session_id, "Detection Narrative", content)

    def log_visual_analysis_unified(
        self,
        session_id: str,
        analysis_type: str,
        visual_analysis: str,
        additional_objects_count: int,
        execution_time: float
    ) -> None:
        """Log unified visual analysis results.

        Args:
            session_id: The unique identifier for the current session.
            analysis_type: A string specifying the type of visual analysis
                performed (e.g., 'segmentation', 'classification').
            visual_analysis: The string containing the final analysis result.
            additional_objects_count: The number of objects detected beyond
                the initial set.
            execution_time: The time taken for the visual analysis, in seconds.
        """
        content = f"Visual Analysis - {analysis_type} ({execution_time:.3f}s)\n"
        content += f"Additional objects detected: {additional_objects_count}\n"
        content += f"Analysis: {visual_analysis}"
        self._write_log_entry(session_id, f"Visual Analysis ({analysis_type})", content)

    def get_session_log_summary(self, session_id: str) -> Dict[str, Any]:
        """
        Get a summary of all logged events for a session.

        Args:
            session_id: Session identifier

        Returns:
            Dictionary containing session log summary, or a dictionary with
            a single ``error`` key if the log file is missing or unreadable.
        """
        log_file_path = self._get_log_file_path(session_id)
        if not log_file_path.exists():
            return {"error": f"No log file found for session {session_id}"}
        try:
            with open(log_file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return {
                "session_id": session_id,
                "log_file": str(log_file_path),
                "content_preview": content
            }
        except Exception as e:
            return {"error": f"Error reading log file: {str(e)}"}

    def get_all_session_logs(self) -> List[str]:
        """
        Get a list of all session IDs that have log files.

        Returns:
            Sorted list of unique session IDs with existing log files
        """
        session_ids = []
        for log_file in self.logs_dir.glob("session_*.log"):
            # Stem format is "session_<session_id>_<YYYYMMDD>".  The session
            # ID itself may contain underscores, so rejoin everything between
            # the "session" prefix and the trailing date component.
            parts = log_file.stem.split("_")
            if len(parts) >= 3:
                session_ids.append("_".join(parts[1:-1]))
            elif len(parts) == 2:
                # Fallback for files without a date suffix.
                session_ids.append(parts[1])
        return sorted(set(session_ids))

    def cleanup_old_logs(self, days_to_keep: int = 7) -> int:
        """
        Clean up log files older than specified days.

        Args:
            days_to_keep: Number of days of logs to retain

        Returns:
            Number of log files deleted
        """
        cutoff_time = time.time() - (days_to_keep * 24 * 60 * 60)
        deleted_count = 0
        for log_file in self.logs_dir.glob("session_*.log"):
            if log_file.stat().st_mtime < cutoff_time:
                try:
                    log_file.unlink()
                    deleted_count += 1
                except Exception as e:
                    # Best-effort cleanup: report and continue with the rest.
                    print(f"Error deleting old log file {log_file}: {e}")
        return deleted_count
# Module-level singleton shared by importers.  Note: importing this module
# creates the default "logs" directory and prints a startup line as a side
# effect of MultiAgentLogger.__init__.
multi_agent_logger = MultiAgentLogger()