/**
 * Trigger.dev Task Definitions — Phase 1 Pipeline
 *
 * The pipeline is split into separate tasks instead of one monolithic function:
 *
 *   Task 1: daily-lead-discovery → CRON 4 AM UTC → picks territory → Serper search
 *                                → triggers process-company once per new domain
 *   Task 2: process-company      → scrape + pain detect + gate 2 → triggers enrich-and-profile
 *   Task 3: enrich-and-profile   → enrich contacts → save company → Python AI profile + score
 *                                → save → Slack hot-lead alert (score 85+)
 *   Task 4: daily-digest         → CRON 6:30 AM UTC → collects today's results → Slack digest
 *   Task 5: manual-discovery     → triggered from Slack (currently a stub)
 *
 * Benefits:
 *   - A failing company task retries on its own; the other companies continue
 *   - At most 3 process-company runs execute at once (company-processing queue)
 *   - Each task has its own retry policy
 *   - The dashboard shows the exact failure point
 */
import { task, schedules, queue } from "@trigger.dev/sdk/v3";
import { getSupabaseClient } from "../../shared/supabase/client";
import { startTrace, recordOperation, endTrace } from "../../shared/observability/tracer";
import { saveCheckpoint, isAlreadyProcessed } from "../../shared/pipeline/checkpoint";
import { getNextTerritory, getDailyQuota, markTerritorySearched, isSystemPaused, buildTerritoryQueries } from "../lib/territory-manager";
import { scrapeCompanyWebsite } from "../lib/web-scraper";
import { detectPainSignals } from "../lib/pain-signal-detector";
import { enrichContacts } from "../lib/contact-enricher";
import { sendRunStarted, sendRunProgress, sendDailyDigest, sendHotLeadAlert } from "../../slack/slack-service";
import { logger } from "../../shared/utils/logger";
import { randomUUID } from "crypto";
import axios from "axios";
import { getEnv } from "../../shared/config/env";

// ─── Queue: max 3 companies processing simultaneously ────────
// Only process-company is bound to this queue; enrich-and-profile runs are not limited by it.
const companyQueue = queue({
  name: "company-processing",
  concurrencyLimit: 3,
});

// ═══════════════════════════════════════════════════════════════
// TASK 1: Daily Scheduler (CRON — runs every day at 4 AM UTC)
// ═══════════════════════════════════════════════════════════════
// Hosts that are social networks, directories or aggregators rather than company
// websites; Serper results on these hosts (or any of their subdomains) are dropped.
const SKIPPED_HOSTS: readonly string[] = [
  "facebook.com",
  "linkedin.com",
  "twitter.com",
  "instagram.com",
  "youtube.com",
  "yelp.com",
  "yellowpages.com",
  "bbb.org",
  "wikipedia.org",
  "reddit.com",
  "crunchbase.com",
  "glassdoor.com",
];

/**
 * True when `hostname` equals one of SKIPPED_HOSTS or is a subdomain of one
 * (e.g. "uk.linkedin.com"). Matching on label boundaries, rather than substrings,
 * avoids dropping unrelated domains such as "abbb.org".
 */
function isSkippedHost(hostname: string): boolean {
  return SKIPPED_HOSTS.some(skip => hostname === skip || hostname.endsWith(`.${skip}`));
}

/** One entry of the list returned by getNextTerritory. */
type Territory = Awaited<ReturnType<typeof getNextTerritory>>[number];

/**
 * Runs every query of every territory through the Serper search API.
 *
 * @returns unique organic-result hostnames, with any leading "www." removed, in
 *          first-seen order, excluding SKIPPED_HOSTS. A failed query is logged and
 *          skipped so one bad request does not abort the whole run.
 */
async function searchCompanyDomains(
  territories: readonly Territory[],
  serperApiKey: ReturnType<typeof getEnv>["SERPER_API_KEY"]
): Promise<string[]> {
  // A Set keeps insertion order and makes de-duplication O(1) per result.
  const domains = new Set<string>();
  for (const territory of territories) {
    for (const query of buildTerritoryQueries(territory, [])) {
      try {
        const response = await axios.post(
          "https://google.serper.dev/search",
          { q: query, num: 10 },
          {
            headers: { "X-API-KEY": serperApiKey, "Content-Type": "application/json" },
            timeout: 8_000,
          }
        );
        const organic = response.data?.organic ?? [];
        for (const result of organic) {
          let hostname: string;
          try {
            hostname = new URL(result.link).hostname.replace(/^www\./, "");
          } catch {
            continue; // missing or invalid URL
          }
          if (!isSkippedHost(hostname)) domains.add(hostname);
        }
      } catch (err) {
        logger.warn({ query, err }, "Serper search failed — continuing");
      }
    }
  }
  return Array.from(domains);
}

