File size: 4,317 Bytes
bd28470
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
 * Pipeline Checkpoint System — Crash Recovery
 *
 * Problem: Pipeline crashes at company #15 → restarts →
 *          processes companies #1-14 again = wasted API calls + duplicates
 *
 * Solution: Save checkpoint after each major stage.
 *           On restart, resume from last checkpoint.
 *
 * Stages (in order):
 *   discovered → scraped → filtered → contacts_found → emails_verified →
 *   profiled → scored → completed
 */

import { getSupabaseClient } from "../supabase/client";
import { logger } from "../utils/logger";

/**
 * Name of the processing stage a company has reached.
 * Each member is a string literal stored as-is in a checkpoint's `stage` field.
 */
export type PipelineStage =
  // Found in search results.
  | "discovered"
  // Website scraped.
  | "scraped"
  // Passed Gate 1 + 2.
  | "filtered"
  // Decision makers identified.
  | "contacts_found"
  // Emails found and verified.
  | "emails_verified"
  // LLM profile generated.
  | "profiled"
  // Score computed.
  | "scored"
  // Fully processed.
  | "completed";

/**
 * Save a checkpoint for one company within one pipeline run.
 *
 * The row is upserted with a conflict target of (run_id, company_domain), so
 * saving again for the same run and domain overwrites the previous stage and
 * stage data rather than adding a second row.
 *
 * Saving is best-effort: if the upsert reports an error it is logged at warn
 * level and the function still resolves normally.
 *
 * @param runId     Identifier of the pipeline run.
 * @param domain    Company domain the checkpoint belongs to.
 * @param stage     Stage the company has just reached.
 * @param stageData Intermediate data to store with the checkpoint (defaults to `{}`).
 */
export async function saveCheckpoint(
  runId: string,
  domain: string,
  stage: PipelineStage,
  stageData: Record<string, unknown> = {}
): Promise<void> {
  const db = getSupabaseClient();

  const { error } = await db.from("pipeline_checkpoints").upsert(
    {
      run_id: runId,
      company_domain: domain,
      stage,
      stage_data: stageData,
      // Denormalized flag so "finished" rows can be filtered without comparing stages.
      completed: stage === "completed",
      updated_at: new Date().toISOString(),
    },
    { onConflict: "run_id,company_domain" }
  );

  if (error) {
    // Deliberately not thrown: a lost checkpoint only costs rework on resume.
    logger.warn(
      { runId, domain, stage, error: error.message },
      "Checkpoint save failed — non-critical"
    );
  }
}

/**
 * Get the stored checkpoint for a domain in a run.
 *
 * @param runId  Identifier of the pipeline run.
 * @param domain Company domain to look up.
 * @returns The stored stage and stage data, or `null` when no row is returned
 *          (fresh start). A query error is logged at warn level and also
 *          results in `null`, so callers fall back to a fresh start.
 */
export async function getCheckpoint(
  runId: string,
  domain: string
): Promise<{ stage: PipelineStage; stageData: Record<string, unknown> } | null> {
  const db = getSupabaseClient();

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("stage, stage_data")
    .eq("run_id", runId)
    .eq("company_domain", domain)
    .maybeSingle();

  if (error) {
    // Surface the failure: otherwise a DB outage is indistinguishable from
    // "never processed" and silently causes a full reprocess.
    logger.warn(
      { runId, domain, error: error.message },
      "Checkpoint lookup failed; treating as no checkpoint"
    );
  }

  if (!data) return null;

  // The stored stage string is trusted here; it is not validated against the union.
  return { stage: data.stage as PipelineStage, stageData: data.stage_data ?? {} };
}

/**
 * Check whether a domain has a completed checkpoint in ANY run whose
 * `updated_at` falls within the last `withinDays` days.
 * Prevents re-processing across separate runs (not just within one run).
 *
 * @param domain     Company domain to check.
 * @param withinDays Size of the look-back window in days (default 30).
 * @returns `true` if at least one matching completed checkpoint exists.
 *          A query error is logged at warn level and yields `false`.
 */
