// NOTE: page-scrape residue ("Spaces: Sleeping / Sleeping") preceded this file; it is not part of the source.
| import type { ScanDateRange, ScanRequest } from '@icc/shared'; | |
| import { resolveBranch as resolveStaticBranch } from '@icc/shared'; | |
| import { db } from '../db/index.js'; | |
| import { transactions, scanLogs, branchConfig } from '../db/schema.js'; | |
| import { eq, and, inArray, desc } from 'drizzle-orm'; | |
| import { buildGmailQuery, fetchAllMessageIds, fetchMessage, type EmailMessage } from './gmailService.js'; | |
| import { AIService } from './aiService.js'; | |
| import { captureEmailScreenshots } from './screenshotService.js'; | |
| import { lookupEnvelopeNumber } from './envelopeService.js'; | |
| import { syncEnvelopesFromSheet } from '../db/index.js'; | |
| import { io } from '../index.js'; | |
| import pino from 'pino'; | |
/** When the DISABLE_SCREENSHOTS env var is exactly "true", the post-scan screenshot pass is skipped. */
const DISABLE_SCREENSHOTS = process.env.DISABLE_SCREENSHOTS === 'true';

/** Module logger: info level, pretty-printed with colours through the pino-pretty transport. */
const logger = pino({
  level: 'info',
  transport: {
    target: 'pino-pretty',
    options: { colorize: true },
  },
});

/** Number of new message ids handled per outer batch (fetch, filter, parse, then save). */
const BATCH_SIZE = 100;
/** How many messages are fetched in parallel within a batch (author's note: sized for a paid tier). */
const CONCURRENT_FETCH = 10;
/** How many AI parse calls run in parallel (author's note: 3 is considered safe on a paid tier). */
const CONCURRENT_PARSE = 3;
/** Pause, in milliseconds, inserted between consecutive parse chunks. */
const PARSE_DELAY_MS = 100;
/**
 * Extract a bare, lower-cased email address from a mail header value.
 *
 * Supported shapes:
 *  - `Name <email@domain>`: the text inside the first angle-bracket pair is returned.
 *  - `email@domain`: the trimmed value is returned.
 *  - `a@x.com, b@y.com` (plain list, `,` or `;` separated): the first entry
 *    containing `@` is returned, so the result stays usable as a lookup key.
 *
 * @param header Raw header value (for example the Gmail `To` header).
 * @returns The lower-cased address, or `''` when the header is empty or holds no address.
 */
function extractEmailAddress(header: string): string {
  if (!header) return '';

  // Angle-bracket form wins; this also covers display names that contain commas.
  const bracketed = header.match(/<([^>]+)>/)?.[1];
  if (bracketed !== undefined) return bracketed.trim().toLowerCase();

  // Plain form: may be a single address or a separator-delimited list of addresses.
  for (const part of header.split(/[,;]/)) {
    const candidate = part.trim().toLowerCase();
    if (candidate.includes('@')) return candidate;
  }
  return '';
}
// Cache of active branchConfig rows, keyed by lower-cased email. `null` means "not loaded yet".
let _branchCache: Map<string, string> | null = null;
// Timestamp (ms since epoch) taken just before the query that filled the cache.
let _branchCacheTime = 0;
// Cached rows are reused for one minute before being reloaded.
const BRANCH_CACHE_TTL = 60_000;

/**
 * Resolve the branch for a recipient email.
 *
 * Active rows of the `branchConfig` table take precedence; when the email is
 * not configured there, the static mapping from `@icc/shared` is used.
 *
 * @param email Recipient address; matching is case-insensitive.
 * @returns The branch name.
 */
