# agentforge / extractor_agent_runner.py
# Author: Tahasaif3
# Commit message: 'code'
# Commit: fd1a435
# extractor_agent_runner.py
import asyncio
import json
from datetime import datetime
from typing import Any, Dict, Optional

from firebase_admin import db
from openai import AsyncOpenAI
from pydantic import BaseModel, ConfigDict, Field
# ------------------------
# Pydantic Models for Structured Outputs
# ------------------------
class BusinessExtraction(BaseModel):
    """Business information extracted from a free-form idea description.

    Used as the structured-output schema for the business-analysis step of
    ``AgentOrchestrator``.
    """

    business_name: str = Field(description="Name of the business idea")
    industry: str = Field(description="Industry or sector")
    target_audience: str = Field(description="Target customer base")
    core_problem: str = Field(description="Main problem being solved")
    solution: str = Field(description="Proposed solution")
    key_features: list[str] = Field(description="List of key features or capabilities")
    tech_stack: list[str] = Field(description="Recommended technology stack")
    # Free-form string: the description asks for Low/Medium/High, but no
    # validation enforces those values.
    estimated_complexity: str = Field(description="Low, Medium, or High")

    # Reject unknown keys. This also makes model_json_schema() declare
    # "additionalProperties": false (the closed-object form OpenAI strict
    # structured outputs expect). Pydantic v2 replacement for the deprecated
    # inner ``class Config``.
    model_config = ConfigDict(extra="forbid")
class AgentSpecification(BaseModel):
    """Specification of the AI agent designed for a business idea.

    Used as the structured-output schema for the agent-design step of
    ``AgentOrchestrator``.
    """

    agent_name: str = Field(description="Name of the AI agent")
    agent_purpose: str = Field(description="Main purpose of the agent")
    capabilities: list[str] = Field(description="List of agent capabilities")
    integrations: list[str] = Field(description="Required integrations or APIs")
    data_requirements: list[str] = Field(description="Data needed for the agent")
    # Free-form string; the description only suggests example values.
    deployment_model: str = Field(description="e.g., Cloud, On-premise, Hybrid")

    # Reject unknown keys (schema gets "additionalProperties": false).
    # Pydantic v2 replacement for the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="forbid")
class Phase(BaseModel):
    """A single phase of an implementation roadmap."""

    phase: str = Field(description="Phase name")
    duration: str = Field(description="Time duration")
    tasks: list[str] = Field(description="List of tasks in this phase")

    # Reject unknown keys (schema gets "additionalProperties": false).
    # Pydantic v2 replacement for the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="forbid")
class ImplementationPlan(BaseModel):
    """Implementation roadmap: phases, timeline, team, cost, risks and KPIs.

    Used as the structured-output schema for the planning step of
    ``AgentOrchestrator``.
    """

    phases: list[Phase] = Field(description="Implementation phases with timeline")
    estimated_timeline: str = Field(description="Overall timeline (e.g., 3-6 months)")
    team_requirements: list[str] = Field(description="Required team members and roles")
    estimated_cost: str = Field(description="Estimated cost range")
    risks: list[str] = Field(description="Potential risks and challenges")
    success_metrics: list[str] = Field(description="KPIs to measure success")

    # Reject unknown keys (schema gets "additionalProperties": false).
    # Pydantic v2 replacement for the deprecated inner ``class Config``.
    model_config = ConfigDict(extra="forbid")
class CompleteExtraction(BaseModel):
    """Aggregate of all pipeline outputs plus the executive summary.

    ``AgentOrchestrator.run`` builds one of these and returns its
    ``model_dump()`` under the "extraction" key.
    """

    business: BusinessExtraction
    agent: AgentSpecification
    implementation: ImplementationPlan
    summary: str = Field(description="Executive summary of the entire plan")

    # Reject unknown keys. Pydantic v2 replacement for the deprecated inner
    # ``class Config``.
    model_config = ConfigDict(extra="forbid")
