File size: 13,516 Bytes
eb846d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
import 'reflect-metadata'; // Ensure reflect-metadata is imported here too
import { DataSource, DataSourceOptions } from 'typeorm';
import entities from './entities/index.js';
import { registerPostgresVectorType } from './types/postgresVectorType.js';
import { VectorEmbeddingSubscriber } from './subscribers/VectorEmbeddingSubscriber.js';
import { getSmartRoutingConfig } from '../utils/smartRouting.js';

// Helper function to create required PostgreSQL extensions
const createRequiredExtensions = async (dataSource: DataSource): Promise<void> => {
  try {
    await dataSource.query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";');
    console.log('UUID extension created or already exists.');
  } catch (err: any) {
    console.warn('Failed to create uuid-ossp extension:', err.message);
    console.warn('UUID generation functionality may not be available.');
  }

  try {
    await dataSource.query('CREATE EXTENSION IF NOT EXISTS vector;');
    console.log('Vector extension created or already exists.');
  } catch (err: any) {
    console.warn('Failed to create vector extension:', err.message);
    console.warn('Vector functionality may not be available.');
  }
};

// Get database URL from smart routing config or fallback to environment variable
const getDatabaseUrl = (): string => {
  return getSmartRoutingConfig().dbUrl;
};

// Default database configuration
const defaultConfig: DataSourceOptions = {
  type: 'postgres',
  url: getDatabaseUrl(),
  synchronize: true,
  entities: entities,
  subscribers: [VectorEmbeddingSubscriber],
};

// AppDataSource is the TypeORM data source
let appDataSource = new DataSource(defaultConfig);

// Global promise to track initialization status
let initializationPromise: Promise<DataSource> | null = null;

// Function to create a new DataSource with updated configuration
export const updateDataSourceConfig = (): DataSource => {
  const newConfig: DataSourceOptions = {
    ...defaultConfig,
    url: getDatabaseUrl(),
  };

  // If the configuration has changed, we need to create a new DataSource
  const currentUrl = (appDataSource.options as any).url;
  if (currentUrl !== newConfig.url) {
    console.log('Database URL configuration changed, updating DataSource...');
    appDataSource = new DataSource(newConfig);
    // Reset initialization promise when configuration changes
    initializationPromise = null;
  }

  return appDataSource;
};

// Get the current AppDataSource instance
export const getAppDataSource = (): DataSource => {
  return appDataSource;
};

// Reconnect database with updated configuration
export const reconnectDatabase = async (): Promise<DataSource> => {
  try {
    // Close existing connection if it exists
    if (appDataSource.isInitialized) {
      console.log('Closing existing database connection...');
      await appDataSource.destroy();
    }

    // Reset initialization promise to allow fresh initialization
    initializationPromise = null;

    // Update configuration and reconnect
    appDataSource = updateDataSourceConfig();
    return await initializeDatabase();
  } catch (error) {
    console.error('Error during database reconnection:', error);
    throw error;
  }
};

// Initialize database connection with concurrency control
export const initializeDatabase = async (): Promise<DataSource> => {
  // If initialization is already in progress, wait for it to complete
  if (initializationPromise) {
    console.log('Database initialization already in progress, waiting for completion...');
    return initializationPromise;
  }

  // If already initialized, return the existing instance
  if (appDataSource.isInitialized) {
    console.log('Database already initialized, returning existing instance');
    return Promise.resolve(appDataSource);
  }

  // Create a new initialization promise
  initializationPromise = performDatabaseInitialization();

  try {
    const result = await initializationPromise;
    console.log('Database initialization completed successfully');
    return result;
  } catch (error) {
    // Reset the promise on error so initialization can be retried
    initializationPromise = null;
    console.error('Database initialization failed:', error);
    throw error;
  }
};

