interacmanagernew / files /scanService.ts
MichaelEdou
Set default branch to Montreal for unclassified Interac transactions
fe203ef
// packages/server/src/services/scanService.ts
//
// Main scan orchestration service
// Connects Gmail → AI Parsing → Branch Routing → Database
//
// Install: npm install p-limit uuid
import pLimit from 'p-limit';
import { v4 as uuidv4 } from 'uuid';
import type { ScanDateRange, ScanRequest, ScanJob, ScanSummary } from '../../shared/types/scan';
import type { InteracTransaction } from '../../shared/types/transaction';
import { resolveBranch } from '../../shared/constants/branches';
import { gmailService, buildGmailQuery, fetchAllMessageIds, fetchEmailBody } from './gmailService';
import { AIProviderPool } from './aiService';
import database from '../db/database';
// ═══════════════════════════════════════════
// CONCURRENCY LIMITS
// ═══════════════════════════════════════════
/**
 * Limit handed to the p-limit limiter that wraps each email's whole
 * fetch -> parse -> save pipeline in executeScan, so it bounds how many
 * emails are in flight at once (not only the Gmail fetch itself).
 */
const GMAIL_CONCURRENCY = 10; // At most 10 email pipelines in flight
/**
 * Limit handed to the p-limit limiter that wraps aiPool.parseEmail calls;
 * it applies inside the pipeline limit above.
 */
const AI_CONCURRENCY = 5; // 5 concurrent AI parse requests
/**
 * Number of buffered transactions that triggers a batch insert; anything
 * left in the buffer is flushed once all pipelines have settled.
 */
const DB_BATCH_SIZE = 25; // Insert 25 rows per batch
// ═══════════════════════════════════════════
// ACTIVE JOBS TRACKING
// ═══════════════════════════════════════════
/**
 * In-memory registry of scan jobs keyed by job id. executeScan adds an entry
 * when a scan starts and schedules its removal five minutes after it ends.
 */
const activeJobs: Map<string, ScanJob> = new Map();

/**
 * Looks up a scan job by its id.
 *
 * @param jobId - Identifier generated by executeScan for the scan.
 * @returns The tracked job, or `undefined` when no job is registered under that id.
 */
export function getJob(jobId: string): ScanJob | undefined {
  const trackedJob = activeJobs.get(jobId);
  return trackedJob;
}
// ═══════════════════════════════════════════
// WEBSOCKET EVENT EMITTER (set externally)
// ═══════════════════════════════════════════
/** Signature of the callback used to push an event and its payload to the client. */
type EmitFn = (event: string, data: any) => void;

/** Placeholder emitter that discards every event. */
const discardEvent: EmitFn = () => {
  /* intentionally empty */
};

/** Emitter currently in use; stays a no-op until setEmitter installs a real one. */
let emitToClient: EmitFn = discardEvent;

/**
 * Installs the function used to deliver scan events to the client.
 *
 * @param fn - Callback that receives the event name and its payload.
 */
export function setEmitter(fn: EmitFn): void {
  emitToClient = fn;
}
// ═══════════════════════════════════════════
// MAIN SCAN EXECUTION
// ═══════════════════════════════════════════
/**
 * Produces a printable message for any thrown value, so that non-Error
 * throws do not end up logged or stored as "undefined".
 */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * Inserts a batch of transactions with one database call, then emits a
 * `transaction:new` event for each of them. Does nothing for an empty batch.
 */
function persistAndAnnounce(batch: InteracTransaction[]): void {
  if (batch.length === 0) {
    return;
  }
  database.batchInsertTransactions(
    batch.map((t) => ({
      id: t.id,
      email_id: t.email_id,
      user_id: t.user_id,
      date: t.date,
      sender: t.sender,
      amount: t.amount,
      currency: t.currency,
      reference: t.reference,
      message: t.message,
      recipient_email: t.recipient_email,
      branch: t.branch,
      status: t.status,
      raw_email: t.raw_email,
    }))
  );
  batch.forEach((t) => emitToClient('transaction:new', { transaction: t }));
}

/**
 * Runs one scan end to end: lists matching Gmail messages, drops the ones
 * already stored (unless `forceRescan` is set), then fetches, AI-parses,
 * branch-routes and batch-saves the rest while emitting progress events.
 *
 * @param scanRequest - Date range to scan and the force-rescan flag.
 * @param userId - User whose Gmail client is used and who owns the saved rows.
 * @returns Counts of emails found, parsed, skipped and errored.
 * @throws Rethrows any failure outside the per-email pipelines (for example
 *   message discovery) after marking the job failed, emitting `scan:error`
 *   and closing the scan log. Per-email failures are counted, not thrown.
 */