async function resolveBranch(email: string): Promise<string> {
  const checkedAt = Date.now();
  let lookup = _branchCache;
  const expired = checkedAt - _branchCacheTime > BRANCH_CACHE_TTL;

  if (lookup === null || expired) {
    const activeRows = await db
      .select({ email: branchConfig.email, branch: branchConfig.branch })
      .from(branchConfig)
      .where(eq(branchConfig.active, true));

    lookup = new Map<string, string>();
    for (const row of activeRows) {
      lookup.set(row.email.toLowerCase(), row.branch);
    }
    // Publish the fresh map only after it has been fully built.
    _branchCache = lookup;
    _branchCacheTime = checkedAt;
  }

  const normalized = email.toLowerCase();
  const configured = lookup.get(normalized);
  return configured ?? resolveStaticBranch(normalized);
}
/** Lifecycle stages a scan job moves through. */
type ScanJobStatus = 'queued' | 'scanning' | 'parsing' | 'completed' | 'failed';

/** Running counters for a scan job. */
interface ScanJobProgress {
  /** Number of message ids returned by email discovery. */
  emailsFound: number;
  /** Emails that went through AI parsing without error (including those then discarded). */
  emailsProcessed: number;
  /** Emails skipped: already stored, outside the date range, or without usable parsed data. */
  emailsSkipped: number;
  /** Emails whose fetch or parse failed. */
  emailsErrored: number;
  /** Sender (or subject) of the most recently saved email, when there is one. */
  currentEmail?: string;
}

/** In-memory state of a single scan job. */
interface ScanJobState {
  jobId: string;
  /** Owner of the scan. */
  userId: string;
  status: ScanJobStatus;
  dateRange: ScanDateRange;
  progress: ScanJobProgress;
  /** ISO-8601 timestamp taken when the job was created. */
  startedAt: string;
  /** ISO-8601 timestamp set when the job completes or fails. */
  completedAt?: string;
}
// Jobs are tracked in process memory only (swap for Redis/BullMQ in production).
const activeJobs = new Map<string, ScanJobState>();

/**
 * Look up the in-memory state of a scan job.
 *
 * @param jobId Identifier returned by `executeScan`.
 * @returns The job state, or `undefined` when no such job is tracked.
 */
export function getJobStatus(jobId: string): ScanJobState | undefined {
  const job = activeJobs.get(jobId);
  return job;
}
/**
 * Reset every in-memory cache owned by this module: the tracked scan jobs
 * and the branch lookup cache (which is then reloaded on next use).
 */
export function clearScanCaches() {
  activeJobs.clear();
  _branchCacheTime = 0;
  _branchCache = null;
}
/**
 * Fetch the scan log rows belonging to one user.
 *
 * @param userId Owner whose scans are listed.
 * @returns At most 100 rows, ordered by descending scan log id (newest first).
 */
export async function getScanHistory(userId: string) {
  const ownedByUser = eq(scanLogs.userId, userId);
  const newestFirst = desc(scanLogs.id);
  return db.select().from(scanLogs).where(ownedByUser).orderBy(newestFirst).limit(100);
}
/**
 * Send a socket event to the room named `user:<userId>`, so only the owner
 * of a scan receives its updates.
 *
 * @param userId Target user.
 * @param event  Event name.
 * @param data   Event payload.
 */
function emitToUser(userId: string, event: string, data: any) {
  const room = `user:${userId}`;
  io.to(room).emit(event, data);
}
/**
 * Register a new scan job, create its scan log row, and start the scan in the
 * background.
 *
 * The returned promise resolves as soon as the job is started; progress and
 * results are delivered to the user through socket events, and the job state
 * can be polled with `getJobStatus`.
 *
 * @param scanRequest Date range and options (`forceRescan`) for the scan.
 * @param userId      Owner of the scan.
 * @returns The generated job id.
 * @throws Whatever the scan log insert throws; in that case the job is not kept in memory.
 */
