// autoloop/apps/server/src/automation/engine.ts
// shubhjn's picture
// Deploy AutoLoop Server to Hugging Face
// a6b6c66
import {
db,
eq,
and,
desc,
asc,
lte,
automations,
socialAccounts,
messages as dbMessages,
contacts,
scheduledMessages,
eventQueue,
rateLimitState,
analyticsEvents,
aiConversations,
automationMetrics,
notificationLogs
} from "@autoloop/db";
import {
matchesAutomationCondition,
sendInstagramMessage,
replyToInstagramComment,
getInstagramUserProfile,
likeMediaOrComment
} from "@autoloop/shared";
import { analyzeSentiment, generateSmartReply, trackAnalytics } from "../ai/pipeline";
import { incomingQueue } from "../queue";
import { InternalEvent, TriggerType, Workflow, ActionType } from "@autoloop/types";
/**
 * Outbound DM ceilings per Instagram account: per minute, per hour and per day.
 *
 * Declared `as const` so the values are read-only literals and cannot be
 * reassigned by accident at a call site.
 *
 * NOTE(review): none of the engine methods visible in this file read these
 * values yet; `checkRateLimits` only enforces a per-recipient cooldown.
 */
const RATE_LIMITS = {
  dmPerMinute: 15,
  dmPerHour: 150,
  dmPerDay: 500,
} as const;
export const automationEngine = {
/**
 * Entry point for a normalized inbound event.
 *
 * Logs the trigger and hands the event to `queueEvent`, keyed by the event's
 * account id and user id. Nothing is processed inline here.
 *
 * @param event - Normalized event; `triggerType`, `accountId` and `userId` are read.
 * @returns Always `{ success: true, queued: 1 }` once `queueEvent` resolves,
 *   including when `queueEvent` returned early for a duplicate key.
 */
async processEvent(event: InternalEvent) {
  const { triggerType, userId, accountId } = event;
  console.log(`[Engine] Processing Event: ${triggerType} from ${userId}`);

  // Persist first so every inbound event leaves an auditable queue row.
  const queueRequest = {
    eventType: triggerType,
    payload: event,
    externalId: accountId,
    recipientId: userId,
  };
  await this.queueEvent(queueRequest);

  return { success: true, queued: 1 };
},
/**
 * Persist an event in `eventQueue` and enqueue a BullMQ job to process it.
 *
 * Deduplication: when no `idempotencyKey` is supplied, one is derived as
 * `externalId-recipientId-eventType-<epoch minute>`, so identical triples seen
 * within the same minute collapse into a single queued event. If a row with
 * the key already exists and is not `failed`, the call is a no-op.
 *
 * Scheduling: the row's `scheduledFor` defaults to "now". The BullMQ job is
 * delayed by the time remaining until `scheduledFor`, so a future-dated event
 * is not processed early; for the default case the delay is 0 and the job runs
 * immediately.
 *
 * NOTE(review): re-queuing after a `failed` row reuses the same idempotency
 * key for both the new row and the job id — confirm the table has no unique
 * constraint on the key and that the queue accepts a repeated job id.
 *
 * @param params.eventType - Trigger type stored on the row.
 * @param params.payload - Event body; stored as JSON text.
 * @param params.externalId - Account identifier the event belongs to.
 * @param params.recipientId - User the event originated from.
 * @param params.scheduledFor - Earliest processing time (defaults to now).
 * @param params.idempotencyKey - Explicit dedup key overriding the derived one.
 */
async queueEvent(params: {
  eventType: string;
  payload: any;
  externalId: string;
  recipientId: string;
  scheduledFor?: Date;
  idempotencyKey?: string;
}) {
  const idempotencyKey =
    params.idempotencyKey ||
    `${params.externalId}-${params.recipientId}-${params.eventType}-${Math.floor(Date.now() / 60000)}`;

  // Skip anything already queued under this key unless the earlier attempt failed.
  const existing = await db.query.eventQueue.findFirst({
    where: eq(eventQueue.idempotencyKey, idempotencyKey),
  });
  if (existing && existing.status !== "failed") return;

  const id = crypto.randomUUID();
  const scheduledFor = params.scheduledFor ?? new Date();

  await db.insert(eventQueue).values({
    id,
    eventType: params.eventType,
    payload: JSON.stringify(params.payload),
    externalId: params.externalId,
    recipientId: params.recipientId,
    scheduledFor,
    idempotencyKey,
    status: "pending",
  });

  // Honour `scheduledFor`: delay the job by the remaining time, never negative.
  // An unparsable date degrades to "run now" rather than passing NaN to the queue.
  const msUntilDue = new Date(scheduledFor).getTime() - Date.now();
  const delay = Number.isFinite(msUntilDue) ? Math.max(0, msUntilDue) : 0;

  await incomingQueue.add('process-queued-event', { eventId: id }, {
    jobId: idempotencyKey, // Same key as the row, so a duplicate job is not created
    delay,
    removeOnComplete: true,
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 }
  });
},
/**
 * Worker entry point: run one row of `eventQueue` through the automations.
 *
 * Steps:
 *  1. Load the row; stop if it is missing or already `completed`.
 *  2. Mark it `processing` and bump its attempt counter.
 *  3. Load active automations for the row's account and trigger type, highest
 *     priority first, and execute only the first one whose condition matches
 *     the event message.
 *  4. Mark the row `completed`.
 *
 * Any error is caught and recorded on the row (it is not rethrown): the row
 * becomes `failed` once the attempt count reaches `maxAttempts` (default 3),
 * otherwise `pending` with `scheduledFor` pushed out by 2^attempts minutes.
 *
 * @param eventId - Primary key of the `eventQueue` row.
 */
