/**
 * Pipeline Checkpoint System — Crash Recovery
 *
 * Problem: Pipeline crashes at company #15 → restarts →
 * processes company #1-14 again = wasted API calls + duplicates
 *
 * Solution: Save checkpoint after each major stage.
 * On restart, resume from last checkpoint.
 *
 * Stages (in order):
 *   discovered → scraped → filtered → contacts_found → emails_verified →
 *   profiled → scored → completed
 */

import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";

export type PipelineStage =
  | "discovered" // found in search results
  | "scraped" // website scraped
  | "filtered" // passed Gate 1 + 2
  | "contacts_found" // decision makers identified
  | "emails_verified" // emails found and verified
  | "profiled" // LLM profile generated
  | "scored" // score computed
  | "completed"; // fully processed

/**
 * Save checkpoint for a company in a specific run.
 * Stores intermediate data so pipeline can resume from this point.
 *
 * The row is upserted on (run_id, company_domain), so each company has at most
 * one checkpoint per run and a later call overwrites the earlier stage/data.
 * A failed write is logged as a warning and otherwise ignored: checkpointing is
 * best-effort and must not abort the pipeline.
 *
 * @param runId - Identifier of the pipeline run.
 * @param domain - Company domain the checkpoint belongs to.
 * @param stage - Last stage that finished for this company.
 * @param stageData - Intermediate data to persist alongside the stage.
 */
export async function saveCheckpoint(
  runId: string,
  domain: string,
  stage: PipelineStage,
  stageData: Record<string, unknown> = {}
): Promise<void> {
  const db = getSupabaseClient();

  const { error } = await db.from("pipeline_checkpoints").upsert(
    {
      run_id: runId,
      company_domain: domain,
      stage,
      stage_data: stageData,
      completed: stage === "completed",
      updated_at: new Date().toISOString(),
    },
    { onConflict: "run_id,company_domain" }
  );

  if (error) {
    logger.warn({ domain, stage, error: error.message }, "Checkpoint save failed — non-critical");
  }
}

/**
 * Get the last checkpoint for a domain in a run.
 *
 * @param runId - Identifier of the pipeline run.
 * @param domain - Company domain to look up.
 * @returns The stored stage and its data, or null if no checkpoint exists
 *   (fresh start). A failed query is logged and also yields null.
 */
export async function getCheckpoint(
  runId: string,
  domain: string
): Promise<{ stage: PipelineStage; stageData: Record<string, unknown> } | null> {
  const db = getSupabaseClient();

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("stage, stage_data")
    .eq("run_id", runId)
    .eq("company_domain", domain)
    .maybeSingle();

  if (error) {
    // Surface the failure: otherwise it is indistinguishable from "no checkpoint"
    // and the company silently restarts from the first stage.
    logger.warn(
      { runId, domain, error: error.message },
      "Checkpoint lookup failed — treating as no checkpoint"
    );
  }

  if (!data) return null;

  return {
    stage: data.stage as PipelineStage,
    stageData: data.stage_data ?? {},
  };
}

/**
 * Check if a domain was already fully processed in ANY recent run.
 * Prevents re-processing across separate runs (not just within one run).
 *
 * @param domain - Company domain to check.
 * @param withinDays - Size of the look-back window in days (default 30).
 * @returns true if a completed checkpoint for the domain was updated within the
 *   window; false otherwise, including when the query fails (logged).
 */
export async function isAlreadyProcessed(domain: string, withinDays = 30): Promise<boolean> {
  const db = getSupabaseClient();

  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - withinDays);

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("id")
    .eq("company_domain", domain)
    .eq("completed", true)
    .gte("updated_at", cutoff.toISOString())
    .limit(1)
    .maybeSingle();

  if (error) {
    logger.warn(
      { domain, withinDays, error: error.message },
      "Processed-check query failed — treating domain as not processed"
    );
  }

  return !!data;
}

/**
 * Get all incomplete companies in a run (for resume).
 *
 * @param runId - Identifier of the pipeline run.
 * @returns One entry per checkpoint in the run whose `completed` flag is false,
 *   with its domain, last stage and stored data. Empty when there are none or
 *   when the query fails (logged).
 */
export async function getIncompleteCompanies(
  runId: string
): Promise<{ domain: string; stage: PipelineStage; stageData: Record<string, unknown> }[]> {
  const db = getSupabaseClient();

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("company_domain, stage, stage_data")
    .eq("run_id", runId)
    .eq("completed", false);

  if (error) {
    logger.warn(
      { runId, error: error.message },
      "Incomplete-companies query failed — returning empty list"
    );
  }

  return (data ?? []).map((d) => ({
    domain: d.company_domain,
    stage: d.stage as PipelineStage,
    stageData: d.stage_data ?? {},
  }));
}

/**
 * Stage ordering — used to determine if we can skip ahead.
 * Must list every member of PipelineStage, earliest first.
 */
const STAGE_ORDER: readonly PipelineStage[] = [
  "discovered",
  "scraped",
  "filtered",
  "contacts_found",
  "emails_verified",
  "profiled",
  "scored",
  "completed",
];

/**
 * Tell whether `currentStage` is at or beyond `requiredStage` in STAGE_ORDER.
 *
 * A `currentStage` value that is not in STAGE_ORDER (e.g. an unexpected string
 * read from storage) has index -1 and therefore compares as earlier than every
 * known stage.
 */
export function isStageComplete(currentStage: PipelineStage, requiredStage: PipelineStage): boolean {
  return STAGE_ORDER.indexOf(currentStage) >= STAGE_ORDER.indexOf(requiredStage);
}

/**
 * Helper to determine where to resume processing for a company.
 *
 * @param checkpoint - Last saved checkpoint, or null when none exists.
 * @returns "discovered" when there is no checkpoint or its stage is not
 *   recognized; "completed" when the checkpoint is already at the final stage
 *   (nothing left to run); otherwise the stage immediately after the
 *   checkpointed one.
 */
export function getResumePoint(checkpoint: { stage: PipelineStage } | null): PipelineStage {
  if (!checkpoint) return "discovered";

  const idx = STAGE_ORDER.indexOf(checkpoint.stage);
  if (idx < 0) return "discovered";

  // Resume from the NEXT stage after the last completed one. The final stage has
  // no successor: report "completed" so a finished company is not restarted.
  return STAGE_ORDER[idx + 1] ?? "completed";
}