Spaces:
Running
Running
| /** | |
| * Pipeline Observability — Trace ID system | |
| * | |
| * Every pipeline run gets a unique trace_id. | |
| * Every operation within that run carries the trace_id. | |
| * This enables: debugging, cost tracking, latency analysis. | |
| * | |
| * Think of it like a receipt number — every action is linked. | |
| */ | |
| import { randomUUID } from "crypto"; | |
| import { getSupabaseClient } from "../supabase/client"; | |
| import { logger } from "../utils/logger"; | |
/**
 * Running totals kept for one pipeline run while its trace is open.
 */
export interface PipelineTrace {
  /** Identifier of this trace. */
  traceId: string;
  /** discovery_runs.id */
  runId: string;
  /** Date.now() captured when the trace was started. */
  startedAt: number;
  /** Number of operations recorded so far. */
  operationCount: number;
  /** Sum of the token counts reported by recorded operations. */
  totalTokens: number;
  /** Sum of the latencies (in ms) reported by recorded operations. */
  totalLatencyMs: number;
  /** One message per recorded failure. */
  errors: string[];
}
// Traces that are currently open, keyed by trace id. Held only in this
// module's memory, so entries do not survive a process restart.
const activeTraces: Map<string, PipelineTrace> = new Map();
/**
 * Open a new trace for a discovery run and register it in the in-memory store.
 *
 * @param runId - Id of the run the trace belongs to.
 * @returns The generated trace id, shaped as
 *   `trace_<first 8 chars of a random UUID>_<millisecond timestamp>`.
 */
export function startTrace(runId: string): string {
  const shortId = randomUUID().slice(0, 8);
  const traceId = `trace_${shortId}_${Date.now()}`;

  // All counters start at zero; they are filled in as operations are recorded.
  const freshTrace: PipelineTrace = {
    traceId,
    runId,
    startedAt: Date.now(),
    operationCount: 0,
    totalTokens: 0,
    totalLatencyMs: 0,
    errors: [],
  };
  activeTraces.set(traceId, freshTrace);

  logger.info({ traceId, runId }, "π Pipeline trace started");
  return traceId;
}
/**
 * Record one operation against an open trace.
 *
 * Does nothing when `traceId` is not in the in-memory store (for example,
 * the trace was never started or has already been ended).
 *
 * @param traceId - Id returned by `startTrace`.
 * @param operation - Short label for the operation; used as the prefix of
 *   any error message stored on the trace.
 * @param tokens - Tokens consumed by the operation.
 * @param latencyMs - Time the operation took, in milliseconds.
 * @param success - Whether the operation succeeded.
 * @param error - Optional failure description. When the operation failed and
 *   this is missing or empty, the failure is still recorded as
 *   "unknown error" so the trace's error count stays accurate.
 */
export function recordOperation(
  traceId: string,
  operation: string,
  tokens: number,
  latencyMs: number,
  success: boolean,
  error?: string
): void {
  const trace = activeTraces.get(traceId);
  if (!trace) return;

  trace.operationCount++;

  // A single NaN/Infinity would permanently poison the running totals, so
  // non-finite measurements are skipped rather than accumulated.
  if (Number.isFinite(tokens)) trace.totalTokens += tokens;
  if (Number.isFinite(latencyMs)) trace.totalLatencyMs += latencyMs;

  if (!success) {
    // `||` (not `??`) on purpose: an empty message is as uninformative as a
    // missing one, and the failure must be counted either way.
    trace.errors.push(`${operation}: ${error || "unknown error"}`);
  }
}
/**
 * Close a trace: log its summary, write the summary to the `audit_log`
 * table on a best-effort basis, and remove it from the in-memory store.
 *
 * Persistence problems are logged as warnings and never propagate to the
 * caller.
 *
 * @param traceId - Id returned by `startTrace`.
 * @returns The final trace, or `null` when no trace with that id is open.
 */
export async function endTrace(traceId: string): Promise<PipelineTrace | null> {
  const trace = activeTraces.get(traceId);
  if (!trace) return null;

  const duration = Date.now() - trace.startedAt;

  logger.info({
    traceId,
    operations: trace.operationCount,
    tokens: trace.totalTokens,
    durationMs: duration,
    errors: trace.errors.length,
  }, "β Pipeline trace completed");

  // Persist to audit log
  try {
    const db = getSupabaseClient();
    // NOTE(review): assumes getSupabaseClient() returns a supabase-js client,
    // whose query builders resolve with `{ error }` instead of rejecting —
    // confirm. Without checking `error`, a failed insert goes unnoticed.
    const { error } = await db.from("audit_log").insert({
      action: "pipeline_trace_completed",
      entity_type: "discovery_run",
      entity_id: trace.runId,
      details: {
        trace_id: traceId,
        duration_ms: duration,
        operations: trace.operationCount,
        total_tokens: trace.totalTokens,
        total_latency_ms: trace.totalLatencyMs,
        error_count: trace.errors.length,
        errors: trace.errors.slice(0, 10), // cap at 10
      },
    });
    if (error) {
      logger.warn({ err: error }, "Failed to persist trace β non-critical");
    }
  } catch (err) {
    // Covers thrown failures (e.g. client construction or network errors).
    logger.warn({ err }, "Failed to persist trace β non-critical");
  } finally {
    // Always drop the trace from the store, whatever happened above.
    activeTraces.delete(traceId);
  }

  return trace;
}
/**
 * Look up a trace that is still open (for passing to LLM calls etc.).
 *
 * @param traceId - Id returned by `startTrace`.
 * @returns The live trace object, or `undefined` when no trace with that id
 *   is in the in-memory store.
 */
export function getTrace(traceId: string): PipelineTrace | undefined {
  const openTrace = activeTraces.get(traceId);
  return openTrace;
}