// packages/server/src/services/scanService.ts
//
// Main scan orchestration service
// Connects Gmail → AI Parsing → Branch Routing → Database
//
// Install: npm install p-limit uuid

import pLimit from 'p-limit';
import { v4 as uuidv4 } from 'uuid';
import type { ScanDateRange, ScanRequest, ScanJob, ScanSummary } from '../../shared/types/scan';
import type { InteracTransaction } from '../../shared/types/transaction';
import { resolveBranch } from '../../shared/constants/branches';
import { gmailService, buildGmailQuery, fetchAllMessageIds, fetchEmailBody } from './gmailService';
import { AIProviderPool } from './aiService';
import database from '../db/database';

// ═══════════════════════════════════════════
// CONCURRENCY LIMITS
// ═══════════════════════════════════════════

const GMAIL_CONCURRENCY = 10; // Fetch 10 emails simultaneously
const AI_CONCURRENCY = 5; // 5 concurrent AI parse requests
const DB_BATCH_SIZE = 25; // Insert 25 rows per batch

// ═══════════════════════════════════════════
// TUNABLES
// ═══════════════════════════════════════════

const MIN_BODY_LENGTH = 20; // Trimmed bodies shorter than this are treated as empty
const RAW_EMAIL_MAX_CHARS = 5000; // Characters of the raw body kept for audit
const MAX_LOGGED_ERRORS = 50; // Cap on error details persisted in the scan log
const JOB_RETENTION_MS = 5 * 60 * 1000; // Keep finished jobs queryable for 5 minutes
const DEFAULT_BRANCH = 'Montreal'; // Used when the parser returns no recipient email

// ═══════════════════════════════════════════
// ACTIVE JOBS TRACKING
// ═══════════════════════════════════════════

const activeJobs = new Map<string, ScanJob>();

/**
 * Looks up a scan job by id.
 *
 * @param jobId - Identifier generated by {@link executeScan}.
 * @returns The tracked job, or `undefined` when it is unknown or has already
 *   been evicted from the in-memory map.
 */
export function getJob(jobId: string): ScanJob | undefined {
  return activeJobs.get(jobId);
}

// ═══════════════════════════════════════════
// WEBSOCKET EVENT EMITTER (set externally)
// ═══════════════════════════════════════════

// The payload stays `any` so that emitters registered with a narrower payload
// type remain assignable.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type EmitFn = (event: string, data: any) => void;

let emitToClient: EmitFn = () => {}; // no-op until WebSocket is connected

/**
 * Registers the function used to push scan events to the client.
 *
 * @param fn - Called with the event name and its payload.
 */
export function setEmitter(fn: EmitFn) {
  emitToClient = fn;
}

// ═══════════════════════════════════════════
// INTERNAL HELPERS
// ═══════════════════════════════════════════

/** Extracts a printable message from any thrown value, not only `Error`s. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Picks the fields of a transaction that are written to the database. */
function toTransactionRow(t: InteracTransaction) {
  return {
    id: t.id,
    email_id: t.email_id,
    user_id: t.user_id,
    date: t.date,
    sender: t.sender,
    amount: t.amount,
    currency: t.currency,
    reference: t.reference,
    message: t.message,
    recipient_email: t.recipient_email,
    branch: t.branch,
    status: t.status,
    raw_email: t.raw_email,
  };
}

// ═══════════════════════════════════════════
// MAIN SCAN EXECUTION
// ═══════════════════════════════════════════

/**
 * Runs one scan: lists the matching Gmail messages, drops the ones already
 * stored (unless `forceRescan` is set), then fetches, AI-parses, routes and
 * batch-inserts the rest while emitting progress events.
 *
 * Failures of a single email, and failures of a mid-scan batch insert, are
 * counted in the summary and do not abort the scan.
 *
 * @param scanRequest - Date range to scan and the `forceRescan` flag.
 * @param userId - Owner of the Gmail client and of the stored transactions.
 * @returns Counts of emails found, parsed, skipped and errored.
 * @throws Re-throws any error raised outside the per-email pipeline (message
 *   discovery, the final buffer flush, scan-log update) after marking the job
 *   as failed and emitting `scan:error`.
 */
