import inspect
import json
from typing import Any, Dict, List, Optional

from app.agents.base import Agent, Response
from app.services.llm import llm_service
from app.services.trace import trace_service


class SwarmClient:
    """Runs an agent loop: ask the LLM, execute requested tools, hand off agents."""

    def _function_to_schema(self, func) -> Dict:
        """Converts Python function to OpenAI-style schema.

        Args:
            func: The callable to describe. Its ``__name__`` becomes the tool
                name and its docstring the description.

        Returns:
            A dict with ``"type": "function"`` and a ``"function"`` entry
            holding the name, description and a JSON-schema-like
            ``"parameters"`` object. Every parameter is declared as a string;
            parameters without a default value are listed as required.
        """
        sig = inspect.signature(func)
        properties = {
            # Simplified: assume string for ORA
            name: {"type": "string"}
            for name in sig.parameters
        }
        required = [
            name
            for name, param in sig.parameters.items()
            # Identity check: ``empty`` is a sentinel, and ``==`` could invoke
            # a custom ``__eq__`` on an arbitrary default value.
            if param.default is inspect.Parameter.empty
        ]
        return {
            "type": "function",
            "function": {
                "name": func.__name__,
                "description": func.__doc__ or "No description provided.",
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    async def run(
        self,
        agent: Agent,
        messages: List[Dict],
        context_variables: Optional[Dict] = None,
        max_turns: int = 5,
    ) -> Response:
        """Run the agent loop until the LLM answers without tool calls.

        Each turn merges ``context_variables`` into the active agent, sends
        the content of the latest history message to ``llm_service``, records
        the reply as a reasoning step, and executes any requested tool calls.
        A tool that returns an ``Agent`` makes that agent the active one.

        Args:
            agent: The agent that handles the first turn.
            messages: Conversation so far; it is copied, not mutated.
            context_variables: Values merged into the active agent's
                ``context_variables`` each turn. Defaults to an empty dict.
            max_turns: Maximum number of LLM round-trips.

        Returns:
            A ``Response`` carrying the final active agent, the extended
            message history, the context variables and the reasoning trace.
            A trace is saved through ``trace_service`` both on success and
            when ``max_turns`` is exhausted.

        Raises:
            json.JSONDecodeError: If a tool call carries arguments that are
                not valid JSON.
        """
        # A fresh dict per call; a ``{}`` default would be shared across calls.
        if context_variables is None:
            context_variables = {}

        active_agent = agent
        history = list(messages)
        user_query = messages[-1]["content"] if messages else ""
        reasoning_steps = []

        for _ in range(max_turns):
            # 1. Update active agent's context
            active_agent.context_variables.update(context_variables)

            # 2. Prepare Tools
            tools = (
                [self._function_to_schema(f) for f in active_agent.functions]
                if active_agent.functions
                else None
            )

            # 3. Get LLM Choice
            instructions = active_agent.instructions
            if callable(instructions):
                # Instruction factories may or may not accept the context.
                sig = inspect.signature(instructions)
                if len(sig.parameters) > 0:
                    instructions = instructions(active_agent.context_variables)
                else:
                    instructions = instructions()

            print(f"Swarm [{active_agent.name}]: Processing...")
            llm_res = await llm_service.generate_response(
                # Guard against an empty conversation, as user_query does.
                message=history[-1]["content"] if history else "",
                system_prompt=instructions,
                tools=tools,
            )

            content = llm_res.get("content", "")
            tool_calls = llm_res.get("tool_calls")

            # Decode each call's arguments once; reused for trace and execution.
            parsed_calls = (
                [(tc, json.loads(tc.function.arguments)) for tc in tool_calls]
                if tool_calls
                else []
            )

            # Record thought step
            reasoning_steps.append({
                "agent": active_agent.name,
                "thought": content,
                "tool_calls": [
                    {"name": tc.function.name, "args": args}
                    for tc, args in parsed_calls
                ],
            })

            # 4. Add Assistant Message to History
            history.append({"role": "assistant", "content": content})

            if not tool_calls:
                # Capture and Save Trace
                trace_service.save_trace({
                    "user_query": user_query,
                    "steps": reasoning_steps,
                    "final_response": content,
                    "success": True,
                })
                return Response(
                    agent=active_agent,
                    messages=history,
                    context_variables=context_variables,
                    trace=reasoning_steps,
                )

            # 5. Handle Tool Calls
            # Snapshot the tools of the agent that issued these calls: a
            # handoff mid-loop must not change how the remaining calls of the
            # same LLM response are resolved. First match wins on duplicates.
            functions_by_name = {}
            for candidate in active_agent.functions or []:
                functions_by_name.setdefault(candidate.__name__, candidate)

            for tool_call, func_args in parsed_calls:
                func_name = tool_call.function.name
                func = functions_by_name.get(func_name)

                if not func:
                    history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": f"Error: Function {func_name} not found.",
                    })
                    continue

                print(f"Swarm: Executing {func_name}...")
                if inspect.iscoroutinefunction(func):
                    result = await func(**func_args)
                else:
                    result = func(**func_args)

                # Record tool result
                reasoning_steps[-1].setdefault("tool_results", []).append({
                    "name": func_name,
                    "result": str(result),
                })

                if isinstance(result, Agent):
                    # Handoff: the returned agent drives the next turn.
                    active_agent = result
                    tool_content = f"Transferring to {active_agent.name}."
                else:
                    tool_content = str(result)

                history.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_content,
                })

        # Save Trace even if max turns hit
        trace_service.save_trace({
            "user_query": user_query,
            "steps": reasoning_steps,
            # History is empty only when no messages were given and no turn ran.
            "final_response": history[-1]["content"] if history else "",
            "success": False,
            "error": "Max turns reached",
        })
        return Response(
            agent=active_agent,
            messages=history,
            context_variables=context_variables,
            trace=reasoning_steps,
        )


swarm_client = SwarmClient()