iDevBuddy
feat: Phase 1 — AI Client Acquisition System
bd28470
/**
 * Trigger.dev Task Definitions — Phase 1 Pipeline
 *
 * 5 tasks (a 3-stage chained pipeline plus digest and manual entry point) instead of 1 monolithic function:
 *
 * Task 1: daily-lead-discovery → CRON 4 AM UTC → picks territory → triggers process-company
 * Task 2: process-company → scrape + pain detect + gate 2 → triggers enrich-and-profile
 * Task 3: enrich-and-profile → emails + classify + verify + LinkedIn + social → Python AI profile + score → save → hot-lead alert if needed
 * Task 4: daily-digest → CRON 6:30 AM UTC → collects today's results → Slack digest
 * Task 5: manual-discovery → triggered from Slack (stub; custom territories not implemented yet)
 *
 * Benefits:
 * - Company #3 fails → only #3 retries, rest continue
 * - 3 companies process in parallel (concurrency limit)
 * - Each task has its own retry policy
 * - Dashboard shows exact failure point
 */
import { task, schedules, queue } from "@trigger.dev/sdk/v3";
import { getSupabaseClient } from "../../shared/supabase/client";
import { startTrace, recordOperation, endTrace } from "../../shared/observability/tracer";
import { saveCheckpoint, isAlreadyProcessed } from "../../shared/pipeline/checkpoint";
import { getNextTerritory, getDailyQuota, markTerritorySearched, isSystemPaused, buildTerritoryQueries } from "../lib/territory-manager";
import { scrapeCompanyWebsite } from "../lib/web-scraper";
import { detectPainSignals } from "../lib/pain-signal-detector";
import { enrichContacts } from "../lib/contact-enricher";
import { sendRunStarted, sendRunProgress, sendDailyDigest, sendHotLeadAlert } from "../../slack/slack-service";
import { logger } from "../../shared/utils/logger";
import { randomUUID } from "crypto";
import axios from "axios";
import { getEnv } from "../../shared/config/env";
// ─── Queue: max 3 companies processing simultaneously ────────
/**
 * Queue shared by the `process-company` task.
 * Its concurrency limit caps how many company runs execute at the same time.
 */
const companyQueue = queue({ concurrencyLimit: 3, name: "company-processing" });
// ═══════════════════════════════════════════════════════════════
// TASK 1: Daily Scheduler (CRON — runs every day at 4 AM UTC)
// ═══════════════════════════════════════════════════════════════
/** One searchable unit as returned by `getNextTerritory`. */
type TerritoryUnit = Awaited<ReturnType<typeof getNextTerritory>>[number];

/** Hosts that are social networks, directories, or review sites rather than prospect companies. */
const NON_PROSPECT_HOSTS: readonly string[] = [
  "facebook.com", "linkedin.com", "twitter.com", "instagram.com",
  "youtube.com", "yelp.com", "yellowpages.com", "bbb.org", "wikipedia.org",
  "reddit.com", "crunchbase.com", "glassdoor.com",
];

/**
 * Returns true when `hostname` is a non-prospect host or one of its subdomains.
 * Matching is done on whole domain labels, so `abbb.org` is not mistaken for `bbb.org`.
 */
function isNonProspectHost(hostname: string): boolean {
  return NON_PROSPECT_HOSTS.some(
    (blocked) => hostname === blocked || hostname.endsWith(`.${blocked}`),
  );
}

/**
 * Scheduled entry point of the pipeline.
 *
 * Steps:
 *  1. Abort early when the system is paused or no territory is available.
 *  2. Insert a `discovery_runs` row and announce the run on Slack.
 *  3. Run every territory query against Serper and collect unique company domains,
 *     remembering which territory produced each domain.
 *  4. Enqueue one `process-company` run per domain that was not processed recently.
 *  5. Mark the territories as searched and close the run record and trace.
 *
 * Returns `{ status }` for the early exits, otherwise
 * `{ status, domainsFound, tasksTriggered, succeeded }`, where `succeeded` is the
 * number of company runs that were enqueued successfully.
 */
