Heaven K
fix: hide internal AI error details from user-facing responses
98c32af
import type { ScanDateRange, ScanRequest } from '@icc/shared';
import { resolveBranch as resolveStaticBranch } from '@icc/shared';
import { db } from '../db/index.js';
import { transactions, scanLogs, branchConfig } from '../db/schema.js';
import { eq, and, inArray, desc } from 'drizzle-orm';
import { buildGmailQuery, fetchAllMessageIds, fetchMessage, type EmailMessage } from './gmailService.js';
import { AIService } from './aiService.js';
import { captureEmailScreenshots } from './screenshotService.js';
import { lookupEnvelopeNumber } from './envelopeService.js';
import { syncEnvelopesFromSheet } from '../db/index.js';
import { io } from '../index.js';
import pino from 'pino';
/** When the env var DISABLE_SCREENSHOTS is exactly 'true', the post-scan screenshot capture is skipped. */
const DISABLE_SCREENSHOTS = process.env.DISABLE_SCREENSHOTS === 'true';
const logger = pino({
  level: 'info',
  transport: {
    target: 'pino-pretty',
    options: { colorize: true },
  },
});
// Number of new Gmail message ids handled per outer batch.
const BATCH_SIZE = 100;
// Gmail messages fetched in parallel inside a batch (paid tier allows more).
const CONCURRENT_FETCH = 10;
// Emails handed to the AI parser in parallel (paid tier: 3 parallel AI calls is safe).
const CONCURRENT_PARSE = 3;
// Pause, in milliseconds, between AI parse chunks as a small rate-limit buffer.
const PARSE_DELAY_MS = 100;
/**
 * Pull a bare, lower-cased email address out of a header value.
 *
 * Accepts both `"Name <email@domain>"` (the bracketed part wins) and a plain
 * `"email@domain"`. Returns `''` for an empty header, or for a header with no
 * angle-bracketed part whose trimmed text contains no `@`.
 */
function extractEmailAddress(header: string): string {
  if (!header) {
    return '';
  }
  const bracketed = /<([^>]+)>/.exec(header);
  if (bracketed !== null) {
    return bracketed[1].trim().toLowerCase();
  }
  // Not in "Name <addr>" form: treat the whole header as a candidate address.
  const candidate = header.trim().toLowerCase();
  return candidate.includes('@') ? candidate : '';
}
// In-memory copy of the active branchConfig rows: lower-cased email → branch.
let _branchCache: Map<string, string> | null = null;
// Date.now() value at which _branchCache was last loaded.
let _branchCacheTime = 0;
const BRANCH_CACHE_TTL = 60_000; // 1 minute
/**
 * Resolve the branch for a recipient email address.
 *
 * Active rows of the `branchConfig` table take precedence. Addresses not found
 * there fall back to the static mapping from `@icc/shared`. Matching is
 * case-insensitive. The table is re-read at most once per BRANCH_CACHE_TTL ms,
 * or right after the cache has been cleared.
 */
async function resolveBranch(email: string): Promise<string> {
  const now = Date.now();
  if (_branchCache === null || now - _branchCacheTime > BRANCH_CACHE_TTL) {
    const activeRows = await db
      .select({ email: branchConfig.email, branch: branchConfig.branch })
      .from(branchConfig)
      .where(eq(branchConfig.active, true));
    const freshCache = new Map<string, string>();
    for (const { email: configuredEmail, branch } of activeRows) {
      freshCache.set(configuredEmail.toLowerCase(), branch);
    }
    _branchCache = freshCache;
    _branchCacheTime = now;
  }
  const normalized = email.toLowerCase();
  return _branchCache.get(normalized) ?? resolveStaticBranch(normalized);
}
/** Lifecycle of a scan job: queued → scanning → parsing → completed, or 'failed' on a fatal error. */
type ScanJobStatus = 'queued' | 'scanning' | 'parsing' | 'completed' | 'failed';

