// packages/server/src/services/aiService.ts
//
// AI-powered email parsing with auto-switcher failover.
// Uses Groq (primary) and Mistral (fallback) free tier APIs.
//
// Install: npm install zod

import { z } from 'zod';
import type { InteracTransactionInput } from '../../shared/types/transaction';

// ═══════════════════════════════════════════
// ZOD VALIDATION SCHEMA
// ═══════════════════════════════════════════

/**
 * Shape the model's JSON answer must satisfy before it is returned to callers.
 *
 * `currency` and `status` accept `null` as well as a missing key: the system
 * prompt tells the model to emit `null` for fields it cannot find, and a plain
 * `.default()` only applies to `undefined`.
 */
const TransactionSchema = z.object({
  sender: z.string().min(1),
  amount: z.number().positive(),
  currency: z
    .string()
    .nullish()
    .transform((value) => value ?? 'CAD'),
  reference: z.string().nullable().default(null),
  message: z.string().nullable().default(null),
  recipient_email: z.string().email().nullable().default(null),
  date: z.string(),
  status: z
    .enum(['deposited', 'pending', 'expired', 'cancelled'])
    .nullish()
    .transform((value) => value ?? 'deposited'),
});

// ═══════════════════════════════════════════
// AI PROMPT TEMPLATE
// ═══════════════════════════════════════════

const SYSTEM_PROMPT = `You are a financial data extraction assistant. Given the raw text/HTML of an Interac e-Transfer notification email from notify@payments.interac.ca, extract the following fields into a JSON object:

- sender: The name of the person who sent the money
- amount: The dollar amount (numeric, no $ sign)
- currency: Always "CAD"
- reference: The Interac reference number
- message: The personal message or memo (sometimes called "dime" in French)
- recipient_email: The email address the transfer was sent TO (the ICC branch email)
- date: The date/time of the transfer in ISO 8601 format
- status: One of "deposited", "pending", "expired", "cancelled"

Rules:
- If a field is not found, set it to null.
- The amount must be a number (e.g., 150.00 not "$150.00").
- Return ONLY valid JSON, no markdown, no explanation, no backticks.`;

/** Wraps the raw email body in the delimiters used for the user message. */
function buildUserPrompt(emailBody: string): string {
  return `Email content:\n"""\n${emailBody}\n"""`;
}

// ═══════════════════════════════════════════
// RATE LIMIT ERROR
// ═══════════════════════════════════════════

/** Thrown by a provider when the upstream API answers HTTP 429. */
class RateLimitError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'RateLimitError';
  }
}

// ═══════════════════════════════════════════
// PROVIDER ADAPTERS
// ═══════════════════════════════════════════

/** Minimal contract the pool needs from a chat-completion backend. */
interface AIProvider {
  name: string;
  model: string;
  /**
   * Sends one system + user message pair and resolves with the raw text of
   * the first completion choice (empty string when none is present).
   *
   * @throws RateLimitError on HTTP 429.
   * @throws Error on any other non-2xx response.
   */
  call(systemPrompt: string, userPrompt: string): Promise<string>;
}

/** The subset of the chat-completions response body that is read here. */
interface ChatCompletionResponse {
  choices?: Array<{ message?: { content?: string | null } }>;
}

/**
 * Shared implementation for the two chat-completions endpoints used here.
 * Both are called with the same request body; they differ only in URL,
 * credentials and the label used in error messages.
 */
class OpenAICompatibleProvider implements AIProvider {
  constructor(
    readonly name: string,
    private readonly label: string,
    private readonly baseUrl: string,
    readonly model: string,
    private readonly apiKey: string,
  ) {}