export const dailyScheduler = schedules.task({
  id: "daily-lead-discovery",
  // Cron configured in Trigger.dev dashboard: 0 4 * * * (4 AM UTC = 9 AM PKT)
  maxDuration: 300, // 5 minutes for setup
  /**
   * Picks today's territory, records a discovery run, and searches Serper for
   * candidate company domains. It then enqueues one process-company task per
   * domain that isApreadyProcessed(domain, 30) does not report as already handled.
   *
   * NOTE: trigger() resolves once a run is enqueued, not when it finishes. So
   * `succeeded` counts successfully *triggered* company tasks, and the run row is
   * marked "completed" while those tasks may still be running.
   */
  run: async () => {
    // Pre-flight
    if (await isSystemPaused()) {
      logger.info("⏸️ System paused — skipping today");
      return { status: "paused" };
    }

    const quota = await getDailyQuota();
    const territories = await getNextTerritory(quota);
    if (territories.length === 0) {
      logger.warn("No fresh territory — all cooling down");
      return { status: "no_territory" };
    }

    // Create run record
    const db = getSupabaseClient();
    const runId = randomUUID();
    const traceId = startTrace(runId);
    // NOTE(review): domains are collected for every returned territory, but only the
    // first one is recorded on the run and attached to the company tasks.
    const unit = territories[0];

    const { error: runInsertError } = await db.from("discovery_runs").insert({
      id: runId,
      run_type: "auto",
      territory_id: unit.territoryId,
      country_code: unit.countryCode,
      city: unit.city,
      industry: unit.industry,
      quota_target: quota,
      status: "running",
      triggered_by: "system",
    });
    if (runInsertError) {
      // Without the run row every later update/RPC keyed on runId would silently no-op.
      await endTrace(traceId);
      throw new Error(`Failed to create discovery run ${runId}: ${runInsertError.message}`);
    }

    // Slack: run started
    await sendRunStarted(`${unit.city}, ${unit.country}`, unit.industry, quota);

    // Search Google (via Serper) for companies
    const env = getEnv();
    const allDomains = await searchCompanyDomains(territories, env.SERPER_API_KEY);

    logger.info({ domains: allDomains.length, territory: unit.city }, "Domains found — triggering company tasks");

    // Trigger Task 2 for each domain (queued, max 3 concurrent)
    const triggeredDomains: string[] = [];
    const companyTasks: Promise<unknown>[] = [];
    for (const domain of allDomains) {
      // Skip already processed
      if (await isAlreadyProcessed(domain, 30)) continue;
      triggeredDomains.push(domain);
      companyTasks.push(
        processCompany.trigger({
          domain,
          runId,
          traceId,
          industry: unit.industry,
          city: unit.city,
          country: unit.country,
          countryCode: unit.countryCode,
          territoryId: unit.territoryId,
          quota,
          linkedInUrl: null,
        })
      );
    }

    // Wait for every trigger request to be accepted (not for the tasks to finish).
    const results = await Promise.allSettled(companyTasks);
    let succeeded = 0;
    results.forEach((result, i) => {
      if (result.status === "fulfilled") {
        succeeded++;
      } else {
        logger.warn({ domain: triggeredDomains[i], err: result.reason }, "Failed to trigger process-company");
      }
    });

    // Mark territory searched
    await markTerritorySearched(unit.territoryId, unit.industry, succeeded);

    // Update run
    const { error: runUpdateError } = await db.from("discovery_runs").update({
      status: "completed",
      companies_found: allDomains.length,
      completed_at: new Date().toISOString(),
      search_queries: buildTerritoryQueries(unit, []),
    }).eq("id", runId);
    if (runUpdateError) {
      // Not thrown: company tasks are already enqueued and a retry would enqueue them again.
      logger.warn({ runId, err: runUpdateError }, "Failed to mark discovery run completed");
    }

    await endTrace(traceId);

    return {
      status: "completed",
      domainsFound: allDomains.length,
      tasksTriggered: companyTasks.length,
      succeeded,
    };
  },
});

