File size: 2,987 Bytes
bd28470
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
 * Pipeline Observability — Trace ID system
 * 
 * Every pipeline run gets a unique trace_id.
 * Every operation within that run carries the trace_id.
 * This enables: debugging, cost tracking, latency analysis.
 * 
 * Think of it like a receipt number — every action is linked.
 */

import { randomUUID } from "crypto";
import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";

/**
 * Accumulated state of one pipeline trace.
 */
export interface PipelineTrace {
  /** Identifier of this trace. */
  traceId: string;
  /** The `discovery_runs.id` this trace belongs to. */
  runId: string;
  /** Start time, as a `Date.now()` timestamp in milliseconds. */
  startedAt: number;
  /** Number of operations recorded so far. */
  operationCount: number;
  /** Running sum of tokens over recorded operations. */
  totalTokens: number;
  /** Running sum of per-operation latencies, in milliseconds. */
  totalLatencyMs: number;
  /** Messages collected for failed operations. */
  errors: string[];
}

// Traces currently in flight, keyed by trace ID. Held in process memory only:
// startTrace adds an entry and endTrace removes it.
const activeTraces: Map<string, PipelineTrace> = new Map();

/**
 * Open a new pipeline trace. Call this at the beginning of every discovery run.
 *
 * The new trace is registered in the in-memory store with all counters at
 * zero and an empty error list.
 *
 * @param runId - The `discovery_runs.id` the trace belongs to.
 * @returns The generated trace ID, shaped `trace_<8 UUID chars>_<ms timestamp>`.
 */
export function startTrace(runId: string): string {
  // Eight characters of a random UUID plus the current time keep IDs short
  // but still distinct across runs.
  const shortId = randomUUID().slice(0, 8);
  const traceId = `trace_${shortId}_${Date.now()}`;

  activeTraces.set(traceId, {
    traceId,
    runId,
    startedAt: Date.now(),
    operationCount: 0,
    totalTokens: 0,
    totalLatencyMs: 0,
    errors: [],
  });

  logger.info({ traceId, runId }, "🔍 Pipeline trace started");

  return traceId;
}

/**
 * Record one operation against an active trace.
 *
 * Increments the trace's operation count and adds `tokens` and `latencyMs`
 * to its running totals. Every failed operation is appended to the trace's
 * error list as `"<operation>: <error>"`, so the number of collected errors
 * matches the number of recorded failures.
 *
 * Does nothing when `traceId` is not in the active-trace store.
 *
 * @param traceId - ID of the trace to update.
 * @param operation - Label of the operation; used as the error-entry prefix.
 * @param tokens - Tokens consumed by the operation.
 * @param latencyMs - Latency of the operation, in milliseconds.
 * @param success - Whether the operation succeeded.
 * @param error - Failure message; ignored when `success` is true.
 */
export function recordOperation(
  traceId: string,
  operation: string,
  tokens: number,
  latencyMs: number,
  success: boolean,
  error?: string
): void {
  const trace = activeTraces.get(traceId);
  if (!trace) return;

  trace.operationCount++;
  // One NaN/Infinity would permanently poison the running totals, so a
  // non-finite measurement contributes nothing.
  trace.totalTokens += Number.isFinite(tokens) ? tokens : 0;
  trace.totalLatencyMs += Number.isFinite(latencyMs) ? latencyMs : 0;

  if (!success) {
    // `||` on purpose: a missing and an empty message both get the
    // placeholder, so a failure is never dropped from the error list.
    trace.errors.push(`${operation}: ${error || "unknown error"}`);
  }
}

/**
 * End a trace: log its summary, persist the summary to the `audit_log`
 * table, and remove the trace from the in-memory store.
 *
 * Persistence is best-effort. A failed insert is logged as a warning; the
 * trace is still removed and returned.
 *
 * @param traceId - ID of the trace to finish.
 * @returns The finished trace, or `null` when no active trace has that ID.
 */
export async function endTrace(traceId: string): Promise<PipelineTrace | null> {
  const trace = activeTraces.get(traceId);
  if (!trace) return null;

  // Wall-clock time since the trace was started. This differs from
  // totalLatencyMs, which is the sum of the recorded per-operation latencies.
  const duration = Date.now() - trace.startedAt;

  logger.info({
    traceId,
    operations: trace.operationCount,
    tokens: trace.totalTokens,
    durationMs: duration,
    errors: trace.errors.length,
  }, "✅ Pipeline trace completed");

  // Persist to audit log
  try {
    const db = getSupabaseClient();
    // NOTE(review): assumes getSupabaseClient() returns a supabase-js client.
    // Its query builders resolve to `{ error }` on failure instead of
    // rejecting, so the result must be inspected explicitly — confirm.
    const { error } = await db.from("audit_log").insert({
      action: "pipeline_trace_completed",
      entity_type: "discovery_run",
      entity_id: trace.runId,
      details: {
        trace_id: traceId,
        duration_ms: duration,
        operations: trace.operationCount,
        total_tokens: trace.totalTokens,
        total_latency_ms: trace.totalLatencyMs,
        error_count: trace.errors.length,
        errors: trace.errors.slice(0, 10), // cap at 10
      },
    });
    if (error) {
      logger.warn({ err: error, traceId }, "Failed to persist trace — non-critical");
    }
  } catch (err) {
    // Anything that throws outright (client construction, rejected promise).
    logger.warn({ err, traceId }, "Failed to persist trace — non-critical");
  }

  activeTraces.delete(traceId);
  return trace;
}

/**
 * Look up a trace that is still active (e.g. to hand it to LLM calls).
 *
 * @param traceId - ID of the trace to look up.
 * @returns The live trace object, or `undefined` when no active trace has that ID.
 */
export function getTrace(traceId: string): PipelineTrace | undefined {
  const trace = activeTraces.get(traceId);
  return trace;
}