Spaces:
Build error
Build error
| import asyncio | |
| from enum import Enum | |
| from typing import Any, Dict, List, Tuple, Union | |
| from colorama import Fore | |
| from agentverse.environments import BaseEnvironment | |
| from agentverse.agents.base import BaseAgent | |
| from agentverse.logging import logger | |
| from agentverse.message import Message, SolverMessage, ExecutorMessage | |
| from .. import env_registry as EnvironmentRegistry | |
| from agentverse.environments.tasksolving_env.rules import TasksolvingRule | |
| class BasicEnvironment(BaseEnvironment): | |
| rule: TasksolvingRule | |
| agents: Dict[Enum, Union[BaseAgent, List[BaseAgent]]] = None | |
| task_description: str | |
| cnt_turn: int = 0 | |
| max_turn: int = 10 | |
| success: bool = False | |
| def __init__(self, **kwargs): | |
| rule_config = kwargs.pop("rule", {}) | |
| role_assigner_config = rule_config.pop( | |
| "role_assigner", {"type": "role_description"} | |
| ) | |
| decision_maker_config = rule_config.pop("decision_maker", {"type": "vertical"}) | |
| executor_config = rule_config.pop("executor", {"type": "none"}) | |
| evaluator_config = rule_config.pop("evaluator", {"type": "basic"}) | |
| rule = TasksolvingRule( | |
| role_assigner_config=role_assigner_config, | |
| decision_maker_config=decision_maker_config, | |
| executor_config=executor_config, | |
| evaluator_config=evaluator_config, | |
| ) | |
| super().__init__(rule=rule, **kwargs) | |
| async def step( | |
| self, advice: str = "No advice yet.", previous_plan: str = "No solution yet." | |
| ) -> List[Message]: | |
| result = "" | |
| logs = [] | |
| logger.info(f"Loop Round {self.cnt_turn}") | |
| # ================== EXPERT RECRUITMENT ================== | |
| agents = self.rule.role_assign( | |
| self.task_description, self.agents, self.cnt_turn, advice | |
| ) | |
| description = "\n".join([agent.role_description for agent in agents]) | |
| logs.append({"module": "Role Assigner", "content": description}) | |
| logger.info("", f"Role Assignment:\n{description}", Fore.CYAN) | |
| # ================== EXPERT RECRUITMENT ================== | |
| # ================== DECISION MAKING ================== | |
| plan: List[SolverMessage] = await self.rule.decision_making( | |
| self.task_description, self.agents, previous_plan, advice | |
| ) | |
| flatten_plan = "\n".join([p.content for p in plan]) | |
| logs.append({"module": "Decision Maker", "content": flatten_plan}) | |
| logger.info("", f"Decision Plan:\n{flatten_plan}", Fore.YELLOW) | |
| # ================== DECISION MAKING ================== | |
| # ================== EXECUTION ================== | |
| result: List[ExecutorMessage] = await self.rule.execute( | |
| self.task_description, self.agents, plan | |
| ) | |
| flatten_result = "\n".join([r.content for r in result]) | |
| logs.append({"module": "Executor", "content": flatten_result}) | |
| logger.info("", f"Execution Result:", Fore.GREEN) | |
| logger.info("", flatten_result, Fore.GREEN) | |
| # ================== EXECUTION ================== | |
| # ================== EVALUATION ================== | |
| score, advice = self.rule.evaluate( | |
| self.task_description, self.agents, plan, result | |
| ) | |
| logs.append( | |
| { | |
| "agent": "evaluator", | |
| "content": f"Evaluation result: Score: {score}\nAdvice: {advice}", | |
| } | |
| ) | |
| logger.info( | |
| "", f"Evaluation result:\nScore: {score}\nAdvice: {advice}", Fore.YELLOW | |
| ) | |
| if score is not None and ( | |
| (isinstance(score, bool) and score is True) | |
| or (isinstance(score, (list, tuple)) and all([s >= 8 for s in score])) | |
| ): | |
| # TODO: 8 is an arbitrary threshold | |
| logs.append({"agent": "system", "content": "Good score! Accept!"}) | |
| logger.info( | |
| "", f"Good score! Accept! Final Result:\n{flatten_plan}", Fore.GREEN | |
| ) | |
| self.success = True | |
| else: | |
| logs.append({"agent": "system", "content": "Bad score! Reject!"}) | |
| logger.info("", "Bad score! Reject!", Fore.RED) | |
| self.cnt_turn += 1 | |
| return flatten_result, advice, flatten_plan, logs, self.success | |
| def iter_agents(self): | |
| for role, agent_or_agents in self.agents.items(): | |
| if isinstance(agent_or_agents, list): | |
| for agent in agent_or_agents: | |
| yield role, agent | |
| else: | |
| yield role, agent_or_agents | |
| def get_spend(self): | |
| total_spent = sum([agent.get_spend() for (_, agent) in self.iter_agents()]) | |
| return total_spent | |
| def report_metrics(self) -> None: | |
| logger.info("", "Agent spend:", Fore.GREEN) | |
| for role, agent in self.iter_agents(): | |
| name = agent.name.split(":")[0] | |
| logger.info( | |
| "", | |
| f"Agent (Role: {role}) {name}: {agent.get_spend_formatted()}", | |
| Fore.GREEN, | |
| ) | |
| logger.info("", f"Total spent: ${self.get_spend():.6f}", Fore.GREEN) | |
| def is_done(self): | |
| """Check if the environment is done""" | |
| return self.cnt_turn >= self.max_turn or self.success | |
| def set_task_description(self, task_description: str = ""): | |
| self.task_description = task_description | |
| def reset(self) -> None: | |
| """Reset the environment""" | |
| self.cnt_turn = 0 | |
| self.rule.reset() | |