export async function executeScan(
  scanRequest: ScanRequest,
  userId: string
): Promise<ScanSummary> {
  const jobId = uuidv4();
  const { dateRange, forceRescan } = scanRequest;

  // Initialize job tracking
  const job: ScanJob = {
    jobId,
    status: 'scanning',
    dateRange,
    progress: {
      emailsFound: 0,
      emailsProcessed: 0,
      emailsSkipped: 0,
      emailsErrored: 0,
    },
    startedAt: new Date().toISOString(),
  };
  activeJobs.set(jobId, job);

  // Initialize AI provider pool
  const aiPool = new AIProviderPool((from, to, reason) => {
    emitToClient('ai:switch', { jobId, from, to, reason, timestamp: new Date().toISOString() });
  });

  // Log the scan
  const scanLogId = database.insertScanLog({
    user_id: userId,
    scan_preset: dateRange.preset,
    scan_start_date: dateRange.startDate,
    scan_end_date: dateRange.endDate,
    force_rescan: forceRescan ? 1 : 0,
    started_at: new Date().toISOString(),
    ai_provider: aiPool.getCurrentProvider(),
    ai_model: null,
  });

  try {
    // ═══ PHASE 1: DISCOVER EMAILS ═══
    console.log(`[Scan ${jobId}] Starting scan for ${dateRange.preset} range`);

    const gmail = gmailService.getGmailClient(userId);
    const query = buildGmailQuery(dateRange);
    const messageIds = await fetchAllMessageIds(gmail, query);
    job.progress.emailsFound = messageIds.length;

    // Deduplication: check which emails are already in DB
    const existingIds = database.getExistingEmailIds(messageIds);
    const newIds = forceRescan ? messageIds : messageIds.filter((id) => !existingIds.has(id));

    // Number of emails actually left out of this run. With forceRescan this is
    // 0 even when some emails already exist in the DB, so every report below
    // uses this value rather than existingIds.size.
    const skippedCount = messageIds.length - newIds.length;
    job.progress.emailsSkipped = skippedCount;

    console.log(
      `[Scan ${jobId}] Found ${messageIds.length} emails, ${newIds.length} new, ${existingIds.size} already in DB`
    );

    // Emit scan:started
    emitToClient('scan:started', {
      jobId,
      totalEmails: messageIds.length,
      newEmails: newIds.length,
      skipped: skippedCount,
      dateRange,
      timestamp: new Date().toISOString(),
    });

    // Marks the job completed, notifies the client and closes the scan log.
    const finishScan = (
      parsedCount: number,
      erroredCount: number,
      details: string[]
    ): ScanSummary => {
      job.status = 'completed';
      job.completedAt = new Date().toISOString();

      const summary: ScanSummary = {
        found: messageIds.length,
        parsed: parsedCount,
        skipped: skippedCount,
        errors: erroredCount,
      };

      emitToClient('scan:completed', { jobId, summary, dateRange });

      database.updateScanLog({
        id: scanLogId,
        finished_at: new Date().toISOString(),
        emails_found: messageIds.length,
        emails_parsed: parsedCount,
        emails_skipped: skippedCount,
        errors: erroredCount,
        error_details:
          details.length > 0 ? JSON.stringify(details.slice(0, MAX_LOGGED_ERRORS)) : null,
      });

      return summary;
    };

    if (newIds.length === 0) {
      // Nothing new to process
      return finishScan(0, 0, []);
    }

    // ═══ PHASE 2: PIPELINE — Fetch + Parse + Save ═══
    job.status = 'parsing';

    const gmailLimit = pLimit(GMAIL_CONCURRENCY);
    const aiLimit = pLimit(AI_CONCURRENCY);

    const saveBuffer: InteracTransaction[] = [];
    const errorDetails: string[] = [];
    let processed = 0;
    let errored = 0;

    // Counts `count` emails as failed and records one detail line for them.
    const recordError = (detail: string, count = 1): void => {
      errored += count;
      job.progress.emailsErrored = errored;
      errorDetails.push(detail);
      console.error(`[Scan ${jobId}] ${detail}`);
    };

    const pipelines = newIds.map((emailId) =>
      gmailLimit(async () => {
        try {
          // Stage 1: Fetch email body from Gmail
          const { body, subject, internalDate } = await fetchEmailBody(gmail, emailId);

          if (!body || body.trim().length < MIN_BODY_LENGTH) {
            console.warn(`[Scan ${jobId}] Empty body for email ${emailId}, skipping`);
            // Keep the job progress and the persisted details in step with the
            // counter so a non-zero error count is always explained.
            errored++;
            job.progress.emailsErrored = errored;
            errorDetails.push(`Email ${emailId}: empty body`);
            return null;
          }

          // Stage 2: AI parse (with its own concurrency limit)
          const parsed = await aiLimit(() => aiPool.parseEmail(body));

          // Stage 3: Route to branch
          const branch = parsed.recipient_email
            ? resolveBranch(parsed.recipient_email)
            : DEFAULT_BRANCH;

          // Build complete transaction
          const transaction: InteracTransaction = {
            id: uuidv4(),
            email_id: emailId,
            user_id: userId,
            sender: parsed.sender,
            amount: parsed.amount,
            currency: parsed.currency,
            reference: parsed.reference || '',
            message: parsed.message,
            recipient_email: parsed.recipient_email || '',
            // Fall back to Gmail's internalDate when the parser found no date.
            // An unparsable internalDate makes toISOString() throw, which the
            // catch below records as a failure of this email.
            date: parsed.date || new Date(parseInt(internalDate, 10)).toISOString(),
            status: parsed.status,
            branch,
            raw_email: body.substring(0, RAW_EMAIL_MAX_CHARS), // Store first 5000 chars for audit
          };

          // Buffer for batch save
          saveBuffer.push(transaction);

          // Flush buffer when full
          if (saveBuffer.length >= DB_BATCH_SIZE) {
            const batch = saveBuffer.splice(0, DB_BATCH_SIZE);
            try {
              database.batchInsertTransactions(batch.map(toTransactionRow));
            } catch (error: unknown) {
              // None of the rows in this batch were stored. There is no await
              // between the push above and the splice, so the current
              // transaction is always the last element of the batch and has
              // not been counted as processed yet; the others have.
              processed -= batch.length - 1;
              job.progress.emailsProcessed = processed;
              recordError(
                `Batch insert of ${batch.length} transactions failed: ${errorMessage(error)}`,
                batch.length
              );
              return null;
            }

            // Emit new transactions to frontend
            batch.forEach((t) => emitToClient('transaction:new', { transaction: t }));
          }

          processed++;
          job.progress.emailsProcessed = processed;
          job.progress.emailsErrored = errored;
          job.progress.currentEmail = `${parsed.sender} — ${parsed.amount} $`;

          // Emit progress
          emitToClient('scan:progress', {
            jobId,
            processed,
            total: newIds.length,
            errored,
            latest: {
              sender: parsed.sender,
              amount: parsed.amount,
              branch,
            },
            currentProvider: aiPool.getCurrentProvider(),
          });

          return transaction;
        } catch (error: unknown) {
          recordError(`Email ${emailId}: ${errorMessage(error)}`);
          return null;
        }
      })
    );

    // Wait for all pipelines to complete
    await Promise.allSettled(pipelines);

    // Flush remaining buffer. A failure here is not tied to a single email, so
    // it propagates to the outer catch and fails the whole scan.
    if (saveBuffer.length > 0) {
      const remaining = saveBuffer.splice(0, saveBuffer.length);
      database.batchInsertTransactions(remaining.map(toTransactionRow));
      remaining.forEach((t) => emitToClient('transaction:new', { transaction: t }));
    }

    // ═══ PHASE 3: COMPLETE ═══
    const summary = finishScan(processed, errored, errorDetails);

    console.log(
      `[Scan ${jobId}] Completed: ${processed} parsed, ${errored} errors, ${skippedCount} skipped`
    );

    return summary;
  } catch (error: unknown) {
    const message = errorMessage(error);

    job.status = 'failed';
    job.completedAt = new Date().toISOString();

    emitToClient('scan:error', {
      jobId,
      error: message,
      timestamp: new Date().toISOString(),
    });

    database.updateScanLog({
      id: scanLogId,
      finished_at: new Date().toISOString(),
      emails_found: job.progress.emailsFound,
      emails_parsed: job.progress.emailsProcessed,
      emails_skipped: job.progress.emailsSkipped,
      // +1 accounts for the fatal error that aborted the scan
      errors: job.progress.emailsErrored + 1,
      error_details: JSON.stringify([message]),
    });

    throw error;
  } finally {
    // Clean up job after 5 minutes
    setTimeout(() => activeJobs.delete(jobId), JOB_RETENTION_MS);
  }
}

export const scanService = {
  executeScan,
  getJob,
  setEmitter,
};

export default scanService;