import { randomUUID } from 'node:crypto';
import type { ScanDateRange, ScanRequest } from '@icc/shared';
import { resolveBranch as resolveStaticBranch } from '@icc/shared';
import { db } from '../db/index.js';
import { transactions, scanLogs, branchConfig } from '../db/schema.js';
import { eq, and, inArray, desc } from 'drizzle-orm';
import { buildGmailQuery, fetchAllMessageIds, fetchMessage, type EmailMessage } from './gmailService.js';
import { AIService } from './aiService.js';
import { captureEmailScreenshots } from './screenshotService.js';
import { lookupEnvelopeNumber } from './envelopeService.js';
import { syncEnvelopesFromSheet } from '../db/index.js';
import { io } from '../index.js';
import pino from 'pino';

const DISABLE_SCREENSHOTS = process.env.DISABLE_SCREENSHOTS === 'true';

const logger = pino({ level: 'info', transport: { target: 'pino-pretty', options: { colorize: true } } });

/** New message IDs handled per fetch → date-filter → parse → save cycle. */
const BATCH_SIZE = 100;
const CONCURRENT_FETCH = 10; // Paid tier: can fetch more emails in parallel
const CONCURRENT_PARSE = 3; // Paid tier: 3 parallel AI calls is safe
const PARSE_DELAY_MS = 100; // Small buffer between parse chunks
/** IDs per dedup query; keeps each `IN (...)` under SQLite's 999 bound-variable limit. */
const DEDUP_BATCH = 500;
/** Buffered transactions are written to the DB once at least this many are pending. */
const SAVE_FLUSH_THRESHOLD = 10;
/** At most this many per-email error lines are persisted to `scanLogs.errorDetails`. */
const MAX_ERROR_LINES = 50;
/** `errorDetails` for a fatal scan failure is truncated to this many characters. */
const MAX_FATAL_DETAILS_CHARS = 2000;
/** Only this many characters of the plain-text body are stored in `rawEmail`. */
const RAW_EMAIL_MAX_CHARS = 5000;
/** Finished/failed jobs older than this are evicted from the in-memory job map. */
const JOB_RETENTION_MS = 60 * 60 * 1000; // 1 hour
const BRANCH_CACHE_TTL = 60_000; // 1 minute
/** Branch used when neither the AI output nor the To header yields a recipient address. */
const DEFAULT_BRANCH = 'Montreal';
/** Captures the address inside angle brackets, e.g. `Name <user@example.com>`. */
const ANGLE_ADDRESS_RE = /<([^>]+)>/;

// eslint-disable-next-line @typescript-eslint/no-explicit-any -- built from loosely-typed AI output; TODO: type as the transactions insert model
type PendingTransaction = any;

/**
 * Extract a bare, lower-cased email address from a header such as
 * `"Name <email@domain>"` or `"email@domain"`.
 *
 * @returns the address, or `''` when the header is empty or contains no `@`.
 */
function extractEmailAddress(header: string): string {
  if (!header) return '';
  const bracketed = ANGLE_ADDRESS_RE.exec(header)?.[1];
  if (bracketed !== undefined) return bracketed.trim().toLowerCase();
  // Might be a plain email address
  const trimmed = header.trim().toLowerCase();
  return trimmed.includes('@') ? trimmed : '';
}

/** Active rows of the `branchConfig` table (lower-cased email → branch), refreshed every BRANCH_CACHE_TTL ms. */
let branchCache: Map<string, string> | null = null;
let branchCacheLoadedAt = 0;

/**
 * Resolve the branch for a recipient address. The DB `branchConfig` table is
 * checked first (cached for BRANCH_CACHE_TTL); on a miss the static mapping
 * from `@icc/shared` is used.
 */
