# originbio-component1 / application_layer_agent.py
# (Junaidb: "Create application_layer_agent.py", commit 66d2729 verified)
import json
import os
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from groq import Groq

from biological_context_language import xFORCE_BIOLOGICAL_CONTEXT_LANGUAGE
from biological_context_language_orchestrator import Biological_Context_Orchestrator
from databaseengine import DatabaseEngine
from qa_agent import QA_Agent
from query_dispatcher import QueryDispatcher
client=Groq(api_key="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")
de=DatabaseEngine()
scheduler=BackgroundScheduler()
scheduler.start()
BCL=xFORCE_BIOLOGICAL_CONTEXT_LANGUAGE()
BCL_Orc=Biological_Context_Orchestrator()
def scheduling_task(project_id,id,bcl_input,target):
BCL_Orc.BCL_Orchestrator(project_id,id,bcl_input,target)
def create_and_execute_bcl_workflow(bio_query,uid,user_id,project_id,target):
try:
#ABORT_FLAG=False
bcl_plan=BCL.BCL_PLANNER(bio_query,uid)
def VIOLATION_CHECK():
''' compare if previously generated such '''
#global ABORT_FLAG
bcl_already_in_db=de.Fetch_IE(f"origin_ai_bio_{user_id}_{project_id}")
#exp_list = ast.literal_eval(bcl_already_in_db) if isinstance(bcl_already_in_db, str) else bcl_already_in_db
present_ops=[]
bcl_ops=[]
bcl_already_in_db_list=json.loads(bcl_already_in_db)
for objj in bcl_already_in_db_list:
present_ops.append(objj["operation"])
#bcl_plan_json=json.loads(bcl_plan)
experiments_in_bcl_plan=bcl_plan.get("experiments")
for exp_objj in experiments_in_bcl_plan:
bcl_ops.append(exp_objj["operation"])
present_ops_set=set(present_ops)
bcl_ops_set=set(bcl_ops)
common_elms=present_ops_set & bcl_ops_set
return len(common_elms) > 0
emptyproject=de.CheckEmptyProjects(uid)
if emptyproject ==True:
project_payload={
#"bcl_id":uid,
"user_id":user_id,
"project":{
"project_id":project_id,
"plans":[{ "bcl_id":uid, "bcl_plan":bcl_plan, "status":"active" }]
},
"target":target
}
de.Insert(project_payload)
elif emptyproject ==False:
violation1=VIOLATION_CHECK()
if violation1==False:
de.UpdateProject(project_id,bcl_plan)
elif violation1==True:
pass
violation2=VIOLATION_CHECK()
if violation2==False:
run_time = datetime.now() + timedelta(seconds=1)
scheduler.add_job(scheduling_task, trigger='date', run_date=run_time,args=[project_id,uid,bcl_plan,target])
return json.dumps(
{
"operation":str(bio_query),
"status":"active"
}
)
elif violation2==True:
return json.dumps({
"operation":str(bio_query),
"status":"Operation already in the system"
})
except Exception as e:
return json.dumps(
{
"operation":str(bio_query),
"status":str(e)
}
)
def qa_wrapper(context,question):
answer=QA_Agent(context,question)
return answer
def RoutingAgent(user_query,uid,user_id,project_id,target):
dispatcher_output=QueryDispatcher(query=user_query)
if dispatcher_output.get("score") == "High":
match dispatcher_output.get("agent") :
case "bio_engineering_agent":
result=create_and_execute_bcl_workflow(user_query,uid,user_id,project_id,target)
return result
case "bio_engineering_question_answer_agent":
context=None
empty_ooda=de.CheckEmptyOODA(uid)
if empty_ooda == True:
context ="!Information not available"
elif empty_ooda == False:
context=de.FetchOODA(uid)
answer=qa_wrapper(context,user_query)
return str(answer)
onboard_tools={
#"create_and_execute_bcl_workflow":create_and_execute_bcl_workflow
"routing_agent":RoutingAgent
}
sfl="""
{
"type": "function",
"function": {
"name": "create_and_execute_bcl_workflow",
"description": "Takes in a high level experimental goals and converts them to bcl workflow and executes that workflow ",
"parameters": {
"type": "object",
"properties": {
"bio_query": {
"type": "string",
"description": "high level bio query"
},
},
"required": ["bio_query"]
}
}
}
"""
tools=[
{
"type": "function",
"function": {
"name": "routing_agent",
"description": "Takes in exact user query and routes it to appropriate agent",
"parameters": {
"type": "object",
"properties": {
"user_query": {
"type": "string",
"description": "original user query without alteration"
},
},
"required": ["user_query"]
}
}
}
]
def PROMPT_FOR_APPLICATION_LAYER_AGENT_V2():
return f"""
ROLE:
You are a biological AI assistant whose sole purpose is to route user queries to the provided tool.
INPUT:
A user's biological intent or problem description, in natural language
GOAL:
Call the provided tool (the routing agent) exactly once with the user's original input (verbatim), and respond with exactly what the tool communicates, translated into natural language if the tool response is structured (e.g., JSON).
RULES:
βœ… Always call the tool only once with the exact original user input.
βœ… If the tool returns a structured response (e.g., a JSON like {{ "operation":"user prompt", "status":"active" }}), interpret and convert it accurately into plain natural language (e.g., "The operation 'user prompt' is currently active.").
βœ… If the tool returns plain text, return it exactly as is β€” do not paraphrase, explain, or alter.
🚫 Do not add any commentary, explanations, analysis, markdown formatting, or extra information β€” even if the tool's response seems unclear or minimal.
🚫 Do not hallucinate or fabricate responses or parts of responses under any condition.
🚫 Do not wrap responses in code blocks or formatting.
"""
def ApplicationLayerAgent(user_input,uid,user_id,project_id,target):
actual_preserved_message={"role":"system","content":PROMPT_FOR_APPLICATION_LAYER_AGENT_V2()}
status=de.CheckEmptyAppLayer(uid)
g_messages=[
actual_preserved_message
]
if status ==True:
de.Insert_AppLayer({
"bcl_id":uid,
"messages":[{
"role":"user",
"content":user_input
}]
})
g_messages.append({"role":"user","content":user_input})
elif status == False :
de.Update_AppLayer(uid,[{"role":"user","content":user_input}])
history=de.Fetch_AppLayer(uid)
history=history.get("messages")
for message in history:
g_messages.append(message)
if len(g_messages) > 8:
g_messages=g_messages[-4:]
g_messages.insert(0,actual_preserved_message)
response = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=g_messages,
stream=False,
max_completion_tokens=5000,
tools=tools,
tool_choice={'type': 'function', 'function': {'name': 'routing_agent'}}
)
response_message=response.choices[0].message.content
tool_calls = tool_calls = response.choices[0].message.tool_calls
if tool_calls:
#g_messages.append(response_message)
#de.Update_AppLayer(id,response_message)
for tool_call in tool_calls:
function_name = tool_call.function.name
function_to_call = onboard_tools[function_name]
function_args = json.loads(tool_call.function.arguments)
#Call the tool and get the response
function_response = function_to_call(
user_query=function_args.get("user_query"),
uid=uid,
user_id=user_id,
project_id=project_id,
target=target
)
g_messages.append(
{
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": function_response,
}
)
de.Update_AppLayer(uid,[
{
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": function_response,
}
])
second_response = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=g_messages
)
#Return the final response
de.Update_AppLayer(uid,[ {"role":"assistant","content":second_response.choices[0].message.content } ])
return second_response.choices[0].message.content
else:
return response_message