File size: 15,153 Bytes
da2e594
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
/**
 * Batch Processor for Telemetry
 * Handles batching, queuing, and sending telemetry data to Supabase
 */

import { SupabaseClient } from '@supabase/supabase-js';
import { TelemetryEvent, WorkflowTelemetry, WorkflowMutationRecord, TELEMETRY_CONFIG, TelemetryMetrics } from './telemetry-types';
import { TelemetryError, TelemetryErrorType, TelemetryCircuitBreaker } from './telemetry-error';
import { logger } from '../utils/logger';

/**
 * Rewrites a camelCase identifier as snake_case.
 *
 * Every ASCII uppercase letter is replaced by an underscore followed by its
 * lowercase form; all other characters are copied through unchanged. A leading
 * capital therefore yields a leading underscore ("Webhook" -> "_webhook").
 *
 * @param key - Identifier to convert.
 * @returns The converted identifier.
 */
function keyToSnakeCase(key: string): string {
  let snake = '';

  for (const ch of key) {
    const isAsciiUpper = ch >= 'A' && ch <= 'Z';
    snake += isAsciiUpper ? `_${ch.toLowerCase()}` : ch;
  }

  return snake;
}

/**
 * Builds the row object inserted into Supabase for one workflow mutation.
 *
 * The conversion is deliberately shallow: only the record's own top-level
 * keys are renamed to snake_case (e.g. userId -> user_id), while every value
 * is passed through by reference, untouched. Nested workflow payloads
 * (workflowBefore, workflowAfter, operations, ...) must keep their original
 * shape to stay compatible with the n8n API; they are stored in JSONB columns
 * such as workflow_before / workflow_after, which preserve structure as given.
 *
 * Issue #517: an earlier recursive conversion corrupted nested data, turning
 * connection keys that are node names ("Webhook") into "_webhook" and node
 * fields such as typeVersion into type_version.
 *
 * @param mutation - Mutation record with camelCase top-level keys.
 * @returns A new object with snake_case top-level keys and the same values.
 */
function mutationToSupabaseFormat(mutation: WorkflowMutationRecord): Record<string, any> {
  return Object.entries(mutation).reduce<Record<string, any>>((row, [field, payload]) => {
    row[keyToSnakeCase(field)] = payload;
    return row;
  }, {});
}

/**
 * Batches telemetry records and writes them to Supabase.
 *
 * Each record type is inserted into its own table (`telemetry_events`,
 * `telemetry_workflows`, `workflow_mutations`). Inserts are retried with
 * exponential backoff and gated by a circuit breaker. Batches that exhaust
 * their retries are parked in a bounded in-memory dead letter queue, which is
 * replayed after a later flush that completes without errors.
 */
export class TelemetryBatchProcessor {
  private flushTimer?: NodeJS.Timeout;
  private isFlushingEvents: boolean = false;
  private isFlushingWorkflows: boolean = false;
  private isFlushingMutations: boolean = false;
  private circuitBreaker: TelemetryCircuitBreaker;
  private metrics: TelemetryMetrics = {
    eventsTracked: 0,
    eventsDropped: 0,
    eventsFailed: 0,
    batchesSent: 0,
    batchesFailed: 0,
    averageFlushTime: 0,
    rateLimitHits: 0
  };
  private flushTimes: number[] = [];
  private deadLetterQueue: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[] = [];
  private readonly maxDeadLetterSize = 100;

  /**
   * Dead-lettered items that were queued by flushMutations(). Mutation records
   * carry no `workflow_hash`, so without this marker they would be replayed as
   * plain events into the wrong table. A WeakSet is used so entries disappear
   * together with the records once they leave the queue.
   */
  private readonly deadLetterMutations = new WeakSet<object>();

  /** Listeners installed by start(), remembered so stop() can detach them. */
  private beforeExitHandler?: () => void;
  private signalHandler?: () => void;

  /**
   * @param supabase - Client used for inserts; when null the processor is inert.
   * @param isEnabled - Checked on every start()/flush(); returning false disables sending.
   */
  constructor(
    private supabase: SupabaseClient | null,
    private isEnabled: () => boolean
  ) {
    this.circuitBreaker = new TelemetryCircuitBreaker();
  }

