Spaces:
Sleeping
Sleeping
| # extractor_agent_runner.py | |
| import asyncio | |
| import json | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| from openai import AsyncOpenAI | |
| from pydantic import BaseModel, Field | |
| from firebase_admin import db | |
| # ------------------------ | |
| # Pydantic Models for Structured Outputs | |
| # ------------------------ | |
class BusinessExtraction(BaseModel):
    """Structured business information extracted from the user's idea query.

    Produced by AgentOrchestrator step 1 via OpenAI structured outputs or
    a schema-prompted Gemini call.
    """
    business_name: str = Field(description="Name of the business idea")
    industry: str = Field(description="Industry or sector")
    target_audience: str = Field(description="Target customer base")
    core_problem: str = Field(description="Main problem being solved")
    solution: str = Field(description="Proposed solution")
    key_features: list[str] = Field(description="List of key features or capabilities")
    tech_stack: list[str] = Field(description="Recommended technology stack")
    estimated_complexity: str = Field(description="Low, Medium, or High")

    class Config:
        # Reject unexpected keys. Also makes the generated JSON schema set
        # additionalProperties=false, which OpenAI structured outputs require.
        extra = "forbid"
class AgentSpecification(BaseModel):
    """AI agent specifications designed from the extracted business info.

    Produced by AgentOrchestrator step 2.
    """
    agent_name: str = Field(description="Name of the AI agent")
    agent_purpose: str = Field(description="Main purpose of the agent")
    capabilities: list[str] = Field(description="List of agent capabilities")
    integrations: list[str] = Field(description="Required integrations or APIs")
    data_requirements: list[str] = Field(description="Data needed for the agent")
    deployment_model: str = Field(description="e.g., Cloud, On-premise, Hybrid")

    class Config:
        # Reject unexpected keys; required for OpenAI structured outputs
        # (additionalProperties=false in the schema).
        extra = "forbid"
class Phase(BaseModel):
    """Single implementation phase within an ImplementationPlan."""
    phase: str = Field(description="Phase name")
    duration: str = Field(description="Time duration")
    tasks: list[str] = Field(description="List of tasks in this phase")

    class Config:
        # Reject unexpected keys; required for OpenAI structured outputs.
        extra = "forbid"
class ImplementationPlan(BaseModel):
    """Implementation roadmap produced by AgentOrchestrator step 3."""
    phases: list[Phase] = Field(description="Implementation phases with timeline")
    estimated_timeline: str = Field(description="Overall timeline (e.g., 3-6 months)")
    team_requirements: list[str] = Field(description="Required team members and roles")
    estimated_cost: str = Field(description="Estimated cost range")
    risks: list[str] = Field(description="Potential risks and challenges")
    success_metrics: list[str] = Field(description="KPIs to measure success")

    class Config:
        # Reject unexpected keys; required for OpenAI structured outputs.
        extra = "forbid"
class CompleteExtraction(BaseModel):
    """Aggregate of all four extraction stages, returned by AgentOrchestrator.run()."""
    business: BusinessExtraction
    agent: AgentSpecification
    implementation: ImplementationPlan
    summary: str = Field(description="Executive summary of the entire plan")

    class Config:
        # Reject unexpected keys; required for OpenAI structured outputs.
        extra = "forbid"
