| import _ from 'lodash'; |
| import { MeiliSearch } from 'meilisearch'; |
| import type { SearchResponse, SearchParams, Index } from 'meilisearch'; |
| import type { |
| CallbackWithoutResultAndOptionalError, |
| FilterQuery, |
| Document, |
| Schema, |
| Query, |
| Types, |
| Model, |
| } from 'mongoose'; |
| import type { IConversation, IMessage } from '~/types'; |
| import logger from '~/config/meiliLogger'; |
|
|
/**
 * Options accepted by the `mongoMeili` schema plugin.
 */
interface MongoMeiliOptions {
  /** MeiliSearch host, handed to the `MeiliSearch` client constructor. */
  host: string;
  /** API key, handed to the `MeiliSearch` client constructor. */
  apiKey: string;
  /** Name of the MeiliSearch index backing the schema. */
  indexName: string;
  /** Primary key used when the index has to be created. */
  primaryKey: string;
  /** Mongoose instance used to resolve models inside the `deleteMany` hook. */
  mongoose: typeof import('mongoose');
  /** Optional batch size override; the environment-derived default applies when omitted. */
  syncBatchSize?: number;
  /** Optional inter-batch delay override (ms); the environment-derived default applies when omitted. */
  syncDelayMs?: number;
}
|
|
/**
 * Shape of a document stored in the MeiliSearch index: arbitrary fields plus
 * the optional `_meiliIndex` flag.
 */
interface MeiliIndexable {
  _meiliIndex?: boolean;
  [key: string]: unknown;
}
|
|
/**
 * One entry of a structured message `content` array.
 */
interface ContentItem {
  /** Discriminator; entries whose type is `'text'` contribute to the indexed text. */
  type: string;
  /** Text payload, when present. */
  text?: string;
}
|
|
/**
 * Snapshot of how much of a collection has been indexed.
 */
interface SyncProgress {
  /** Optional resume marker (not populated by `getSyncProgress`). */
  lastSyncedId?: string;
  /** Number of documents flagged as indexed. */
  totalProcessed: number;
  /** Total number of documents in the collection. */
  totalDocuments: number;
  /** True when every document is flagged as indexed. */
  isComplete: boolean;
}
|
|
/**
 * Mongoose document augmented with the instance methods that the MeiliSearch
 * plugin loads onto the schema. All members are optional because a given
 * document may not have gone through the plugin.
 */
interface _DocumentWithMeiliIndex extends Document {
  /** Flag marking the document as already present in the search index. */
  _meiliIndex?: boolean;

  /** Builds the plain object that is sent to MeiliSearch. */
  preprocessObjectForIndex?: () => Record<string, unknown>;

  /** Index operations; each one receives the middleware `next` callback. */
  addObjectToMeili?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
  updateObjectToMeili?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
  deleteObjectFromMeili?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;

  /** Middleware entry points that dispatch to the index operations above. */
  postSaveHook?: (next: CallbackWithoutResultAndOptionalError) => void;
  postUpdateHook?: (next: CallbackWithoutResultAndOptionalError) => void;
  postRemoveHook?: (next: CallbackWithoutResultAndOptionalError) => void;
}

/**
 * Document type handled by the plugin: the plugin methods combined with the
 * conversation fields and (partially) the message fields.
 */
export type DocumentWithMeiliIndex = _DocumentWithMeiliIndex & IConversation & Partial<IMessage>;
|
|
/**
 * Model type exposing the static helpers that the MeiliSearch plugin loads
 * onto the schema.
 */
export interface SchemaWithMeiliMethods extends Model<DocumentWithMeiliIndex> {
  /** Pushes not-yet-indexed documents to MeiliSearch, optionally resuming after an `_id`. */
  syncWithMeili(options?: { resumeFromId?: string }): Promise<void>;

  /** Reports how many documents are flagged as indexed versus the collection total. */
  getSyncProgress(): Promise<SyncProgress>;