  /**
   * Start the batch processor: schedules the periodic flush and installs
   * process exit/signal listeners. Does nothing when telemetry is disabled,
   * no client is configured, or the processor is already running.
   */
  start(): void {
    if (!this.isEnabled() || !this.supabase) return;

    // Already running: do not stack a second interval or duplicate listeners.
    if (this.flushTimer !== undefined) return;

    // Set up periodic flushing
    this.flushTimer = setInterval(() => {
      void this.flushInBackground();
    }, TELEMETRY_CONFIG.BATCH_FLUSH_INTERVAL);

    // Prevent timer from keeping process alive
    // In tests, flushTimer might be a number instead of a Timer object
    if (typeof this.flushTimer === 'object' && 'unref' in this.flushTimer) {
      this.flushTimer.unref();
    }

    // Set up process exit handlers
    const onBeforeExit = (): void => {
      void this.flushInBackground();
    };
    // NOTE(review): process.exit(0) runs synchronously, so the flush started
    // here is best-effort only and in-flight inserts may not complete.
    const onSignal = (): void => {
      void this.flushInBackground();
      process.exit(0);
    };

    process.on('beforeExit', onBeforeExit);
    process.on('SIGINT', onSignal);
    process.on('SIGTERM', onSignal);

    this.beforeExitHandler = onBeforeExit;
    this.signalHandler = onSignal;

    logger.debug('Telemetry batch processor started');
  }

  /**
   * Stop the batch processor: cancels the periodic flush and detaches the
   * process listeners that start() installed. Safe to call when not started.
   */
  stop(): void {
    if (this.flushTimer !== undefined) {
      clearInterval(this.flushTimer);
      this.flushTimer = undefined;
    }

    if (this.beforeExitHandler) {
      process.removeListener('beforeExit', this.beforeExitHandler);
      this.beforeExitHandler = undefined;
    }

    if (this.signalHandler) {
      process.removeListener('SIGINT', this.signalHandler);
      process.removeListener('SIGTERM', this.signalHandler);
      this.signalHandler = undefined;
    }

    logger.debug('Telemetry batch processor stopped');
  }

  /**
   * Run flush() from a timer or process listener, where nobody awaits the
   * result. Any rejection is logged instead of surfacing as an unhandled
   * promise rejection in the host process.
   */
  private async flushInBackground(): Promise<void> {
    try {
      await this.flush();
    } catch (error) {
      logger.debug('Background telemetry flush failed:', error);
    }
  }

  /**
   * Flush events, workflows, and mutations to Supabase.
   *
   * When the circuit breaker is open the supplied items are counted as dropped
   * and nothing is sent. Otherwise each non-empty collection is flushed in
   * turn, the elapsed time is recorded, the circuit breaker is updated, and -
   * if nothing failed - the dead letter queue is replayed. Calling it with no
   * arguments therefore only replays the dead letter queue.
   *
   * @param events - Events to insert into `telemetry_events`.
   * @param workflows - Workflows to insert into `telemetry_workflows`.
   * @param mutations - Mutation records to insert into `workflow_mutations`.
   * @throws TelemetryError if one of the per-type flushes fails unexpectedly.
   */
  async flush(events?: TelemetryEvent[], workflows?: WorkflowTelemetry[], mutations?: WorkflowMutationRecord[]): Promise<void> {
    if (!this.isEnabled() || !this.supabase) return;

    // Check circuit breaker
    if (!this.circuitBreaker.shouldAllow()) {
      logger.debug('Circuit breaker open - skipping flush');
      this.metrics.eventsDropped += (events?.length || 0) + (workflows?.length || 0) + (mutations?.length || 0);
      return;
    }

    const startTime = Date.now();
    let hasErrors = false;

    // Flush events if provided
    if (events && events.length > 0) {
      hasErrors = !(await this.flushEvents(events)) || hasErrors;
    }

    // Flush workflows if provided
    if (workflows && workflows.length > 0) {
      hasErrors = !(await this.flushWorkflows(workflows)) || hasErrors;
    }

    // Flush mutations if provided
    if (mutations && mutations.length > 0) {
      hasErrors = !(await this.flushMutations(mutations)) || hasErrors;
    }

    // Record flush time
    const flushTime = Date.now() - startTime;
    this.recordFlushTime(flushTime);

    // Update circuit breaker
    if (hasErrors) {
      this.circuitBreaker.recordFailure();
    } else {
      this.circuitBreaker.recordSuccess();
    }

    // Process dead letter queue if circuit is healthy
    if (!hasErrors && this.deadLetterQueue.length > 0) {
      await this.processDeadLetterQueue();
    }
  }

