Spaces:
Runtime error
Runtime error
| """ | |
| Agent Observability and Debugging | |
| Provides transparency into agent interactions and decision-making | |
| Based on the OpenAI Deep Research observability pattern | |
| """ | |
| import json | |
| import logging | |
| import time | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import traceback | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class AgentEvent:
    """Single event in agent execution.

    Declared as a dataclass so that it can be built with keyword arguments
    (``AgentEvent(timestamp=..., agent_name=..., ...)``); without the
    decorator the class has no ``__init__`` accepting these fields and every
    construction raises ``TypeError``.
    """

    # When the event was recorded.
    timestamp: datetime
    # Name of the agent that produced the event.
    agent_name: str
    # One of: 'start', 'tool_call', 'reasoning', 'output', 'error', 'handoff'
    event_type: str
    # Event-specific payload; its keys depend on ``event_type``.
    data: Dict[str, Any]
    # Elapsed time in milliseconds, when one was measured for this event.
    duration_ms: Optional[float] = None
    # Optional identifier of a parent event.
    parent_event: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict with an ISO-8601 timestamp string."""
        return {
            'timestamp': self.timestamp.isoformat(),
            'agent_name': self.agent_name,
            'event_type': self.event_type,
            'data': self.data,
            'duration_ms': self.duration_ms,
            'parent_event': self.parent_event,
        }
class AgentTracer:
    """
    Trace and log agent interactions for debugging and monitoring.

    Similar to OpenAI's print_agent_interaction function. Every event is kept
    in memory (``self.events``), appended as one JSON line to ``trace_file``
    when one is configured, and reported through the module logger.
    """

    def __init__(self, trace_file: Optional[str] = "agent_traces.jsonl"):
        """Create a tracer.

        Args:
            trace_file: Path of the JSON-lines file events are appended to.
                Pass ``None`` (or an empty string) to keep events in memory only.
        """
        self.events: List["AgentEvent"] = []
        self.trace_file = Path(trace_file) if trace_file else None
        # agent name -> time.time() of its most recent start_agent() call
        self.active_agents: Dict[str, float] = {}

    def start_agent(self, agent_name: str, input_data: Any) -> str:
        """Log agent start and remember its start time.

        Returns:
            An event id of the form ``"<agent_name>_<epoch milliseconds>"``.
        """
        started = time.time()
        event_id = f"{agent_name}_{int(started * 1000)}"
        self.active_agents[agent_name] = started
        event = AgentEvent(
            timestamp=datetime.now(),
            agent_name=agent_name,
            event_type='start',
            data={
                'event_id': event_id,
                'input': str(input_data)[:500],  # Truncate for readability
            },
        )
        self._log_event(event)
        return event_id

    def tool_call(
        self,
        agent_name: str,
        tool_name: str,
        tool_args: Dict,
        result: Any = None
    ):
        """Log tool call.

        ``result`` is stored as its string form truncated to 500 characters.
        Only ``None`` is treated as "no result", so falsy results such as
        ``0``, ``""`` or ``[]`` are still recorded.
        """
        event = AgentEvent(
            timestamp=datetime.now(),
            agent_name=agent_name,
            event_type='tool_call',
            data={
                'tool': tool_name,
                'args': tool_args,
                'result': str(result)[:500] if result is not None else None,
            },
        )
        self._log_event(event)

    def reasoning_step(self, agent_name: str, reasoning: str):
        """Log reasoning or thought process."""
        event = AgentEvent(
            timestamp=datetime.now(),
            agent_name=agent_name,
            event_type='reasoning',
            data={'reasoning': reasoning},
        )
        self._log_event(event)

    def agent_output(self, agent_name: str, output: Any):
        """Log agent output.

        If ``start_agent`` was called for ``agent_name``, the elapsed time
        since then is stored as ``duration_ms`` and the agent is no longer
        considered active.
        """
        started = self.active_agents.pop(agent_name, None)
        duration = None
        if started is not None:
            duration = (time.time() - started) * 1000
        event = AgentEvent(
            timestamp=datetime.now(),
            agent_name=agent_name,
            event_type='output',
            data={'output': str(output)[:1000]},
            duration_ms=duration,
        )
        self._log_event(event)

    def agent_handoff(
        self,
        from_agent: str,
        to_agent: str,
        handoff_data: Any
    ):
        """Log handoff between agents (recorded under ``from_agent``)."""
        event = AgentEvent(
            timestamp=datetime.now(),
            agent_name=from_agent,
            event_type='handoff',
            data={
                'to_agent': to_agent,
                'handoff_data': str(handoff_data)[:500],
            },
        )
        self._log_event(event)

    def error(self, agent_name: str, error: Exception):
        """Log error.

        The traceback is formatted from ``error`` itself rather than from the
        exception currently being handled, so the record is meaningful even
        when this method is called outside an ``except`` block.
        """
        formatted_tb = ''.join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        event = AgentEvent(
            timestamp=datetime.now(),
            agent_name=agent_name,
            event_type='error',
            data={
                'error_type': type(error).__name__,
                'error_message': str(error),
                'traceback': formatted_tb,
            },
        )
        self._log_event(event)

    def _log_event(self, event: "AgentEvent"):
        """Log event to memory and file."""
        self.events.append(event)
        # Log to file if configured
        if self.trace_file is not None:
            try:
                with open(self.trace_file, 'a', encoding='utf-8') as f:
                    # default=str: payloads (e.g. tool args) may hold values
                    # json cannot encode; tracing must not fail because of them.
                    f.write(json.dumps(event.to_dict(), default=str) + '\n')
            except OSError as exc:
                # An unwritable trace file should not take the agent down;
                # the event is still kept in memory and sent to the logger.
                logger.warning(
                    "Could not write trace event to %s: %s", self.trace_file, exc
                )
        # Also log to standard logger
        logger.info("[%s] %s: %s", event.agent_name, event.event_type, event.data)

    def print_interaction_flow(self, start_time: Optional[datetime] = None):
        """
        Print human-readable interaction flow.

        Similar to OpenAI's print_agent_interaction.

        Args:
            start_time: When given, only events with a timestamp at or after
                this moment are printed.
        """
        print("\n" + "="*60)
        print("AGENT INTERACTION FLOW")
        print("="*60 + "\n")
        filtered_events = self.events
        if start_time:
            filtered_events = [e for e in self.events if e.timestamp >= start_time]
        # NOTE(review): the "β" marker in the lines below looks like a
        # mis-decoded symbol (arrow/emoji?) - confirm the intended glyph.
        for i, event in enumerate(filtered_events, 1):
            prefix = f"{i:3}. [{event.timestamp.strftime('%H:%M:%S')}] {event.agent_name}"
            if event.event_type == 'start':
                print(f"{prefix} β STARTED")
                print(f" Input: {event.data.get('input', '')[:100]}...")
            elif event.event_type == 'tool_call':
                tool = event.data.get('tool', 'unknown')
                print(f"{prefix} β TOOL: {tool}")
                if event.data.get('args'):
                    print(f" Args: {event.data['args']}")
            elif event.event_type == 'reasoning':
                print(f"{prefix} β THINKING:")
                print(f" {event.data.get('reasoning', '')[:200]}...")
            elif event.event_type == 'handoff':
                to_agent = event.data.get('to_agent', 'unknown')
                print(f"{prefix} β HANDOFF to {to_agent}")
            elif event.event_type == 'output':
                print(f"{prefix} β OUTPUT:")
                print(f" {event.data.get('output', '')[:200]}...")
                # A measured duration of 0 ms is still a measurement.
                if event.duration_ms is not None:
                    print(f" Duration: {event.duration_ms:.0f}ms")
            elif event.event_type == 'error':
                print(f"{prefix} β ERROR: {event.data.get('error_type', 'unknown')}")
                print(f" {event.data.get('error_message', '')}")
            print()
        print("="*60 + "\n")

    def get_metrics(self) -> Dict[str, Any]:
        """Get execution metrics.

        Returns:
            A dict with the event count, the number of distinct agents, the
            number of 'tool_call' / 'error' / 'handoff' events, and the mean
            ``duration_ms`` over all events that carry a duration (``0`` when
            none do).
        """
        def count(event_type: str) -> int:
            return sum(1 for e in self.events if e.event_type == event_type)

        metrics = {
            'total_events': len(self.events),
            'agents_involved': len({e.agent_name for e in self.events}),
            'tool_calls': count('tool_call'),
            'errors': count('error'),
            'handoffs': count('handoff'),
            'avg_duration_ms': 0,
        }
        # Only a missing duration (None) is skipped; 0.0 counts as a sample.
        durations = [e.duration_ms for e in self.events if e.duration_ms is not None]
        if durations:
            metrics['avg_duration_ms'] = sum(durations) / len(durations)
        return metrics
class TriageAgent:
    """
    Triage agent that routes requests to appropriate specialized agents.

    Based on OpenAI's Deep Research triage pattern. Routing is keyword based:
    the first rule in ``_ROUTES`` with a keyword that starts a word of the
    request wins; requests matching no rule go to 'GeneralAgent'.
    """

    # (keywords, route_to, suggested_agents, confidence), checked in order.
    _ROUTES = (
        (('research', 'analyze'), 'ResearchAgent',
         ('ResearchAgent', 'WebSearchAgent'), 0.9),
        (('resume', 'cv'), 'CVAgent',
         ('CVAgent', 'ATSOptimizer'), 0.95),
        (('cover', 'letter'), 'CoverLetterAgent',
         ('CoverLetterAgent',), 0.95),
        (('job', 'application'), 'OrchestratorAgent',
         ('OrchestratorAgent', 'CVAgent', 'CoverLetterAgent'), 0.85),
    )

    def __init__(self, tracer: Optional["AgentTracer"] = None):
        """Use ``tracer`` for event logging, or a fresh AgentTracer if omitted."""
        self.tracer = tracer if tracer is not None else AgentTracer()

    def triage_request(self, request: str) -> Dict[str, Any]:
        """
        Analyze request and determine routing.

        Returns:
            A dict with keys 'needs_clarification', 'route_to', 'confidence',
            'reasoning' and 'suggested_agents'.
        """
        self.tracer.start_agent("TriageAgent", request)
        routing = {
            'needs_clarification': False,
            'route_to': None,
            'confidence': 0.0,
            'reasoning': '',
            'suggested_agents': [],
        }
        # Check if clarification needed
        if len(request.split()) < 5 or '?' in request:
            routing['needs_clarification'] = True
            routing['reasoning'] = "Request is too brief or unclear"
            self.tracer.reasoning_step("TriageAgent", routing['reasoning'])

        # Match keywords against the start of each word instead of anywhere
        # in the text, so that e.g. "recover" or "discover" is not mistaken
        # for a cover-letter request.
        request_lower = request.lower()
        words = ''.join(
            ch if ch.isalnum() else ' ' for ch in request_lower
        ).split()

        # Determine routing based on keywords
        for keywords, target, suggested, confidence in self._ROUTES:
            if any(word.startswith(keywords) for word in words):
                routing['route_to'] = target
                routing['suggested_agents'] = list(suggested)
                routing['confidence'] = confidence
                break
        else:
            routing['route_to'] = 'GeneralAgent'
            routing['confidence'] = 0.5

        self.tracer.agent_output("TriageAgent", routing)
        return routing
class AgentMonitor:
    """
    Monitor agent performance and health.

    Keeps per-agent run counters and duration statistics, a per-agent failure
    count, and the most recent error message reported for each agent.
    """

    def __init__(self):
        self.performance_stats: Dict[str, Dict] = {}
        self.error_counts: Dict[str, int] = {}
        self.last_errors: Dict[str, str] = {}

    def record_execution(
        self,
        agent_name: str,
        duration_ms: float,
        success: bool,
        error: Optional[str] = None
    ):
        """Record agent execution stats.

        Args:
            agent_name: Agent the run belongs to.
            duration_ms: Run duration in milliseconds.
            success: Whether the run succeeded.
            error: Error message; stored as the agent's last error only when
                the run failed and the message is non-empty.
        """
        stats = self.performance_stats.setdefault(agent_name, {
            'total_runs': 0,
            'successful_runs': 0,
            'failed_runs': 0,
            'total_duration_ms': 0,
            'avg_duration_ms': 0,
            # Replaced by the first recorded duration via min() below.
            'min_duration_ms': float('inf'),
            'max_duration_ms': 0,
        })
        stats['total_runs'] += 1
        if success:
            stats['successful_runs'] += 1
        else:
            stats['failed_runs'] += 1
            self.error_counts[agent_name] = self.error_counts.get(agent_name, 0) + 1
            if error:
                self.last_errors[agent_name] = error
        stats['total_duration_ms'] += duration_ms
        stats['avg_duration_ms'] = stats['total_duration_ms'] / stats['total_runs']
        stats['min_duration_ms'] = min(stats['min_duration_ms'], duration_ms)
        stats['max_duration_ms'] = max(stats['max_duration_ms'], duration_ms)

    def get_health_status(self) -> Dict[str, Any]:
        """Get overall system health.

        The status is "healthy" below a 5% error rate, "degraded" below 15%,
        and "unhealthy" otherwise. 'agent_stats' and 'recent_errors' are
        copies, so later calls to ``record_execution`` or ``reset_stats`` do
        not alter a report that was already returned.
        """
        total_errors = sum(self.error_counts.values())
        total_runs = sum(s['total_runs'] for s in self.performance_stats.values())
        error_rate = (total_errors / total_runs) * 100 if total_runs else 0
        # Determine health status
        if error_rate < 5:
            status = "healthy"
        elif error_rate < 15:
            status = "degraded"
        else:
            status = "unhealthy"
        return {
            'status': status,
            'error_rate': f"{error_rate:.1f}%",
            'total_runs': total_runs,
            'total_errors': total_errors,
            'agent_stats': {
                name: dict(stats) for name, stats in self.performance_stats.items()
            },
            'recent_errors': dict(self.last_errors),
        }

    def reset_stats(self):
        """Reset all statistics."""
        self.performance_stats.clear()
        self.error_counts.clear()
        self.last_errors.clear()
# Global instances for easy access: a shared tracer and monitor created at
# import time. The trace_agent decorator in this module reports to these two.
global_tracer = AgentTracer()
global_monitor = AgentMonitor()
# Decorator for automatic tracing
def trace_agent(agent_name: str):
    """Decorator to automatically trace agent execution.

    Each call of the decorated function is reported to ``global_tracer``
    (a start event, then an output or error event) and to ``global_monitor``
    (duration and success flag). Exceptions raised by the function are
    recorded and then re-raised unchanged.

    Args:
        agent_name: Name under which the calls are traced and monitored.
    """
    # Local import: functools is not among this module's top-level imports.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's name, docstring and
        # __wrapped__ reference instead of exposing "wrapper".
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            global_tracer.start_agent(agent_name, args)
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                duration = (time.time() - start_time) * 1000
                global_tracer.error(agent_name, e)
                global_monitor.record_execution(agent_name, duration, False, str(e))
                raise
            # Success bookkeeping sits outside the try block so that a failure
            # inside the tracer itself is not recorded as an agent failure.
            duration = (time.time() - start_time) * 1000
            global_tracer.agent_output(agent_name, result)
            global_monitor.record_execution(agent_name, duration, True)
            return result
        return wrapper
    return decorator
# Demo usage
def demo_observability():
    """Demonstrate observability features.

    Runs one triage request, logs two tool calls and a handoff on a fresh
    tracer, then prints the interaction flow and the tracer metrics. The
    tracer is created with its default trace file, so the events are also
    appended to that file.
    """
    tracer = AgentTracer()
    triage = TriageAgent(tracer)
    # Simulate agent interactions; only the traced events matter here, so the
    # returned routing decision is not kept.
    triage.triage_request("Help me write a resume for a software engineering position")
    # Simulate tool calls
    tracer.tool_call("CVAgent", "extract_keywords", {"text": "software engineering"})
    tracer.tool_call("CVAgent", "optimize_ats", {"resume": "..."})
    # Simulate handoff
    tracer.agent_handoff("CVAgent", "ATSOptimizer", {"resume_draft": "..."})
    # Print interaction flow
    tracer.print_interaction_flow()
    # Show metrics
    print("Metrics:", tracer.get_metrics())
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo_observability()