export const dailyScheduler = schedules.task({
  id: "daily-lead-discovery",
  // Cron configured in Trigger.dev dashboard: 0 4 * * * (4 AM UTC = 9 AM PKT)
  maxDuration: 300, // 5 minutes for setup
  run: async () => {
    // Pre-flight
    if (await isSystemPaused()) {
      logger.info("⏸️ System paused β€” skipping today");
      return { status: "paused" };
    }

    const quota = await getDailyQuota();
    const territories = await getNextTerritory(quota);
    if (territories.length === 0) {
      logger.warn("No fresh territory β€” all cooling down");
      return { status: "no_territory" };
    }

    // Create run record. The run row describes the first unit only.
    const db = getSupabaseClient();
    const runId = randomUUID();
    const traceId = startTrace(runId);
    const unit = territories[0];
    const { error: insertError } = await db.from("discovery_runs").insert({
      id: runId,
      run_type: "auto",
      territory_id: unit.territoryId,
      country_code: unit.countryCode,
      city: unit.city,
      industry: unit.industry,
      quota_target: quota,
      status: "running",
      triggered_by: "system",
    });
    if (insertError) {
      // Record keeping is best-effort: surface the failure but keep discovering.
      logger.warn({ runId, err: insertError }, "Failed to insert discovery run record");
    }

    // Slack: run started
    await sendRunStarted(`${unit.city}, ${unit.country}`, unit.industry, quota);

    // Search Google (via Serper) for companies
    const env = getEnv();
    const searchPlan = territories.map((territory) => ({
      territory,
      queries: buildTerritoryQueries(territory, []),
    }));

    // domain -> territory whose query found it first (a Map also gives O(1) dedupe)
    const domainOrigin = new Map<string, TerritoryUnit>();
    for (const { territory, queries } of searchPlan) {
      for (const query of queries) {
        try {
          const response = await axios.post(
            "https://google.serper.dev/search",
            { q: query, num: 10 },
            {
              headers: { "X-API-KEY": env.SERPER_API_KEY, "Content-Type": "application/json" },
              timeout: 8_000,
            }
          );
          const organic = response.data?.organic ?? [];
          for (const result of organic) {
            let hostname: string;
            try {
              hostname = new URL(result.link).hostname.replace(/^www\./, "");
            } catch {
              continue; // result without a parseable URL
            }
            if (!isNonProspectHost(hostname) && !domainOrigin.has(hostname)) {
              domainOrigin.set(hostname, territory);
            }
          }
        } catch (err) {
          // One failed query must not abort the whole run.
          logger.warn({ query, err }, "Serper search failed β€” continuing");
        }
      }
    }
    logger.info({ domains: domainOrigin.size, territory: unit.city }, "Domains found β€” triggering company tasks");

    // Enqueues one process-company run and resolves to the territory it belongs to.
    const enqueueCompany = async (domain: string, territory: TerritoryUnit): Promise<TerritoryUnit> => {
      await processCompany.trigger({
        domain,
        runId,
        traceId,
        industry: territory.industry,
        city: territory.city,
        country: territory.country,
        countryCode: territory.countryCode,
        territoryId: territory.territoryId,
        quota,
        linkedInUrl: null,
      });
      return territory;
    };

    // Trigger Task 2 for each domain (queued, max 3 concurrent)
    const companyTasks: Promise<TerritoryUnit>[] = [];
    for (const [domain, territory] of domainOrigin) {
      // Skip already processed
      if (await isAlreadyProcessed(domain, 30)) continue;
      companyTasks.push(enqueueCompany(domain, territory));
    }

    // trigger() resolves once a run is enqueued, so this waits for the enqueue
    // calls only; the company runs themselves finish later on the queue.
    const outcomes = await Promise.allSettled(companyTasks);
    const enqueuedPerTerritory = new Map<TerritoryUnit, number>();
    let succeeded = 0;
    for (const outcome of outcomes) {
      if (outcome.status === "fulfilled") {
        succeeded += 1;
        enqueuedPerTerritory.set(outcome.value, (enqueuedPerTerritory.get(outcome.value) ?? 0) + 1);
      } else {
        logger.warn({ err: outcome.reason }, "Failed to trigger process-company");
      }
    }

    // Mark every searched territory with the number of companies enqueued for it
    for (const territory of territories) {
      await markTerritorySearched(
        territory.territoryId,
        territory.industry,
        enqueuedPerTerritory.get(territory) ?? 0,
      );
    }

    // Update run. "completed" here means discovery finished and tasks were enqueued.
    const { error: updateError } = await db.from("discovery_runs").update({
      status: "completed",
      companies_found: domainOrigin.size,
      completed_at: new Date().toISOString(),
      search_queries: searchPlan.flatMap((entry) => entry.queries),
    }).eq("id", runId);
    if (updateError) {
      logger.warn({ runId, err: updateError }, "Failed to update discovery run record");
    }

    await endTrace(traceId);
    return {
      status: "completed",
      domainsFound: domainOrigin.size,
      tasksTriggered: companyTasks.length,
      succeeded,
    };
  },
});
// ═══════════════════════════════════════════════════════════════
// TASK 2: Process Company (per company, queued)
// ═══════════════════════════════════════════════════════════════
/**
 * Per-company stage: scrape the website, detect pain signals, apply Gate 2, and
 * hand qualifying companies to `enrich-and-profile`.
 *
 * Runs on `companyQueue`. Checkpoints are saved after each stage, and a company
 * that is dropped is checkpointed as "completed" with the reason.
 *
 * Returns `{ status: "skipped", reason }` when the company is dropped, otherwise
 * `{ status: "passed_to_enrichment", domain }`.
 */