/** Running counters for a scan job, mirrored to the client in progress events. */
interface ScanJobProgress {
  emailsFound: number;
  emailsProcessed: number;
  emailsSkipped: number;
  emailsErrored: number;
  /** Sender (or subject) of the most recently saved email. */
  currentEmail?: string;
}

/** In-memory state of one scan job, keyed by `jobId` in the active-job map. */
interface ScanJobState {
  jobId: string;
  /** Owner of the job; all events and DB rows are scoped to this user. */
  userId: string;
  status: ScanJobStatus;
  dateRange: ScanDateRange;
  progress: ScanJobProgress;
  /** ISO-8601 timestamp set when the job is created. */
  startedAt: string;
  /** ISO-8601 timestamp set when the job completes or fails. */
  completedAt?: string;
}
// In-memory job tracking keyed by jobId (replace with Redis/BullMQ for production).
// State is process-local and is lost on restart; in the code visible here entries
// are only removed by clearScanCaches().
const activeJobs: Map<string, ScanJobState> = new Map();
/**
 * Look up the live in-memory state of a scan job.
 *
 * @param jobId - Id returned by `executeScan`.
 * @returns The job state, or `undefined` if the id is unknown or the jobs were cleared.
 */
export function getJobStatus(jobId: string): ScanJobState | undefined {
  const state = activeJobs.get(jobId);
  return state;
}
/**
 * Reset this module's in-memory state. Every tracked scan job is forgotten, and
 * the branch cache is dropped so the next branch lookup re-reads the database.
 */
export function clearScanCaches() {
  activeJobs.clear();
  _branchCacheTime = 0;
  _branchCache = null;
}
/**
 * Return the scan-log rows owned by `userId`, newest first (descending id),
 * capped at 100 rows.
 */
export async function getScanHistory(userId: string) {
  const ownedByUser = eq(scanLogs.userId, userId);
  return db
    .select()
    .from(scanLogs)
    .where(ownedByUser)
    .orderBy(desc(scanLogs.id))
    .limit(100);
}
/**
 * Emit a WebSocket event only to the user who owns this scan.
 *
 * The event is sent to the `user:<userId>` room. This presumably relies on each
 * client socket joining its own user room on connect; verify in the socket setup.
 *
 * @param userId - Recipient user id.
 * @param event - Event name (e.g. 'scan:progress').
 * @param data - Payload, passed through to socket.io unchanged.
 */
function emitToUser(userId: string, event: string, data: unknown) {
  io.to(`user:${userId}`).emit(event, data);
}
/**
 * Register a new scan job, create its `scanLogs` row, and start processing it
 * in the background.
 *
 * Resolves as soon as the log row exists. Progress and the final outcome are
 * delivered over WebSocket and through `getJobStatus`. If the background run
 * fails, the error is logged and written to the scan log, and the user receives
 * a generic message; the internal error text is not sent to the client.
 *
 * @param scanRequest - Date range and rescan options for this job.
 * @param userId - Owner of the job.
 * @returns The generated job id.
 * @throws Whatever the scan-log insert throws; the job is marked `failed` first.
 */