// ═══════════════════════════════════════════════════════════════
// TASK 2: Process Company (per company, queued)
// ═══════════════════════════════════════════════════════════════
export const processCompany = task({
  id: "process-company",
  queue: companyQueue,
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 5_000,
    maxTimeoutInMs: 30_000,
    factor: 2,
  },
  maxDuration: 120, // 2 minutes per company
  /**
   * Scrapes one company website, runs pain-signal detection and Gate 2, and hands
   * companies that pass off to enrich-and-profile. Every exit path writes a checkpoint.
   */
  run: async (payload: {
    domain: string;
    runId: string;
    traceId: string;
    industry: string;
    city: string;
    country: string;
    countryCode: string;
    territoryId: string;
    quota: number;
    linkedInUrl: string | null;
  }) => {
    const { domain, runId, traceId, industry, city, country } = payload;
    logger.info({ domain }, "Processing company");

    // ── Stage 1: Scrape website ────────────────────────────────
    const websiteData = await scrapeCompanyWebsite(domain);
    if (!websiteData?.text) {
      await saveCheckpoint(runId, domain, "completed", { reason: "no_website" });
      return { status: "skipped", reason: "no_website_data" };
    }
    await saveCheckpoint(runId, domain, "scraped");

    // ── Stage 2: Pain signal detection + Gate 2 ────────────────
    const painResult = await detectPainSignals(
      websiteData.name ?? domain,
      industry,
      websiteData.employeeCount ?? null,
      websiteData.text ?? "",
      websiteData.html ?? "",
      traceId
    );

    // Gate 2: minimum 2 pain signals OR service match
    if (painResult.painSignals.length < 2 && !painResult.serviceMatch) {
      await saveCheckpoint(runId, domain, "completed", { reason: "gate2_failed" });
      return { status: "skipped", reason: "gate2_failed" };
    }
    await saveCheckpoint(runId, domain, "filtered");

    // ── Trigger Task 3: Enrich contacts ────────────────────────
    // trigger() only enqueues the run; this task does not wait for its result.
    await enrichAndProfile.trigger({
      domain,
      runId,
      traceId,
      industry,
      city,
      country,
      companyName: websiteData.name ?? domain,
      employeeCount: websiteData.employeeCount ?? null,
      description: websiteData.description ?? "",
      websiteText: (websiteData.text ?? "").slice(0, 800),
      websiteHtml: (websiteData.html ?? "").slice(0, 5000),
      techStack: websiteData.techStack ?? [],
      aiJobCount: websiteData.aiJobCount ?? 0,
      linkedInUrl: websiteData.linkedinUrl ?? null,
      painSignals: painResult.painSignals.map(p => p.signal),
      serviceMatch: painResult.serviceMatch,
      matchConfidence: painResult.matchConfidence,
    });

    return { status: "passed_to_enrichment", domain };
  },
});