async processQueuedEvent(eventId: string) {
  const whereThisEvent = () => eq(eventQueue.id, eventId);

  const queued = await db.query.eventQueue.findFirst({ where: whereThisEvent() });
  if (!queued) return;
  if (queued.status === "completed") return;

  const attemptNumber = (queued.attempts || 0) + 1;
  await db
    .update(eventQueue)
    .set({ status: "processing", attempts: attemptNumber })
    .where(whereThisEvent());

  try {
    const eventData: InternalEvent = JSON.parse(queued.payload);

    const candidates = await db.query.automations.findMany({
      where: and(
        eq(automations.externalId, queued.externalId || ""),
        eq(automations.triggerType, eventData.triggerType),
        eq(automations.isActive, true)
      ),
      orderBy: [desc(automations.priority)],
    });
    console.log(`[Engine] Matching ${candidates.length} workflows for event ${queued.id}`);

    // Candidates are priority-ordered; only the first match is executed.
    const triggered = candidates.find((candidate) =>
      matchesAutomationCondition(
        candidate.conditionOperator || 'contains',
        candidate.condition || '',
        eventData.message || ""
      )
    );
    if (triggered) {
      console.log(`[Engine] Workflow triggered: ${triggered.name}`);
      await this.handleWorkflowExecution(triggered, eventData);
    }

    await db
      .update(eventQueue)
      .set({ status: "completed", processedAt: new Date() })
      .where(whereThisEvent());
  } catch (error: any) {
    console.error(`[Engine] Event processing failed: ${error.message}`);

    const exhausted = attemptNumber >= (queued.maxAttempts || 3);
    const backoffMs = 2 ** attemptNumber * 60000;
    await db
      .update(eventQueue)
      .set({
        status: exhausted ? "failed" : "pending",
        lastError: error.message,
        scheduledFor: new Date(Date.now() + backoffMs)
      })
      .where(whereThisEvent());
  }
},
/**
 * Run a matched workflow against the event that triggered it.
 *
 * Flow:
 *  - Resolve the social account by the event's account id; without an access
 *    token nothing happens.
 *  - Follower gate: when the workflow requires a follower and the fetched
 *    profile says the user does not follow the business, send only the gate DM
 *    (profile link + "I'm Following!" postback) and stop. A failed profile
 *    lookup yields `null` and lets the workflow continue.
 *  - Translate the workflow's legacy template fields into an ordered action
 *    list: public comment reply, main DM, then delayed follow-up DM.
 *  - Actions with a positive `delay` are scheduled; the rest run immediately.
 *  - Record a metrics hit for the workflow.
 *
 * @param workflow - Automation row (legacy template fields are read from it).
 * @param event - The normalized event that matched the workflow.
 */
