"""
server/traces.py — Distributed trace generation for inspect_traces action.
Generates realistic Jaeger/Zipkin-style trace trees showing request flow
through the service dependency graph. Healthy services show normal latencies;
failing services show errors, timeouts, and cascading delays.
Each trace is a tree of spans rooted at the inspected service.
"""
from __future__ import annotations
import random
from typing import Any, Dict, List, Optional
from server.graph import ServiceGraph
def _make_span_id(rng: random.Random) -> str:
return f"{rng.randint(0, 0xFFFFFFFF):08x}"
def _make_trace_id(rng: random.Random) -> str:
return f"{rng.randint(0, 0xFFFFFFFFFFFFFFFF):016x}"
def generate_trace(
    service_id: str,
    graph: ServiceGraph,
    service_errors: Dict[str, float],
    service_latencies: Dict[str, float],
    rng: random.Random,
    max_depth: int = 4,
) -> Dict[str, Any]:
    """
    Generate a distributed trace tree rooted at service_id.

    Returns a dict with trace_id, root_service, span_count,
    total_duration_ms, and a flat spans list (DFS/pre-order).
    service_errors: service_id → error_rate (0.0–1.0)
    service_latencies: service_id → p99_ms

    NOTE(review): the generated trace is a function of the exact order and
    count of rng draws below — reordering any draw changes all downstream
    spans for a given seed. Keep the call sequence stable.
    """
    trace_id = _make_trace_id(rng)
    # Flat list of all spans; _build_span appends in depth-first order.
    spans: List[Dict[str, Any]] = []

    def _build_span(
        svc_id: str,
        parent_span_id: Optional[str],
        depth: int,
        start_offset_ms: float,
    ) -> Dict[str, Any]:
        # Build one span for svc_id, append it to `spans`, then recurse
        # into its downstream dependencies. Returns the span dict.
        span_id = _make_span_id(rng)
        error_rate = service_errors.get(svc_id, 0.0)
        # Services without a known p99 get a random 5–50 ms baseline.
        base_latency = service_latencies.get(svc_id, rng.uniform(5, 50))
        has_error = rng.random() < error_rate
        # Span duration: base latency + noise
        if has_error and error_rate > 0.8:
            # Fast fail or timeout. Both candidate durations are drawn
            # (two rng values consumed) before choice picks one — part of
            # the deterministic draw sequence, do not "optimize" away.
            duration_ms = rng.choice([
                rng.uniform(0.5, 5),  # Fast fail
                rng.uniform(3000, 10000),  # Timeout
            ])
        elif has_error:
            # Degraded-but-responding: 1.5–5x slower than baseline.
            duration_ms = base_latency * rng.uniform(1.5, 5.0)
        else:
            duration_ms = base_latency * rng.uniform(0.3, 1.2)
        duration_ms = max(0.1, duration_ms)
        span = {
            "span_id": span_id,
            "parent_span_id": parent_span_id,  # None for the root span
            "service": svc_id,
            "operation": _operation_name(svc_id, rng),
            "start_ms": round(start_offset_ms, 1),
            "duration_ms": round(duration_ms, 1),
            "status": "ERROR" if has_error else "OK",
            "tags": {},
        }
        if has_error:
            span["tags"]["error"] = True
            span["tags"]["error.message"] = _error_message(svc_id, error_rate, rng)
        node = graph.node_map.get(svc_id)
        if node:
            span["tags"]["service.layer"] = node.layer
            span["tags"]["service.region"] = node.region
        spans.append(span)
        # Recurse into downstream dependencies
        if depth < max_depth:
            deps = graph.adjacency.get(svc_id, [])
            child_offset = start_offset_ms + rng.uniform(0.1, 2.0)
            for dep_id in deps:
                # Check edge activation (probabilistic): some edges only
                # fire on a fraction of requests. Linear scan over edges —
                # presumably graphs are small enough for this; verify.
                edge = next(
                    (e for e in graph.edges if e.source == svc_id and e.target == dep_id),
                    None,
                )
                if edge and rng.random() > edge.activation_probability:
                    continue
                child_span = _build_span(dep_id, span_id, depth + 1, child_offset)
                # Children run sequentially (not in parallel): each child
                # starts after the previous one ends, plus a small gap.
                child_offset += child_span["duration_ms"] + rng.uniform(0.1, 1.0)
        return span

    root_span = _build_span(service_id, None, 0, 0.0)
    # Compute total trace duration. Child spans may end after their parent
    # (parent duration is drawn independently), so take the max end time
    # over ALL spans rather than using the root span's duration.
    if spans:
        total_duration = max(s["start_ms"] + s["duration_ms"] for s in spans)
    else:
        total_duration = 0.0
    return {
        "trace_id": trace_id,
        "root_service": service_id,
        "span_count": len(spans),
        "total_duration_ms": round(total_duration, 1),
        "spans": spans,
    }
def _operation_name(service_id: str, rng: random.Random) -> str:
"""Generate a realistic operation name based on service type."""
if "gateway" in service_id or "bff" in service_id:
return rng.choice(["HTTP GET /api/v1/resource", "HTTP POST /api/v1/action", "HTTP GET /health"])
if "auth" in service_id or "identity" in service_id or "session" in service_id:
return rng.choice(["validateToken", "authenticate", "refreshSession"])
if "postgres" in service_id:
return rng.choice(["SELECT", "INSERT", "UPDATE", "pg_pool.checkout"])
if "redis" in service_id:
return rng.choice(["GET", "SET", "MGET", "EXPIRE"])
if "kafka" in service_id:
return rng.choice(["produce", "consume", "commitOffset"])
if "elasticsearch" in service_id:
return rng.choice(["search", "index", "bulk"])
return rng.choice(["processRequest", "handleMessage", "execute"])
def _error_message(service_id: str, error_rate: float, rng: random.Random) -> str:
"""Generate a trace-level error message."""
if error_rate > 0.8:
return rng.choice([
f"{service_id}: Connection refused",
f"{service_id}: Service unavailable (HTTP 503)",
f"{service_id}: Timeout after 5000ms",
])
return rng.choice([
f"{service_id}: Internal server error (HTTP 500)",
f"{service_id}: Upstream dependency timeout",
f"{service_id}: Rate limited (HTTP 429)",
f"{service_id}: Bad gateway (HTTP 502)",
])