export const processCompany = task({
  id: "process-company",
  queue: companyQueue,
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 5_000,
    maxTimeoutInMs: 30_000,
    factor: 2,
  },
  maxDuration: 120, // 2 minutes per company
  run: async (payload: {
    domain: string;
    runId: string;
    traceId: string;
    industry: string;
    city: string;
    country: string;
    countryCode: string;
    territoryId: string;
    quota: number;
    linkedInUrl: string | null;
  }) => {
    const { domain, runId, traceId, industry, city, country } = payload;
    logger.info({ domain }, "Processing company");

    // Stage 1: Scrape website. Without page text there is nothing to analyse.
    const websiteData = await scrapeCompanyWebsite(domain);
    if (!websiteData?.text) {
      await saveCheckpoint(runId, domain, "completed", { reason: "no_website" });
      return { status: "skipped", reason: "no_website_data" };
    }
    await saveCheckpoint(runId, domain, "scraped");

    const companyName = websiteData.name ?? domain;
    const employeeCount = websiteData.employeeCount ?? null;
    const pageText = websiteData.text;
    const pageHtml = websiteData.html ?? "";

    // Stage 2: Pain signal detection + Gate 2
    const painResult = await detectPainSignals(
      companyName,
      industry,
      employeeCount,
      pageText,
      pageHtml,
      traceId
    );

    // Gate 2: minimum 2 pain signals OR service match
    if (painResult.painSignals.length < 2 && !painResult.serviceMatch) {
      await saveCheckpoint(runId, domain, "completed", { reason: "gate2_failed" });
      return { status: "skipped", reason: "gate2_failed" };
    }
    await saveCheckpoint(runId, domain, "filtered");

    // Trigger Task 3: enrich contacts + profile. Text and HTML are truncated
    // before being placed in the payload.
    await enrichAndProfile.trigger({
      domain,
      runId,
      traceId,
      industry,
      city,
      country,
      companyName,
      employeeCount,
      description: websiteData.description ?? "",
      websiteText: pageText.slice(0, 800),
      websiteHtml: pageHtml.slice(0, 5000),
      techStack: websiteData.techStack ?? [],
      aiJobCount: websiteData.aiJobCount ?? 0,
      // Prefer the URL found on the site; fall back to one supplied by the caller.
      linkedInUrl: websiteData.linkedinUrl ?? payload.linkedInUrl ?? null,
      painSignals: painResult.painSignals.map(p => p.signal),
      serviceMatch: painResult.serviceMatch,
      matchConfidence: painResult.matchConfidence,
    });
    return { status: "passed_to_enrichment", domain };
  },
});
// ═══════════════════════════════════════════════════════════════
// TASK 3: Enrich Contacts + AI Profile + Score (combined)
// ═══════════════════════════════════════════════════════════════
/** Throws when a Supabase call reported an error, so the task's retry policy can take over. */
function throwIfDbError(error: { message: string } | null, action: string): void {
  if (error) {
    throw new Error(`${action} failed: ${error.message}`);
  }
}

