# extractor_agent_runner.py
"""Business-idea extraction pipeline.

``AgentOrchestrator.run`` drives four sequential LLM calls (business analysis,
agent design, implementation planning, executive summary) against an OpenAI,
xAI Grok, or Google Gemini model and pushes progress entries to Firebase under
``sessions/<session_id>/logs``.
"""
import asyncio
import json
from datetime import datetime
from typing import Any, Dict, Optional

from firebase_admin import db
from openai import AsyncOpenAI
from pydantic import BaseModel, ConfigDict, Field


# ------------------------
# Pydantic Models for Structured Outputs
# ------------------------
# Each model sets extra="forbid": unknown keys fail validation and the
# generated JSON schema carries "additionalProperties": false.


class BusinessExtraction(BaseModel):
    """Extracted business information"""

    model_config = ConfigDict(extra="forbid")

    business_name: str = Field(description="Name of the business idea")
    industry: str = Field(description="Industry or sector")
    target_audience: str = Field(description="Target customer base")
    core_problem: str = Field(description="Main problem being solved")
    solution: str = Field(description="Proposed solution")
    key_features: list[str] = Field(description="List of key features or capabilities")
    tech_stack: list[str] = Field(description="Recommended technology stack")
    estimated_complexity: str = Field(description="Low, Medium, or High")


class AgentSpecification(BaseModel):
    """AI Agent specifications"""

    model_config = ConfigDict(extra="forbid")

    agent_name: str = Field(description="Name of the AI agent")
    agent_purpose: str = Field(description="Main purpose of the agent")
    capabilities: list[str] = Field(description="List of agent capabilities")
    integrations: list[str] = Field(description="Required integrations or APIs")
    data_requirements: list[str] = Field(description="Data needed for the agent")
    deployment_model: str = Field(description="e.g., Cloud, On-premise, Hybrid")


class Phase(BaseModel):
    """Single implementation phase"""

    model_config = ConfigDict(extra="forbid")

    phase: str = Field(description="Phase name")
    duration: str = Field(description="Time duration")
    tasks: list[str] = Field(description="List of tasks in this phase")


class ImplementationPlan(BaseModel):
    """Implementation roadmap"""

    model_config = ConfigDict(extra="forbid")

    phases: list[Phase] = Field(description="Implementation phases with timeline")
    estimated_timeline: str = Field(description="Overall timeline (e.g., 3-6 months)")
    team_requirements: list[str] = Field(description="Required team members and roles")
    estimated_cost: str = Field(description="Estimated cost range")
    risks: list[str] = Field(description="Potential risks and challenges")
    success_metrics: list[str] = Field(description="KPIs to measure success")


class CompleteExtraction(BaseModel):
    """Complete extraction result"""

    model_config = ConfigDict(extra="forbid")

    business: BusinessExtraction
    agent: AgentSpecification
    implementation: ImplementationPlan
    summary: str = Field(description="Executive summary of the entire plan")


# ------------------------
# Agent Orchestrator
# ------------------------