// ═══════════════════════════════════════════════════════════════
// TASK 3: Enrich Contacts + AI Profile + Score (combined)
// ═══════════════════════════════════════════════════════════════
export const enrichAndProfile = task({
  id: "enrich-and-profile",
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 3_000,
    maxTimeoutInMs: 20_000,
    factor: 2,
  },
  maxDuration: 180, // 3 minutes (email verification can be slow)
  /**
   * Enriches contacts for one company and requires at least one authority-confirmed
   * contact. It then saves the company, links the contacts, and fetches an AI profile
   * and score from the Python service, saving both. A Slack hot-lead alert is sent
   * for scores of 85 or more.
   *
   * Throws (so the task retries) when the company, profile or score cannot be saved.
   */
  run: async (payload: {
    domain: string;
    runId: string;
    traceId: string;
    industry: string;
    city: string;
    country: string;
    companyName: string;
    employeeCount: number | null;
    description: string;
    websiteText: string;
    websiteHtml: string;
    techStack: string[];
    aiJobCount: number;
    linkedInUrl: string | null;
    painSignals: string[];
    serviceMatch: string | null;
    matchConfidence: number;
  }) => {
    const db = getSupabaseClient();
    const env = getEnv();

    // ── Step 1: Enrich contacts ──────────────────────────────
    const contacts = await enrichContacts(
      "", // NOTE(review): presumably the company id, which is not known until Step 2
      payload.domain,
      payload.companyName,
      payload.employeeCount,
      payload.industry,
      payload.websiteText.slice(0, 300),
      payload.websiteHtml,
      payload.linkedInUrl,
      payload.traceId
    );

    if (contacts.length === 0) {
      await saveCheckpoint(payload.runId, payload.domain, "completed", { reason: "no_contacts" });
      return { status: "skipped", reason: "no_contacts" };
    }

    // Must have authority-confirmed contact
    const authorityContacts = contacts.filter(c => c.authorityConfirmed);
    if (authorityContacts.length === 0) {
      await saveCheckpoint(payload.runId, payload.domain, "completed", { reason: "no_authority" });
      return { status: "skipped", reason: "no_authority_contacts" };
    }
    await saveCheckpoint(payload.runId, payload.domain, "emails_verified");

    // ── Step 2: Save company ─────────────────────────────────
    // Reuse the stored id when this domain already exists. The upsert conflicts on
    // `domain`, so sending a fresh UUID for an existing domain would try to rewrite
    // that row's primary key and detach the contacts/profile/score linked to it.
    const { data: existingCompany, error: lookupError } = await db
      .from("companies")
      .select("id")
      .eq("domain", payload.domain)
      .maybeSingle();
    if (lookupError) {
      throw new Error(`Company lookup failed for ${payload.domain}: ${lookupError.message}`);
    }
    const companyId: string = existingCompany?.id ?? randomUUID();

    const { error: companyError } = await db.from("companies").upsert({
      id: companyId,
      domain: payload.domain,
      name: payload.companyName,
      industry: payload.industry,
      employee_count: payload.employeeCount,
      description: payload.description,
      website_status: "active",
      linkedin_url: payload.linkedInUrl,
      tech_stack: payload.techStack,
      country: payload.country,
      city: payload.city,
      service_match: payload.serviceMatch,
      service_match_score: Math.round(payload.matchConfidence * 100),
      pain_signals: payload.painSignals,
      trace_id: payload.traceId,
    }, { onConflict: "domain" });
    if (companyError) {
      throw new Error(`Failed to save company ${payload.domain}: ${companyError.message}`);
    }

    // Link every enriched contact to the saved company in a single request.
    const { error: linkError } = await db
      .from("contacts")
      .update({ company_id: companyId })
      .in("id", contacts.map(c => c.id));
    if (linkError) {
      logger.warn({ domain: payload.domain, err: linkError }, "Failed to link contacts to company");
    }

    // ── Step 3: AI Profile + Score (Python service) ──────────
    const profileResponse = await axios.post(
      `${env.PYTHON_AI_SERVICE_URL}/profile`,
      {
        company: {
          id: companyId,
          name: payload.companyName,
          industry: payload.industry,
          employee_count: payload.employeeCount,
          description: payload.description,
          website_text: payload.websiteText,
          linkedin_description: "",
          tech_stack: payload.techStack,
          ai_job_count: payload.aiJobCount,
          pain_signals: payload.painSignals,
          service_match: payload.serviceMatch,
        },
        contacts: contacts.map(c => ({
          full_name: c.fullName,
          email: c.email,
          email_verified: c.emailVerification?.status === "verified_deliverable",
          linkedin_personal_url: c.linkedinPersonalUrl,
          social_profiles: c.socialProfiles ?? {},
        })),
        trace_id: payload.traceId,
      },
      {
        headers: { Authorization: `Bearer ${env.PYTHON_AI_SERVICE_SECRET}` },
        timeout: 45_000,
      }
    );

    const profile = profileResponse.data?.profile;
    const score = profileResponse.data?.score;
    const totalScore = score?.total_score ?? 0;
    const tier = score?.tier ?? "archive";

    // Save profile and score. Failures throw so the lead is not checkpointed as
    // "completed" (or alerted on) without being stored.
    const { error: profileError } = await db.from("lead_profiles").upsert({
      company_id: companyId,
      ...profile,
    }, { onConflict: "company_id" });
    if (profileError) {
      throw new Error(`Failed to save lead profile for ${payload.domain}: ${profileError.message}`);
    }

    const { error: scoreError } = await db.from("lead_scores").upsert({
      company_id: companyId,
      ...score,
    }, { onConflict: "company_id" });
    if (scoreError) {
      throw new Error(`Failed to save lead score for ${payload.domain}: ${scoreError.message}`);
    }

    // Update run stats
    if (totalScore >= 70) {
      const { error: rpcError } = await db.rpc("increment_run_leads", { run_id: payload.runId });
      if (rpcError) {
        logger.warn({ runId: payload.runId, err: rpcError }, "Failed to increment run lead count");
      }
    }

    await saveCheckpoint(payload.runId, payload.domain, "completed");

    // ── Step 4: Hot lead alert (85+) ─────────────────────────
    if (totalScore >= 85) {
      const best = authorityContacts[0];
      await sendHotLeadAlert({
        companyName: payload.companyName,
        domain: payload.domain,
        industry: payload.industry,
        employeeCount: payload.employeeCount,
        city: payload.city,
        score: totalScore,
        tier,
        contactName: best.fullName,
        contactTitle: best.title ?? "",
        email: best.email,
        emailVerified: best.emailVerification?.status === "verified_deliverable",
        linkedinPersonal: best.linkedinPersonalUrl,
        linkedinCompany: payload.linkedInUrl,
        serviceMatch: payload.serviceMatch,
        outreachAngle: profile?.outreach_angle ?? "",
        painPoints: (profile?.pain_points ?? []).slice(0, 3),
        socialProfiles: best.socialProfiles ?? {},
      });
    }

    recordOperation(payload.traceId, "enrich_and_profile", score?.tokens_used ?? 0, 0, true);

    return {
      status: "completed",
      domain: payload.domain,
      score: totalScore,
      tier,
      contactsFound: contacts.length,
      authorityConfirmed: authorityContacts.length,
      hasLinkedIn: contacts.some(c => c.linkedinPersonalUrl),
    };
  },
});

