Spaces:
Running
Running
/**
 * Pipeline Checkpoint System — Crash Recovery
 *
 * Problem: the pipeline crashes at company #15 → restarts →
 * processes companies #1-14 again = wasted API calls + duplicates.
 *
 * Solution: save a checkpoint after each major stage.
 * On restart, resume from the last checkpoint.
 *
 * Stages (in order):
 * discovered → scraped → filtered → contacts_found → emails_verified → profiled → scored → completed
 */
| import { getSupabaseClient } from "../supabase/client"; | |
| import { logger } from "../utils/logger"; | |
/**
 * Name of a processing stage a company can be checkpointed at.
 * The members are listed in the same order as STAGE_ORDER.
 */
export type PipelineStage =
  | "discovered" // found in search results
  | "scraped" // website scraped
  | "filtered" // passed Gate 1 + 2
  | "contacts_found" // decision makers identified
  | "emails_verified" // emails found and verified
  | "profiled" // LLM profile generated
  | "scored" // score computed
  | "completed"; // fully processed
/**
 * Save checkpoint for a company in a specific run.
 * Stores intermediate data so the pipeline can resume from this point.
 *
 * The row is upserted with (run_id, company_domain) as the conflict target,
 * so saving again for the same run and domain is expected to replace the
 * previously stored stage and data.
 *
 * @param runId     Identifier of the pipeline run.
 * @param domain    Company domain the checkpoint belongs to.
 * @param stage     Stage the company has reached; `completed` is stored as
 *                  true only when this is "completed".
 * @param stageData Arbitrary intermediate data to persist (defaults to `{}`).
 * @returns Resolves once the upsert has been attempted. An error reported by
 *          the upsert is logged as a warning and not thrown.
 */
export async function saveCheckpoint(
  runId: string,
  domain: string,
  stage: PipelineStage,
  stageData: Record<string, unknown> = {}
): Promise<void> {
  const db = getSupabaseClient();
  const { error } = await db.from("pipeline_checkpoints").upsert(
    {
      run_id: runId,
      company_domain: domain,
      stage,
      stage_data: stageData,
      completed: stage === "completed",
      updated_at: new Date().toISOString(),
    },
    { onConflict: "run_id,company_domain" }
  );
  if (error) {
    // Best-effort: a failed save must not abort processing of the company.
    logger.warn(
      { runId, domain, stage, error: error.message },
      "Checkpoint save failed — non-critical"
    );
  }
}
/**
 * Get the last checkpoint for a domain in a run.
 *
 * @param runId  Identifier of the pipeline run.
 * @param domain Company domain to look up.
 * @returns The stored stage and stage data, or null if no checkpoint exists
 *          (fresh start). A failed lookup is logged as a warning and also
 *          yields null, so the caller starts fresh instead of crashing.
 */
export async function getCheckpoint(
  runId: string,
  domain: string
): Promise<{ stage: PipelineStage; stageData: Record<string, unknown> } | null> {
  const db = getSupabaseClient();
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("stage, stage_data")
    .eq("run_id", runId)
    .eq("company_domain", domain)
    .maybeSingle();
  if (error) {
    // Surface the failure: otherwise it is indistinguishable from "no checkpoint"
    // and the company is silently re-processed from the beginning.
    logger.warn(
      { runId, domain, error: error.message },
      "Checkpoint lookup failed — treating as no checkpoint"
    );
    return null;
  }
  if (!data) return null;
  // NOTE(review): the stage value comes from the database and is not validated here.
  return { stage: data.stage as PipelineStage, stageData: data.stage_data ?? {} };
}
/**
 * Check if a domain was already fully processed in ANY recent run.
 * Prevents re-processing across separate runs (not just within one run).
 *
 * A domain counts as processed when at least one checkpoint row for it has
 * `completed = true` and `updated_at` on or after the cutoff.
 *
 * @param domain     Company domain to check.
 * @param withinDays How many days back from now to look (default 30).
 * @returns true if such a row exists; false otherwise. A failed query is
 *          logged as a warning and reported as false.
 */
