Spaces:
Running
Running
File size: 4,317 Bytes
/**
 * Pipeline Checkpoint System — Crash Recovery
 *
 * Problem: if the pipeline crashes at company #15 and is restarted, it
 * processes companies #1–14 again, wasting API calls and creating duplicates.
 *
 * Solution: save a checkpoint after each major stage and, on restart,
 * resume each company from its last checkpoint.
 *
 * Stages (in order):
 *   discovered → scraped → filtered → contacts_found → emails_verified →
 *   profiled → scored → completed
 */
import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";
/**
 * Lifecycle stages a company moves through during a pipeline run,
 * listed here in processing order.
 */
export type PipelineStage =
  | "discovered"      // surfaced by search results
  | "scraped"         // company website has been scraped
  | "filtered"        // passed both Gate 1 and Gate 2
  | "contacts_found"  // decision makers have been identified
  | "emails_verified" // emails have been found and verified
  | "profiled"        // LLM profile has been generated
  | "scored"          // score has been computed
  | "completed";      // fully processed, nothing left to do
/**
 * Record that a company has reached `stage` within a run, together with any
 * intermediate data needed to resume from that point.
 *
 * The row is upserted with conflicts resolved on (run_id, company_domain), so
 * a later call for the same company in the same run overwrites both the stage
 * and the stored stage data. The `completed` column is set only when `stage`
 * is "completed". A failed upsert is logged as a warning rather than thrown:
 * checkpointing is treated as best-effort.
 *
 * @param runId     - Identifier of the pipeline run.
 * @param domain    - Company domain the checkpoint belongs to.
 * @param stage     - Stage the company has just reached.
 * @param stageData - Intermediate data to store alongside the stage (defaults to `{}`).
 */
export async function saveCheckpoint(
  runId: string,
  domain: string,
  stage: PipelineStage,
  stageData: Record<string, unknown> = {}
): Promise<void> {
  const client = getSupabaseClient();
  const isFinalStage = stage === "completed";

  const { error: upsertError } = await client.from("pipeline_checkpoints").upsert(
    {
      run_id: runId,
      company_domain: domain,
      stage,
      stage_data: stageData,
      completed: isFinalStage,
      updated_at: new Date().toISOString(),
    },
    { onConflict: "run_id,company_domain" }
  );

  if (!upsertError) {
    return;
  }

  logger.warn(
    { domain, stage, error: upsertError.message },
    "Checkpoint save failed β non-critical"
  );
}
/**
 * Get the last checkpoint for a domain in a run.
 *
 * @param runId  - Identifier of the pipeline run.
 * @param domain - Company domain the checkpoint is keyed on.
 * @returns The stored stage and stage data, or `null` when no checkpoint row
 *          exists (fresh start). If the lookup itself fails, the error is
 *          logged and `null` is returned as well, so the caller restarts the
 *          company instead of crashing.
 */
export async function getCheckpoint(
  runId: string,
  domain: string
): Promise<{ stage: PipelineStage; stageData: Record<string, unknown> } | null> {
  const db = getSupabaseClient();
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("stage, stage_data")
    .eq("run_id", runId)
    .eq("company_domain", domain)
    .maybeSingle();
  if (error) {
    // A DB failure would otherwise be indistinguishable from "no checkpoint"
    // and silently trigger a full (duplicate) reprocess of this company.
    logger.warn(
      { runId, domain, error: error.message },
      "Checkpoint lookup failed; treating as fresh start"
    );
  }
  if (!data) return null;
  return { stage: data.stage as PipelineStage, stageData: data.stage_data ?? {} };
}
/**
 * Check whether a domain was already fully processed in ANY run whose
 * checkpoint was updated within the last `withinDays` calendar days.
 * Prevents re-processing across separate runs (not just within one run).
 *
 * @param domain     - Company domain to look up.
 * @param withinDays - Look-back window in days (default 30).
 * @returns `true` if a completed checkpoint exists inside the window. On a
 *          query failure the error is logged and `false` is returned, so
 *          the caller keeps going rather than crashing.
 */
export async function isAlreadyProcessed(domain: string, withinDays = 30): Promise<boolean> {
  const db = getSupabaseClient();
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - withinDays);
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("id")
    .eq("company_domain", domain)
    .eq("completed", true)
    .gte("updated_at", cutoff.toISOString())
    .limit(1)
    .maybeSingle();
  if (error) {
    // Surface the failure: a silent `false` here causes cross-run duplicates.
    logger.warn(
      { domain, withinDays, error: error.message },
      "Processed-check failed; assuming not processed"
    );
  }
  return !!data;
}
/**
 * Get all companies in a run whose checkpoint is not yet completed (for resume).
 *
 * @param runId - Identifier of the pipeline run.
 * @returns Each incomplete company's domain, last stage and stored stage data.
 *          Missing stage data is normalised to `{}`. On a query failure the
 *          error is logged and an empty list is returned.
 */
export async function getIncompleteCompanies(
  runId: string
): Promise<{ domain: string; stage: PipelineStage; stageData: Record<string, unknown> }[]> {
  const db = getSupabaseClient();
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("company_domain, stage, stage_data")
    .eq("run_id", runId)
    .eq("completed", false);
  if (error) {
    // Without this, a failed query looks exactly like "nothing to resume".
    logger.warn(
      { runId, error: error.message },
      "Failed to load incomplete checkpoints; nothing will be resumed"
    );
  }
  return (data ?? []).map((d) => ({
    domain: d.company_domain,
    stage: d.stage as PipelineStage,
    stageData: d.stage_data ?? {},
  }));
}
/**
 * Stage ordering — used to determine if we can skip ahead.
 * Must list every PipelineStage exactly once, in processing order.
 */
const STAGE_ORDER: readonly PipelineStage[] = [
  "discovered",
  "scraped",
  "filtered",
  "contacts_found",
  "emails_verified",
  "profiled",
  "scored",
  "completed",
];

/**
 * Whether a company at `currentStage` has already reached `requiredStage`.
 *
 * @param currentStage  - Stage recorded in the company's checkpoint.
 * @param requiredStage - Stage the caller needs to have been reached.
 * @returns `true` when `currentStage` is at or after `requiredStage` in
 *          STAGE_ORDER; `false` otherwise, including when either value is not
 *          a known stage (stage values loaded from the database are cast to
 *          PipelineStage, not validated, so an unknown value must not count
 *          as progress).
 */
export function isStageComplete(currentStage: PipelineStage, requiredStage: PipelineStage): boolean {
  const current = STAGE_ORDER.indexOf(currentStage);
  const required = STAGE_ORDER.indexOf(requiredStage);
  // indexOf yields -1 for unknown stages; without this guard an unknown
  // requiredStage would compare as "already complete".
  if (current < 0 || required < 0) return false;
  return current >= required;
}

/**
 * Determine the stage at which processing should resume for a company.
 *
 * @param checkpoint - The company's last checkpoint, or `null` if none exists.
 * @returns
 *  - `"discovered"` when there is no checkpoint or its stage is unrecognised
 *    (start from the beginning);
 *  - `"completed"` when the checkpoint is already `"completed"` (nothing left
 *    to redo — restarting from `"discovered"` would repeat every API call);
 *  - otherwise the stage immediately after the checkpointed one.
 */
export function getResumePoint(checkpoint: { stage: PipelineStage } | null): PipelineStage {
  if (!checkpoint) return "discovered";
  const idx = STAGE_ORDER.indexOf(checkpoint.stage);
  if (idx < 0) return "discovered";
  // Past the last stage there is no successor: the company is done.
  return STAGE_ORDER[idx + 1] ?? "completed";
}
|