n8cn / packages /cli /src /databases /db-connection.ts
gallyga's picture
Add n8n Chinese version
aec3094
import { inTest } from '@n8n/backend-common';
import { DatabaseConfig } from '@n8n/config';
import type { Migration } from '@n8n/db';
import { wrapMigration } from '@n8n/db';
import { Memoized } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { DataSource } from '@n8n/typeorm';
import { ErrorReporter } from 'n8n-core';
import { DbConnectionTimeoutError, ensureError } from 'n8n-workflow';
import { Time } from '@/constants';
import { DbConnectionOptions } from './db-connection-options';
type ConnectionState = {
connected: boolean;
migrated: boolean;
};
@Service()
export class DbConnection {
private dataSource: DataSource;
private pingTimer: NodeJS.Timeout | undefined;
readonly connectionState: ConnectionState = {
connected: false,
migrated: false,
};
constructor(
private readonly errorReporter: ErrorReporter,
private readonly connectionOptions: DbConnectionOptions,
private readonly databaseConfig: DatabaseConfig,
) {
this.dataSource = new DataSource(this.options);
Container.set(DataSource, this.dataSource);
}
@Memoized
get options() {
return this.connectionOptions.getOptions();
}
async init(): Promise<void> {
const { connectionState, options } = this;
if (connectionState.connected) return;
try {
await this.dataSource.initialize();
} catch (e) {
let error = ensureError(e);
if (
options.type === 'postgres' &&
error.message === 'Connection terminated due to connection timeout'
) {
error = new DbConnectionTimeoutError({
cause: error,
configuredTimeoutInMs: options.connectTimeoutMS!,
});
}
throw error;
}
connectionState.connected = true;
if (!inTest) this.scheduleNextPing();
}
async migrate() {
const { dataSource, connectionState } = this;
(dataSource.options.migrations as Migration[]).forEach(wrapMigration);
await dataSource.runMigrations({ transaction: 'each' });
connectionState.migrated = true;
}
async close() {
if (this.pingTimer) {
clearTimeout(this.pingTimer);
this.pingTimer = undefined;
}
if (this.dataSource.isInitialized) {
await this.dataSource.destroy();
this.connectionState.connected = false;
}
}
/** Ping DB connection every `pingIntervalSeconds` seconds to check if it is still alive. */
private scheduleNextPing() {
this.pingTimer = setTimeout(
async () => await this.ping(),
this.databaseConfig.pingIntervalSeconds * Time.seconds.toMilliseconds,
);
}
private async ping() {
if (!this.dataSource.isInitialized) return;
try {
await this.dataSource.query('SELECT 1');
this.connectionState.connected = true;
return;
} catch (error) {
this.connectionState.connected = false;
this.errorReporter.error(error);
} finally {
this.scheduleNextPing();
}
}
}