  /** Sends one batch of documents to the index and applies the matching flag updates. */
  processSyncBatch(
    index: Index<MeiliIndexable>,
    documents: Record<string, unknown>[],
    updateOps: {
      updateOne: {
        filter: Record<string, unknown>;
        update: { $set: { _meiliIndex: boolean } };
      };
    }[],
  ): Promise<void>;

  /** Removes index entries whose primary key has no matching MongoDB document. */
  cleanupMeiliIndex(
    index: Index<MeiliIndexable>,
    primaryKey: string,
    batchSize: number,
    delayMs: number,
  ): Promise<void>;

  /** Forwards settings to the index. */
  setMeiliIndexSettings(settings: Record<string, unknown>): Promise<unknown>;

  /** Searches the index; with `populate`, hits are merged with their MongoDB documents. */
  meiliSearch(
    q: string,
    params?: SearchParams,
    populate?: boolean,
  ): Promise<SearchResponse<MeiliIndexable, Record<string, unknown>>>;
}
|
|
| |
| |
| |
| |
/** True only when the `SEARCH` environment variable equals "true" (case-insensitive). */
const searchEnabled = process.env.SEARCH?.toLowerCase() === 'true';

/** True when search is enabled and both `MEILI_HOST` and `MEILI_MASTER_KEY` are set. */
const meiliEnabled =
  searchEnabled && process.env.MEILI_HOST != null && process.env.MEILI_MASTER_KEY != null;
|
|
| |
| |
| |
/**
 * Reads the sync tuning knobs from the environment.
 *
 * - `MEILI_SYNC_BATCH_SIZE` must parse to an integer >= 1.
 * - `MEILI_SYNC_DELAY_MS` must parse to an integer >= 0.
 *
 * Missing, unparsable or out-of-range values fall back to 100, so callers never
 * receive `NaN` or a zero batch size (either of which breaks batch loops).
 *
 * @returns The batch size and the inter-batch delay in milliseconds.
 */
const getSyncConfig = () => {
  const readInt = (raw: string | undefined, fallback: number, min: number): number => {
    const parsed = parseInt(raw ?? '', 10);
    return Number.isFinite(parsed) && parsed >= min ? parsed : fallback;
  };

  return {
    batchSize: readInt(process.env.MEILI_SYNC_BATCH_SIZE, 100, 1),
    delayMs: readInt(process.env.MEILI_SYNC_DELAY_MS, 100, 0),
  };
};
|
|
| |
| |
| |
| |
/**
 * Flattens a structured content array into a single searchable string.
 *
 * Only entries of type `'text'` that carry a string `text` are kept; they are
 * joined with single spaces and the result is trimmed.
 *
 * @param content - Content entries; any non-array value yields `''`.
 * @returns The concatenated text.
 */
const parseTextParts = (content: ContentItem[]): string => {
  if (!Array.isArray(content)) {
    return '';
  }

  const pieces = content.reduce<string[]>((collected, part) => {
    if (part.type === 'text' && typeof part.text === 'string') {
      collected.push(part.text);
    }
    return collected;
  }, []);

  return pieces.join(' ').trim();
};
|
|
| |
| |
| |
/**
 * Converts every `--` in an index primary-key value to `|`.
 *
 * @param value - Primary-key value as stored in the search index.
 * @returns The value with each `--` replaced by `|`.
 */
const cleanUpPrimaryKeyValue = (value: string): string => value.split('--').join('|');
|
|
| |
| |
| |
/**
 * Ensures the mandatory connection options are present.
 *
 * @param options - Plugin options to check.
 * @throws Error naming the first of `host`, `apiKey`, `indexName` that is missing or falsy.
 */