async function resolveBranch(email: string) {
  const now = Date.now();
  let cache = branchCache;
  if (!cache || now - branchCacheLoadedAt > BRANCH_CACHE_TTL) {
    const rows = await db.select({ email: branchConfig.email, branch: branchConfig.branch })
      .from(branchConfig)
      .where(eq(branchConfig.active, true));
    cache = new Map(rows.map((r) => [r.email.toLowerCase(), r.branch]));
    branchCache = cache;
    branchCacheLoadedAt = now;
  }
  const key = email.toLowerCase();
  return cache.get(key) ?? resolveStaticBranch(key);
}

interface ScanJobState {
  jobId: string;
  userId: string;
  status: 'queued' | 'scanning' | 'parsing' | 'completed' | 'failed';
  dateRange: ScanDateRange;
  progress: {
    emailsFound: number;
    emailsProcessed: number;
    emailsSkipped: number;
    emailsErrored: number;
    currentEmail?: string;
  };
  startedAt: string;
  completedAt?: string;
}

// In-memory job tracking (replace with Redis/BullMQ for production)
const activeJobs = new Map<string, ScanJobState>();

/** Drop jobs that finished (completed or failed) more than JOB_RETENTION_MS ago, so the map cannot grow without bound. */
function pruneFinishedJobs(now: number = Date.now()): void {
  for (const [id, job] of activeJobs) {
    if (job.completedAt && now - Date.parse(job.completedAt) > JOB_RETENTION_MS) {
      activeJobs.delete(id);
    }
  }
}

/** Current state of a scan job, or `undefined` if unknown or already evicted. */
export function getJobStatus(jobId: string): ScanJobState | undefined {
  return activeJobs.get(jobId);
}

/** Clear all in-memory caches (branch cache + active jobs) */
export function clearScanCaches() {
  branchCache = null;
  branchCacheLoadedAt = 0;
  activeJobs.clear();
}

/** User-scoped scan history (newest first, at most 100 rows). */
export async function getScanHistory(userId: string) {
  return db.select().from(scanLogs).where(eq(scanLogs.userId, userId)).orderBy(desc(scanLogs.id)).limit(100);
}

/** Emit a WebSocket event only to the user who owns this scan */
function emitToUser(userId: string, event: string, data: unknown) {
  io.to(`user:${userId}`).emit(event, data);
}

/** Emit the standard `scan:progress` payload for a job; `extra` fields are appended last. */
function emitProgress(job: ScanJobState, total: number, extra: Record<string, unknown> = {}): void {
  emitToUser(job.userId, 'scan:progress', {
    jobId: job.jobId,
    processed: job.progress.emailsProcessed,
    total,
    skipped: job.progress.emailsSkipped,
    errored: job.progress.emailsErrored,
    ...extra,
  });
}

/** Format an elapsed duration as whole minutes (`"3m"`) above one minute, otherwise whole seconds (`"42s"`). */
function formatDuration(ms: number): string {
  return ms > 60000 ? `${Math.round(ms / 60000)}m` : `${Math.round(ms / 1000)}s`;
}

/**
 * Start a Gmail scan for `userId` and return its job ID immediately.
 *
 * A `scanLogs` row is created synchronously (the returned promise rejects if that
 * insert fails). The scan itself runs in the background. Fatal failures are recorded
 * on the scan log and reported to the user via a `scan:error` event.
 */
