Spaces:
Runtime error
Runtime error
| import asyncio | |
| import datetime | |
| import json | |
| import os | |
| from functools import partial | |
| from pathlib import Path | |
| from typing import Any, Coroutine, Optional | |
| import aiofiles | |
| from aiobotocore.session import get_session | |
| from mdutils.mdutils import MdUtils | |
| from metagpt.actions import Action | |
| from metagpt.actions.action_output import ActionOutput | |
| from metagpt.actions.design_api import WriteDesign | |
| from metagpt.actions.prepare_documents import PrepareDocuments | |
| from metagpt.actions.project_management import WriteTasks | |
| from metagpt.actions.summarize_code import SummarizeCode | |
| from metagpt.actions.write_code import WriteCode | |
| from metagpt.actions.write_prd import WritePRD | |
| from metagpt.config import CONFIG | |
| from metagpt.const import ( | |
| COMPETITIVE_ANALYSIS_FILE_REPO, | |
| DATA_API_DESIGN_FILE_REPO, | |
| SEQ_FLOW_FILE_REPO, | |
| SERDESER_PATH, | |
| ) | |
| from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager, Role | |
| from metagpt.schema import Message | |
| from metagpt.team import Team | |
| from metagpt.utils.common import any_to_str, read_json_file, write_json_file | |
| from metagpt.utils.git_repository import GitRepository | |
| from pydantic import BaseModel, Field | |
| from zipstream import AioZipStream | |
| _default_llm_stream_log = partial(print, end="") | |
| class RoleRun(Action): | |
| role: Role | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| action = self.role._rc.todo | |
| self.desc = f"{self.role.profile} {action.desc or str(action)}" | |
| class PackProject(Action): | |
| role: Role | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self.desc = "Pack the project with prd, design, code and more." | |
| async def run(self, key: str): | |
| url = await self.upload(key) | |
| mdfile = MdUtils(None) | |
| mdfile.new_line(mdfile.new_inline_link(url, url.rsplit("/", 1)[-1])) | |
| return ActionOutput(mdfile.get_md_text(), BaseModel()) | |
| async def upload(self, key: str): | |
| files = [] | |
| workspace = CONFIG.git_repo.workdir | |
| workspace = str(workspace) | |
| for r, _, fs in os.walk(workspace): | |
| _r = r[len(workspace) :].lstrip("/") | |
| for f in fs: | |
| files.append({"file": os.path.join(r, f), "name": os.path.join(_r, f)}) | |
| # aiozipstream | |
| chunks = [] | |
| async for chunk in AioZipStream(files, chunksize=32768).stream(): | |
| chunks.append(chunk) | |
| return await get_download_url(b"".join(chunks), key) | |
| class SoftwareCompany(Role): | |
| """封装软件公司成角色,以快速接入agent store。""" | |
| finish: bool = False | |
| company: Team = Field(default_factory=Team) | |
| active_role: Optional[Role] = None | |
| git_repo: Optional[GitRepository] = None | |
| max_auto_summarize_code: int = 0 | |
| def __init__(self, use_code_review=False, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| engineer = Engineer(n_borg=5, use_code_review=use_code_review) | |
| self.company.hire([ProductManager(), Architect(), ProjectManager(), engineer]) | |
| self._init_actions([PackProject(role=engineer)]) | |
| def recv(self, message: Message) -> None: | |
| self.company.run_project(message.content) | |
| async def _think(self) -> Coroutine[Any, Any, bool]: | |
| """软件公司运行需要4轮 | |
| BOSS -> ProductManager -> Architect -> ProjectManager -> Engineer | |
| BossRequirement -> WritePRD -> WriteDesign -> WriteTasks -> WriteCode -> | |
| """ | |
| if self.finish: | |
| self._rc.todo = None | |
| return False | |
| if self.git_repo is not None: | |
| CONFIG.git_repo = self.git_repo | |
| environment = self.company.env | |
| for role in environment.roles.values(): | |
| if await role._observe(): | |
| await role._think() | |
| if isinstance(role._rc.todo, PrepareDocuments): | |
| self.active_role = role | |
| await self.act() | |
| self.git_repo = CONFIG.git_repo | |
| return await self._think() | |
| if isinstance(role._rc.todo, SummarizeCode): | |
| return await self._think() | |
| self._rc.todo = RoleRun(role=role) | |
| self.active_role = role | |
| return True | |
| self._set_state(0) | |
| return True | |
| async def _act(self) -> Message: | |
| if self.git_repo is not None: | |
| CONFIG.git_repo = self.git_repo | |
| CONFIG.src_workspace = CONFIG.git_repo.workdir / CONFIG.git_repo.workdir.name | |
| CONFIG.max_auto_summarize_code = self.max_auto_summarize_code | |
| if isinstance(self._rc.todo, PackProject): | |
| workdir = CONFIG.git_repo.workdir | |
| name = workdir.name | |
| uid = workdir.parent.name | |
| now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") | |
| key = f"{uid}/metagpt-{name}-{now}.zip" | |
| output = await self._rc.todo.run(key) | |
| self.finish = True | |
| return Message(output.content, role=self.profile, cause_by=type(self._rc.todo)) | |
| default_log_stream = CONFIG.get("LLM_STREAM_LOG", _default_llm_stream_log) | |
| start = False | |
| insert_code = False | |
| def log_stream(msg): | |
| nonlocal start, insert_code | |
| if not start: | |
| if msg.startswith("["): | |
| msg = "```json\n" + msg | |
| insert_code = True | |
| start = True | |
| return default_log_stream(msg) | |
| CONFIG.LLM_STREAM_LOG = log_stream | |
| output = await self.active_role._act() | |
| self.active_role._set_state(state=-1) | |
| self.active_role.publish_message(output) | |
| if insert_code: | |
| default_log_stream("\n```\n") | |
| cause_by = output.cause_by | |
| if cause_by == any_to_str(WritePRD): | |
| output = await self.format_prd(output) | |
| elif cause_by == any_to_str(WriteDesign): | |
| output = await self.format_system_design(output) | |
| elif cause_by == any_to_str(WriteTasks): | |
| output = await self.format_tasks(output) | |
| elif cause_by == any_to_str(WriteCode): | |
| output = await self.format_code(output) | |
| elif cause_by == any_to_str(SummarizeCode): | |
| output = await self.format_code_summary(output) | |
| return output | |
| async def format_prd(self, msg: Message): | |
| docs = [(k, v) for k, v in msg.instruct_content.docs.items()] | |
| prd_doc = docs[0][1] | |
| data = json.loads(prd_doc.content) | |
| mdfile = MdUtils(None) | |
| title = "Original Requirements" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_paragraph(data[title]) | |
| title = "Product Goals" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_list(data[title], marked_with="1") | |
| title = "User Stories" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_list(data[title], marked_with="1") | |
| title = "Competitive Analysis" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| if all(i.count(":") == 1 for i in data[title]): | |
| mdfile.new_table( | |
| 2, len(data[title]) + 1, ["Competitor", "Description", *(i for j in data[title] for i in j.split(":"))] | |
| ) | |
| else: | |
| mdfile.new_list(data[title], marked_with="1") | |
| title = "Competitive Quadrant Chart" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| competitive_analysis_path = ( | |
| CONFIG.git_repo.workdir / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix(".png") | |
| ) | |
| if competitive_analysis_path.exists(): | |
| key = str(competitive_analysis_path.relative_to(CONFIG.git_repo.workdir.parent.parent)) | |
| url = await upload_file_to_s3(competitive_analysis_path, key) | |
| mdfile.new_line(mdfile.new_inline_image(title, url)) | |
| else: | |
| mdfile.insert_code(data[title], "mermaid") | |
| title = "Requirement Analysis" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_paragraph(data[title]) | |
| title = "Requirement Pool" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_table( | |
| 2, len(data[title]) + 1, ["Task Description", "Priority", *(i for j in data[title] for i in j)] | |
| ) | |
| title = "UI Design draft" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_paragraph(data[title]) | |
| title = "Anything UNCLEAR" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_paragraph(data[title]) | |
| content = mdfile.get_md_text() | |
| return Message(content, cause_by=msg.cause_by, role=msg.role) | |
| async def format_system_design(self, msg: Message): | |
| system_designs = [(k, v) for k, v in msg.instruct_content.docs.items()] | |
| system_design_doc = system_designs[0][1] | |
| data = json.loads(system_design_doc.content) | |
| mdfile = MdUtils(None) | |
| title = "Implementation approach" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_paragraph(data[title]) | |
| title = "File list" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_list(data[title], marked_with="1") | |
| title = "Data structures and interfaces" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| data_api_design_path = ( | |
| CONFIG.git_repo.workdir | |
| / Path(DATA_API_DESIGN_FILE_REPO) | |
| / Path(system_design_doc.filename).with_suffix(".png") | |
| ) | |
| if data_api_design_path.exists(): | |
| key = str(data_api_design_path.relative_to(CONFIG.git_repo.workdir.parent.parent)) | |
| url = await upload_file_to_s3(data_api_design_path, key) | |
| mdfile.new_line(mdfile.new_inline_image(title, url)) | |
| else: | |
| mdfile.insert_code(data[title], "mermaid") | |
| title = "Program call flow" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| seq_flow_path = ( | |
| CONFIG.git_repo.workdir / SEQ_FLOW_FILE_REPO / Path(system_design_doc.filename).with_suffix(".png") | |
| ) | |
| if seq_flow_path.exists(): | |
| key = str(seq_flow_path.relative_to(CONFIG.git_repo.workdir.parent.parent)) | |
| url = await upload_file_to_s3(seq_flow_path, key) | |
| mdfile.new_line(mdfile.new_inline_image(title, url)) | |
| else: | |
| mdfile.insert_code(data[title], "mermaid") | |
| title = "Anything UNCLEAR" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_paragraph(data[title]) | |
| content = mdfile.get_md_text() | |
| return Message(content, cause_by=msg.cause_by, role=msg.role) | |
| async def format_tasks(self, msg: Message): | |
| tasks = [(k, v) for k, v in msg.instruct_content.docs.items()] | |
| task_doc = tasks[0][1] | |
| data = json.loads(task_doc.content) | |
| mdfile = MdUtils(None) | |
| title = "Required Python packages" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.insert_code("\n".join(data[title]), "txt") | |
| title = "Required Other language third-party packages" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.insert_code("\n".join(data[title]), "txt") | |
| title = "Logic Analysis" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_table( | |
| 2, len(data[title]) + 1, ["Filename", "Class/Function Name", *(i for j in data[title] for i in j)] | |
| ) | |
| title = "Task list" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.new_list(data[title]) | |
| title = "Full API spec" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| if data[title]: | |
| mdfile.insert_code(data[title], "json") | |
| title = "Shared Knowledge" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.insert_code(data[title], "python") | |
| title = "Anything UNCLEAR" | |
| mdfile.new_header(2, title, add_table_of_contents=False) | |
| mdfile.insert_code(data[title], "python") | |
| content = mdfile.get_md_text() | |
| return Message(content, cause_by=msg.cause_by, role=msg.role) | |
| async def format_code(self, msg: Message): | |
| data = msg.content.splitlines() | |
| workdir = CONFIG.git_repo.workdir | |
| code_root = workdir / workdir.name | |
| mdfile = MdUtils(None) | |
| for filename in data: | |
| mdfile.new_header(2, filename, add_table_of_contents=False) | |
| async with aiofiles.open(code_root / filename) as f: | |
| content = await f.read() | |
| suffix = filename.rsplit(".", maxsplit=1)[-1] | |
| mdfile.insert_code(content, "python" if suffix == "py" else suffix) | |
| return Message(mdfile.get_md_text(), cause_by=msg.cause_by, role=msg.role) | |
| async def format_code_summary(self, msg: Message): | |
| # TODO | |
| return msg | |
| async def think(self): | |
| await self._think() | |
| return self._rc.todo | |
| async def act(self): | |
| return await self._act() | |
| def serialize(self, stg_path: Path = None): | |
| stg_path = SERDESER_PATH.joinpath("software_company") if stg_path is None else stg_path | |
| team_info_path = stg_path.joinpath("software_company_info.json") | |
| write_json_file(team_info_path, self.dict(exclude={"company": True})) | |
| self.company.serialize(stg_path.joinpath("company")) # save company alone | |
| def deserialize(cls, stg_path: Path) -> "Team": | |
| """stg_path = ./storage/team""" | |
| # recover team_info | |
| software_company_info_path = stg_path.joinpath("software_company_info.json") | |
| if not software_company_info_path.exists(): | |
| raise FileNotFoundError( | |
| "recover storage meta file `team_info.json` not exist, " | |
| "not to recover and please start a new project." | |
| ) | |
| software_company_info: dict = read_json_file(software_company_info_path) | |
| # recover environment | |
| company = Team.deserialize(stg_path=stg_path.joinpath("company")) | |
| software_company_info.update({"company": company}) | |
| return cls(**software_company_info) | |
| async def upload_file_to_s3(filepath: str, key: str): | |
| async with aiofiles.open(filepath, "rb") as f: | |
| content = await f.read() | |
| return await get_download_url(content, key) | |
| async def get_download_url(content: bytes, key: str) -> str: | |
| if CONFIG.get("STORAGE_TYPE") == "S3": | |
| session = get_session() | |
| async with session.create_client( | |
| "s3", | |
| aws_secret_access_key=CONFIG.get("S3_SECRET_KEY"), | |
| aws_access_key_id=CONFIG.get("S3_ACCESS_KEY"), | |
| endpoint_url=CONFIG.get("S3_ENDPOINT_URL"), | |
| use_ssl=CONFIG.get("S3_SECURE"), | |
| ) as client: | |
| # upload object to amazon s3 | |
| bucket = CONFIG.get("S3_BUCKET") | |
| await client.put_object(Bucket=bucket, Key=key, Body=content) | |
| return f"{CONFIG.get('S3_ENDPOINT_URL')}/{bucket}/{key}" | |
| else: | |
| storage = CONFIG.get("LOCAL_ROOT", "storage") | |
| base_url = CONFIG.get("LOCAL_BASE_URL", "storage") | |
| filepath = Path(storage) / key | |
| filepath.parent.mkdir(exist_ok=True, parents=True) | |
| async with aiofiles.open(filepath, "wb") as f: | |
| await f.write(content) | |
| return f"{base_url}/{key}" | |
| async def main(idea, **kwargs): | |
| sc = SoftwareCompany(**kwargs) | |
| sc.recv(Message(idea)) | |
| while await sc.think(): | |
| print(await sc.act()) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |