Spaces:
Running
Running
/**
 * Trigger.dev Task Definitions — Phase 1 Pipeline
 *
 * Chained tasks instead of 1 monolithic function:
 *
 * Task 1: daily-lead-discovery — CRON 4 AM UTC → picks territory → triggers process-company
 * Task 2: process-company — scrape + pain detect + gate 2 → triggers enrich-and-profile
 * Task 3: enrich-and-profile — emails + classify + verify + LinkedIn + social, then
 *         AI profile + score (Python service) → save → Slack hot-lead alert if needed
 * Task 4: daily-digest — CRON 6:30 AM UTC → collects today's results → Slack digest
 * Task 5: manual-discovery — triggered from Slack (placeholder, not yet implemented)
 *
 * Benefits:
 * - Company #3 fails → only #3 retries, rest continue
 * - 3 companies process in parallel (concurrency limit)
 * - Each task has its own retry policy
 * - Dashboard shows exact failure point
 */
| import { task, schedules, queue } from "@trigger.dev/sdk/v3"; | |
| import { getSupabaseClient } from "../../shared/supabase/client"; | |
| import { startTrace, recordOperation, endTrace } from "../../shared/observability/tracer"; | |
| import { saveCheckpoint, isAlreadyProcessed } from "../../shared/pipeline/checkpoint"; | |
| import { getNextTerritory, getDailyQuota, markTerritorySearched, isSystemPaused, buildTerritoryQueries } from "../lib/territory-manager"; | |
| import { scrapeCompanyWebsite } from "../lib/web-scraper"; | |
| import { detectPainSignals } from "../lib/pain-signal-detector"; | |
| import { enrichContacts } from "../lib/contact-enricher"; | |
| import { sendRunStarted, sendRunProgress, sendDailyDigest, sendHotLeadAlert } from "../../slack/slack-service"; | |
| import { logger } from "../../shared/utils/logger"; | |
| import { randomUUID } from "crypto"; | |
| import axios from "axios"; | |
| import { getEnv } from "../../shared/config/env"; | |
// ─── Queue: at most 3 companies processed at the same time ─────
/** Upper bound on concurrently running runs in the "company-processing" queue. */
const MAX_CONCURRENT_COMPANIES = 3;

/**
 * Shared Trigger.dev queue used by the `process-company` task, so that only
 * a bounded number of company websites are handled in parallel.
 */
const companyQueue = queue({
  concurrencyLimit: MAX_CONCURRENT_COMPANIES,
  name: "company-processing",
});
// ───────────────────────────────────────────────────────────────
// TASK 1: Daily Scheduler (CRON — runs every day at 4 AM UTC)
// ───────────────────────────────────────────────────────────────

/**
 * Search-result hosts that are social networks, directories or aggregators
 * rather than company websites.
 */
const NON_COMPANY_HOSTS = [
  "facebook.com", "linkedin.com", "twitter.com", "instagram.com",
  "youtube.com", "yelp.com", "yellowpages.com", "bbb.org", "wikipedia.org",
  "reddit.com", "crunchbase.com", "glassdoor.com",
] as const;

/**
 * True when `hostname` equals one of NON_COMPANY_HOSTS or is a subdomain of
 * one (e.g. "m.facebook.com"). Exact/suffix matching avoids the false
 * positives of a substring test (e.g. "notfacebook.com").
 */
function isNonCompanyHost(hostname: string): boolean {
  return NON_COMPANY_HOSTS.some(
    (blocked) => hostname === blocked || hostname.endsWith(`.${blocked}`)
  );
}

/**
 * Daily discovery entry point.
 *
 * 1. Bails out if the system is paused or no territory is available.
 * 2. Creates a `discovery_runs` row (describing the first territory).
 * 3. Searches Serper for every query of every returned territory and
 *    collects unique company domains, remembering which territory found each.
 * 4. Queues one `process-company` run per domain not processed in the last
 *    30 days. `trigger()` only enqueues: processing continues asynchronously
 *    on the "company-processing" queue after this task returns.
 * 5. Marks each territory searched and finalizes the run row.
 *
 * If anything throws after the run row is created, the row is marked
 * "failed" and the error is rethrown; the trace is always ended.
 */
