Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| @Time : 2023/5/11 22:12 | |
| @Author : alexanderwu | |
| @File : environment.py | |
| @Modified From: https://github.com/geekan/MetaGPT/blob/main/metagpt/environment.py | |
| """ | |
| import asyncio | |
| import re | |
| import json | |
| import datetime | |
| import websockets | |
| from common import MessageType, format_message, timestamp | |
| from typing import Iterable | |
| from pydantic import BaseModel, Field | |
| from .roles import Role | |
| from .actions import Requirement | |
| from .roles import CustomRole, ActionObserver, Group, ROLES_LIST, ROLES_MAPPING | |
| from .system.memory import Memory | |
| from .system.schema import Message | |
| class Environment(BaseModel): | |
| """环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到""" | |
| roles: dict[str, Role] = Field(default_factory=dict) | |
| memory: Memory = Field(default_factory=Memory) | |
| history: str = Field(default='') | |
| new_roles_args: dict = Field(default_factory=dict) | |
| new_roles: dict[str, Role] = Field(default_factory=dict) | |
| steps: list = Field(default_factory=list) | |
| msg_json: list = Field(default_factory=list) | |
| json_log: str = Field(default='./logs/json_log.json') | |
| task_id: str = Field(default='') | |
| proxy: str = Field(default='') | |
| llm_api_key: str = Field(default='') | |
| serpapi_key: str = Field(default='') | |
| alg_msg_queue: object = Field(default=None) | |
| class Config: | |
| arbitrary_types_allowed = True | |
| def add_role(self, role: Role): | |
| """增加一个在当前环境的Role""" | |
| role.set_env(self) | |
| self.roles[role.profile] = role | |
| def add_roles(self, roles: Iterable[Role]): | |
| """增加一批在当前环境的Role""" | |
| for role in roles: | |
| self.add_role(role) | |
| def _parser_roles(self, text): | |
| """解析添加的Roles""" | |
| agents = re.findall('{[\s\S]*?}', text) # re.findall('{{.*}}', agents) | |
| agents_args = [] | |
| for agent in agents: | |
| agent = json.loads(agent.strip()) | |
| if len(agent.keys()) > 0: | |
| agents_args.append(agent) | |
| print('---------------Agents---------------') | |
| for i, agent in enumerate(agents_args): | |
| print('Role', i, agent) | |
| return agents_args | |
| def _parser_plan(self, context): | |
| """解析生成的计划Plan""" | |
| plan_context = re.findall('## Execution Plan([\s\S]*?)##', str(context))[0] | |
| steps = [v.split("\n")[0] for v in re.split("\n\d+\. ", plan_context)[1:]] | |
| print('---------------Steps---------------') | |
| for i, step in enumerate(steps): | |
| print('Step', i, step) | |
| steps.insert(0, '') | |
| return steps | |
| def create_roles(self, plan: list, args: dict): | |
| """创建Role""" | |
| requirement_type = type('Requirement_Group', (Requirement,), {}) | |
| self.add_role(Group(roles=args, steps=plan, watch_actions=[Requirement,requirement_type], proxy=self.proxy, serpapi_api_key=self.serpapi_key, llm_api_key=self.llm_api_key)) | |
| # existing_roles = dict() | |
| # for item in ROLES_LIST: | |
| # existing_roles[item['name']] = item | |
| # init_actions, watch_actions = [], [] | |
| # for role in args: | |
| # class_name = role['name'].replace(' ', '_') + '_Requirement' | |
| # requirement_type = type(class_name, (Requirement,), {}) | |
| # if role['name'] in existing_roles.keys(): | |
| # print('Add a predefiend role:', role['name']) | |
| # role_object = ROLES_MAPPING[role['name']] | |
| # if 'Engineer' in role['name']: | |
| # _role = role_object(n_borg=2, use_code_review=True, proxy=self.proxy, llm_api_key=self.llm_api_key, serpapi_api_key=self.serpapi_key) | |
| # else: | |
| # _role = role_object(watch_actions=[requirement_type], proxy=self.proxy, llm_api_key=self.llm_api_key, serpapi_api_key=self.serpapi_key) | |
| # else: | |
| # print('Add a new role:', role['name']) | |
| # _role = CustomRole( | |
| # name=role['name'], | |
| # profile=role['name'], | |
| # goal=role['description'], | |
| # role_prompt=role['prompt'], | |
| # steps=role['steps'], | |
| # tool=role['tools'], | |
| # watch_actions=[requirement_type], | |
| # proxy=self.proxy, | |
| # llm_api_key=self.llm_api_key, | |
| # serpapi_api_key=self.serpapi_key, | |
| # ) | |
| # self.add_role(_role) | |
| # watch_actions.append(requirement_type) | |
| # init_actions.append(_role.init_actions) | |
| # init_actions.append(Requirement) | |
| # self.add_role(ActionObserver(steps=plan, watch_actions=init_actions, init_actions=watch_actions, proxy=self.proxy, llm_api_key=self.llm_api_key)) | |
| async def publish_message(self, message: Message): | |
| """向当前环境发布信息""" | |
| # self.message_queue.put(message) | |
| self.memory.add(message) | |
| self.history += f"\n{message}" | |
| if 'Manager' in message.role: | |
| self.steps = self._parser_plan(message.content) | |
| self.new_roles_args = self._parser_roles(message.content) | |
| self.new_roles = self.create_roles(self.steps, self.new_roles_args) | |
| filename, file_content = None, None | |
| if hasattr(message.instruct_content, 'Type') and 'FILE' in message.instruct_content.Type: | |
| filename = message.instruct_content.Key | |
| file_type = re.findall('```(.*?)\n', str(message.content))[0] | |
| file_content = re.findall(f'```{file_type}([\s\S]*?)```', str(message.content))[0] | |
| if message.role and 'ActionObserver' != message.role: | |
| if hasattr(message.instruct_content, 'Response'): | |
| content = message.instruct_content.Response | |
| else: | |
| content = message.content | |
| msg = { | |
| 'timestamp': timestamp(), | |
| 'role': message.role, | |
| 'content': content, | |
| 'file': { | |
| 'file_type': filename, | |
| 'file_data': file_content, | |
| } | |
| } | |
| if self.alg_msg_queue: | |
| self.alg_msg_queue.put_nowait(format_message(action=MessageType.RunTask.value, data={'task_id': self.task_id, 'task_message':msg})) | |
| if 'Agents Observer' in message.role: | |
| # send role list | |
| msg = { | |
| 'timestamp': timestamp(), | |
| 'role': "Revised Role List", | |
| 'content': self.new_roles_args, | |
| 'file': { | |
| 'file_type': None, | |
| 'file_data': None, | |
| } | |
| } | |
| if self.alg_msg_queue: | |
| self.alg_msg_queue.put_nowait(format_message(action=MessageType.RunTask.value, data={'task_id': self.task_id, 'task_message':msg})) | |
| async def run(self, k=1): | |
| """处理一次所有Role的运行""" | |
| old_roles = [] | |
| for _ in range(k): | |
| futures = [] | |
| for key in self.roles.keys(): | |
| old_roles.append(key) | |
| role = self.roles[key] | |
| future = role.run() | |
| futures.append(future) | |
| await asyncio.gather(*futures) | |
| if len(old_roles) < len(self.roles): | |
| while len(self.get_role(name='Group').steps) > 0: | |
| futures = [] | |
| for key in self.roles.keys(): | |
| if key not in old_roles: | |
| role = self.roles[key] | |
| future = role.run() | |
| futures.append(future) | |
| await asyncio.gather(*futures) | |
| def get_roles(self) -> dict[str, Role]: | |
| """获得环境内的所有Role""" | |
| return self.roles | |
| def get_role(self, name: str) -> Role: | |
| """获得环境内的指定Role""" | |
| return self.roles.get(name, None) | |