// Internal function to perform the actual database initialization
const performDatabaseInitialization = async (): Promise<DataSource> => {
  try {
    // Update configuration before initializing
    appDataSource = updateDataSourceConfig();

    if (!appDataSource.isInitialized) {
      console.log('Initializing database connection...');
      // Register the vector type with TypeORM
      await appDataSource.initialize();
      registerPostgresVectorType(appDataSource);

      // Create required PostgreSQL extensions
      await createRequiredExtensions(appDataSource);

      // Set up vector column and index with a more direct approach
      try {
        // Check if table exists first
        const tableExists = await appDataSource.query(`
          SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE table_schema = 'public'
            AND table_name = 'vector_embeddings'
          );
        `);

        if (tableExists[0].exists) {
          // Add pgvector support via raw SQL commands
          console.log('Configuring vector support for embeddings table...');

          // Step 1: Drop any existing index on the column
          try {
            await appDataSource.query(`DROP INDEX IF EXISTS idx_vector_embeddings_embedding;`);
          } catch (dropError: any) {
            console.warn('Note: Could not drop existing index:', dropError.message);
          }

          // Step 2: Alter column type to vector (if it's not already)
          try {
            // Check column type first
            const columnType = await appDataSource.query(`
              SELECT data_type FROM information_schema.columns
              WHERE table_schema = 'public' AND table_name = 'vector_embeddings'
              AND column_name = 'embedding';
            `);

            if (columnType.length > 0 && columnType[0].data_type !== 'vector') {
              await appDataSource.query(`
                ALTER TABLE vector_embeddings 
                ALTER COLUMN embedding TYPE vector USING embedding::vector;
              `);
              console.log('Vector embedding column type updated successfully.');
            }
          } catch (alterError: any) {
            console.warn('Could not alter embedding column type:', alterError.message);
            console.warn('Will try to recreate the table later.');
          }

          // Step 3: Try to create appropriate indices
          try {
            // First, let's check if there are any records to determine the dimensions
            const records = await appDataSource.query(`
              SELECT dimensions FROM vector_embeddings LIMIT 1;
            `);

            let dimensions = 1536; // Default to common OpenAI embedding size
            if (records && records.length > 0 && records[0].dimensions) {
              dimensions = records[0].dimensions;
              console.log(`Found vector dimension from existing data: ${dimensions}`);
            } else {
              console.log(`Using default vector dimension: ${dimensions} (no existing data found)`);
            }

            // Set the vector dimensions explicitly only if table has data
            if (records && records.length > 0) {
              await appDataSource.query(`
                ALTER TABLE vector_embeddings 
                ALTER COLUMN embedding TYPE vector(${dimensions});
              `);

              // Now try to create the index
              await appDataSource.query(`
                CREATE INDEX IF NOT EXISTS idx_vector_embeddings_embedding 
                ON vector_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
              `);
              console.log('Created IVFFlat index for vector similarity search.');
            } else {
              console.log(
                'No existing vector data found, skipping index creation - will be handled by vector service.',
              );
            }
          } catch (indexError: any) {
            console.warn('IVFFlat index creation failed:', indexError.message);
            console.warn('Trying alternative index type...');

            try {
              // Try HNSW index instead
              await appDataSource.query(`
                CREATE INDEX IF NOT EXISTS idx_vector_embeddings_embedding 
                ON vector_embeddings USING hnsw (embedding vector_cosine_ops);
              `);
              console.log('Created HNSW index for vector similarity search.');
            } catch (hnswError: any) {
              // Final fallback to simpler index type
              console.warn('HNSW index creation failed too. Using simple L2 distance index.');

              try {
                // Create a basic GIN index as last resort
                await appDataSource.query(`
                  CREATE INDEX IF NOT EXISTS idx_vector_embeddings_embedding 
                  ON vector_embeddings USING gin (embedding);
                `);
                console.log('Created GIN index for basic vector lookups.');
              } catch (ginError: any) {
                console.warn('All index creation attempts failed:', ginError.message);
                console.warn('Vector search will be slower without an optimized index.');
              }
            }
          }
        } else {
          console.log(
            'Vector embeddings table does not exist yet - will configure after schema sync.',
          );
        }
      } catch (error: any) {
        console.warn('Could not set up vector column/index:', error.message);
        console.warn('Will attempt again after schema synchronization.');
      }

      console.log('Database connection established successfully.');

      // Run one final setup check after schema synchronization is done
      if (defaultConfig.synchronize) {
        try {
          console.log('Running final vector configuration check...');

          // Try setup again with the same code from above
          const tableExists = await appDataSource.query(`
              SELECT EXISTS (
                SELECT FROM information_schema.tables 
                WHERE table_schema = 'public'
                AND table_name = 'vector_embeddings'
              );
            `);

          if (tableExists[0].exists) {
            console.log('Vector embeddings table found, checking configuration...');

            // Get the dimension size first
            try {
              // Try to get dimensions from an existing record
              const records = await appDataSource.query(`
                  SELECT dimensions FROM vector_embeddings LIMIT 1;
                `);

              // Only proceed if we have existing data, otherwise let vector service handle it
              if (records && records.length > 0 && records[0].dimensions) {
                const dimensions = records[0].dimensions;
                console.log(`Found vector dimension from database: ${dimensions}`);

                // Ensure column type is vector with explicit dimensions
                await appDataSource.query(`
                    ALTER TABLE vector_embeddings 
                    ALTER COLUMN embedding TYPE vector(${dimensions});
                  `);
                console.log('Vector embedding column type updated in final check.');

                // One more attempt at creating the index with dimensions
                try {
                  // Drop existing index if any
                  await appDataSource.query(`
                      DROP INDEX IF EXISTS idx_vector_embeddings_embedding;
                    `);

                  // Create new index with proper dimensions
                  await appDataSource.query(`
                      CREATE INDEX idx_vector_embeddings_embedding 
                      ON vector_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
                    `);
                  console.log('Created IVFFlat index in final check.');
                } catch (indexError: any) {
                  console.warn('Final index creation attempt did not succeed:', indexError.message);
                  console.warn('Using basic lookup without vector index.');
                }
              } else {
                console.log(
                  'No existing vector data found, vector dimensions will be configured by vector service.',
                );
              }
            } catch (setupError: any) {
              console.warn('Vector setup in final check failed:', setupError.message);
            }
          }
        } catch (error: any) {
          console.warn('Post-initialization vector setup failed:', error.message);
        }
      }
    }
    return appDataSource;
  } catch (error) {
    console.error('Error during database initialization:', error);
    throw error;
  }
};

// Get database connection status
export const isDatabaseConnected = (): boolean => {
  return appDataSource.isInitialized;
};

// Close database connection
export const closeDatabase = async (): Promise<void> => {
  if (appDataSource.isInitialized) {
    await appDataSource.destroy();
    console.log('Database connection closed.');
  }
};

// Export AppDataSource for backward compatibility
export const AppDataSource = appDataSource;

export default getAppDataSource;