agentforge / main.py
Tahasaif3's picture
'code'
fd1a435
import os
import uuid
import time
import json
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional
import socketio
from firebase_admin import db, initialize_app, credentials, _apps
from dotenv import load_dotenv
from cryptography.fernet import Fernet
from validation_agent import run_validation_agent
from extractor_agent_runner import AgentOrchestrator
from agent_architect import LiveAgentArchitect, AgentDeployer
from agents import Runner, SQLiteSession, OpenAIChatCompletionsModel, Agent
from openai import AsyncOpenAI as AgentsAsyncOpenAI
load_dotenv()
# --- Encryption setup ---
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY")
if not ENCRYPTION_KEY:
raise ValueError("ENCRYPTION_KEY not set in .env")
fernet = Fernet(ENCRYPTION_KEY.encode())
def encrypt_api_key(api_key: str) -> str:
return fernet.encrypt(api_key.encode()).decode()
def decrypt_api_key(encrypted_key: str) -> str:
return fernet.decrypt(encrypted_key.encode()).decode()
# --- Firebase initialization ---
cred = credentials.Certificate("firebase/serviceAccountKey.json")
if not _apps:
initialize_app(cred, {'databaseURL': os.getenv("FIREBASE_DB_URL")})
# --- FastAPI + Socket.IO setup ---
app = FastAPI(title="AgentForge API", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins="*", max_http_buffer_size=10**8)
socket_app = socketio.ASGIApp(sio, app)
# --- Request/Response Models ---
class ValidateRequest(BaseModel):
model: str = Field(..., description="Model name")
api_key: str = Field(..., description="API key to validate")
class ValidateResponse(BaseModel):
is_valid: bool
error: Optional[str] = None
model: str
class RunSessionRequest(BaseModel):
query: str = Field(..., description="User's business idea")
model: str = Field(default="gpt-4o", description="Model to use")
api_key: str = Field(default=os.getenv("OPENAI_API_KEY"), description="API key")
class SessionResponse(BaseModel):
session_id: str
status: str = "created"
class RunResponse(BaseModel):
status: str
result: dict
class ChatWithAgentRequest(BaseModel):
agent_id: str = Field(..., description="Agent ID from build result")
message: str = Field(..., description="User message to the agent")
session_id: str = Field(..., description="Session ID")
# --- Helper functions ---
async def emit_log(session_id: str, message: str, type: str = "log"):
"""Emit log to Firebase and Socket.IO"""
try:
ref = db.reference(f'sessions/{session_id}/logs')
ref.push({
'message': message,
'type': type,
'timestamp': time.time()
})
await sio.emit('log_update', {
'message': message,
'type': type,
'timestamp': time.time()
}, room=session_id)
except Exception as e:
print(f"Error emitting log: {e}")
async def save_session_result(session_id: str, result: dict):
"""Save session result to Firebase"""
try:
ref = db.reference(f'sessions/{session_id}/result')
ref.set(result)
except Exception as e:
print(f"Error saving session result: {e}")
# --- API Endpoints ---
@app.get("/", tags=["Health"])
async def root():
return {
"status": "AgentForge API Online",
"version": "1.0.0",
"features": ["validation", "extraction", "live_agent_builder"],
"socket_io": "initialized"
}
@app.post("/validate", response_model=ValidateResponse, tags=["Validation"])
async def validate_key(request: ValidateRequest) -> ValidateResponse:
try:
result = await run_validation_agent(request.model, request.api_key)
return ValidateResponse(
is_valid=result.is_valid,
error=result.error,
model=request.model
)
except Exception as e:
return ValidateResponse(
is_valid=False,
error=f"Validation error: {str(e)}",
model=request.model
)
@app.post("/start", response_model=SessionResponse, tags=["Sessions"])
async def start_session() -> SessionResponse:
session_id = str(uuid.uuid4())
try:
db.reference(f'sessions/{session_id}').set({
'status': 'awaiting_query',
'created_at': time.time(),
'logs': [],
'api_keys': {}
})
await emit_log(session_id, "Session started. Ready to receive query.", "system")
return SessionResponse(session_id=session_id, status="created")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create session: {str(e)}")
@app.post("/session/{sid}/run", response_model=RunResponse, tags=["Sessions"])
async def run_agents(sid: str, request_data: RunSessionRequest) -> RunResponse:
"""
Complete pipeline: Validation โ†’ Extraction โ†’ Live Agent Build
"""
query = request_data.query
model = request_data.model.lower()
api_key = request_data.api_key
if not query or not model or not api_key:
await emit_log(sid, "Error: Missing required fields", "error")
raise HTTPException(status_code=400, detail="query, model, and api_key are required")
try:
# Step 1: Log start
await emit_log(sid, f"๐Ÿ’ฌ User Query: {query}", "user")
await emit_log(sid, f"๐Ÿค– Model: {model}", "system")
# Step 2: Encrypt & store key
encrypted_key = encrypt_api_key(api_key)
db.reference(f'sessions/{sid}/api_keys/{model}').set(encrypted_key)
await emit_log(sid, f"๐Ÿ” API key encrypted", "success")
api_key_decrypted = decrypt_api_key(encrypted_key)
# Step 3: Validate API key
await emit_log(sid, "๐Ÿ” Validating API key...", "system")
validation = await run_validation_agent(model, api_key_decrypted)
if not validation.is_valid:
await emit_log(sid, f"โŒ Validation failed: {validation.error}", "error")
return RunResponse(
status="error",
result={"stage": "validation", "message": validation.error}
)
await emit_log(sid, "โœ… API key validated", "success")
# Step 4: Run extraction
await emit_log(sid, "๐Ÿš€ Starting extraction...", "system")
orchestrator = AgentOrchestrator(session_id=sid)
extraction_result = await orchestrator.run(query, model, api_key_decrypted)
if extraction_result["status"] != "success":
await emit_log(sid, f"โŒ Extraction failed", "error")
db.reference(f'sessions/{sid}').update({'status': 'error'})
return RunResponse(status="error", result=extraction_result)
await emit_log(sid, "โœ… Extraction complete!", "success")
# Step 5: BUILD LIVE AGENT (This is the missing part!)
await emit_log(sid, "๐Ÿ—๏ธ Building live OpenAI agent...", "system")
await emit_log(sid, "โš™๏ธ Mapping features to tools...", "system")
architect = LiveAgentArchitect(session_id=sid)
extraction_data = extraction_result.get('extraction', {})
# Build the agent
agent_result = await architect.build_agent(extraction_data, model, api_key_decrypted)
# Add agent build results to response
if agent_result.status == "success":
# Save to Firebase
await AgentDeployer.save_to_firebase(sid, agent_result.agent_config)
# Deployment code is already in agent_result
deployment_code = agent_result.deployment_code
# Add to result
extraction_result["agent_build"] = {
"status": "success",
"agent_id": agent_result.agent_config.agent_id,
"agent_name": agent_result.agent_config.name,
"model": agent_result.agent_config.model,
"tools": [
{
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
} for tool in agent_result.agent_config.tools
],
"tools_count": len(agent_result.agent_config.tools),
"tone": agent_result.agent_config.tone,
"instructions": agent_result.agent_config.instructions,
"test_response": agent_result.agent_test_response,
"deployment_code": deployment_code,
"business_context": agent_result.agent_config.business_context
}
await emit_log(sid, f"โœ… Agent '{agent_result.agent_config.name}' built!", "success")
await emit_log(sid, f"๐Ÿ†” Agent ID: {agent_result.agent_config.agent_id}", "info")
await emit_log(sid, f"๐Ÿ”ง Tools: {len(agent_result.agent_config.tools)}", "info")
await emit_log(sid, f"๐ŸŽญ Tone: {agent_result.agent_config.tone}", "info")
await emit_log(sid, f"\n๐Ÿ’ฌ Test: {agent_result.agent_test_response[:100]}...", "info")
await emit_log(sid, "\n๐ŸŽ‰ LIVE AGENT READY FOR DEPLOYMENT!", "success")
else:
await emit_log(sid, f"โš ๏ธ Agent build failed: {agent_result.error}", "warning")
extraction_result["agent_build"] = {
"status": "error",
"error": agent_result.error
}
# Save final result
await save_session_result(sid, extraction_result)
# Summary logs
business = extraction_data.get('business', {})
await emit_log(sid, f"\n๐Ÿ“Š Business: {business.get('business_name')}", "result")
await emit_log(sid, f"๐Ÿญ Industry: {business.get('industry')}", "result")
if extraction_result.get("agent_build", {}).get("status") == "success":
agent_build = extraction_result["agent_build"]
await emit_log(sid, f"\n๐Ÿค– Live Agent: {agent_build['agent_name']}", "success")
await emit_log(sid, f"๐Ÿ”ง Tools Available: {agent_build['tools_count']}", "success")
await emit_log(sid, "\nโœ… Complete! Agent is live and ready.", "success")
db.reference(f'sessions/{sid}').update({'status': 'completed'})
return RunResponse(status="success", result=extraction_result)
except Exception as e:
error_msg = str(e)
await emit_log(sid, f"๐Ÿ’ฅ Error: {error_msg}", "error")
db.reference(f'sessions/{sid}').update({'status': 'error'})
raise HTTPException(status_code=500, detail=error_msg)
@app.post("/agent/chat", tags=["Agent Execution"])
async def chat_with_agent(request: ChatWithAgentRequest):
try:
agent_id = request.agent_id
user_message = request.message
session_id = request.session_id
# Get agent configuration from Firebase
agent_ref = db.reference(f'agents/{agent_id}')
agent_data = agent_ref.get()
if not agent_data:
raise HTTPException(status_code=404, detail="Agent not found")
# Get encrypted API key from session
session_ref = db.reference(f'sessions/{session_id}/api_keys')
api_keys = session_ref.get()
model = agent_data.get('model', 'gpt-4o')
model_key = model.split('-')[0] if '-' in model else model
encrypted_key = None
for key_name, key_value in (api_keys or {}).items():
if model_key in key_name:
encrypted_key = key_value
break
if not encrypted_key:
raise HTTPException(status_code=400, detail="API key not found for this model")
api_key = decrypt_api_key(encrypted_key)
# Setup OpenAI client
if "gemini" in model.lower():
client = AgentsAsyncOpenAI(api_key=api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
model_name = "gemini-2.0-flash-exp"
elif "grok" in model.lower():
client = AgentsAsyncOpenAI(api_key=api_key, base_url="https://api.x.ai/v1")
model_name = "grok-beta"
else:
client = AgentsAsyncOpenAI(api_key=api_key)
model_name = "gpt-4o"
MODEL = OpenAIChatCompletionsModel(model=model_name, openai_client=client)
# === FIXED PART: Recreate tools safely using domain ===
from agent_architect import DynamicToolFactory
domain = agent_data['business_context']['domain']
business_name = agent_data['business_context'].get('business_name', 'Agent')
tools, _ = DynamicToolFactory.create_tools_for_domain(domain, business_name)
agent = Agent(
name=agent_data['name'],
instructions=agent_data['instructions'],
model=MODEL,
tools=tools
)
# ======================================================
runner = Runner()
temp_session = SQLiteSession(f":memory:")
response = await runner.run(agent, user_message, session=temp_session)
final_output = str(response.final_output) if hasattr(response, 'final_output') else str(response)
await emit_log(session_id, f"User: {user_message}", "user")
await emit_log(session_id, f"{agent_data['name']}: {final_output[:200]}...", "agent")
return {
"status": "success",
"agent_id": agent_id,
"agent_name": agent_data['name'],
"user_message": user_message,
"agent_response": final_output,
"tools_used": [tool['name'] for tool in agent_data.get('tools', [])],
"timestamp": time.time()
}
except Exception as e:
error_msg = str(e)
await emit_log(session_id, f"Agent error: {error_msg}", "error")
raise HTTPException(status_code=500, detail=error_msg)
# --- Socket.IO Events ---
@sio.event
async def connect(sid, environ):
print(f"โœ… Client connected: {sid}")
@sio.event
async def disconnect(sid):
print(f"โŒ Client disconnected: {sid}")
@sio.event
async def join(sid, data):
session_id = data.get('session_id')
if session_id:
await sio.enter_room(sid, session_id)
print(f"๐Ÿ‘ฅ Client {sid} joined room: {session_id}")
await sio.emit('joined', {'session_id': session_id}, room=sid)
# Mount Socket.IO - IMPORTANT: Do this LAST
app.mount("/", socket_app)