from astrapy import DataAPIClient xforce_bcl_endpoint="https://b3cc3888-098f-45f7-9087-260ed43b742c-us-east-2.apps.astra.datastax.com" xforce_bcl_db_token="AstraCS:dRZIxPSXJqqWPPgJjMpoqnrJ:999c021256da787c45f0309e589c9e99884c6559a9337d09de9fa15cd88843dc" #Initialize the client client = DataAPIClient(xforce_bcl_db_token) db = client.get_database_by_api_endpoint(xforce_bcl_endpoint) coll="bcl" ie="individual_experiments" app="app" memory="memory" history="history" applayer="applayer" filtered_candidates="filtered_candidates" ooda="ooda" class DatabaseEngine(): def __init__(self): pass def Insert(self,data): collection=db.get_collection(coll) result = collection.insert_one(data) return result def Insert_IE(self,data): try: collection=db.get_collection(ie) collection.insert_one(data) return True except Exception as e: return False def InsertMemory(self,data): try: collection=db.get_collection(memory) collection.insert_one(data) return True except Exception as e: return False def Fetch_IE(self,bcl_id): collection=db.get_collection(ie) documents=collection.find({"bcl_id":bcl_id}) released_docs=[] for doc in documents: released_docs.append(doc) return released_docs def FetchMemory(self,bcl_id): collection=db.get_collection(memory) document=collection.find_one({"bcl_id":bcl_id}) return document def UpdateMemory(self,bcl_id,operation): collection=db.get_collection(memory) update_result_a = collection.update_one( {"bcl_id": bcl_id}, {"$push": {"executed_operations":operation}} ) #UPDATED def UpdateProject(self,projectid,plan): collection=db.get_collection(coll) update_result_a = collection.update_one( {"project.project_id": projectid}, {"$push": {"plans":plan}} ) def CheckEmpty(self,id): collection=db.get_collection(history) cursor=collection.find({"bcl_id":id}) data=list(cursor) if not data: return True else: return False def CheckEmptyAppLayer(self,id): collection=db.get_collection(applayer) cursor=collection.find({"bcl_id":id}) data=list(cursor) if not data: return True else: return False def CheckEmptyOps(self,id): collection=db.get_collection(memory) cursor=collection.find({"bcl_id":id}) data=list(cursor) if not data: return True else: return False def CheckEmptyProjects(self,id): collection=db.get_collection(coll) cursor=collection.find({"bcl_id":id}) data=list(cursor) if not data: return True else: return False def Insert_Conversation(self,data): collection=db.get_collection(history) collection.insert_one(data) def Update_Conversation(self,bcl_id,data): collection=db.get_collection(history) update_result_a = collection.update_one( {"bcl_id": bcl_id}, {"$push": {"messages": {"$each": data}}} ) def FetchConversation(self,id): collection=db.get_collection(history) document=collection.find_one({"bcl_id":id}) return document def Insert_AppLayer(self,data): collection=db.get_collection(applayer) collection.insert_one(data) def Update_AppLayer(self,bcl_id,data): collection=db.get_collection(applayer) update_result_a = collection.update_one( {"bcl_id": bcl_id}, {"$push": {"messages": {"$each": data}}} ) def Fetch_AppLayer(self,id): collection=db.get_collection(applayer) document=collection.find_one({"bcl_id":id}) return document def Update_Status(self,project_id,bcl_id): try: collection=db.get_collection(coll) # This is the query to find the correct document AND the specific plan. filter_query = { "project.project_id": project_id, "project.plans.bcl_id": bcl_id } update_operation = { "$set": { "project.plans.$.status": "done" } } collection.update_one(filter_query, update_operation) #collection.update_one(filter_query, update_operation) return True except Exception as e: return False def Find_User(self,uid): collection = db.get_collection(coll) cursor = collection.find({"user_id":uid}) documents=[] for document in cursor: bcl_id=document.get("bcl_id") status=document.get("status") newdoc={"id":bcl_id,"status":status} documents.append(newdoc) return documents def Find_Task(self,id): collection = db.get_collection(coll) cursor = collection.find({"bcl_id":id}) documents=[] for document in cursor: documents.append(document) return documents def Fetch_Status(self,bcl_id): collection=db.get_collection(coll) document=collection.find_one({"bcl_id":bcl_id}) return {"status":document["status"]} ''' def Fetch_Projects(self,user_id): collection=db.get_collection(coll) cursor=collection.find({"user_id":user_id}) documents=[] for document in cursor: projectid=document.get("project_id") status=document.get("status") documents.append({"project":projectid,"status":status}) return documents ''' def Fetch_Projects(self, user_id): #Get the correct collection from the database collection = db.get_collection(coll) #Find the document for the given user_id document = collection.find_one({"user_id": user_id}) #Initialize a list to hold the project details projects = [] #Check if a document was found and if it contains a 'project' field if document and "project" in document: # Get the project dictionary from the document project_data = document["project"] # Get the project_id from the project dictionary project_id = project_data.get("project_id") # Get the list of plans from the project data #plans = project_data.get("plans", []) # Iterate over each plan to extract its status and include the project_id #for plan in plans: projects.append(project_id) return projects def RegisterUser(self,data): collection=db.get_collection(app) collection.insert_one(data) def UpdateUser(self,uid,data): try: collection=db.get_collection(app) collection.update_one( {"username": uid}, {"$push": {"projects": data}} ) return True except Exception as e: return False def InsertFiltered(self,data): collection=db.get_collection(filtered_candidates) collection.insert_one(data) def FetchFiltered(self,bcl_id): collection=db.get_collection(filtered_candidates) document=collection.find_one({"bcl_id":bcl_id}) return document def CheckEmptyFiltered(self,id): collection=db.get_collection(filtered_candidates) cursor=collection.find({"bcl_id":id}) data=list(cursor) if not data: return True else: return False def UpdateFiltered(self,bcl_id,data): collection=db.get_collection(filtered_candidates) update_result_a = collection.update_one( {"bcl_id": bcl_id}, {"$push": {"candidates": {"$each": data}}} ) def Insert_OODA(self,data): collection=db.get_collection(ooda) collection.insert_one(data) def update_OODA(self,bcl_id,data): collection=db.get_collection(ooda) update_result_a=collection.update_one( {"bcl_id":bcl_id}, {"$push":{"ooda_loop":{"$each":data}}} ) def CheckEmptyOODA(self,id): collection=db.get_collection(ooda) cursor=collection.find({"bcl_id":id}) data=list(cursor) if not data: return True else: return False def FetchOODA(self,bcl_id): collection=db.get_collection(ooda) document=collection.find_one({"bcl_id":bcl_id}) return document