A newer version of the Gradio SDK is available:
6.5.1
Observability Module
This module provides comprehensive observability for the multi-agent RAG system using LangFuse tracing and analytics.
Features
- Trace Reading API: Query and filter LangFuse traces programmatically
- Performance Analytics: Agent-level metrics including latency, token usage, and costs
- Trajectory Analysis: Analyze agent execution paths and workflow patterns
- Export Capabilities: Export traces to JSON/CSV for external analysis
Quick Start
1. Configure LangFuse
Add your LangFuse credentials to .env:
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_HOST=https://cloud.langfuse.com
2. Run Your Workflow
The system automatically traces all agent executions, LLM calls, and RAG operations.
3. Query Traces
Use the Python API to read and analyze traces:
from observability import TraceReader, AgentPerformanceAnalyzer
# Initialize trace reader
reader = TraceReader()
# Get recent traces
traces = reader.get_traces(limit=10)
# Get traces for a specific session
session_traces = reader.get_traces(session_id="session-abc123")
# Filter by agent
retriever_spans = reader.filter_by_agent("retriever_agent", limit=50)
# Get specific trace
trace = reader.get_trace_by_id("trace-xyz")
Trace Reader API
TraceReader
Query and retrieve traces from LangFuse.
from observability import TraceReader
from datetime import datetime, timedelta
reader = TraceReader()
# Get traces with filters
traces = reader.get_traces(
limit=50,
user_id="user-123",
session_id="session-abc",
from_timestamp=datetime.now() - timedelta(days=7),
to_timestamp=datetime.now()
)
# Filter by date range
recent_traces = reader.filter_by_date_range(
from_date=datetime.now() - timedelta(days=1),
to_date=datetime.now(),
limit=100
)
# Get LLM generations
generations = reader.get_generations(trace_id="trace-xyz")
# Export to files
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")
Performance Analytics API
AgentPerformanceAnalyzer
Analyze agent performance metrics.
from observability import AgentPerformanceAnalyzer
analyzer = AgentPerformanceAnalyzer()
# Get latency statistics for an agent
stats = analyzer.agent_latency_stats("retriever_agent", days=7)
print(f"Average latency: {stats.avg_latency_ms:.2f}ms")
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")
print(f"Success rate: {stats.success_rate:.1f}%")
# Get token usage breakdown
token_usage = analyzer.token_usage_breakdown(days=7)
for agent, usage in token_usage.items():
print(f"{agent}: {usage['total']:,} tokens")
# Get cost breakdown per agent
costs = analyzer.cost_per_agent(session_id="session-abc")
for agent, cost in costs.items():
print(f"{agent}: ${cost:.4f}")
# Get error rates
error_stats = analyzer.error_rates(days=30)
for agent, stats in error_stats.items():
print(f"{agent}: {stats['error_rate_percent']:.2f}% errors")
# Get workflow performance summary
workflow_stats = analyzer.workflow_performance_summary(days=7)
print(f"Total runs: {workflow_stats.total_runs}")
print(f"Average duration: {workflow_stats.avg_duration_ms:.2f}ms")
print(f"Total cost: ${workflow_stats.total_cost:.4f}")
Trajectory Analysis API
AgentTrajectoryAnalyzer
Analyze agent execution paths and workflow patterns.
from observability import AgentTrajectoryAnalyzer
analyzer = AgentTrajectoryAnalyzer()
# Get agent trajectories
trajectories = analyzer.get_trajectories(session_id="session-abc", days=7)
for traj in trajectories:
print(f"Trace: {traj.trace_id}")
print(f"Duration: {traj.total_duration_ms:.2f}ms")
print(f"Path: {' → '.join(traj.agent_sequence)}")
print(f"Success: {traj.success}")
# Analyze execution paths
path_analysis = analyzer.analyze_execution_paths(days=7)
print(f"Total workflows: {path_analysis['total_workflows']}")
print(f"Unique paths: {path_analysis['unique_paths']}")
print(f"Most common path: {path_analysis['most_common_path']}")
# Compare two workflow executions
comparison = analyzer.compare_trajectories("trace-1", "trace-2")
print(f"Duration difference: {comparison['duration_diff_ms']:.2f}ms")
print(f"Same path: {comparison['same_path']}")
Data Models
TraceInfo
class TraceInfo(BaseModel):
id: str
name: str
user_id: Optional[str]
session_id: Optional[str]
timestamp: datetime
metadata: Dict[str, Any]
duration_ms: Optional[float]
total_cost: Optional[float]
token_usage: Dict[str, int]
AgentStats
class AgentStats(BaseModel):
agent_name: str
execution_count: int
avg_latency_ms: float
p50_latency_ms: float
p95_latency_ms: float
p99_latency_ms: float
min_latency_ms: float
max_latency_ms: float
success_rate: float
total_cost: float
WorkflowStats
class WorkflowStats(BaseModel):
total_runs: int
avg_duration_ms: float
p50_duration_ms: float
p95_duration_ms: float
p99_duration_ms: float
success_rate: float
total_cost: float
avg_cost_per_run: float
total_tokens: int
AgentTrajectory
class AgentTrajectory(BaseModel):
trace_id: str
session_id: Optional[str]
start_time: datetime
total_duration_ms: float
agent_sequence: List[str]
agent_timings: Dict[str, float]
agent_costs: Dict[str, float]
errors: List[str]
success: bool
Example: Performance Dashboard Script
#!/usr/bin/env python3
"""Generate performance dashboard from traces."""
from datetime import datetime, timedelta
from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer
def main():
perf = AgentPerformanceAnalyzer()
traj = AgentTrajectoryAnalyzer()
print("=" * 60)
print("AGENT PERFORMANCE DASHBOARD - Last 7 Days")
print("=" * 60)
# Workflow summary
workflow_stats = perf.workflow_performance_summary(days=7)
if workflow_stats:
print(f"\nWorkflow Summary:")
print(f" Total Runs: {workflow_stats.total_runs}")
print(f" Avg Duration: {workflow_stats.avg_duration_ms/1000:.2f}s")
print(f" P95 Duration: {workflow_stats.p95_duration_ms/1000:.2f}s")
print(f" Success Rate: {workflow_stats.success_rate:.1f}%")
print(f" Total Cost: ${workflow_stats.total_cost:.4f}")
print(f" Avg Cost/Run: ${workflow_stats.avg_cost_per_run:.4f}")
# Agent latency stats
print(f"\nAgent Latency Statistics:")
for agent_name in ["retriever_agent", "analyzer_agent", "synthesis_agent"]:
stats = perf.agent_latency_stats(agent_name, days=7)
if stats:
print(f"\n {agent_name}:")
print(f" Executions: {stats.execution_count}")
print(f" Avg Latency: {stats.avg_latency_ms/1000:.2f}s")
print(f" P95 Latency: {stats.p95_latency_ms/1000:.2f}s")
print(f" Success Rate: {stats.success_rate:.1f}%")
# Cost breakdown
print(f"\nCost Breakdown:")
costs = perf.cost_per_agent(days=7)
for agent, cost in sorted(costs.items(), key=lambda x: x[1], reverse=True):
print(f" {agent}: ${cost:.4f}")
# Path analysis
print(f"\nExecution Path Analysis:")
path_analysis = traj.analyze_execution_paths(days=7)
if path_analysis:
print(f" Total Workflows: {path_analysis['total_workflows']}")
print(f" Unique Paths: {path_analysis['unique_paths']}")
if path_analysis['most_common_path']:
path, count = path_analysis['most_common_path']
print(f" Most Common: {path} ({count} times)")
if __name__ == "__main__":
main()
Save as scripts/performance_dashboard.py and run:
python scripts/performance_dashboard.py
Advanced Usage
Custom Metrics
from observability import TraceReader
reader = TraceReader()
# Calculate custom metric: papers processed per second
traces = reader.get_traces(limit=100)
total_papers = 0
total_time_ms = 0
for trace in traces:
if trace.metadata.get("num_papers"):
total_papers += trace.metadata["num_papers"]
total_time_ms += trace.duration_ms or 0
if total_time_ms > 0:
papers_per_second = (total_papers / total_time_ms) * 1000
print(f"Papers/second: {papers_per_second:.2f}")
Monitoring Alerts
from observability import AgentPerformanceAnalyzer
analyzer = AgentPerformanceAnalyzer()
# Check if error rate exceeds threshold
error_stats = analyzer.error_rates(days=1)
for agent, stats in error_stats.items():
if stats['error_rate_percent'] > 10:
print(f"⚠️ ALERT: {agent} error rate is {stats['error_rate_percent']:.1f}%")
# Check if P95 latency is too high
stats = analyzer.agent_latency_stats("analyzer_agent", days=1)
if stats and stats.p95_latency_ms > 30000: # 30 seconds
print(f"⚠️ ALERT: Analyzer P95 latency is {stats.p95_latency_ms/1000:.1f}s")
Troubleshooting
No Traces Found
- Check that LangFuse is enabled: `LANGFUSE_ENABLED=true`
- Verify API keys are correct in `.env`
- Ensure network connectivity to LangFuse Cloud
- Check that at least one workflow has been executed
Missing Token/Cost Data
- Token usage requires `langfuse-openai` instrumentation
- Ensure `instrument_openai()` is called before creating Azure OpenAI clients
- Cost data depends on LangFuse pricing configuration
Slow Query Performance
- Reduce the `limit` parameter for large trace datasets
- Use date range filters to narrow results
- Consider exporting traces to CSV for offline analysis
See Also
- LangFuse Documentation
- LangGraph Documentation
- Main README: `../README.md`
- Architecture: `../CLAUDE.md`