/**
 * Final pipeline stage for one company: enrich contacts, persist the company,
 * call the Python AI service for a profile and score, persist both, and send a
 * Slack alert for hot leads (score >= 85).
 *
 * Companies without contacts, or without an authority-confirmed contact, are
 * checkpointed as "completed" with a reason and skipped.
 *
 * Throws when a required database write or the AI service call fails, which
 * lets the retry policy re-run the task. A failed hot-lead alert is only logged.
 *
 * Returns `{ status: "skipped", reason }` or a summary with the score, tier and
 * contact counts.
 */
export const enrichAndProfile = task({
  id: "enrich-and-profile",
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 3_000,
    maxTimeoutInMs: 20_000,
    factor: 2,
  },
  maxDuration: 180, // 3 minutes (email verification can be slow)
  run: async (payload: {
    domain: string;
    runId: string;
    traceId: string;
    industry: string;
    city: string;
    country: string;
    companyName: string;
    employeeCount: number | null;
    description: string;
    websiteText: string;
    websiteHtml: string;
    techStack: string[];
    aiJobCount: number;
    linkedInUrl: string | null;
    painSignals: string[];
    serviceMatch: string | null;
    matchConfidence: number;
  }) => {
    const db = getSupabaseClient();
    const env = getEnv();

    // Step 1: Enrich contacts
    // NOTE(review): the first argument is passed as "" — presumably a company id
    // that is not known yet at this point; confirm against enrichContacts.
    const contacts = await enrichContacts(
      "",
      payload.domain,
      payload.companyName,
      payload.employeeCount,
      payload.industry,
      payload.websiteText.slice(0, 300),
      payload.websiteHtml,
      payload.linkedInUrl,
      payload.traceId
    );
    if (contacts.length === 0) {
      await saveCheckpoint(payload.runId, payload.domain, "completed", { reason: "no_contacts" });
      return { status: "skipped", reason: "no_contacts" };
    }

    // Must have authority-confirmed contact
    const authorityContacts = contacts.filter(c => c.authorityConfirmed);
    if (authorityContacts.length === 0) {
      await saveCheckpoint(payload.runId, payload.domain, "completed", { reason: "no_authority" });
      return { status: "skipped", reason: "no_authority_contacts" };
    }
    await saveCheckpoint(payload.runId, payload.domain, "emails_verified");

    // Step 2: Save company
    // Reuse the id of an existing row for this domain. Upserting a fresh UUID
    // with onConflict "domain" would try to overwrite the existing primary key
    // (also on every retry).
    const { data: existingCompany, error: lookupError } = await db
      .from("companies")
      .select("id")
      .eq("domain", payload.domain)
      .maybeSingle();
    throwIfDbError(lookupError, `Company lookup for ${payload.domain}`);
    const companyId: string = existingCompany?.id ?? randomUUID();

    const { error: companyError } = await db.from("companies").upsert({
      id: companyId,
      domain: payload.domain,
      name: payload.companyName,
      industry: payload.industry,
      employee_count: payload.employeeCount,
      description: payload.description,
      website_status: "active",
      linkedin_url: payload.linkedInUrl,
      tech_stack: payload.techStack,
      country: payload.country,
      city: payload.city,
      service_match: payload.serviceMatch,
      service_match_score: Math.round(payload.matchConfidence * 100),
      pain_signals: payload.painSignals,
      trace_id: payload.traceId,
    }, { onConflict: "domain" });
    throwIfDbError(companyError, `Company upsert for ${payload.domain}`);

    // Update contacts with company_id (single statement instead of one per contact)
    const { error: contactsError } = await db
      .from("contacts")
      .update({ company_id: companyId })
      .in("id", contacts.map(c => c.id));
    throwIfDbError(contactsError, `Contact link for ${payload.domain}`);

    // Step 3: AI Profile + Score (Python service)
    const profileResponse = await axios.post(
      `${env.PYTHON_AI_SERVICE_URL}/profile`,
      {
        company: {
          id: companyId,
          name: payload.companyName,
          industry: payload.industry,
          employee_count: payload.employeeCount,
          description: payload.description,
          website_text: payload.websiteText,
          linkedin_description: "",
          tech_stack: payload.techStack,
          ai_job_count: payload.aiJobCount,
          pain_signals: payload.painSignals,
          service_match: payload.serviceMatch,
        },
        contacts: contacts.map(c => ({
          full_name: c.fullName,
          email: c.email,
          email_verified: c.emailVerification?.status === "verified_deliverable",
          linkedin_personal_url: c.linkedinPersonalUrl,
          social_profiles: c.socialProfiles ?? {},
        })),
        trace_id: payload.traceId,
      },
      {
        headers: { Authorization: `Bearer ${env.PYTHON_AI_SERVICE_SECRET}` },
        timeout: 45_000,
      }
    );
    const profile = profileResponse.data?.profile;
    const score = profileResponse.data?.score;
    // A response without a score is stored as an archived lead with score 0.
    const totalScore = score?.total_score ?? 0;
    const tier = score?.tier ?? "archive";

    // Save profile and score
    const { error: profileError } = await db.from("lead_profiles").upsert({
      company_id: companyId,
      ...profile,
    }, { onConflict: "company_id" });
    throwIfDbError(profileError, `Lead profile upsert for ${payload.domain}`);

    const { error: scoreError } = await db.from("lead_scores").upsert({
      company_id: companyId,
      ...score,
    }, { onConflict: "company_id" });
    throwIfDbError(scoreError, `Lead score upsert for ${payload.domain}`);

    // Update run stats. The counter is informational, so a failure is only logged.
    if (totalScore >= 70) {
      const { error: counterError } = await db.rpc("increment_run_leads", { run_id: payload.runId });
      if (counterError) {
        logger.warn({ runId: payload.runId, err: counterError }, "Failed to increment run lead counter");
      }
    }
    await saveCheckpoint(payload.runId, payload.domain, "completed");

    // Step 4: Hot lead alert (85+)
    if (totalScore >= 85) {
      const best = authorityContacts[0];
      try {
        await sendHotLeadAlert({
          companyName: payload.companyName,
          domain: payload.domain,
          industry: payload.industry,
          employeeCount: payload.employeeCount,
          city: payload.city,
          score: totalScore,
          tier,
          contactName: best.fullName,
          contactTitle: best.title ?? "",
          email: best.email,
          emailVerified: best.emailVerification?.status === "verified_deliverable",
          linkedinPersonal: best.linkedinPersonalUrl,
          linkedinCompany: payload.linkedInUrl,
          serviceMatch: payload.serviceMatch,
          outreachAngle: profile?.outreach_angle ?? "",
          painPoints: (profile?.pain_points ?? []).slice(0, 3),
          socialProfiles: best.socialProfiles ?? {},
        });
      } catch (err) {
        // Everything is already saved and checkpointed; failing the task here
        // would re-run enrichment and the AI call just because Slack was down.
        logger.warn({ domain: payload.domain, err }, "Hot lead alert failed");
      }
    }

    recordOperation(payload.traceId, "enrich_and_profile", score?.tokens_used ?? 0, 0, true);
    return {
      status: "completed",
      domain: payload.domain,
      score: totalScore,
      tier,
      contactsFound: contacts.length,
      authorityConfirmed: authorityContacts.length,
      hasLinkedIn: contacts.some(c => c.linkedinPersonalUrl),
    };
  },
});
// ═══════════════════════════════════════════════════════════════
// TASK 4: Daily Digest (CRON — runs at 6:30 AM UTC = 11:30 AM PKT)
// ═══════════════════════════════════════════════════════════════
/**
 * Scheduled digest: summarises today's most recent discovery run, the lead
 * scores created today, and today's token usage, then posts the summary to Slack.
 *
 * "Today" starts at 00:00 UTC, matching the UTC cron schedule and the ISO
 * timestamps used in the queries.
 *
 * Returns `{ sent, leads }`; `sent` is false when no run was recorded today.
 */