  /**
   * Flush events with batching.
   *
   * Stops at the first batch that exhausts its retries: that batch is counted
   * as failed and dead-lettered, and later batches of the same call are not
   * attempted. Returns true immediately (without sending) when another event
   * flush is already in progress.
   *
   * @returns true when every attempted batch was inserted, false otherwise.
   * @throws TelemetryError on an unexpected failure outside the retried insert.
   */
  private async flushEvents(events: TelemetryEvent[]): Promise<boolean> {
    if (this.isFlushingEvents || events.length === 0) return true;

    this.isFlushingEvents = true;

    try {
      // Batch events
      const batches = this.createBatches(events, TELEMETRY_CONFIG.MAX_BATCH_SIZE);

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          const { error } = await this.supabase!
            .from('telemetry_events')
            .insert(batch);

          if (error) {
            throw error;
          }

          logger.debug(`Flushed batch of ${batch.length} telemetry events`);
          return true;
        }, 'Flush telemetry events');

        if (result) {
          this.metrics.eventsTracked += batch.length;
          this.metrics.batchesSent++;
        } else {
          this.metrics.eventsFailed += batch.length;
          this.metrics.batchesFailed++;
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.debug('Failed to flush events:', error);
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush events',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingEvents = false;
    }
  }

  /**
   * Flush workflows with deduplication.
   *
   * Workflows are de-duplicated by `workflow_hash` before batching. Failure
   * handling and the in-progress guard mirror flushEvents().
   *
   * @returns true when every attempted batch was inserted, false otherwise.
   * @throws TelemetryError on an unexpected failure outside the retried insert.
   */
  private async flushWorkflows(workflows: WorkflowTelemetry[]): Promise<boolean> {
    if (this.isFlushingWorkflows || workflows.length === 0) return true;

    this.isFlushingWorkflows = true;

    try {
      // Deduplicate workflows by hash
      const uniqueWorkflows = this.deduplicateWorkflows(workflows);
      logger.debug(`Deduplicating workflows: ${workflows.length} -> ${uniqueWorkflows.length}`);

      // Batch workflows
      const batches = this.createBatches(uniqueWorkflows, TELEMETRY_CONFIG.MAX_BATCH_SIZE);

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          const { error } = await this.supabase!
            .from('telemetry_workflows')
            .insert(batch);

          if (error) {
            throw error;
          }

          logger.debug(`Flushed batch of ${batch.length} telemetry workflows`);
          return true;
        }, 'Flush telemetry workflows');

        if (result) {
          this.metrics.eventsTracked += batch.length;
          this.metrics.batchesSent++;
        } else {
          this.metrics.eventsFailed += batch.length;
          this.metrics.batchesFailed++;
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.debug('Failed to flush workflows:', error);
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush workflows',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingWorkflows = false;
    }
  }

  /**
   * Flush workflow mutations with batching.
   *
   * Records are converted to snake_case top-level keys right before insert;
   * the original camelCase records are what gets dead-lettered on failure, and
   * they are marked so the dead letter replay sends them back through this
   * method rather than treating them as events.
   *
   * @returns true when every attempted batch was inserted, false otherwise.
   * @throws TelemetryError on an unexpected failure outside the retried insert.
   */
  private async flushMutations(mutations: WorkflowMutationRecord[]): Promise<boolean> {
    if (this.isFlushingMutations || mutations.length === 0) return true;

    this.isFlushingMutations = true;

    try {
      // Batch mutations
      const batches = this.createBatches(mutations, TELEMETRY_CONFIG.MAX_BATCH_SIZE);

      for (const batch of batches) {
        const result = await this.executeWithRetry(async () => {
          // Convert camelCase to snake_case for Supabase
          const snakeCaseBatch = batch.map(mutation => mutationToSupabaseFormat(mutation));

          const { error } = await this.supabase!
            .from('workflow_mutations')
            .insert(snakeCaseBatch);

          if (error) {
            // Enhanced error logging for mutation flushes
            logger.error('Mutation insert error details:', {
              code: (error as any).code,
              message: (error as any).message,
              details: (error as any).details,
              hint: (error as any).hint,
              fullError: String(error)
            });
            throw error;
          }

          logger.debug(`Flushed batch of ${batch.length} workflow mutations`);
          return true;
        }, 'Flush workflow mutations');

        if (result) {
          this.metrics.eventsTracked += batch.length;
          this.metrics.batchesSent++;
        } else {
          this.metrics.eventsFailed += batch.length;
          this.metrics.batchesFailed++;
          // Remember the record type so the replay targets workflow_mutations.
          for (const mutation of batch) {
            this.deadLetterMutations.add(mutation);
          }
          this.addToDeadLetterQueue(batch);
          return false;
        }
      }

      return true;
    } catch (error) {
      logger.error('Failed to flush mutations with details:', {
        errorMsg: error instanceof Error ? error.message : String(error),
        errorType: error instanceof Error ? error.constructor.name : typeof error
      });
      throw new TelemetryError(
        TelemetryErrorType.NETWORK_ERROR,
        'Failed to flush workflow mutations',
        { error: error instanceof Error ? error.message : String(error) },
        true
      );
    } finally {
      this.isFlushingMutations = false;
    }
  }

  /**
   * True when running under Vitest in the test environment, where the timeout
   * race and the backoff sleeps are skipped.
   */
  private isVitestRun(): boolean {
    return Boolean(process.env.NODE_ENV === 'test' && process.env.VITEST);
  }

  /**
   * Run the operation, rejecting with "Operation timed out" if it has not
   * settled within TELEMETRY_CONFIG.OPERATION_TIMEOUT. The timer is always
   * cleared once the race settles so it cannot outlive the operation.
   */
  private async runWithTimeout<T>(operation: () => Promise<T>): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;

    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Operation timed out')), TELEMETRY_CONFIG.OPERATION_TIMEOUT);
    });

    try {
      // Race between operation and timeout
      return await Promise.race([operation(), timeoutPromise]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Execute operation with exponential backoff retry.
   *
   * Makes up to TELEMETRY_CONFIG.MAX_RETRIES attempts. Between attempts it
   * waits for the current delay plus up to 30% jitter, then doubles the delay.
   *
   * @param operation - Async work to attempt; a rejection triggers a retry.
   * @param operationName - Label used in debug log messages.
   * @returns The operation result, or null once every attempt has failed.
   */
  private async executeWithRetry<T>(
    operation: () => Promise<T>,
    operationName: string
  ): Promise<T | null> {
    let lastError: Error | null = null;
    let delay = TELEMETRY_CONFIG.RETRY_DELAY;

    for (let attempt = 1; attempt <= TELEMETRY_CONFIG.MAX_RETRIES; attempt++) {
      try {
        // In test environment, execute without timeout but still handle errors
        if (this.isVitestRun()) {
          const result = await operation();
          return result;
        }

        const result = await this.runWithTimeout(operation);
        return result;
      } catch (error) {
        lastError = error as Error;
        logger.debug(`${operationName} attempt ${attempt} failed:`, error);

        if (attempt < TELEMETRY_CONFIG.MAX_RETRIES) {
          // Skip delay in test environment when using fake timers
          if (!this.isVitestRun()) {
            // Exponential backoff with jitter
            const jitter = Math.random() * 0.3 * delay; // 30% jitter
            const waitTime = delay + jitter;
            await new Promise(resolve => setTimeout(resolve, waitTime));
            delay *= 2; // Double the delay for next attempt
          }
          // In test mode, continue to next retry attempt without delay
        }
      }
    }

    logger.debug(`${operationName} failed after ${TELEMETRY_CONFIG.MAX_RETRIES} attempts:`, lastError);
    return null;
  }

  /**
   * Create batches from array: consecutive slices of at most `batchSize`
   * items, in the original order.
   */
  private createBatches<T>(items: T[], batchSize: number): T[][] {
    const batches: T[][] = [];

    for (let i = 0; i < items.length; i += batchSize) {
      batches.push(items.slice(i, i + batchSize));
    }

    return batches;
  }

  /**
   * Deduplicate workflows by hash, keeping the first occurrence of each
   * `workflow_hash` and preserving input order.
   */
  private deduplicateWorkflows(workflows: WorkflowTelemetry[]): WorkflowTelemetry[] {
    const seen = new Set<string>();
    const unique: WorkflowTelemetry[] = [];

    for (const workflow of workflows) {
      if (!seen.has(workflow.workflow_hash)) {
        seen.add(workflow.workflow_hash);
        unique.push(workflow);
      }
    }

    return unique;
  }

  /**
   * Add failed items to dead letter queue. The queue is capped at
   * `maxDeadLetterSize`; when full, the oldest entry is evicted and counted
   * in `eventsDropped`.
   */
  private addToDeadLetterQueue(items: (TelemetryEvent | WorkflowTelemetry | WorkflowMutationRecord)[]): void {
    for (const item of items) {
      this.deadLetterQueue.push(item);

      // Maintain max size
      if (this.deadLetterQueue.length > this.maxDeadLetterSize) {
        const dropped = this.deadLetterQueue.shift();
        if (dropped) {
          this.metrics.eventsDropped++;
        }
      }
    }

    logger.debug(`Added ${items.length} items to dead letter queue`);
  }

  /**
   * Process dead letter queue when circuit is healthy.
   *
   * The queue is split by record type, emptied, and each group is sent back
   * through its own flush method so it lands in the correct table. Items that
   * fail again are re-queued by those flush methods.
   */
  private async processDeadLetterQueue(): Promise<void> {
    if (this.deadLetterQueue.length === 0) return;

    logger.debug(`Processing ${this.deadLetterQueue.length} items from dead letter queue`);

    const events: TelemetryEvent[] = [];
    const workflows: WorkflowTelemetry[] = [];
    const mutations: WorkflowMutationRecord[] = [];

    // Separate events, workflows and mutations
    for (const item of this.deadLetterQueue) {
      if (this.deadLetterMutations.has(item)) {
        mutations.push(item as WorkflowMutationRecord);
      } else if ('workflow_hash' in item) {
        workflows.push(item as WorkflowTelemetry);
      } else {
        events.push(item as TelemetryEvent);
      }
    }

    // Clear dead letter queue
    this.deadLetterQueue = [];

    // Try to flush
    if (events.length > 0) {
      await this.flushEvents(events);
    }
    if (workflows.length > 0) {
      await this.flushWorkflows(workflows);
    }
    if (mutations.length > 0) {
      await this.flushMutations(mutations);
    }
  }

  /**
   * Record flush time for metrics. Keeps a sliding window of the last 100
   * durations and stores their rounded mean plus the latest value.
   */
  private recordFlushTime(time: number): void {
    this.flushTimes.push(time);

    // Keep last 100 flush times
    if (this.flushTimes.length > 100) {
      this.flushTimes.shift();
    }

    // Update average
    const sum = this.flushTimes.reduce((a, b) => a + b, 0);
    this.metrics.averageFlushTime = Math.round(sum / this.flushTimes.length);
    this.metrics.lastFlushTime = time;
  }

  /**
   * Get processor metrics.
   *
   * @returns A copy of the counters plus the circuit breaker state and the
   * current dead letter queue length.
   */
  getMetrics(): TelemetryMetrics & { circuitBreakerState: any; deadLetterQueueSize: number } {
    return {
      ...this.metrics,
      circuitBreakerState: this.circuitBreaker.getState(),
      deadLetterQueueSize: this.deadLetterQueue.length
    };
  }

  /**
   * Reset metrics: zeroes all counters, clears the recorded flush times and
   * resets the circuit breaker. The dead letter queue is left as it is.
   */
  resetMetrics(): void {
    this.metrics = {
      eventsTracked: 0,
      eventsDropped: 0,
      eventsFailed: 0,
      batchesSent: 0,
      batchesFailed: 0,
      averageFlushTime: 0,
      rateLimitHits: 0
    };
    this.flushTimes = [];
    this.circuitBreaker.reset();
  }
}