iDevBuddy
feat: Add Slack Events integration, Dockerfiles, and Hugging Face deployment config
5f138d4
raw
history blame
7.95 kB
import { task } from "@trigger.dev/sdk/v3";
import axios from "axios";
import { getEnv } from "../../shared/config/env";
import { getSupabaseClient } from "../../shared/supabase/client";
import { logger, auditLog } from "../../shared/utils/logger";
import { CompanyDiscoveredPayload } from "../../shared/supabase/schema";
/**
 * Profiling Router — Trigger.dev task that:
 *   1. Receives a company.discovered event
 *   2. Calls the Python AI service for LLM profiling + scoring
 *   3. Routes the result: qualified → outreach queue, low score → nurture/archive
 *
 * Supabase calls used here hand back failures in a returned `error` field, so
 * every read and write below checks that field explicitly instead of assuming
 * the awaited call would have thrown.
 *
 * @param payload - company.discovered event; only `company_id` and `domain` are read.
 * @returns The raw Python service response body on success, or
 *   `{ success: false, reason: "python_service_unavailable" }` when the service
 *   produced no HTTP response and the company was queued for human review.
 * @throws Error when the company or its contacts cannot be loaded, when a
 *   profile/score/review-queue write fails, or for any other failure
 *   (including HTTP error responses from the Python service).
 */
