| import { |
| db, |
| eq, |
| and, |
| desc, |
| asc, |
| lte, |
| automations, |
| socialAccounts, |
| messages as dbMessages, |
| contacts, |
| scheduledMessages, |
| eventQueue, |
| rateLimitState, |
| analyticsEvents, |
| aiConversations, |
| automationMetrics, |
| notificationLogs |
| } from "@autoloop/db"; |
| import { |
| matchesAutomationCondition, |
| sendInstagramMessage, |
| replyToInstagramComment, |
| getInstagramUserProfile, |
| likeMediaOrComment |
| } from "@autoloop/shared"; |
| import { analyzeSentiment, generateSmartReply, trackAnalytics } from "../ai/pipeline"; |
| import { incomingQueue } from "../queue"; |
| import { InternalEvent, TriggerType, Workflow, ActionType } from "@autoloop/types"; |
|
|
| |
| const RATE_LIMITS = { |
| dmPerMinute: 15, |
| dmPerHour: 150, |
| dmPerDay: 500, |
| }; |
|
|
| export const automationEngine = { |
| |
| async processEvent(event: InternalEvent) { |
| console.log(`[Engine] Processing Event: ${event.triggerType} from ${event.userId}`); |
| |
| |
| await this.queueEvent({ |
| eventType: event.triggerType, |
| payload: event, |
| externalId: event.accountId, |
| recipientId: event.userId, |
| }); |
|
|
| return { success: true, queued: 1 }; |
| }, |
|
|
| |
| async queueEvent(params: { |
| eventType: string; |
| payload: any; |
| externalId: string; |
| recipientId: string; |
| scheduledFor?: Date; |
| idempotencyKey?: string; |
| }) { |
| const idempotencyKey = params.idempotencyKey || `${params.externalId}-${params.recipientId}-${params.eventType}-${Math.floor(Date.now() / 60000)}`; |
| |
| |
| const existing = await db.query.eventQueue.findFirst({ |
| where: eq(eventQueue.idempotencyKey, idempotencyKey), |
| }); |
|
|
| if (existing && existing.status !== "failed") return; |
|
|
| const id = crypto.randomUUID(); |
| await db.insert(eventQueue).values({ |
| id, |
| eventType: params.eventType, |
| payload: JSON.stringify(params.payload), |
| externalId: params.externalId, |
| recipientId: params.recipientId, |
| scheduledFor: params.scheduledFor || new Date(), |
| idempotencyKey, |
| status: "pending", |
| }); |
|
|
| |
| await incomingQueue.add('process-queued-event', { eventId: id }, { |
| jobId: idempotencyKey, |
| removeOnComplete: true, |
| attempts: 3, |
| backoff: { type: 'exponential', delay: 1000 } |
| }); |
| }, |
|
|
| |
| async processQueuedEvent(eventId: string) { |
| const event = await db.query.eventQueue.findFirst({ |
| where: eq(eventQueue.id, eventId), |
| }); |
|
|
| if (!event || event.status === "completed") return; |
|
|
| await db.update(eventQueue).set({ status: "processing", attempts: (event.attempts || 0) + 1 }).where(eq(eventQueue.id, eventId)); |
|
|
| try { |
| const eventData: InternalEvent = JSON.parse(event.payload); |
|
|
| |
| const workflows = await db.query.automations.findMany({ |
| where: and( |
| eq(automations.externalId, event.externalId || ""), |
| eq(automations.triggerType, eventData.triggerType), |
| eq(automations.isActive, true) |
| ), |
| orderBy: [desc(automations.priority)], |
| }); |
|
|
| console.log(`[Engine] Matching ${workflows.length} workflows for event ${event.id}`); |
|
|
| for (const workflow of workflows) { |
| |
| const matches = matchesAutomationCondition( |
| workflow.conditionOperator || 'contains', |
| workflow.condition || '', |
| eventData.message || "" |
| ); |
|
|
| if (!matches) continue; |
|
|
| console.log(`[Engine] Workflow triggered: ${workflow.name}`); |
| |
| |
| await this.handleWorkflowExecution(workflow, eventData); |
| |
| |
| |
| break; |
| } |
|
|
| await db.update(eventQueue).set({ status: "completed", processedAt: new Date() }).where(eq(eventQueue.id, eventId)); |
| } catch (error: any) { |
| console.error(`[Engine] Event processing failed: ${error.message}`); |
| const attempts = (event.attempts || 0) + 1; |
| const status = attempts >= (event.maxAttempts || 3) ? "failed" : "pending"; |
| await db.update(eventQueue).set({ |
| status, |
| lastError: error.message, |
| scheduledFor: new Date(Date.now() + Math.pow(2, attempts) * 60000) |
| }).where(eq(eventQueue.id, eventId)); |
| } |
| }, |
|
|
| |
| async handleWorkflowExecution(workflow: any, event: InternalEvent) { |
| const account = await db.query.socialAccounts.findFirst({ |
| where: eq(socialAccounts.externalId, event.accountId) |
| }); |
| if (!account?.accessToken) return; |
|
|
| |
| if (workflow.requireFollower) { |
| const profile = await getInstagramUserProfile(event.userId, account.accessToken).catch(() => null); |
| if (profile && !profile.is_user_follow_business) { |
| |
| await this.executeAction({ |
| id: 'gate', |
| type: ActionType.SEND_DM, |
| payload: { |
| message: workflow.followerGateTemplate || "Please follow to continue!", |
| buttons: [ |
| { type: 'web_url', url: `https://instagram.com/${account.instagramUsername}`, title: workflow.followerGateButtonText || "Follow Me" }, |
| { type: 'postback', title: "I'm Following!", payload: `CHECK_FOLLOW_${workflow.id}` } |
| ] |
| } |
| }, account, event.userId); |
| return; |
| } |
| } |
|
|
| |
| const actions: any[] = []; |
|
|
| |
| if (event.commentId && workflow.responseTemplate) { |
| actions.push({ |
| id: 'comment-reply', |
| type: ActionType.REPLY_COMMENT, |
| payload: { message: workflow.responseTemplate, commentId: event.commentId } |
| }); |
| } |
|
|
| |
| if (workflow.dmTemplate) { |
| actions.push({ |
| id: 'main-dm', |
| type: ActionType.SEND_DM, |
| payload: { |
| message: workflow.dmTemplate, |
| targetUrl: workflow.targetUrl, |
| linkText: workflow.linkText, |
| aiEnabled: workflow.aiEnabled, |
| aiPrompt: workflow.aiPrompt |
| } |
| }); |
| } |
|
|
| |
| if (workflow.followUpTemplate) { |
| actions.push({ |
| id: 'follow-up-1', |
| type: ActionType.SEND_DM, |
| delay: workflow.followUpDelayMinutes || 60, |
| payload: { message: workflow.followUpTemplate, targetUrl: workflow.followUpUrl, linkText: workflow.followUpUrlText } |
| }); |
| } |
|
|
| |
| for (const action of actions) { |
| if (action.delay && action.delay > 0) { |
| await this.scheduleAction(action, workflow.id, account, event.userId); |
| } else { |
| await this.executeAction(action, account, event.userId, event.message); |
| } |
| } |
|
|
| |
| await this.trackAutomationMetrics(workflow.userId, workflow.id); |
| }, |
|
|
| async executeAction(action: any, account: any, recipientId: string, triggerText?: string) { |
| console.log(`[Action] Executing ${action.type} for ${recipientId}`); |
| |
| try { |
| switch (action.type) { |
| case ActionType.SEND_DM: |
| await this.executeDMAction(action, account, recipientId, triggerText); |
| break; |
| case ActionType.REPLY_COMMENT: |
| const interpolated = await this.interpolateVariables(action.payload.message, account.userId, account.externalId, recipientId); |
| await replyToInstagramComment(action.payload.commentId, interpolated, account.accessToken); |
| break; |
| |
| } |
| } catch (err: any) { |
| console.error(`[Action] Failed: ${err.message}`); |
| throw err; |
| } |
| }, |
|
|
| async executeDMAction(action: any, account: any, recipientId: string, triggerText?: string) { |
| let message = action.payload.message; |
|
|
| |
| if (action.payload.aiEnabled && triggerText) { |
| const aiResponse = await generateSmartReply({ |
| userMessage: triggerText, |
| prompt: action.payload.aiPrompt, |
| }); |
| if (aiResponse?.reply) message = aiResponse.reply; |
| } |
|
|
| const interpolated = await this.interpolateVariables(message, account.userId, account.externalId, recipientId); |
| |
| const buttons = []; |
| if (action.payload.targetUrl) { |
| buttons.push({ type: 'web_url' as const, url: action.payload.targetUrl, title: action.payload.linkText || "Learn More" }); |
| } |
| if (action.payload.buttons) { |
| buttons.push(...action.payload.buttons); |
| } |
|
|
| await sendInstagramMessage((account.pageId || account.externalId)!, recipientId, interpolated, account.accessToken, { buttons }); |
| }, |
|
|
| async scheduleAction(action: any, workflowId: string, account: any, recipientId: string) { |
| console.log(`[Scheduler] Queuing ${action.type} in ${action.delay}m`); |
| await this.scheduleFollowUp( |
| account.userId, |
| workflowId, |
| account.externalId, |
| recipientId, |
| action.payload.message, |
| action.delay, |
| action.payload.targetUrl, |
| action.payload.linkText |
| ); |
| }, |
|
|
| async checkRateLimits(externalId: string, recipientId: string, cooldownMinutes: number) { |
| let state = await db.query.rateLimitState.findFirst({ where: eq(rateLimitState.externalId, externalId) }); |
| if (!state) { |
| const id = crypto.randomUUID(); |
| await db.insert(rateLimitState).values({ id, externalId, lastSendToRecipient: "{}" }); |
| state = await db.query.rateLimitState.findFirst({ where: eq(rateLimitState.id, id) }); |
| } |
| if (!state) return { allowed: false, retryAfterMs: 0 }; |
|
|
| const lastSendMap = JSON.parse(state.lastSendToRecipient || "{}"); |
| const lastSend = lastSendMap[recipientId]; |
| const now = Date.now(); |
| const cooldownMs = cooldownMinutes * 60000; |
| |
| if (lastSend && (now - lastSend < cooldownMs)) { |
| return { allowed: false, retryAfterMs: cooldownMs - (now - lastSend) }; |
| } |
|
|
| lastSendMap[recipientId] = now; |
| await db.update(rateLimitState).set({ lastSendToRecipient: JSON.stringify(lastSendMap) }).where(eq(rateLimitState.id, state.id)); |
| return { allowed: true, retryAfterMs: 0 }; |
| }, |
|
|
| async interpolateVariables(text: string, userId: string, externalId: string, recipientId: string) { |
| const contact = await db.query.contacts.findFirst({ |
| where: and(eq(contacts.userId, userId), eq(contacts.externalId, externalId), eq(contacts.senderId, recipientId)), |
| }); |
| return text |
| .replace(/\{\{\s*first_name\s*\}\}/gi, contact?.name?.split(" ")[0] ?? "there") |
| .replace(/\{\{\s*name\s*\}\}/gi, contact?.name ?? "there") |
| .replace(/\{\{\s*username\s*\}\}/gi, contact?.username ?? ""); |
| }, |
|
|
| async upsertContact(userId: string, externalId: string, senderId: string, accessToken: string) { |
| const profile = await getInstagramUserProfile(senderId, accessToken).catch(() => null); |
| const existing = await db.query.contacts.findFirst({ |
| where: and(eq(contacts.userId, userId), eq(contacts.externalId, externalId), eq(contacts.senderId, senderId)), |
| }); |
|
|
| if (existing) { |
| await db.update(contacts).set({ |
| lastSeenAt: new Date(), |
| username: profile?.username || existing.username, |
| name: profile?.name || existing.name, |
| profilePic: profile?.profile_pic || existing.profilePic, |
| isFollower: profile?.is_user_follow_business || existing.isFollower, |
| }).where(eq(contacts.id, existing.id)); |
| } else { |
| await db.insert(contacts).values({ |
| id: crypto.randomUUID(), |
| userId, |
| externalId, |
| senderId, |
| username: profile?.username || null, |
| name: profile?.name || null, |
| profilePic: profile?.profile_pic || null, |
| isFollower: profile?.is_user_follow_business || false, |
| status: "automated", |
| }); |
| } |
| }, |
|
|
| async scheduleFollowUp( |
| userId: string, |
| automationId: string, |
| externalId: string, |
| recipientId: string, |
| template: string, |
| delayMinutes: number, |
| targetUrl?: string | null, |
| linkText?: string | null |
| ) { |
| await db.insert(scheduledMessages).values({ |
| id: crypto.randomUUID(), |
| userId, |
| automationId, |
| externalId, |
| recipientId, |
| messageText: template, |
| targetUrl: targetUrl || null, |
| linkText: linkText || null, |
| status: "pending", |
| dueAt: new Date(Date.now() + delayMinutes * 60000), |
| }); |
| }, |
|
|
| async trackAutomationMetrics(userId: string, automationId: string) { |
| const existing = await db.query.automationMetrics.findFirst({ |
| where: and(eq(automationMetrics.userId, userId), eq(automationMetrics.automationId, automationId)), |
| }); |
| if (existing) { |
| await db.update(automationMetrics).set({ sendCount: existing.sendCount + 1, lastSentAt: new Date() }).where(eq(automationMetrics.id, existing.id)); |
| } else { |
| await db.insert(automationMetrics).values({ id: crypto.randomUUID(), userId, automationId, sendCount: 1, lastSentAt: new Date() }); |
| } |
| }, |
|
|
| async createNotificationLog(params: any) { |
| await db.insert(notificationLogs).values({ id: crypto.randomUUID(), ...params }); |
| }, |
|
|
| async processScheduledMessage(msgId: string) { |
| const msg = await db.query.scheduledMessages.findFirst({ where: eq(scheduledMessages.id, msgId) }); |
| if (!msg || msg.status !== 'pending') return; |
|
|
| const account = await db.query.socialAccounts.findFirst({ |
| where: and(eq(socialAccounts.userId, msg.userId), eq(socialAccounts.externalId, msg.externalId)) |
| }); |
|
|
| if (!account?.accessToken) { |
| await db.update(scheduledMessages).set({ status: 'failed', lastError: 'Account disconnected' }).where(eq(scheduledMessages.id, msgId)); |
| return; |
| } |
|
|
| try { |
| const interpolated = await this.interpolateVariables(msg.messageText, msg.userId, msg.externalId, msg.recipientId); |
| |
| |
| const buttons: any[] = []; |
| if (msg.targetUrl) { |
| const btnText = msg.linkText || "Learn More"; |
| buttons.push({ type: 'web_url' as const, url: msg.targetUrl, title: btnText }); |
| } else if (msg.automationId) { |
| |
| const rule = await db.query.automations.findFirst({ where: eq(automations.id, msg.automationId) }); |
| if (rule) { |
| if (msg.messageText === rule.followUpTemplate && rule.followUpUrl) { |
| buttons.push({ type: 'web_url' as const, url: rule.followUpUrl, title: rule.followUpUrlText || "Learn More" }); |
| } else if (msg.messageText === rule.followUp2Template && rule.followUp2Url) { |
| buttons.push({ type: 'web_url' as const, url: rule.followUp2Url, title: rule.followUp2UrlText || "Get Details" }); |
| } |
| } |
| } |
|
|
| await sendInstagramMessage((account.pageId || account.externalId)!, msg.recipientId, interpolated, account.accessToken, { buttons }); |
|
|
| await db.insert(dbMessages).values({ |
| id: crypto.randomUUID(), |
| userId: msg.userId, |
| externalId: msg.externalId, |
| senderId: msg.recipientId, |
| automationId: msg.automationId, |
| direction: "outbound", |
| status: "sent", |
| text: interpolated, |
| timestamp: new Date(), |
| }); |
|
|
| await db.update(scheduledMessages).set({ status: 'sent', sentAt: new Date() }).where(eq(scheduledMessages.id, msgId)); |
| } catch (error: any) { |
| const attempts = (msg.attempts || 0) + 1; |
| await db.update(scheduledMessages).set({ |
| attempts, |
| lastError: error.message, |
| status: attempts >= 3 ? 'failed' : 'pending' |
| }).where(eq(scheduledMessages.id, msgId)); |
| } |
| } |
| }; |
|
|