iDevBuddy
feat: Phase 1 — AI Client Acquisition System
bd28470
/**
* Pipeline Observability — Trace ID system
*
* Every pipeline run gets a unique trace_id.
* Every operation within that run carries the trace_id.
* This enables: debugging, cost tracking, latency analysis.
*
* Think of it like a receipt number — every action is linked.
*/
import { randomUUID } from "crypto";
import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";
/**
 * Running summary of a single pipeline run, held in the in-memory trace store
 * from `startTrace` until `endTrace` removes it.
 */
export interface PipelineTrace {
/** Unique id for this trace; also the key under which it is stored. */
traceId: string;
/** Id of the discovery run this trace belongs to. */
runId: string; // discovery_runs.id
/** Timestamp taken when the trace was started (milliseconds since the epoch). */
startedAt: number; // Date.now()
/** Number of operations recorded so far via `recordOperation`. */
operationCount: number;
/** Sum of the `tokens` values of all recorded operations. */
totalTokens: number;
/** Sum of the `latencyMs` values of all recorded operations. */
totalLatencyMs: number;
/** One entry per recorded failure, formatted as "<operation>: <error>". */
errors: string[];
}
// Traces that have been started and not yet ended, keyed by trace id.
// Held in process memory only (one entry per pipeline run).
const activeTraces: Map<string, PipelineTrace> = new Map();
/**
 * Open a trace for a discovery run and register it in the in-memory store.
 *
 * The generated id has the form `trace_<first 8 chars of a random UUID>_<epoch ms>`.
 * The new trace starts with all counters at zero and an empty error list.
 *
 * @param runId - Id of the discovery run being traced.
 * @returns The id of the newly created trace.
 */
export function startTrace(runId: string): string {
  const uuidPrefix = randomUUID().substring(0, 8);
  const traceId = `trace_${uuidPrefix}_${Date.now()}`;

  const freshTrace: PipelineTrace = {
    traceId,
    runId,
    startedAt: Date.now(),
    operationCount: 0,
    totalTokens: 0,
    totalLatencyMs: 0,
    errors: [],
  };
  activeTraces.set(traceId, freshTrace);

  logger.info({ traceId, runId }, "πŸ” Pipeline trace started");
  return traceId;
}
/**
 * Record one operation against an active trace.
 *
 * Increments the trace's operation count and adds `tokens` and `latencyMs` to
 * its running totals. Every failed operation appends an entry of the form
 * `"<operation>: <error>"` to the trace's error list.
 *
 * Calls for a `traceId` that is not in the active store (never started, or
 * already ended) are ignored.
 *
 * @param traceId - Id of the trace to update, as returned by `startTrace`.
 * @param operation - Label for the operation; used as the prefix of error entries.
 * @param tokens - Token count to add to the trace total.
 * @param latencyMs - Latency in milliseconds to add to the trace total.
 * @param success - Whether the operation succeeded.
 * @param error - Optional failure message.
 */
export function recordOperation(
  traceId: string,
  operation: string,
  tokens: number,
  latencyMs: number,
  success: boolean,
  error?: string
): void {
  const trace = activeTraces.get(traceId);
  if (!trace) return;

  trace.operationCount++;
  trace.totalTokens += tokens;
  trace.totalLatencyMs += latencyMs;

  if (!success) {
    // A failure reported without a message must still be counted, otherwise the
    // error count derived from this list undercounts. `||` (not `??`) is
    // deliberate so an empty string also falls back to the placeholder.
    trace.errors.push(`${operation}: ${error || "unknown error"}`);
  }
}
/**
 * Close a trace: log its summary, persist the summary to `audit_log`, and
 * remove it from the in-memory store.
 *
 * Persistence is best-effort. A failed insert is logged as a warning and never
 * rejects the returned promise. The trace is removed from the store whether or
 * not the insert succeeded.
 *
 * @param traceId - Id of the trace to close, as returned by `startTrace`.
 * @returns The final trace, or `null` when no active trace has this id.
 */
export async function endTrace(traceId: string): Promise<PipelineTrace | null> {
  const trace = activeTraces.get(traceId);
  if (!trace) return null;

  const duration = Date.now() - trace.startedAt;
  logger.info({
    traceId,
    operations: trace.operationCount,
    tokens: trace.totalTokens,
    durationMs: duration,
    errors: trace.errors.length,
  }, "βœ… Pipeline trace completed");

  // Persist to audit log
  try {
    const db = getSupabaseClient();
    // The Supabase query builder reports database failures through the
    // resolved `error` field rather than by throwing, so it must be checked
    // explicitly; the catch below only sees client/network-level exceptions.
    const { error } = await db.from("audit_log").insert({
      action: "pipeline_trace_completed",
      entity_type: "discovery_run",
      entity_id: trace.runId,
      details: {
        trace_id: traceId,
        duration_ms: duration,
        operations: trace.operationCount,
        total_tokens: trace.totalTokens,
        total_latency_ms: trace.totalLatencyMs,
        error_count: trace.errors.length,
        errors: trace.errors.slice(0, 10), // cap at 10
      },
    });
    if (error) {
      logger.warn({ err: error, traceId }, "Failed to persist trace β€” non-critical");
    }
  } catch (err) {
    logger.warn({ err, traceId }, "Failed to persist trace β€” non-critical");
  }

  activeTraces.delete(traceId);
  return trace;
}
/**
 * Look up a trace that is still active (e.g. to hand it to LLM calls).
 *
 * @param traceId - Id of the trace, as returned by `startTrace`.
 * @returns The stored trace object itself (not a copy), or `undefined` when no
 *          active trace has this id.
 */
export function getTrace(traceId: string): PipelineTrace | undefined {
  const current = activeTraces.get(traceId);
  return current;
}