export async function isAlreadyProcessed(domain: string, withinDays = 30): Promise<boolean> {
  const db = getSupabaseClient();

  // Oldest `updated_at` that still counts as "recent".
  const cutoff = new Date();
  cutoff.setDate(cutoff.getDate() - withinDays);

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("id")
    .eq("company_domain", domain)
    .eq("completed", true)
    .gte("updated_at", cutoff.toISOString())
    // Several runs may have completed this domain; one hit is enough.
    .limit(1)
    .maybeSingle();

  if (error) {
    // Surface the failure: a silent `false` here causes duplicate processing.
    logger.warn(
      { domain, withinDays, error: error.message },
      "Processed-domain lookup failed; treating domain as not processed"
    );
  }

  return !!data;
}

/**
 * Get all companies in a run whose checkpoint is not marked completed
 * (used to resume a run).
 *
 * @param runId Identifier of the pipeline run.
 * @returns One entry per incomplete checkpoint with its domain, last saved
 *          stage and stage data. Returns an empty array when there are none;
 *          a query error is logged at warn level and also yields an empty array.
 */
export async function getIncompleteCompanies(
  runId: string
): Promise<{ domain: string; stage: PipelineStage; stageData: Record<string, unknown> }[]> {
  const db = getSupabaseClient();

  const { data, error } = await db
    .from("pipeline_checkpoints")
    .select("company_domain, stage, stage_data")
    .eq("run_id", runId)
    .eq("completed", false);

  if (error) {
    // Surface the failure: an empty list would otherwise look like
    // "nothing left to resume".
    logger.warn(
      { runId, error: error.message },
      "Incomplete-company lookup failed; returning empty list"
    );
  }

  return (data ?? []).map((d) => ({
    domain: d.company_domain,
    // The stored stage string is trusted here; it is not validated against the union.
    stage: d.stage as PipelineStage,
    stageData: d.stage_data ?? {},
  }));
}

/**
 * Stage ordering, earliest first. A stage's index in this list is its rank;
 * it is used to compare stages and to pick the stage to resume at.
 */
const STAGE_ORDER: readonly PipelineStage[] = [
  "discovered", "scraped", "filtered", "contacts_found",
  "emails_verified", "profiled", "scored", "completed",
];

/**
 * Report whether `currentStage` is at or past `requiredStage` in STAGE_ORDER.
 *
 * @param currentStage  Stage the company has reached.
 * @param requiredStage Stage that must already have been reached.
 * @returns `true` when `currentStage` is the same as or later than
 *          `requiredStage`; `false` otherwise, including when either value is
 *          not a known stage (e.g. an unvalidated string read from storage).
 */
export function isStageComplete(currentStage: PipelineStage, requiredStage: PipelineStage): boolean {
  const currentIdx = STAGE_ORDER.indexOf(currentStage);
  const requiredIdx = STAGE_ORDER.indexOf(requiredStage);

  // indexOf yields -1 for unknown values; without this guard an unknown
  // requiredStage would make every comparison succeed.
  if (currentIdx < 0 || requiredIdx < 0) return false;

  return currentIdx >= requiredIdx;
}

/**
 * Determine the stage at which processing should resume for a company.
 *
 * @param checkpoint Last saved checkpoint, or `null` if none exists.
 * @returns `"discovered"` when there is no checkpoint or its stage is not a
 *          known stage; `"completed"` when the checkpoint is already at the
 *          final stage; otherwise the stage immediately after the saved one.
 */
export function getResumePoint(checkpoint: { stage: PipelineStage } | null): PipelineStage {
  if (!checkpoint) return "discovered";

  const idx = STAGE_ORDER.indexOf(checkpoint.stage);

  // Unknown stage value: nothing reliable to resume from, start over.
  if (idx < 0) return "discovered";

  // Resume from the NEXT stage after the last completed one. At the final
  // stage there is no next entry, so stay at "completed" instead of
  // restarting an already finished company from the beginning.
  return STAGE_ORDER[idx + 1] ?? "completed";
}