# ------------------------
# Agent Orchestrator
# ------------------------
class AgentOrchestrator:
    """
    Orchestrates multiple AI agents for business idea extraction.

    Supports OpenAI GPT / o1, Google Gemini, and xAI Grok models. ``run``
    executes four sequential LLM steps:

    1. business analysis
    2. agent design
    3. implementation planning
    4. executive summary

    Progress entries are pushed to the Firebase Realtime Database path
    ``sessions/<session_id>/logs``.
    """

    # Sampling temperature for every non-reasoning chat call.
    _TEMPERATURE = 0.7
    # Output-token cap for the executive summary on non-reasoning models.
    _SUMMARY_MAX_TOKENS = 500
    # Name prefixes of OpenAI o-series reasoning models. These models reject a
    # custom ``temperature`` and the legacy ``max_tokens`` parameter.
    _REASONING_MODEL_PREFIXES = ("o1", "o3", "o4")

    def __init__(self, session_id: str):
        """Create an orchestrator bound to one Firebase session.

        Args:
            session_id: Key under ``sessions/`` where progress logs are written.
        """
        self.session_id = session_id
        # Set by _setup_client(). Stays None for Gemini, which is called through
        # google.generativeai instead of an OpenAI-compatible client.
        self.client: Optional[AsyncOpenAI] = None
        self.model: str = ""

    async def _emit_log(self, message: str, type: str = "log"):
        """Append a progress entry to ``sessions/<session_id>/logs`` in Firebase.

        Logging is best-effort. Any failure is printed and swallowed, so a
        logging problem never aborts the extraction pipeline.

        Args:
            message: Human-readable log text.
            type: Entry category. This class uses "system", "success" and
                "error". The name shadows the builtin but is kept for
                backward compatibility with keyword callers.
        """
        entry = {
            'message': message,
            'type': type,
            'timestamp': datetime.now().isoformat()
        }
        try:
            ref = db.reference(f'sessions/{self.session_id}/logs')
            # The firebase_admin db client is synchronous. Run the blocking
            # push in a worker thread so the event loop is not stalled.
            await asyncio.to_thread(ref.push, entry)
        except Exception as e:
            print(f"Error emitting log: {e}")

    def _setup_client(self, model: str, api_key: str):
        """Select the API client for *model* and remember the model name.

        Args:
            model: Model identifier, e.g. "gpt-4o", "o1", "grok-beta",
                "gemini-2.0-flash-exp".
            api_key: Key for the provider that serves *model*.

        Raises:
            ValueError: If the name matches no supported provider.
        """
        model_lower = model.lower()
        if "gpt" in model_lower or "o1" in model_lower:
            # OpenAI models
            self.client = AsyncOpenAI(api_key=api_key)
        elif "grok" in model_lower:
            # xAI Grok exposes an OpenAI-compatible endpoint.
            self.client = AsyncOpenAI(
                api_key=api_key,
                base_url="https://api.x.ai/v1"
            )
        elif "gemini" in model_lower:
            # Gemini is called directly through google.generativeai, so no
            # OpenAI-style client is needed.
            self.client = None
        else:
            raise ValueError(f"Unsupported model: {model}")
        self.model = model

    def _is_gemini(self) -> bool:
        """Return True when the configured model is served by Google Gemini."""
        return "gemini" in self.model.lower()

    def _is_reasoning_model(self) -> bool:
        """Return True for OpenAI o-series reasoning models (o1, o1-mini, ...)."""
        return self.model.lower().startswith(self._REASONING_MODEL_PREFIXES)

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding Markdown code fence (``` or ```json) from *text*."""
        text = text.strip()
        if text.startswith("```"):
            text = text[3:]
            # Drop an optional "json" language tag in any letter case. A JSON
            # document itself can never start with those letters.
            if text[:4].lower() == "json":
                text = text[4:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    async def _call_structured(self, system_prompt: str, user_prompt: str, api_key: str, response_format: type[BaseModel]) -> BaseModel:
        """Route a structured request to Gemini or the OpenAI-compatible client."""
        if self._is_gemini():
            # Gemini: ask for JSON in the prompt and validate it manually.
            return await self._call_gemini_structured(system_prompt, user_prompt, api_key, response_format)
        # OpenAI/Grok: native structured outputs.
        return await self._call_openai_structured(system_prompt, user_prompt, response_format)

    async def _call_openai_structured(self, system_prompt: str, user_prompt: str, response_format: type[BaseModel]) -> BaseModel:
        """Call an OpenAI-compatible API with structured output.

        Args:
            system_prompt: System message content.
            user_prompt: User message content.
            response_format: Pydantic model the response must conform to.

        Returns:
            The parsed ``response_format`` instance.

        Raises:
            Exception: "OpenAI API call failed: ..." on transport/API errors,
                or when no parsed object comes back (e.g. a model refusal).
        """
        request: Dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            "response_format": response_format,
        }
        if not self._is_reasoning_model():
            # o-series models reject a non-default temperature.
            request["temperature"] = self._TEMPERATURE
        try:
            response = await self.client.beta.chat.completions.parse(**request)
        except Exception as e:
            raise Exception(f"OpenAI API call failed: {str(e)}") from e
        message = response.choices[0].message
        if message.parsed is None:
            # parsed is None when the model refuses; the reason is in .refusal.
            reason = getattr(message, "refusal", None) or "no structured content returned"
            raise Exception(f"OpenAI API call failed: {reason}")
        return message.parsed

    async def _call_gemini_structured(self, system_prompt: str, user_prompt: str, api_key: str, response_format: type[BaseModel]) -> BaseModel:
        """Call Gemini and parse its JSON reply into *response_format*.

        The JSON schema of *response_format* is embedded in the prompt. Any
        Markdown code fence around the reply is stripped before validation.

        Raises:
            Exception: "Gemini API call failed: ..." on any API, JSON or
                validation error (original exception chained as the cause).
        """
        try:
            # Imported lazily so the package is only required for Gemini models.
            import google.generativeai as genai
            # NOTE(review): genai.configure sets module-global state; concurrent
            # sessions with different Gemini keys may interfere. Confirm.
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel(self.model)
            schema = response_format.model_json_schema()
            full_prompt = (
                f"{system_prompt}\n"
                f"User Query: {user_prompt}\n"
                "Respond ONLY with valid JSON matching this schema:\n"
                f"{json.dumps(schema, indent=2)}\n"
                "Important: No markdown, no backticks, just pure JSON."
            )
            response = await model.generate_content_async(full_prompt)
            text = self._strip_code_fence(response.text)
            # Parse and validate with Pydantic (extra keys are rejected).
            return response_format.model_validate(json.loads(text))
        except Exception as e:
            raise Exception(f"Gemini API call failed: {str(e)}") from e

    async def _extract_business_info(self, query: str, api_key: str) -> BusinessExtraction:
        """Step 1: extract structured business information from the raw idea text."""
        await self._emit_log("🔍 Analyzing business idea...", "system")
        system_prompt = (
            "You are a business analyst expert. Extract detailed business information from the user's query.\n"
            "Be thorough and specific. If information is not explicitly stated, make reasonable inferences based on the context."
        )
        user_prompt = (
            "Analyze this business idea and extract key information:\n"
            f"Query: {query}\n"
            "Provide detailed extraction including:\n"
            "- Business name (create a suitable name if not provided)\n"
            "- Industry classification\n"
            "- Target audience\n"
            "- Core problem being solved\n"
            "- Proposed solution\n"
            "- Key features (at least 3-5)\n"
            "- Recommended tech stack\n"
            "- Estimated complexity (Low/Medium/High)"
        )
        return await self._call_structured(system_prompt, user_prompt, api_key, BusinessExtraction)

    async def _extract_agent_specs(self, query: str, business: BusinessExtraction, api_key: str) -> AgentSpecification:
        """Step 2: design the AI agent from the extracted business information.

        Args:
            query: Original idea text. It is currently not included in the prompt.
            business: Output of step 1.
            api_key: Provider key, needed for Gemini calls.
        """
        await self._emit_log("🤖 Designing AI agent specifications...", "system")
        system_prompt = (
            "You are an AI agent architect. Design detailed specifications for an AI agent based on the business requirements.\n"
            "Be specific about capabilities, integrations, and technical requirements."
        )
        user_prompt = (
            "Design an AI agent for this business:\n"
            f"Business: {business.business_name}\n"
            f"Industry: {business.industry}\n"
            f"Problem: {business.core_problem}\n"
            f"Solution: {business.solution}\n"
            "Create detailed agent specifications including:\n"
            "- Agent name\n"
            "- Primary purpose\n"
            "- Specific capabilities (at least 5)\n"
            "- Required integrations (APIs, databases, services)\n"
            "- Data requirements\n"
            "- Deployment model (Cloud/On-premise/Hybrid)"
        )
        return await self._call_structured(system_prompt, user_prompt, api_key, AgentSpecification)

    async def _create_implementation_plan(self, query: str, business: BusinessExtraction, agent: AgentSpecification, api_key: str) -> ImplementationPlan:
        """Step 3: build the implementation roadmap.

        Args:
            query: Original idea text. It is currently not included in the prompt.
            business: Output of step 1.
            agent: Output of step 2.
            api_key: Provider key, needed for Gemini calls.
        """
        await self._emit_log("📋 Creating implementation roadmap...", "system")
        system_prompt = (
            "You are a project manager and implementation strategist. Create a detailed implementation plan.\n"
            "Include realistic timelines, resource requirements, and risk assessments."
        )
        user_prompt = (
            "Create an implementation plan for:\n"
            f"Business: {business.business_name}\n"
            f"Complexity: {business.estimated_complexity}\n"
            f"Agent: {agent.agent_name}\n"
            f"Capabilities: {', '.join(agent.capabilities)}\n"
            "Provide:\n"
            "- Implementation phases (at least 3-4 phases with specific tasks)\n"
            "- Overall timeline estimate\n"
            "- Team requirements (specific roles)\n"
            "- Cost estimate range\n"
            "- Potential risks (at least 3-5)\n"
            "- Success metrics (KPIs)"
        )
        return await self._call_structured(system_prompt, user_prompt, api_key, ImplementationPlan)

    async def _create_summary(self, business: BusinessExtraction, agent: AgentSpecification, implementation: ImplementationPlan, api_key: str) -> str:
        """Step 4: write a free-text executive summary of the whole plan.

        Raises:
            Exception: If the OpenAI-compatible API returns no message content.
        """
        await self._emit_log("📝 Generating executive summary...", "system")
        system_prompt = "You are an executive summary writer. Create a concise, compelling summary."
        user_prompt = (
            "Create an executive summary (2-3 paragraphs) for:\n"
            f"Business: {business.business_name}\n"
            f"Industry: {business.industry}\n"
            f"Solution: {business.solution}\n"
            f"Agent: {agent.agent_name}\n"
            f"Timeline: {implementation.estimated_timeline}\n"
            f"Cost: {implementation.estimated_cost}\n"
            "Make it compelling and actionable."
        )
        if self._is_gemini():
            import google.generativeai as genai
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel(self.model)
            response = await model.generate_content_async(f"{system_prompt}\n\n{user_prompt}")
            return response.text.strip()

        request: Dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
        }
        if not self._is_reasoning_model():
            request["temperature"] = self._TEMPERATURE
            request["max_tokens"] = self._SUMMARY_MAX_TOKENS
        # Reasoning models get no cap. Their hidden reasoning tokens count
        # against max_completion_tokens, so a 500-token budget could leave
        # nothing for the visible summary.
        response = await self.client.chat.completions.create(**request)
        content = response.choices[0].message.content
        if content is None:
            raise Exception("Summary generation returned no content")
        return content.strip()

    async def run(self, query: str, model: str, api_key: str) -> Dict[str, Any]:
        """
        Main orchestration method: run all extraction agents in sequence.

        Args:
            query: User's business idea query.
            model: Model to use (gpt-4o, gemini-2.0-flash-exp, grok-beta, etc.).
            api_key: API key for the model.

        Returns:
            On success: {"status": "success", "stage": "complete",
            "extraction": <CompleteExtraction dict>, "metadata": {...}}.
            On any failure: {"status": "error", "stage": "extraction",
            "message": <error text>, "metadata": {...}}. Exceptions are not
            propagated.
        """
        try:
            await self._emit_log(f"🚀 Starting extraction with {model}...", "system")
            # Setup client
            self._setup_client(model, api_key)

            # Step 1: Extract business information
            await self._emit_log("Step 1/4: Business Analysis", "system")
            business = await self._extract_business_info(query, api_key)
            await self._emit_log(f"✅ Identified: {business.business_name}", "success")

            # Step 2: Design agent specifications
            await self._emit_log("Step 2/4: Agent Design", "system")
            agent = await self._extract_agent_specs(query, business, api_key)
            await self._emit_log(f"✅ Agent designed: {agent.agent_name}", "success")

            # Step 3: Create implementation plan
            await self._emit_log("Step 3/4: Implementation Planning", "system")
            implementation = await self._create_implementation_plan(query, business, agent, api_key)
            await self._emit_log(f"✅ Timeline: {implementation.estimated_timeline}", "success")

            # Step 4: Generate summary
            await self._emit_log("Step 4/4: Summary Generation", "system")
            summary = await self._create_summary(business, agent, implementation, api_key)
            await self._emit_log("✅ Extraction complete!", "success")

            # Compile results
            complete_extraction = CompleteExtraction(
                business=business,
                agent=agent,
                implementation=implementation,
                summary=summary
            )
            return {
                "status": "success",
                "stage": "complete",
                "extraction": complete_extraction.model_dump(),
                "metadata": {
                    "model_used": model,
                    "timestamp": datetime.now().isoformat(),
                    "session_id": self.session_id
                }
            }
        except Exception as e:
            # Top-level boundary: report every failure as a structured error result.
            error_msg = str(e)
            await self._emit_log(f"❌ Error: {error_msg}", "error")
            return {
                "status": "error",
                "stage": "extraction",
                "message": error_msg,
                "metadata": {
                    "model_used": model,
                    "timestamp": datetime.now().isoformat(),
                    "session_id": self.session_id
                }
            }
# ------------------------
# Testing
# ------------------------
if __name__ == "__main__":
    # Manual smoke test: runs the full pipeline against gpt-4o.
    import os
    from dotenv import load_dotenv

    load_dotenv()

    async def _run_demo():
        """Run one extraction for a sample idea and pretty-print the result."""
        key = os.getenv("OPENAI_API_KEY", "")
        if not key:
            print("❌ OPENAI_API_KEY not found in .env")
            return

        sample_query = (
            "Create an AI agent for pharmacy inventory management that tracks "
            "medication expiry dates and auto-orders stock"
        )
        runner = AgentOrchestrator(session_id="test-session-123")
        outcome = await runner.run(sample_query, "gpt-4o", key)

        banner = "=" * 50
        print("\n" + banner)
        print("EXTRACTION RESULT")
        print(banner)
        print(json.dumps(outcome, indent=2))

    asyncio.run(_run_demo())