export const profilingTask = task({
  id: "profiling-router",
  maxDuration: 300, // 5 min per company
  run: async (payload: CompanyDiscoveredPayload) => {
    const { company_id, domain } = payload;
    const env = getEnv();
    const db = getSupabaseClient();
    logger.info({ company_id, domain }, "🧠 Profiling started");
    try {
      // 1. Fetch company data from Supabase
      const { data: company, error: companyErr } = await db
        .from("companies")
        .select("*")
        .eq("id", company_id)
        .single();
      if (companyErr || !company) {
        // Avoid rendering "undefined" when the row is simply missing.
        const detail = companyErr?.message ?? `no row with id ${company_id}`;
        throw new Error(`Company not found: ${detail}`);
      }

      // 2. Fetch contacts from Supabase
      const { data: contacts, error: contactsErr } = await db
        .from("contacts")
        .select("*")
        .eq("company_id", company_id);
      if (contactsErr) {
        throw new Error(`Failed to fetch contacts: ${contactsErr.message}`);
      }

      // Count job-post growth signals defensively: the column may be null,
      // a non-array value, or contain null entries.
      const growthSignals: unknown[] = Array.isArray(company.growth_signals)
        ? company.growth_signals
        : [];
      const aiJobCount = growthSignals.filter(
        (signal) =>
          typeof signal === "object" &&
          signal !== null &&
          (signal as { type?: unknown }).type === "job_post"
      ).length;

      // 3. Structure payload and call Python AI Service
      const response = await axios.post(
        `${env.PYTHON_AI_SERVICE_URL}/profile`,
        {
          company: {
            id: company.id,
            name: company.name,
            industry: company.industry || "",
            employee_count: company.employee_count,
            description: company.description || "",
            website_text: company.description || "", // Using description as website excerpt fallback
            linkedin_description: "",
            tech_stack: company.tech_stack || [],
            ai_job_count: aiJobCount,
            pain_signals: company.pain_signals || [],
            service_match: company.service_match,
          },
          contacts: (contacts || []).map((c: any) => ({
            full_name: c.full_name,
            email: c.email,
            email_verified: c.email_verified,
            linkedin_personal_url: c.linkedin_personal_url || c.linkedin_url,
            social_profiles: c.social_profiles || {},
          })),
          trace_id: company.trace_id || `trace-${company_id.slice(0, 8)}`,
        },
        {
          headers: {
            "Content-Type": "application/json",
            "Authorization": `Bearer ${env.PYTHON_AI_SERVICE_SECRET}`,
          },
          timeout: 120_000, // 2 min timeout for LLM
        }
      );

      // Guard against an empty/null body so the destructuring cannot throw.
      const { profile, score, validation, meta } = response.data ?? {};
      const totalScore = score?.total_score ?? 0;
      const tier = score?.tier ?? "archive";
      // Human review is required only when the service explicitly reports an invalid score.
      const needs_human_review = !(validation?.score_valid ?? true);
      logger.info(
        { company_id, score: totalScore, tier },
        "✅ Profiling complete"
      );

      // Save profile and score in DB; a failed write must not pass silently.
      const { error: profileErr } = await db.from("lead_profiles").upsert({
        company_id,
        ...profile,
      }, { onConflict: "company_id" });
      if (profileErr) {
        throw new Error(`Failed to save lead profile: ${profileErr.message}`);
      }
      const { error: scoreErr } = await db.from("lead_scores").upsert({
        company_id,
        ...score,
      }, { onConflict: "company_id" });
      if (scoreErr) {
        throw new Error(`Failed to save lead score: ${scoreErr.message}`);
      }

      // ── Route based on score tier ───────────────────────────
      await routeByTier(company_id, { total_score: totalScore, tier, needs_human_review }, db, env);

      // ── Audit log ───────────────────────────────────────────
      auditLog("lead_profiled", "company", {
        company_id,
        domain,
        score: totalScore,
        tier,
        is_fallback: meta?.is_fallback ?? false,
      });
      return response.data;
    } catch (err: unknown) {
      // ── Python service unavailable → fallback ───────────────
      // An axios error without `response` means no HTTP response was received
      // (connection failure, timeout); HTTP error statuses are rethrown below.
      if (axios.isAxiosError(err) && !err.response) {
        logger.error(
          { company_id, domain, code: err.code },
          "Python service unreachable — queuing for review"
        );
        const { error: queueErr } = await db.from("human_review_queue").insert({
          type: "score_anomaly",
          company_id,
          payload: { reason: "python_service_unavailable", domain },
        });
        if (queueErr) {
          // Without the queue entry the company would be lost, so fail loudly.
          throw new Error(
            `Python service unreachable and review queue insert failed: ${queueErr.message}`
          );
        }
        return { success: false, reason: "python_service_unavailable" };
      }
      throw err;
    }
  },
});
// ─── Score-based routing ──────────────────────────────────────
/**
 * Routes a scored company to its next pipeline stage.
 *
 * - `hot` / `warm` with `needs_human_review` set and `env.HUMAN_REVIEW_ENABLED`
 *   truthy: inserts an `outreach_approval` row into `human_review_queue`, then
 *   sends a "review" Slack notification.
 * - `hot` / `warm` otherwise: sends a "qualified" Slack notification (the
 *   outreach task itself is not wired up yet).
 * - `nurture`: logs only.
 * - any other tier: logs the company as archived.
 *
 * @param companyId - ID of the company being routed.
 * @param result - Score summary: total score, tier and the human-review flag.
 * @param db - Supabase client used for the review-queue insert.
 * @param env - Environment config; `HUMAN_REVIEW_ENABLED` is read here.
 * @throws Error when the review-queue insert reports an error, so a lead is
 *   never announced as "needs review" without actually being queued.
 */
async function routeByTier(
  companyId: string,
  result: { total_score: number; tier: string; needs_human_review: boolean },
  db: ReturnType<typeof getSupabaseClient>,
  env: ReturnType<typeof getEnv>
) {
  const { tier, total_score, needs_human_review } = result;
  if (tier === "hot" || tier === "warm") {
    if (needs_human_review && env.HUMAN_REVIEW_ENABLED) {
      // Queue for human approval before outreach
      logger.info({ companyId, tier }, "Routing to human review queue");
      // The insert reports failure via a returned `error`, not by throwing.
      const { error: queueErr } = await db.from("human_review_queue").insert({
        type: "outreach_approval",
        company_id: companyId,
        payload: { score: total_score, tier, reason: "human_review_required" },
      });
      if (queueErr) {
        throw new Error(
          `Failed to queue company ${companyId} for human review: ${queueErr.message}`
        );
      }
      await notifySlack(companyId, total_score, tier, env, "review");
    } else {
      // Qualified — trigger outreach (Step 3, to be built)
      logger.info({ companyId, tier, score: total_score }, "🚀 Routing to outreach queue");
      await notifySlack(companyId, total_score, tier, env, "qualified");
      // Future: trigger outreach task
      // await outreachTask.trigger({ company_id: companyId, tier });
    }
  } else if (tier === "nurture") {
    logger.info({ companyId }, "Routing to nurture — re-score in 30 days");
    // Future: schedule re-scoring task
  } else {
    logger.info({ companyId }, "Archived — score too low");
  }
}
/**
 * Escapes the three characters Slack treats as control characters in mrkdwn
 * text (`&`, `<`, `>`) so database-sourced values render literally.
 */
