Spaces:
Running
Running
// packages/server/src/services/scanService.ts
//
// Main scan orchestration service
// Connects Gmail → AI Parsing → Branch Routing → Database
//
// Install: npm install p-limit uuid
| import pLimit from 'p-limit'; | |
| import { v4 as uuidv4 } from 'uuid'; | |
| import type { ScanDateRange, ScanRequest, ScanJob, ScanSummary } from '../../shared/types/scan'; | |
| import type { InteracTransaction } from '../../shared/types/transaction'; | |
| import { resolveBranch } from '../../shared/constants/branches'; | |
| import { gmailService, buildGmailQuery, fetchAllMessageIds, fetchEmailBody } from './gmailService'; | |
| import { AIProviderPool } from './aiService'; | |
| import database from '../db/database'; | |
// ───────────────────────────────────────────
// CONCURRENCY LIMITS
// ───────────────────────────────────────────

/** Maximum number of per-email pipelines (Gmail fetch onward) running at once. */
const GMAIL_CONCURRENCY = 10;

/** Maximum number of AI parse requests in flight at once. */
const AI_CONCURRENCY = 5;

/** Number of buffered transactions that triggers a batch insert. */
const DB_BATCH_SIZE = 25;
// ───────────────────────────────────────────
// ACTIVE JOBS TRACKING
// ───────────────────────────────────────────

/** In-memory registry of scan jobs, keyed by job id. */
const activeJobs = new Map<string, ScanJob>();

/**
 * Looks up a scan job by id.
 *
 * @param jobId - Identifier assigned to the job when the scan was started.
 * @returns The tracked job, or `undefined` when no job with that id is registered.
 */
export function getJob(jobId: string): ScanJob | undefined {
  return activeJobs.get(jobId);
}
// ───────────────────────────────────────────
// WEBSOCKET EVENT EMITTER (set externally)
// ───────────────────────────────────────────

/**
 * Signature of the function used to push events to the client.
 * `data` stays `any` so that existing handlers with typed payload parameters
 * remain assignable.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type EmitFn = (event: string, data: any) => void;

/** Current emitter; a no-op until `setEmitter` installs a real one. */
let emitToClient: EmitFn = () => {};

/**
 * Installs the function that delivers scan events to the client.
 *
 * @param fn - Emitter that replaces the current one for all subsequent events.
 */
export function setEmitter(fn: EmitFn): void {
  emitToClient = fn;
}
// ───────────────────────────────────────────
// MAIN SCAN EXECUTION
// ───────────────────────────────────────────

/** Turns any thrown value into a printable message (non-Error throws are stringified). */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Copies the fields handed to `database.batchInsertTransactions` out of a transaction. */
function toTransactionRow(t: InteracTransaction) {
  return {
    id: t.id,
    email_id: t.email_id,
    user_id: t.user_id,
    date: t.date,
    sender: t.sender,
    amount: t.amount,
    currency: t.currency,
    reference: t.reference,
    message: t.message,
    recipient_email: t.recipient_email,
    branch: t.branch,
    status: t.status,
    raw_email: t.raw_email,
  };
}

/**
 * Runs one scan: discovers matching Gmail messages, skips those already stored
 * (unless `forceRescan` is set), then fetches, AI-parses, branch-routes and
 * batch-inserts the remaining ones.
 *
 * Progress is reported through the installed emitter using the events
 * `scan:started`, `transaction:new`, `scan:progress`, `scan:completed`,
 * `scan:error` and `ai:switch`. The job is registered in `activeJobs` for the
 * duration of the scan and removed five minutes after it settles.
 *
 * Failures affecting a single email (or a single insert batch) are counted and
 * recorded in the scan log; they do not abort the scan.
 *
 * @param scanRequest - Date range to scan and whether known emails are re-processed.
 * @param userId - Owner of the Gmail account and of the resulting transactions.
 * @returns Counts of found, parsed, skipped and errored emails.
 * @throws Rethrows any error raised outside the per-email pipelines (for
 *   example during message discovery) after marking the job as failed.
 */
