GitHub Actions
Clean sync from GitHub - no large files in history
aca8ab4

A newer version of the Gradio SDK is available: 6.5.1

Upgrade

Observability Module

This module provides comprehensive observability for the multi-agent RAG system using LangFuse tracing and analytics.

Features

  • Trace Reading API: Query and filter LangFuse traces programmatically
  • Performance Analytics: Agent-level metrics including latency, token usage, and costs
  • Trajectory Analysis: Analyze agent execution paths and workflow patterns
  • Export Capabilities: Export traces to JSON/CSV for external analysis

Quick Start

1. Configure LangFuse

Add your LangFuse credentials to .env:

LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_HOST=https://cloud.langfuse.com

2. Run Your Workflow

The system automatically traces all agent executions, LLM calls, and RAG operations.

3. Query Traces

Use the Python API to read and analyze traces:

from observability import TraceReader, AgentPerformanceAnalyzer

# Initialize trace reader
reader = TraceReader()

# Get recent traces
traces = reader.get_traces(limit=10)

# Get traces for a specific session
session_traces = reader.get_traces(session_id="session-abc123")

# Filter by agent
retriever_spans = reader.filter_by_agent("retriever_agent", limit=50)

# Get specific trace
trace = reader.get_trace_by_id("trace-xyz")

Trace Reader API

TraceReader

Query and retrieve traces from LangFuse.

from observability import TraceReader
from datetime import datetime, timedelta

reader = TraceReader()

# Get traces with filters
traces = reader.get_traces(
    limit=50,
    user_id="user-123",
    session_id="session-abc",
    from_timestamp=datetime.now() - timedelta(days=7),
    to_timestamp=datetime.now()
)

# Filter by date range
recent_traces = reader.filter_by_date_range(
    from_date=datetime.now() - timedelta(days=1),
    to_date=datetime.now(),
    limit=100
)

# Get LLM generations
generations = reader.get_generations(trace_id="trace-xyz")

# Export to files
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")

Performance Analytics API

AgentPerformanceAnalyzer

Analyze agent performance metrics.

from observability import AgentPerformanceAnalyzer

analyzer = AgentPerformanceAnalyzer()

# Get latency statistics for an agent
stats = analyzer.agent_latency_stats("retriever_agent", days=7)
print(f"Average latency: {stats.avg_latency_ms:.2f}ms")
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")
print(f"Success rate: {stats.success_rate:.1f}%")

# Get token usage breakdown
token_usage = analyzer.token_usage_breakdown(days=7)
for agent, usage in token_usage.items():
    print(f"{agent}: {usage['total']:,} tokens")

# Get cost breakdown per agent
costs = analyzer.cost_per_agent(session_id="session-abc")
for agent, cost in costs.items():
    print(f"{agent}: ${cost:.4f}")

# Get error rates
error_stats = analyzer.error_rates(days=30)
for agent, stats in error_stats.items():
    print(f"{agent}: {stats['error_rate_percent']:.2f}% errors")

# Get workflow performance summary
workflow_stats = analyzer.workflow_performance_summary(days=7)
print(f"Total runs: {workflow_stats.total_runs}")
print(f"Average duration: {workflow_stats.avg_duration_ms:.2f}ms")
print(f"Total cost: ${workflow_stats.total_cost:.4f}")

Trajectory Analysis API

AgentTrajectoryAnalyzer

Analyze agent execution paths and workflow patterns.

from observability import AgentTrajectoryAnalyzer

analyzer = AgentTrajectoryAnalyzer()

# Get agent trajectories
trajectories = analyzer.get_trajectories(session_id="session-abc", days=7)

for traj in trajectories:
    print(f"Trace: {traj.trace_id}")
    print(f"Duration: {traj.total_duration_ms:.2f}ms")
    print(f"Path: {' → '.join(traj.agent_sequence)}")
    print(f"Success: {traj.success}")

# Analyze execution paths
path_analysis = analyzer.analyze_execution_paths(days=7)
print(f"Total workflows: {path_analysis['total_workflows']}")
print(f"Unique paths: {path_analysis['unique_paths']}")
print(f"Most common path: {path_analysis['most_common_path']}")

# Compare two workflow executions
comparison = analyzer.compare_trajectories("trace-1", "trace-2")
print(f"Duration difference: {comparison['duration_diff_ms']:.2f}ms")
print(f"Same path: {comparison['same_path']}")

