| import os | |
| import sys | |
| from pymongo import MongoClient | |
| from dotenv import load_dotenv | |
| project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) | |
| sys.path.append(project_root) | |
| from tools_.delveant_all_tools_.crm_data_extraction import (extract_crm_lead) | |
| from models_.code_generation_model import CodeGenerationLLM | |
| from models_.delveant_llm_model_.multi_agent_llm import CollectionAndQueryFinder | |
| load_dotenv() | |
| MONGO_URI = os.getenv("MONGO_URI") | |
| mongo_db_client = MongoClient(MONGO_URI) | |
| class Delveant: | |
| def __init__(self): | |
| self.mongo_db_client = mongo_db_client | |
| def extract_keywords(self,db_name): | |
| db = self.mongo_db_client[db_name] | |
| crm_leads_collection = db["CRM_AICollection_Keys"] | |
| first_two = list(crm_leads_collection.find().sort([("_id", 1)]).limit(2)) | |
| last_two = list(crm_leads_collection.find().sort([("_id", -1)]).limit(2)) | |
| documents = first_two + last_two | |
| formatted_output = [] | |
| for doc in documents: | |
| for item in doc.get("keywords", []): | |
| collection_name = item.get("collection_name", "") | |
| keywords = item.get("keywords", []) | |
| formatted_output.append({ | |
| "collection_name": collection_name, | |
| "keywords": keywords | |
| }) | |
| return formatted_output | |
| def extract_lead_data_(self,input_message,collection_name,db_name,user_id,perso_id): | |
| documents, field_name_datatypes, status, source = extract_crm_lead( | |
| collection_name=collection_name, | |
| db_name=db_name, | |
| ) | |
| prompt = mongodb_query_generator_leads(input_message,documents,user_id, | |
| field_name_datatypes,db_name,collection_name,source) | |
| return prompt | |
| def resolve_date_variables(self,final_json) -> dict: | |
| python_code = final_json.get("python_code") | |
| if not python_code: | |
| return final_json | |
| local_vars = {} | |
| exec(python_code, {"datetime": datetime, "timedelta": timedelta, "timezone": timezone}, local_vars) | |
| def replace_vars(obj): | |
| if isinstance(obj, dict): | |
| return {k: replace_vars(v) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [replace_vars(i) for i in obj] | |
| elif isinstance(obj, str) and obj in local_vars: | |
| return local_vars[obj] | |
| return obj | |
| final_json["filter"] = replace_vars(final_json.get("filter", {})) | |
| final_json["pipeline"] = replace_vars(final_json.get("pipeline", [])) | |
| return final_json | |
| async def final_response_executer(self,prompt): | |
| try: | |
| answer = generate_response(prompt) | |
| final_json = json.loads(answer) | |
| final_json = resolve_date_variables(final_json) | |
| pipeline = final_json.get("pipeline", []) | |
| filter_ = final_json.get("filter", {}) | |
| operation = final_json.get("operation", "find") | |
| projection = final_json.get("projection", {}) | |
| sort = final_json.get("sort", {}) | |
| limit = final_json.get("limit", 0) | |
| if operation.lower() == "find": | |
| cursor = crm_leads_collection.find(filter_, projection) | |
| if sort: | |
| cursor = cursor.sort(list(sort.items())) | |
| if limit: | |
| cursor = cursor.limit(limit) | |
| results = list(cursor) | |
| elif operation.lower() == "aggregate": | |
| if sort and not any("$sort" in s for s in pipeline): | |
| pipeline.append({"$sort": sort}) | |
| if limit and not any("$limit" in s for s in pipeline): | |
| pipeline.append({"$limit": limit}) | |
| results = list(crm_leads_collection.aggregate(pipeline)) | |
| else: | |
| print("Unknown operation:", operation) | |
| results = [] | |
| json_data = json.loads(json_util.dumps(results)) | |
| cleaned_documents = [] | |
| for doc in json_data: | |
| cleaned_doc = { | |
| k: v | |
| for k, v in doc.items() | |
| if v not in (None, '') and k != "_id" | |
| } | |
| if cleaned_doc: | |
| cleaned_documents.append(cleaned_doc) | |
| return cleaned_documents | |
| except Exception as e: | |
| return "error" | |
| async def generate_response(self,prompt,model="openai"): | |
| if model == "openai": | |
| print("openai model call") | |
| llm = CodeGenerationLLM() | |
| response = await llm.generate_final_output(prompt) | |
| if model == "openrouter": | |
| print("openrouter model call") | |
| llm = OpenRouterClient() | |
| response = await llm.stream_chat(prompt) | |
| return response | |
| async def collection_query_finader(self,prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query): | |
| llm = CollectionAndQueryFinder() | |
| return await llm.generate_response(prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query) | |
| async def db_and_collection(self,db_name,collection_name): | |
| db = self.mongo_db_client[db_name] | |
| collection_name= db[collection_name] | |
| return db, collection_name | |
| async def final_output_extractor_and_responser(self, prompt,priveous_chat_history_query ,agents,db_name,user_id,person_id): | |
| pass | |
| async def run_file(self,user_input,priveous_chat_history_query,db_name, user_id, person_id): | |
| answer = await self.final_output_extractor_and_responser( | |
| user_input,priveous_chat_history_query, db_collection_find, db_name, user_id, person_id | |
| ) | |