export const dailyScheduler = schedules.task({
  id: "daily-lead-discovery",
  // Cron configured in Trigger.dev dashboard: 0 4 * * * (4 AM UTC = 9 AM PKT)
  maxDuration: 300, // 5 minutes for setup
  run: async () => {
    // Pre-flight
    if (await isSystemPaused()) {
      logger.info("⏸️ System paused — skipping today");
      return { status: "paused" };
    }
    const quota = await getDailyQuota();
    const territories = await getNextTerritory(quota);
    if (territories.length === 0) {
      logger.warn("No fresh territory — all cooling down");
      return { status: "no_territory" };
    }
    type TerritoryUnit = (typeof territories)[number];

    const db = getSupabaseClient();
    const runId = randomUUID();
    const traceId = startTrace(runId);
    // The run record and the Slack notice describe the first (primary) territory.
    const unit = territories[0];

    try {
      const { error: insertError } = await db.from("discovery_runs").insert({
        id: runId,
        run_type: "auto",
        territory_id: unit.territoryId,
        country_code: unit.countryCode,
        city: unit.city,
        industry: unit.industry,
        quota_target: quota,
        status: "running",
        triggered_by: "system",
      });
      if (insertError) {
        throw new Error(`Failed to create discovery run ${runId}: ${insertError.message}`);
      }

      // Slack: run started. Best-effort: a Slack outage must not cancel discovery.
      try {
        await sendRunStarted(`${unit.city}, ${unit.country}`, unit.industry, quota);
      } catch (err) {
        logger.warn({ err }, "Slack run-started notice failed — continuing");
      }

      // Search Google (via Serper) for companies in every territory.
      const env = getEnv();
      const searchPlan = territories.map((territory) => ({
        territory,
        queries: buildTerritoryQueries(territory, []),
      }));
      // domain → territory whose query found it (insertion order = discovery order).
      const domainTerritory = new Map<string, TerritoryUnit>();
      for (const { territory, queries } of searchPlan) {
        for (const query of queries) {
          try {
            const response = await axios.post(
              "https://google.serper.dev/search",
              { q: query, num: 10 },
              {
                headers: { "X-API-KEY": env.SERPER_API_KEY, "Content-Type": "application/json" },
                timeout: 8_000,
              }
            );
            const organic = response.data?.organic ?? [];
            for (const result of organic) {
              let hostname: string;
              try {
                hostname = new URL(result.link).hostname.replace(/^www\./, "");
              } catch {
                continue; // invalid or missing URL in this search result
              }
              if (!isNonCompanyHost(hostname) && !domainTerritory.has(hostname)) {
                domainTerritory.set(hostname, territory);
              }
            }
          } catch (err) {
            logger.warn({ query, err }, "Serper search failed — continuing");
          }
        }
      }
      const domainsFound = domainTerritory.size;
      logger.info({ domains: domainsFound, territory: unit.city }, "Domains found — triggering company tasks");

      // Queue Task 2 for each domain (max 3 run concurrently on companyQueue).
      // A rejection handler is attached immediately: the loop keeps awaiting
      // isAlreadyProcessed(), and a rejected trigger left unhandled in the
      // meantime would surface as a Node "unhandledRejection".
      const dispatches: Promise<{ territory: TerritoryUnit; queued: boolean }>[] = [];
      for (const [domain, territory] of domainTerritory) {
        // Skip domains already processed within the last 30 days
        if (await isAlreadyProcessed(domain, 30)) continue;
        dispatches.push(
          processCompany
            .trigger({
              domain,
              runId,
              traceId,
              industry: territory.industry,
              city: territory.city,
              country: territory.country,
              countryCode: territory.countryCode,
              territoryId: territory.territoryId,
              quota,
              linkedInUrl: null,
            })
            .then(
              () => ({ territory, queued: true }),
              (err: unknown) => {
                logger.warn({ domain, err }, "Failed to queue process-company — skipping domain");
                return { territory, queued: false };
              }
            )
        );
      }
      const outcomes = await Promise.all(dispatches);
      const succeeded = outcomes.filter((o) => o.queued).length;

      // Mark every searched territory (not just the first) with its queued count.
      const queuedPerTerritory = new Map<TerritoryUnit, number>();
      for (const { territory, queued } of outcomes) {
        if (queued) {
          queuedPerTerritory.set(territory, (queuedPerTerritory.get(territory) ?? 0) + 1);
        }
      }
      for (const territory of territories) {
        await markTerritorySearched(
          territory.territoryId,
          territory.industry,
          queuedPerTerritory.get(territory) ?? 0
        );
      }

      // Finalize run. "completed" here means dispatch finished; the queued
      // process-company runs may still be in progress.
      const { error: updateError } = await db.from("discovery_runs").update({
        status: "completed",
        companies_found: domainsFound,
        completed_at: new Date().toISOString(),
        search_queries: searchPlan.flatMap((p) => p.queries),
      }).eq("id", runId);
      if (updateError) {
        logger.warn({ runId, err: updateError }, "Failed to finalize discovery run record");
      }

      return {
        status: "completed",
        domainsFound,
        tasksTriggered: dispatches.length,
        succeeded,
      };
    } catch (err) {
      // Don't leave the run stuck in "running" when setup fails part-way.
      // NOTE(review): assumes "failed" is an accepted discovery_runs.status value.
      await db.from("discovery_runs").update({
        status: "failed",
        completed_at: new Date().toISOString(),
      }).eq("id", runId);
      throw err;
    } finally {
      await endTrace(traceId);
    }
  },
});
// ───────────────────────────────────────────────────────────────
// TASK 2: Process Company (per company, queued)
// ───────────────────────────────────────────────────────────────
/**
 * Handles one company domain. It scrapes the website, detects pain signals
 * and applies Gate 2. If the company passes, it queues `enrich-and-profile`.
 *
 * Runs on the shared "company-processing" queue. A skip is recorded as a
 * "completed" checkpoint carrying a reason. A thrown error is retried per
 * `retry`.
 *
 * @returns `{ status: "skipped", reason }` when the company is filtered out.
 *   Otherwise it returns `{ status: "passed_to_enrichment", domain,
 *   enrichmentRunId }`, where `enrichmentRunId` is the id of the queued run.
 */