export async function executeScan(scanRequest: ScanRequest, userId: string): Promise<string> {
  const jobId = `scan_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
  const dateRange = scanRequest.dateRange;
  const jobState: ScanJobState = {
    jobId,
    userId,
    status: 'queued',
    dateRange,
    progress: { emailsFound: 0, emailsProcessed: 0, emailsSkipped: 0, emailsErrored: 0 },
    startedAt: new Date().toISOString(),
  };
  activeJobs.set(jobId, jobState);

  // Create scan log entry
  let scanLogId: number;
  try {
    const [scanLog] = await db.insert(scanLogs).values({
      userId,
      scanPreset: dateRange.preset,
      scanStartDate: dateRange.startDate,
      scanEndDate: dateRange.endDate,
      forceRescan: scanRequest.forceRescan ?? false,
      startedAt: new Date().toISOString(),
    }).returning();
    if (!scanLog) {
      throw new Error('Scan log insert returned no row');
    }
    scanLogId = scanLog.id;
  } catch (err) {
    // The caller never receives this job id, so do not leave an orphan
    // entry stuck in 'queued' inside the job map.
    activeJobs.delete(jobId);
    throw err;
  }

  // Run scan asynchronously; failures are reported through the scan log and a socket event.
  void runScan(jobId, scanRequest, userId, scanLogId).catch(async (err: unknown) => {
    // Rejections are not guaranteed to be Error instances.
    const error = err instanceof Error ? err : new Error(String(err));
    logger.error({ err: error.message, stack: error.stack, jobId }, 'Scan failed: %s', error.message);
    jobState.status = 'failed';
    jobState.completedAt = new Date().toISOString();

    // Save failure to DB so Journal page shows it. This is best-effort: a
    // failure here must not prevent the user notification below, but it is
    // logged rather than silently dropped.
    try {
      await db.update(scanLogs).set({
        finishedAt: jobState.completedAt,
        emailsFound: jobState.progress.emailsFound,
        emailsParsed: jobState.progress.emailsProcessed,
        emailsSkipped: jobState.progress.emailsSkipped,
        // +1 accounts for the fatal error itself on top of per-email errors.
        errors: (jobState.progress.emailsErrored || 0) + 1,
        errorDetails: `[Fatal] ${error.message}\n${error.stack || ''}`.substring(0, 2000),
      }).where(eq(scanLogs.id, scanLogId));
    } catch (logErr) {
      const logMessage = logErr instanceof Error ? logErr.message : String(logErr);
      logger.warn({ err: logMessage, jobId }, 'Could not record scan failure in scan log');
    }

    emitToUser(userId, 'scan:error', { jobId, message: 'Le scan a échoué. Veuillez réessayer.', dateRange });
  });

  return jobId;
}
/**
 * Run the whole scan pipeline for an already-registered job.
 *
 * Phases:
 *  0. refresh envelopes from the sheet;
 *  1. discover message ids and drop those already stored for this user
 *     (unless `forceRescan` is set);
 *  2. fetch messages in batches and enforce the exact date range;
 *  3. parse with the AI service, `CONCURRENT_PARSE` emails at a time;
 *  4. resolve the branch and envelope number for each parsed transfer;
 *  5. save transactions and announce them to the user;
 *  6. write the final counters to the scan log and emit `scan:completed`,
 *     then start screenshot capture in the background.
 *
 * Per-email fetch/parse failures are counted and recorded but do not abort
 * the scan.
 *
 * @param jobId       Id of a job present in `activeJobs`.
 * @param scanRequest Date range and `forceRescan` flag.
 * @param userId      Owner of the scan; all queries and events are scoped to this user.
 * @param scanLogId   Id of the scan log row to update on completion.
 * @throws Rethrows any fatal error after marking the job as `'failed'`.
 */
async function runScan(jobId: string, scanRequest: ScanRequest, userId: string, scanLogId: number): Promise<void> {
  const jobState = activeJobs.get(jobId);
  if (!jobState) {
    // Fail with a clear message instead of a TypeError on the first property access.
    throw new Error(`Scan job not found: ${jobId}`);
  }
  const { dateRange, forceRescan } = scanRequest;
  const progress = jobState.progress;

  try {
    // Phase 0: Refresh envelopes from Google Sheets so new/updated names are picked up
    await syncEnvelopesFromSheet();

    // Phase 1: Email discovery
    jobState.status = 'scanning';
    const query = buildGmailQuery(dateRange);
    logger.info({ jobId, query, dateRange }, 'Starting email discovery — query: %s', query);
    const messageIds = await fetchAllMessageIds(userId, query);
    progress.emailsFound = messageIds.length;
    logger.info({ jobId, emailsFound: messageIds.length }, 'Email discovery complete: %d emails found', messageIds.length);

    // Deduplication — scoped to this user's transactions only.
    // The lookup is chunked to stay under SQLite's bound-variable limit (999).
    let newIds: string[];
    if (forceRescan) {
      newIds = messageIds;
    } else {
      const existingSet = new Set<string>();
      const DEDUP_BATCH = 500;
      for (let i = 0; i < messageIds.length; i += DEDUP_BATCH) {
        const chunk = messageIds.slice(i, i + DEDUP_BATCH);
        const existing = await db.select({ emailId: transactions.emailId })
          .from(transactions)
          .where(and(
            eq(transactions.userId, userId),
            inArray(transactions.emailId, chunk)
          ));
        for (const e of existing) existingSet.add(e.emailId);
      }
      newIds = messageIds.filter(id => !existingSet.has(id));
      progress.emailsSkipped = messageIds.length - newIds.length;
    }

    emitToUser(userId, 'scan:started', {
      jobId,
      timestamp: new Date().toISOString(),
      dateRange,
      totalEmails: messageIds.length,
      newEmails: newIds.length,
      skipped: progress.emailsSkipped,
    });

    // Nothing new to process: close the job immediately.
    if (newIds.length === 0) {
      jobState.status = 'completed';
      jobState.completedAt = new Date().toISOString();
      await db.update(scanLogs).set({
        finishedAt: jobState.completedAt,
        emailsFound: messageIds.length,
        emailsParsed: 0,
        emailsSkipped: progress.emailsSkipped,
        errors: 0,
      }).where(eq(scanLogs.id, scanLogId));
      emitToUser(userId, 'scan:completed', {
        jobId,
        summary: { found: messageIds.length, parsed: 0, skipped: progress.emailsSkipped, errors: 0 },
        dateRange,
        duration: '0s',
      });
      return;
    }

    // Sends the current counters to the scan owner; `extra` adds optional
    // fields (e.g. `latest`) to the same payload.
    const emitProgress = (extra: Record<string, unknown> = {}): void => {
      emitToUser(userId, 'scan:progress', {
        jobId,
        processed: progress.emailsProcessed,
        total: newIds.length,
        skipped: progress.emailsSkipped,
        errored: progress.emailsErrored,
        ...extra,
      });
    };

    // Rows waiting to be inserted. Left loosely typed because the table's
    // insert type is declared outside this block.
    const saveBuffer: any[] = [];
    const errorMessages: string[] = [];

    // Inserts every buffered row, then announces each one to the user.
    const flushSaveBuffer = async (): Promise<void> => {
      if (saveBuffer.length === 0) return;
      const toSave = saveBuffer.splice(0, saveBuffer.length);
      await db.insert(transactions).values(toSave).onConflictDoNothing();
      for (const tx of toSave) {
        emitToUser(userId, 'transaction:new', { transaction: tx });
      }
    };

    // Phase 2: Fetch emails in batches with concurrency
    jobState.status = 'parsing';

    // Exact range bounds, computed once: they do not change between batches.
    const rangeStart = new Date(dateRange.startDate).getTime();
    const rangeEnd = new Date(dateRange.endDate).getTime();

    for (let i = 0; i < newIds.length; i += BATCH_SIZE) {
      const batch = newIds.slice(i, i + BATCH_SIZE);

      // Fetch emails with concurrency limit
      const fetchedEmails: EmailMessage[] = [];
      for (let j = 0; j < batch.length; j += CONCURRENT_FETCH) {
        const chunk = batch.slice(j, j + CONCURRENT_FETCH);
        const results = await Promise.allSettled(
          chunk.map(id => fetchMessage(userId, id))
        );
        for (const result of results) {
          if (result.status === 'fulfilled') {
            fetchedEmails.push(result.value);
            continue;
          }
          progress.emailsErrored++;
          const errMsg = result.reason?.message || 'Fetch error';
          errorMessages.push(`[Fetch] ${errMsg}`);
          logger.error({ err: result.reason }, 'Failed to fetch email');
          emitProgress();
        }
      }

      // Post-fetch date filter: Gmail only filters by day, so we enforce exact range here.
      // An unparseable date yields NaN, fails both comparisons, and is counted as skipped.
      const emails: EmailMessage[] = [];
      for (const email of fetchedEmails) {
        const emailTime = new Date(email.date).getTime();
        if (emailTime >= rangeStart && emailTime <= rangeEnd) {
          emails.push(email);
          continue;
        }
        progress.emailsSkipped++;
        logger.info({ emailId: email.emailId, emailDate: email.date, rangeStart: dateRange.startDate, rangeEnd: dateRange.endDate }, 'Skipping email outside date range');
        emitProgress();
      }

      // Phase 3: AI parsing — CONCURRENT_PARSE emails in parallel per chunk,
      // with a short pause between chunks to avoid rate limiting.
      for (let j = 0; j < emails.length; j += CONCURRENT_PARSE) {
        const parseChunk = emails.slice(j, j + CONCURRENT_PARSE);
        const parseResults = await Promise.allSettled(
          parseChunk.map(async (email) => {
            const parsed = await AIService.parseEmail(email.body);
            return { email, parsed };
          })
        );

        for (const result of parseResults) {
          if (result.status !== 'fulfilled') {
            progress.emailsErrored++;
            const errMsg = result.reason?.message || 'AI parse error';
            errorMessages.push(`[AI Parse] ${errMsg}`);
            logger.error({ err: result.reason }, 'Failed to parse email');
            emitProgress();
            continue;
          }

          const { email, parsed } = result.value;

          // Skip non-Interac emails (AI couldn't extract meaningful data).
          // `!(amount > 0)` also rejects NaN/missing amounts, which a plain
          // `amount <= 0` test would let through to be stored as 0.
          if (!(parsed.amount > 0) || !parsed.sender || parsed.sender === 'Inconnu') {
            progress.emailsSkipped++;
            progress.emailsProcessed++;
            logger.info({ emailId: email.emailId }, 'Skipping non-Interac email (no amount/sender)');
            emitProgress();
            continue;
          }

          // Phase 4: Branch routing
          // Use AI-extracted recipient_email first, fallback to Gmail To header
          const recipientEmail = parsed.recipient_email || extractEmailAddress(email.to);
          const branch = recipientEmail
            ? await resolveBranch(recipientEmail)
            : 'Montreal';

          // Screenshots are deferred to background after scan completes (see Phase 6b)
          // Look up envelope number from the contact database
          const envelopeNumber = lookupEnvelopeNumber(parsed.sender || '') || null;

          // NOTE(review): relies on the global `crypto` object being available in this runtime.
          const txId = crypto.randomUUID();
          saveBuffer.push({
            id: txId,
            emailId: email.emailId,
            userId,
            date: new Date(email.date).toISOString(),
            sender: parsed.sender || 'Inconnu',
            amount: parsed.amount || 0,
            currency: parsed.currency || 'CAD',
            reference: parsed.reference,
            message: parsed.message,
            recipientEmail: recipientEmail || parsed.recipient_email,
            branch,
            envelopeNumber,
            status: parsed.status || 'pending',
            rawEmail: email.body.substring(0, 5000),
            rawEmailHtml: email.bodyHtml,
            // Empty paths mark the row as still needing screenshots.
            screenshotOriginal: '',
            screenshotPreview: '',
            parsedAt: new Date().toISOString(),
            reviewed: false,
          });

          progress.emailsProcessed++;
          progress.currentEmail = parsed.sender || email.subject;
          emitProgress({ latest: { ...parsed, branch } });
        }

        // Small delay between chunks to avoid rate limiting. Done after the
        // results are handled so progress events are not held back by it.
        if (j + CONCURRENT_PARSE < emails.length) {
          await new Promise(resolve => setTimeout(resolve, PARSE_DELAY_MS));
        }
      }

      // Phase 5: Batch save (flush frequently for faster UI updates)
      if (saveBuffer.length >= 10) {
        await flushSaveBuffer();
      }
    }

    // Flush remaining buffer
    await flushSaveBuffer();

    // Phase 6: Completion
    jobState.status = 'completed';
    jobState.completedAt = new Date().toISOString();
    const durationMs = Date.now() - new Date(jobState.startedAt).getTime();
    const durationStr = durationMs > 60000
      ? `${Math.round(durationMs / 60000)}m`
      : `${Math.round(durationMs / 1000)}s`;
    // Only the first 50 per-email errors are persisted.
    const errorDetailsStr = errorMessages.length > 0
      ? errorMessages.slice(0, 50).join('\n')
      : null;

    await db.update(scanLogs).set({
      finishedAt: jobState.completedAt,
      emailsFound: progress.emailsFound,
      emailsParsed: progress.emailsProcessed,
      emailsSkipped: progress.emailsSkipped,
      errors: progress.emailsErrored,
      errorDetails: errorDetailsStr,
    }).where(eq(scanLogs.id, scanLogId));

    emitToUser(userId, 'scan:completed', {
      jobId,
      summary: {
        found: progress.emailsFound,
        parsed: progress.emailsProcessed,
        skipped: progress.emailsSkipped,
        errors: progress.emailsErrored,
      },
      dateRange,
      duration: durationStr,
    });
    logger.info({ jobId, duration: durationStr, parsed: progress.emailsProcessed }, 'Scan completed');

    // Phase 6b: Background screenshot capture (non-blocking, after scan is done)
    if (!DISABLE_SCREENSHOTS) {
      captureScreenshotsInBackground(userId).catch((err) => {
        logger.warn({ err: err.message }, 'Background screenshot capture failed');
      });
    }
  } catch (err) {
    // Mark the job failed here; the caller is responsible for logging and notifying.
    jobState.status = 'failed';
    throw err;
  }
}
/**
 * Generate screenshots for a user's transactions whose `screenshotOriginal`
 * is still the empty string, handling at most 50 rows per call.
 *
 * Rows are processed one after another. A failure on one row is logged and
 * the loop moves on, leaving that row without screenshots.
 *
 * @param userId Owner of the transactions to process.
 */
async function captureScreenshotsInBackground(userId: string) {
  const missingScreenshot = and(
    eq(transactions.userId, userId),
    eq(transactions.screenshotOriginal, ''),
  );
  const queue = await db
    .select({
      id: transactions.id,
      emailId: transactions.emailId,
      rawEmailHtml: transactions.rawEmailHtml,
      rawEmail: transactions.rawEmail,
    })
    .from(transactions)
    .where(missingScreenshot)
    .limit(50);

  const total = queue.length;
  if (total === 0) return;

  logger.info({ count: total }, 'Capturing screenshots in background...');

  for (const { id, emailId, rawEmailHtml, rawEmail } of queue) {
    try {
      const { originalPath, previewPath } = await captureEmailScreenshots(
        rawEmailHtml || '',
        rawEmail || '',
        emailId
      );
      await db
        .update(transactions)
        .set({ screenshotOriginal: originalPath, screenshotPreview: previewPath })
        .where(eq(transactions.id, id));
    } catch (err: any) {
      logger.warn({ emailId, err: err.message }, 'Background screenshot failed');
    }
  }

  logger.info({ count: total }, 'Background screenshots done');
}