Spaces:
Sleeping
Sleeping
import inspect
import json
from typing import Any, Dict, List, Optional

from app.agents.base import Agent, Response
from app.services.llm import llm_service
from app.services.trace import trace_service
class SwarmClient:
    """Runs a multi-turn agent loop: query the LLM, execute requested tools, hand off.

    Each turn asks ``llm_service`` for a response on behalf of the active
    agent, executes any tool calls it returns, and switches the active agent
    when a tool returns another ``Agent``. Every run is recorded through
    ``trace_service.save_trace``.
    """

    def _function_to_schema(self, func) -> Dict:
        """Convert a Python callable into an OpenAI-style function-tool schema.

        Every named parameter is declared as a string (a deliberate
        simplification). ``*args`` / ``**kwargs`` collectors are left out
        because they are not individually addressable arguments and must
        never be advertised as required properties.

        Args:
            func: Callable exposing ``__name__`` and an inspectable signature.

        Returns:
            A dict with ``type`` and ``function`` keys describing the tool.
        """
        variadic_kinds = (
            inspect.Parameter.VAR_POSITIONAL,
            inspect.Parameter.VAR_KEYWORD,
        )
        named_params = [
            param
            for param in inspect.signature(func).parameters.values()
            if param.kind not in variadic_kinds
        ]
        return {
            "type": "function",
            "function": {
                "name": func.__name__,
                "description": func.__doc__ or "No description provided.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        param.name: {"type": "string"} for param in named_params
                    },
                    # Identity check: `empty` is a sentinel, and `==` could be
                    # hijacked by a default value with a custom __eq__.
                    "required": [
                        param.name
                        for param in named_params
                        if param.default is inspect.Parameter.empty
                    ],
                },
            },
        }

    @staticmethod
    def _parse_tool_arguments(raw) -> Dict:
        """Decode the JSON argument payload of a tool call.

        Args:
            raw: The serialized arguments; may be empty or ``None``.

        Returns:
            The decoded keyword arguments; ``{}`` when ``raw`` is empty.

        Raises:
            ValueError: If ``raw`` is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not decode to a JSON object.
        """
        if not raw:
            return {}
        decoded = json.loads(raw)
        if not isinstance(decoded, dict):
            raise ValueError("tool arguments must decode to a JSON object")
        return decoded

    async def run(
        self,
        agent: "Agent",
        messages: List[Dict],
        context_variables: Optional[Dict] = None,
        max_turns: int = 5,
    ) -> "Response":
        """Drive the agent loop until the LLM answers without tool calls.

        Args:
            agent: The agent that handles the first turn.
            messages: Conversation so far; the last entry's ``content`` is
                treated as the user query. The list is copied, not mutated.
            context_variables: Values merged into the active agent's
                ``context_variables`` on every turn. ``None`` means empty.
            max_turns: Maximum number of LLM round-trips.

        Returns:
            A ``Response`` carrying the final active agent, the full message
            history, the context variables and the recorded reasoning steps.
            If ``max_turns`` is exhausted the trace is saved with
            ``success=False`` and the partial result is still returned.
        """
        # A fresh dict per call: a shared `{}` default would leak state
        # between runs because it is handed back inside the Response.
        if context_variables is None:
            context_variables = {}

        active_agent = agent
        history = list(messages)
        user_query = messages[-1]["content"] if messages else ""
        reasoning_steps = []

        for _ in range(max_turns):
            # 1. Update active agent's context
            active_agent.context_variables.update(context_variables)

            # 2. Prepare tools. Snapshot the functions of the agent that is
            #    about to speak; its tool calls are resolved against this list
            #    even if one of them hands off to another agent mid-turn.
            functions = list(active_agent.functions or [])
            tools = [self._function_to_schema(f) for f in functions] or None

            # 3. Resolve instructions (static value or callable) and query the LLM
            instructions = active_agent.instructions
            if callable(instructions):
                if inspect.signature(instructions).parameters:
                    instructions = instructions(active_agent.context_variables)
                else:
                    instructions = instructions()

            print(f"Swarm [{active_agent.name}]: Processing...")
            llm_res = await llm_service.generate_response(
                message=history[-1]["content"] if history else "",
                system_prompt=instructions,
                tools=tools,
            )
            # `content` may be present but None when only tool calls came back.
            content = llm_res.get("content") or ""
            tool_calls = llm_res.get("tool_calls") or []

            # Decode each call's arguments exactly once; remember failures so
            # they can be reported back as tool errors instead of crashing.
            decoded_calls = []
            for tool_call in tool_calls:
                try:
                    call_args = self._parse_tool_arguments(tool_call.function.arguments)
                    parse_error = None
                except ValueError as exc:
                    call_args = {}
                    parse_error = str(exc)
                decoded_calls.append((tool_call, call_args, parse_error))

            # Record thought step
            step = {
                "agent": active_agent.name,
                "thought": content,
                "tool_calls": [
                    {"name": tool_call.function.name, "args": call_args}
                    for tool_call, call_args, _ in decoded_calls
                ],
            }
            reasoning_steps.append(step)

            # 4. Add assistant message to history
            history.append({"role": "assistant", "content": content})

            if not tool_calls:
                # Capture and save trace
                trace_service.save_trace({
                    "user_query": user_query,
                    "steps": reasoning_steps,
                    "final_response": content,
                    "success": True,
                })
                return Response(
                    agent=active_agent,
                    messages=history,
                    context_variables=context_variables,
                    trace=reasoning_steps,
                )

            # 5. Handle tool calls. Reversed so the first function with a
            #    given name wins, matching a first-match linear search.
            available = {f.__name__: f for f in reversed(functions)}
            for tool_call, func_args, parse_error in decoded_calls:
                func_name = tool_call.function.name

                if parse_error is not None:
                    history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": f"Error: Invalid arguments for {func_name}: {parse_error}",
                    })
                    continue

                func = available.get(func_name)
                if func is None:
                    history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": f"Error: Function {func_name} not found.",
                    })
                    continue

                print(f"Swarm: Executing {func_name}...")
                result = func(**func_args)
                # Await whatever is awaitable, so async partials and callable
                # objects work as well as plain coroutine functions.
                if inspect.isawaitable(result):
                    result = await result

                # Record tool result
                step.setdefault("tool_results", []).append({
                    "name": func_name,
                    "result": str(result),
                })

                if isinstance(result, Agent):
                    # Handoff: the returned agent takes over from the next turn.
                    active_agent = result
                    tool_output = f"Transferring to {active_agent.name}."
                else:
                    tool_output = str(result)

                history.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_output,
                })

        # Save trace even if max turns hit
        trace_service.save_trace({
            "user_query": user_query,
            "steps": reasoning_steps,
            "final_response": history[-1]["content"] if history else "",
            "success": False,
            "error": "Max turns reached",
        })
        return Response(
            agent=active_agent,
            messages=history,
            context_variables=context_variables,
            trace=reasoning_steps,
        )


# Shared module-level client instance.
swarm_client = SwarmClient()