vicky4s4s's picture
Upload 76 files
01e9350 verified
import os
import sys
from pymongo import MongoClient
from dotenv import load_dotenv
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(project_root)
from tools_.delveant_all_tools_.crm_data_extraction import (extract_crm_lead)
from models_.code_generation_model import CodeGenerationLLM
from models_.delveant_llm_model_.multi_agent_llm import CollectionAndQueryFinder
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
mongo_db_client = MongoClient(MONGO_URI)
class Delveant:
def __init__(self):
self.mongo_db_client = mongo_db_client
def extract_keywords(self,db_name):
db = self.mongo_db_client[db_name]
crm_leads_collection = db["CRM_AICollection_Keys"]
first_two = list(crm_leads_collection.find().sort([("_id", 1)]).limit(2))
last_two = list(crm_leads_collection.find().sort([("_id", -1)]).limit(2))
documents = first_two + last_two
formatted_output = []
for doc in documents:
for item in doc.get("keywords", []):
collection_name = item.get("collection_name", "")
keywords = item.get("keywords", [])
formatted_output.append({
"collection_name": collection_name,
"keywords": keywords
})
return formatted_output
def extract_lead_data_(self,input_message,collection_name,db_name,user_id,perso_id):
documents, field_name_datatypes, status, source = extract_crm_lead(
collection_name=collection_name,
db_name=db_name,
)
prompt = mongodb_query_generator_leads(input_message,documents,user_id,
field_name_datatypes,db_name,collection_name,source)
return prompt
def resolve_date_variables(self,final_json) -> dict:
python_code = final_json.get("python_code")
if not python_code:
return final_json
local_vars = {}
exec(python_code, {"datetime": datetime, "timedelta": timedelta, "timezone": timezone}, local_vars)
def replace_vars(obj):
if isinstance(obj, dict):
return {k: replace_vars(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_vars(i) for i in obj]
elif isinstance(obj, str) and obj in local_vars:
return local_vars[obj]
return obj
final_json["filter"] = replace_vars(final_json.get("filter", {}))
final_json["pipeline"] = replace_vars(final_json.get("pipeline", []))
return final_json
async def final_response_executer(self,prompt):
try:
answer = generate_response(prompt)
final_json = json.loads(answer)
final_json = resolve_date_variables(final_json)
pipeline = final_json.get("pipeline", [])
filter_ = final_json.get("filter", {})
operation = final_json.get("operation", "find")
projection = final_json.get("projection", {})
sort = final_json.get("sort", {})
limit = final_json.get("limit", 0)
if operation.lower() == "find":
cursor = crm_leads_collection.find(filter_, projection)
if sort:
cursor = cursor.sort(list(sort.items()))
if limit:
cursor = cursor.limit(limit)
results = list(cursor)
elif operation.lower() == "aggregate":
if sort and not any("$sort" in s for s in pipeline):
pipeline.append({"$sort": sort})
if limit and not any("$limit" in s for s in pipeline):
pipeline.append({"$limit": limit})
results = list(crm_leads_collection.aggregate(pipeline))
else:
print("Unknown operation:", operation)
results = []
json_data = json.loads(json_util.dumps(results))
cleaned_documents = []
for doc in json_data:
cleaned_doc = {
k: v
for k, v in doc.items()
if v not in (None, '') and k != "_id"
}
if cleaned_doc:
cleaned_documents.append(cleaned_doc)
return cleaned_documents
except Exception as e:
return "error"
async def generate_response(self,prompt,model="openai"):
if model == "openai":
print("openai model call")
llm = CodeGenerationLLM()
response = await llm.generate_final_output(prompt)
if model == "openrouter":
print("openrouter model call")
llm = OpenRouterClient()
response = await llm.stream_chat(prompt)
return response
async def collection_query_finader(self,prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query):
llm = CollectionAndQueryFinder()
return await llm.generate_response(prompt,agents,user_id,person_id,collection_keywords,priveous_chat_history_query)
async def db_and_collection(self,db_name,collection_name):
db = self.mongo_db_client[db_name]
collection_name= db[collection_name]
return db, collection_name
async def final_output_extractor_and_responser(self, prompt,priveous_chat_history_query ,agents,db_name,user_id,person_id):
pass
async def run_file(self,user_input,priveous_chat_history_query,db_name, user_id, person_id):
answer = await self.final_output_extractor_and_responser(
user_input,priveous_chat_history_query, db_collection_find, db_name, user_id, person_id
)