export const processCompany = task({
  id: "process-company",
  queue: companyQueue,
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 5_000,
    maxTimeoutInMs: 30_000,
    factor: 2,
  },
  maxDuration: 120, // 2 minutes per company
  run: async (payload: {
    domain: string;
    runId: string;
    traceId: string;
    industry: string;
    city: string;
    country: string;
    countryCode: string;
    territoryId: string;
    quota: number;
    linkedInUrl: string | null;
  }) => {
    const { domain, runId, traceId, industry, city, country } = payload;
    logger.info({ domain }, "Processing company");

    // ── Stage 1: Scrape website ────────────────────────────────
    const websiteData = await scrapeCompanyWebsite(domain);
    if (!websiteData?.text) {
      await saveCheckpoint(runId, domain, "completed", { reason: "no_website" });
      return { status: "skipped", reason: "no_website_data" };
    }
    await saveCheckpoint(runId, domain, "scraped");

    const companyName = websiteData.name ?? domain;
    const employeeCount = websiteData.employeeCount ?? null;
    const websiteText = websiteData.text ?? "";
    const websiteHtml = websiteData.html ?? "";

    // ── Stage 2: Pain signal detection + Gate 2 ────────────────
    const painResult = await detectPainSignals(
      companyName,
      industry,
      employeeCount,
      websiteText,
      websiteHtml,
      traceId
    );
    // Gate 2: minimum 2 pain signals OR service match
    if (painResult.painSignals.length < 2 && !painResult.serviceMatch) {
      await saveCheckpoint(runId, domain, "completed", { reason: "gate2_failed" });
      return { status: "skipped", reason: "gate2_failed" };
    }
    await saveCheckpoint(runId, domain, "filtered");

    // ── Queue Task 3: Enrich contacts + AI profile ─────────────
    // trigger() resolves once the run is queued; it does not wait for it.
    const enrichmentRun = await enrichAndProfile.trigger({
      domain,
      runId,
      traceId,
      industry,
      city,
      country,
      companyName,
      employeeCount,
      description: websiteData.description ?? "",
      // Truncated to keep the task payload small.
      websiteText: websiteText.slice(0, 800),
      websiteHtml: websiteHtml.slice(0, 5000),
      techStack: websiteData.techStack ?? [],
      aiJobCount: websiteData.aiJobCount ?? 0,
      // Prefer the LinkedIn URL found on the website; fall back to one
      // supplied by the caller (previously the payload value was ignored).
      linkedInUrl: websiteData.linkedinUrl ?? payload.linkedInUrl,
      painSignals: painResult.painSignals.map((p) => p.signal),
      serviceMatch: painResult.serviceMatch,
      matchConfidence: painResult.matchConfidence,
    });

    return { status: "passed_to_enrichment", domain, enrichmentRunId: enrichmentRun.id };
  },
});
// ───────────────────────────────────────────────────────────────
// TASK 3: Enrich Contacts + AI Profile + Score (combined)
// ───────────────────────────────────────────────────────────────
/**
 * Handles a company that passed Gate 2. Steps:
 * 1. Enrich contacts; skip unless at least one contact is authority-confirmed.
 * 2. Upsert the company (keyed on `domain`) and link all found contacts to it.
 * 3. Request an AI profile + score from the Python service and store both.
 * 4. Count the lead on the run (score ≥ 70) and send a Slack hot-lead alert
 *    (score ≥ 85).
 *
 * A skip is recorded as a "completed" checkpoint carrying a reason.
 * Database write failures throw, so Trigger.dev retries the run.
 * The Slack alert is best-effort.
 */
