Spaces:
Running
Running
| """ | |
| Agent Brain Module - Main orchestration logic for MVP Agent | |
| Coordinates all components: AI models, MCP clients, and prompts | |
| """ | |
| from typing import Dict, Any, Callable, Optional | |
| from dataclasses import dataclass | |
| import time | |
| from .ai_models import GeminiClient, ModelRouter, ModelType | |
| from .mcp_clients import get_research_orchestrator | |
| from .prompts import PromptTemplates, get_system_prompt | |
| from .mcp_http_clients import GoogleSearchMCPClient, MarkdownifyMCPClient | |
| from .file_manager import sanitize_markdown | |
| class AgentState: | |
| """Tracks the current state of the agent""" | |
| phase: str = "idle" | |
| progress: str = "" | |
| error: Optional[str] = None | |
| data: Dict[str, Any] = None | |
| def __post_init__(self): | |
| if self.data is None: | |
| self.data = {} | |
| class MVPAgent: | |
| """ | |
| Main agent that orchestrates the MVP generation process | |
| Coordinates AI models, MCP tools, and prompt templates | |
| """ | |
| def __init__(self, gemini_api_key: str): | |
| """ | |
| Initialize MVP Agent | |
| Args: | |
| gemini_api_key: Google Gemini API key | |
| """ | |
| # Initialize components | |
| self.gemini_client = GeminiClient(gemini_api_key) | |
| self.model_router = ModelRouter(self.gemini_client) | |
| self.research_orchestrator = get_research_orchestrator() | |
| self.google_search_mcp = GoogleSearchMCPClient() | |
| self.markdownify_mcp = MarkdownifyMCPClient() | |
| self.prompts = PromptTemplates() | |
| # Agent state | |
| self.state = AgentState() | |
| # Callbacks for UI updates | |
| self.status_callback: Optional[Callable[[Dict,], None]] = None | |
| # Metrics | |
| self.start_time: float = 0.0 | |
| self.total_tokens_used: int = 0 | |
| def set_status_callback(self, callback: Callable[[Dict,], None]): | |
| """Set callback function for status updates""" | |
| self.status_callback = callback | |
| def _update_status(self, message: str, type: str = "INFO", phase: str = None, details: Dict = None): | |
| """Update status and call callback if set""" | |
| if phase is None: | |
| phase = self.state.phase | |
| status_event = { | |
| "timestamp": time.time(), | |
| "elapsed_time": time.time() - self.start_time if self.start_time else 0.0, | |
| "message": message, | |
| "type": type, # INFO, WARNING, ERROR, SUCCESS, DEBUG | |
| "phase": phase, | |
| "tokens_used": self.total_tokens_used, | |
| "details": details if details else {} | |
| } | |
| self.state.progress = message # Keep old progress string for compatibility, though we'll use event | |
| if self.status_callback: | |
| self.status_callback(status_event) | |
| def generate_mvp(self, idea: str) -> Dict[str, str]: | |
| """ | |
| Main entry point: Generate complete MVP specification | |
| Args: | |
| idea: The startup idea from user | |
| Returns: | |
| Dictionary with 8 markdown files | |
| """ | |
| self.start_time = time.time() # Initialize timer for this run | |
| self.total_tokens_used = 0 # Reset token count for this run | |
| try: | |
| # Phase 1: Plan research queries | |
| self._update_status("🤖 Understanding your idea and planning research...", phase="planning") | |
| self.state.phase = "planning" | |
| queries = self._generate_search_queries(idea) | |
| # Phase 2: Conduct research | |
| self._update_status("🔍 Researching competitor features and user feedback...", phase="research") | |
| self.state.phase = "research" | |
| research_results = self._conduct_research(queries) | |
| # Phase 3: Synthesize research | |
| self._update_status("🧠 Analyzing research and identifying insights...", phase="synthesis") | |
| self.state.phase = "synthesis" | |
| research_summary = self._summarize_research(idea, research_results) | |
| # Phase 4: Generate 8 MVP files | |
| self._update_status("✍️ Generating complete MVP specification...", phase="generation") | |
| self.state.phase = "generation" | |
| mvp_files = self._generate_files(idea, research_summary) | |
| # Done | |
| self._update_status("✅ Complete! Your MVP blueprint is ready.", type="SUCCESS", phase="complete") | |
| self.state.phase = "complete" | |
| return mvp_files | |
| except Exception as e: | |
| error_msg = f"❌ Error: {str(e)}" | |
| self._update_status(error_msg, type="ERROR", phase="error", details={"error_details": str(e)}) | |
| self.state.phase = "error" | |
| self.state.error = str(e) | |
| # Return fallback generation | |
| return self._generate_fallback(idea, str(e)) | |
| def _generate_search_queries(self, idea: str) -> Dict[str, list]: | |
| """ | |
| Phase 1: Generate search queries (Flash-Lite) | |
| Args: | |
| idea: Startup idea | |
| Returns: | |
| Dictionary with competitor_queries and pain_point_queries | |
| """ | |
| self._update_status("Starting query generation (Phase 1)...", type="DEBUG", phase="planning") | |
| prompt = self.prompts.format_search_queries(idea) | |
| # Try Flash-Lite first (fastest, cheapest for simple queries) | |
| try: | |
| queries = self.model_router.route_json( | |
| task="search_query", # Flash-Lite | |
| prompt=prompt, | |
| temperature=0.5 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| # Validate structure | |
| if "competitor_queries" not in queries or "pain_point_queries" not in queries: | |
| raise ValueError("Invalid query structure") | |
| self.state.data['queries'] = queries | |
| self._update_status("Query generation successful (Flash-Lite).", type="SUCCESS", phase="planning") | |
| return queries | |
| except Exception as e1: | |
| self._update_status("⚠️ Retrying query generation (Flash-Lite)...", type="WARNING", phase="planning", details={"error": str(e1)}) | |
| time.sleep(2) | |
| queries = self.model_router.route_json( | |
| task="search_query", # Flash-Lite | |
| prompt=prompt, | |
| temperature=0.5 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| if "competitor_queries" in queries and "pain_point_queries" in queries: | |
| self._update_status("Query generation successful (Flash-Lite fallback).", type="SUCCESS", phase="planning") | |
| return queries | |
| raise ValueError("Invalid structure from Flash-Lite") | |
| except Exception as e2: | |
| self._update_status("⚠️ Using hardcoded fallback queries...", type="WARNING", phase="planning", details={"error": str(e2)}) | |
| return { | |
| "competitor_queries": [ | |
| f"{idea} features", | |
| f"{idea} product review", | |
| f"best {idea} apps" | |
| ], | |
| "pain_point_queries": [ | |
| f"{idea} problems", | |
| f"{idea} complaints reddit", | |
| f"what {idea} users want" | |
| ] | |
| } | |
| def _conduct_research(self, queries: Dict[str, list]) -> Dict[str, str]: | |
| """ | |
| Phase 2: Conduct research using MCP servers | |
| Args: | |
| queries: Dictionary with search queries | |
| Returns: | |
| Dictionary with web_results and social_results | |
| """ | |
| self._update_status("Starting web research (Phase 2)...", type="DEBUG", phase="research") | |
| competitor_queries = queries.get("competitor_queries", []) | |
| pain_point_queries = queries.get("pain_point_queries", []) | |
| # Use google-search-mcp for web research (MCP call visible in logs) | |
| web_results_blocks = [] | |
| all_queries = competitor_queries + pain_point_queries | |
| self._update_status(f"Executing {len(all_queries)} web searches...", type="INFO", phase="research", details={"num_queries": len(all_queries)}) | |
| for i, q in enumerate(all_queries): | |
| self._update_status(f"Searching query {i+1}/{len(all_queries)}: '{q}'", type="DEBUG", phase="research") | |
| resp = self.google_search_mcp.search(q, limit=3) | |
| if resp.get("success"): | |
| num_results = len(resp.get("results", [])) | |
| self._update_status(f"Found {num_results} results for query '{q}'", type="DEBUG", phase="research", details={"query": q, "num_results": num_results}) | |
| for item in resp.get("results", []): | |
| web_results_blocks.append( | |
| f"- [{item.get('title','')}]({item.get('link','')}) — {item.get('snippet','')}" | |
| ) | |
| else: | |
| self._update_status(f"⚠️ Google Search MCP failed for query '{q}'. Falling back to legacy orchestrator.", type="WARNING", phase="research", details={"query": q, "error": resp.get("error", "unknown")}) | |
| # Fallback: rely on existing orchestrator if MCP search fails | |
| break | |
| if web_results_blocks: | |
| web_results = "## Web Research (via google-search-mcp)\n\n" + "\n".join(web_results_blocks) | |
| self._update_status("Successfully gathered web research results via Google Search MCP.", type="INFO", phase="research") | |
| else: | |
| self._update_status("⚠️ No results from Google Search MCP. Using legacy research orchestrator.", type="WARNING", phase="research") | |
| # Fallback to legacy orchestrator (which now itself uses web-only research) | |
| legacy = self.research_orchestrator.conduct_full_research( | |
| competitor_queries=competitor_queries, | |
| pain_point_queries=pain_point_queries, | |
| subreddits=None, | |
| ) | |
| web_results = legacy.get("web_results", "") | |
| self._update_status("Successfully gathered web research results via legacy orchestrator.", type="INFO", phase="research") | |
| # For social_results, keep compatibility but clarify web-only if using our MCP path | |
| social_results = "User feedback is inferred from web research and forums via MCP web search." | |
| self._update_status("Social media research inferred from web search results.", type="INFO", phase="research") | |
| results = { | |
| "web_results": web_results or "No web research results.", | |
| "social_results": social_results, | |
| } | |
| self.state.data["research_results"] = results | |
| self._update_status("Web research phase completed.", type="SUCCESS", phase="research") | |
| return results | |
| def _summarize_research( | |
| self, | |
| idea: str, | |
| research_results: Dict[str, str] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Phase 3: Synthesize research (Flash with fallback to Flash-Lite) | |
| Args: | |
| idea: Startup idea | |
| research_results: Raw research data | |
| Returns: | |
| Structured research summary | |
| """ | |
| self._update_status("Starting research summarization (Phase 3)...", type="DEBUG", phase="synthesis") | |
| prompt = self.prompts.format_summarize_research( | |
| idea=idea, | |
| web_results=research_results.get("web_results", "No data"), | |
| social_results=research_results.get("social_results", "No data") | |
| ) | |
| # Try Flash first (good balance of speed and quality) | |
| try: | |
| summary = self.model_router.route_json( | |
| task="planning", # Flash (15 RPM) | |
| prompt=prompt, | |
| temperature=0.4 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| self.state.data['research_summary'] = summary | |
| self._update_status("Research summarization successful (Flash).", type="SUCCESS", phase="synthesis") | |
| return summary | |
| except Exception as e1: | |
| # Fallback 1: Try Flash-Lite | |
| try: | |
| self._update_status("⚠️ Retrying synthesis with Flash-Lite...", type="WARNING", phase="synthesis", details={"error": str(e1)}) | |
| time.sleep(2) | |
| summary = self.model_router.route_json( | |
| task="simple", # Flash-Lite | |
| prompt=prompt, | |
| temperature=0.4 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| self._update_status("Research summarization successful (Flash-Lite fallback).", type="SUCCESS", phase="synthesis") | |
| return summary | |
| except Exception as e2: | |
| # Fallback 2: Hardcoded summary | |
| self._update_status("⚠️ Using hardcoded fallback research summary...", type="WARNING", phase="synthesis", details={"error": str(e2)}) | |
| return { | |
| "core_problem": f"Solving challenges in the {idea} space", | |
| "target_audience": "General users", | |
| "key_features_found": ["Basic functionality", "User interface", "Data storage"], | |
| "user_complaints": ["Complexity", "Performance", "Cost"], | |
| "market_gaps": ["Better UX", "Faster performance", "Lower cost"], | |
| "competitive_advantages": ["Modern tech stack", "User-focused", "Scalable"] | |
| } | |
| def _generate_files( | |
| self, | |
| idea: str, | |
| research_summary: Dict[str, Any], | |
| tech_preference: str = "", | |
| platform: str = "", | |
| constraint: str = "" | |
| ) -> Dict[str, str]: | |
| """ | |
| Phase 4: Generate MVP files (Pro with fallback to Flash-Lite) | |
| Args: | |
| idea: Startup idea | |
| research_summary: Synthesized research | |
| tech_preference: Optional tech stack preference | |
| platform: Optional target platform | |
| constraint: Optional constraints | |
| Returns: | |
| Dictionary with 8 markdown file contents | |
| """ | |
| self._update_status( | |
| "Starting MVP file generation (Phase 4)...", | |
| type="DEBUG", | |
| phase="generation", | |
| details={ | |
| "tech_preference": tech_preference, | |
| "platform": platform, | |
| "constraint": constraint | |
| } | |
| ) | |
| prompt = self.prompts.format_generate_mvp( | |
| idea, | |
| research_summary, | |
| tech_preference=tech_preference, | |
| platform=platform, | |
| constraint=constraint | |
| ) | |
| # Add delay to respect rate limits (Pro: 2 RPM) | |
| time.sleep(7) | |
| # Try Pro first (better context window for large MVP generation) | |
| try: | |
| self._update_status("Generating MVP files using Pro model...", type="INFO", phase="generation") | |
| # First pass: generate raw files | |
| files = self.model_router.route_json( | |
| task="generation", # Pro (2 RPM, large context) | |
| prompt=prompt, | |
| temperature=0.6 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| # Validate all files present | |
| required_keys = [ | |
| "overview_md", | |
| "features_md", | |
| "architecture_md", | |
| "design_md", | |
| "user_flow_md", | |
| "roadmap_md", | |
| "business_model_md", | |
| "testing_plan_md" | |
| ] | |
| for key in required_keys: | |
| if key not in files: | |
| raise ValueError(f"Missing file: {key}") | |
| self._update_status("MVP files generated by Pro. Sanitizing markdown...", type="DEBUG", phase="generation") | |
| # Sanitize markdown (content from Gemini is already valid markdown) | |
| normalized_files = {} | |
| for key in required_keys: | |
| content = files.get(key, "") | |
| # Only sanitize to remove invisible characters (skip markdownify - content is already markdown) | |
| sanitized = sanitize_markdown(content) | |
| normalized_files[key] = sanitized | |
| self.state.data["mvp_files"] = normalized_files | |
| self._update_status("MVP file generation successful.", type="SUCCESS", phase="generation") | |
| return normalized_files | |
| except Exception as e1: | |
| # Fallback 1: Retry with Pro model (transient errors like network, rate limits) | |
| try: | |
| self._update_status("⚠️ Pro model failed. Retrying with Pro model (attempt 2)...", type="WARNING", phase="generation", details={"error": str(e1)}) | |
| time.sleep(35) # Wait 35s to respect 2 RPM rate limit (30s) + 5s buffer | |
| files = self.model_router.route_json( | |
| task="generation", # Pro (same model, transient errors) | |
| prompt=prompt, | |
| temperature=0.6 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| # Validate | |
| required_keys = ["overview_md", "features_md", "architecture_md", "design_md", "user_flow_md", "roadmap_md", "business_model_md", "testing_plan_md"] | |
| for key in required_keys: | |
| if key not in files: | |
| raise ValueError(f"Missing {key}") | |
| # Sanitize markdown (content from Gemini is already valid markdown) | |
| normalized_files = {} | |
| for key in required_keys: | |
| content = files.get(key, "") | |
| # Only sanitize to remove invisible characters (skip markdownify - content is already markdown) | |
| sanitized = sanitize_markdown(content) | |
| normalized_files[key] = sanitized | |
| self._update_status("MVP files generated by Pro model (retry successful).", type="SUCCESS", phase="generation") | |
| return normalized_files | |
| except Exception as e2: | |
| # If Pro fails twice, raise to trigger main fallback (hardcoded templates) | |
| raise Exception(f"File generation failed after 2 Pro attempts: {str(e2)}") | |
| def _generate_fallback(self, idea: str, error: str) -> Dict[str, str]: | |
| """ | |
| Last-resort fallback generation | |
| Args: | |
| idea: Startup idea | |
| error: Error message | |
| Returns: | |
| Dictionary with 8 basic MVP files | |
| """ | |
| self._update_status("🚨 Entering emergency fallback generation...", type="ERROR", phase="fallback", details={"trigger_error": error}) | |
| prompt = self.prompts.format_generate_mvp_fallback( | |
| idea=idea, | |
| context=f"Error occurred: {error}" | |
| ) | |
| try: | |
| self._update_status("Trying Pro for fallback generation...", type="INFO", phase="fallback") | |
| # Try Pro as last resort (large context needed) | |
| time.sleep(3) | |
| files = self.model_router.route_json( | |
| task="generation", # Pro | |
| prompt=prompt, | |
| temperature=0.7 | |
| ) | |
| self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens | |
| self._update_status("Fallback generation successful (Pro).", type="SUCCESS", phase="fallback") | |
| return files | |
| except Exception as e: | |
| self._update_status("⚠️ Flash Lite fallback also failed. Using hardcoded templates...", type="WARNING", phase="fallback", details={"error": str(e)}) | |
| return { | |
| "overview_md": f"# [Product Name] – MVP Blueprint Overview\n\n*Error occurred. Basic template provided.*\n\n## Tagline\n- One-line summary.\n\n## Purpose & Vision\n- What the product aims to achieve.\n\n## What’s Included\n- List of files.\n\n## How to Use This Blueprint\n- Instructions for humans and agents.\n\n## Style & Formatting Conventions\n- Markdown, tables, diagrams.\n\n## Glossary\n- Key terms.\n\n## References\n- Research links.", | |
| "features_md": f"# MVP Features for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Core Features\n- Feature 1\n- Feature 2\n- Feature 3", | |
| "architecture_md": f"# Architecture for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Tech Stack\n- Frontend\n- Backend\n- Database", | |
| "design_md": f"# Design for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Design Principles\n- Simple\n- Intuitive\n- Accessible", | |
| "user_flow_md": f"# User Flow for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Main Flow\n1. User lands\n2. User interacts\n3. User completes", | |
| "roadmap_md": f"# Roadmap for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Timeline\n- Week 1-2: Setup\n- Week 3-4: Build\n- Week 5-6: Launch", | |
| "business_model_md": f"# Business Model for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Executive Summary\n- Describe the business vision here.\n\n## Business Model Canvas\n| Key Partners | Key Activities | Key Resources | Value Propositions | Customer Relationships | Channels | Customer Segments | Cost Structure | Revenue Streams |\n|--------------|---------------|--------------|--------------------|-----------------------|----------|-------------------|---------------|----------------|\n| Example | Example | Example | Example | Example | Example | Example | Example | Example |\n\n## Revenue Model\n- List revenue streams here.\n\n## Cost Structure\n- List major costs here.\n\n## Go-to-Market Strategy\n- List main strategies here.\n\n## Competitive Advantage\n- List differentiators here.\n\n## Risks & Mitigations\n- List risks and mitigations here.\n\n## Edge Cases & Fallbacks\n- List edge cases here.\n\n## Implementation Hints & Best Practices\n- List best practices here.", | |
| "testing_plan_md": f"# Testing Plan for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Test Strategy\n- Manual and automated\n\n## Test Cases\n- Test 1\n- Test 2\n- Test 3\n\n## Tools\n- pytest\n- Selenium\n\n## Success Criteria\n- All tests pass\n\n## Risks\n- List risks here." | |
| } | |
| def get_token_usage(self) -> int: | |
| """Get total tokens used in this session""" | |
| return self.gemini_client.get_token_usage() | |
| def create_agent(gemini_api_key: str) -> MVPAgent: | |
| """ | |
| Create a new MVP Agent instance | |
| Args: | |
| gemini_api_key: Google Gemini API key | |
| Returns: | |
| Configured MVPAgent instance | |
| """ | |
| return MVPAgent(gemini_api_key) | |