async handleWorkflowExecution(workflow: any, event: InternalEvent) {
  const recipientId = event.userId;

  const account = await db.query.socialAccounts.findFirst({
    where: eq(socialAccounts.externalId, event.accountId)
  });
  if (!account?.accessToken) return;

  // --- Follower gate ---------------------------------------------------
  if (workflow.requireFollower) {
    const profile = await getInstagramUserProfile(recipientId, account.accessToken).catch(() => null);
    if (profile && !profile.is_user_follow_business) {
      const profileLink = {
        type: 'web_url',
        url: `https://instagram.com/${account.instagramUsername}`,
        title: workflow.followerGateButtonText || "Follow Me"
      };
      const recheckButton = {
        type: 'postback',
        title: "I'm Following!",
        payload: `CHECK_FOLLOW_${workflow.id}`
      };
      const gateAction = {
        id: 'gate',
        type: ActionType.SEND_DM,
        payload: {
          message: workflow.followerGateTemplate || "Please follow to continue!",
          buttons: [profileLink, recheckButton]
        }
      };
      await this.executeAction(gateAction, account, recipientId);
      return;
    }
  }

  // --- Legacy fields -> ordered action list ------------------------------
  const commentReply = event.commentId && workflow.responseTemplate
    ? [{
        id: 'comment-reply',
        type: ActionType.REPLY_COMMENT,
        payload: { message: workflow.responseTemplate, commentId: event.commentId }
      }]
    : [];

  const mainDm = workflow.dmTemplate
    ? [{
        id: 'main-dm',
        type: ActionType.SEND_DM,
        payload: {
          message: workflow.dmTemplate,
          targetUrl: workflow.targetUrl,
          linkText: workflow.linkText,
          aiEnabled: workflow.aiEnabled,
          aiPrompt: workflow.aiPrompt
        }
      }]
    : [];

  const followUp = workflow.followUpTemplate
    ? [{
        id: 'follow-up-1',
        type: ActionType.SEND_DM,
        delay: workflow.followUpDelayMinutes || 60,
        payload: {
          message: workflow.followUpTemplate,
          targetUrl: workflow.followUpUrl,
          linkText: workflow.followUpUrlText
        }
      }]
    : [];

  const actions: any[] = [...commentReply, ...mainDm, ...followUp];

  // --- Dispatch ----------------------------------------------------------
  for (const action of actions) {
    const isDeferred = action.delay && action.delay > 0;
    if (!isDeferred) {
      await this.executeAction(action, account, recipientId, event.message);
      continue;
    }
    await this.scheduleAction(action, workflow.id, account, recipientId);
  }

  await this.trackAutomationMetrics(workflow.userId, workflow.id);
},
/**
 * Dispatch a single action to its handler.
 *
 * - `SEND_DM` is delegated to `executeDMAction`.
 * - `REPLY_COMMENT` interpolates the template and posts it as a reply to the
 *   comment id carried in the payload.
 * - Any other action type is ignored.
 *
 * Failures are logged and then rethrown to the caller.
 *
 * @param action - Action descriptor (`type` and `payload` are read).
 * @param account - Social account used for credentials and identifiers.
 * @param recipientId - User the action targets.
 * @param triggerText - Text of the triggering message, forwarded to DM actions.
 */
async executeAction(action: any, account: any, recipientId: string, triggerText?: string) {
  console.log(`[Action] Executing ${action.type} for ${recipientId}`);

  try {
    if (action.type === ActionType.SEND_DM) {
      await this.executeDMAction(action, account, recipientId, triggerText);
    } else if (action.type === ActionType.REPLY_COMMENT) {
      const replyText = await this.interpolateVariables(
        action.payload.message,
        account.userId,
        account.externalId,
        recipientId
      );
      await replyToInstagramComment(action.payload.commentId, replyText, account.accessToken);
    }
    // Further action types can be added as extra branches here.
  } catch (err: any) {
    console.error(`[Action] Failed: ${err.message}`);
    throw err;
  }
},
/**
 * Send one direct message for a `SEND_DM` action.
 *
 * The message starts as the payload template. When the payload enables AI and
 * a trigger text is present, the AI reply (if one is returned) replaces it.
 * The text is then run through `interpolateVariables`. Buttons are the
 * optional link button (title defaults to "Learn More") followed by any
 * buttons carried in the payload.
 *
 * @param action - Action descriptor; only its `payload` is read.
 * @param account - Social account; the sender id is `pageId`, falling back to `externalId`.
 * @param recipientId - User receiving the DM.
 * @param triggerText - Text that triggered the workflow, used as the AI input.
 */