  async call(systemPrompt: string, userPrompt: string): Promise<string> {
    const response = await fetch(this.baseUrl, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${this.apiKey}`,
      },
      body: JSON.stringify({
        model: this.model,
        messages: [
          { role: 'system', content: systemPrompt },
          { role: 'user', content: userPrompt },
        ],
        temperature: 0.1,
        max_tokens: 500,
        response_format: { type: 'json_object' },
      }),
    });

    if (!response.ok) {
      const errorBody = await response.text();
      if (response.status === 429) {
        throw new RateLimitError(`${this.label} rate limited: ${errorBody}`);
      }
      throw new Error(`${this.label} API error ${response.status}: ${errorBody}`);
    }

    const data = (await response.json()) as ChatCompletionResponse;
    // `choices` may be absent on an unexpected payload; treat that as empty
    // content instead of throwing a TypeError on the index access.
    return data.choices?.[0]?.message?.content ?? '';
  }
}

/** Groq provider — OpenAI-compatible API */
class GroqProvider extends OpenAICompatibleProvider {
  constructor(model = 'llama-3.3-70b-versatile', apiKey?: string) {
    const key = apiKey || process.env.GROQ_API_KEY || '';
    super('groq', 'Groq', 'https://api.groq.com/openai/v1/chat/completions', model, key);
    if (!key) console.warn('[AI] GROQ_API_KEY not set');
  }
}

/** Mistral provider — OpenAI-compatible API */
class MistralProvider extends OpenAICompatibleProvider {
  constructor(model = 'mistral-small-latest', apiKey?: string) {
    const key = apiKey || process.env.MISTRAL_API_KEY || '';
    super('mistral', 'Mistral', 'https://api.mistral.ai/v1/chat/completions', model, key);
    if (!key) console.warn('[AI] MISTRAL_API_KEY not set');
  }
}

// ═══════════════════════════════════════════
// AUTO-SWITCHER POOL
// ═══════════════════════════════════════════

/** Per-provider bookkeeping kept by the pool. */
interface ProviderSlotRuntime {
  provider: AIProvider;
  /** Lower number = tried first. */
  priority: number;
  enabled: boolean;
  status: 'available' | 'rate_limited' | 'error';
  /** When set, the slot is skipped until this instant has passed. */
  cooldownUntil?: Date;
  requestsUsed: number;
  successCount: number;
  errorCount: number;
}

/** Callback fired when the pool moves from one provider to another. */
type SwitchListener = (from: string, to: string, reason: string) => void;

/**
 * Pool of AI providers tried in priority order. A provider that is
 * rate-limited or errors is put on cooldown and the next one is used.
 */
export class AIProviderPool {
  private slots: ProviderSlotRuntime[] = [];
  private onSwitch?: SwitchListener;

  /**
   * @param onSwitch Optional listener invoked when a rate limit forces a
   *   switch to another provider.
   * @throws Error when neither GROQ_API_KEY nor MISTRAL_API_KEY is set.
   */
  constructor(onSwitch?: (from: string, to: string, reason: string) => void) {
    this.onSwitch = onSwitch;
    this.initializeSlots();
  }

  /** Builds the slot list from the API keys present in the environment. */
  private initializeSlots(): void {
    // Priority order: Groq first (faster), Mistral fallback
    const groqKey = process.env.GROQ_API_KEY;
    const mistralKey = process.env.MISTRAL_API_KEY;

    const addSlot = (provider: AIProvider, priority: number): void => {
      this.slots.push({
        provider,
        priority,
        enabled: true,
        status: 'available',
        requestsUsed: 0,
        successCount: 0,
        errorCount: 0,
      });
    };

    if (groqKey) {
      addSlot(new GroqProvider('llama-3.3-70b-versatile', groqKey), 1);
      // Add secondary Groq model
      addSlot(new GroqProvider('llama-3.1-8b-instant', groqKey), 3);
    }

    if (mistralKey) {
      addSlot(new MistralProvider('mistral-small-latest', mistralKey), 2);
    }

    if (this.slots.length === 0) {
      throw new Error(
        'No AI providers configured. Set GROQ_API_KEY and/or MISTRAL_API_KEY in your .env file.'
      );
    }

    // Sort by priority
    this.slots.sort((a, b) => a.priority - b.priority);

    console.log(`[AI Pool] Initialized with ${this.slots.length} provider slots:`);
    this.slots.forEach((s, i) => {
      console.log(`  [${i}] ${s.provider.name}/${s.provider.model} (priority ${s.priority})`);
    });
  }

  /**
   * Get the next available provider slot, skipping ones on cooldown.
   * Slots whose cooldown has elapsed are returned to `available` first.
   */
  private getAvailableSlot(): ProviderSlotRuntime | null {
    const now = new Date();

    // Recover any slot whose cooldown has passed. This covers 'error' slots
    // as well as 'rate_limited' ones; otherwise an errored slot would keep its
    // 30s cooldown timestamp but never be selectable again.
    for (const slot of this.slots) {
      if (slot.status !== 'available' && slot.cooldownUntil && slot.cooldownUntil <= now) {
        const cause = slot.status === 'rate_limited' ? 'rate limit' : 'error';
        slot.status = 'available';
        slot.cooldownUntil = undefined;
        console.log(`[AI Pool] ${slot.provider.name}/${slot.provider.model} recovered from ${cause}`);
      }
    }

    // Slots are kept sorted, so the first match is the highest priority.
    return this.slots.find((slot) => slot.enabled && slot.status === 'available') ?? null;
  }

  /**
   * Parse a single email body using the AI pool with auto-failover.
   *
   * @param emailBody Raw text/HTML of the notification email.
   * @returns The validated transaction extracted by the model.
   * @throws Error when every provider is on cooldown after a 10s wait, or the
   *   last provider error once all attempts are used up.
   */
  async parseEmail(emailBody: string): Promise<InteracTransactionInput> {
    const userPrompt = buildUserPrompt(emailBody);
    let lastError: Error | null = null;

    // Try each available slot
    for (let attempt = 0; attempt < this.slots.length * 2; attempt++) {
      const slot = this.getAvailableSlot();

      if (!slot) {
        // All slots exhausted — wait 10s and retry once
        console.warn('[AI Pool] All providers rate-limited. Waiting 10s...');
        await new Promise((r) => setTimeout(r, 10000));
        const retrySlot = this.getAvailableSlot();
        if (!retrySlot) {
          throw new Error('All AI providers are rate-limited or unavailable. Try again later.');
        }
        return this._callSlot(retrySlot, userPrompt);
      }

      try {
        return await this._callSlot(slot, userPrompt);
      } catch (error: unknown) {
        const failure = error instanceof Error ? error : new Error(String(error));
        lastError = failure;

        if (failure instanceof RateLimitError) {
          // Mark slot as rate-limited with 60s cooldown
          const previousName = `${slot.provider.name}/${slot.provider.model}`;
          slot.status = 'rate_limited';
          slot.cooldownUntil = new Date(Date.now() + 60_000);
          slot.errorCount++;

          // Find next slot for the switch event
          const nextSlot = this.getAvailableSlot();
          if (nextSlot && this.onSwitch) {
            this.onSwitch(
              previousName,
              `${nextSlot.provider.name}/${nextSlot.provider.model}`,
              'Rate limit hit'
            );
          }

          console.warn(`[AI Pool] ${previousName} rate-limited, switching...`);
          continue; // Try next slot
        }

        // Non-rate-limit error — mark slot but still try fallback
        slot.errorCount++;
        slot.status = 'error';
        slot.cooldownUntil = new Date(Date.now() + 30_000);
        console.error(`[AI Pool] ${slot.provider.name}/${slot.provider.model} error:`, failure.message);
        continue;
      }
    }

    throw lastError || new Error('All AI providers failed');
  }

  /**
   * Internal: call a specific slot and parse the response.
   *
   * `requestsUsed` is bumped once the provider has answered; `successCount`
   * only after the answer parsed as JSON and passed schema validation, so a
   * malformed reply is not counted as both a success and an error.
   *
   * @throws SyntaxError when the reply is not valid JSON.
   * @throws ZodError when the JSON does not match `TransactionSchema`.
   */
  private async _callSlot(
    slot: ProviderSlotRuntime,
    userPrompt: string
  ): Promise<InteracTransactionInput> {
    const rawResponse = await slot.provider.call(SYSTEM_PROMPT, userPrompt);
    slot.requestsUsed++;

    // Clean potential markdown fences
    const cleaned = rawResponse
      .replace(/```json\s*/g, '')
      .replace(/```\s*/g, '')
      .trim();

    // Parse and validate
    const parsed: unknown = JSON.parse(cleaned);
    const validated = TransactionSchema.parse(parsed);

    slot.successCount++;
    return validated as InteracTransactionInput;
  }

  /** Get current pool status for the UI */
  getStatus() {
    return this.slots.map((s) => ({
      name: s.provider.name,
      model: s.provider.model,
      priority: s.priority,
      status: s.status,
      enabled: s.enabled,
      requestsUsed: s.requestsUsed,
      successCount: s.successCount,
      errorCount: s.errorCount,
      cooldownUntil: s.cooldownUntil?.toISOString(),
    }));
  }

  /** Returns `name/model` of the slot that would be used next, or `'none'`. */
  getCurrentProvider(): string {
    const slot = this.getAvailableSlot();
    return slot ? `${slot.provider.name}/${slot.provider.model}` : 'none';
  }
}

export default AIProviderPool;