// ═══════════════════════════════════════════════════════════════
// TASK 4: Daily Digest (CRON — runs at 6:30 AM UTC = 11:30 AM PKT)
// ═══════════════════════════════════════════════════════════════

/** 00:00:00.000 UTC at the start of the current UTC day. */
function startOfTodayUtc(): Date {
  const midnight = new Date();
  midnight.setUTCHours(0, 0, 0, 0);
  return midnight;
}

export const dailyDigestTask = schedules.task({
  id: "daily-digest",
  // Cron configured in Trigger.dev dashboard: 30 6 * * * (6:30 AM UTC)
  maxDuration: 60,
  /**
   * Posts today's (UTC) summary to Slack: the most recent discovery run, lead counts
   * per tier, and LLM token usage. Returns without sending when no run exists today.
   */
  run: async () => {
    const db = getSupabaseClient();
    // Explicit UTC midnight: setHours() would depend on the worker's local time zone.
    const since = startOfTodayUtc().toISOString();

    // Most recent run today — without an explicit ORDER BY, row order is unspecified.
    const { data: runs } = await db
      .from("discovery_runs")
      .select("*")
      .gte("ran_at", since)
      .order("ran_at", { ascending: false })
      .limit(1);

    const latestRun = runs?.[0];
    if (!latestRun) {
      logger.info("No runs today — skipping digest");
      return;
    }

    // Today's lead scores and token usage are independent queries; fetch them together.
    const [{ data: scores }, { data: traces }] = await Promise.all([
      db.from("lead_scores").select("total_score, tier").gte("created_at", since),
      db.from("llm_traces").select("total_tokens").gte("created_at", since),
    ]);

    const hotLeads = scores?.filter(s => s.tier === "hot").length ?? 0;
    const warmLeads = scores?.filter(s => s.tier === "warm").length ?? 0;
    const nurtureLeads = scores?.filter(s => s.tier === "nurture").length ?? 0;
    const totalTokens = traces?.reduce((sum, t) => sum + (t.total_tokens ?? 0), 0) ?? 0;

    await sendDailyDigest({
      territory: `${latestRun.city}, ${latestRun.country_code}`,
      industry: latestRun.industry,
      companiesSearched: latestRun.companies_found ?? 0,
      leadsQualified: (scores?.length ?? 0),
      hotLeads,
      warmLeads,
      nurtureLeads,
      tokensUsed: totalTokens,
      durationMinutes: latestRun.completed_at
        ? Math.round((new Date(latestRun.completed_at).getTime() - new Date(latestRun.ran_at).getTime()) / 60_000)
        : 0,
    });

    return { sent: true, leads: scores?.length ?? 0 };
  },
});

// ═══════════════════════════════════════════════════════════════
// TASK 5: Manual Discovery (triggered from Slack)
// ═══════════════════════════════════════════════════════════════
export const manualDiscoveryTask = task({
  id: "manual-discovery",
  maxDuration: 300,
  /**
   * Placeholder for Slack-triggered discovery: it currently only logs the request
   * and echoes the payload back. No territory search or company tasks are started.
   */
  run: async (payload: {
    region: string;
    industry: string;
    maxCompanies: number;
    triggeredBy: string;
  }) => {
    // Reuse the daily scheduler logic but with custom territory
    logger.info({ payload }, "Manual discovery triggered from Slack");

    // TODO: Build custom territory from region param
    // For now, trigger the same pipeline
    return { status: "manual_run_started", ...payload };
  },
});