Spaces:
Runtime error
Runtime error
| from utils.mongo_utils import generate_mongodb_query, get_prompt | |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
| from langchain_core.output_parsers import JsonOutputParser | |
| import random | |
| from utils.utils import timing_decorator | |
class MongoSearch:
    """Two-stage MongoDB ``$vectorSearch`` that returns a random sample of hits.

    Stage one searches ``search_index`` / ``index_variable`` (optionally
    narrowed by a filter derived from the query through the LLM) and keeps
    only each hit's ``makeModel``. Stage two searches the feature index,
    restricted to those ``makeModel`` values, fetching three times as many
    documents as requested; ``k`` of them are then drawn at random.
    """

    # Multiplier used for ``numCandidates`` and for the stage-two over-fetch.
    _OVERSAMPLE = 3
    # Atlas Vector Search rejects ``numCandidates`` values above this bound.
    _MAX_NUM_CANDIDATES = 10000

    def __init__(
        self,
        collection,
        search_index,
        index_variable,
        embedding_model="text-embedding-3-large",
        feature_index="filter-vector-index",
        feature_path="feature_embedding",
    ):
        """Store the collection, index names, and build the OpenAI clients.

        Args:
            collection: Object exposing ``aggregate(pipeline)`` (used for both
                search stages).
            search_index: Name of the vector index used by the first stage.
            index_variable: Document path of the embedding searched by the
                first stage.
            embedding_model: Model name passed to ``OpenAIEmbeddings``.
            feature_index: Name of the vector index used by the second stage.
            feature_path: Document path of the embedding searched by the
                second stage.
        """
        self.collection = collection
        self.embedding_model = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)
        self.parser = JsonOutputParser()
        self.search_index = search_index
        self.index_variable = index_variable
        self.feature_index = feature_index
        self.feature_path = feature_path

    def _build_filter(self, query):
        """Ask the LLM for structured criteria and turn them into a filter."""
        response = self.llm.invoke(get_prompt(query))
        # Reuse the parser created in __init__ instead of building one per call.
        criteria = self.parser.parse(response.content)
        return generate_mongodb_query(criteria)

    def _vector_stage(self, index, path, query_filter, query_vector, limit):
        """Build one ``$vectorSearch`` stage with a capped ``numCandidates``."""
        return {
            '$vectorSearch': {
                'index': index,
                'path': path,
                'filter': query_filter,
                'queryVector': query_vector,
                'numCandidates': min(
                    limit * self._OVERSAMPLE, self._MAX_NUM_CANDIDATES
                ),
                'limit': limit,
            }
        }

    def __call__(self, query, k=4, use_filter=True):
        """Run both search stages and return up to ``k`` randomly chosen documents.

        Args:
            query: Free-text query; embedded once and used by both stages.
            k: Number of documents to return (also the first-stage limit).
            use_filter: When true, derive a filter for the first stage from
                ``query`` via the LLM; otherwise search unfiltered.

        Returns:
            A list of at most ``k`` documents from the second stage, with
            ``description``, ``variants``, ``review_embedding`` and
            ``feature_embedding`` projected out. Empty when the first stage
            yields no ``makeModel`` values.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        # Fail before paying for the LLM and embedding round-trips.
        if k < 1:
            raise ValueError(f"k must be a positive integer, got {k!r}")

        query_filter = self._build_filter(query) if use_filter else {}
        query_vector = self.embedding_model.embed_query(query)

        # Stage 1: find the closest documents and keep only their makeModel.
        first_pipeline = [
            self._vector_stage(
                self.search_index,
                self.index_variable,
                query_filter or {},
                query_vector,
                k,
            ),
            {'$project': {'makeModel': 1}},
        ]
        make_models = []
        for doc in self.collection.aggregate(first_pipeline):
            make_model = doc.get('makeModel')
            # At most k hits, so a linear membership test is cheap and does
            # not require the values to be hashable.
            if make_model is not None and make_model not in make_models:
                make_models.append(make_model)

        # Nothing to restrict the second search to: skip the extra round-trip.
        if not make_models:
            return []

        # Stage 2: over-fetch on the feature index so the random draw below
        # has more than k documents to choose from.
        fetch_limit = k * self._OVERSAMPLE
        second_pipeline = [
            self._vector_stage(
                self.feature_index,
                self.feature_path,
                {'makeModel': {'$in': make_models}},
                query_vector,
                fetch_limit,
            ),
            {
                '$project': {
                    'description': 0,
                    'variants': 0,
                    'review_embedding': 0,
                    'feature_embedding': 0,
                }
            },
        ]
        candidates = list(self.collection.aggregate(second_pipeline))

        # Randomly pick k of the over-fetched candidates (fewer if scarce).
        return random.sample(candidates, min(k, len(candidates)))