Spaces:
Sleeping
Sleeping
| import os | |
| from typing import List, Dict, Any, Optional, Callable, get_type_hints | |
| from dataclasses import dataclass, field | |
| import json | |
| from datetime import datetime | |
| import inspect | |
| import typing | |
| from typing import Union | |
| from swarms import Agent | |
| from swarm_models import OpenAIChat | |
| class ToolDefinition: | |
| name: str | |
| description: str | |
| parameters: Dict[str, Any] | |
| required_params: List[str] | |
| callable: Optional[Callable] = None | |
| def extract_type_hints(func: Callable) -> Dict[str, Any]: | |
| """Extract parameter types from function type hints.""" | |
| return typing.get_type_hints(func) | |
| def extract_tool_info(func: Callable) -> ToolDefinition: | |
| """Extract tool information from a callable function.""" | |
| # Get function name | |
| name = func.__name__ | |
| # Get docstring | |
| description = inspect.getdoc(func) or "No description available" | |
| # Get parameters and their types | |
| signature = inspect.signature(func) | |
| type_hints = extract_type_hints(func) | |
| parameters = {} | |
| required_params = [] | |
| for param_name, param in signature.parameters.items(): | |
| # Skip self parameter for methods | |
| if param_name == "self": | |
| continue | |
| param_type = type_hints.get(param_name, Any) | |
| # Handle optional parameters | |
| is_optional = ( | |
| param.default != inspect.Parameter.empty | |
| or getattr(param_type, "__origin__", None) is Union | |
| and type(None) in param_type.__args__ | |
| ) | |
| if not is_optional: | |
| required_params.append(param_name) | |
| parameters[param_name] = { | |
| "type": str(param_type), | |
| "default": ( | |
| None | |
| if param.default is inspect.Parameter.empty | |
| else param.default | |
| ), | |
| "required": not is_optional, | |
| } | |
| return ToolDefinition( | |
| name=name, | |
| description=description, | |
| parameters=parameters, | |
| required_params=required_params, | |
| callable=func, | |
| ) | |
| class FunctionSpec: | |
| """Specification for a callable tool function.""" | |
| name: str | |
| description: str | |
| parameters: Dict[ | |
| str, dict | |
| ] # Contains type and description for each parameter | |
| return_type: str | |
| return_description: str | |
| class ExecutionStep: | |
| """Represents a single step in the execution plan.""" | |
| step_id: int | |
| function_name: str | |
| parameters: Dict[str, Any] | |
| expected_output: str | |
| completed: bool = False | |
| result: Any = None | |
| class ExecutionContext: | |
| """Maintains state during execution.""" | |
| task: str | |
| steps: List[ExecutionStep] = field(default_factory=list) | |
| results: Dict[int, Any] = field(default_factory=dict) | |
| current_step: int = 0 | |
| history: List[Dict[str, Any]] = field(default_factory=list) | |
| hints = get_type_hints(func) | |
| class ToolAgent: | |
| def __init__( | |
| self, | |
| functions: List[Callable], | |
| openai_api_key: str, | |
| model_name: str = "gpt-4", | |
| temperature: float = 0.1, | |
| ): | |
| self.functions = {func.__name__: func for func in functions} | |
| self.function_specs = self._analyze_functions(functions) | |
| self.model = OpenAIChat( | |
| openai_api_key=openai_api_key, | |
| model_name=model_name, | |
| temperature=temperature, | |
| ) | |
| self.system_prompt = self._create_system_prompt() | |
| self.agent = Agent( | |
| agent_name="Tool-Agent", | |
| system_prompt=self.system_prompt, | |
| llm=self.model, | |
| max_loops=1, | |
| verbose=True, | |
| ) | |
| def _analyze_functions( | |
| self, functions: List[Callable] | |
| ) -> Dict[str, FunctionSpec]: | |
| """Analyze functions to create detailed specifications.""" | |
| specs = {} | |
| for func in functions: | |
| hints = get_type_hints(func) | |
| sig = inspect.signature(func) | |
| doc = inspect.getdoc(func) or "" | |
| # Parse docstring for parameter descriptions | |
| param_descriptions = {} | |
| current_param = None | |
| for line in doc.split("\n"): | |
| if ":param" in line: | |
| param_name = ( | |
| line.split(":param")[1].split(":")[0].strip() | |
| ) | |
| desc = line.split(":", 2)[-1].strip() | |
| param_descriptions[param_name] = desc | |
| elif ":return:" in line: | |
| return_desc = line.split(":return:")[1].strip() | |
| # Build parameter specifications | |
| parameters = {} | |
| for name, param in sig.parameters.items(): | |
| param_type = hints.get(name, Any) | |
| parameters[name] = { | |
| "type": str(param_type), | |
| "type_class": param_type, | |
| "description": param_descriptions.get(name, ""), | |
| "required": param.default == param.empty, | |
| } | |
| specs[func.__name__] = FunctionSpec( | |
| name=func.__name__, | |
| description=doc.split("\n")[0], | |
| parameters=parameters, | |
| return_type=str(hints.get("return", Any)), | |
| return_description=( | |
| return_desc if "return_desc" in locals() else "" | |
| ), | |
| ) | |
| return specs | |
| def _create_system_prompt(self) -> str: | |
| """Create system prompt with detailed function specifications.""" | |
| functions_desc = [] | |
| for spec in self.function_specs.values(): | |
| params_desc = [] | |
| for name, details in spec.parameters.items(): | |
| params_desc.append( | |
| f" - {name}: {details['type']} - {details['description']}" | |
| ) | |
| functions_desc.append( | |
| f""" | |
| Function: {spec.name} | |
| Description: {spec.description} | |
| Parameters: | |
| {chr(10).join(params_desc)} | |
| Returns: {spec.return_type} - {spec.return_description} | |
| """ | |
| ) | |
| return f"""You are an AI agent that creates and executes plans using available functions. | |
| Available Functions: | |
| {chr(10).join(functions_desc)} | |
| You must respond in two formats depending on the phase: | |
| 1. Planning Phase: | |
| {{ | |
| "phase": "planning", | |
| "plan": {{ | |
| "description": "Overall plan description", | |
| "steps": [ | |
| {{ | |
| "step_id": 1, | |
| "function": "function_name", | |
| "parameters": {{ | |
| "param1": "value1", | |
| "param2": "value2" | |
| }}, | |
| "purpose": "Why this step is needed" | |
| }} | |
| ] | |
| }} | |
| }} | |
| 2. Execution Phase: | |
| {{ | |
| "phase": "execution", | |
| "analysis": "Analysis of current result", | |
| "next_action": {{ | |
| "type": "continue|request_input|complete", | |
| "reason": "Why this action was chosen", | |
| "needed_input": {{}} # If requesting input | |
| }} | |
| }} | |
| Always: | |
| - Use exact function names | |
| - Ensure parameter types match specifications | |
| - Provide clear reasoning for each decision | |
| """ | |
| def _execute_function( | |
| self, spec: FunctionSpec, parameters: Dict[str, Any] | |
| ) -> Any: | |
| """Execute a function with type checking.""" | |
| converted_params = {} | |
| for name, value in parameters.items(): | |
| param_spec = spec.parameters[name] | |
| try: | |
| # Convert value to required type | |
| param_type = param_spec["type_class"] | |
| if param_type in (int, float, str, bool): | |
| converted_params[name] = param_type(value) | |
| else: | |
| converted_params[name] = value | |
| except (ValueError, TypeError) as e: | |
| raise ValueError( | |
| f"Parameter '{name}' conversion failed: {str(e)}" | |
| ) | |
| return self.functions[spec.name](**converted_params) | |
| def run(self, task: str) -> Dict[str, Any]: | |
| """Execute task with planning and step-by-step execution.""" | |
| context = ExecutionContext(task=task) | |
| execution_log = { | |
| "task": task, | |
| "start_time": datetime.utcnow().isoformat(), | |
| "steps": [], | |
| "final_result": None, | |
| } | |
| try: | |
| # Planning phase | |
| plan_prompt = f"Create a plan to: {task}" | |
| plan_response = self.agent.run(plan_prompt) | |
| plan_data = json.loads( | |
| plan_response.replace("System:", "").strip() | |
| ) | |
| # Convert plan to execution steps | |
| for step in plan_data["plan"]["steps"]: | |
| context.steps.append( | |
| ExecutionStep( | |
| step_id=step["step_id"], | |
| function_name=step["function"], | |
| parameters=step["parameters"], | |
| expected_output=step["purpose"], | |
| ) | |
| ) | |
| # Execution phase | |
| while context.current_step < len(context.steps): | |
| step = context.steps[context.current_step] | |
| print( | |
| f"\nExecuting step {step.step_id}: {step.function_name}" | |
| ) | |
| try: | |
| # Execute function | |
| spec = self.function_specs[step.function_name] | |
| result = self._execute_function( | |
| spec, step.parameters | |
| ) | |
| context.results[step.step_id] = result | |
| step.completed = True | |
| step.result = result | |
| # Get agent's analysis | |
| analysis_prompt = f""" | |
| Step {step.step_id} completed: | |
| Function: {step.function_name} | |
| Result: {json.dumps(result)} | |
| Remaining steps: {len(context.steps) - context.current_step - 1} | |
| Analyze the result and decide next action. | |
| """ | |
| analysis_response = self.agent.run( | |
| analysis_prompt | |
| ) | |
| analysis_data = json.loads( | |
| analysis_response.replace( | |
| "System:", "" | |
| ).strip() | |
| ) | |
| execution_log["steps"].append( | |
| { | |
| "step_id": step.step_id, | |
| "function": step.function_name, | |
| "parameters": step.parameters, | |
| "result": result, | |
| "analysis": analysis_data, | |
| } | |
| ) | |
| if ( | |
| analysis_data["next_action"]["type"] | |
| == "complete" | |
| ): | |
| if ( | |
| context.current_step | |
| < len(context.steps) - 1 | |
| ): | |
| continue | |
| break | |
| context.current_step += 1 | |
| except Exception as e: | |
| print(f"Error in step {step.step_id}: {str(e)}") | |
| execution_log["steps"].append( | |
| { | |
| "step_id": step.step_id, | |
| "function": step.function_name, | |
| "parameters": step.parameters, | |
| "error": str(e), | |
| } | |
| ) | |
| raise | |
| # Final analysis | |
| final_prompt = f""" | |
| Task completed. Results: | |
| {json.dumps(context.results, indent=2)} | |
| Provide final analysis and recommendations. | |
| """ | |
| final_analysis = self.agent.run(final_prompt) | |
| execution_log["final_result"] = { | |
| "success": True, | |
| "results": context.results, | |
| "analysis": json.loads( | |
| final_analysis.replace("System:", "").strip() | |
| ), | |
| } | |
| except Exception as e: | |
| execution_log["final_result"] = { | |
| "success": False, | |
| "error": str(e), | |
| } | |
| execution_log["end_time"] = datetime.utcnow().isoformat() | |
| return execution_log | |
| def calculate_investment_return( | |
| principal: float, rate: float, years: int | |
| ) -> float: | |
| """Calculate investment return with compound interest. | |
| :param principal: Initial investment amount in dollars | |
| :param rate: Annual interest rate as decimal (e.g., 0.07 for 7%) | |
| :param years: Number of years to invest | |
| :return: Final investment value | |
| """ | |
| return principal * (1 + rate) ** years | |
| agent = ToolAgent( | |
| functions=[calculate_investment_return], | |
| openai_api_key=os.getenv("OPENAI_API_KEY"), | |
| ) | |
| result = agent.run( | |
| "Calculate returns for $10000 invested at 7% for 10 years" | |
| ) | |