| # ------------------------ | |
| # Agent Orchestrator | |
| # ------------------------ | |
class AgentOrchestrator:
    """
    Orchestrates multiple AI agents for business idea extraction.
    Supports OpenAI GPT, Google Gemini, and xAI Grok models.

    Pipeline (see run()): business analysis -> agent design ->
    implementation planning -> executive summary. Progress logs are pushed
    to Firebase Realtime Database under sessions/<session_id>/logs.
    """

    def __init__(self, session_id: str):
        # Firebase session key used as the destination for progress logs.
        self.session_id = session_id
        # OpenAI-compatible async client. Stays None for Gemini, which is
        # driven through google.generativeai per call instead.
        self.client: Optional[AsyncOpenAI] = None
        self.model: str = ""

    async def _emit_log(self, message: str, type: str = "log"):
        """Push a progress log entry to Firebase (best-effort).

        Failures are printed and swallowed so that logging can never abort
        an extraction run.

        NOTE(review): the parameter name `type` shadows the builtin; kept
        as-is for backward compatibility with keyword callers.
        """
        try:
            ref = db.reference(f'sessions/{self.session_id}/logs')
            ref.push({
                'message': message,
                'type': type,
                'timestamp': datetime.now().isoformat()
            })
        except Exception as e:
            # Deliberate best-effort: log the failure locally and continue.
            print(f"Error emitting log: {e}")

    def _setup_client(self, model: str, api_key: str):
        """Configure the backend client for `model`.

        Dispatch is by substring of the lowercased model name:
          - "gpt" or "o1" -> OpenAI via AsyncOpenAI
          - "grok"        -> xAI (OpenAI-compatible endpoint at api.x.ai)
          - "gemini"      -> no client here; handled per call in the
                             _call_gemini_* helpers

        Raises:
            ValueError: for any unrecognized model name.
        """
        model_lower = model.lower()
        if "gpt" in model_lower or "o1" in model_lower:
            # OpenAI models
            self.client = AsyncOpenAI(api_key=api_key)
            self.model = model
        elif "grok" in model_lower:
            # xAI Grok models (OpenAI-compatible)
            self.client = AsyncOpenAI(
                api_key=api_key,
                base_url="https://api.x.ai/v1"
            )
            self.model = model
        elif "gemini" in model_lower:
            # Google Gemini - use direct API
            self.client = None  # Will handle separately
            self.model = model
        else:
            raise ValueError(f"Unsupported model: {model}")

    async def _call_openai_structured(self, system_prompt: str, user_prompt: str, response_format: type[BaseModel]) -> BaseModel:
        """Call OpenAI/Grok with structured output parsed into `response_format`.

        Uses the SDK's `beta.chat.completions.parse` helper so the response
        is validated against the Pydantic schema.
        NOTE(review): `temperature=0.7` is sent unconditionally; o1-family
        models may reject this parameter — confirm against the SDK version.

        Raises:
            Exception: wrapping any underlying API or parsing error.
        """
        try:
            response = await self.client.beta.chat.completions.parse(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                response_format=response_format,
                temperature=0.7
            )
            return response.choices[0].message.parsed
        except Exception as e:
            raise Exception(f"OpenAI API call failed: {str(e)}")

    async def _call_gemini_structured(self, system_prompt: str, user_prompt: str, api_key: str, response_format: type[BaseModel]) -> BaseModel:
        """Call Gemini and parse JSON response into Pydantic model.

        Gemini is not driven through the OpenAI structured-output helper, so
        the JSON schema is embedded directly in the prompt and the reply is
        parsed and validated manually.

        Raises:
            Exception: wrapping any API, JSON-decoding, or validation error.
        """
        try:
            # Imported lazily so the dependency is only needed for Gemini runs.
            import google.generativeai as genai
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel(self.model)
            # Get schema for structured output
            schema_example = response_format.model_json_schema()
            full_prompt = f"""{system_prompt}
User Query: {user_prompt}
Respond ONLY with valid JSON matching this schema:
{json.dumps(schema_example, indent=2)}
Important: No markdown, no backticks, just pure JSON."""
            response = await model.generate_content_async(full_prompt)
            # Extract JSON from response. Models sometimes wrap output in
            # markdown fences despite the instruction, so strip them.
            text = response.text.strip()
            if text.startswith("```json"):
                text = text[7:]
            if text.startswith("```"):
                text = text[3:]
            if text.endswith("```"):
                text = text[:-3]
            text = text.strip()
            # Parse and validate with Pydantic
            data = json.loads(text)
            return response_format(**data)
        except Exception as e:
            raise Exception(f"Gemini API call failed: {str(e)}")

    async def _extract_business_info(self, query: str, api_key: str) -> BusinessExtraction:
        """Step 1: extract business information from the raw user query."""
        await self._emit_log("π Analyzing business idea...", "system")
        system_prompt = """You are a business analyst expert. Extract detailed business information from the user's query.
Be thorough and specific. If information is not explicitly stated, make reasonable inferences based on the context."""
        user_prompt = f"""Analyze this business idea and extract key information:
Query: {query}
Provide detailed extraction including:
- Business name (create a suitable name if not provided)
- Industry classification
- Target audience
- Core problem being solved
- Proposed solution
- Key features (at least 3-5)
- Recommended tech stack
- Estimated complexity (Low/Medium/High)"""
        if "gemini" in self.model.lower():
            # Gemini: Parse JSON manually
            return await self._call_gemini_structured(system_prompt, user_prompt, api_key, BusinessExtraction)
        else:
            # OpenAI/Grok: Use structured outputs
            return await self._call_openai_structured(system_prompt, user_prompt, BusinessExtraction)

    async def _extract_agent_specs(self, query: str, business: BusinessExtraction, api_key: str) -> AgentSpecification:
        """Step 2: design AI agent specifications from the business extraction.

        NOTE(review): `query` is accepted but not used in the prompt — only
        fields from `business` are; confirm whether that is intentional.
        """
        await self._emit_log("π€ Designing AI agent specifications...", "system")
        system_prompt = """You are an AI agent architect. Design detailed specifications for an AI agent based on the business requirements.
Be specific about capabilities, integrations, and technical requirements."""
        user_prompt = f"""Design an AI agent for this business:
Business: {business.business_name}
Industry: {business.industry}
Problem: {business.core_problem}
Solution: {business.solution}
Create detailed agent specifications including:
- Agent name
- Primary purpose
- Specific capabilities (at least 5)
- Required integrations (APIs, databases, services)
- Data requirements
- Deployment model (Cloud/On-premise/Hybrid)"""
        if "gemini" in self.model.lower():
            return await self._call_gemini_structured(system_prompt, user_prompt, api_key, AgentSpecification)
        else:
            return await self._call_openai_structured(system_prompt, user_prompt, AgentSpecification)

    async def _create_implementation_plan(self, query: str, business: BusinessExtraction, agent: AgentSpecification, api_key: str) -> ImplementationPlan:
        """Step 3: create the implementation roadmap.

        NOTE(review): `query` is accepted but not used in the prompt here
        either — confirm whether that is intentional.
        """
        await self._emit_log("π Creating implementation roadmap...", "system")
        system_prompt = """You are a project manager and implementation strategist. Create a detailed implementation plan.
Include realistic timelines, resource requirements, and risk assessments."""
        user_prompt = f"""Create an implementation plan for:
Business: {business.business_name}
Complexity: {business.estimated_complexity}
Agent: {agent.agent_name}
Capabilities: {', '.join(agent.capabilities)}
Provide:
- Implementation phases (at least 3-4 phases with specific tasks)
- Overall timeline estimate
- Team requirements (specific roles)
- Cost estimate range
- Potential risks (at least 3-5)
- Success metrics (KPIs)"""
        if "gemini" in self.model.lower():
            return await self._call_gemini_structured(system_prompt, user_prompt, api_key, ImplementationPlan)
        else:
            return await self._call_openai_structured(system_prompt, user_prompt, ImplementationPlan)

    async def _create_summary(self, business: BusinessExtraction, agent: AgentSpecification, implementation: ImplementationPlan, api_key: str) -> str:
        """Step 4: generate a free-text executive summary.

        Unlike the other steps this returns plain text, so it bypasses the
        structured-output helpers and calls the chat API directly.
        """
        await self._emit_log("π Generating executive summary...", "system")
        system_prompt = "You are an executive summary writer. Create a concise, compelling summary."
        user_prompt = f"""Create an executive summary (2-3 paragraphs) for:
Business: {business.business_name}
Industry: {business.industry}
Solution: {business.solution}
Agent: {agent.agent_name}
Timeline: {implementation.estimated_timeline}
Cost: {implementation.estimated_cost}
Make it compelling and actionable."""
        if "gemini" in self.model.lower():
            import google.generativeai as genai
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel(self.model)
            response = await model.generate_content_async(f"{system_prompt}\n\n{user_prompt}")
            return response.text.strip()
        else:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=0.7,
                max_tokens=500
            )
            return response.choices[0].message.content.strip()

    async def run(self, query: str, model: str, api_key: str) -> Dict[str, Any]:
        """
        Main orchestration method - runs all extraction agents in sequence.

        Args:
            query: User's business idea query
            model: Model to use (gpt-4o, gemini-2.0-flash-exp, grok-beta, etc.)
            api_key: API key for the model

        Returns:
            Dict with extraction results or error information. On success:
            status/stage/extraction/metadata; on any failure the exception
            is caught and returned as a status="error" dict instead of
            being raised.
        """
        try:
            await self._emit_log(f"π Starting extraction with {model}...", "system")
            # Setup client
            self._setup_client(model, api_key)
            # Step 1: Extract business information
            await self._emit_log("Step 1/4: Business Analysis", "system")
            business = await self._extract_business_info(query, api_key)
            await self._emit_log(f"β Identified: {business.business_name}", "success")
            # Step 2: Design agent specifications
            await self._emit_log("Step 2/4: Agent Design", "system")
            agent = await self._extract_agent_specs(query, business, api_key)
            await self._emit_log(f"β Agent designed: {agent.agent_name}", "success")
            # Step 3: Create implementation plan
            await self._emit_log("Step 3/4: Implementation Planning", "system")
            implementation = await self._create_implementation_plan(query, business, agent, api_key)
            await self._emit_log(f"β Timeline: {implementation.estimated_timeline}", "success")
            # Step 4: Generate summary
            await self._emit_log("Step 4/4: Summary Generation", "system")
            summary = await self._create_summary(business, agent, implementation, api_key)
            await self._emit_log("β Extraction complete!", "success")
            # Compile results
            complete_extraction = CompleteExtraction(
                business=business,
                agent=agent,
                implementation=implementation,
                summary=summary
            )
            return {
                "status": "success",
                "stage": "complete",
                "extraction": complete_extraction.model_dump(),
                "metadata": {
                    "model_used": model,
                    "timestamp": datetime.now().isoformat(),
                    "session_id": self.session_id
                }
            }
        except Exception as e:
            # Top-level boundary: convert any failure into a structured
            # error payload so callers never see a raised exception.
            error_msg = str(e)
            await self._emit_log(f"β Error: {error_msg}", "error")
            return {
                "status": "error",
                "stage": "extraction",
                "message": error_msg,
                "metadata": {
                    "model_used": model,
                    "timestamp": datetime.now().isoformat(),
                    "session_id": self.session_id
                }
            }
| # ------------------------ | |
| # Testing | |
| # ------------------------ | |
if __name__ == "__main__":
    import os
    from dotenv import load_dotenv

    load_dotenv()

    async def _smoke_test():
        """One-off end-to-end extraction against gpt-4o using env credentials."""
        # Get API key from env; bail out early if it is missing.
        api_key = os.getenv("OPENAI_API_KEY", "")
        if not api_key:
            print("β OPENAI_API_KEY not found in .env")
            return
        # Run the orchestrator on a fixed sample query with a mock session.
        runner = AgentOrchestrator(session_id="test-session-123")
        outcome = await runner.run(
            "Create an AI agent for pharmacy inventory management that tracks medication expiry dates and auto-orders stock",
            "gpt-4o",
            api_key,
        )
        divider = "=" * 50
        print("\n" + divider)
        print("EXTRACTION RESULT")
        print(divider)
        print(json.dumps(outcome, indent=2))

    asyncio.run(_smoke_test())