MVP-Agent / src /agent_brain.py
Furqan Ahmad Rao
fix: markdown rendering issue by skipping markdownify conversion
2e081ba
"""
Agent Brain Module - Main orchestration logic for MVP Agent
Coordinates all components: AI models, MCP clients, and prompts
"""
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
import time
from .ai_models import GeminiClient, ModelRouter, ModelType
from .mcp_clients import get_research_orchestrator
from .prompts import PromptTemplates, get_system_prompt
from .mcp_http_clients import GoogleSearchMCPClient, MarkdownifyMCPClient
from .file_manager import sanitize_markdown
@dataclass
class AgentState:
"""Tracks the current state of the agent"""
phase: str = "idle"
progress: str = ""
error: Optional[str] = None
data: Dict[str, Any] = None
def __post_init__(self):
if self.data is None:
self.data = {}
class MVPAgent:
"""
Main agent that orchestrates the MVP generation process
Coordinates AI models, MCP tools, and prompt templates
"""
def __init__(self, gemini_api_key: str):
"""
Initialize MVP Agent
Args:
gemini_api_key: Google Gemini API key
"""
# Initialize components
self.gemini_client = GeminiClient(gemini_api_key)
self.model_router = ModelRouter(self.gemini_client)
self.research_orchestrator = get_research_orchestrator()
self.google_search_mcp = GoogleSearchMCPClient()
self.markdownify_mcp = MarkdownifyMCPClient()
self.prompts = PromptTemplates()
# Agent state
self.state = AgentState()
# Callbacks for UI updates
self.status_callback: Optional[Callable[[Dict,], None]] = None
# Metrics
self.start_time: float = 0.0
self.total_tokens_used: int = 0
def set_status_callback(self, callback: Callable[[Dict,], None]):
"""Set callback function for status updates"""
self.status_callback = callback
def _update_status(self, message: str, type: str = "INFO", phase: str = None, details: Dict = None):
"""Update status and call callback if set"""
if phase is None:
phase = self.state.phase
status_event = {
"timestamp": time.time(),
"elapsed_time": time.time() - self.start_time if self.start_time else 0.0,
"message": message,
"type": type, # INFO, WARNING, ERROR, SUCCESS, DEBUG
"phase": phase,
"tokens_used": self.total_tokens_used,
"details": details if details else {}
}
self.state.progress = message # Keep old progress string for compatibility, though we'll use event
if self.status_callback:
self.status_callback(status_event)
def generate_mvp(self, idea: str) -> Dict[str, str]:
"""
Main entry point: Generate complete MVP specification
Args:
idea: The startup idea from user
Returns:
Dictionary with 8 markdown files
"""
self.start_time = time.time() # Initialize timer for this run
self.total_tokens_used = 0 # Reset token count for this run
try:
# Phase 1: Plan research queries
self._update_status("🤖 Understanding your idea and planning research...", phase="planning")
self.state.phase = "planning"
queries = self._generate_search_queries(idea)
# Phase 2: Conduct research
self._update_status("🔍 Researching competitor features and user feedback...", phase="research")
self.state.phase = "research"
research_results = self._conduct_research(queries)
# Phase 3: Synthesize research
self._update_status("🧠 Analyzing research and identifying insights...", phase="synthesis")
self.state.phase = "synthesis"
research_summary = self._summarize_research(idea, research_results)
# Phase 4: Generate 8 MVP files
self._update_status("✍️ Generating complete MVP specification...", phase="generation")
self.state.phase = "generation"
mvp_files = self._generate_files(idea, research_summary)
# Done
self._update_status("✅ Complete! Your MVP blueprint is ready.", type="SUCCESS", phase="complete")
self.state.phase = "complete"
return mvp_files
except Exception as e:
error_msg = f"❌ Error: {str(e)}"
self._update_status(error_msg, type="ERROR", phase="error", details={"error_details": str(e)})
self.state.phase = "error"
self.state.error = str(e)
# Return fallback generation
return self._generate_fallback(idea, str(e))
def _generate_search_queries(self, idea: str) -> Dict[str, list]:
"""
Phase 1: Generate search queries (Flash-Lite)
Args:
idea: Startup idea
Returns:
Dictionary with competitor_queries and pain_point_queries
"""
self._update_status("Starting query generation (Phase 1)...", type="DEBUG", phase="planning")
prompt = self.prompts.format_search_queries(idea)
# Try Flash-Lite first (fastest, cheapest for simple queries)
try:
queries = self.model_router.route_json(
task="search_query", # Flash-Lite
prompt=prompt,
temperature=0.5
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
# Validate structure
if "competitor_queries" not in queries or "pain_point_queries" not in queries:
raise ValueError("Invalid query structure")
self.state.data['queries'] = queries
self._update_status("Query generation successful (Flash-Lite).", type="SUCCESS", phase="planning")
return queries
except Exception as e1:
self._update_status("⚠️ Retrying query generation (Flash-Lite)...", type="WARNING", phase="planning", details={"error": str(e1)})
time.sleep(2)
queries = self.model_router.route_json(
task="search_query", # Flash-Lite
prompt=prompt,
temperature=0.5
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
if "competitor_queries" in queries and "pain_point_queries" in queries:
self._update_status("Query generation successful (Flash-Lite fallback).", type="SUCCESS", phase="planning")
return queries
raise ValueError("Invalid structure from Flash-Lite")
except Exception as e2:
self._update_status("⚠️ Using hardcoded fallback queries...", type="WARNING", phase="planning", details={"error": str(e2)})
return {
"competitor_queries": [
f"{idea} features",
f"{idea} product review",
f"best {idea} apps"
],
"pain_point_queries": [
f"{idea} problems",
f"{idea} complaints reddit",
f"what {idea} users want"
]
}
def _conduct_research(self, queries: Dict[str, list]) -> Dict[str, str]:
"""
Phase 2: Conduct research using MCP servers
Args:
queries: Dictionary with search queries
Returns:
Dictionary with web_results and social_results
"""
self._update_status("Starting web research (Phase 2)...", type="DEBUG", phase="research")
competitor_queries = queries.get("competitor_queries", [])
pain_point_queries = queries.get("pain_point_queries", [])
# Use google-search-mcp for web research (MCP call visible in logs)
web_results_blocks = []
all_queries = competitor_queries + pain_point_queries
self._update_status(f"Executing {len(all_queries)} web searches...", type="INFO", phase="research", details={"num_queries": len(all_queries)})
for i, q in enumerate(all_queries):
self._update_status(f"Searching query {i+1}/{len(all_queries)}: '{q}'", type="DEBUG", phase="research")
resp = self.google_search_mcp.search(q, limit=3)
if resp.get("success"):
num_results = len(resp.get("results", []))
self._update_status(f"Found {num_results} results for query '{q}'", type="DEBUG", phase="research", details={"query": q, "num_results": num_results})
for item in resp.get("results", []):
web_results_blocks.append(
f"- [{item.get('title','')}]({item.get('link','')}) — {item.get('snippet','')}"
)
else:
self._update_status(f"⚠️ Google Search MCP failed for query '{q}'. Falling back to legacy orchestrator.", type="WARNING", phase="research", details={"query": q, "error": resp.get("error", "unknown")})
# Fallback: rely on existing orchestrator if MCP search fails
break
if web_results_blocks:
web_results = "## Web Research (via google-search-mcp)\n\n" + "\n".join(web_results_blocks)
self._update_status("Successfully gathered web research results via Google Search MCP.", type="INFO", phase="research")
else:
self._update_status("⚠️ No results from Google Search MCP. Using legacy research orchestrator.", type="WARNING", phase="research")
# Fallback to legacy orchestrator (which now itself uses web-only research)
legacy = self.research_orchestrator.conduct_full_research(
competitor_queries=competitor_queries,
pain_point_queries=pain_point_queries,
subreddits=None,
)
web_results = legacy.get("web_results", "")
self._update_status("Successfully gathered web research results via legacy orchestrator.", type="INFO", phase="research")
# For social_results, keep compatibility but clarify web-only if using our MCP path
social_results = "User feedback is inferred from web research and forums via MCP web search."
self._update_status("Social media research inferred from web search results.", type="INFO", phase="research")
results = {
"web_results": web_results or "No web research results.",
"social_results": social_results,
}
self.state.data["research_results"] = results
self._update_status("Web research phase completed.", type="SUCCESS", phase="research")
return results
def _summarize_research(
self,
idea: str,
research_results: Dict[str, str]
) -> Dict[str, Any]:
"""
Phase 3: Synthesize research (Flash with fallback to Flash-Lite)
Args:
idea: Startup idea
research_results: Raw research data
Returns:
Structured research summary
"""
self._update_status("Starting research summarization (Phase 3)...", type="DEBUG", phase="synthesis")
prompt = self.prompts.format_summarize_research(
idea=idea,
web_results=research_results.get("web_results", "No data"),
social_results=research_results.get("social_results", "No data")
)
# Try Flash first (good balance of speed and quality)
try:
summary = self.model_router.route_json(
task="planning", # Flash (15 RPM)
prompt=prompt,
temperature=0.4
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
self.state.data['research_summary'] = summary
self._update_status("Research summarization successful (Flash).", type="SUCCESS", phase="synthesis")
return summary
except Exception as e1:
# Fallback 1: Try Flash-Lite
try:
self._update_status("⚠️ Retrying synthesis with Flash-Lite...", type="WARNING", phase="synthesis", details={"error": str(e1)})
time.sleep(2)
summary = self.model_router.route_json(
task="simple", # Flash-Lite
prompt=prompt,
temperature=0.4
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
self._update_status("Research summarization successful (Flash-Lite fallback).", type="SUCCESS", phase="synthesis")
return summary
except Exception as e2:
# Fallback 2: Hardcoded summary
self._update_status("⚠️ Using hardcoded fallback research summary...", type="WARNING", phase="synthesis", details={"error": str(e2)})
return {
"core_problem": f"Solving challenges in the {idea} space",
"target_audience": "General users",
"key_features_found": ["Basic functionality", "User interface", "Data storage"],
"user_complaints": ["Complexity", "Performance", "Cost"],
"market_gaps": ["Better UX", "Faster performance", "Lower cost"],
"competitive_advantages": ["Modern tech stack", "User-focused", "Scalable"]
}
def _generate_files(
self,
idea: str,
research_summary: Dict[str, Any],
tech_preference: str = "",
platform: str = "",
constraint: str = ""
) -> Dict[str, str]:
"""
Phase 4: Generate MVP files (Pro with fallback to Flash-Lite)
Args:
idea: Startup idea
research_summary: Synthesized research
tech_preference: Optional tech stack preference
platform: Optional target platform
constraint: Optional constraints
Returns:
Dictionary with 8 markdown file contents
"""
self._update_status(
"Starting MVP file generation (Phase 4)...",
type="DEBUG",
phase="generation",
details={
"tech_preference": tech_preference,
"platform": platform,
"constraint": constraint
}
)
prompt = self.prompts.format_generate_mvp(
idea,
research_summary,
tech_preference=tech_preference,
platform=platform,
constraint=constraint
)
# Add delay to respect rate limits (Pro: 2 RPM)
time.sleep(7)
# Try Pro first (better context window for large MVP generation)
try:
self._update_status("Generating MVP files using Pro model...", type="INFO", phase="generation")
# First pass: generate raw files
files = self.model_router.route_json(
task="generation", # Pro (2 RPM, large context)
prompt=prompt,
temperature=0.6
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
# Validate all files present
required_keys = [
"overview_md",
"features_md",
"architecture_md",
"design_md",
"user_flow_md",
"roadmap_md",
"business_model_md",
"testing_plan_md"
]
for key in required_keys:
if key not in files:
raise ValueError(f"Missing file: {key}")
self._update_status("MVP files generated by Pro. Sanitizing markdown...", type="DEBUG", phase="generation")
# Sanitize markdown (content from Gemini is already valid markdown)
normalized_files = {}
for key in required_keys:
content = files.get(key, "")
# Only sanitize to remove invisible characters (skip markdownify - content is already markdown)
sanitized = sanitize_markdown(content)
normalized_files[key] = sanitized
self.state.data["mvp_files"] = normalized_files
self._update_status("MVP file generation successful.", type="SUCCESS", phase="generation")
return normalized_files
except Exception as e1:
# Fallback 1: Retry with Pro model (transient errors like network, rate limits)
try:
self._update_status("⚠️ Pro model failed. Retrying with Pro model (attempt 2)...", type="WARNING", phase="generation", details={"error": str(e1)})
time.sleep(35) # Wait 35s to respect 2 RPM rate limit (30s) + 5s buffer
files = self.model_router.route_json(
task="generation", # Pro (same model, transient errors)
prompt=prompt,
temperature=0.6
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
# Validate
required_keys = ["overview_md", "features_md", "architecture_md", "design_md", "user_flow_md", "roadmap_md", "business_model_md", "testing_plan_md"]
for key in required_keys:
if key not in files:
raise ValueError(f"Missing {key}")
# Sanitize markdown (content from Gemini is already valid markdown)
normalized_files = {}
for key in required_keys:
content = files.get(key, "")
# Only sanitize to remove invisible characters (skip markdownify - content is already markdown)
sanitized = sanitize_markdown(content)
normalized_files[key] = sanitized
self._update_status("MVP files generated by Pro model (retry successful).", type="SUCCESS", phase="generation")
return normalized_files
except Exception as e2:
# If Pro fails twice, raise to trigger main fallback (hardcoded templates)
raise Exception(f"File generation failed after 2 Pro attempts: {str(e2)}")
def _generate_fallback(self, idea: str, error: str) -> Dict[str, str]:
"""
Last-resort fallback generation
Args:
idea: Startup idea
error: Error message
Returns:
Dictionary with 8 basic MVP files
"""
self._update_status("🚨 Entering emergency fallback generation...", type="ERROR", phase="fallback", details={"trigger_error": error})
prompt = self.prompts.format_generate_mvp_fallback(
idea=idea,
context=f"Error occurred: {error}"
)
try:
self._update_status("Trying Pro for fallback generation...", type="INFO", phase="fallback")
# Try Pro as last resort (large context needed)
time.sleep(3)
files = self.model_router.route_json(
task="generation", # Pro
prompt=prompt,
temperature=0.7
)
self.total_tokens_used += self.gemini_client.get_token_usage() # Accumulate tokens
self._update_status("Fallback generation successful (Pro).", type="SUCCESS", phase="fallback")
return files
except Exception as e:
self._update_status("⚠️ Flash Lite fallback also failed. Using hardcoded templates...", type="WARNING", phase="fallback", details={"error": str(e)})
return {
"overview_md": f"# [Product Name] – MVP Blueprint Overview\n\n*Error occurred. Basic template provided.*\n\n## Tagline\n- One-line summary.\n\n## Purpose & Vision\n- What the product aims to achieve.\n\n## What’s Included\n- List of files.\n\n## How to Use This Blueprint\n- Instructions for humans and agents.\n\n## Style & Formatting Conventions\n- Markdown, tables, diagrams.\n\n## Glossary\n- Key terms.\n\n## References\n- Research links.",
"features_md": f"# MVP Features for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Core Features\n- Feature 1\n- Feature 2\n- Feature 3",
"architecture_md": f"# Architecture for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Tech Stack\n- Frontend\n- Backend\n- Database",
"design_md": f"# Design for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Design Principles\n- Simple\n- Intuitive\n- Accessible",
"user_flow_md": f"# User Flow for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Main Flow\n1. User lands\n2. User interacts\n3. User completes",
"roadmap_md": f"# Roadmap for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Timeline\n- Week 1-2: Setup\n- Week 3-4: Build\n- Week 5-6: Launch",
"business_model_md": f"# Business Model for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Executive Summary\n- Describe the business vision here.\n\n## Business Model Canvas\n| Key Partners | Key Activities | Key Resources | Value Propositions | Customer Relationships | Channels | Customer Segments | Cost Structure | Revenue Streams |\n|--------------|---------------|--------------|--------------------|-----------------------|----------|-------------------|---------------|----------------|\n| Example | Example | Example | Example | Example | Example | Example | Example | Example |\n\n## Revenue Model\n- List revenue streams here.\n\n## Cost Structure\n- List major costs here.\n\n## Go-to-Market Strategy\n- List main strategies here.\n\n## Competitive Advantage\n- List differentiators here.\n\n## Risks & Mitigations\n- List risks and mitigations here.\n\n## Edge Cases & Fallbacks\n- List edge cases here.\n\n## Implementation Hints & Best Practices\n- List best practices here.",
"testing_plan_md": f"# Testing Plan for: {idea}\n\n*Error occurred. Basic template provided.*\n\n## Test Strategy\n- Manual and automated\n\n## Test Cases\n- Test 1\n- Test 2\n- Test 3\n\n## Tools\n- pytest\n- Selenium\n\n## Success Criteria\n- All tests pass\n\n## Risks\n- List risks here."
}
def get_token_usage(self) -> int:
"""Get total tokens used in this session"""
return self.gemini_client.get_token_usage()
def create_agent(gemini_api_key: str) -> MVPAgent:
"""
Create a new MVP Agent instance
Args:
gemini_api_key: Google Gemini API key
Returns:
Configured MVPAgent instance
"""
return MVPAgent(gemini_api_key)