export async function executeScan(
  scanRequest: ScanRequest,
  userId: string
): Promise<ScanSummary> {
  const jobId = uuidv4();
  const { dateRange, forceRescan } = scanRequest;

  // Initialize job tracking
  const job: ScanJob = {
    jobId,
    status: 'scanning',
    dateRange,
    progress: {
      emailsFound: 0,
      emailsProcessed: 0,
      emailsSkipped: 0,
      emailsErrored: 0,
    },
    startedAt: new Date().toISOString(),
  };

  // Initialize AI provider pool
  const aiPool = new AIProviderPool((from, to, reason) => {
    emitToClient('ai:switch', { jobId, from, to, reason, timestamp: new Date().toISOString() });
  });

  // Log the scan
  const scanLogId = database.insertScanLog({
    user_id: userId,
    scan_preset: dateRange.preset,
    scan_start_date: dateRange.startDate,
    scan_end_date: dateRange.endDate,
    force_rescan: forceRescan ? 1 : 0,
    started_at: new Date().toISOString(),
    ai_provider: aiPool.getCurrentProvider(),
    ai_model: null,
  });

  // Register the job only once setup has succeeded: the cleanup timer lives in
  // the `finally` below, so an earlier registration would leave a stale
  // 'scanning' entry behind if the pool or the scan-log insert threw.
  activeJobs.set(jobId, job);

  try {
    // ═══ PHASE 1: DISCOVER EMAILS ═══
    console.log(`[Scan ${jobId}] Starting scan for ${dateRange.preset} range`);
    const gmail = gmailService.getGmailClient(userId);
    const query = buildGmailQuery(dateRange);
    const messageIds = await fetchAllMessageIds(gmail, query);
    job.progress.emailsFound = messageIds.length;

    // Deduplication: check which emails are already in DB
    const existingIds = database.getExistingEmailIds(messageIds);
    const newIds = forceRescan
      ? messageIds
      : messageIds.filter((id) => !existingIds.has(id));

    // Emails actually left out of this run. With forceRescan this is 0 even
    // when some emails already exist in the DB, so it must not be derived
    // from existingIds.size.
    const skipped = messageIds.length - newIds.length;
    job.progress.emailsSkipped = skipped;

    console.log(
      `[Scan ${jobId}] Found ${messageIds.length} emails, ${newIds.length} new, ${existingIds.size} already in DB`
    );

    // Emit scan:started
    emitToClient('scan:started', {
      jobId,
      totalEmails: messageIds.length,
      newEmails: newIds.length,
      skipped,
      dateRange,
      timestamp: new Date().toISOString(),
    });

    if (newIds.length === 0) {
      // Nothing new to process
      job.status = 'completed';
      job.completedAt = new Date().toISOString();
      const summary: ScanSummary = {
        found: messageIds.length,
        parsed: 0,
        skipped,
        errors: 0,
      };
      emitToClient('scan:completed', { jobId, summary, dateRange });
      database.updateScanLog({
        id: scanLogId,
        finished_at: new Date().toISOString(),
        emails_found: messageIds.length,
        emails_parsed: 0,
        emails_skipped: skipped,
        errors: 0,
        error_details: null,
      });
      return summary;
    }

    // ═══ PHASE 2: PIPELINE — Fetch + Parse + Save ═══
    job.status = 'parsing';
    const gmailLimit = pLimit(GMAIL_CONCURRENCY);
    const aiLimit = pLimit(AI_CONCURRENCY);
    const saveBuffer: InteracTransaction[] = [];
    const errorDetails: string[] = [];
    let processed = 0;
    let errored = 0;

    const pipelines = newIds.map((emailId) =>
      gmailLimit(async () => {
        try {
          // Stage 1: Fetch email body from Gmail
          const { body, subject, internalDate } = await fetchEmailBody(gmail, emailId);
          if (!body || body.trim().length < 20) {
            console.warn(`[Scan ${jobId}] Empty body for email ${emailId}, skipping`);
            errored++;
            // Keep the live job view in step with the local counter.
            job.progress.emailsErrored = errored;
            return null;
          }

          // Stage 2: AI parse (with its own concurrency limit)
          const parsed = await aiLimit(() => aiPool.parseEmail(body));

          // Stage 3: Route to branch; emails without a recipient default to Montreal
          const branch = parsed.recipient_email
            ? resolveBranch(parsed.recipient_email)
            : 'Montreal';

          // Build complete transaction
          const transaction: InteracTransaction = {
            id: uuidv4(),
            email_id: emailId,
            user_id: userId,
            sender: parsed.sender,
            amount: parsed.amount,
            currency: parsed.currency,
            reference: parsed.reference || '',
            message: parsed.message,
            recipient_email: parsed.recipient_email || '',
            // Fall back to Gmail's internalDate when the parser found no date
            date: parsed.date || new Date(parseInt(internalDate, 10)).toISOString(),
            status: parsed.status,
            branch,
            raw_email: body.substring(0, 5000), // Store first 5KB for audit
          };

          // Buffer for batch save
          saveBuffer.push(transaction);

          // Flush buffer when full
          // NOTE(review): if this insert throws, the spliced batch is dropped and
          // only the current email is recorded as errored, although the other
          // rows in the batch were already counted as processed — confirm
          // whether that is acceptable.
          if (saveBuffer.length >= DB_BATCH_SIZE) {
            persistAndAnnounce(saveBuffer.splice(0, DB_BATCH_SIZE));
          }

          processed++;
          job.progress.emailsProcessed = processed;
          job.progress.emailsErrored = errored;
          job.progress.currentEmail = `${parsed.sender} — ${parsed.amount} $`;

          // Emit progress
          emitToClient('scan:progress', {
            jobId,
            processed,
            total: newIds.length,
            errored,
            latest: {
              sender: parsed.sender,
              amount: parsed.amount,
              branch,
            },
            currentProvider: aiPool.getCurrentProvider(),
          });
          return transaction;
        } catch (error: unknown) {
          // A failure on one email is recorded and must not abort the scan.
          errored++;
          job.progress.emailsErrored = errored;
          const errMsg = `Email ${emailId}: ${describeError(error)}`;
          errorDetails.push(errMsg);
          console.error(`[Scan ${jobId}] ${errMsg}`);
          return null;
        }
      })
    );

    // Wait for all pipelines to complete
    await Promise.allSettled(pipelines);

    // Flush remaining buffer
    persistAndAnnounce(saveBuffer);

    // ═══ PHASE 3: COMPLETE ═══
    job.status = 'completed';
    job.completedAt = new Date().toISOString();
    const summary: ScanSummary = {
      found: messageIds.length,
      parsed: processed,
      skipped,
      errors: errored,
    };
    emitToClient('scan:completed', { jobId, summary, dateRange });

    // Update scan log (error details are capped at the first 50 entries)
    database.updateScanLog({
      id: scanLogId,
      finished_at: new Date().toISOString(),
      emails_found: messageIds.length,
      emails_parsed: processed,
      emails_skipped: skipped,
      errors: errored,
      error_details: errorDetails.length > 0 ? JSON.stringify(errorDetails.slice(0, 50)) : null,
    });
    console.log(
      `[Scan ${jobId}] Completed: ${processed} parsed, ${errored} errors, ${skipped} skipped`
    );
    return summary;
  } catch (error: unknown) {
    const message = describeError(error);
    job.status = 'failed';
    job.completedAt = new Date().toISOString();
    emitToClient('scan:error', {
      jobId,
      error: message,
      timestamp: new Date().toISOString(),
    });
    database.updateScanLog({
      id: scanLogId,
      finished_at: new Date().toISOString(),
      emails_found: job.progress.emailsFound,
      emails_parsed: job.progress.emailsProcessed,
      emails_skipped: job.progress.emailsSkipped,
      // +1 accounts for the scan-level failure on top of per-email errors
      errors: job.progress.emailsErrored + 1,
      error_details: JSON.stringify([message]),
    });
    throw error;
  } finally {
    // Clean up job after 5 minutes
    setTimeout(() => activeJobs.delete(jobId), 5 * 60 * 1000);
  }
}
/**
 * Object form of this module's public API, for callers that prefer a single
 * service handle over the individual named exports.
 */
export const scanService = {
  executeScan: executeScan,
  getJob: getJob,
  setEmitter: setEmitter,
};

export default scanService;