iDevBuddy
feat: Phase 1 — AI Client Acquisition System
bd28470
raw
history blame
4.32 kB
/**
* Pipeline Checkpoint System — Crash Recovery
*
* Problem: Pipeline crashes at company #15 → restarts →
* processes companies #1-14 again = wasted API calls + duplicates
*
* Solution: Save a checkpoint after each major stage.
* On restart, resume from the last checkpoint.
*
* Stages (in order):
* discovered → scraped → filtered → contacts_found → emails_verified → profiled → scored → completed
*/
import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";
/**
 * Progress markers recorded for a company as it moves through the pipeline.
 * Members are declared in processing order, first to last.
 */
export type PipelineStage =
  // found in search results
  | "discovered"
  // website scraped
  | "scraped"
  // passed Gate 1 + 2
  | "filtered"
  // decision makers identified
  | "contacts_found"
  // emails found and verified
  | "emails_verified"
  // LLM profile generated
  | "profiled"
  // score computed
  | "scored"
  // fully processed
  | "completed";
/**
 * Save the checkpoint for one company within a pipeline run.
 *
 * Upserts a row into `pipeline_checkpoints`, using (`run_id`, `company_domain`)
 * as the conflict target, so a later call for the same run and domain updates
 * the existing row. The `completed` column is true only when `stage` is
 * "completed", and `updated_at` is set to the current time.
 *
 * An error returned by the upsert is logged as a warning and not thrown:
 * losing a checkpoint only costs repeated work after a restart.
 *
 * @param runId Identifier of the pipeline run.
 * @param domain Company domain the checkpoint belongs to.
 * @param stage Stage the company has just reached.
 * @param stageData Intermediate data to store alongside the stage (defaults to `{}`).
 */
export async function saveCheckpoint(
  runId: string,
  domain: string,
  stage: PipelineStage,
  stageData: Record<string, unknown> = {}
): Promise<void> {
  const db = getSupabaseClient();
  const { error } = await db.from("pipeline_checkpoints").upsert(
    {
      run_id: runId,
      company_domain: domain,
      stage,
      stage_data: stageData,
      completed: stage === "completed",
      updated_at: new Date().toISOString(),
    },
    { onConflict: "run_id,company_domain" }
  );

  if (error) {
    logger.warn({ domain, stage, error: error.message }, "Checkpoint save failed — non-critical");
  }
}

/**
 * Look up the stored checkpoint for a domain within a run.
 *
 * @param runId Identifier of the pipeline run.
 * @param domain Company domain to look up.
 * @returns The stored stage and its data (`{}` when `stage_data` is null), or
 *   `null` when no row exists. A failed query is logged as a warning and also
 *   yields `null`, so the caller falls back to a fresh start.
 */
export async function getCheckpoint(
  runId: string,
  domain: string
): Promise<{ stage: PipelineStage; stageData: Record<string, unknown> } | null> {
  const db = getSupabaseClient();
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("stage, stage_data")
    .eq("run_id", runId)
    .eq("company_domain", domain)
    .maybeSingle();

  // Surface query failures: otherwise an outage looks exactly like "no checkpoint".
  if (error) {
    logger.warn(
      { runId, domain, error: error.message },
      "Checkpoint lookup failed — treating as no checkpoint"
    );
    return null;
  }
  if (!data) return null;

  return { stage: data.stage as PipelineStage, stageData: data.stage_data ?? {} };
}

/**
 * Check whether a domain has a completed checkpoint in ANY run that was
 * updated within the last `withinDays` days.
 *
 * @param domain Company domain to check.
 * @param withinDays Size of the look-back window in days (defaults to 30).
 * @returns `true` when at least one matching completed row exists. A failed
 *   query is logged as a warning and reported as `false`.
 */
export async function isAlreadyProcessed(domain: string, withinDays = 30): Promise<boolean> {
  const db = getSupabaseClient();

  // Oldest `updated_at` value that still counts as recent.
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - withinDays);

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("id")
    .eq("company_domain", domain)
    .eq("completed", true)
    .gte("updated_at", cutoff.toISOString())
    .limit(1)
    .maybeSingle();

  // Surface query failures: otherwise an outage silently causes re-processing.
  if (error) {
    logger.warn(
      { domain, withinDays, error: error.message },
      "Processed check failed — treating domain as not processed"
    );
    return false;
  }

  return !!data;
}

/**
 * List every company in a run whose checkpoint is not marked completed
 * (used when resuming a run).
 *
 * @param runId Identifier of the pipeline run.
 * @returns One entry per incomplete company with its domain, last stage and
 *   stage data (`{}` when `stage_data` is null). A failed query is logged as
 *   a warning and yields an empty list.
 */
export async function getIncompleteCompanies(
  runId: string
): Promise<{ domain: string; stage: PipelineStage; stageData: Record<string, unknown> }[]> {
  const db = getSupabaseClient();
  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("company_domain, stage, stage_data")
    .eq("run_id", runId)
    .eq("completed", false);

  // Surface query failures: otherwise an outage looks like "nothing left to resume".
  if (error) {
    logger.warn(
      { runId, error: error.message },
      "Incomplete-companies query failed — returning empty list"
    );
    return [];
  }

  return (data ?? []).map((row) => ({
    domain: row.company_domain,
    stage: row.stage as PipelineStage,
    stageData: row.stage_data ?? {},
  }));
}
/**
 * Stage ordering, earliest first. A stage's position in this list is what
 * decides whether it counts as reached and which stage comes next.
 */
const STAGE_ORDER: readonly PipelineStage[] = [
  "discovered", "scraped", "filtered", "contacts_found",
  "emails_verified", "profiled", "scored", "completed",
];

/**
 * Tell whether `currentStage` is at or beyond `requiredStage` in the stage order.
 *
 * @param currentStage Stage the company is currently at.
 * @param requiredStage Stage that must already have been reached.
 * @returns `true` when `currentStage` is the same as or later than
 *   `requiredStage`; `false` otherwise, including when either value is not a
 *   known stage.
 */
export function isStageComplete(currentStage: PipelineStage, requiredStage: PipelineStage): boolean {
  const currentRank = STAGE_ORDER.indexOf(currentStage);
  const requiredRank = STAGE_ORDER.indexOf(requiredStage);

  // indexOf yields -1 for an unrecognised value; without this guard an unknown
  // required stage would compare as already satisfied by every known stage.
  if (currentRank < 0 || requiredRank < 0) return false;

  return currentRank >= requiredRank;
}

/**
 * Determine which stage processing should resume at for a company.
 *
 * @param checkpoint Last saved checkpoint, or `null` when none exists.
 * @returns "discovered" when there is no checkpoint or its stage is not
 *   recognised; otherwise the stage that follows the checkpointed one. A
 *   checkpoint already at "completed" has nothing after it and yields
 *   "completed", so finished work is not restarted.
 */
export function getResumePoint(checkpoint: { stage: PipelineStage } | null): PipelineStage {
  if (!checkpoint) return "discovered";

  const reachedIndex = STAGE_ORDER.indexOf(checkpoint.stage);
  // Unknown stage value: there is no safe position to continue from, so start over.
  if (reachedIndex < 0) return "discovered";

  // Resume from the NEXT stage after the last completed one; the final stage
  // has no successor, so it maps to itself.
  const nextStage = STAGE_ORDER[reachedIndex + 1];
  return nextStage ?? "completed";
}