async executeDMAction(action: any, account: any, recipientId: string, triggerText?: string) {
  const { payload } = action;
  let message = payload.message;

  // Optional AI rewrite; the template stays in place when no reply comes back.
  if (payload.aiEnabled && triggerText) {
    const aiResponse = await generateSmartReply({
      userMessage: triggerText,
      prompt: payload.aiPrompt,
    });
    if (aiResponse?.reply) {
      message = aiResponse.reply;
    }
  }

  const interpolated = await this.interpolateVariables(message, account.userId, account.externalId, recipientId);

  const linkButtons = payload.targetUrl
    ? [{ type: 'web_url' as const, url: payload.targetUrl, title: payload.linkText || "Learn More" }]
    : [];
  const extraButtons = payload.buttons ? payload.buttons : [];
  const buttons = [...linkButtons, ...extraButtons];

  const senderId = (account.pageId || account.externalId)!;
  await sendInstagramMessage(senderId, recipientId, interpolated, account.accessToken, { buttons });
},
/**
 * Defer an action by storing it as a scheduled follow-up message.
 *
 * Thin adapter over `scheduleFollowUp`: the action's message, delay (minutes)
 * and optional link fields are forwarded together with the account's user id
 * and external id.
 *
 * @param action - Action descriptor (`type`, `delay` and `payload` are read).
 * @param workflowId - Automation the follow-up belongs to.
 * @param account - Social account that owns the conversation.
 * @param recipientId - User who will receive the follow-up.
 */
async scheduleAction(action: any, workflowId: string, account: any, recipientId: string) {
  const { type, delay, payload } = action;
  console.log(`[Scheduler] Queuing ${type} in ${delay}m`);

  const { message, targetUrl, linkText } = payload;
  await this.scheduleFollowUp(
    account.userId,
    workflowId,
    account.externalId,
    recipientId,
    message,
    delay,
    targetUrl,
    linkText
  );
},
/**
 * Per-recipient cooldown check for one account.
 *
 * The account's `rateLimitState` row (created on first use) stores a JSON map
 * of recipient id -> last send timestamp in milliseconds. If the recipient was
 * sent to less than `cooldownMinutes` ago the call is refused with the time
 * remaining. Otherwise the recipient's timestamp is set to "now" and the call
 * is allowed — so an allowed check also records the send.
 *
 * @param externalId - Account identifier the state row is keyed by.
 * @param recipientId - Recipient being checked.
 * @param cooldownMinutes - Minimum gap between sends to the same recipient.
 * @returns `{ allowed, retryAfterMs }`; `retryAfterMs` is 0 when allowed, and
 *   also 0 when the state row could not be loaded (refused).
 */
async checkRateLimits(externalId: string, recipientId: string, cooldownMinutes: number) {
  let state = await db.query.rateLimitState.findFirst({ where: eq(rateLimitState.externalId, externalId) });

  if (!state) {
    // First contact for this account: create an empty state row and read it back.
    const newId = crypto.randomUUID();
    await db.insert(rateLimitState).values({ id: newId, externalId, lastSendToRecipient: "{}" });
    state = await db.query.rateLimitState.findFirst({ where: eq(rateLimitState.id, newId) });
    if (!state) {
      return { allowed: false, retryAfterMs: 0 };
    }
  }

  const sentAtByRecipient = JSON.parse(state.lastSendToRecipient || "{}");
  const previousSend = sentAtByRecipient[recipientId];
  const now = Date.now();
  const cooldownMs = cooldownMinutes * 60000;

  if (previousSend) {
    const elapsedMs = now - previousSend;
    if (elapsedMs < cooldownMs) {
      return { allowed: false, retryAfterMs: cooldownMs - elapsedMs };
    }
  }

  sentAtByRecipient[recipientId] = now;
  await db
    .update(rateLimitState)
    .set({ lastSendToRecipient: JSON.stringify(sentAtByRecipient) })
    .where(eq(rateLimitState.id, state.id));

  return { allowed: true, retryAfterMs: 0 };
},
/**
 * Fill `{{first_name}}`, `{{name}}` and `{{username}}` placeholders in a
 * template using the stored contact for this user/account/recipient.
 *
 * Placeholders are case-insensitive and may contain inner whitespace
 * (`{{ Name }}`). Fallbacks: `first_name` and `name` become "there" when the
 * contact is missing or its name is blank; `username` becomes an empty string.
 *
 * Substitution is done in a single pass with a replacer function, which
 * matters because the values come from user-controlled profile data:
 *  - `$&`, `$1`, `$$` etc. in a name are inserted literally instead of being
 *    interpreted as replacement patterns;
 *  - a value that itself contains `{{name}}` is not expanded a second time.
 *
 * @param text - Template text.
 * @param userId - Owner of the contact record.
 * @param externalId - Account the contact belongs to.
 * @param recipientId - Contact's sender id.
 * @returns The template with all known placeholders substituted.
 */
