# Observability Module

This module provides observability for the multi-agent RAG system: it reads LangFuse traces and turns them into latency, token, cost, and execution-path analytics.

## Features

- **Trace Reading API**: Query and filter LangFuse traces programmatically
- **Performance Analytics**: Agent-level metrics including latency, token usage, and costs
- **Trajectory Analysis**: Analyze agent execution paths and workflow patterns
- **Export Capabilities**: Export traces to JSON/CSV for external analysis

## Quick Start

### 1. Configure LangFuse

Add your LangFuse credentials to `.env`:

```bash
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY=pk-lf-your-public-key-here
LANGFUSE_SECRET_KEY=sk-lf-your-secret-key-here
LANGFUSE_HOST=https://cloud.langfuse.com
```
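
The module's own settings loader isn't shown in this README. The following is a minimal sketch, assuming a hypothetical `load_langfuse_settings` helper and `LangfuseSettings` container, of how the four variables above could be read and sanity-checked before tracing starts. The key-prefix checks mirror the `pk-lf-` / `sk-lf-` placeholders in the example `.env`.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class LangfuseSettings:
    """Hypothetical container for the LANGFUSE_* variables."""
    enabled: bool
    public_key: Optional[str]
    secret_key: Optional[str]
    host: str


def load_langfuse_settings(env: Mapping[str, str] = os.environ) -> LangfuseSettings:
    # Treat common truthy spellings as "enabled"; anything else disables tracing.
    enabled = env.get("LANGFUSE_ENABLED", "false").strip().lower() in {"1", "true", "yes", "on"}
    settings = LangfuseSettings(
        enabled=enabled,
        public_key=env.get("LANGFUSE_PUBLIC_KEY"),
        secret_key=env.get("LANGFUSE_SECRET_KEY"),
        host=env.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
    )
    if settings.enabled:
        # Fail fast on missing or swapped keys instead of silently losing traces.
        if not (settings.public_key or "").startswith("pk-lf-"):
            raise ValueError("LANGFUSE_PUBLIC_KEY is missing or does not start with 'pk-lf-'")
        if not (settings.secret_key or "").startswith("sk-lf-"):
            raise ValueError("LANGFUSE_SECRET_KEY is missing or does not start with 'sk-lf-'")
    return settings
```

Passing a plain dict instead of `os.environ` keeps the check easy to unit-test.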

### 2. Run Your Workflow

The system automatically traces all agent executions, LLM calls, and RAG operations.

### 3. Query Traces

Use the Python API to read and analyze traces:

```python
from observability import TraceReader

# Initialize trace reader
reader = TraceReader()

# Get recent traces
traces = reader.get_traces(limit=10)

# Get traces for a specific session
session_traces = reader.get_traces(session_id="session-abc123")

# Filter by agent
retriever_spans = reader.filter_by_agent("retriever_agent", limit=50)

# Get specific trace
trace = reader.get_trace_by_id("trace-xyz")
```

## Trace Reader API

### TraceReader

Query and retrieve traces from LangFuse.

```python
from observability import TraceReader
from datetime import datetime, timedelta

reader = TraceReader()

# Get traces with filters
traces = reader.get_traces(
    limit=50,
    user_id="user-123",
    session_id="session-abc",
    from_timestamp=datetime.now() - timedelta(days=7),
    to_timestamp=datetime.now()
)

# Filter by date range
recent_traces = reader.filter_by_date_range(
    from_date=datetime.now() - timedelta(days=1),
    to_date=datetime.now(),
    limit=100
)

# Get LLM generations
generations = reader.get_generations(trace_id="trace-xyz")

# Export to files
reader.export_traces_to_json(traces, "traces.json")
reader.export_traces_to_csv(traces, "traces.csv")
```
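
`export_traces_to_json` and `export_traces_to_csv` cover the common case. If you need a custom layout, for example `token_usage` split into separate CSV columns, a stdlib-only sketch might look like the following. It assumes the traces have already been converted to plain dicts (for instance with Pydantic v2's `model_dump()` on `TraceInfo`); `export_json`, `export_csv`, and the dotted column naming are hypothetical choices, not the module's format.

```python
import csv
import json
from datetime import datetime
from typing import Any, Dict, List


def _json_default(value: Any) -> str:
    # datetime is not JSON-serializable by default; emit ISO-8601 strings.
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def export_json(traces: List[Dict[str, Any]], path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(traces, f, default=_json_default, indent=2)


def _flatten(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    # Nested dicts (metadata, token_usage) become dotted column names.
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(_flatten(value, prefix=f"{name}."))
        elif isinstance(value, datetime):
            flat[name] = value.isoformat()
        else:
            flat[name] = value
    return flat


def export_csv(traces: List[Dict[str, Any]], path: str) -> None:
    rows = [_flatten(t) for t in traces]
    # Union of all keys in first-seen order, so sparse metadata still gets a column.
    fieldnames: List[str] = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)  # missing keys are written as ""
        writer.writeheader()
        writer.writerows(rows)
```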

## Performance Analytics API

### AgentPerformanceAnalyzer

Analyze agent performance metrics.

```python
from observability import AgentPerformanceAnalyzer

analyzer = AgentPerformanceAnalyzer()

# Get latency statistics for an agent
stats = analyzer.agent_latency_stats("retriever_agent", days=7)
print(f"Average latency: {stats.avg_latency_ms:.2f}ms")
print(f"P95 latency: {stats.p95_latency_ms:.2f}ms")
print(f"Success rate: {stats.success_rate:.1f}%")

# Get token usage breakdown
token_usage = analyzer.token_usage_breakdown(days=7)
for agent, usage in token_usage.items():
    print(f"{agent}: {usage['total']:,} tokens")

# Get cost breakdown per agent
costs = analyzer.cost_per_agent(session_id="session-abc")
for agent, cost in costs.items():
    print(f"{agent}: ${cost:.4f}")

# Get error rates
error_stats = analyzer.error_rates(days=30)
for agent, stats in error_stats.items():
    print(f"{agent}: {stats['error_rate_percent']:.2f}% errors")

# Get workflow performance summary
workflow_stats = analyzer.workflow_performance_summary(days=7)
print(f"Total runs: {workflow_stats.total_runs}")
print(f"Average duration: {workflow_stats.avg_duration_ms:.2f}ms")
print(f"Total cost: ${workflow_stats.total_cost:.4f}")
```
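
How `AgentPerformanceAnalyzer` computes its percentiles isn't documented here. The sketch below shows one common approach, linear interpolation between closest ranks (NumPy's default `percentile` rule), assuming per-execution latencies in milliseconds and success flags have already been extracted from the agent's spans. `LatencySummary` and `summarize` are hypothetical names that mirror the `AgentStats` fields; `success_rate` is expressed as a percentage to match the `%` formatting above.

```python
import math
from dataclasses import dataclass
from typing import List, Sequence


def percentile(sorted_values: Sequence[float], q: float) -> float:
    """Linear-interpolation percentile of already-sorted data, q in [0, 100]."""
    if not sorted_values:
        raise ValueError("percentile of empty data")
    rank = (len(sorted_values) - 1) * q / 100.0
    lo, hi = math.floor(rank), math.ceil(rank)
    return sorted_values[lo] + (sorted_values[hi] - sorted_values[lo]) * (rank - lo)


@dataclass
class LatencySummary:
    execution_count: int
    avg_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    min_latency_ms: float
    max_latency_ms: float
    success_rate: float  # percent, 0-100


def summarize(latencies_ms: List[float], successes: List[bool]) -> LatencySummary:
    if not latencies_ms or len(latencies_ms) != len(successes):
        raise ValueError("need at least one sample and one success flag per latency")
    values = sorted(latencies_ms)
    return LatencySummary(
        execution_count=len(values),
        avg_latency_ms=sum(values) / len(values),
        p50_latency_ms=percentile(values, 50),
        p95_latency_ms=percentile(values, 95),
        p99_latency_ms=percentile(values, 99),
        min_latency_ms=values[0],
        max_latency_ms=values[-1],
        success_rate=100.0 * sum(successes) / len(successes),
    )
```

With only a handful of executions, P95 and P99 are interpolated between the top few samples, so treat them as rough until the window holds enough runs.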

## Trajectory Analysis API

### AgentTrajectoryAnalyzer

Analyze agent execution paths and workflow patterns.

```python
from observability import AgentTrajectoryAnalyzer

analyzer = AgentTrajectoryAnalyzer()

# Get agent trajectories
trajectories = analyzer.get_trajectories(session_id="session-abc", days=7)

for traj in trajectories:
    print(f"Trace: {traj.trace_id}")
    print(f"Duration: {traj.total_duration_ms:.2f}ms")
    print(f"Path: {' → '.join(traj.agent_sequence)}")
    print(f"Success: {traj.success}")

# Analyze execution paths
path_analysis = analyzer.analyze_execution_paths(days=7)
print(f"Total workflows: {path_analysis['total_workflows']}")
print(f"Unique paths: {path_analysis['unique_paths']}")
print(f"Most common path: {path_analysis['most_common_path']}")

# Compare two workflow executions
comparison = analyzer.compare_trajectories("trace-1", "trace-2")
print(f"Duration difference: {comparison['duration_diff_ms']:.2f}ms")
print(f"Same path: {comparison['same_path']}")
```
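
Path analysis amounts to grouping workflows by their exact agent sequence and counting each group. The dashboard example later in this README unpacks `most_common_path` as a `(path, count)` pair, so this minimal sketch returns the same shape. `analyze_paths` and the extra `path_counts` key are hypothetical; the real `analyze_execution_paths` may return more fields.

```python
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple

ARROW = " → "


def analyze_paths(sequences: List[List[str]]) -> Dict[str, Any]:
    """Count identical agent sequences and report the most frequent one."""
    counts = Counter(ARROW.join(seq) for seq in sequences)
    # most_common(1) yields [(path, count)], or [] when there are no workflows.
    most_common: Optional[Tuple[str, int]] = counts.most_common(1)[0] if counts else None
    return {
        "total_workflows": len(sequences),
        "unique_paths": len(counts),
        "most_common_path": most_common,
        "path_counts": dict(counts),
    }
```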

## Data Models

### TraceInfo

```python
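from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel


class TraceInfo(BaseModel):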
    id: str
    name: str
    user_id: Optional[str]
    session_id: Optional[str]
    timestamp: datetime
    metadata: Dict[str, Any]
    duration_ms: Optional[float]
    total_cost: Optional[float]
    token_usage: Dict[str, int]
```

### AgentStats

```python
from pydantic import BaseModel


class AgentStats(BaseModel):
    agent_name: str
    execution_count: int
    avg_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    min_latency_ms: float
    max_latency_ms: float
    success_rate: float
    total_cost: float
```

### WorkflowStats

```python
from pydantic import BaseModel


class WorkflowStats(BaseModel):
    total_runs: int
    avg_duration_ms: float
    p50_duration_ms: float
    p95_duration_ms: float
    p99_duration_ms: float
    success_rate: float
    total_cost: float
    avg_cost_per_run: float
    total_tokens: int
```

### AgentTrajectory

```python
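from datetime import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel


class AgentTrajectory(BaseModel):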
    trace_id: str
    session_id: Optional[str]
    start_time: datetime
    total_duration_ms: float
    agent_sequence: List[str]
    agent_timings: Dict[str, float]
    agent_costs: Dict[str, float]
    errors: List[str]
    success: bool
```
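
An `AgentTrajectory` is essentially a trace's agent spans ordered by start time. The sketch below derives the main fields from span records; `SpanRecord` and `build_trajectory` are hypothetical, and two choices are assumptions rather than documented behavior of `AgentTrajectoryAnalyzer`: repeated executions of one agent (e.g. retries) are summed in `agent_timings`/`agent_costs`, and `total_duration_ms` runs from the first span's start to the last span's end.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class SpanRecord:
    """Hypothetical minimal span: one agent execution inside a trace."""
    agent: str
    start: datetime
    end: datetime
    cost: float = 0.0
    error: Optional[str] = None


def build_trajectory(spans: List[SpanRecord]) -> Dict[str, object]:
    ordered = sorted(spans, key=lambda s: s.start)
    timings: Dict[str, float] = {}
    costs: Dict[str, float] = {}
    for span in ordered:
        ms = (span.end - span.start).total_seconds() * 1000
        # Accumulate rather than overwrite when an agent runs more than once.
        timings[span.agent] = timings.get(span.agent, 0.0) + ms
        costs[span.agent] = costs.get(span.agent, 0.0) + span.cost
    errors = [f"{s.agent}: {s.error}" for s in ordered if s.error]
    total_ms = 0.0
    if ordered:
        # Wall-clock extent: first start to latest end (spans may overlap).
        total_ms = (max(s.end for s in ordered) - ordered[0].start).total_seconds() * 1000
    return {
        "start_time": ordered[0].start if ordered else None,
        "total_duration_ms": total_ms,
        "agent_sequence": [s.agent for s in ordered],
        "agent_timings": timings,
        "agent_costs": costs,
        "errors": errors,
        "success": not errors,
    }
```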

## Example: Performance Dashboard Script

```python
#!/usr/bin/env python3
"""Generate performance dashboard from traces."""

from observability import AgentPerformanceAnalyzer, AgentTrajectoryAnalyzer

def main():
    perf = AgentPerformanceAnalyzer()
    traj = AgentTrajectoryAnalyzer()

    print("=" * 60)
    print("AGENT PERFORMANCE DASHBOARD - Last 7 Days")
    print("=" * 60)

    # Workflow summary
    workflow_stats = perf.workflow_performance_summary(days=7)
    if workflow_stats:
        print("\nWorkflow Summary:")
        print(f"  Total Runs: {workflow_stats.total_runs}")
        print(f"  Avg Duration: {workflow_stats.avg_duration_ms/1000:.2f}s")
        print(f"  P95 Duration: {workflow_stats.p95_duration_ms/1000:.2f}s")
        print(f"  Success Rate: {workflow_stats.success_rate:.1f}%")
        print(f"  Total Cost: ${workflow_stats.total_cost:.4f}")
        print(f"  Avg Cost/Run: ${workflow_stats.avg_cost_per_run:.4f}")

    # Agent latency stats
    print("\nAgent Latency Statistics:")
    for agent_name in ["retriever_agent", "analyzer_agent", "synthesis_agent"]:
        stats = perf.agent_latency_stats(agent_name, days=7)
        if stats:
            print(f"\n  {agent_name}:")
            print(f"    Executions: {stats.execution_count}")
            print(f"    Avg Latency: {stats.avg_latency_ms/1000:.2f}s")
            print(f"    P95 Latency: {stats.p95_latency_ms/1000:.2f}s")
            print(f"    Success Rate: {stats.success_rate:.1f}%")

    # Cost breakdown
    print("\nCost Breakdown:")
    costs = perf.cost_per_agent(days=7)
    for agent, cost in sorted(costs.items(), key=lambda x: x[1], reverse=True):
        print(f"  {agent}: ${cost:.4f}")

    # Path analysis
    print("\nExecution Path Analysis:")
    path_analysis = traj.analyze_execution_paths(days=7)
    if path_analysis:
        print(f"  Total Workflows: {path_analysis['total_workflows']}")
        print(f"  Unique Paths: {path_analysis['unique_paths']}")
        if path_analysis['most_common_path']:
            path, count = path_analysis['most_common_path']
            print(f"  Most Common: {path} ({count} times)")

if __name__ == "__main__":
    main()
```

Save as `scripts/performance_dashboard.py` and run:

```bash
python scripts/performance_dashboard.py
```

## Advanced Usage

### Custom Metrics

```python
from observability import TraceReader

reader = TraceReader()

# Custom metric: papers processed per second of summed workflow time
traces = reader.get_traces(limit=100)
total_papers = 0
total_time_ms = 0

for trace in traces:
    num_papers = trace.metadata.get("num_papers")
    # Count only traces that report both values, so papers and time stay paired
    if num_papers and trace.duration_ms:
        total_papers += num_papers
        total_time_ms += trace.duration_ms

if total_time_ms > 0:
    papers_per_second = (total_papers / total_time_ms) * 1000
    print(f"Papers/second: {papers_per_second:.2f}")
```

### Monitoring Alerts

```python
from observability import AgentPerformanceAnalyzer

analyzer = AgentPerformanceAnalyzer()

# Check if error rate exceeds threshold
error_stats = analyzer.error_rates(days=1)
for agent, stats in error_stats.items():
    if stats['error_rate_percent'] > 10:
        print(f"⚠️  ALERT: {agent} error rate is {stats['error_rate_percent']:.1f}%")

# Check if P95 latency is too high
stats = analyzer.agent_latency_stats("analyzer_agent", days=1)
if stats and stats.p95_latency_ms > 30000:  # 30 seconds
    print(f"⚠️  ALERT: Analyzer P95 latency is {stats.p95_latency_ms/1000:.1f}s")
```
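
Keeping the thresholds separate from the data fetching makes the checks easy to test and reuse in a cron job or CI step. A minimal sketch, assuming the `error_rates()` return shape shown above; `check_alerts` is hypothetical and its defaults simply mirror the 10% and 30 s thresholds in the example.

```python
from typing import Dict, List, Optional


def check_alerts(
    error_stats: Dict[str, Dict[str, float]],
    p95_latency_ms: Dict[str, Optional[float]],
    max_error_rate_percent: float = 10.0,
    max_p95_ms: float = 30_000.0,
) -> List[str]:
    """Return human-readable alert messages; an empty list means all clear."""
    alerts: List[str] = []
    for agent, stats in sorted(error_stats.items()):
        rate = stats.get("error_rate_percent", 0.0)
        if rate > max_error_rate_percent:
            alerts.append(f"{agent} error rate is {rate:.1f}%")
    for agent, p95 in sorted(p95_latency_ms.items()):
        # None means no executions in the window, so there is nothing to alert on.
        if p95 is not None and p95 > max_p95_ms:
            alerts.append(f"{agent} P95 latency is {p95 / 1000:.1f}s")
    return alerts
```

The first argument can come straight from `analyzer.error_rates(days=1)`, and the second from `agent_latency_stats(...).p95_latency_ms` per agent (or `None` when no stats are returned); the resulting list can be printed, logged, or forwarded to whatever notification channel you use.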

## Troubleshooting

### No Traces Found

1. Check that LangFuse is enabled: `LANGFUSE_ENABLED=true`
2. Verify API keys are correct in `.env`
3. Ensure network connectivity to the host set in `LANGFUSE_HOST` (LangFuse Cloud in the example configuration)
4. Check that at least one workflow has been executed

### Missing Token/Cost Data

- Token usage requires `langfuse-openai` instrumentation
- Ensure `instrument_openai()` is called before creating Azure OpenAI clients
- Cost data depends on LangFuse pricing configuration
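
When token counts are present but costs are empty, it helps to remember what a cost figure is made of: token counts multiplied by per-model unit prices. The sketch below shows that arithmetic only; `estimate_cost` is hypothetical, the `input`/`output` keys in `token_usage` are an assumption, and the prices in the usage are placeholders, not real model pricing.

```python
from typing import Dict


def estimate_cost(usage: Dict[str, int], price_per_1m: Dict[str, float]) -> float:
    """cost = input_tokens * input_price + output_tokens * output_price, prices per 1M tokens."""
    return (
        usage.get("input", 0) * price_per_1m["input"]
        + usage.get("output", 0) * price_per_1m["output"]
    ) / 1_000_000
```

For example, 12,000 input and 3,000 output tokens at placeholder prices of 2.50 and 10.00 per million tokens come to 0.06.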

### Slow Query Performance

- Reduce `limit` parameter for large trace datasets
- Use date range filters to narrow results
- Consider exporting traces to CSV for offline analysis
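
For long ranges, querying in smaller time windows keeps each request bounded. A minimal sketch of splitting a range into consecutive windows; `date_windows` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def date_windows(start: datetime, end: datetime, step: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (from, to) windows covering start..end; the last may be shorter."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)
        yield cursor, upper
        cursor = upper
```

Each window can then be passed to `reader.filter_by_date_range(from_date=..., to_date=..., limit=...)`. Whether that method treats its bounds as inclusive isn't documented here, so deduplicating the combined results by trace `id` is a cheap safeguard against a trace landing exactly on a boundary.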

## See Also

- [LangFuse Documentation](https://langfuse.com/docs)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- Main README: `../README.md`
- Architecture: `../CLAUDE.md`