const validateOptions = (options: Partial<MongoMeiliOptions>): void => {
  const requiredKeys: (keyof MongoMeiliOptions)[] = ['host', 'apiKey', 'indexName'];
  const missingKey = requiredKeys.find((key) => !options[key]);

  if (missingKey !== undefined) {
    throw new Error(`Missing mongoMeili Option: ${missingKey}`);
  }
};
|
|
| |
| |
| |
/**
 * Runs `processor` over `items` in consecutive slices, pausing between slices.
 *
 * @param items - Items to process.
 * @param batchSize - Maximum slice length. Values below 1 (or `NaN`) would never
 *   advance the loop, so they are treated as "everything in one batch".
 * @param delayMs - Pause between slices in milliseconds; no pause after the last
 *   slice or when the value is not positive.
 * @param processor - Async callback invoked once per slice, in order.
 */
const processBatch = async <T>(
  items: T[],
  batchSize: number,
  delayMs: number,
  processor: (batch: T[]) => Promise<void>,
): Promise<void> => {
  // Guarantee a positive integer step so the loop always terminates.
  const step = batchSize >= 1 ? Math.floor(batchSize) : Math.max(items.length, 1);

  for (let start = 0; start < items.length; start += step) {
    await processor(items.slice(start, start + step));

    const hasMore = start + step < items.length;
    if (hasMore && delayMs > 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
};
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Builds the class whose static and instance methods are loaded onto a schema
 * to keep a MeiliSearch index in step with the MongoDB collection.
 *
 * @param index - MeiliSearch index that receives the documents.
 * @param attributesToIndex - Document fields copied into the index; the first
 *   entry is used as the primary key.
 * @param syncOptions - Batch size and inter-batch delay; these override the
 *   environment-derived defaults.
 * @returns The class to pass to `schema.loadClass`.
 */
const createMeiliMongooseModel = ({
  index,
  attributesToIndex,
  syncOptions,
}: {
  index: Index<MeiliIndexable>;
  attributesToIndex: string[];
  syncOptions: { batchSize: number; delayMs: number };
}) => {
  const primaryKey = attributesToIndex[0];
  const syncConfig = { ...getSyncConfig(), ...syncOptions };
  const collectionLabel = primaryKey === 'messageId' ? 'messages' : 'conversations';

  type MeiliFlagUpdateOp = {
    updateOne: {
      filter: Record<string, unknown>;
      update: { $set: { _meiliIndex: boolean } };
    };
  };

  /** Resolves after `ms` milliseconds. */
  const pause = (ms: number): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms));

  /**
   * Single source of truth for the shape sent to MeiliSearch, shared by the
   * add, update, delete and bulk-sync paths so they all agree on ids and text:
   * - keeps only `attributesToIndex`, dropping `$`-prefixed keys;
   * - rewrites `|` in `conversationId` to `--`;
   * - replaces a `content` array with its flattened `text`.
   */
  const toIndexable = (raw: Record<string, unknown>): Record<string, unknown> => {
    const object: Record<string, unknown> = _.omitBy(_.pick(raw, attributesToIndex), (_value, key) =>
      key.startsWith('$'),
    );

    if (typeof object.conversationId === 'string' && object.conversationId.includes('|')) {
      object.conversationId = object.conversationId.replace(/\|/g, '--');
    }

    if (Array.isArray(object.content)) {
      object.text = parseTextParts(object.content as ContentItem[]);
      delete object.content;
    }

    return object;
  };

  class MeiliMongooseModel {
    /**
     * Counts all documents and those flagged `_meiliIndex: true`.
     *
     * @returns Indexed count, total count, and whether the two are equal.
     */
    static async getSyncProgress(this: SchemaWithMeiliMethods): Promise<SyncProgress> {
      const [totalDocuments, indexedDocuments] = await Promise.all([
        this.countDocuments(),
        this.countDocuments({ _meiliIndex: true }),
      ]);

      return {
        totalProcessed: indexedDocuments,
        totalDocuments,
        isComplete: indexedDocuments === totalDocuments,
      };
    }

    /**
     * Streams the collection in `_id` order and indexes every document not yet
     * flagged `_meiliIndex`, after first removing orphaned index entries.
     *
     * @param options - `resumeFromId` restricts the scan to `_id` values greater than it.
     * @throws Re-throws any error raised while scanning (after logging it).
     */
    static async syncWithMeili(
      this: SchemaWithMeiliMethods,
      options?: { resumeFromId?: string },
    ): Promise<void> {
      try {
        const startTime = Date.now();
        const { batchSize, delayMs } = syncConfig;

        logger.info(
          `[syncWithMeili] Starting sync for ${collectionLabel} with batch size ${batchSize}`,
        );

        const query: FilterQuery<unknown> = {};
        if (options?.resumeFromId) {
          query._id = { $gt: options.resumeFromId };
        }

        const totalCount = await this.countDocuments(query);
        let processedCount = 0;

        await this.cleanupMeiliIndex(index, primaryKey, batchSize, delayMs);

        const cursor = this.find(query)
          .select(attributesToIndex.join(' ') + ' _meiliIndex')
          .sort({ _id: 1 })
          .batchSize(batchSize)
          .cursor();

        let documentBatch: Array<Record<string, unknown>> = [];
        let updateOps: MeiliFlagUpdateOp[] = [];

        for await (const doc of cursor) {
          const typedDoc = doc.toObject() as unknown as Record<string, unknown>;

          if (!typedDoc._meiliIndex) {
            // Same preprocessing as the save hook, so bulk-synced documents get
            // the same ids and text as hook-indexed ones.
            documentBatch.push(toIndexable(typedDoc));
            updateOps.push({
              updateOne: {
                filter: { _id: typedDoc._id },
                update: { $set: { _meiliIndex: true } },
              },
            });
          }

          processedCount++;

          if (documentBatch.length >= batchSize) {
            await this.processSyncBatch(index, documentBatch, updateOps);
            documentBatch = [];
            updateOps = [];

            // totalCount was taken before the scan; guard the division anyway.
            const progress =
              totalCount > 0 ? Math.round((processedCount / totalCount) * 100) : 100;
            logger.info(`[syncWithMeili] Progress: ${progress}% (${processedCount}/${totalCount})`);

            if (delayMs > 0) {
              await pause(delayMs);
            }
          }
        }

        // Flush the final, partially filled batch.
        if (documentBatch.length > 0) {
          await this.processSyncBatch(index, documentBatch, updateOps);
        }

        const duration = Date.now() - startTime;
        logger.info(`[syncWithMeili] Completed sync for ${collectionLabel} in ${duration}ms`);
      } catch (error) {
        logger.error('[syncWithMeili] Error during sync:', error);
        throw error;
      }
    }

    /**
     * Sends one batch to the index, then applies the `_meiliIndex` flag updates.
     * Failures are logged and swallowed on purpose so one bad batch does not
     * abort the rest of the sync.
     */
    static async processSyncBatch(
      this: SchemaWithMeiliMethods,
      index: Index<MeiliIndexable>,
      documents: Array<Record<string, unknown>>,
      updateOps: Array<{
        updateOne: {
          filter: Record<string, unknown>;
          update: { $set: { _meiliIndex: boolean } };
        };
      }>,
    ): Promise<void> {
      if (documents.length === 0) {
        return;
      }

      try {
        await index.addDocuments(documents);

        if (updateOps.length > 0) {
          await this.collection.bulkWrite(updateOps);
        }
      } catch (error) {
        logger.error('[processSyncBatch] Error processing batch:', error);
      }
    }

    /**
     * Deletes index entries that no longer have a MongoDB counterpart.
     *
     * The index is scanned completely before anything is deleted: deleting
     * while paginating by offset can shift later entries and cause some to be
     * skipped. An entry counts as present if MongoDB holds either its raw key
     * or the key with `--` mapped back to `|`. Errors are logged and swallowed.
     */
    static async cleanupMeiliIndex(
      this: SchemaWithMeiliMethods,
      index: Index<MeiliIndexable>,
      primaryKey: string,
      batchSize: number,
      delayMs: number,
    ): Promise<void> {
      try {
        const orphanedIds: Array<string | number> = [];
        let offset = 0;

        for (;;) {
          const batch = await index.getDocuments({ limit: batchSize, offset });
          if (batch.results.length === 0) {
            break;
          }

          const meiliIds = batch.results
            .map((doc) => doc[primaryKey])
            .filter((id): id is string | number => typeof id === 'string' || typeof id === 'number');

          // Look up both spellings of each id in a single query.
          const candidates = new Set<string | number>();
          for (const id of meiliIds) {
            candidates.add(id);
            if (typeof id === 'string') {
              candidates.add(cleanUpPrimaryKeyValue(id));
            }
          }

          const query: Record<string, unknown> = {};
          query[primaryKey] = { $in: [...candidates] };

          const existingDocs = await this.find(query).select(primaryKey).lean();
          const existingIds = new Set(
            existingDocs.map((doc: Record<string, unknown>) => doc[primaryKey]),
          );

          for (const id of meiliIds) {
            const present =
              existingIds.has(id) ||
              (typeof id === 'string' && existingIds.has(cleanUpPrimaryKeyValue(id)));
            if (!present) {
              orphanedIds.push(id);
            }
          }

          offset += batch.results.length;

          if (delayMs > 0) {
            await pause(delayMs);
          }
        }

        const step = batchSize >= 1 ? Math.floor(batchSize) : Math.max(orphanedIds.length, 1);
        for (let start = 0; start < orphanedIds.length; start += step) {
          const toDelete = orphanedIds.slice(start, start + step);
          await Promise.all(toDelete.map((id) => index.deleteDocument(id)));
          logger.debug(`[cleanupMeiliIndex] Deleted ${toDelete.length} orphaned documents`);

          if (start + step < orphanedIds.length && delayMs > 0) {
            await pause(delayMs);
          }
        }
      } catch (error) {
        logger.error('[cleanupMeiliIndex] Error during cleanup:', error);
      }
    }

    /**
     * Forwards `settings` to the index.
     *
     * @returns Whatever `index.updateSettings` resolves to.
     */
    static async setMeiliIndexSettings(settings: Record<string, unknown>): Promise<unknown> {
      return await index.updateSettings(settings);
    }

    /**
     * Searches the index and optionally merges each hit with its MongoDB document.
     *
     * @param q - Search query.
     * @param params - Search parameters passed through to MeiliSearch.
     * @param populate - When true, each hit is overlaid on the matching MongoDB
     *   document (hit fields win on conflict).
     */
    static async meiliSearch(
      this: SchemaWithMeiliMethods,
      q: string,
      params: SearchParams,
      populate: boolean,
    ): Promise<SearchResponse<MeiliIndexable, Record<string, unknown>>> {
      const data = await index.search(q, params);

      if (!populate) {
        return data;
      }

      // MongoDB stores the `|` form of the key; the index stores the `--` form.
      const mongoKeyOf = (hit: Record<string, unknown>): string =>
        cleanUpPrimaryKeyValue(hit[primaryKey] as string);

      const query: Record<string, unknown> = {};
      query[primaryKey] = data.hits.map(mongoKeyOf);

      const projection = Object.keys(this.schema.obj).reduce<Record<string, number>>(
        (fields, key) => {
          if (!key.startsWith('$')) {
            fields[key] = 1;
          }
          return fields;
        },
        { _id: 1, __v: 1 },
      );

      const hitsFromMongoose = await this.find(query, projection).lean();

      // Index MongoDB rows by key once instead of scanning them for every hit.
      const byKey = new Map<unknown, Record<string, unknown>>();
      for (const item of hitsFromMongoose) {
        const typedItem = item as Record<string, unknown>;
        if (!byKey.has(typedItem[primaryKey])) {
          byKey.set(typedItem[primaryKey], typedItem);
        }
      }

      data.hits = data.hits.map((hit) => ({
        // Match on the MongoDB spelling of the key, the same one used in the query.
        ...(byKey.get(mongoKeyOf(hit)) ?? {}),
        ...hit,
      }));

      return data;
    }

    /**
     * Builds the object sent to MeiliSearch for this document: indexed
     * attributes only, `|` in `conversationId` rewritten to `--`, and a
     * `content` array flattened into `text`.
     */
    preprocessObjectForIndex(this: DocumentWithMeiliIndex): Record<string, unknown> {
      return toIndexable(this.toJSON() as Record<string, unknown>);
    }

    /**
     * Adds this document to the index (up to three attempts with 2s and 4s
     * back-off) and then flags it with `_meiliIndex: true`. Failures are logged;
     * `next` is always called exactly once.
     */
    async addObjectToMeili(
      this: DocumentWithMeiliIndex,
      next: CallbackWithoutResultAndOptionalError,
    ): Promise<void> {
      let object: Record<string, unknown>;
      try {
        object = this.preprocessObjectForIndex!();
      } catch (error) {
        // Without this guard a serialization failure would skip `next` entirely.
        logger.error('[addObjectToMeili] Error preparing document for Meili:', error);
        return next();
      }

      const maxRetries = 3;
      for (let attempt = 1; attempt <= maxRetries; attempt++) {
        try {
          await index.addDocuments([object]);
          break;
        } catch (error) {
          if (attempt >= maxRetries) {
            logger.error('[addObjectToMeili] Error adding document to Meili after retries:', error);
            return next();
          }
          await pause(Math.pow(2, attempt) * 1000);
        }
      }

      try {
        await this.collection.updateMany(
          { _id: this._id as Types.ObjectId },
          { $set: { _meiliIndex: true } },
        );
      } catch (error) {
        logger.error('[addObjectToMeili] Error updating _meiliIndex field:', error);
        return next();
      }

      next();
    }

    /**
     * Re-sends this document to the index, using the same preprocessing as the
     * add path so the update targets the same id and text shape. Errors are
     * logged; `next` is always called.
     */
    async updateObjectToMeili(
      this: DocumentWithMeiliIndex,
      next: CallbackWithoutResultAndOptionalError,
    ): Promise<void> {
      try {
        const object = this.preprocessObjectForIndex!();
        await index.updateDocuments([object]);
        next();
      } catch (error) {
        logger.error('[updateObjectToMeili] Error updating document in Meili:', error);
        return next();
      }
    }

    /**
     * Removes this document from the index by its index primary-key value
     * (the id under which it was added), falling back to `_id` only when that
     * field is absent. Errors are logged; `next` is always called.
     */
    async deleteObjectFromMeili(
      this: DocumentWithMeiliIndex,
      next: CallbackWithoutResultAndOptionalError,
    ): Promise<void> {
      try {
        const object = this.preprocessObjectForIndex!();
        const meiliId = object[primaryKey] ?? this._id;
        await index.deleteDocument(String(meiliId));
        next();
      } catch (error) {
        logger.error('[deleteObjectFromMeili] Error deleting document from Meili:', error);
        return next();
      }
    }

    /** After save: update the index entry if already flagged, otherwise add it. */
    postSaveHook(this: DocumentWithMeiliIndex, next: CallbackWithoutResultAndOptionalError): void {
      if (this._meiliIndex) {
        void this.updateObjectToMeili!(next);
      } else {
        void this.addObjectToMeili!(next);
      }
    }

    /** After update: refresh the index entry only for documents already flagged. */
    postUpdateHook(
      this: DocumentWithMeiliIndex,
      next: CallbackWithoutResultAndOptionalError,
    ): void {
      if (this._meiliIndex) {
        void this.updateObjectToMeili!(next);
      } else {
        next();
      }
    }

    /** After removal: delete the index entry only for documents already flagged. */
    postRemoveHook(
      this: DocumentWithMeiliIndex,
      next: CallbackWithoutResultAndOptionalError,
    ): void {
      if (this._meiliIndex) {
        void this.deleteObjectFromMeili!(next);
      } else {
        next();
      }
    }
  }

  return MeiliMongooseModel;
};
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Mongoose schema plugin that mirrors documents into a MeiliSearch index.
 *
 * It adds a hidden `_meiliIndex` flag to the schema, makes sure the index
 * exists (with `user` filterable), loads the sync/search methods onto the
 * schema, and registers middleware that keeps the index updated on save,
 * update, delete, `deleteMany` and `findOneAndUpdate`.
 *
 * @param schema - Schema to extend; fields declared with `meiliIndex: true` are indexed.
 * @param options - Connection, index and sync options.
 * @throws Error when `host`, `apiKey` or `indexName` is missing.
 */