async interpolateVariables(text: string, userId: string, externalId: string, recipientId: string) {
  const contact = await db.query.contacts.findFirst({
    where: and(eq(contacts.userId, userId), eq(contacts.externalId, externalId), eq(contacts.senderId, recipientId)),
  });

  // Trim first so a blank or space-padded name cannot produce an empty greeting.
  const displayName = contact?.name?.trim() || "";
  const values: Record<string, string> = {
    first_name: displayName.split(/\s+/)[0] || "there",
    name: displayName || "there",
    username: contact?.username ?? "",
  };

  return text.replace(
    /\{\{\s*(first_name|name|username)\s*\}\}/gi,
    (placeholder: string, key: string) => values[key.toLowerCase()] ?? placeholder
  );
},
/**
 * Create or refresh the contact record for a sender.
 *
 * The Instagram profile is fetched best-effort (a failed lookup is treated as
 * "no profile"). For an existing contact, `lastSeenAt` is bumped and profile
 * fields overwrite stored ones only when the profile supplies a non-empty
 * value. For a new contact a row is inserted with status "automated".
 *
 * `isFollower` uses `??` rather than `||`: an explicit `false` from the
 * profile (the user unfollowed) must overwrite a stored `true`, and only a
 * missing value falls back to what is stored.
 *
 * @param userId - Owner of the contact record.
 * @param externalId - Account the contact belongs to.
 * @param senderId - Instagram-scoped id of the contact.
 * @param accessToken - Token used for the profile lookup.
 */
async upsertContact(userId: string, externalId: string, senderId: string, accessToken: string) {
  const profile = await getInstagramUserProfile(senderId, accessToken).catch(() => null);

  const existing = await db.query.contacts.findFirst({
    where: and(eq(contacts.userId, userId), eq(contacts.externalId, externalId), eq(contacts.senderId, senderId)),
  });

  if (existing) {
    await db.update(contacts).set({
      lastSeenAt: new Date(),
      username: profile?.username || existing.username,
      name: profile?.name || existing.name,
      profilePic: profile?.profile_pic || existing.profilePic,
      // `??` keeps an explicit `false` so an unfollow is actually recorded.
      isFollower: profile?.is_user_follow_business ?? existing.isFollower,
    }).where(eq(contacts.id, existing.id));
    return;
  }

  await db.insert(contacts).values({
    id: crypto.randomUUID(),
    userId,
    externalId,
    senderId,
    username: profile?.username || null,
    name: profile?.name || null,
    profilePic: profile?.profile_pic || null,
    isFollower: profile?.is_user_follow_business ?? false,
    status: "automated",
  });
},
/**
 * Store a follow-up DM to be sent later.
 *
 * Inserts a `pending` row into `scheduledMessages` that becomes due
 * `delayMinutes` from now. Missing or empty link fields are stored as `null`.
 *
 * @param userId - Owner of the automation.
 * @param automationId - Automation that produced the follow-up.
 * @param externalId - Account the message is sent from.
 * @param recipientId - User who will receive the message.
 * @param template - Message text (stored as-is, not yet interpolated).
 * @param delayMinutes - Minutes from now until the message is due.
 * @param targetUrl - Optional link button URL.
 * @param linkText - Optional link button title.
 */
async scheduleFollowUp(
  userId: string,
  automationId: string,
  externalId: string,
  recipientId: string,
  template: string,
  delayMinutes: number,
  targetUrl?: string | null,
  linkText?: string | null
) {
  const id = crypto.randomUUID();
  const storedUrl = targetUrl || null;
  const storedLinkText = linkText || null;
  const dueAt = new Date(Date.now() + delayMinutes * 60000);

  await db.insert(scheduledMessages).values({
    id,
    userId,
    automationId,
    externalId,
    recipientId,
    messageText: template,
    targetUrl: storedUrl,
    linkText: storedLinkText,
    status: "pending",
    dueAt,
  });
},
/**
 * Count one send for an automation.
 *
 * Creates the metrics row with `sendCount` 1 on first use; otherwise writes
 * back the previously read count plus one. `lastSentAt` is set to now in both
 * cases.
 *
 * @param userId - Owner of the automation.
 * @param automationId - Automation being counted.
 */