export async function isAlreadyProcessed(domain: string, withinDays = 30): Promise<boolean> {
  const db = getSupabaseClient();
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - withinDays);
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("id")
    .eq("company_domain", domain)
    .eq("completed", true)
    .gte("updated_at", cutoff.toISOString())
    .limit(1)
    .maybeSingle();
  if (error) {
    // Surface the failure: otherwise the domain is silently treated as unprocessed.
    logger.warn(
      { domain, withinDays, error: error.message },
      "Processed-domain check failed — treating as not processed"
    );
    return false;
  }
  return !!data;
}
/**
 * Get all incomplete companies in a run (for resume).
 *
 * @param runId Identifier of the pipeline run.
 * @returns One entry per checkpoint row of the run with `completed = false`:
 *          the domain, its last saved stage, and the stored stage data
 *          (`{}` when none). A failed query is logged as a warning and
 *          yields an empty list.
 */
export async function getIncompleteCompanies(
  runId: string
): Promise<{ domain: string; stage: PipelineStage; stageData: Record<string, unknown> }[]> {
  const db = getSupabaseClient();
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("company_domain, stage, stage_data")
    .eq("run_id", runId)
    .eq("completed", false);
  if (error) {
    // Surface the failure: otherwise it looks like the run has nothing left to resume.
    logger.warn(
      { runId, error: error.message },
      "Incomplete-company lookup failed — returning empty list"
    );
    return [];
  }
  return (data ?? []).map((d) => ({
    domain: d.company_domain,
    stage: d.stage as PipelineStage,
    stageData: d.stage_data ?? {},
  }));
}
/**
 * Stage ordering — used to determine if we can skip ahead.
 * A stage's index in this array defines its position in the pipeline.
 */
const STAGE_ORDER: readonly PipelineStage[] = [
  "discovered", "scraped", "filtered", "contacts_found",
  "emails_verified", "profiled", "scored", "completed",
];

/**
 * Tell whether `currentStage` is at or beyond `requiredStage` in STAGE_ORDER.
 *
 * @param currentStage  Stage a company has reached.
 * @param requiredStage Stage that must already be done.
 * @returns true when `currentStage` is the same as or later than
 *          `requiredStage`; false otherwise, including when either value is
 *          not a known stage (e.g. an unexpected value read from storage).
 */
export function isStageComplete(currentStage: PipelineStage, requiredStage: PipelineStage): boolean {
  const currentIdx = STAGE_ORDER.indexOf(currentStage);
  const requiredIdx = STAGE_ORDER.indexOf(requiredStage);
  // indexOf yields -1 for unknown values; comparing against -1 would wrongly
  // report every stage as complete, so unknown stages are never "complete".
  if (currentIdx < 0 || requiredIdx < 0) return false;
  return currentIdx >= requiredIdx;
}

/**
 * Helper to determine where to resume processing for a company.
 *
 * @param checkpoint Last saved checkpoint, or null when none exists.
 * @returns "discovered" when there is no checkpoint or its stage is unknown;
 *          "completed" when the checkpoint is already at the final stage;
 *          otherwise the stage that follows the checkpointed one.
 */
export function getResumePoint(checkpoint: { stage: PipelineStage } | null): PipelineStage {
  if (!checkpoint) return "discovered";
  const idx = STAGE_ORDER.indexOf(checkpoint.stage);
  // Unknown stage: nothing trustworthy to resume from, start over.
  if (idx < 0) return "discovered";
  // Already at the final stage: there is no next stage, and restarting from
  // "discovered" would re-process a finished company.
  if (idx >= STAGE_ORDER.length - 1) return "completed";
  // Resume from the NEXT stage after the last completed one.
  return STAGE_ORDER[idx + 1] ?? "completed";
}