taahaasaif's picture
Deploy None agent
679162e verified
"""
AgentForge - Hugging Face Space Template
This is a generic, reusable agent runner that reads configuration from environment variables.
"""
import os
import json
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from agents import Agent, AsyncOpenAI as AgentsAsyncOpenAI, OpenAIChatCompletionsModel, function_tool, Runner, SQLiteSession
import aiosmtplib
from email.message import EmailMessage
# ============================================
# Load Agent Configuration from Environment
# ============================================
AGENT_CONFIG_STR = os.getenv("AGENT_CONFIG")
if not AGENT_CONFIG_STR:
raise ValueError("AGENT_CONFIG environment variable is required")
# Parse the config - handle both nested and flat structures
try:
raw_config = json.loads(AGENT_CONFIG_STR)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse AGENT_CONFIG as JSON: {e}")
# Handle nested structure (from full API response)
if isinstance(raw_config, dict):
# Check if it's the full response structure with result.agent_build
if "result" in raw_config and "agent_build" in raw_config.get("result", {}):
AGENT_CONFIG = raw_config["result"]["agent_build"]
# Check if it's nested under a different key
elif "agent_build" in raw_config:
AGENT_CONFIG = raw_config["agent_build"]
# Otherwise assume it's already the flat agent_build structure
else:
AGENT_CONFIG = raw_config
else:
AGENT_CONFIG = raw_config
# Validate that we have the required fields
if not isinstance(AGENT_CONFIG, dict):
raise ValueError(f"AGENT_CONFIG must be a dictionary, got {type(AGENT_CONFIG)}")
# Log config keys for debugging (in production, this helps identify issues)
print(f"Loaded AGENT_CONFIG with keys: {list(AGENT_CONFIG.keys())[:10]}...") # Print first 10 keys
# API Keys from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GROK_API_KEY = os.getenv("GROK_API_KEY")
# ============================================
# FastAPI App Setup
# ============================================
app = FastAPI(
title=f"{AGENT_CONFIG.get('name', 'Agent')} API",
description=f"Deployed agent for {AGENT_CONFIG.get('business_context', {}).get('business_name', 'Business')}",
version="1.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================
# Request/Response Models
# ============================================
class ChatRequest(BaseModel):
message: str = Field(..., description="User message to the agent")
session_id: Optional[str] = Field(default="default", description="Session ID for conversation tracking")
class ChatResponse(BaseModel):
status: str
agent_name: Optional[str] = None # May be missing in config
user_message: str
agent_response: str
tools_available: List[str]
timestamp: float
# ============================================
# Dynamic Tool Recreation
# ============================================
def recreate_tools_from_config(domain: str, business_name: str):
"""
Recreate tools based on domain.
This mirrors the DynamicToolFactory logic from agent_architect.py
"""
if domain == "pharmacy":
@function_tool
async def manage_prescription(action: str, prescription_id: str = None, patient_id: str = None, medication: str = None) -> dict:
"""Manage prescriptions - check, refill, or create"""
from datetime import datetime
return {"prescription_id": prescription_id or f"RX-{datetime.now().strftime('%Y%m%d%H%M')}",
"action": action, "status": "Processed", "refills": 3}
@function_tool
async def check_drug_inventory(medication_name: str) -> dict:
"""Check medication stock and expiry"""
return {"medication": medication_name, "in_stock": True, "quantity": 250, "expiry": "2026-06-15"}
@function_tool
async def get_patient_info(patient_id: str) -> dict:
"""Retrieve patient records and allergies"""
return {"patient_id": patient_id, "allergies": ["Penicillin"], "medications": ["Metformin"]}
@function_tool
def web_search(query: str) -> dict:
"""Perform a web search for current information"""
return {"query": query, "results": "Web search functionality - integrate with real API"}
return [manage_prescription, check_drug_inventory, get_patient_info, web_search]
elif domain == "ecommerce":
@function_tool
async def search_products(query: str, category: str = None) -> dict:
"""Search product catalog"""
return {"query": query, "results": [{"id": "P001", "name": query, "price": 49.99, "stock": 50}]}
@function_tool
async def track_order(order_id: str) -> dict:
"""Track order status and delivery"""
return {"order_id": order_id, "status": "In Transit", "eta": "2025-11-20", "location": "Distribution Center"}
@function_tool
async def manage_cart(action: str, product_id: str = None, quantity: int = 1) -> dict:
"""Add, remove, or view cart items"""
return {"action": action, "product_id": product_id, "cart_total": 149.99, "items": 3}
@function_tool
def web_search(query: str) -> dict:
"""Perform a web search for current information"""
return {"query": query, "results": "Web search functionality"}
return [search_products, track_order, manage_cart, web_search]
elif domain == "weather":
@function_tool
async def get_forecast(location: str, days: int = 7) -> dict:
"""Get weather forecast"""
return {"location": location, "days": days, "forecast": [{"date": "2025-12-12", "high": 22, "low": 15, "condition": "partly cloudy"}]}
@function_tool
async def severe_weather_alert(location: str) -> dict:
"""Check for severe weather alerts"""
return {"location": location, "alerts": [], "severity": "none", "preparedness_tips": ["Normal precautions"]}
@function_tool
async def historical_weather_comparison(location: str, date: str) -> dict:
"""Compare current weather to historical data"""
return {"location": location, "date": date, "current_temp": 20, "historical_avg": 18, "difference": 2, "percentile": 65}
@function_tool
def web_search(query: str) -> dict:
"""Perform a web search for current information"""
return {"query": query, "results": "Web search functionality"}
return [get_forecast, severe_weather_alert, historical_weather_comparison, web_search]
elif domain == "email_marketing":
@function_tool
async def draft_cold_email(recipient_name: str, company: str, pain_point: str, solution_offer: str) -> dict:
"""Draft a personalized cold email based on research and pain points"""
return {
"subject": f"Question regarding {company}'s {pain_point} strategy",
"body": f"Hi {recipient_name},\n\nI noticed {company} might be facing challenges with {pain_point}. Our solution for {solution_offer} has helped similar companies...\n\nBest regards,\nAgent",
"status": "drafted",
"quality_score": 0.95
}
@function_tool
async def verify_email_format(email: str) -> dict:
"""Verify if an email address is valid and formatted correctly"""
is_valid = "@" in email and "." in email.split("@")[-1]
return {"email": email, "is_valid": is_valid, "suggestion": None if is_valid else "Check format"}
@function_tool
async def send_email(to: str, subject: str, body: str, is_html: bool = True) -> dict:
"""Actually send an email using SMTP configurations from environment."""
host = os.getenv("SMTP_HOST")
port = int(os.getenv("SMTP_PORT", "587"))
username = os.getenv("SMTP_USER")
password = os.getenv("SMTP_PASSWORD")
from_email = os.getenv("SMTP_FROM_EMAIL", username)
if not all([host, username, password]):
return {
"status": "error",
"message": "SMTP credentials (SMTP_HOST, SMTP_USER, SMTP_PASSWORD) are not configured in environment."
}
message = EmailMessage()
message["From"] = from_email
message["To"] = to
message["Subject"] = subject
if is_html:
message.set_content(body, subtype="html")
else:
message.set_content(body)
try:
await aiosmtplib.send(
message,
hostname=host,
port=port,
username=username,
password=password,
use_tls=(port == 465),
start_tls=(port == 587),
)
return {"to": to, "subject": subject, "status": "sent", "timestamp": "2024-02-09T12:00:00"}
except Exception as e:
return {"status": "error", "message": str(e)}
@function_tool
def web_search(query: str) -> dict:
"""Perform a web search for prospect research"""
return {"query": query, "results": f"Research data for {query}"}
return [draft_cold_email, verify_email_format, send_email, web_search]
# Add more domains as needed...
else: # generic
@function_tool
async def generate_analytics(metric: str, time_range: str) -> dict:
"""Generate business analytics"""
return {"metric": metric, "time_range": time_range, "value": 12500, "trend": "+15%", "insights": f"{metric} growing"}
@function_tool
async def send_notification(recipient: str, message: str, channel: str = "email") -> dict:
"""Send notifications"""
if channel == "email":
host = os.getenv("SMTP_HOST")
if host:
# Implementation similar to send_email
return {"recipient": recipient, "message": "Notification sent via actual email", "status": "Sent"}
return {"recipient": recipient, "message": message, "channel": channel, "status": "Sent"}
@function_tool
async def send_email(to: str, subject: str, body: str, is_html: bool = True) -> dict:
"""Actually send an email using SMTP configurations from environment."""
host = os.getenv("SMTP_HOST")
port = int(os.getenv("SMTP_PORT", "587"))
username = os.getenv("SMTP_USER")
password = os.getenv("SMTP_PASSWORD")
from_email = os.getenv("SMTP_FROM_EMAIL", username)
if not all([host, username, password]):
return {
"status": "error",
"message": "SMTP credentials (SMTP_HOST, SMTP_USER, SMTP_PASSWORD) are not configured in environment."
}
message = EmailMessage()
message["From"] = from_email
message["To"] = to
message["Subject"] = subject
if is_html:
message.set_content(body, subtype="html")
else:
message.set_content(body)
try:
await aiosmtplib.send(
message,
hostname=host,
port=port,
username=username,
password=password,
use_tls=(port == 465),
start_tls=(port == 587),
)
return {"to": to, "subject": subject, "status": "sent", "timestamp": "2024-02-09T12:00:00"}
except Exception as e:
return {"status": "error", "message": str(e)}
@function_tool
def web_search(query: str) -> dict:
"""Perform a web search for current information"""
return {"query": query, "results": "Web search functionality"}
return [generate_analytics, send_notification, send_email, web_search]
# ============================================
# Initialize Agent
# ============================================
def initialize_agent():
"""Initialize the agent with configuration from environment"""
model = AGENT_CONFIG.get("model", "gpt-4o")
# Select appropriate API key and client
if "gemini" in model.lower():
api_key = GEMINI_API_KEY
client = AgentsAsyncOpenAI(api_key=api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
model_name = "gemini-2.0-flash-exp"
elif "grok" in model.lower():
api_key = GROK_API_KEY
client = AgentsAsyncOpenAI(api_key=api_key, base_url="https://api.x.ai/v1")
model_name = "grok-beta"
else:
api_key = OPENAI_API_KEY
client = AgentsAsyncOpenAI(api_key=api_key)
model_name = "gpt-4o"
if not api_key:
raise ValueError(f"API key not found for model: {model}")
MODEL = OpenAIChatCompletionsModel(model=model_name, openai_client=client)
# Recreate tools - handle both nested and flat business_context
business_context = AGENT_CONFIG.get("business_context", {})
if not isinstance(business_context, dict):
business_context = {}
domain = business_context.get("domain") or AGENT_CONFIG.get("domain", "generic")
business_name = business_context.get("business_name") or AGENT_CONFIG.get("business_name", "Business")
tools = recreate_tools_from_config(domain, business_name)
# Get agent name - try multiple possible keys
agent_name = AGENT_CONFIG.get("name") or AGENT_CONFIG.get("agent_name", "AI Agent")
# Get instructions
instructions = AGENT_CONFIG.get("instructions", "You are a helpful AI assistant.")
# Create agent
agent = Agent(
name=agent_name,
instructions=instructions,
model=MODEL,
tools=tools
)
return agent, tools
# Initialize agent on startup
AGENT_INSTANCE, AGENT_TOOLS = initialize_agent()
# ============================================
# API Endpoints
# ============================================
@app.get("/")
async def root():
"""Health check and agent info"""
# Extract tool names properly
tool_names = []
for tool in AGENT_TOOLS:
if hasattr(tool, '__name__'):
tool_names.append(tool.__name__)
elif hasattr(tool, 'name'):
tool_names.append(tool.name)
else:
# Try to extract from string representation
tool_str = str(tool)
if "name='" in tool_str:
try:
name_start = tool_str.index("name='") + 6
name_end = tool_str.index("'", name_start)
tool_names.append(tool_str[name_start:name_end])
except:
tool_names.append(str(tool)[:50]) # Truncate long strings
else:
tool_names.append(str(tool)[:50])
return {
"status": "online",
"agent_name": AGENT_CONFIG.get("name") or AGENT_CONFIG.get("agent_name") or "GenericAgent",
"agent_id": AGENT_CONFIG.get("agent_id"),
"business": AGENT_CONFIG.get("business_context", {}).get("business_name") if isinstance(AGENT_CONFIG.get("business_context"), dict) else None,
"domain": AGENT_CONFIG.get("business_context", {}).get("domain") if isinstance(AGENT_CONFIG.get("business_context"), dict) else AGENT_CONFIG.get("domain"),
"tools_count": len(AGENT_TOOLS),
"tools": tool_names,
"model": AGENT_CONFIG.get("model"),
"deployment": "Hugging Face Space"
}
@app.post("/run", response_model=ChatResponse)
async def run_agent(request: ChatRequest) -> ChatResponse:
"""
Main endpoint to interact with the agent.
This is the primary interface for users.
"""
import time
try:
# Run the agent
runner = Runner()
temp_session = SQLiteSession(":memory:")
response = await runner.run(AGENT_INSTANCE, request.message, session=temp_session)
final_output = str(response.final_output) if hasattr(response, 'final_output') else str(response)
return ChatResponse(
status="success",
agent_name=AGENT_CONFIG.get("name", "GenericAgent"),
user_message=request.message,
agent_response=final_output,
tools_available=[tool.__name__ if hasattr(tool, '__name__') else str(tool) for tool in AGENT_TOOLS],
timestamp=time.time()
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Agent execution error: {str(e)}")
@app.get("/config")
async def get_config():
"""Get agent configuration (without sensitive data)"""
safe_config = {
"agent_id": AGENT_CONFIG.get("agent_id"),
"name": AGENT_CONFIG.get("name"),
"model": AGENT_CONFIG.get("model"),
"business_context": AGENT_CONFIG.get("business_context"),
"tools_count": len(AGENT_TOOLS),
"deployment_ready": AGENT_CONFIG.get("deployment_ready")
}
return safe_config
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "agent": AGENT_CONFIG.get("name") or AGENT_CONFIG.get("agent_name")}
@app.get("/debug/config")
async def debug_config():
"""Debug endpoint to see what config is loaded (without sensitive data)"""
safe_config = {
"has_config": bool(AGENT_CONFIG),
"config_keys": list(AGENT_CONFIG.keys()) if isinstance(AGENT_CONFIG, dict) else [],
"agent_name": AGENT_CONFIG.get("name") or AGENT_CONFIG.get("agent_name"),
"agent_id": AGENT_CONFIG.get("agent_id"),
"model": AGENT_CONFIG.get("model"),
"has_business_context": "business_context" in AGENT_CONFIG,
"business_context_type": type(AGENT_CONFIG.get("business_context")).__name__,
"domain": AGENT_CONFIG.get("business_context", {}).get("domain") if isinstance(AGENT_CONFIG.get("business_context"), dict) else AGENT_CONFIG.get("domain"),
"business_name": AGENT_CONFIG.get("business_context", {}).get("business_name") if isinstance(AGENT_CONFIG.get("business_context"), dict) else None,
"tools_count_from_config": len(AGENT_CONFIG.get("tools", [])),
"tools_count_loaded": len(AGENT_TOOLS),
}
return safe_config
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)