Data Models

TraceInfo

class TraceInfo(BaseModel):
    """Typed summary of a single trace.

    NOTE(review): presumably the element type returned by
    ``TraceReader.get_traces()`` -- confirm against the module source.
    """

    id: str  # Trace identifier
    name: str  # Trace name
    user_id: Optional[str]  # May be None
    session_id: Optional[str]  # May be None
    timestamp: datetime  # NOTE(review): timezone handling is not shown here -- confirm
    metadata: Dict[str, Any]  # Free-form key/value data attached to the trace
    duration_ms: Optional[float]  # Milliseconds, per the field name; may be None
    total_cost: Optional[float]  # May be None; presumably USD -- TODO confirm
    token_usage: Dict[str, int]  # NOTE(review): expected key set is not shown here -- confirm

AgentStats

class AgentStats(BaseModel):
    """Aggregated latency, reliability and cost figures for one agent.

    The ``*_latency_ms`` fields are in milliseconds, per their names.
    """

    agent_name: str  # Agent these statistics describe
    execution_count: int  # Number of executions aggregated
    avg_latency_ms: float
    p50_latency_ms: float  # Percentile fields: 50th / 95th / 99th, per their names
    p95_latency_ms: float
    p99_latency_ms: float
    min_latency_ms: float
    max_latency_ms: float
    success_rate: float  # Presumably a percentage in 0-100 (not a 0-1 fraction) -- TODO confirm
    total_cost: float  # Presumably USD -- TODO confirm

WorkflowStats

class WorkflowStats(BaseModel):
    """Aggregated duration, reliability, cost and token figures across workflow runs.

    The ``*_duration_ms`` fields are in milliseconds, per their names.
    """

    total_runs: int  # Number of workflow runs aggregated
    avg_duration_ms: float
    p50_duration_ms: float  # Percentile fields: 50th / 95th / 99th, per their names
    p95_duration_ms: float
    p99_duration_ms: float
    success_rate: float  # Presumably a percentage in 0-100 (not a 0-1 fraction) -- TODO confirm
    total_cost: float  # Presumably USD -- TODO confirm
    avg_cost_per_run: float  # Presumably total_cost / total_runs -- TODO confirm
    total_tokens: int

AgentTrajectory

class AgentTrajectory(BaseModel):
    """Execution path of one workflow run, with per-agent timing and cost."""

    trace_id: str  # Trace this trajectory was built from
    session_id: Optional[str]  # May be None
    start_time: datetime
    total_duration_ms: float  # Milliseconds, per the field name
    agent_sequence: List[str]  # Presumably agent names in execution order -- TODO confirm
    agent_timings: Dict[str, float]  # Presumably agent name -> milliseconds -- TODO confirm
    agent_costs: Dict[str, float]  # Presumably agent name -> cost -- TODO confirm
    errors: List[str]  # NOTE(review): looks like error messages collected for the run -- confirm
    success: bool

Example: Performance Dashboard Script

#!/usr/bin/env python3
"""Generate performance dashboard from traces."""

from datetime import datetime, timedelta
from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer

