Spaces:
Sleeping
Sleeping
import json
import logging
import os
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from groq import Groq

from biological_context_language import xFORCE_BIOLOGICAL_CONTEXT_LANGUAGE
from biological_context_language_orchestrator import Biological_Context_Orchestrator
from databaseengine import DatabaseEngine
from qa_agent import QA_Agent
from query_dispatcher import QueryDispatcher
# Shared module-level singletons used by the agent entry points in this file.
#
# The Groq API key is read from the GROQ_API_KEY environment variable so that
# no credential lives in source control. The key that used to be hard-coded
# here has been published and must be treated as compromised and revoked.
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
de = DatabaseEngine()

# The scheduler is started at import time so that jobs added later by
# create_and_execute_bcl_workflow are picked up without further setup.
scheduler = BackgroundScheduler()
scheduler.start()

BCL = xFORCE_BIOLOGICAL_CONTEXT_LANGUAGE()
BCL_Orc = Biological_Context_Orchestrator()
def scheduling_task(project_id, id, bcl_input, target):
    """Scheduler job entry point: hand one BCL plan to the orchestrator.

    The four arguments are forwarded positionally and unchanged to
    ``BCL_Orc.BCL_Orchestrator``; nothing is returned.
    """
    # ``id`` shadows the builtin, but the name is part of this function's
    # signature and is therefore kept as-is.
    job_args = (project_id, id, bcl_input, target)
    BCL_Orc.BCL_Orchestrator(*job_args)
def _operation_names(records):
    """Collect the ``"operation"`` value of every record into a set."""
    return {record["operation"] for record in records}


def _decode_stored_experiments(raw):
    """Normalise the stored experiment payload to a list of records.

    JSON text is decoded, an already-decoded value is passed through, and an
    empty or missing payload is treated as "nothing recorded yet".
    """
    if not raw:
        return []
    if isinstance(raw, (str, bytes, bytearray)):
        return json.loads(raw)
    return raw


def create_and_execute_bcl_workflow(bio_query, uid, user_id, project_id, target):
    """Plan a BCL workflow for ``bio_query``, persist it and schedule its run.

    Steps:
      1. Ask ``BCL.BCL_PLANNER`` for a plan.
      2. If ``de.CheckEmptyProjects(uid)`` reports an empty project, insert a
         new project document holding the plan; otherwise append the plan to
         the existing project unless one of its operations is already stored.
      3. Unless a duplicate operation is found, schedule ``scheduling_task``
         to run once, one second from now.

    Args:
        bio_query: The user's request; echoed back (as ``str``) in the result.
        uid: Identifier passed to the planner and stored as ``bcl_id``.
        user_id: Owner of the project.
        project_id: Project the plan belongs to.
        target: Stored with a new project and forwarded to the scheduled job.

    Returns:
        A JSON string with the keys ``"operation"`` and ``"status"``. The
        status is ``"active"`` when the workflow was scheduled,
        ``"Operation already in the system"`` when a planned operation is
        already stored, or the exception text when anything failed. No
        exception is propagated to the caller.
    """

    def status_report(status):
        # Single place that defines the shape of every result of this function.
        return json.dumps({"operation": str(bio_query), "status": status})

    try:
        bcl_plan = BCL.BCL_PLANNER(bio_query, uid)

        def is_duplicate():
            """Return True if the plan shares an operation with stored ones."""
            stored = de.Fetch_IE(f"origin_ai_bio_{user_id}_{project_id}")
            present_ops = _operation_names(_decode_stored_experiments(stored))
            # A plan without an "experiments" entry has nothing to compare.
            planned_ops = _operation_names(bcl_plan.get("experiments") or [])
            return not present_ops.isdisjoint(planned_ops)

        if de.CheckEmptyProjects(uid):
            de.Insert(
                {
                    "user_id": user_id,
                    "project": {
                        "project_id": project_id,
                        "plans": [
                            {"bcl_id": uid, "bcl_plan": bcl_plan, "status": "active"}
                        ],
                    },
                    "target": target,
                }
            )
        elif not is_duplicate():
            de.UpdateProject(project_id, bcl_plan)

        # NOTE(review): the stored operations are fetched again here, as in
        # the original code, so the decision reflects the state after the
        # write above. Confirm whether a single check would be sufficient.
        if is_duplicate():
            return status_report("Operation already in the system")

        run_time = datetime.now() + timedelta(seconds=1)
        scheduler.add_job(
            scheduling_task,
            trigger='date',
            run_date=run_time,
            args=[project_id, uid, bcl_plan, target],
        )
        return status_report("active")
    except Exception as e:
        # Top-level boundary: the caller expects a JSON status, never an
        # exception, but the traceback must not be lost.
        logging.getLogger(__name__).exception(
            "BCL workflow creation failed for project %s", project_id
        )
        return status_report(str(e))