export const dailyDigestTask = schedules.task({
  id: "daily-digest",
  // Cron configured in Trigger.dev dashboard: 30 6 * * * (6:30 AM UTC)
  maxDuration: 60,
  run: async () => {
    const db = getSupabaseClient();
    const startOfDay = new Date();
    startOfDay.setUTCHours(0, 0, 0, 0);
    const since = startOfDay.toISOString();

    // Get today's run stats. Newest first, so the first row really is the latest run.
    const { data: runs, error: runsError } = await db
      .from("discovery_runs")
      .select("*")
      .gte("ran_at", since)
      .order("ran_at", { ascending: false })
      .limit(1);
    if (runsError) {
      logger.warn({ err: runsError }, "Failed to load today's discovery runs");
    }
    const latestRun = runs?.[0];
    if (!latestRun) {
      logger.info("No runs today β€” skipping digest");
      return { sent: false, leads: 0 };
    }

    // Count today's leads by tier
    const { data: scores, error: scoresError } = await db
      .from("lead_scores")
      .select("total_score, tier")
      .gte("created_at", since);
    if (scoresError) {
      logger.warn({ err: scoresError }, "Failed to load today's lead scores");
    }
    const hotLeads = scores?.filter(s => s.tier === "hot").length ?? 0;
    const warmLeads = scores?.filter(s => s.tier === "warm").length ?? 0;
    const nurtureLeads = scores?.filter(s => s.tier === "nurture").length ?? 0;
    // NOTE(review): this counts every score row created today, whatever its
    // tier or score; confirm this is what "qualified" should mean.
    const leadCount = scores?.length ?? 0;

    // Get token usage
    const { data: traces, error: tracesError } = await db
      .from("llm_traces")
      .select("total_tokens")
      .gte("created_at", since);
    if (tracesError) {
      logger.warn({ err: tracesError }, "Failed to load today's token usage");
    }
    const totalTokens = traces?.reduce((sum, t) => sum + (t.total_tokens ?? 0), 0) ?? 0;

    // Duration is reported as 0 while the run has no completion timestamp.
    const durationMinutes = latestRun.completed_at
      ? Math.round((new Date(latestRun.completed_at).getTime() - new Date(latestRun.ran_at).getTime()) / 60_000)
      : 0;

    await sendDailyDigest({
      territory: `${latestRun.city}, ${latestRun.country_code}`,
      industry: latestRun.industry,
      companiesSearched: latestRun.companies_found ?? 0,
      leadsQualified: leadCount,
      hotLeads,
      warmLeads,
      nurtureLeads,
      tokensUsed: totalTokens,
      durationMinutes,
    });
    return { sent: true, leads: leadCount };
  },
});
// ═══════════════════════════════════════════════════════════════
// TASK 5: Manual Discovery (triggered from Slack)
// ═══════════════════════════════════════════════════════════════
/**
 * Manual discovery entry point (triggered from Slack).
 *
 * Currently a stub: it logs the request and echoes the payload back with a
 * `manual_run_started` status. No pipeline work is started yet.
 */
export const manualDiscoveryTask = task({
  id: "manual-discovery",
  maxDuration: 300,
  run: async (payload: {
    region: string;
    industry: string;
    maxCompanies: number;
    triggeredBy: string;
  }) => {
    // Intended to reuse the daily scheduler logic with a custom territory.
    logger.info({ payload }, "Manual discovery triggered from Slack");

    // TODO: Build custom territory from region param
    // For now, only acknowledge the request.
    const acknowledgement = { status: "manual_run_started", ...payload };
    return acknowledgement;
  },
});