Spaces:
Sleeping
Sleeping
| import { task } from "@trigger.dev/sdk/v3"; | |
| import axios from "axios"; | |
| import { getEnv } from "../../shared/config/env"; | |
| import { getSupabaseClient } from "../../shared/supabase/client"; | |
| import { logger, auditLog } from "../../shared/utils/logger"; | |
| import { CompanyDiscoveredPayload } from "../../shared/supabase/schema"; | |
| /** | |
| * Profiling Router β Trigger.dev task that: | |
| * 1. Receives company.discovered event | |
| * 2. Calls Python AI service for LLM profiling + scoring | |
| * 3. Routes result: qualified β outreach queue, low score β nurture/archive | |
| */ | |
| export const profilingTask = task({ | |
| id: "profiling-router", | |
| maxDuration: 300, // 5 min per company | |
| run: async (payload: CompanyDiscoveredPayload) => { | |
| const { company_id, domain, name, region, source } = payload; | |
| const env = getEnv(); | |
| const db = getSupabaseClient(); | |
| logger.info({ company_id, domain }, "π§ Profiling started"); | |
| try { | |
| // ββ Call Python AI Service ββββββββββββββββββββββββββββββ | |
| const response = await axios.post( | |
| `${env.PYTHON_AI_SERVICE_URL}/profile`, | |
| { company_id, domain, name, region, source }, | |
| { | |
| headers: { | |
| "Content-Type": "application/json", | |
| "x-service-secret": env.PYTHON_AI_SERVICE_SECRET, | |
| }, | |
| timeout: 120_000, // 2 min timeout for LLM | |
| } | |
| ); | |
| const result = response.data; | |
| logger.info( | |
| { company_id, score: result.total_score, tier: result.tier }, | |
| "β Profiling complete" | |
| ); | |
| // ββ Route based on score tier βββββββββββββββββββββββββββ | |
| await routeByTier(company_id, result, db, env); | |
| // ββ Audit log βββββββββββββββββββββββββββββββββββββββββββ | |
| auditLog("lead_profiled", "company", { | |
| company_id, | |
| domain, | |
| score: result.total_score, | |
| tier: result.tier, | |
| is_fallback: result.is_fallback, | |
| }); | |
| return result; | |
| } catch (err: unknown) { | |
| // ββ Python service unavailable β fallback βββββββββββββββ | |
| if (axios.isAxiosError(err) && !err.response) { | |
| logger.error({ company_id, domain }, "Python service unreachable β queuing for review"); | |
| await db.from("human_review_queue").insert({ | |
| type: "score_anomaly", | |
| company_id, | |
| payload: { reason: "python_service_unavailable", domain }, | |
| }); | |
| return { success: false, reason: "python_service_unavailable" }; | |
| } | |
| throw err; | |
| } | |
| }, | |
| }); | |
| // βββ Score-based routing ββββββββββββββββββββββββββββββββββββββ | |
| async function routeByTier( | |
| companyId: string, | |
| result: { total_score: number; tier: string; needs_human_review: boolean }, | |
| db: ReturnType<typeof getSupabaseClient>, | |
| env: ReturnType<typeof getEnv> | |
| ) { | |
| const { tier, total_score, needs_human_review } = result; | |
| if (tier === "hot" || tier === "warm") { | |
| if (needs_human_review && env.HUMAN_REVIEW_ENABLED) { | |
| // Queue for human approval before outreach | |
| logger.info({ companyId, tier }, "Routing to human review queue"); | |
| await db.from("human_review_queue").insert({ | |
| type: "outreach_approval", | |
| company_id: companyId, | |
| payload: { score: total_score, tier, reason: "human_review_required" }, | |
| }); | |
| await notifySlack(companyId, total_score, tier, env, "review"); | |
| } else { | |
| // Qualified β trigger outreach (Step 3, to be built) | |
| logger.info({ companyId, tier, score: total_score }, "π Routing to outreach queue"); | |
| await notifySlack(companyId, total_score, tier, env, "qualified"); | |
| // Future: trigger outreach task | |
| // await outreachTask.trigger({ company_id: companyId, tier }); | |
| } | |
| } else if (tier === "nurture") { | |
| logger.info({ companyId }, "Routing to nurture β re-score in 30 days"); | |
| // Future: schedule re-scoring task | |
| } else { | |
| logger.info({ companyId }, "Archived β score too low"); | |
| } | |
| } | |
| async function notifySlack( | |
| companyId: string, | |
| score: number, | |
| tier: string, | |
| env: ReturnType<typeof getEnv>, | |
| type: "qualified" | "review" | |
| ) { | |
| try { | |
| const db = getSupabaseClient(); | |
| const { data: company } = await db | |
| .from("companies") | |
| .select("name, domain, industry, employee_count") | |
| .eq("id", companyId) | |
| .single(); | |
| if (!company) return; | |
| const emoji = tier === "hot" ? "π₯" : "β "; | |
| const action = type === "review" ? "β³ Needs Review" : "π€ Ready for Outreach"; | |
| const message = { | |
| text: `${emoji} New Qualified Lead β ${action}`, | |
| blocks: [ | |
| { | |
| type: "section", | |
| text: { | |
| type: "mrkdwn", | |
| text: `*${emoji} ${company.name}*\n${action}\n\n` + | |
| `β’ *Score:* ${score}/100 β ${tier.toUpperCase()}\n` + | |
| `β’ *Industry:* ${company.industry ?? "Unknown"}\n` + | |
| `β’ *Employees:* ${company.employee_count ?? "Unknown"}\n` + | |
| `β’ *Domain:* ${company.domain}`, | |
| }, | |
| }, | |
| ], | |
| }; | |
| const channelId = type === "review" ? env.SLACK_REVIEW_CHANNEL_ID : env.SLACK_ALERT_CHANNEL_ID; | |
| await axios.post("https://slack.com/api/chat.postMessage", { | |
| channel: channelId, | |
| ...message, | |
| }, { | |
| headers: { Authorization: `Bearer ${env.SLACK_BOT_TOKEN}` }, | |
| }); | |
| } catch (err) { | |
| logger.warn({ err }, "Slack notification failed β non-critical"); | |
| } | |
| } | |