| import { calculateInboxSuggestions, hasSuggestion } from "@midday/db/queries"; |
| import type { Job } from "bullmq"; |
| import { |
| type BatchProcessMatchingPayload, |
| batchProcessMatchingSchema, |
| } from "../../schemas/inbox"; |
| import { getDb } from "../../utils/db"; |
| import { classifyError } from "../../utils/error-classification"; |
| import { triggerMatchingNotification } from "../../utils/inbox-matching-notifications"; |
| import { BaseProcessor } from "../base"; |
|
|
| export class BatchProcessMatchingProcessor extends BaseProcessor<BatchProcessMatchingPayload> { |
| protected getPayloadSchema() { |
| return batchProcessMatchingSchema; |
| } |
| async process(job: Job<BatchProcessMatchingPayload>): Promise<{ |
| processed: number; |
| autoMatched: number; |
| suggestions: number; |
| noMatches: number; |
| errors: number; |
| }> { |
| const { teamId, inboxIds } = job.data; |
| const db = getDb(); |
|
|
| this.logger.info("Starting batch inbox matching", { |
| teamId, |
| inboxCount: inboxIds.length, |
| }); |
|
|
| let autoMatchCount = 0; |
| let suggestionCount = 0; |
| let noMatchCount = 0; |
| let errorCount = 0; |
|
|
| |
| const BATCH_SIZE = 5; |
| const totalBatches = Math.ceil(inboxIds.length / BATCH_SIZE); |
| const _progressPerBatch = 100 / totalBatches; |
|
|
| for (let i = 0; i < inboxIds.length; i += BATCH_SIZE) { |
| const batch = inboxIds.slice(i, i + BATCH_SIZE); |
| const batchIndex = Math.floor(i / BATCH_SIZE); |
|
|
| const results = await Promise.allSettled( |
| batch.map(async (inboxId) => { |
| try { |
| const result = await calculateInboxSuggestions(db, { |
| teamId, |
| inboxId, |
| }); |
|
|
| |
| if (hasSuggestion(result)) { |
| await triggerMatchingNotification({ |
| db, |
| teamId, |
| inboxId, |
| result, |
| }); |
| } |
|
|
| switch (result.action) { |
| case "auto_matched": |
| autoMatchCount++; |
| |
| this.logger.info("Auto-matched inbox item", { |
| teamId, |
| inboxId, |
| transactionId: result.suggestion!.transactionId, |
| confidence: result.suggestion!.confidenceScore, |
| }); |
| break; |
|
|
| case "suggestion_created": |
| suggestionCount++; |
| |
| this.logger.info("Created match suggestion", { |
| teamId, |
| inboxId, |
| transactionId: result.suggestion!.transactionId, |
| confidence: result.suggestion!.confidenceScore, |
| }); |
| break; |
|
|
| case "no_match_yet": |
| noMatchCount++; |
| break; |
| } |
|
|
| return result; |
| } catch (error) { |
| errorCount++; |
| const classified = classifyError(error); |
|
|
| this.logger.error("Failed to process inbox matching", { |
| teamId, |
| inboxId, |
| error: error instanceof Error ? error.message : "Unknown error", |
| errorCategory: classified.category, |
| retryable: classified.retryable, |
| }); |
|
|
| throw error; |
| } |
| }), |
| ); |
|
|
| |
| const batchErrors = results.filter((r) => r.status === "rejected").length; |
| this.logger.info("Completed batch processing", { |
| teamId, |
| batchIndex: batchIndex + 1, |
| batchSize: batch.length, |
| errors: batchErrors, |
| }); |
| } |
|
|
| this.logger.info("Completed batch inbox matching", { |
| teamId, |
| summary: { |
| totalProcessed: inboxIds.length, |
| autoMatches: autoMatchCount, |
| suggestions: suggestionCount, |
| noMatches: noMatchCount, |
| errors: errorCount, |
| }, |
| }); |
|
|
| return { |
| processed: inboxIds.length, |
| autoMatched: autoMatchCount, |
| suggestions: suggestionCount, |
| noMatches: noMatchCount, |
| errors: errorCount, |
| }; |
| } |
| } |
|
|