export async function executeScan(scanRequest: ScanRequest, userId: string): Promise<string> {
  const jobId = `scan_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
  const dateRange = scanRequest.dateRange;
  const jobState: ScanJobState = {
    jobId,
    userId,
    status: 'queued',
    dateRange,
    progress: { emailsFound: 0, emailsProcessed: 0, emailsSkipped: 0, emailsErrored: 0 },
    startedAt: new Date().toISOString(),
  };
  activeJobs.set(jobId, jobState);

  // Create scan log entry. Without this guard a failed insert would leave the
  // job reported as 'queued' forever by getJobStatus.
  let scanLogId: number;
  try {
    const [scanLog] = await db.insert(scanLogs).values({
      userId,
      scanPreset: dateRange.preset,
      scanStartDate: dateRange.startDate,
      scanEndDate: dateRange.endDate,
      forceRescan: scanRequest.forceRescan ?? false,
      startedAt: new Date().toISOString(),
    }).returning();
    scanLogId = scanLog.id;
  } catch (err) {
    jobState.status = 'failed';
    jobState.completedAt = new Date().toISOString();
    throw err;
  }

  // Run scan asynchronously (fire-and-forget; failures are handled here)
  void runScan(jobId, scanRequest, userId, scanLogId).catch(async (err: unknown) => {
    // A rejection value is not guaranteed to be an Error. Reading .message on
    // null would throw inside this handler, so narrow first.
    const message = err instanceof Error ? err.message : String(err);
    const stack = err instanceof Error ? err.stack : undefined;
    logger.error({ err: message, stack, jobId }, 'Scan failed: %s', message);
    jobState.status = 'failed';
    jobState.completedAt = new Date().toISOString();
    // Save failure to DB so Journal page shows it
    try {
      await db.update(scanLogs).set({
        finishedAt: jobState.completedAt,
        emailsFound: jobState.progress.emailsFound,
        emailsParsed: jobState.progress.emailsProcessed,
        emailsSkipped: jobState.progress.emailsSkipped,
        errors: (jobState.progress.emailsErrored || 0) + 1,
        errorDetails: `[Fatal] ${message}\n${stack || ''}`.substring(0, 2000),
      }).where(eq(scanLogs.id, scanLogId));
    } catch (dbErr: unknown) {
      // Best-effort, but don't lose the fact that the journal entry is stale.
      logger.warn(
        { err: dbErr instanceof Error ? dbErr.message : String(dbErr), jobId },
        'Failed to record scan failure in scan log',
      );
    }
    emitToUser(userId, 'scan:error', { jobId, message: 'Le scan a échoué. Veuillez réessayer.', dateRange });
  });
  return jobId;
}
/**
 * Run a registered scan job to completion.
 *
 * Phases:
 * 1. Refresh envelopes from Google Sheets.
 * 2. Discover Gmail message ids for the date range.
 * 3. Drop ids already stored for this user, unless `forceRescan` is set.
 * 4. Fetch the messages and enforce the exact date range.
 * 5. AI-parse each email and route it to a branch.
 * 6. Save transactions and update the scan log.
 *
 * Progress is mirrored into the in-memory job state and pushed to the owning
 * user over WebSocket. Unless DISABLE_SCREENSHOTS is set, screenshot capture is
 * then started in the background.
 *
 * Per-email fetch and parse failures are counted and stored in the scan log
 * (first 50 messages); they do not abort the scan.
 *
 * @param jobId - Id of a job already registered in `activeJobs`.
 * @param scanRequest - Date range and `forceRescan` flag.
 * @param userId - Owner of the job; dedup, saves and events are scoped to this user.
 * @param scanLogId - `scanLogs` row to update on completion.
 * @throws Any fatal error, after setting the job status to `failed`.
 */
async function runScan(jobId: string, scanRequest: ScanRequest, userId: string, scanLogId: number) {
  const jobState = activeJobs.get(jobId)!;
  const { dateRange, forceRescan } = scanRequest;
  try {
    // Phase 0: Refresh envelopes from Google Sheets so new/updated names are picked up
    await syncEnvelopesFromSheet();

    // Phase 1: Email discovery
    jobState.status = 'scanning';
    const query = buildGmailQuery(dateRange);
    logger.info({ jobId, query, dateRange }, 'Starting email discovery — query: %s', query);
    const messageIds = await fetchAllMessageIds(userId, query);
    jobState.progress.emailsFound = messageIds.length;
    logger.info({ jobId, emailsFound: messageIds.length }, 'Email discovery complete: %d emails found', messageIds.length);

    // Deduplication, scoped to this user's transactions only. The lookup is
    // chunked to stay under SQLite's max variable limit (999).
    let newIds: string[];
    if (forceRescan) {
      newIds = messageIds;
    } else {
      const existingSet = new Set<string>();
      const DEDUP_BATCH = 500;
      for (let i = 0; i < messageIds.length; i += DEDUP_BATCH) {
        const chunk = messageIds.slice(i, i + DEDUP_BATCH);
        const existing = await db.select({ emailId: transactions.emailId })
          .from(transactions)
          .where(and(
            eq(transactions.userId, userId),
            inArray(transactions.emailId, chunk)
          ));
        for (const e of existing) existingSet.add(e.emailId);
      }
      newIds = messageIds.filter(id => !existingSet.has(id));
      jobState.progress.emailsSkipped = messageIds.length - newIds.length;
    }
    const totalNew = newIds.length;

    // Every scan:progress event carries the same counters. Any `extra` fields
    // (e.g. the latest parsed transaction) are appended after them.
    const emitProgress = (extra: Record<string, unknown> = {}) => {
      emitToUser(userId, 'scan:progress', {
        jobId,
        processed: jobState.progress.emailsProcessed,
        total: totalNew,
        skipped: jobState.progress.emailsSkipped,
        errored: jobState.progress.emailsErrored,
        ...extra,
      });
    };

    emitToUser(userId, 'scan:started', {
      jobId,
      timestamp: new Date().toISOString(),
      dateRange,
      totalEmails: messageIds.length,
      newEmails: totalNew,
      skipped: jobState.progress.emailsSkipped,
    });

    if (totalNew === 0) {
      jobState.status = 'completed';
      jobState.completedAt = new Date().toISOString();
      await db.update(scanLogs).set({
        finishedAt: jobState.completedAt,
        emailsFound: messageIds.length,
        emailsParsed: 0,
        emailsSkipped: jobState.progress.emailsSkipped,
        errors: 0,
      }).where(eq(scanLogs.id, scanLogId));
      emitToUser(userId, 'scan:completed', {
        jobId,
        summary: { found: messageIds.length, parsed: 0, skipped: jobState.progress.emailsSkipped, errors: 0 },
        dateRange,
        duration: '0s',
      });
      return;
    }

    // Phase 2: Fetch emails in batches with concurrency
    jobState.status = 'parsing';
    const saveBuffer: any[] = [];
    const errorMessages: string[] = [];
    // Gmail only filters by day, so the exact range is enforced after fetching.
    // The bounds are the same for every batch, so parse them once.
    const rangeStart = new Date(dateRange.startDate).getTime();
    const rangeEnd = new Date(dateRange.endDate).getTime();

    for (let i = 0; i < newIds.length; i += BATCH_SIZE) {
      const batch = newIds.slice(i, i + BATCH_SIZE);

      // Fetch emails with concurrency limit
      const fetchedEmails: EmailMessage[] = [];
      for (let j = 0; j < batch.length; j += CONCURRENT_FETCH) {
        const chunk = batch.slice(j, j + CONCURRENT_FETCH);
        const results = await Promise.allSettled(
          chunk.map(id => fetchMessage(userId, id))
        );
        for (const [k, result] of results.entries()) {
          if (result.status === 'fulfilled') {
            fetchedEmails.push(result.value);
          } else {
            jobState.progress.emailsErrored++;
            const errMsg = result.reason?.message || 'Fetch error';
            errorMessages.push(`[Fetch] ${errMsg}`);
            // allSettled keeps input order, so chunk[k] is the id that failed.
            logger.error({ err: result.reason, emailId: chunk[k] }, 'Failed to fetch email');
            emitProgress();
          }
        }
      }

      // Post-fetch date filter (exact range; see rangeStart/rangeEnd above)
      const emails: EmailMessage[] = [];
      for (const email of fetchedEmails) {
        const emailTime = new Date(email.date).getTime();
        if (emailTime >= rangeStart && emailTime <= rangeEnd) {
          emails.push(email);
        } else {
          jobState.progress.emailsSkipped++;
          logger.info({ emailId: email.emailId, emailDate: email.date, rangeStart: dateRange.startDate, rangeEnd: dateRange.endDate }, 'Skipping email outside date range');
          emitProgress();
        }
      }

      // Phase 3: AI parsing. Up to CONCURRENT_PARSE emails run in parallel, with
      // a PARSE_DELAY_MS pause between chunks to stay under rate limits.
      for (let j = 0; j < emails.length; j += CONCURRENT_PARSE) {
        const parseChunk = emails.slice(j, j + CONCURRENT_PARSE);
        const parseResults = await Promise.allSettled(
          parseChunk.map(async (email) => {
            const parsed = await AIService.parseEmail(email.body);
            return { email, parsed };
          })
        );

        // Small delay between batches to avoid rate limiting
        if (j + CONCURRENT_PARSE < emails.length) {
          await new Promise(resolve => setTimeout(resolve, PARSE_DELAY_MS));
        }

        for (const result of parseResults) {
          if (result.status === 'fulfilled') {
            const { email, parsed } = result.value;

            // Skip non-Interac emails (AI couldn't extract meaningful data).
            // `!(Number(x) > 0)` also rejects missing/NaN amounts. A plain
            // `x <= 0` check lets those through, and they were saved as 0.
            if (!(Number(parsed.amount) > 0) || !parsed.sender || parsed.sender === 'Inconnu') {
              jobState.progress.emailsSkipped++;
              jobState.progress.emailsProcessed++;
              logger.info({ emailId: email.emailId }, 'Skipping non-Interac email (no amount/sender)');
              emitProgress();
              continue;
            }

            // Phase 4: Branch routing
            // Use AI-extracted recipient_email first, fallback to Gmail To header
            const recipientEmail = parsed.recipient_email || extractEmailAddress(email.to);
            const branch = recipientEmail
              ? await resolveBranch(recipientEmail)
              : 'Montreal';

            // Screenshots are deferred to background after scan completes (see Phase 6b)

            // Look up envelope number from the contact database
            const envelopeNumber = lookupEnvelopeNumber(parsed.sender || '') || null;

            const txId = crypto.randomUUID();
            saveBuffer.push({
              id: txId,
              emailId: email.emailId,
              userId,
              date: new Date(email.date).toISOString(),
              sender: parsed.sender || 'Inconnu',
              amount: parsed.amount || 0,
              currency: parsed.currency || 'CAD',
              reference: parsed.reference,
              message: parsed.message,
              recipientEmail: recipientEmail || parsed.recipient_email,
              branch,
              envelopeNumber,
              status: parsed.status || 'pending',
              rawEmail: email.body.substring(0, 5000),
              rawEmailHtml: email.bodyHtml,
              screenshotOriginal: '',
              screenshotPreview: '',
              parsedAt: new Date().toISOString(),
              reviewed: false,
            });

            jobState.progress.emailsProcessed++;
            jobState.progress.currentEmail = parsed.sender || email.subject;
            emitProgress({ latest: { ...parsed, branch } });
          } else {
            jobState.progress.emailsErrored++;
            const errMsg = result.reason?.message || 'AI parse error';
            errorMessages.push(`[AI Parse] ${errMsg}`);
            logger.error({ err: result.reason }, 'Failed to parse email');
            emitProgress();
          }
        }
      }

      // Phase 5: Batch save (flush frequently for faster UI updates)
      if (saveBuffer.length >= 10) {
        const toSave = saveBuffer.splice(0, saveBuffer.length);
        await db.insert(transactions).values(toSave).onConflictDoNothing();
        for (const tx of toSave) {
          emitToUser(userId, 'transaction:new', { transaction: tx });
        }
      }
    }

    // Flush remaining buffer
    if (saveBuffer.length > 0) {
      await db.insert(transactions).values(saveBuffer).onConflictDoNothing();
      for (const tx of saveBuffer) {
        emitToUser(userId, 'transaction:new', { transaction: tx });
      }
    }

    // Phase 6: Completion
    jobState.status = 'completed';
    jobState.completedAt = new Date().toISOString();
    const durationMs = Date.now() - new Date(jobState.startedAt).getTime();
    const durationStr = durationMs > 60000
      ? `${Math.round(durationMs / 60000)}m`
      : `${Math.round(durationMs / 1000)}s`;

    // Only the first 50 per-email error messages are persisted.
    const errorDetailsStr = errorMessages.length > 0
      ? errorMessages.slice(0, 50).join('\n')
      : null;

    await db.update(scanLogs).set({
      finishedAt: jobState.completedAt,
      emailsFound: jobState.progress.emailsFound,
      emailsParsed: jobState.progress.emailsProcessed,
      emailsSkipped: jobState.progress.emailsSkipped,
      errors: jobState.progress.emailsErrored,
      errorDetails: errorDetailsStr,
    }).where(eq(scanLogs.id, scanLogId));

    emitToUser(userId, 'scan:completed', {
      jobId,
      summary: {
        found: jobState.progress.emailsFound,
        parsed: jobState.progress.emailsProcessed,
        skipped: jobState.progress.emailsSkipped,
        errors: jobState.progress.emailsErrored,
      },
      dateRange,
      duration: durationStr,
    });

    logger.info({ jobId, duration: durationStr, parsed: jobState.progress.emailsProcessed }, 'Scan completed');

    // Phase 6b: Background screenshot capture (non-blocking, after scan is done)
    if (!DISABLE_SCREENSHOTS) {
      captureScreenshotsInBackground(userId).catch((err) => {
        logger.warn({ err: err.message }, 'Background screenshot capture failed');
      });
    }
  } catch (err) {
    jobState.status = 'failed';
    throw err;
  }
}
/**
 * Capture screenshots for up to 50 of the user's transactions that have none
 * yet (`screenshotOriginal === ''`), and store the resulting paths on each row.
 *
 * Meant to be started without awaiting after a scan completes. Each capture is
 * best-effort: a failure is logged, and the row keeps its empty screenshot
 * fields so a later run will try it again.
 *
 * NOTE(review): the select has no ORDER BY. Rows that fail on every attempt may
 * keep occupying the 50-row window and delay newer ones; consider ordering
 * or tracking attempts.
 *
 * @param userId - Only this user's transactions are considered.
 */
async function captureScreenshotsInBackground(userId: string) {
  const pending = await db.select({
    id: transactions.id,
    emailId: transactions.emailId,
    rawEmailHtml: transactions.rawEmailHtml,
    rawEmail: transactions.rawEmail,
  })
    .from(transactions)
    .where(and(
      eq(transactions.userId, userId),
      eq(transactions.screenshotOriginal, ''),
    ))
    .limit(50);

  if (pending.length === 0) return;
  logger.info({ count: pending.length }, 'Capturing screenshots in background...');

  let captured = 0;
  let failed = 0;
  for (const tx of pending) {
    try {
      const screenshots = await captureEmailScreenshots(
        tx.rawEmailHtml || '',
        tx.rawEmail || '',
        tx.emailId
      );
      await db.update(transactions).set({
        screenshotOriginal: screenshots.originalPath,
        screenshotPreview: screenshots.previewPath,
      }).where(eq(transactions.id, tx.id));
      captured++;
    } catch (err: unknown) {
      failed++;
      logger.warn(
        { emailId: tx.emailId, err: err instanceof Error ? err.message : String(err) },
        'Background screenshot failed',
      );
    }
  }
  // Report real outcomes; `count` alone would imply every pending row succeeded.
  logger.info({ count: pending.length, captured, failed }, 'Background screenshots done');
}