| import { |
| type AccountingProvider, |
| type ProviderEntityType, |
| RATE_LIMITS, |
| } from "@midday/accounting"; |
| import { |
| PROVIDER_ATTACHMENT_CONFIG, |
| resolveMimeType, |
| sleep, |
| throttledConcurrent, |
| } from "@midday/accounting/utils"; |
| import { |
| getAccountingSyncStatus, |
| getTransactionAttachmentsForSync, |
| updateSyncedAttachmentMapping, |
| } from "@midday/db/queries"; |
| import { createClient } from "@midday/supabase/job"; |
| import type { Job } from "bullmq"; |
| import type { AccountingAttachmentSyncPayload } from "../../schemas/accounting"; |
| import { AccountingProcessorBase, type AccountingProviderId } from "./base"; |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
/**
 * Number of retries after the first upload attempt; the upload loop runs
 * `ATTACHMENT_UPLOAD_RETRIES + 1` attempts in total.
 */
const ATTACHMENT_UPLOAD_RETRIES = 3;

/**
 * Base delay (ms) before a retry when the previous failure was not classified
 * as a rate limit. The delay doubles with every further retry.
 */
const ATTACHMENT_RETRY_BASE_DELAY_MS = 2_000;

/**
 * Base delay (ms) before a retry when the previous failure was classified as a
 * rate limit. The delay doubles with every further retry.
 */
const RATE_LIMIT_RETRY_BASE_DELAY_MS = 30_000;
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Job processor that synchronizes the attachments of one transaction with an
 * accounting provider.
 *
 * For a single job it:
 *  1. asks the provider to delete every entry of `removedAttachments` that has
 *     a provider id, and drops those entries from the local mapping,
 *  2. uploads the attachments in `attachmentIds` that are not in the mapping yet,
 *  3. persists the updated Midday-id -> provider-id mapping together with a
 *     `"synced"` / `"partial"` status on the sync record,
 *  4. optionally adds a history note (only when `providerId === "xero"`).
 */