def qa_wrapper(context, question):
    """Return whatever ``QA_Agent`` produces for ``question`` given ``context``."""
    return QA_Agent(context, question)
def RoutingAgent(user_query, uid, user_id, project_id, target):
    """Dispatch ``user_query`` to the agent chosen by ``QueryDispatcher``.

    The dispatcher result is only honoured when its ``"score"`` is ``"High"``:

    * ``"bio_engineering_agent"`` -> ``create_and_execute_bcl_workflow``;
      its JSON status string is returned unchanged.
    * ``"bio_engineering_question_answer_agent"`` -> ``qa_wrapper`` with the
      OODA context stored for ``uid`` (or a placeholder when none exists);
      the answer is returned as ``str``.

    Args:
        user_query: The user's query, passed on verbatim.
        uid: Identifier used for the OODA lookup and the BCL workflow.
        user_id: Forwarded to the BCL workflow.
        project_id: Forwarded to the BCL workflow.
        target: Forwarded to the BCL workflow.

    Returns:
        Always a string. When the query cannot be routed (score not "High"
        or an unknown agent) a JSON status string is returned instead of the
        implicit ``None`` the caller would otherwise place into a tool
        message.
    """
    dispatcher_output = QueryDispatcher(query=user_query)
    agent = None
    if dispatcher_output.get("score") == "High":
        agent = dispatcher_output.get("agent")

    if agent == "bio_engineering_agent":
        return create_and_execute_bcl_workflow(
            user_query, uid, user_id, project_id, target
        )

    if agent == "bio_engineering_question_answer_agent":
        if de.CheckEmptyOODA(uid):
            context = "!Information not available"
        else:
            context = de.FetchOODA(uid)
        return str(qa_wrapper(context, user_query))

    # Nothing matched: report it in the same JSON shape the workflow uses.
    return json.dumps(
        {
            "operation": str(user_query),
            "status": "Query could not be routed to an agent",
        }
    )
# Dispatch table: tool name requested by the model -> Python callable.
# Only the routing agent is exposed; the BCL workflow is reached through it.
onboard_tools = dict(routing_agent=RoutingAgent)

# Schema text describing ``create_and_execute_bcl_workflow`` as a tool.
# Nothing in this section reads it; the string is kept verbatim.
sfl = """
{
"type": "function",
"function": {
"name": "create_and_execute_bcl_workflow",
"description": "Takes in a high level experimental goals and converts them to bcl workflow and executes that workflow ",
"parameters": {
"type": "object",
"properties": {
"bio_query": {
"type": "string",
"description": "high level bio query"
},
},
"required": ["bio_query"]
}
}
}
"""