async trackAutomationMetrics(userId: string, automationId: string) {
  const metrics = await db.query.automationMetrics.findFirst({
    where: and(eq(automationMetrics.userId, userId), eq(automationMetrics.automationId, automationId)),
  });

  if (!metrics) {
    await db.insert(automationMetrics).values({
      id: crypto.randomUUID(),
      userId,
      automationId,
      sendCount: 1,
      lastSentAt: new Date(),
    });
    return;
  }

  await db
    .update(automationMetrics)
    .set({ sendCount: metrics.sendCount + 1, lastSentAt: new Date() })
    .where(eq(automationMetrics.id, metrics.id));
},
/**
 * Insert a row into `notificationLogs`.
 *
 * A fresh UUID is used as the id; because `params` is spread after it, an
 * `id` supplied in `params` takes precedence.
 *
 * @param params - Column values for the log row.
 */
async createNotificationLog(params: any) {
  const record = { id: crypto.randomUUID(), ...params };
  await db.insert(notificationLogs).values(record);
},
/**
 * Deliver one scheduled follow-up message.
 *
 * Only rows in `pending` status are handled. If the owning account is missing
 * or has no access token, the row is marked `failed` ("Account disconnected").
 *
 * Delivery and bookkeeping are deliberately separated:
 *  - A failure while building or sending the DM counts as an attempt; the row
 *    goes back to `pending`, or to `failed` on the third attempt.
 *  - Once the DM has been sent the row is marked `sent` first, then the
 *    outbound message is logged. A failure while logging is reported but never
 *    resets the row to `pending`, since that would send the same DM again.
 *
 * Link button: taken from the row's own `targetUrl`/`linkText` when present;
 * otherwise (legacy rows) looked up on the automation by matching the message
 * text against its first or second follow-up template.
 *
 * @param msgId - Primary key of the `scheduledMessages` row.
 */
async processScheduledMessage(msgId: string) {
  const msg = await db.query.scheduledMessages.findFirst({ where: eq(scheduledMessages.id, msgId) });
  if (!msg || msg.status !== 'pending') return;

  const account = await db.query.socialAccounts.findFirst({
    where: and(eq(socialAccounts.userId, msg.userId), eq(socialAccounts.externalId, msg.externalId))
  });
  if (!account?.accessToken) {
    await db.update(scheduledMessages).set({ status: 'failed', lastError: 'Account disconnected' }).where(eq(scheduledMessages.id, msgId));
    return;
  }

  // ---- Phase 1: build and deliver (failures here are retryable) ----------
  let interpolated: string;
  try {
    interpolated = await this.interpolateVariables(msg.messageText, msg.userId, msg.externalId, msg.recipientId);

    const buttons: any[] = [];
    if (msg.targetUrl) {
      const btnText = msg.linkText || "Learn More";
      buttons.push({ type: 'web_url' as const, url: msg.targetUrl, title: btnText });
    } else if (msg.automationId) {
      // Legacy rows carry no link data; recover it from the automation.
      const rule = await db.query.automations.findFirst({ where: eq(automations.id, msg.automationId) });
      if (rule) {
        if (msg.messageText === rule.followUpTemplate && rule.followUpUrl) {
          buttons.push({ type: 'web_url' as const, url: rule.followUpUrl, title: rule.followUpUrlText || "Learn More" });
        } else if (msg.messageText === rule.followUp2Template && rule.followUp2Url) {
          buttons.push({ type: 'web_url' as const, url: rule.followUp2Url, title: rule.followUp2UrlText || "Get Details" });
        }
      }
    }

    await sendInstagramMessage((account.pageId || account.externalId)!, msg.recipientId, interpolated, account.accessToken, { buttons });
  } catch (error: unknown) {
    const attempts = (msg.attempts || 0) + 1;
    await db.update(scheduledMessages).set({
      attempts,
      lastError: error instanceof Error ? error.message : String(error),
      status: attempts >= 3 ? 'failed' : 'pending'
    }).where(eq(scheduledMessages.id, msgId));
    return;
  }

  // ---- Phase 2: bookkeeping (the DM is already delivered) ----------------
  // Mark as sent before anything else so the row cannot be picked up again.
  await db.update(scheduledMessages).set({ status: 'sent', sentAt: new Date() }).where(eq(scheduledMessages.id, msgId));

  try {
    await db.insert(dbMessages).values({
      id: crypto.randomUUID(),
      userId: msg.userId,
      externalId: msg.externalId,
      senderId: msg.recipientId,
      automationId: msg.automationId,
      direction: "outbound",
      status: "sent",
      text: interpolated,
      timestamp: new Date(),
    });
  } catch (error: unknown) {
    // Best-effort log: a missing history row is preferable to a duplicate DM.
    const reason = error instanceof Error ? error.message : String(error);
    console.error(`[Scheduler] Message ${msgId} was sent but could not be logged: ${reason}`);
  }
}
};