export class SyncAttachmentsProcessor extends AccountingProcessorBase<AccountingAttachmentSyncPayload> {
  /**
   * Runs one attachment-sync job.
   *
   * Provider-side delete failures and history-note failures are logged as
   * warnings and never fail the job. Upload failures are counted in
   * `failedCount` and turn the persisted status into `"partial"`.
   *
   * @param job - Queue job whose `data` carries the sync payload.
   * @returns The identifying ids of the job plus how many attachments were
   *   uploaded, deleted on the provider, and failed to upload.
   */
  async process(job: Job<AccountingAttachmentSyncPayload>): Promise<{
    teamId: string;
    providerId: string;
    transactionId: string;
    uploadedCount: number;
    deletedCount: number;
    failedCount: number;
  }> {
    const {
      teamId,
      providerId,
      transactionId,
      providerTransactionId,
      attachmentIds,
      removedAttachments,
      existingSyncedAttachmentMapping,
      syncRecordId,
      providerEntityType,
      // The remaining fields are only used for the optional history note.
      taxAmount,
      taxRate,
      taxType,
      note,
      addHistoryNote,
    } = job.data;

    const supabase = createClient();

    this.logger.info("Starting attachment sync", {
      teamId,
      providerId,
      transactionId,
      providerTransactionId,
      newAttachmentCount: attachmentIds.length,
      removedAttachmentCount: removedAttachments?.length ?? 0,
    });

    const { provider, config, db } = await this.initializeProvider(
      teamId,
      providerId,
    );

    const orgId = this.getOrgIdFromConfig(config);

    // Work on a shallow copy so the job payload itself is never mutated.
    const currentMapping: Record<string, string | null> =
      existingSyncedAttachmentMapping
        ? { ...existingSyncedAttachmentMapping }
        : {};

    let uploadedCount = 0;
    let deletedCount = 0;
    let failedCount = 0;
    // True once at least one entry was actually removed from the mapping, so
    // the pruned mapping gets persisted even if no provider delete succeeded.
    let mappingPruned = false;

    // Step 1: deletions.
    if (removedAttachments && removedAttachments.length > 0) {
      for (const removed of removedAttachments) {
        // Without a provider id there is nothing to delete remotely.
        if (removed.providerId) {
          try {
            const deleteResult = await provider.deleteAttachment({
              tenantId: orgId,
              transactionId: providerTransactionId,
              attachmentId: removed.providerId,
            });

            if (deleteResult.success) {
              deletedCount++;
              this.logger.info("Attachment deleted from provider", {
                middayId: removed.middayId,
                providerId: removed.providerId,
              });
            } else {
              // Best effort: a failed remote delete does not fail the job.
              this.logger.warn("Failed to delete attachment from provider", {
                middayId: removed.middayId,
                providerId: removed.providerId,
                error: deleteResult.error,
              });
            }
          } catch (error) {
            this.logger.warn("Error deleting attachment from provider", {
              middayId: removed.middayId,
              providerId: removed.providerId,
              error: error instanceof Error ? error.message : "Unknown error",
            });
          }
        }

        // The entry is dropped from the mapping regardless of the outcome of
        // the provider-side delete.
        if (
          Object.prototype.hasOwnProperty.call(currentMapping, removed.middayId)
        ) {
          mappingPruned = true;
        }
        delete currentMapping[removed.middayId];
      }
    }

    // Step 2: work out which attachments still need uploading.
    const alreadySyncedIds = new Set(Object.keys(currentMapping));
    const newAttachmentIds = attachmentIds.filter(
      (id) => !alreadySyncedIds.has(id),
    );

    if (
      newAttachmentIds.length === 0 &&
      (!removedAttachments || removedAttachments.length === 0)
    ) {
      this.logger.info("No attachment changes to sync", {
        teamId,
        transactionId,
      });
      return {
        teamId,
        providerId,
        transactionId,
        uploadedCount: 0,
        deletedCount: 0,
        failedCount: 0,
      };
    }

    // Collected from failed uploads; the first of each is persisted below.
    const errorCodes: string[] = [];
    const errorMessages: string[] = [];

    // Step 3: uploads.
    if (newAttachmentIds.length > 0) {
      const attachments = await getTransactionAttachmentsForSync(db, {
        teamId,
        attachmentIds: newAttachmentIds,
      });

      // Providers without their own entry use the Xero limits.
      const rateLimit =
        RATE_LIMITS[providerId as keyof typeof RATE_LIMITS] ?? RATE_LIMITS.xero;

      this.logger.info("Starting concurrent attachment uploads", {
        attachmentCount: attachments.length,
        maxConcurrent: rateLimit.maxConcurrent,
        callDelayMs: rateLimit.callDelayMs,
      });

      const uploadResults = await throttledConcurrent(
        attachments,
        async (attachment) =>
          this.uploadSingleAttachment(
            attachment,
            supabase,
            provider,
            orgId,
            providerTransactionId,
            providerId as keyof typeof PROVIDER_ATTACHMENT_CONFIG,
            providerEntityType,
          ),
        rateLimit.maxConcurrent,
        rateLimit.callDelayMs,
        async (completed, total) => {
          // Deletions are already finished at this point, so they count as
          // completed operations in the percentage.
          const deletionProgress = removedAttachments?.length ?? 0;
          const totalOps = total + deletionProgress;
          const progress = Math.round(
            ((deletionProgress + completed) / Math.max(totalOps, 1)) * 100,
          );
          await this.updateProgress(job, progress);
        },
      );

      // Uploads that returned a result object (successful or not).
      for (const result of uploadResults.results) {
        if (result.success) {
          uploadedCount++;
          currentMapping[result.attachmentId] = result.providerAttachmentId;
        } else {
          failedCount++;
          if (result.errorCode) {
            errorCodes.push(result.errorCode);
          }
          if (result.error) {
            errorMessages.push(result.error);
          }
        }
      }

      // Uploads that threw instead of returning a result.
      failedCount += uploadResults.errors.length;

      for (const error of uploadResults.errors) {
        this.logger.error("Attachment upload failed", {
          index: error.index,
          error: error.error.message,
        });
        // Keep the message so the persisted error is more specific than the
        // generic "N attachment(s) failed to upload" fallback.
        if (error.error.message) {
          errorMessages.push(error.error.message);
        }
      }
    }

    // Step 4: persist the mapping and status.
    const hasChanges =
      uploadedCount > 0 || deletedCount > 0 || failedCount > 0 || mappingPruned;
    if (hasChanges) {
      const status: "synced" | "partial" =
        failedCount > 0 ? "partial" : "synced";

      // Only the first collected code/message is stored on the record.
      const errorCode = failedCount > 0 ? (errorCodes[0] ?? null) : null;
      const errorMessage =
        failedCount > 0
          ? (errorMessages[0] ??
            `${failedCount} attachment(s) failed to upload`)
          : null;

      const updateParams = {
        syncedAttachmentMapping: currentMapping,
        status,
        errorMessage,
        errorCode,
      };

      let recordIdToUpdate = syncRecordId;

      if (!recordIdToUpdate) {
        // The payload carried no record id: look the record up by transaction.
        const syncRecords = await getAccountingSyncStatus(db, {
          teamId,
          transactionIds: [transactionId],
          provider: providerId as AccountingProviderId,
        });

        if (syncRecords.length > 0 && syncRecords[0]) {
          recordIdToUpdate = syncRecords[0].id;
        }
      }

      if (recordIdToUpdate) {
        await updateSyncedAttachmentMapping(db, {
          syncRecordId: recordIdToUpdate,
          ...updateParams,
        });

        this.logger.info("Sync record status updated", {
          syncRecordId: recordIdToUpdate,
          transactionId,
          status,
          uploadedCount,
          deletedCount,
          failedCount,
          errorMessage: errorMessage ?? undefined,
        });
      } else {
        this.logger.error("Could not find sync record to update status", {
          transactionId,
          teamId,
          providerId,
          status,
          failedCount,
        });
      }
    }

    // Step 5: optional history note, only for Xero.
    if (addHistoryNote && providerId === "xero") {
      try {
        await provider.addTransactionHistoryNote?.({
          tenantId: orgId,
          transactionId: providerTransactionId,
          taxAmount,
          taxRate,
          taxType,
          note,
        });
        this.logger.debug("Added history note to Xero transaction", {
          transactionId: providerTransactionId,
        });
      } catch (error) {
        // Best effort: a failed note must not fail the attachment sync.
        this.logger.warn("Failed to add history note to Xero transaction", {
          transactionId: providerTransactionId,
          error: error instanceof Error ? error.message : "Unknown error",
        });
      }
    }

    this.logger.info("Attachment sync completed", {
      teamId,
      providerId,
      transactionId,
      uploadedCount,
      deletedCount,
      failedCount,
      totalMappings: Object.keys(currentMapping).length,
    });

    return {
      teamId,
      providerId,
      transactionId,
      uploadedCount,
      deletedCount,
      failedCount,
    };
  }

