import os import sys from pymongo import MongoClient from dotenv import load_dotenv project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) sys.path.append(project_root) from tools_.delveant_all_tools_.crm_data_extraction import (extract_crm_lead) from models_.code_generation_model import CodeGenerationLLM from models_.delveant_llm_model_.multi_agent_llm import CollectionAndQueryFinder load_dotenv() MONGO_URI = os.getenv("MONGO_URI") mongo_db_client = MongoClient(MONGO_URI) class Delveant: def __init__(self): self.mongo_db_client = mongo_db_client def extract_keywords(self,db_name): db = self.mongo_db_client[db_name] crm_leads_collection = db["CRM_AICollection_Keys"] first_two = list(crm_leads_collection.find().sort([("_id", 1)]).limit(2)) last_two = list(crm_leads_collection.find().sort([("_id", -1)]).limit(2)) documents = first_two + last_two formatted_output = [] for doc in documents: for item in doc.get("keywords", []): collection_name = item.get("collection_name", "") keywords = item.get("keywords", []) formatted_output.append({ "collection_name": collection_name, "keywords": keywords }) return formatted_output def extract_lead_data_(self,input_message,collection_name,db_name,user_id,perso_id): documents, field_name_datatypes, status, source = extract_crm_lead( collection_name=collection_name, db_name=db_name, ) prompt = mongodb_query_generator_leads(input_message,documents,user_id, field_name_datatypes,db_name,collection_name,source) return prompt def resolve_date_variables(self,final_json) -> dict: python_code = final_json.get("python_code") if not python_code: return final_json local_vars = {} exec(python_code, {"datetime": datetime, "timedelta": timedelta, "timezone": timezone}, local_vars) def replace_vars(obj): if isinstance(obj, dict): return {k: replace_vars(v) for k, v in obj.items()} elif isinstance(obj, list): return [replace_vars(i) for i in obj] elif isinstance(obj, str) and obj in local_vars: return local_vars[obj] return obj final_json["filter"] = replace_vars(final_json.get("filter", {})) final_json["pipeline"] = replace_vars(final_json.get("pipeline", [])) return final_json async def final_response_executer(self,prompt): try: answer = generate_response(prompt) final_json = json.loads(answer) final_json = resolve_date_variables(final_json) pipeline = final_json.get("pipeline", []) filter_ = final_json.get("filter", {}) operation = final_json.get("operation", "find") projection = final_json.get("projection", {}) sort = final_json.get("sort", {}) limit = final_json.get("limit", 0) if operation.lower() == "find": cursor = crm_leads_collection.find(filter_, projection) if sort: cursor = cursor.sort(list(sort.items())) if limit: cursor = cursor.limit(limit) results = list(cursor) elif operation.lower() == "aggregate": if sort and not any("$sort" in s for s in pipeline): pipeline.append({"$sort": sort}) if limit and not any("$limit" in s for s in pipeline): pipeline.append({"$limit": limit}) results = list(crm_leads_collection.aggregate(pipeline)) else: print("Unknown operation:", operation) results = [] json_data = json.loads(json_util.dumps(results)) cleaned_documents = [] for doc in json_data: cleaned_doc = { k: v for k, v in doc.items() if v not in (None, '') and k != "_id" } if cleaned_doc: cleaned_documents.append(cleaned_doc) return cleaned_documents except Exception as e: return "error" async def generate_response(self,prompt,model="openai"): if model == "openai": print("openai model call") llm = CodeGenerationLLM() response = await llm.generate_final_output(prompt) if model == "openrouter": print("openrouter model call") llm = OpenRouterClient() response = await llm.stream_chat(prompt) return response async def collection_query_finader(self,prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query): llm = CollectionAndQueryFinder() return await llm.generate_response(prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query) async def db_and_collection(self,db_name,collection_name): db = self.mongo_db_client[db_name] collection_name= db[collection_name] return db, collection_name async def final_output_extractor_and_responser(self, prompt,priveous_chat_history_query ,agents,db_name,user_id,person_id): pass async def run_file(self,user_input,priveous_chat_history_query,db_name, user_id, person_id): answer = await self.final_output_extractor_and_responser( user_input,priveous_chat_history_query, db_collection_find, db_name, user_id, person_id )