/**
 * Pipeline Observability — Trace ID system
 *
 * Every pipeline run gets a unique trace_id.
 * Every operation within that run carries the trace_id.
 * This enables: debugging, cost tracking, latency analysis.
 *
 * Think of it like a receipt number — every action is linked.
 */

import { randomUUID } from "crypto";
import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";

export interface PipelineTrace {
  traceId: string;
  /** discovery_runs.id */
  runId: string;
  /** Start time in epoch milliseconds (Date.now()). */
  startedAt: number;
  /** Number of operations recorded via recordOperation(). */
  operationCount: number;
  /** Sum of the token counts passed to recordOperation(). */
  totalTokens: number;
  /** Sum of per-operation latencies passed to recordOperation() — not wall-clock duration. */
  totalLatencyMs: number;
  /** One "<operation>: <message>" entry per failed operation. */
  errors: string[];
}

/** Maximum number of error strings included in the persisted audit_log summary. */
const MAX_PERSISTED_ERRORS = 10;

// In-memory trace store keyed by traceId (per pipeline run). Entries are removed
// only by endTrace(); a trace that is never ended stays here for the life of the process.
const activeTraces = new Map<string, PipelineTrace>();

/**
 * Start a new pipeline trace. Call this at the beginning of every discovery run.
 *
 * @param runId - The discovery_runs.id this trace belongs to.
 * @returns A new traceId of the form `trace_<8 hex chars>_<epoch ms>`.
 */
export function startTrace(runId: string): string {
  const traceId = `trace_${randomUUID().slice(0, 8)}_${Date.now()}`;
  activeTraces.set(traceId, {
    traceId,
    runId,
    startedAt: Date.now(),
    operationCount: 0,
    totalTokens: 0,
    totalLatencyMs: 0,
    errors: [],
  });
  logger.info({ traceId, runId }, "🔍 Pipeline trace started");
  return traceId;
}

/**
 * Record an operation within a trace.
 *
 * Recording is best-effort: calls for an unknown or already-ended traceId are ignored.
 *
 * @param traceId - Trace returned by startTrace().
 * @param operation - Short operation name, used as the prefix of error entries.
 * @param tokens - Tokens consumed by the operation (added to totalTokens).
 * @param latencyMs - Operation latency (added to totalLatencyMs).
 * @param success - Whether the operation succeeded.
 * @param error - Optional failure message; only used when `success` is false.
 */
export function recordOperation(
  traceId: string,
  operation: string,
  tokens: number,
  latencyMs: number,
  success: boolean,
  error?: string
): void {
  const trace = activeTraces.get(traceId);
  if (!trace) return;

  trace.operationCount++;
  trace.totalTokens += tokens;
  trace.totalLatencyMs += latencyMs;

  if (!success) {
    // Record every failure — even one reported without a message — so that the
    // persisted error_count reflects the real number of failed operations.
    // `||` (not `??`) is deliberate: an empty message is treated as missing.
    trace.errors.push(`${operation}: ${error || "unknown error"}`);
  }
}

/**
 * End a trace and persist its summary to audit_log.
 *
 * The trace is removed from the active store before persisting. A repeated or
 * concurrent endTrace() for the same id therefore returns null instead of
 * writing a duplicate audit row. Persistence is best-effort: failures are
 * logged, never thrown.
 *
 * @param traceId - Trace returned by startTrace().
 * @returns The final trace, or null if the trace is unknown or was already ended.
 */
export async function endTrace(traceId: string): Promise<PipelineTrace | null> {
  const trace = activeTraces.get(traceId);
  if (!trace) return null;

  // Remove before the first await so no other caller can end/persist it again.
  activeTraces.delete(traceId);

  const duration = Date.now() - trace.startedAt;

  logger.info(
    {
      traceId,
      operations: trace.operationCount,
      tokens: trace.totalTokens,
      durationMs: duration,
      errors: trace.errors.length,
    },
    "✅ Pipeline trace completed"
  );

  // Persist to audit log
  try {
    const db = getSupabaseClient();
    // supabase-js reports query failures through the returned `error` field
    // rather than by throwing, so it has to be checked explicitly. The catch
    // below handles anything that does throw (e.g. from getSupabaseClient()).
    const { error } = await db.from("audit_log").insert({
      action: "pipeline_trace_completed",
      entity_type: "discovery_run",
      entity_id: trace.runId,
      details: {
        trace_id: traceId,
        duration_ms: duration,
        operations: trace.operationCount,
        total_tokens: trace.totalTokens,
        total_latency_ms: trace.totalLatencyMs,
        error_count: trace.errors.length,
        errors: trace.errors.slice(0, MAX_PERSISTED_ERRORS),
      },
    });
    if (error) {
      logger.warn({ err: error, traceId }, "Failed to persist trace — non-critical");
    }
  } catch (err) {
    logger.warn({ err, traceId }, "Failed to persist trace — non-critical");
  }

  return trace;
}

/**
 * Get an active trace (for passing to LLM calls etc.).
 *
 * Returns the live object held in the store, not a copy, or undefined when the
 * trace is unknown or has already been ended.
 */
export function getTrace(traceId: string): PipelineTrace | undefined {
  return activeTraces.get(traceId);
}