  /**
   * Downloads one attachment from the `vault` storage bucket, validates it
   * against the provider's attachment config, and uploads it to the provider,
   * retrying failed uploads with exponential back-off.
   *
   * Expected failures (missing data, download failure, unsupported MIME type,
   * oversized file, upload still failing after all retries) are reported via
   * the returned object rather than thrown.
   *
   * @param attachment - Attachment row; `path` and `name` must be non-empty.
   * @param supabase - Client used to download the file.
   * @param provider - Accounting provider that receives the upload.
   * @param orgId - Passed to the provider as `tenantId`.
   * @param providerTransactionId - Provider-side id of the target transaction.
   * @param providerId - Key into `PROVIDER_ATTACHMENT_CONFIG`.
   * @param providerEntityType - Optional entity type forwarded to the provider.
   * @returns `success` plus the provider attachment id on success, or `error`
   *   (and, for most failures, `errorCode`) on failure.
   */
  private async uploadSingleAttachment(
    attachment: {
      id: string;
      name: string | null;
      path: string[] | null;
      type: string | null;
    },
    supabase: ReturnType<typeof createClient>,
    provider: AccountingProvider,
    orgId: string,
    providerTransactionId: string,
    providerId: keyof typeof PROVIDER_ATTACHMENT_CONFIG,
    providerEntityType?: ProviderEntityType,
  ): Promise<{
    success: boolean;
    attachmentId: string;
    providerAttachmentId: string | null;
    error?: string;
    errorCode?: string;
  }> {
    if (!attachment.path || !attachment.name) {
      this.logger.warn("Skipping attachment with missing data", {
        attachmentId: attachment.id,
      });
      return {
        success: false,
        attachmentId: attachment.id,
        providerAttachmentId: null,
        error: "Missing attachment data",
      };
    }

    // `path` is typed as path segments; the non-array branch is a runtime
    // safeguard in case a plain string is stored.
    const filePath = Array.isArray(attachment.path)
      ? attachment.path.join("/")
      : attachment.path;

    const { data: fileData, error: downloadError } = await supabase.storage
      .from("vault")
      .download(filePath);

    if (downloadError || !fileData) {
      this.logger.error("Failed to download attachment", {
        attachmentId: attachment.id,
        path: filePath,
        error: downloadError?.message,
      });
      // Storage may return neither data nor an error object; avoid reporting
      // the literal text "undefined" in that case.
      const downloadReason = downloadError?.message ?? "no file data returned";
      return {
        success: false,
        attachmentId: attachment.id,
        providerAttachmentId: null,
        error: `Download failed: ${downloadReason}`,
      };
    }

    const buffer = Buffer.from(await fileData.arrayBuffer());

    const providerConfig = PROVIDER_ATTACHMENT_CONFIG[providerId];
    const mimeResolution = resolveMimeType(
      attachment.type,
      attachment.name,
      buffer,
      providerId,
    );

    if (!mimeResolution.mimeType) {
      this.logger.warn("Could not determine valid MIME type for attachment", {
        attachmentId: attachment.id,
        storedType: attachment.type,
        fileName: attachment.name,
        error: mimeResolution.error,
      });
      return {
        success: false,
        attachmentId: attachment.id,
        providerAttachmentId: null,
        error: mimeResolution.error ?? "Unsupported file type",
        errorCode: "ATTACHMENT_UNSUPPORTED_TYPE",
      };
    }

    // Make it visible in the logs when the stored type was not the one used.
    if (mimeResolution.source !== "stored") {
      this.logger.info("Resolved MIME type from fallback", {
        attachmentId: attachment.id,
        storedType: attachment.type,
        resolvedType: mimeResolution.mimeType,
        source: mimeResolution.source,
      });
    }

    const resolvedMimeType = mimeResolution.mimeType;

    if (buffer.length > providerConfig.maxSizeBytes) {
      const maxSizeMB = Math.round(providerConfig.maxSizeBytes / 1024 / 1024);
      this.logger.warn("Attachment exceeds size limit, skipping", {
        attachmentId: attachment.id,
        fileName: attachment.name,
        size: buffer.length,
        maxSize: providerConfig.maxSizeBytes,
        provider: providerId,
      });
      return {
        success: false,
        attachmentId: attachment.id,
        providerAttachmentId: null,
        error: `File too large for ${providerId} (max: ${maxSizeMB}MB)`,
        errorCode: "ATTACHMENT_TOO_LARGE",
      };
    }

    let lastError: string | undefined;
    let isRateLimited = false;

    // Attempt 0 is the initial try; attempts 1..ATTACHMENT_UPLOAD_RETRIES are
    // retries, each preceded by a delay that doubles every time.
    for (let attempt = 0; attempt <= ATTACHMENT_UPLOAD_RETRIES; attempt++) {
      if (attempt > 0) {
        // Use the longer base delay when the last failure looked like a
        // rate limit.
        const baseDelay = isRateLimited
          ? RATE_LIMIT_RETRY_BASE_DELAY_MS
          : ATTACHMENT_RETRY_BASE_DELAY_MS;
        const delay = baseDelay * 2 ** (attempt - 1);

        this.logger.warn("Retrying attachment upload", {
          attachmentId: attachment.id,
          attempt: attempt + 1,
          maxAttempts: ATTACHMENT_UPLOAD_RETRIES + 1,
          delayMs: delay,
          isRateLimited,
          previousError: lastError,
        });
        await sleep(delay);
      }

      try {
        const result = await provider.uploadAttachment({
          tenantId: orgId,
          transactionId: providerTransactionId,
          fileName: attachment.name,
          mimeType: resolvedMimeType,
          content: buffer,
          entityType: providerEntityType,
        });

        if (result.success) {
          this.logger.info("Attachment uploaded successfully", {
            attachmentId: attachment.id,
            providerAttachmentId: result.attachmentId,
            attempt: attempt + 1,
          });
          return {
            success: true,
            attachmentId: attachment.id,
            providerAttachmentId: result.attachmentId ?? null,
          };
        }

        lastError = result.error;
        // Re-classified on every failure so the next delay matches it.
        isRateLimited = this.isRateLimitError(lastError);
      } catch (error) {
        lastError = error instanceof Error ? error.message : "Unknown error";
        isRateLimited = this.isRateLimitError(lastError);
      }
    }

    this.logger.error("Failed to upload attachment after retries", {
      attachmentId: attachment.id,
      attempts: ATTACHMENT_UPLOAD_RETRIES + 1,
      error: lastError,
    });

    return {
      success: false,
      attachmentId: attachment.id,
      providerAttachmentId: null,
      error: lastError,
      errorCode: "ATTACHMENT_UPLOAD_FAILED",
    };
  }

  /**
   * Heuristically decides whether an error message describes a rate limit.
   *
   * Matches (case-insensitively) "rate limit", "too many requests", any word
   * starting with "throttl", or the number 429 when it is not part of a longer
   * digit sequence (so "14290" is not treated as HTTP 429).
   *
   * @param error - Error message, or `undefined` when none is available.
   * @returns `true` when the message looks like a rate-limit response.
   */
  private isRateLimitError(error: string | undefined): boolean {
    if (!error) return false;
    const lowerError = error.toLowerCase();
    return (
      /(?<!\d)429(?!\d)/.test(lowerError) ||
      lowerError.includes("rate limit") ||
      lowerError.includes("too many requests") ||
      lowerError.includes("throttl")
    );
  }
}
|
|