| | from core.base_agent import BaseAgent
|
| | from core.database import db
|
| | from core.workflow import DevelopmentWorkflow
|
| | from typing import Dict, Any
|
| |
|
class ProjectManagerAgent(BaseAgent):
    """Agent that drives the development workflow and reports on its artifacts.

    It runs a ``DevelopmentWorkflow``, persists the workflow outputs through
    the shared ``db`` handle, and exposes status / quality-report helpers.
    """

    def __init__(self):
        """Register the agent as "Project Manager" and set up its prompt chain."""
        super().__init__("Project Manager")
        self.workflow = DevelopmentWorkflow()

        # The template exposes a single ``{input}`` placeholder; it is handed
        # to ``create_chain`` (inherited from BaseAgent) unchanged.
        prompt_template = """
        You are a Project Manager. Your task is to oversee the development process and provide status updates.

        Current Status:
        {input}

        Please provide a comprehensive status report including:
        1. Project progress
        2. Any blockers or issues
        3. Next steps
        4. Quality metrics
        5. Recommendations

        Please provide clear, actionable insights.
        """
        self.create_chain(prompt_template)

    async def start_project(self, requirements: str) -> Dict[str, Any]:
        """Run the workflow for ``requirements`` and persist what it produces.

        Args:
            requirements: Passed straight through to ``self.workflow.run``.

        Returns:
            A dict with ``"status"`` set to ``"success"``, the workflow's
            ``user_stories``, ``design``, ``code``, ``test_results`` and
            ``messages`` entries copied over, and a fixed ``"message"``.

        Raises:
            KeyError: If the workflow result lacks one of the expected keys.
        """
        outcome = await self.workflow.run(requirements)

        # Persist first, so the artifacts are stored before the response
        # payload is assembled.
        self._store_artifacts(outcome)

        response: Dict[str, Any] = {"status": "success"}
        for field in ("user_stories", "design", "code", "test_results", "messages"):
            response[field] = outcome[field]
        response["message"] = "Project completed successfully"
        return response

    def _store_artifacts(self, result: Dict[str, Any]):
        """Write each workflow output to its collection via ``db.store_artifact``.

        Args:
            result: Workflow result; must contain ``user_stories``,
                ``design``, ``code`` and ``test_results``.

        Raises:
            KeyError: If ``result`` is missing one of those keys. Artifacts
                earlier in the sequence have already been stored by then.
        """
        # (collection name, key in ``result``, metadata) in storage order.
        storage_plan = (
            (
                "user_stories",
                "user_stories",
                {
                    "type": "user_story",
                    "source": "business_analyst",
                    "status": "created",
                },
            ),
            (
                "designs",
                "design",
                {
                    "type": "design",
                    "source": "designer",
                    "status": "created",
                },
            ),
            (
                "code",
                "code",
                {
                    "type": "code",
                    "source": "developer",
                    "status": "created",
                    "version": "1.0.0",
                },
            ),
            (
                "test_results",
                "test_results",
                {
                    "type": "test_result",
                    "source": "tester",
                    "status": "executed",
                },
            ),
        )

        # Look each value up lazily so a missing key only surfaces once the
        # preceding artifacts have been written.
        for collection, result_key, metadata in storage_plan:
            db.store_artifact(collection, result[result_key], metadata)

    async def get_status(self) -> Dict[str, Any]:
        """Count the stored artifacts in each collection.

        Returns:
            A dict with ``"status"`` set to ``"success"``, a ``"data"``
            mapping of collection name to the length of the ``"ids"`` entry
            of its query result, and a fixed ``"message"``.
        """
        lookups = (
            ("user_stories", "status:created"),
            ("designs", "status:created"),
            ("code", "status:created"),
            ("test_results", "status:executed"),
        )

        # Run every query before inspecting any result.
        hits = {
            collection: db.query_artifacts(collection, query)
            for collection, query in lookups
        }

        # NOTE(review): assumes each query result is a mapping whose "ids"
        # value is a flat sequence of artifact ids — confirm against
        # core.database.
        counts = {collection: len(found["ids"]) for collection, found in hits.items()}

        return {
            "status": "success",
            "data": counts,
            "message": "Status retrieved successfully",
        }

    async def check_quality(self) -> Dict[str, Any]:
        """Ask the agent's chain for a quality report.

        The chain receives a fixed input string; no artifact data is
        attached by this method.

        Returns:
            A dict with ``"status"`` set to ``"success"``, the value returned
            by ``self.process`` under ``"quality_report"``, and a fixed
            ``"message"``.
        """
        payload = {"input": "Checking quality of all project artifacts"}
        report = await self.process(payload)

        return {
            "status": "success",
            "quality_report": report,
            "message": "Quality check completed successfully",
        }