export const enrichAndProfile = task({
  id: "enrich-and-profile",
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 3_000,
    maxTimeoutInMs: 20_000,
    factor: 2,
  },
  maxDuration: 180, // 3 minutes (email verification can be slow)
  run: async (payload: {
    domain: string;
    runId: string;
    traceId: string;
    industry: string;
    city: string;
    country: string;
    companyName: string;
    employeeCount: number | null;
    description: string;
    websiteText: string;
    websiteHtml: string;
    techStack: string[];
    aiJobCount: number;
    linkedInUrl: string | null;
    painSignals: string[];
    serviceMatch: string | null;
    matchConfidence: number;
  }) => {
    const db = getSupabaseClient();
    const env = getEnv();

    // ── Step 1: Enrich contacts ─────────────────────────────────
    // NOTE(review): the first argument is "", presumably a company id that is
    // not known yet. Contacts are linked to the company in Step 2.
    // Confirm against enrichContacts.
    const contacts = await enrichContacts(
      "",
      payload.domain,
      payload.companyName,
      payload.employeeCount,
      payload.industry,
      payload.websiteText.slice(0, 300),
      payload.websiteHtml,
      payload.linkedInUrl,
      payload.traceId
    );
    if (contacts.length === 0) {
      await saveCheckpoint(payload.runId, payload.domain, "completed", { reason: "no_contacts" });
      return { status: "skipped", reason: "no_contacts" };
    }
    // Must have an authority-confirmed contact; the first one is used for alerts.
    const authorityContacts = contacts.filter((c) => c.authorityConfirmed);
    const [best] = authorityContacts;
    if (!best) {
      await saveCheckpoint(payload.runId, payload.domain, "completed", { reason: "no_authority" });
      return { status: "skipped", reason: "no_authority_contacts" };
    }
    await saveCheckpoint(payload.runId, payload.domain, "emails_verified");

    // ── Step 2: Save company ────────────────────────────────────
    // Reuse the stored id when this domain already exists. Upserting a fresh
    // UUID on the `domain` conflict would overwrite the existing row's primary
    // key, so rows that reference the old id would no longer match.
    const { data: existingCompany, error: lookupError } = await db
      .from("companies")
      .select("id")
      .eq("domain", payload.domain)
      .maybeSingle();
    if (lookupError) {
      throw new Error(`Company lookup failed for ${payload.domain}: ${lookupError.message}`);
    }
    const companyId: string = existingCompany?.id ?? randomUUID();

    const { error: companyError } = await db.from("companies").upsert({
      id: companyId,
      domain: payload.domain,
      name: payload.companyName,
      industry: payload.industry,
      employee_count: payload.employeeCount,
      description: payload.description,
      website_status: "active",
      linkedin_url: payload.linkedInUrl,
      tech_stack: payload.techStack,
      country: payload.country,
      city: payload.city,
      service_match: payload.serviceMatch,
      service_match_score: Math.round(payload.matchConfidence * 100),
      pain_signals: payload.painSignals,
      trace_id: payload.traceId,
    }, { onConflict: "domain" });
    if (companyError) {
      throw new Error(`Company upsert failed for ${payload.domain}: ${companyError.message}`);
    }

    // Link every found contact to the company in a single update.
    const { error: linkError } = await db
      .from("contacts")
      .update({ company_id: companyId })
      .in("id", contacts.map((c) => c.id));
    if (linkError) {
      throw new Error(`Linking contacts failed for ${payload.domain}: ${linkError.message}`);
    }

    // ── Step 3: AI Profile + Score (Python service) ─────────────
    const profileResponse = await axios.post(
      `${env.PYTHON_AI_SERVICE_URL}/profile`,
      {
        company: {
          id: companyId,
          name: payload.companyName,
          industry: payload.industry,
          employee_count: payload.employeeCount,
          description: payload.description,
          website_text: payload.websiteText,
          linkedin_description: "",
          tech_stack: payload.techStack,
          ai_job_count: payload.aiJobCount,
          pain_signals: payload.painSignals,
          service_match: payload.serviceMatch,
        },
        contacts: contacts.map((c) => ({
          full_name: c.fullName,
          email: c.email,
          email_verified: c.emailVerification?.status === "verified_deliverable",
          linkedin_personal_url: c.linkedinPersonalUrl,
          social_profiles: c.socialProfiles ?? {},
        })),
        trace_id: payload.traceId,
      },
      {
        headers: { Authorization: `Bearer ${env.PYTHON_AI_SERVICE_SECRET}` },
        timeout: 45_000,
      }
    );
    const profile = profileResponse.data?.profile;
    const score = profileResponse.data?.score;
    const totalScore = score?.total_score ?? 0;
    const tier = score?.tier ?? "archive";
    if (!profile || !score) {
      logger.warn(
        { domain: payload.domain, hasProfile: Boolean(profile), hasScore: Boolean(score) },
        "AI service response missing profile or score"
      );
    }

    // Save profile and score. If a part is missing, skip it rather than
    // writing a row that holds only company_id.
    if (profile) {
      const { error } = await db.from("lead_profiles").upsert({
        company_id: companyId,
        ...profile,
      }, { onConflict: "company_id" });
      if (error) throw new Error(`Saving lead profile failed for ${payload.domain}: ${error.message}`);
    }
    if (score) {
      const { error } = await db.from("lead_scores").upsert({
        company_id: companyId,
        ...score,
      }, { onConflict: "company_id" });
      if (error) throw new Error(`Saving lead score failed for ${payload.domain}: ${error.message}`);
    }

    // Update run stats (qualified lead threshold: 70)
    if (totalScore >= 70) {
      await db.rpc("increment_run_leads", { run_id: payload.runId });
    }
    await saveCheckpoint(payload.runId, payload.domain, "completed");

    // ── Step 4: Hot lead alert (85+) ────────────────────────────
    // Best-effort. By this point the lead is saved and counted. If this call
    // threw, Trigger.dev would retry the whole run: enrichment and the AI call
    // would run again, and increment_run_leads would count the lead twice.
    if (totalScore >= 85) {
      try {
        await sendHotLeadAlert({
          companyName: payload.companyName,
          domain: payload.domain,
          industry: payload.industry,
          employeeCount: payload.employeeCount,
          city: payload.city,
          score: totalScore,
          tier,
          contactName: best.fullName,
          contactTitle: best.title ?? "",
          email: best.email,
          emailVerified: best.emailVerification?.status === "verified_deliverable",
          linkedinPersonal: best.linkedinPersonalUrl,
          linkedinCompany: payload.linkedInUrl,
          serviceMatch: payload.serviceMatch,
          outreachAngle: profile?.outreach_angle ?? "",
          painPoints: (profile?.pain_points ?? []).slice(0, 3),
          socialProfiles: best.socialProfiles ?? {},
        });
      } catch (err) {
        logger.warn({ domain: payload.domain, err }, "Hot lead Slack alert failed — lead already saved");
      }
    }

    recordOperation(payload.traceId, "enrich_and_profile", score?.tokens_used ?? 0, 0, true);
    return {
      status: "completed",
      domain: payload.domain,
      score: totalScore,
      tier,
      contactsFound: contacts.length,
      authorityConfirmed: authorityContacts.length,
      hasLinkedIn: contacts.some((c) => c.linkedinPersonalUrl),
    };
  },
});
// ───────────────────────────────────────────────────────────────
// TASK 4: Daily Digest (CRON — runs at 6:30 AM UTC = 11:30 AM PKT)
// ───────────────────────────────────────────────────────────────
/**
 * Sends the daily Slack digest for today's most recent discovery run.
 *
 * "Today" starts at midnight UTC. The digest includes the lead counts per
 * tier from `lead_scores` and the total LLM tokens recorded in `llm_traces`
 * since then. If no run started today, it logs and returns without sending.
 * Failures to load runs or scores throw. A failure to load traces only logs
 * a warning, and tokens are reported as 0.
 */