def main(
    days: int = 7,
    agent_names=("retriever_agent", "analyzer_agent", "synthesis_agent"),
) -> None:
    """Print a plain-text performance dashboard built from recorded traces.

    Sections, in order: workflow summary, per-agent latency statistics,
    per-agent cost breakdown (most expensive first), and execution-path
    analysis. A section whose analyzer call returns nothing is skipped
    rather than raising.

    Args:
        days: Size of the look-back window passed to every analyzer call.
        agent_names: Agents to include in the latency section.
    """
    perf = AgentPerformanceAnalyzer()
    traj = AgentTrajectoryAnalyzer()

    banner = "=" * 60
    print(banner)
    print(f"AGENT PERFORMANCE DASHBOARD - Last {days} Days")
    print(banner)

    # Workflow summary
    workflow_stats = perf.workflow_performance_summary(days=days)
    if workflow_stats:
        print("\nWorkflow Summary:")
        print(f"  Total Runs: {workflow_stats.total_runs}")
        # Durations are stored in milliseconds; show seconds for readability.
        print(f"  Avg Duration: {workflow_stats.avg_duration_ms/1000:.2f}s")
        print(f"  P95 Duration: {workflow_stats.p95_duration_ms/1000:.2f}s")
        print(f"  Success Rate: {workflow_stats.success_rate:.1f}%")
        print(f"  Total Cost: ${workflow_stats.total_cost:.4f}")
        print(f"  Avg Cost/Run: ${workflow_stats.avg_cost_per_run:.4f}")

    # Agent latency stats
    print("\nAgent Latency Statistics:")
    for agent_name in agent_names:
        stats = perf.agent_latency_stats(agent_name, days=days)
        if not stats:
            # No data for this agent in the window.
            continue
        print(f"\n  {agent_name}:")
        print(f"    Executions: {stats.execution_count}")
        print(f"    Avg Latency: {stats.avg_latency_ms/1000:.2f}s")
        print(f"    P95 Latency: {stats.p95_latency_ms/1000:.2f}s")
        print(f"    Success Rate: {stats.success_rate:.1f}%")

    # Cost breakdown
    print("\nCost Breakdown:")
    # Guard against an empty/None result, consistent with the other sections.
    costs = perf.cost_per_agent(days=days) or {}
    for agent, cost in sorted(costs.items(), key=lambda x: x[1], reverse=True):
        print(f"  {agent}: ${cost:.4f}")

    # Path analysis
    print("\nExecution Path Analysis:")
    path_analysis = traj.analyze_execution_paths(days=days)
    if path_analysis:
        print(f"  Total Workflows: {path_analysis['total_workflows']}")
        print(f"  Unique Paths: {path_analysis['unique_paths']}")
        # .get() so a result without this key skips the line instead of raising.
        most_common = path_analysis.get('most_common_path')
        if most_common:
            path, count = most_common
            print(f"  Most Common: {path} ({count} times)")

if __name__ == "__main__":
    main()

Save as scripts/performance_dashboard.py and run:

python scripts/performance_dashboard.py

Advanced Usage

Custom Metrics

from observability import TraceReader

reader = TraceReader()

# Calculate custom metric: papers processed per second
traces = reader.get_traces(limit=100)
total_papers = 0
total_time_ms = 0

for trace in traces:
    num_papers = trace.metadata.get("num_papers")
    # Only count traces that have BOTH a paper count and a recorded duration.
    # Adding papers whose processing time is unknown (duration_ms is None)
    # would grow the numerator but not the denominator and inflate the rate.
    if not num_papers or not trace.duration_ms:
        continue
    total_papers += num_papers
    total_time_ms += trace.duration_ms

# Guard against division by zero when no trace qualified.
if total_time_ms > 0:
    # Durations are in milliseconds; scale to a per-second rate.
    papers_per_second = (total_papers / total_time_ms) * 1000
    print(f"Papers/second: {papers_per_second:.2f}")

Monitoring Alerts

from observability import AgentPerformanceAnalyzer

analyzer = AgentPerformanceAnalyzer()

# Alert thresholds
ERROR_RATE_THRESHOLD_PERCENT = 10
P95_LATENCY_THRESHOLD_MS = 30000  # 30 seconds

# Flag every agent whose error rate over the last day is above the threshold
for agent, agent_errors in analyzer.error_rates(days=1).items():
    if agent_errors['error_rate_percent'] > ERROR_RATE_THRESHOLD_PERCENT:
        print(f"⚠️  ALERT: {agent} error rate is {agent_errors['error_rate_percent']:.1f}%")

# Flag the analyzer agent when its P95 latency over the last day is too high
analyzer_latency = analyzer.agent_latency_stats("analyzer_agent", days=1)
if analyzer_latency and analyzer_latency.p95_latency_ms > P95_LATENCY_THRESHOLD_MS:
    print(f"⚠️  ALERT: Analyzer P95 latency is {analyzer_latency.p95_latency_ms/1000:.1f}s")

Troubleshooting

No Traces Found

  1. Check that LangFuse is enabled: LANGFUSE_ENABLED=true
  2. Verify API keys are correct in .env
  3. Ensure network connectivity to the LangFuse host configured in LANGFUSE_HOST (LangFuse Cloud by default)
  4. Check that at least one workflow has been executed

Missing Token/Cost Data

  • Token usage requires langfuse-openai instrumentation
  • Ensure instrument_openai() is called before creating Azure OpenAI clients
  • Cost data depends on LangFuse pricing configuration

Slow Query Performance

  • Reduce the limit parameter when querying large trace datasets
  • Use date range filters to narrow results
  • Consider exporting traces to CSV for offline analysis

See Also