File size: 5,987 Bytes
01e9350 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | import os
import sys
from pymongo import MongoClient
from dotenv import load_dotenv
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(project_root)
from tools_.delveant_all_tools_.crm_data_extraction import (extract_crm_lead)
from models_.code_generation_model import CodeGenerationLLM
from models_.delveant_llm_model_.multi_agent_llm import CollectionAndQueryFinder
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
mongo_db_client = MongoClient(MONGO_URI)
class Delveant:
def __init__(self):
self.mongo_db_client = mongo_db_client
def extract_keywords(self,db_name):
db = self.mongo_db_client[db_name]
crm_leads_collection = db["CRM_AICollection_Keys"]
first_two = list(crm_leads_collection.find().sort([("_id", 1)]).limit(2))
last_two = list(crm_leads_collection.find().sort([("_id", -1)]).limit(2))
documents = first_two + last_two
formatted_output = []
for doc in documents:
for item in doc.get("keywords", []):
collection_name = item.get("collection_name", "")
keywords = item.get("keywords", [])
formatted_output.append({
"collection_name": collection_name,
"keywords": keywords
})
return formatted_output
def extract_lead_data_(self,input_message,collection_name,db_name,user_id,perso_id):
documents, field_name_datatypes, status, source = extract_crm_lead(
collection_name=collection_name,
db_name=db_name,
)
prompt = mongodb_query_generator_leads(input_message,documents,user_id,
field_name_datatypes,db_name,collection_name,source)
return prompt
def resolve_date_variables(self,final_json) -> dict:
python_code = final_json.get("python_code")
if not python_code:
return final_json
local_vars = {}
exec(python_code, {"datetime": datetime, "timedelta": timedelta, "timezone": timezone}, local_vars)
def replace_vars(obj):
if isinstance(obj, dict):
return {k: replace_vars(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_vars(i) for i in obj]
elif isinstance(obj, str) and obj in local_vars:
return local_vars[obj]
return obj
final_json["filter"] = replace_vars(final_json.get("filter", {}))
final_json["pipeline"] = replace_vars(final_json.get("pipeline", []))
return final_json
async def final_response_executer(self,prompt):
try:
answer = generate_response(prompt)
final_json = json.loads(answer)
final_json = resolve_date_variables(final_json)
pipeline = final_json.get("pipeline", [])
filter_ = final_json.get("filter", {})
operation = final_json.get("operation", "find")
projection = final_json.get("projection", {})
sort = final_json.get("sort", {})
limit = final_json.get("limit", 0)
if operation.lower() == "find":
cursor = crm_leads_collection.find(filter_, projection)
if sort:
cursor = cursor.sort(list(sort.items()))
if limit:
cursor = cursor.limit(limit)
results = list(cursor)
elif operation.lower() == "aggregate":
if sort and not any("$sort" in s for s in pipeline):
pipeline.append({"$sort": sort})
if limit and not any("$limit" in s for s in pipeline):
pipeline.append({"$limit": limit})
results = list(crm_leads_collection.aggregate(pipeline))
else:
print("Unknown operation:", operation)
results = []
json_data = json.loads(json_util.dumps(results))
cleaned_documents = []
for doc in json_data:
cleaned_doc = {
k: v
for k, v in doc.items()
if v not in (None, '') and k != "_id"
}
if cleaned_doc:
cleaned_documents.append(cleaned_doc)
return cleaned_documents
except Exception as e:
return "error"
async def generate_response(self,prompt,model="openai"):
if model == "openai":
print("openai model call")
llm = CodeGenerationLLM()
response = await llm.generate_final_output(prompt)
if model == "openrouter":
print("openrouter model call")
llm = OpenRouterClient()
response = await llm.stream_chat(prompt)
return response
async def collection_query_finader(self,prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query):
llm = CollectionAndQueryFinder()
return await llm.generate_response(prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query)
async def db_and_collection(self,db_name,collection_name):
db = self.mongo_db_client[db_name]
collection_name= db[collection_name]
return db, collection_name
async def final_output_extractor_and_responser(self, prompt,priveous_chat_history_query ,agents,db_name,user_id,person_id):
pass
async def run_file(self,user_input,priveous_chat_history_query,db_name, user_id, person_id):
answer = await self.final_output_extractor_and_responser(
user_input,priveous_chat_history_query, db_collection_find, db_name, user_id, person_id
)
|