# Tool schema advertised to the chat-completion API, assembled inside-out.
_ROUTING_AGENT_PARAMETERS = {
    "type": "object",
    "properties": {
        "user_query": {
            "type": "string",
            "description": "original user query without alteration",
        },
    },
    "required": ["user_query"],
}
_ROUTING_AGENT_SCHEMA = {
    "type": "function",
    "function": {
        "name": "routing_agent",
        "description": "Takes in exact user query and routes it to appropriate agent",
        "parameters": _ROUTING_AGENT_PARAMETERS,
    },
}
tools = [_ROUTING_AGENT_SCHEMA]
def PROMPT_FOR_APPLICATION_LAYER_AGENT_V2():
    """Return the system prompt for the application-layer routing assistant.

    The text instructs the model to call the routing tool exactly once with
    the verbatim user input and to relay the tool's answer without
    embellishment. It is a constant: nothing is interpolated, so a plain
    string literal is used and the JSON example carries single braces.
    """
    # The rule markers and dashes below were mis-encoded (UTF-8 bytes shown
    # through a Greek single-byte code page); they are restored here.
    return """
ROLE:
You are a biological AI assistant whose sole purpose is to route user queries to the provided tool.
INPUT:
A user's biological intent or problem description, in natural language
GOAL:
Call the provided tool (the routing agent) exactly once with the user's original input (verbatim), and respond with exactly what the tool communicates, translated into natural language if the tool response is structured (e.g., JSON).
RULES:
✅ Always call the tool only once with the exact original user input.
✅ If the tool returns a structured response (e.g., a JSON like { "operation":"user prompt", "status":"active" }), interpret and convert it accurately into plain natural language (e.g., "The operation 'user prompt' is currently active.").
✅ If the tool returns plain text, return it exactly as is — do not paraphrase, explain, or alter.
🚫 Do not add any commentary, explanations, analysis, markdown formatting, or extra information — even if the tool's response seems unclear or minimal.
🚫 Do not hallucinate or fabricate responses or parts of responses under any condition.
🚫 Do not wrap responses in code blocks or formatting.
"""
def ApplicationLayerAgent(user_input, uid, user_id, project_id, target):
    """Run one conversational turn through the routing tool and the LLM.

    Flow:
      1. Record the user message in the app-layer store for ``uid`` (creating
         the record when ``de.CheckEmptyAppLayer`` reports none) and rebuild
         the conversation: system prompt plus stored history, trimmed to the
         system prompt and the last four messages once it exceeds eight.
      2. Ask the model with the ``routing_agent`` tool forced.
      3. Execute each requested tool call via ``onboard_tools``, add the
         result to the conversation and to the store.
      4. Ask the model again for the final wording, store and return it.

    Args:
        user_input: The user's message for this turn.
        uid: Key of the app-layer conversation record; also passed to the tool.
        user_id: Forwarded to the tool.
        project_id: Forwarded to the tool.
        target: Forwarded to the tool.

    Returns:
        The assistant's final message content, or the first response's
        content when the model made no tool call.
    """
    model_name = "llama-3.3-70b-versatile"
    system_message = {
        "role": "system",
        "content": PROMPT_FOR_APPLICATION_LAYER_AGENT_V2(),
    }
    g_messages = [system_message]

    if de.CheckEmptyAppLayer(uid):
        # First turn for this uid: create the record, start a fresh history.
        de.Insert_AppLayer(
            {"bcl_id": uid, "messages": [{"role": "user", "content": user_input}]}
        )
        g_messages.append({"role": "user", "content": user_input})
    else:
        de.Update_AppLayer(uid, [{"role": "user", "content": user_input}])
        history = de.Fetch_AppLayer(uid).get("messages")
        g_messages.extend(history)
        if len(g_messages) > 8:
            # Keep the context small: system prompt + the four latest messages.
            g_messages = [system_message] + g_messages[-4:]

    response = client.chat.completions.create(
        model=model_name,
        messages=g_messages,
        stream=False,
        max_completion_tokens=5000,
        tools=tools,
        tool_choice={'type': 'function', 'function': {'name': 'routing_agent'}},
    )
    first_message = response.choices[0].message
    tool_calls = first_message.tool_calls
    if not tool_calls:
        return first_message.content

    # NOTE(review): the assistant message that carries ``tool_calls`` is not
    # added to the conversation before the tool results (it never was) —
    # confirm the API accepts tool messages without it.
    for tool_call in tool_calls:
        function_name = tool_call.function.name
        function_to_call = onboard_tools[function_name]
        function_args = json.loads(tool_call.function.arguments)

        function_response = function_to_call(
            user_query=function_args.get("user_query"),
            uid=uid,
            user_id=user_id,
            project_id=project_id,
            target=target,
        )
        if not isinstance(function_response, str):
            # Message content must be text; JSON-encode anything else
            # (a missing result becomes "null") instead of sending it raw.
            function_response = json.dumps(function_response)

        tool_message = {
            "tool_call_id": tool_call.id,
            "role": "tool",
            "name": function_name,
            "content": function_response,
        }
        g_messages.append(tool_message)
        # Hand the store its own copy so it cannot alter the live conversation.
        de.Update_AppLayer(uid, [dict(tool_message)])

    second_response = client.chat.completions.create(
        model=model_name,
        messages=g_messages,
    )
    final_answer = second_response.choices[0].message.content
    de.Update_AppLayer(uid, [{"role": "assistant", "content": final_answer}])
    return final_answer