export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): void {
  const mongoose = options.mongoose;
  validateOptions(options);

  // Tracks whether a document has been pushed to the index; hidden from queries by default.
  schema.add({
    _meiliIndex: {
      type: Boolean,
      required: false,
      select: false,
      default: false,
    },
  });

  const { host, apiKey, indexName, primaryKey } = options;
  const defaultSync = getSyncConfig();
  const syncOptions = {
    // A batch size of 0 is never usable, so any falsy value takes the default.
    batchSize: options.syncBatchSize || defaultSync.batchSize,
    // 0 is a legitimate "no delay" setting, so only a missing value takes the default.
    delayMs: options.syncDelayMs ?? defaultSync.delayMs,
  };

  const client = new MeiliSearch({ host, apiKey });
  const index = client.index<MeiliIndexable>(indexName);

  // Fire-and-forget index bootstrap; every await is guarded so it cannot reject.
  void (async () => {
    try {
      await index.getRawInfo();
      logger.debug(`[mongoMeili] Index ${indexName} already exists`);
    } catch (error) {
      const errorCode = (error as { code?: string })?.code;
      if (errorCode === 'index_not_found') {
        try {
          logger.info(`[mongoMeili] Creating new index: ${indexName}`);
          await client.createIndex(indexName, { primaryKey });
          logger.info(`[mongoMeili] Successfully created index: ${indexName}`);
        } catch (createError) {
          logger.debug(`[mongoMeili] Index ${indexName} may already exist:`, createError);
        }
      } else {
        logger.error(`[mongoMeili] Error checking index ${indexName}:`, error);
      }
    }

    try {
      await index.updateSettings({
        filterableAttributes: ['user'],
      });
      logger.debug(`[mongoMeili] Updated index ${indexName} settings to make 'user' filterable`);
    } catch (settingsError) {
      logger.error(`[mongoMeili] Error updating index settings for ${indexName}:`, settingsError);
    }
  })();

  // Fields opted in through `meiliIndex: true` in the schema definition.
  const attributesToIndex: string[] = Object.entries(schema.obj)
    .filter(([, value]) => Boolean((value as { meiliIndex?: boolean } | null | undefined)?.meiliIndex))
    .map(([key]) => key);

  // `user` is always indexed when the schema has it, since the index filters on it.
  if (schema.obj.user && !attributesToIndex.includes('user')) {
    attributesToIndex.push('user');
    logger.debug(`[mongoMeili] Added 'user' field to ${indexName} index attributes`);
  }

  schema.loadClass(createMeiliMongooseModel({ index, attributesToIndex, syncOptions }));

  type HookName = 'postSaveHook' | 'postUpdateHook' | 'postRemoveHook';

  /**
   * Runs a plugin hook on `doc` when it has one; otherwise calls `next`
   * directly so the middleware chain is not left waiting (for example when the
   * middleware receives a result object rather than a hydrated document).
   */
  const runDocumentHook = (
    doc: DocumentWithMeiliIndex | null | undefined,
    hook: HookName,
    next: CallbackWithoutResultAndOptionalError,
  ): void => {
    const handler = doc?.[hook];
    if (typeof handler === 'function') {
      handler.call(doc, next);
    } else {
      next();
    }
  };

  /** Conversation ids are indexed with `|` rewritten to `--`; mirror that for lookups and deletes. */
  const toMeiliConversationId = (value: unknown): string => String(value).replace(/\|/g, '--');

  schema.post('save', function (doc: DocumentWithMeiliIndex, next) {
    runDocumentHook(doc, 'postSaveHook', next);
  });

  schema.post('updateOne', function (doc: DocumentWithMeiliIndex, next) {
    runDocumentHook(doc, 'postUpdateHook', next);
  });

  schema.post('deleteOne', function (doc: DocumentWithMeiliIndex, next) {
    runDocumentHook(doc, 'postRemoveHook', next);
  });

  // Before a bulk delete, remove the matching entries from the search indexes.
  schema.pre('deleteMany', async function (next) {
    if (!meiliEnabled) {
      return next();
    }

    try {
      const conditions = (this as Query<unknown, unknown>).getQuery();
      const { batchSize, delayMs } = syncOptions;

      // A schema with a `messages` field is treated as the conversation schema.
      if (Object.prototype.hasOwnProperty.call(schema.obj, 'messages')) {
        const convoIndex = client.index('convos');
        const deletedConvos = await mongoose
          .model('Conversation')
          .find(conditions as FilterQuery<unknown>)
          .select('conversationId')
          .lean();

        await processBatch(deletedConvos, batchSize, delayMs, async (batch) => {
          await Promise.all(
            batch.map((convo: Record<string, unknown>) =>
              convoIndex.deleteDocument(toMeiliConversationId(convo.conversationId)),
            ),
          );
        });
      }

      // A schema with a `messageId` field is treated as the message schema.
      if (Object.prototype.hasOwnProperty.call(schema.obj, 'messageId')) {
        const messageIndex = client.index('messages');
        const deletedMessages = await mongoose
          .model('Message')
          .find(conditions as FilterQuery<unknown>)
          .select('messageId')
          .lean();

        await processBatch(deletedMessages, batchSize, delayMs, async (batch) => {
          await Promise.all(
            batch.map((message: Record<string, unknown>) =>
              messageIndex.deleteDocument(message.messageId as string),
            ),
          );
        });
      }
      return next();
    } catch (error) {
      // Index cleanup is best-effort: log and let the MongoDB delete proceed.
      logger.error(
        '[MeiliMongooseModel.deleteMany] There was an issue deleting conversation indexes upon deletion. Next startup may trigger syncing.',
        error,
      );
      return next();
    }
  });

  // After findOneAndUpdate, re-index unless the document is unfinished or its
  // indexed conversation title is already up to date.
  schema.post('findOneAndUpdate', async function (doc: DocumentWithMeiliIndex, next) {
    // `doc` is null when the query matched nothing.
    if (!meiliEnabled || !doc || doc.unfinished) {
      return next();
    }

    let meiliDoc: Record<string, unknown> | undefined;
    if (doc.messages) {
      try {
        meiliDoc = await client
          .index('convos')
          .getDocument(toMeiliConversationId(doc.conversationId));
      } catch (error: unknown) {
        logger.debug(
          '[MeiliMongooseModel.findOneAndUpdate] Convo not found in MeiliSearch and will index ' +
            doc.conversationId,
          error as Record<string, unknown>,
        );
      }
    }

    if (meiliDoc && meiliDoc.title === doc.title) {
      return next();
    }

    runDocumentHook(doc, 'postSaveHook', next);
  });
}
|
|