function escapeSlackMrkdwn(value: unknown): string {
  return String(value)
    .replace(/&/g, "&amp;") // must run first so the entities below are not double-escaped
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;");
}

/**
 * Posts a lead notification to Slack. Best-effort: every failure is logged as
 * a warning and swallowed, so this function never rejects.
 *
 * @param companyId - ID of the company to look up in the `companies` table.
 * @param score - Total score shown as `<score>/100`.
 * @param tier - Score tier; `hot` selects the fire emoji, anything else the check mark.
 * @param env - Environment config; reads `SLACK_REVIEW_CHANNEL_ID`,
 *   `SLACK_ALERT_CHANNEL_ID` and `SLACK_BOT_TOKEN`.
 * @param type - `"review"` posts to the review channel, `"qualified"` to the alert channel.
 */
async function notifySlack(
  companyId: string,
  score: number,
  tier: string,
  env: ReturnType<typeof getEnv>,
  type: "qualified" | "review"
) {
  try {
    const channelId = type === "review" ? env.SLACK_REVIEW_CHANNEL_ID : env.SLACK_ALERT_CHANNEL_ID;
    if (!channelId) {
      // Posting without a channel can only fail; say why instead.
      logger.warn({ companyId, type }, "Slack notification skipped — no channel configured");
      return;
    }

    const db = getSupabaseClient();
    const { data: company, error: companyErr } = await db
      .from("companies")
      .select("name, domain, industry, employee_count")
      .eq("id", companyId)
      .single();
    if (companyErr || !company) {
      logger.warn(
        { companyId, error: companyErr?.message },
        "Slack notification skipped — company lookup failed"
      );
      return;
    }

    const emoji = tier === "hot" ? "🔥" : "✅";
    const action = type === "review" ? "⏳ Needs Review" : "📤 Ready for Outreach";
    const message = {
      // Top-level text is sent alongside the blocks.
      text: `${emoji} New Qualified Lead — ${action}`,
      blocks: [
        {
          type: "section",
          text: {
            type: "mrkdwn",
            text: `*${emoji} ${escapeSlackMrkdwn(company.name)}*\n${action}\n\n` +
              `• *Score:* ${score}/100 — ${escapeSlackMrkdwn(tier.toUpperCase())}\n` +
              `• *Industry:* ${escapeSlackMrkdwn(company.industry ?? "Unknown")}\n` +
              `• *Employees:* ${escapeSlackMrkdwn(company.employee_count ?? "Unknown")}\n` +
              `• *Domain:* ${escapeSlackMrkdwn(company.domain)}`,
          },
        },
      ],
    };

    const res = await axios.post("https://slack.com/api/chat.postMessage", {
      channel: channelId,
      ...message,
    }, {
      headers: { Authorization: `Bearer ${env.SLACK_BOT_TOKEN}` },
      // Bound the request so a hung connection cannot stall the calling task.
      timeout: 10_000,
    });
    // This checks the `ok` flag in the response body, not the HTTP status.
    if (res.data && res.data.ok === false) {
      logger.warn({ error: res.data.error }, "Slack API responded with error. Make sure to invite the bot to the channel!");
    }
  } catch (err) {
    logger.warn({ err }, "Slack notification failed — non-critical");
  }
}