export const dailyDigestTask = schedules.task({
  id: "daily-digest",
  // Cron configured in Trigger.dev dashboard: 30 6 * * * (6:30 AM UTC)
  maxDuration: 60,
  run: async () => {
    const db = getSupabaseClient();
    // Midnight UTC today. setHours() would use the worker's local timezone,
    // but the schedule and the comments above are in UTC.
    const today = new Date();
    today.setUTCHours(0, 0, 0, 0);
    const since = today.toISOString();

    // Most recent run started today (explicit ordering — row order is otherwise unspecified).
    const { data: runs, error: runsError } = await db
      .from("discovery_runs")
      .select("*")
      .gte("ran_at", since)
      .order("ran_at", { ascending: false })
      .limit(1);
    if (runsError) {
      throw new Error(`Failed to load today's discovery runs: ${runsError.message}`);
    }
    const latestRun = runs?.[0];
    if (!latestRun) {
      logger.info("No runs today — skipping digest");
      return;
    }

    // Count today's leads by tier.
    // NOTE(review): Supabase/PostgREST caps rows per request (commonly 1,000);
    // very large days would be under-counted — consider a count/aggregate query.
    const { data: scores, error: scoresError } = await db
      .from("lead_scores")
      .select("total_score, tier")
      .gte("created_at", since);
    if (scoresError) {
      throw new Error(`Failed to load today's lead scores: ${scoresError.message}`);
    }
    const countTier = (tier: string) => scores?.filter((s) => s.tier === tier).length ?? 0;

    // Token usage (non-fatal: the digest is still useful without it).
    const { data: traces, error: tracesError } = await db
      .from("llm_traces")
      .select("total_tokens")
      .gte("created_at", since);
    if (tracesError) {
      logger.warn({ err: tracesError }, "Failed to load today's LLM traces — reporting 0 tokens");
    }
    const totalTokens = traces?.reduce((sum, t) => sum + (t.total_tokens ?? 0), 0) ?? 0;

    await sendDailyDigest({
      territory: `${latestRun.city}, ${latestRun.country_code}`,
      industry: latestRun.industry,
      companiesSearched: latestRun.companies_found ?? 0,
      leadsQualified: scores?.length ?? 0,
      hotLeads: countTier("hot"),
      warmLeads: countTier("warm"),
      nurtureLeads: countTier("nurture"),
      tokensUsed: totalTokens,
      durationMinutes: latestRun.completed_at
        ? Math.round((new Date(latestRun.completed_at).getTime() - new Date(latestRun.ran_at).getTime()) / 60_000)
        : 0,
    });
    return { sent: true, leads: scores?.length ?? 0 };
  },
});
// ───────────────────────────────────────────────────────────────
// TASK 5: Manual Discovery (triggered from Slack)
// ───────────────────────────────────────────────────────────────
/**
 * Entry point for discovery runs requested from Slack.
 *
 * Placeholder: it only logs the request and echoes the request fields back
 * with `status: "manual_run_started"`. No search or company processing
 * happens yet.
 * TODO: build a custom territory from `region` and reuse the daily
 * scheduler's pipeline.
 */
export const manualDiscoveryTask = task({
  id: "manual-discovery",
  maxDuration: 300,
  run: async (request: {
    region: string;
    industry: string;
    maxCompanies: number;
    triggeredBy: string;
  }) => {
    logger.info({ payload: request }, "Manual discovery triggered from Slack");
    const response = { status: "manual_run_started", ...request };
    return response;
  },
});