class AgentOrchestrator:
    """
    Orchestrates multiple AI agents for business idea extraction.
    Supports OpenAI GPT (including o-series), Google Gemini, and xAI Grok models.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        # Set by _setup_client(); stays None for Gemini, which is called
        # through google.generativeai instead of an OpenAI-compatible client.
        self.client: Optional[AsyncOpenAI] = None
        self.model: str = ""

    # ---- logging -------------------------------------------------------

    async def _emit_log(self, message: str, type: str = "log"):
        """Push a log entry to Firebase; failures are printed, never raised.

        Args:
            message: Human-readable log text.
            type: Entry category (this file uses "log", "system", "success",
                "error"). The name shadows the builtin but is kept so existing
                keyword callers keep working.
        """
        entry = {
            'message': message,
            'type': type,
            'timestamp': datetime.now().isoformat()
        }

        def _push() -> None:
            db.reference(f'sessions/{self.session_id}/logs').push(entry)

        try:
            # Reference.push is a synchronous call; run it in a worker thread
            # so awaiting this coroutine does not stall the event loop.
            await asyncio.to_thread(_push)
        except Exception as e:
            # Logging is best effort: it must never abort the pipeline.
            print(f"Error emitting log: {e}")

    # ---- model / client helpers ---------------------------------------

    @staticmethod
    def _is_reasoning_model(model: str) -> bool:
        """Return True for OpenAI o-series model names ("o1", "o1-mini", "o3-mini", ...)."""
        name = model.lower()
        return name[:1] == "o" and name[1:2].isdigit()

    def _is_gemini(self) -> bool:
        """Return True when the configured model is routed to Google Gemini."""
        return "gemini" in self.model.lower()

    def _setup_client(self, model: str, api_key: str):
        """Configure ``self.client`` and ``self.model`` for the given model family.

        Raises:
            ValueError: if the model name matches no supported provider.
        """
        model_lower = model.lower()
        if "gpt" in model_lower or "o1" in model_lower or self._is_reasoning_model(model):
            # OpenAI models
            self.client = AsyncOpenAI(api_key=api_key)
        elif "grok" in model_lower:
            # xAI Grok models (OpenAI-compatible endpoint)
            self.client = AsyncOpenAI(api_key=api_key, base_url="https://api.x.ai/v1")
        elif "gemini" in model_lower:
            # Google Gemini is called directly via google.generativeai.
            self.client = None
        else:
            raise ValueError(f"Unsupported model: {model}")
        self.model = model

    def _require_client(self) -> AsyncOpenAI:
        """Return the OpenAI-compatible client or raise if it was never set up."""
        if self.client is None:
            raise RuntimeError("OpenAI-compatible client is not configured; call _setup_client() first")
        return self.client

    def _sampling_kwargs(self, max_tokens: Optional[int] = None) -> Dict[str, Any]:
        """Build sampling keyword arguments suitable for ``self.model``.

        OpenAI reasoning (o-series) models reject a custom ``temperature`` and
        ``max_tokens``; their completion budget also covers hidden reasoning
        tokens, so a small cap could leave no visible output. For them no
        sampling arguments are sent.
        """
        if self._is_reasoning_model(self.model):
            return {}
        kwargs: Dict[str, Any] = {"temperature": 0.7}
        if max_tokens is not None:
            kwargs["max_tokens"] = max_tokens
        return kwargs

    def _gemini_model(self, api_key: str):
        """Return a google.generativeai model object for ``self.model``."""
        # Imported lazily so the package is only required for Gemini runs.
        import google.generativeai as genai

        # NOTE(review): genai.configure sets module-level configuration, so
        # concurrent runs with different Gemini keys in one process may
        # interfere with each other — confirm before serving multiple tenants.
        genai.configure(api_key=api_key)
        return genai.GenerativeModel(self.model)

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove a surrounding Markdown code fence (```json ... ```) if present."""
        text = text.strip()
        if text.startswith("```"):
            text = text[3:]
            # Drop an optional language tag on the opening fence.
            if text[:4].lower() == "json":
                text = text[4:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    # ---- provider calls -----------------------------------------------

    async def _call_openai_structured(self, system_prompt: str, user_prompt: str,
                                      response_format: type[BaseModel]) -> BaseModel:
        """Call an OpenAI-compatible endpoint with native structured output.

        Raises:
            RuntimeError: on API failure, or when the model returns no parsed
                object (for example a refusal).
        """
        try:
            client = self._require_client()
            response = await client.beta.chat.completions.parse(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                response_format=response_format,
                **self._sampling_kwargs()
            )
            message = response.choices[0].message
            if message.parsed is None:
                # Surface refusals here instead of returning None to callers
                # that immediately dereference fields on the result.
                refusal = getattr(message, "refusal", None)
                raise RuntimeError(refusal or "model returned no structured output")
            return message.parsed
        except Exception as e:
            raise RuntimeError(f"OpenAI API call failed: {e}") from e

    async def _call_gemini_structured(self, system_prompt: str, user_prompt: str, api_key: str,
                                      response_format: type[BaseModel]) -> BaseModel:
        """Ask Gemini for JSON matching ``response_format`` and validate it.

        Raises:
            RuntimeError: on API failure, invalid JSON, or schema mismatch.
        """
        try:
            model = self._gemini_model(api_key)

            # Embed the JSON schema in the prompt; Gemini is not using native
            # structured output on this path.
            schema = response_format.model_json_schema()
            full_prompt = f"""{system_prompt}

User Query: {user_prompt}

Respond ONLY with valid JSON matching this schema:
{json.dumps(schema, indent=2)}

Important: No markdown, no backticks, just pure JSON."""

            response = await model.generate_content_async(full_prompt)

            # The model may still wrap its answer in a code fence.
            data = json.loads(self._strip_code_fences(response.text))
            return response_format.model_validate(data)
        except Exception as e:
            raise RuntimeError(f"Gemini API call failed: {e}") from e

    async def _call_structured(self, system_prompt: str, user_prompt: str, api_key: str,
                               response_format: type[BaseModel]) -> BaseModel:
        """Route a structured-output request to the provider of ``self.model``."""
        if self._is_gemini():
            # Gemini: prompt for JSON and validate manually.
            return await self._call_gemini_structured(system_prompt, user_prompt, api_key, response_format)
        # OpenAI / Grok: native structured outputs.
        return await self._call_openai_structured(system_prompt, user_prompt, response_format)

    # ---- pipeline steps -----------------------------------------------

    async def _extract_business_info(self, query: str, api_key: str) -> BusinessExtraction:
        """Step 1: extract business information from the raw query."""
        await self._emit_log("🔍 Analyzing business idea...", "system")

        system_prompt = """You are a business analyst expert. Extract detailed business information from the user's query.
Be thorough and specific. If information is not explicitly stated, make reasonable inferences based on the context."""

        user_prompt = f"""Analyze this business idea and extract key information:

Query: {query}

Provide detailed extraction including:
- Business name (create a suitable name if not provided)
- Industry classification
- Target audience
- Core problem being solved
- Proposed solution
- Key features (at least 3-5)
- Recommended tech stack
- Estimated complexity (Low/Medium/High)"""

        return await self._call_structured(system_prompt, user_prompt, api_key, BusinessExtraction)

    async def _extract_agent_specs(self, query: str, business: BusinessExtraction,
                                   api_key: str) -> AgentSpecification:
        """Step 2: design AI agent specifications from the business extraction.

        ``query`` is currently unused; it is kept for signature stability.
        """
        await self._emit_log("🤖 Designing AI agent specifications...", "system")

        system_prompt = """You are an AI agent architect. Design detailed specifications for an AI agent based on the business requirements.
Be specific about capabilities, integrations, and technical requirements."""

        user_prompt = f"""Design an AI agent for this business:

Business: {business.business_name}
Industry: {business.industry}
Problem: {business.core_problem}
Solution: {business.solution}

Create detailed agent specifications including:
- Agent name
- Primary purpose
- Specific capabilities (at least 5)
- Required integrations (APIs, databases, services)
- Data requirements
- Deployment model (Cloud/On-premise/Hybrid)"""

        return await self._call_structured(system_prompt, user_prompt, api_key, AgentSpecification)

    async def _create_implementation_plan(self, query: str, business: BusinessExtraction,
                                          agent: AgentSpecification, api_key: str) -> ImplementationPlan:
        """Step 3: create an implementation roadmap.

        ``query`` is currently unused; it is kept for signature stability.
        """
        await self._emit_log("📋 Creating implementation roadmap...", "system")

        system_prompt = """You are a project manager and implementation strategist. Create a detailed implementation plan.
Include realistic timelines, resource requirements, and risk assessments."""

        user_prompt = f"""Create an implementation plan for:

Business: {business.business_name}
Complexity: {business.estimated_complexity}
Agent: {agent.agent_name}
Capabilities: {', '.join(agent.capabilities)}

Provide:
- Implementation phases (at least 3-4 phases with specific tasks)
- Overall timeline estimate
- Team requirements (specific roles)
- Cost estimate range
- Potential risks (at least 3-5)
- Success metrics (KPIs)"""

        return await self._call_structured(system_prompt, user_prompt, api_key, ImplementationPlan)

    async def _create_summary(self, business: BusinessExtraction, agent: AgentSpecification,
                              implementation: ImplementationPlan, api_key: str) -> str:
        """Step 4: generate a free-text executive summary.

        Raises:
            RuntimeError: when the OpenAI-compatible response has no content.
        """
        await self._emit_log("📝 Generating executive summary...", "system")

        system_prompt = "You are an executive summary writer. Create a concise, compelling summary."

        user_prompt = f"""Create an executive summary (2-3 paragraphs) for:

Business: {business.business_name}
Industry: {business.industry}
Solution: {business.solution}
Agent: {agent.agent_name}
Timeline: {implementation.estimated_timeline}
Cost: {implementation.estimated_cost}

Make it compelling and actionable."""

        if self._is_gemini():
            model = self._gemini_model(api_key)
            response = await model.generate_content_async(f"{system_prompt}\n\n{user_prompt}")
            return response.text.strip()

        client = self._require_client()
        response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            **self._sampling_kwargs(max_tokens=500)
        )
        content = response.choices[0].message.content
        if not content:
            # content may be None (e.g. refusal) or empty; fail loudly rather
            # than crash on .strip() or return a blank summary.
            raise RuntimeError("Summary generation returned no content")
        return content.strip()

    # ---- entry point --------------------------------------------------

    async def run(self, query: str, model: str, api_key: str) -> Dict[str, Any]:
        """
        Main orchestration method - runs all extraction agents in sequence.

        Args:
            query: User's business idea query
            model: Model to use (gpt-4o, gemini-2.0-flash-exp, grok-beta, etc.)
            api_key: API key for the model

        Returns:
            Dict with "status" of "success" (plus "extraction") or "error"
            (plus "message"); both include "stage" and "metadata". Errors are
            reported in the dict, not raised.
        """
        try:
            await self._emit_log(f"🚀 Starting extraction with {model}...", "system")

            # Setup client
            self._setup_client(model, api_key)

            # Step 1: Extract business information
            await self._emit_log("Step 1/4: Business Analysis", "system")
            business = await self._extract_business_info(query, api_key)
            await self._emit_log(f"✅ Identified: {business.business_name}", "success")

            # Step 2: Design agent specifications
            await self._emit_log("Step 2/4: Agent Design", "system")
            agent = await self._extract_agent_specs(query, business, api_key)
            await self._emit_log(f"✅ Agent designed: {agent.agent_name}", "success")

            # Step 3: Create implementation plan
            await self._emit_log("Step 3/4: Implementation Planning", "system")
            implementation = await self._create_implementation_plan(query, business, agent, api_key)
            await self._emit_log(f"✅ Timeline: {implementation.estimated_timeline}", "success")

            # Step 4: Generate summary
            await self._emit_log("Step 4/4: Summary Generation", "system")
            summary = await self._create_summary(business, agent, implementation, api_key)
            await self._emit_log("✅ Extraction complete!", "success")

            # Compile results
            complete_extraction = CompleteExtraction(
                business=business,
                agent=agent,
                implementation=implementation,
                summary=summary
            )

            return {
                "status": "success",
                "stage": "complete",
                "extraction": complete_extraction.model_dump(),
                "metadata": {
                    "model_used": model,
                    "timestamp": datetime.now().isoformat(),
                    "session_id": self.session_id
                }
            }

        except Exception as e:
            # Top-level boundary: convert any failure into an error payload.
            error_msg = str(e)
            await self._emit_log(f"❌ Error: {error_msg}", "error")
            return {
                "status": "error",
                "stage": "extraction",
                "message": error_msg,
                "metadata": {
                    "model_used": model,
                    "timestamp": datetime.now().isoformat(),
                    "session_id": self.session_id
                }
            }


# ------------------------
# Testing
# ------------------------
if __name__ == "__main__":
    import os

    from dotenv import load_dotenv

    load_dotenv()

    async def test_orchestrator():
        """Run the full pipeline once against gpt-4o and print the result."""
        # Mock session ID
        test_session = "test-session-123"

        # Test query
        query = "Create an AI agent for pharmacy inventory management that tracks medication expiry dates and auto-orders stock"

        # Get API key from env
        api_key = os.getenv("OPENAI_API_KEY", "")
        if not api_key:
            print("❌ OPENAI_API_KEY not found in .env")
            return

        # Run orchestrator
        orchestrator = AgentOrchestrator(session_id=test_session)
        result = await orchestrator.run(query, "gpt-4o", api_key)

        print("\n" + "=" * 50)
        print("EXTRACTION RESULT")
        print("=" * 50)
        print(json.dumps(result, indent=2))

    asyncio.run(test_orchestrator())