export async function executeScan(
  scanRequest: ScanRequest,
  userId: string
): Promise<ScanSummary> {
  const jobId = uuidv4();
  const { dateRange, forceRescan } = scanRequest;

  const job: ScanJob = {
    jobId,
    status: 'scanning',
    dateRange,
    progress: {
      emailsFound: 0,
      emailsProcessed: 0,
      emailsSkipped: 0,
      emailsErrored: 0,
    },
    startedAt: new Date().toISOString(),
  };

  // The pool reports provider switches; forward them to the client.
  const aiPool = new AIProviderPool((from, to, reason) => {
    emitToClient('ai:switch', { jobId, from, to, reason, timestamp: new Date().toISOString() });
  });

  const scanLogId = database.insertScanLog({
    user_id: userId,
    scan_preset: dateRange.preset,
    scan_start_date: dateRange.startDate,
    scan_end_date: dateRange.endDate,
    force_rescan: forceRescan ? 1 : 0,
    started_at: new Date().toISOString(),
    ai_provider: aiPool.getCurrentProvider(),
    ai_model: null,
  });

  // Register the job only once setup has succeeded: the cleanup timer lives in
  // the `finally` below, so a throw above must not leave an orphaned entry.
  activeJobs.set(jobId, job);

  try {
    // ─── PHASE 1: DISCOVER EMAILS ───
    console.log(`[Scan ${jobId}] Starting scan for ${dateRange.preset} range`);
    const gmail = gmailService.getGmailClient(userId);
    const query = buildGmailQuery(dateRange);
    const messageIds = await fetchAllMessageIds(gmail, query);
    job.progress.emailsFound = messageIds.length;

    // Deduplication: emails already stored are skipped unless a rescan is forced.
    const existingIds = database.getExistingEmailIds(messageIds);
    const newIds = forceRescan
      ? messageIds
      : messageIds.filter((id) => !existingIds.has(id));
    // Number of emails actually skipped; zero under forceRescan even when
    // some of them are already in the database.
    const skippedCount = messageIds.length - newIds.length;
    job.progress.emailsSkipped = skippedCount;

    console.log(
      `[Scan ${jobId}] Found ${messageIds.length} emails, ${newIds.length} new, ${existingIds.size} already in DB`
    );

    emitToClient('scan:started', {
      jobId,
      totalEmails: messageIds.length,
      newEmails: newIds.length,
      skipped: skippedCount,
      dateRange,
      timestamp: new Date().toISOString(),
    });

    // Marks the job completed, notifies the client and closes the scan log.
    const completeScan = (
      parsedCount: number,
      errorCount: number,
      details: readonly string[]
    ): ScanSummary => {
      job.status = 'completed';
      job.completedAt = new Date().toISOString();
      const summary: ScanSummary = {
        found: messageIds.length,
        parsed: parsedCount,
        skipped: skippedCount,
        errors: errorCount,
      };
      emitToClient('scan:completed', { jobId, summary, dateRange });
      database.updateScanLog({
        id: scanLogId,
        finished_at: new Date().toISOString(),
        emails_found: messageIds.length,
        emails_parsed: parsedCount,
        emails_skipped: skippedCount,
        errors: errorCount,
        // Cap the stored details at 50 entries.
        error_details: details.length > 0 ? JSON.stringify(details.slice(0, 50)) : null,
      });
      return summary;
    };

    if (newIds.length === 0) {
      // Nothing new to process.
      return completeScan(0, 0, []);
    }

    // ─── PHASE 2: PIPELINE: Fetch + Parse + Save ───
    job.status = 'parsing';
    const gmailLimit = pLimit(GMAIL_CONCURRENCY);
    const aiLimit = pLimit(AI_CONCURRENCY);
    const saveBuffer: InteracTransaction[] = [];
    const errorDetails: string[] = [];
    let processed = 0;
    let errored = 0;

    // Counts `count` errors, mirrors the total onto the job and logs the message.
    const recordError = (message: string, count = 1): void => {
      errored += count;
      job.progress.emailsErrored = errored;
      errorDetails.push(message);
      console.error(`[Scan ${jobId}] ${message}`);
    };

    // Inserts up to DB_BATCH_SIZE buffered transactions and announces them.
    // Returns false when the insert failed; the affected emails are then moved
    // from the processed count to the error count, since they were not saved.
    const flushBuffer = (): boolean => {
      const batch = saveBuffer.splice(0, DB_BATCH_SIZE);
      if (batch.length === 0) {
        return true;
      }
      try {
        database.batchInsertTransactions(batch.map(toTransactionRow));
      } catch (error: unknown) {
        processed -= batch.length;
        job.progress.emailsProcessed = processed;
        recordError(
          `Batch insert of ${batch.length} transactions failed: ${describeError(error)}`,
          batch.length
        );
        return false;
      }
      batch.forEach((t) => emitToClient('transaction:new', { transaction: t }));
      return true;
    };

    const pipelines = newIds.map((emailId) =>
      gmailLimit(async () => {
        try {
          // Stage 1: fetch the email body from Gmail.
          const { body, internalDate } = await fetchEmailBody(gmail, emailId);
          if (!body || body.trim().length < 20) {
            console.warn(`[Scan ${jobId}] Empty body for email ${emailId}, skipping`);
            errored++;
            job.progress.emailsErrored = errored;
            errorDetails.push(`Email ${emailId}: empty or too-short body`);
            return null;
          }

          // Stage 2: AI parse, throttled by its own (smaller) limit.
          const parsed = await aiLimit(() => aiPool.parseEmail(body));

          // Stage 3: route to a branch; 'Montreal' is used when no recipient was parsed.
          const branch = parsed.recipient_email
            ? resolveBranch(parsed.recipient_email)
            : 'Montreal';

          const transaction: InteracTransaction = {
            id: uuidv4(),
            email_id: emailId,
            user_id: userId,
            sender: parsed.sender,
            amount: parsed.amount,
            currency: parsed.currency,
            reference: parsed.reference || '',
            message: parsed.message,
            recipient_email: parsed.recipient_email || '',
            // Fall back to Gmail's internal timestamp when no date was parsed.
            date: parsed.date || new Date(parseInt(internalDate, 10)).toISOString(),
            status: parsed.status,
            branch,
            raw_email: body.substring(0, 5000), // keep the first 5,000 characters for audit
          };

          saveBuffer.push(transaction);
          processed++;
          job.progress.emailsProcessed = processed;

          // Flush as soon as a full batch is buffered.
          if (saveBuffer.length >= DB_BATCH_SIZE && !flushBuffer()) {
            return null;
          }

          job.progress.emailsErrored = errored;
          job.progress.currentEmail = `${parsed.sender} → ${parsed.amount} $`;

          emitToClient('scan:progress', {
            jobId,
            processed,
            total: newIds.length,
            errored,
            latest: {
              sender: parsed.sender,
              amount: parsed.amount,
              branch,
            },
            currentProvider: aiPool.getCurrentProvider(),
          });
          return transaction;
        } catch (error: unknown) {
          // A failure here affects only this email; the scan carries on.
          recordError(`Email ${emailId}: ${describeError(error)}`);
          return null;
        }
      })
    );

    // Every pipeline handles its own errors, so this only waits for completion.
    await Promise.allSettled(pipelines);

    // Flush whatever is left over from the last partial batch.
    if (saveBuffer.length > 0) {
      flushBuffer();
    }

    // ─── PHASE 3: COMPLETE ───
    const summary = completeScan(processed, errored, errorDetails);
    console.log(
      `[Scan ${jobId}] Completed: ${processed} parsed, ${errored} errors, ${skippedCount} skipped`
    );
    return summary;
  } catch (error: unknown) {
    const message = describeError(error);
    job.status = 'failed';
    job.completedAt = new Date().toISOString();
    emitToClient('scan:error', {
      jobId,
      error: message,
      timestamp: new Date().toISOString(),
    });
    try {
      database.updateScanLog({
        id: scanLogId,
        finished_at: new Date().toISOString(),
        emails_found: job.progress.emailsFound,
        emails_parsed: job.progress.emailsProcessed,
        emails_skipped: job.progress.emailsSkipped,
        // The fatal error itself counts as one more error.
        errors: job.progress.emailsErrored + 1,
        error_details: JSON.stringify([message]),
      });
    } catch (logError: unknown) {
      // Never let a logging failure replace the error that aborted the scan.
      console.error(`[Scan ${jobId}] Failed to update scan log: ${describeError(logError)}`);
    }
    throw error;
  } finally {
    // Keep the job queryable for 5 minutes after it settles, then drop it.
    setTimeout(() => activeJobs.delete(jobId), 5 * 60 * 1000);
  }
}
/**
 * Scan service facade bundling the module's public functions:
 * starting a scan, looking up a job, and installing the client emitter.
 */
export const scanService = {
  executeScan,
  getJob,
  setEmitter,
};

export default scanService;