export async function executeScan(scanRequest: ScanRequest, userId: string): Promise<string> {
  pruneFinishedJobs();

  const jobId = `scan_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
  const dateRange = scanRequest.dateRange;
  const jobState: ScanJobState = {
    jobId,
    userId,
    status: 'queued',
    dateRange,
    progress: { emailsFound: 0, emailsProcessed: 0, emailsSkipped: 0, emailsErrored: 0 },
    startedAt: new Date().toISOString(),
  };
  activeJobs.set(jobId, jobState);

  // Create scan log entry
  let scanLogId: number;
  try {
    const [scanLog] = await db.insert(scanLogs).values({
      userId,
      scanPreset: dateRange.preset,
      scanStartDate: dateRange.startDate,
      scanEndDate: dateRange.endDate,
      forceRescan: scanRequest.forceRescan ?? false,
      startedAt: new Date().toISOString(),
    }).returning();
    if (!scanLog) throw new Error('Failed to create scan log entry');
    scanLogId = scanLog.id;
  } catch (err) {
    // The scan can't run without a log row; don't leave the job stuck in 'queued'.
    jobState.status = 'failed';
    jobState.completedAt = new Date().toISOString();
    throw err;
  }

  // Run scan asynchronously; the caller only needs the job ID.
  void runScan(jobId, scanRequest, userId, scanLogId).catch(async (err: unknown) => {
    const error = err instanceof Error ? err : new Error(String(err));
    logger.error({ err: error.message, stack: error.stack, jobId }, 'Scan failed: %s', error.message);
    jobState.status = 'failed';
    jobState.completedAt = new Date().toISOString();

    // Save failure to DB so Journal page shows it
    try {
      await db.update(scanLogs).set({
        finishedAt: jobState.completedAt,
        emailsFound: jobState.progress.emailsFound,
        emailsParsed: jobState.progress.emailsProcessed,
        emailsSkipped: jobState.progress.emailsSkipped,
        errors: (jobState.progress.emailsErrored || 0) + 1,
        errorDetails: `[Fatal] ${error.message}\n${error.stack || ''}`.substring(0, MAX_FATAL_DETAILS_CHARS),
      }).where(eq(scanLogs.id, scanLogId));
    } catch (dbErr) {
      logger.warn(
        { jobId, err: dbErr instanceof Error ? dbErr.message : String(dbErr) },
        'Failed to record scan failure in scan log',
      );
    }

    emitToUser(userId, 'scan:error', { jobId, message: 'Le scan a échoué. Veuillez réessayer.', dateRange });
  });

  return jobId;
}

/** Per-scan state shared by the batch helpers below. */
interface BatchContext {
  job: ScanJobState;
  /** Denominator reported in progress events: the number of new (non-duplicate) message IDs. */
  total: number;
  /** Per-email failure descriptions; the first MAX_ERROR_LINES are persisted to the scan log. */
  errorMessages: string[];
}

/** Return the IDs from `messageIds` this user has not imported yet (order preserved), querying in DEDUP_BATCH chunks. */
async function filterAlreadyImported(userId: string, messageIds: string[]): Promise<string[]> {
  const existing = new Set<string>();
  for (let i = 0; i < messageIds.length; i += DEDUP_BATCH) {
    const chunk = messageIds.slice(i, i + DEDUP_BATCH);
    const rows = await db.select({ emailId: transactions.emailId })
      .from(transactions)
      .where(and(
        eq(transactions.userId, userId),
        inArray(transactions.emailId, chunk),
      ));
    for (const row of rows) existing.add(row.emailId);
  }
  return messageIds.filter((id) => !existing.has(id));
}

/** Phase 2: fetch a batch of messages, CONCURRENT_FETCH at a time. Failures are counted and reported, not thrown. */
async function fetchEmails(ctx: BatchContext, ids: string[]): Promise<EmailMessage[]> {
  const { job } = ctx;
  const fetched: EmailMessage[] = [];
  for (let j = 0; j < ids.length; j += CONCURRENT_FETCH) {
    const chunk = ids.slice(j, j + CONCURRENT_FETCH);
    const results = await Promise.allSettled(chunk.map((id) => fetchMessage(job.userId, id)));
    for (const result of results) {
      if (result.status === 'fulfilled') {
        fetched.push(result.value);
        continue;
      }
      job.progress.emailsErrored++;
      const errMsg = result.reason?.message || 'Fetch error';
      ctx.errorMessages.push(`[Fetch] ${errMsg}`);
      logger.error({ err: result.reason }, 'Failed to fetch email');
      emitProgress(job, ctx.total);
    }
  }
  return fetched;
}

/**
 * Post-fetch date filter. Gmail only filters by day, so the exact [rangeStart, rangeEnd]
 * window (epoch ms, inclusive) is enforced here. Emails outside it, or with an
 * unparseable date, are counted as skipped.
 */
function keepWithinRange(ctx: BatchContext, emails: EmailMessage[], rangeStart: number, rangeEnd: number): EmailMessage[] {
  const { job } = ctx;
  const kept: EmailMessage[] = [];
  for (const email of emails) {
    const emailTime = new Date(email.date).getTime();
    if (emailTime >= rangeStart && emailTime <= rangeEnd) {
      kept.push(email);
      continue;
    }
    job.progress.emailsSkipped++;
    logger.info({ emailId: email.emailId, emailDate: email.date, rangeStart: job.dateRange.startDate, rangeEnd: job.dateRange.endDate }, 'Skipping email outside date range');
    emitProgress(job, ctx.total);
  }
  return kept;
}

/**
 * Phases 3–4: AI-parse emails (CONCURRENT_PARSE in parallel, PARSE_DELAY_MS between chunks).
 * Each email that yields a positive amount and a known sender is routed to a branch and
 * appended to `saveBuffer`. Non-Interac emails and parse failures are counted and reported.
 */
async function parseEmails(ctx: BatchContext, emails: EmailMessage[], saveBuffer: PendingTransaction[]): Promise<void> {
  const { job, total } = ctx;
  for (let j = 0; j < emails.length; j += CONCURRENT_PARSE) {
    const chunk = emails.slice(j, j + CONCURRENT_PARSE);
    const results = await Promise.allSettled(
      chunk.map(async (email) => ({ email, parsed: await AIService.parseEmail(email.body) })),
    );

    // Small delay between chunks to avoid rate limiting (not after the last one)
    if (j + CONCURRENT_PARSE < emails.length) {
      await new Promise((resolve) => setTimeout(resolve, PARSE_DELAY_MS));
    }

    for (const result of results) {
      if (result.status === 'rejected') {
        job.progress.emailsErrored++;
        const errMsg = result.reason?.message || 'AI parse error';
        ctx.errorMessages.push(`[AI Parse] ${errMsg}`);
        logger.error({ err: result.reason }, 'Failed to parse email');
        emitProgress(job, total);
        continue;
      }

      const { email, parsed } = result.value;

      // Skip non-Interac emails (AI couldn't extract meaningful data).
      // `!(amount > 0)` also rejects a missing/NaN amount, which `amount <= 0` let through.
      if (!(parsed.amount > 0) || !parsed.sender || parsed.sender === 'Inconnu') {
        // NOTE(review): these are counted as both skipped and processed, so
        // parsed + skipped can exceed found in the summary. Kept for UI compatibility.
        job.progress.emailsSkipped++;
        job.progress.emailsProcessed++;
        logger.info({ emailId: email.emailId }, 'Skipping non-Interac email (no amount/sender)');
        emitProgress(job, total);
        continue;
      }

      // Phase 4: Branch routing
      // Use AI-extracted recipient_email first, fallback to Gmail To header
      const recipientEmail = parsed.recipient_email || extractEmailAddress(email.to);
      const branch = recipientEmail ? await resolveBranch(recipientEmail) : DEFAULT_BRANCH;

      // Screenshots are deferred to a background pass after the scan completes
      // Look up envelope number from the contact database
      const envelopeNumber = lookupEnvelopeNumber(parsed.sender || '') || null;

      saveBuffer.push({
        id: randomUUID(),
        emailId: email.emailId,
        userId: job.userId,
        date: new Date(email.date).toISOString(),
        sender: parsed.sender || 'Inconnu',
        amount: parsed.amount || 0,
        currency: parsed.currency || 'CAD',
        reference: parsed.reference,
        message: parsed.message,
        recipientEmail: recipientEmail || parsed.recipient_email,
        branch,
        envelopeNumber,
        status: parsed.status || 'pending',
        rawEmail: email.body.substring(0, RAW_EMAIL_MAX_CHARS),
        rawEmailHtml: email.bodyHtml,
        screenshotOriginal: '',
        screenshotPreview: '',
        parsedAt: new Date().toISOString(),
        reviewed: false,
      });

      job.progress.emailsProcessed++;
      job.progress.currentEmail = parsed.sender || email.subject;
      emitProgress(job, total, { latest: { ...parsed, branch } });
    }
  }
}

/**
 * Phase 5: insert buffered transactions, ignoring rows that hit a uniqueness conflict.
 * `transaction:new` is emitted only for rows the DB actually inserted. With
 * `ON CONFLICT DO NOTHING`, `RETURNING` yields only inserted rows, so forced rescans
 * no longer push phantom duplicates to the UI.
 */
async function flushTransactions(userId: string, rows: PendingTransaction[]): Promise<void> {
  if (rows.length === 0) return;
  const inserted = await db.insert(transactions)
    .values(rows)
    .onConflictDoNothing()
    .returning({ id: transactions.id });
  const insertedIds = new Set(inserted.map((row) => row.id));
  for (const tx of rows) {
    if (insertedIds.has(tx.id)) emitToUser(userId, 'transaction:new', { transaction: tx });
  }
}

/**
 * Body of a scan job. It refreshes envelopes, discovers and dedups message IDs, then
 * fetches, filters, parses and saves them in batches. It finalizes the scan log, emits
 * completion, and kicks off background screenshots. On error it marks the job failed and
 * rethrows; `executeScan`'s handler records the failure.
 */
async function runScan(jobId: string, scanRequest: ScanRequest, userId: string, scanLogId: number): Promise<void> {
  const jobState = activeJobs.get(jobId);
  if (!jobState) throw new Error(`Scan job ${jobId} not found (caches cleared?)`);
  const { progress } = jobState;
  const { dateRange, forceRescan } = scanRequest;

  try {
    // Phase 0: Refresh envelopes from Google Sheets so new/updated names are picked up
    await syncEnvelopesFromSheet();

    // Phase 1: Email discovery
    jobState.status = 'scanning';
    const query = buildGmailQuery(dateRange);
    logger.info({ jobId, query, dateRange }, 'Starting email discovery — query: %s', query);
    const messageIds = await fetchAllMessageIds(userId, query);
    progress.emailsFound = messageIds.length;
    logger.info({ jobId, emailsFound: messageIds.length }, 'Email discovery complete: %d emails found', messageIds.length);

    // Deduplication — scoped to this user's transactions only (bypassed on forced rescans)
    let newIds = messageIds;
    if (!forceRescan) {
      newIds = await filterAlreadyImported(userId, messageIds);
      progress.emailsSkipped = messageIds.length - newIds.length;
    }

    emitToUser(userId, 'scan:started', {
      jobId,
      timestamp: new Date().toISOString(),
      dateRange,
      totalEmails: messageIds.length,
      newEmails: newIds.length,
      skipped: progress.emailsSkipped,
    });

    if (newIds.length === 0) {
      jobState.status = 'completed';
      jobState.completedAt = new Date().toISOString();
      await db.update(scanLogs).set({
        finishedAt: jobState.completedAt,
        emailsFound: messageIds.length,
        emailsParsed: 0,
        emailsSkipped: progress.emailsSkipped,
        errors: 0,
      }).where(eq(scanLogs.id, scanLogId));
      emitToUser(userId, 'scan:completed', {
        jobId,
        summary: { found: messageIds.length, parsed: 0, skipped: progress.emailsSkipped, errors: 0 },
        dateRange,
        duration: '0s',
      });
      return;
    }

    // Phases 2–5: fetch, date-filter, parse and save, one batch at a time
    jobState.status = 'parsing';
    const ctx: BatchContext = { job: jobState, total: newIds.length, errorMessages: [] };
    const saveBuffer: PendingTransaction[] = [];
    // Loop-invariant bounds for the post-fetch date filter
    const rangeStart = new Date(dateRange.startDate).getTime();
    const rangeEnd = new Date(dateRange.endDate).getTime();

    for (let i = 0; i < newIds.length; i += BATCH_SIZE) {
      const batch = newIds.slice(i, i + BATCH_SIZE);
      const fetched = await fetchEmails(ctx, batch);
      const inRange = keepWithinRange(ctx, fetched, rangeStart, rangeEnd);
      await parseEmails(ctx, inRange, saveBuffer);

      // Flush frequently for faster UI updates
      if (saveBuffer.length >= SAVE_FLUSH_THRESHOLD) {
        await flushTransactions(userId, saveBuffer.splice(0));
      }
    }
    // Flush remaining buffer
    await flushTransactions(userId, saveBuffer.splice(0));

    // Phase 6: Completion
    jobState.status = 'completed';
    jobState.completedAt = new Date().toISOString();
    const durationStr = formatDuration(Date.now() - new Date(jobState.startedAt).getTime());
    const errorDetailsStr = ctx.errorMessages.length > 0
      ? ctx.errorMessages.slice(0, MAX_ERROR_LINES).join('\n')
      : null;

    await db.update(scanLogs).set({
      finishedAt: jobState.completedAt,
      emailsFound: progress.emailsFound,
      emailsParsed: progress.emailsProcessed,
      emailsSkipped: progress.emailsSkipped,
      errors: progress.emailsErrored,
      errorDetails: errorDetailsStr,
    }).where(eq(scanLogs.id, scanLogId));

    emitToUser(userId, 'scan:completed', {
      jobId,
      summary: {
        found: progress.emailsFound,
        parsed: progress.emailsProcessed,
        skipped: progress.emailsSkipped,
        errors: progress.emailsErrored,
      },
      dateRange,
      duration: durationStr,
    });

    logger.info({ jobId, duration: durationStr, parsed: progress.emailsProcessed }, 'Scan completed');

    // Phase 6b: Background screenshot capture (non-blocking, after scan is done)
    if (!DISABLE_SCREENSHOTS) {
      void captureScreenshotsInBackground(userId).catch((err: unknown) => {
        logger.warn({ err: err instanceof Error ? err.message : String(err) }, 'Background screenshot capture failed');
      });
    }
  } catch (err) {
    jobState.status = 'failed';
    throw err;
  }
}

/**
 * Capture screenshots for up to 50 of the user's transactions whose
 * `screenshotOriginal` is still empty (runs in background). Each failure is logged
 * and skipped, so one bad email doesn't stop the rest.
 */
async function captureScreenshotsInBackground(userId: string) {
  const pending = await db.select({
    id: transactions.id,
    emailId: transactions.emailId,
    rawEmailHtml: transactions.rawEmailHtml,
    rawEmail: transactions.rawEmail,
  })
    .from(transactions)
    .where(and(
      eq(transactions.userId, userId),
      eq(transactions.screenshotOriginal, ''),
    ))
    .limit(50);

  if (pending.length === 0) return;

  logger.info({ count: pending.length }, 'Capturing screenshots in background...');

  for (const tx of pending) {
    try {
      const screenshots = await captureEmailScreenshots(
        tx.rawEmailHtml || '',
        tx.rawEmail || '',
        tx.emailId,
      );
      await db.update(transactions).set({
        screenshotOriginal: screenshots.originalPath,
        screenshotPreview: screenshots.previewPath,
      }).where(eq(transactions.id, tx.id));
    } catch (err: unknown) {
      logger.warn({ emailId: tx.emailId, err: err instanceof Error ? err.message : String(err) }, 'Background screenshot failed');
    }
  }

  logger.info({ count: pending.length }, 'Background screenshots done');
}