import { task } from "@trigger.dev/sdk/v3"; import axios from "axios"; import { getEnv } from "../../shared/config/env"; import { getSupabaseClient } from "../../shared/supabase/client"; import { logger, auditLog } from "../../shared/utils/logger"; import { CompanyDiscoveredPayload } from "../../shared/supabase/schema"; /** * Profiling Router — Trigger.dev task that: * 1. Receives company.discovered event * 2. Calls Python AI service for LLM profiling + scoring * 3. Routes result: qualified → outreach queue, low score → nurture/archive */ export const profilingTask = task({ id: "profiling-router", maxDuration: 300, // 5 min per company run: async (payload: CompanyDiscoveredPayload) => { const { company_id, domain, name, region, source } = payload; const env = getEnv(); const db = getSupabaseClient(); logger.info({ company_id, domain }, "🧠 Profiling started"); try { // 1. Fetch company data from Supabase const { data: company, error: companyErr } = await db .from("companies") .select("*") .eq("id", company_id) .single(); if (companyErr || !company) { throw new Error(`Company not found: ${companyErr?.message}`); } // 2. Fetch contacts from Supabase const { data: contacts, error: contactsErr } = await db .from("contacts") .select("*") .eq("company_id", company_id); if (contactsErr) { throw new Error(`Failed to fetch contacts: ${contactsErr.message}`); } // 3. Structure payload and call Python AI Service const response = await axios.post( `${env.PYTHON_AI_SERVICE_URL}/profile`, { company: { id: company.id, name: company.name, industry: company.industry || "", employee_count: company.employee_count, description: company.description || "", website_text: company.description || "", // Using description as website excerpt fallback linkedin_description: "", tech_stack: company.tech_stack || [], ai_job_count: (company.growth_signals || []).filter((s: any) => s.type === "job_post").length, pain_signals: company.pain_signals || [], service_match: company.service_match, }, contacts: (contacts || []).map((c: any) => ({ full_name: c.full_name, email: c.email, email_verified: c.email_verified, linkedin_personal_url: c.linkedin_personal_url || c.linkedin_url, social_profiles: c.social_profiles || {}, })), trace_id: company.trace_id || `trace-${company_id.slice(0, 8)}`, }, { headers: { "Content-Type": "application/json", "Authorization": `Bearer ${env.PYTHON_AI_SERVICE_SECRET}`, }, timeout: 120_000, // 2 min timeout for LLM } ); const { profile, score, validation, meta } = response.data; const totalScore = score?.total_score ?? 0; const tier = score?.tier ?? "archive"; const needs_human_review = !(validation?.score_valid ?? true); logger.info( { company_id, score: totalScore, tier }, "✅ Profiling complete" ); // Save profile and score in DB await db.from("lead_profiles").upsert({ company_id, ...profile, }, { onConflict: "company_id" }); await db.from("lead_scores").upsert({ company_id, ...score, }, { onConflict: "company_id" }); // ── Route based on score tier ─────────────────────────── await routeByTier(company_id, { total_score: totalScore, tier, needs_human_review }, db, env); // ── Audit log ─────────────────────────────────────────── auditLog("lead_profiled", "company", { company_id, domain, score: totalScore, tier, is_fallback: meta?.is_fallback ?? false, }); return response.data; } catch (err: unknown) { // ── Python service unavailable → fallback ─────────────── if (axios.isAxiosError(err) && !err.response) { logger.error({ company_id, domain }, "Python service unreachable — queuing for review"); await db.from("human_review_queue").insert({ type: "score_anomaly", company_id, payload: { reason: "python_service_unavailable", domain }, }); return { success: false, reason: "python_service_unavailable" }; } throw err; } }, }); // ─── Score-based routing ────────────────────────────────────── async function routeByTier( companyId: string, result: { total_score: number; tier: string; needs_human_review: boolean }, db: ReturnType, env: ReturnType ) { const { tier, total_score, needs_human_review } = result; if (tier === "hot" || tier === "warm") { if (needs_human_review && env.HUMAN_REVIEW_ENABLED) { // Queue for human approval before outreach logger.info({ companyId, tier }, "Routing to human review queue"); await db.from("human_review_queue").insert({ type: "outreach_approval", company_id: companyId, payload: { score: total_score, tier, reason: "human_review_required" }, }); await notifySlack(companyId, total_score, tier, env, "review"); } else { // Qualified — trigger outreach (Step 3, to be built) logger.info({ companyId, tier, score: total_score }, "🚀 Routing to outreach queue"); await notifySlack(companyId, total_score, tier, env, "qualified"); // Future: trigger outreach task // await outreachTask.trigger({ company_id: companyId, tier }); } } else if (tier === "nurture") { logger.info({ companyId }, "Routing to nurture — re-score in 30 days"); // Future: schedule re-scoring task } else { logger.info({ companyId }, "Archived — score too low"); } } async function notifySlack( companyId: string, score: number, tier: string, env: ReturnType, type: "qualified" | "review" ) { try { const db = getSupabaseClient(); const { data: company } = await db .from("companies") .select("name, domain, industry, employee_count") .eq("id", companyId) .single(); if (!company) return; const emoji = tier === "hot" ? "🔥" : "✅"; const action = type === "review" ? "⏳ Needs Review" : "📤 Ready for Outreach"; const message = { text: `${emoji} New Qualified Lead — ${action}`, blocks: [ { type: "section", text: { type: "mrkdwn", text: `*${emoji} ${company.name}*\n${action}\n\n` + `• *Score:* ${score}/100 — ${tier.toUpperCase()}\n` + `• *Industry:* ${company.industry ?? "Unknown"}\n` + `• *Employees:* ${company.employee_count ?? "Unknown"}\n` + `• *Domain:* ${company.domain}`, }, }, ], }; const channelId = type === "review" ? env.SLACK_REVIEW_CHANNEL_ID : env.SLACK_ALERT_CHANNEL_ID; const res = await axios.post("https://slack.com/api/chat.postMessage", { channel: channelId, ...message, }, { headers: { Authorization: `Bearer ${env.SLACK_BOT_TOKEN}` }, }); if (res.data && res.data.ok === false) { logger.warn({ error: res.data.error }, "Slack API responded with error. Make sure to invite the bot to the channel!"); } } catch (err) { logger.warn({ err }, "Slack notification failed — non-critical"); } }