# Observability Module

This module provides comprehensive observability for the multi-agent RAG system using LangFuse tracing and analytics.

## Features

- **Trace Reading API**: Query and filter LangFuse traces programmatically
- **Performance Analytics**: Agent-level metrics including latency, token usage, and costs
- **Trajectory Analysis**: Analyze agent execution paths and workflow patterns
- **Export Capabilities**: Export traces to JSON/CSV for external analysis

## Quick Start

### 1. Configure LangFuse

Add your LangFuse credentials to `.env`:

```bash
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_HOST=https://cloud.langfuse.com
```

### 2. Run Your Workflow

The system automatically traces all agent executions, LLM calls, and RAG operations.
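Conceptually, each traced execution becomes a span with a name, duration, and metadata. The toy recorder below is purely illustrative of what a span captures; the real spans are created by the LangFuse SDK, not by application code:

```python
import time
from contextlib import contextmanager

# Illustrative only: a minimal span recorder showing the kind of data
# each traced agent execution contributes (name, duration, metadata).
spans = []

@contextmanager
def traced(name, **metadata):
    start = time.perf_counter()
    try:
        yield
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        spans.append({"name": name, "duration_ms": duration_ms, "metadata": metadata})

with traced("retriever_agent", query="rag evaluation"):
    pass  # agent work happens here

print(spans[0]["name"])  # retriever_agent
```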
### 3. Query Traces

Use the Python API to read and analyze traces:

```python
from observability import TraceReader, AgentPerformanceAnalyzer

# Initialize trace reader
reader = TraceReader()

# Get recent traces
traces = reader.get_traces(limit=10)

# Get traces for a specific session
session_traces = reader.get_traces(session_id="session-abc123")

# Filter by agent
retriever_spans = reader.filter_by_agent("retriever_agent", limit=50)

# Get specific trace
trace = reader.get_trace_by_id("trace-xyz")
```

## Trace Reader API

### TraceReader

Query and retrieve traces from LangFuse.

```python
from datetime import datetime, timedelta

from observability import TraceReader

reader = TraceReader()

# Get traces with filters
traces = reader.get_traces(
    limit=50,
    user_id="user-123",
    session_id="session-abc",
    from_timestamp=datetime.now() - timedelta(days=7),
    to_timestamp=datetime.now(),
)

# Filter by date range
recent_traces = reader.filter_by_date_range(
    from_date=datetime.now() - timedelta(days=1),
    to_date=datetime.now(),
    limit=100,
)

# Get LLM generations
generations = reader.get_generations(trace_id="trace-xyz")

# Export to files
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")
```
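The date-range filter amounts to keeping only traces whose timestamp falls inside the window. A pure-Python sketch of that semantics, using plain dicts rather than the module's actual trace objects:

```python
from datetime import datetime, timedelta

def filter_by_date_range(traces, from_date, to_date):
    # Keep traces whose timestamp lies within [from_date, to_date].
    return [t for t in traces if from_date <= t["timestamp"] <= to_date]

now = datetime(2024, 1, 8, 12, 0)
traces = [
    {"id": "old", "timestamp": now - timedelta(days=10)},
    {"id": "recent", "timestamp": now - timedelta(hours=2)},
]
recent = filter_by_date_range(traces, now - timedelta(days=1), now)
print([t["id"] for t in recent])  # ['recent']
```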
## Performance Analytics API

### AgentPerformanceAnalyzer

Analyze agent performance metrics.

```python
from observability import AgentPerformanceAnalyzer

analyzer = AgentPerformanceAnalyzer()

# Get latency statistics for an agent
stats = analyzer.agent_latency_stats("retriever_agent", days=7)
print(f"Average latency: {stats.avg_latency_ms:.2f}ms")
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")
print(f"Success rate: {stats.success_rate:.1f}%")

# Get token usage breakdown
token_usage = analyzer.token_usage_breakdown(days=7)
for agent, usage in token_usage.items():
    print(f"{agent}: {usage['total']:,} tokens")

# Get cost breakdown per agent
costs = analyzer.cost_per_agent(session_id="session-abc")
for agent, cost in costs.items():
    print(f"{agent}: ${cost:.4f}")

# Get error rates
error_stats = analyzer.error_rates(days=30)
for agent, stats in error_stats.items():
    print(f"{agent}: {stats['error_rate_percent']:.2f}% errors")

# Get workflow performance summary
workflow_stats = analyzer.workflow_performance_summary(days=7)
print(f"Total runs: {workflow_stats.total_runs}")
print(f"Average duration: {workflow_stats.avg_duration_ms:.2f}ms")
print(f"Total cost: ${workflow_stats.total_cost:.4f}")
```
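The P50/P95/P99 fields are order statistics over the raw span durations. An illustrative nearest-rank computation (not the module's actual implementation) shows why P95 is so sensitive to outliers:

```python
def percentile(values, pct):
    # Nearest-rank percentile over a list of latency samples (ms).
    ordered = sorted(values)
    k = max(0, min(len(ordered) - 1, round(pct / 100 * (len(ordered) - 1))))
    return ordered[k]

# One slow outlier (900 ms) dominates the tail percentiles.
latencies_ms = [120, 130, 150, 200, 900, 220, 140, 160, 180, 210]
print(percentile(latencies_ms, 50))
print(percentile(latencies_ms, 95))
```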
## Trajectory Analysis API

### AgentTrajectoryAnalyzer

Analyze agent execution paths and workflow patterns.

```python
from observability import AgentTrajectoryAnalyzer

analyzer = AgentTrajectoryAnalyzer()

# Get agent trajectories
trajectories = analyzer.get_trajectories(session_id="session-abc", days=7)
for traj in trajectories:
    print(f"Trace: {traj.trace_id}")
    print(f"Duration: {traj.total_duration_ms:.2f}ms")
    print(f"Path: {' → '.join(traj.agent_sequence)}")
    print(f"Success: {traj.success}")

# Analyze execution paths
path_analysis = analyzer.analyze_execution_paths(days=7)
print(f"Total workflows: {path_analysis['total_workflows']}")
print(f"Unique paths: {path_analysis['unique_paths']}")
print(f"Most common path: {path_analysis['most_common_path']}")

# Compare two workflow executions
comparison = analyzer.compare_trajectories("trace-1", "trace-2")
print(f"Duration difference: {comparison['duration_diff_ms']:.2f}ms")
print(f"Same path: {comparison['same_path']}")
```
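Trajectory comparison can be thought of as a pairwise diff over two executions' durations and agent sequences. A simplified sketch over plain dicts (the real method fetches the two traces by ID first):

```python
def compare(traj_a, traj_b):
    # Compare two workflow executions: duration delta and path equality.
    return {
        "duration_diff_ms": traj_b["total_duration_ms"] - traj_a["total_duration_ms"],
        "same_path": traj_a["agent_sequence"] == traj_b["agent_sequence"],
    }

a = {"total_duration_ms": 1200.0, "agent_sequence": ["retriever_agent", "analyzer_agent"]}
b = {"total_duration_ms": 1500.0, "agent_sequence": ["retriever_agent", "synthesis_agent"]}
result = compare(a, b)
print(result)  # {'duration_diff_ms': 300.0, 'same_path': False}
```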
## Data Models

### TraceInfo

```python
class TraceInfo(BaseModel):
    id: str
    name: str
    user_id: Optional[str]
    session_id: Optional[str]
    timestamp: datetime
    metadata: Dict[str, Any]
    duration_ms: Optional[float]
    total_cost: Optional[float]
    token_usage: Dict[str, int]
```

### AgentStats

```python
class AgentStats(BaseModel):
    agent_name: str
    execution_count: int
    avg_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    min_latency_ms: float
    max_latency_ms: float
    success_rate: float
    total_cost: float
```

### WorkflowStats

```python
class WorkflowStats(BaseModel):
    total_runs: int
    avg_duration_ms: float
    p50_duration_ms: float
    p95_duration_ms: float
    p99_duration_ms: float
    success_rate: float
    total_cost: float
    avg_cost_per_run: float
    total_tokens: int
```

### AgentTrajectory

```python
class AgentTrajectory(BaseModel):
    trace_id: str
    session_id: Optional[str]
    start_time: datetime
    total_duration_ms: float
    agent_sequence: List[str]
    agent_timings: Dict[str, float]
    agent_costs: Dict[str, float]
    errors: List[str]
    success: bool
```

## Example: Performance Dashboard Script

```python
#!/usr/bin/env python3
"""Generate performance dashboard from traces."""
from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer


def main():
    perf = AgentPerformanceAnalyzer()
    traj = AgentTrajectoryAnalyzer()

    print("=" * 60)
    print("AGENT PERFORMANCE DASHBOARD - Last 7 Days")
    print("=" * 60)

    # Workflow summary
    workflow_stats = perf.workflow_performance_summary(days=7)
    if workflow_stats:
        print("\nWorkflow Summary:")
        print(f"  Total Runs: {workflow_stats.total_runs}")
        print(f"  Avg Duration: {workflow_stats.avg_duration_ms/1000:.2f}s")
        print(f"  P95 Duration: {workflow_stats.p95_duration_ms/1000:.2f}s")
        print(f"  Success Rate: {workflow_stats.success_rate:.1f}%")
        print(f"  Total Cost: ${workflow_stats.total_cost:.4f}")
        print(f"  Avg Cost/Run: ${workflow_stats.avg_cost_per_run:.4f}")

    # Agent latency stats
    print("\nAgent Latency Statistics:")
    for agent_name in ["retriever_agent", "analyzer_agent", "synthesis_agent"]:
        stats = perf.agent_latency_stats(agent_name, days=7)
        if stats:
            print(f"\n  {agent_name}:")
            print(f"    Executions: {stats.execution_count}")
            print(f"    Avg Latency: {stats.avg_latency_ms/1000:.2f}s")
            print(f"    P95 Latency: {stats.p95_latency_ms/1000:.2f}s")
            print(f"    Success Rate: {stats.success_rate:.1f}%")

    # Cost breakdown
    print("\nCost Breakdown:")
    costs = perf.cost_per_agent(days=7)
    for agent, cost in sorted(costs.items(), key=lambda x: x[1], reverse=True):
        print(f"  {agent}: ${cost:.4f}")

    # Path analysis
    print("\nExecution Path Analysis:")
    path_analysis = traj.analyze_execution_paths(days=7)
    if path_analysis:
        print(f"  Total Workflows: {path_analysis['total_workflows']}")
        print(f"  Unique Paths: {path_analysis['unique_paths']}")
        if path_analysis['most_common_path']:
            path, count = path_analysis['most_common_path']
            print(f"  Most Common: {path} ({count} times)")


if __name__ == "__main__":
    main()
```

Save as `scripts/performance_dashboard.py` and run:

```bash
python scripts/performance_dashboard.py
```
## Advanced Usage

### Custom Metrics

```python
from observability import TraceReader

reader = TraceReader()

# Calculate custom metric: papers processed per second
traces = reader.get_traces(limit=100)

total_papers = 0
total_time_ms = 0
for trace in traces:
    if trace.metadata.get("num_papers"):
        total_papers += trace.metadata["num_papers"]
        total_time_ms += trace.duration_ms or 0

if total_time_ms > 0:
    papers_per_second = (total_papers / total_time_ms) * 1000
    print(f"Papers/second: {papers_per_second:.2f}")
```

### Monitoring Alerts

```python
from observability import AgentPerformanceAnalyzer

analyzer = AgentPerformanceAnalyzer()

# Check if error rate exceeds threshold
error_stats = analyzer.error_rates(days=1)
for agent, stats in error_stats.items():
    if stats['error_rate_percent'] > 10:
        print(f"⚠️ ALERT: {agent} error rate is {stats['error_rate_percent']:.1f}%")

# Check if P95 latency is too high
stats = analyzer.agent_latency_stats("analyzer_agent", days=1)
if stats and stats.p95_latency_ms > 30000:  # 30 seconds
    print(f"⚠️ ALERT: Analyzer P95 latency is {stats.p95_latency_ms/1000:.1f}s")
```

## Troubleshooting

### No Traces Found

1. Check that LangFuse is enabled: `LANGFUSE_ENABLED=true`
2. Verify API keys are correct in `.env`
3. Ensure network connectivity to LangFuse Cloud
4. Check that at least one workflow has been executed
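A quick way to rule out the first two causes is a small environment check before debugging further. This helper is illustrative, not part of the module:

```python
import os

def check_langfuse_config(env=os.environ):
    # Return a list of configuration problems (empty means config looks OK).
    problems = []
    if env.get("LANGFUSE_ENABLED", "").lower() != "true":
        problems.append("LANGFUSE_ENABLED is not set to 'true'")
    for key in ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"):
        if not env.get(key):
            problems.append(f"{key} is missing")
    return problems

# Enabled but no keys: reports the three missing variables.
print(check_langfuse_config({"LANGFUSE_ENABLED": "true"}))
```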
### Missing Token/Cost Data

- Token usage requires `langfuse-openai` instrumentation
- Ensure `instrument_openai()` is called before creating Azure OpenAI clients
- Cost data depends on LangFuse pricing configuration

### Slow Query Performance

- Reduce the `limit` parameter for large trace datasets
- Use date-range filters to narrow results
- Consider exporting traces to CSV for offline analysis

## See Also

- [LangFuse Documentation](https://langfuse.com/docs)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- Main README: `../README.md`
- Architecture: `../CLAUDE.md`