# ORA — app/core/swarm_client.py
# Initial ORA deployment (commit 5e0532d), author: Abdalkaderdev.
import inspect
import json
from typing import Any, Dict, List, Optional

from app.agents.base import Agent, Response
from app.services.llm import llm_service
from app.services.trace import trace_service
class SwarmClient:
    """Minimal Swarm-style multi-agent orchestration loop.

    Repeatedly asks the active agent's LLM for a response, executes any
    requested tool calls, and follows hand-offs (a tool returning an
    ``Agent`` switches the active agent) until the LLM replies without
    tool calls or ``max_turns`` is exhausted. Every run is recorded via
    ``trace_service`` for later inspection.
    """

    def _function_to_schema(self, func) -> Dict:
        """Convert a Python function into an OpenAI-style tool schema.

        Parameter types are deliberately simplified: every parameter is
        declared as a string (sufficient for ORA's tools). Parameters
        without a default value are marked as required.
        """
        sig = inspect.signature(func)
        return {
            "type": "function",
            "function": {
                "name": func.__name__,
                "description": func.__doc__ or "No description provided.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        # Simplified: assume string for ORA.
                        name: {"type": "string"} for name in sig.parameters
                    },
                    "required": [
                        name
                        for name, p in sig.parameters.items()
                        # `Parameter.empty` is a sentinel singleton, so an
                        # identity check is the correct comparison.
                        if p.default is inspect.Parameter.empty
                    ],
                },
            },
        }

    def _resolve_instructions(self, agent: Agent):
        """Return the agent's system prompt, invoking it when callable.

        A callable ``instructions`` may optionally accept the agent's
        context variables as its single positional argument.
        """
        instructions = agent.instructions
        if callable(instructions):
            sig = inspect.signature(instructions)
            if len(sig.parameters) > 0:
                instructions = instructions(agent.context_variables)
            else:
                instructions = instructions()
        return instructions

    async def run(
        self,
        agent: Agent,
        messages: List[Dict],
        context_variables: Optional[Dict] = None,
        max_turns: int = 5,
    ) -> Response:
        """Run the agent loop until a final answer or ``max_turns``.

        Args:
            agent: Starting agent.
            messages: Chat history; the last entry is treated as the
                user query for tracing purposes.
            context_variables: Shared state merged into each active
                agent's context. Defaults to a fresh empty dict.
            max_turns: Upper bound on LLM round-trips before giving up.

        Returns:
            Response carrying the final active agent, the full message
            history, the context variables, and the reasoning trace.
        """
        # Fix: the original used a mutable default argument ({}), which
        # is shared across calls and silently accumulates state.
        if context_variables is None:
            context_variables = {}
        active_agent = agent
        history = list(messages)
        user_query = messages[-1]["content"] if messages else ""
        reasoning_steps: List[Dict[str, Any]] = []

        for _ in range(max_turns):
            # 1. Merge shared context into the active agent.
            active_agent.context_variables.update(context_variables)

            # 2. Expose the agent's functions as tool schemas.
            tools = (
                [self._function_to_schema(f) for f in active_agent.functions]
                if active_agent.functions
                else None
            )

            # 3. Ask the LLM for the next step. NOTE(review): only the
            # last history entry is forwarded — presumably llm_service
            # manages context itself; confirm against its implementation.
            instructions = self._resolve_instructions(active_agent)
            print(f"Swarm [{active_agent.name}]: Processing...")
            llm_res = await llm_service.generate_response(
                message=history[-1]["content"],
                system_prompt=instructions,
                tools=tools,
            )
            content = llm_res.get("content", "")
            tool_calls = llm_res.get("tool_calls")

            # Record the thought step for the trace.
            reasoning_steps.append({
                "agent": active_agent.name,
                "thought": content,
                "tool_calls": [
                    {"name": tc.function.name, "args": json.loads(tc.function.arguments)}
                    for tc in tool_calls
                ] if tool_calls else [],
            })

            # 4. Append the assistant turn to history.
            history.append({"role": "assistant", "content": content})

            if not tool_calls:
                # Final answer: persist the successful trace and return.
                trace_service.save_trace({
                    "user_query": user_query,
                    "steps": reasoning_steps,
                    "final_response": content,
                    "success": True,
                })
                return Response(
                    agent=active_agent,
                    messages=history,
                    context_variables=context_variables,
                    trace=reasoning_steps,
                )

            # 5. Execute every requested tool call.
            for tool_call in tool_calls:
                func_name = tool_call.function.name
                func_args = json.loads(tool_call.function.arguments)
                func = next(
                    (f for f in active_agent.functions if f.__name__ == func_name),
                    None,
                )
                if not func:
                    history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": f"Error: Function {func_name} not found.",
                    })
                    continue

                print(f"Swarm: Executing {func_name}...")
                # Robustness fix: a raising tool previously aborted the
                # whole run (losing the trace); surface the error to the
                # LLM as the tool result instead.
                try:
                    if inspect.iscoroutinefunction(func):
                        result = await func(**func_args)
                    else:
                        result = func(**func_args)
                except Exception as exc:  # boundary around untrusted tool code
                    result = f"Error executing {func_name}: {exc}"

                # Record the tool result on the current trace step.
                reasoning_steps[-1].setdefault("tool_results", []).append({
                    "name": func_name,
                    "result": str(result),
                })

                if isinstance(result, Agent):
                    # Hand-off: a tool returned another Agent.
                    active_agent = result
                    history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": f"Transferring to {active_agent.name}.",
                    })
                else:
                    history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(result),
                    })

        # Max turns exhausted: save a failure trace so the run is debuggable.
        trace_service.save_trace({
            "user_query": user_query,
            "steps": reasoning_steps,
            "final_response": history[-1]["content"],
            "success": False,
            "error": "Max turns reached",
        })
        return Response(
            agent=active_agent,
            messages=history,
            context_variables=context_variables,
            trace=reasoning_steps,
        )
# Module-level singleton shared by the rest of the application.
swarm_client = SwarmClient()