File size: 5,493 Bytes
0e4dd30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""
server/traces.py — Distributed trace generation for inspect_traces action.

Generates realistic Jaeger/Zipkin-style trace trees showing request flow
through the service dependency graph. Healthy services show normal latencies;
failing services show errors, timeouts, and cascading delays.

Each trace is a tree of spans rooted at the inspected service.
"""

from __future__ import annotations

import random
from typing import Any, Dict, List, Optional

from server.graph import ServiceGraph


def _make_span_id(rng: random.Random) -> str:
    """Return a random span identifier: 8 zero-padded lowercase hex digits."""
    # One inclusive draw over the full unsigned 32-bit range.
    raw_value = rng.randint(0, 0xFFFFFFFF)
    return format(raw_value, "08x")


def _make_trace_id(rng: random.Random) -> str:
    """Return a random trace identifier: 16 zero-padded lowercase hex digits."""
    # One inclusive draw over the full unsigned 64-bit range.
    raw_value = rng.randint(0, 0xFFFFFFFFFFFFFFFF)
    return format(raw_value, "016x")


def generate_trace(
    service_id: str,
    graph: ServiceGraph,
    service_errors: Dict[str, float],
    service_latencies: Dict[str, float],
    rng: random.Random,
    max_depth: int = 4,
) -> Dict[str, Any]:
    """
    Generate a distributed trace rooted at service_id.

    Starting from service_id, follows graph.adjacency depth-first and emits
    one span per visited service. Sibling calls are laid out sequentially:
    each child starts shortly after the previous sibling finished. A parent's
    duration is drawn independently and is not stretched to cover its
    children, so child spans may end after their parent.

    Args:
        service_id: Service the trace is rooted at.
        graph: Dependency graph; this function reads graph.adjacency,
            graph.edges (source / target / activation_probability) and
            graph.node_map (layer / region).
        service_errors: service_id → error_rate (0.0–1.0); missing → 0.0.
        service_latencies: service_id → p99_ms; missing → random 5–50 ms.
        rng: Random source; all randomness is drawn from it, so a seeded
            rng yields a reproducible trace.
        max_depth: Spans at this depth do not recurse into dependencies
            (the root is depth 0).

    Returns:
        Dict with keys trace_id, root_service, span_count,
        total_duration_ms and spans. spans is a flat list in depth-first
        pre-order (root first); the tree shape is encoded through each
        span's parent_span_id (None for the root).
    """
    trace_id = _make_trace_id(rng)
    spans: List[Dict[str, Any]] = []

    # (source, target) → edge, built on first use so every later dependency
    # lookup is O(1) instead of a linear scan over graph.edges.
    edge_index: Optional[Dict[Any, Any]] = None

    def _find_edge(source_id: str, target_id: str) -> Any:
        """Return the first edge source_id → target_id, or None."""
        nonlocal edge_index
        if edge_index is None:
            edge_index = {}
            for candidate in graph.edges:
                # setdefault keeps the first edge for a pair, matching a
                # front-to-back scan of graph.edges.
                edge_index.setdefault(
                    (candidate.source, candidate.target), candidate
                )
        return edge_index.get((source_id, target_id))

    def _build_span(
        svc_id: str,
        parent_span_id: Optional[str],
        depth: int,
        start_offset_ms: float,
    ) -> Dict[str, Any]:
        """Append the span for svc_id (and its subtree) to spans; return it."""
        span_id = _make_span_id(rng)
        error_rate = service_errors.get(svc_id, 0.0)
        # The fallback latency is drawn unconditionally (it is evaluated as
        # the .get() argument); keep it that way so the sequence of rng
        # draws — and therefore seeded output — stays stable.
        base_latency = service_latencies.get(svc_id, rng.uniform(5, 50))
        has_error = rng.random() < error_rate

        # Span duration: base latency + noise
        if has_error and error_rate > 0.8:
            # Mostly-down service: either fails immediately or hangs until
            # a timeout. Both candidates are drawn before one is picked.
            duration_ms = rng.choice([
                rng.uniform(0.5, 5),       # Fast fail
                rng.uniform(3000, 10000),   # Timeout
            ])
        elif has_error:
            # Degraded service: errors come with inflated latency.
            duration_ms = base_latency * rng.uniform(1.5, 5.0)
        else:
            duration_ms = base_latency * rng.uniform(0.3, 1.2)

        duration_ms = max(0.1, duration_ms)

        span: Dict[str, Any] = {
            "span_id": span_id,
            "parent_span_id": parent_span_id,
            "service": svc_id,
            "operation": _operation_name(svc_id, rng),
            "start_ms": round(start_offset_ms, 1),
            "duration_ms": round(duration_ms, 1),
            "status": "ERROR" if has_error else "OK",
            "tags": {},
        }

        if has_error:
            span["tags"]["error"] = True
            span["tags"]["error.message"] = _error_message(svc_id, error_rate, rng)

        # Services unknown to the graph simply get no layer/region tags.
        node = graph.node_map.get(svc_id)
        if node:
            span["tags"]["service.layer"] = node.layer
            span["tags"]["service.region"] = node.region

        # Appended before recursing, so spans ends up in pre-order.
        spans.append(span)

        # Recurse into downstream dependencies
        if depth < max_depth:
            deps = graph.adjacency.get(svc_id, [])
            child_offset = start_offset_ms + rng.uniform(0.1, 2.0)
            for dep_id in deps:
                # Check edge activation (probabilistic). A dependency with
                # no matching edge object is always followed.
                edge = _find_edge(svc_id, dep_id)
                if edge and rng.random() > edge.activation_probability:
                    continue

                child_span = _build_span(dep_id, span_id, depth + 1, child_offset)
                # Next sibling starts after this child's (rounded) duration
                # plus a small gap.
                child_offset += child_span["duration_ms"] + rng.uniform(0.1, 1.0)

        return span

    _build_span(service_id, None, 0, 0.0)

    # Total trace duration is the latest end time across all spans; the
    # default only guards the (currently impossible) empty-list case.
    total_duration = max(
        (s["start_ms"] + s["duration_ms"] for s in spans),
        default=0.0,
    )

    return {
        "trace_id": trace_id,
        "root_service": service_id,
        "span_count": len(spans),
        "total_duration_ms": round(total_duration, 1),
        "spans": spans,
    }


def _operation_name(service_id: str, rng: random.Random) -> str:
    """Pick a plausible operation name for a span of the given service.

    The service kind is inferred from substrings of service_id; the first
    matching rule (in the order listed below) wins, and unrecognised
    services fall back to generic operation names.
    """
    # Ordered (keywords, operations) rules; order defines precedence.
    rules = (
        (
            ("gateway", "bff"),
            ["HTTP GET /api/v1/resource", "HTTP POST /api/v1/action", "HTTP GET /health"],
        ),
        (
            ("auth", "identity", "session"),
            ["validateToken", "authenticate", "refreshSession"],
        ),
        (("postgres",), ["SELECT", "INSERT", "UPDATE", "pg_pool.checkout"]),
        (("redis",), ["GET", "SET", "MGET", "EXPIRE"]),
        (("kafka",), ["produce", "consume", "commitOffset"]),
        (("elasticsearch",), ["search", "index", "bulk"]),
    )

    for keywords, operations in rules:
        if any(keyword in service_id for keyword in keywords):
            return rng.choice(operations)

    # No rule matched: generic application-level operations.
    return rng.choice(["processRequest", "handleMessage", "execute"])


def _error_message(service_id: str, error_rate: float, rng: random.Random) -> str:
    """Pick an error message for a failed span of service_id.

    Error rates above 0.8 yield outage-style messages (refused, unavailable,
    timeout); anything else yields degraded-service messages.
    """
    service_is_down = error_rate > 0.8

    if not service_is_down:
        candidates = [
            f"{service_id}: Internal server error (HTTP 500)",
            f"{service_id}: Upstream dependency timeout",
            f"{service_id}: Rate limited (HTTP 429)",
            f"{service_id}: Bad gateway (HTTP 502)",
        ]
    else:
        candidates = [
            f"{service_id}: Connection refused",
            f"{service_id}: Service unavailable (HTTP 503)",
            f"{service_id}: Timeout after 5000ms",
        ]

    return rng.choice(candidates)