togitoon's picture
Implement new form
d2cd8c8
raw
history blame
16.5 kB
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import Optional
from contextlib import asynccontextmanager
import re
import json
import os
import logging
# Import refactored services
from services.lead_agent import lead_agent
from services.scheduler import daily_scheduler
from database.supabase_manager import db_manager
from config.settings import settings
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Pydantic models for API requests
class SavePreferencesRequest(BaseModel):
email: str
search_criteria: str
system_prompt: str
preferences: str
class GetPreferencesRequest(BaseModel):
email: str
class UpdateScheduleRequest(BaseModel):
email: str
enabled: bool
class GenerateLeadReportRequest(BaseModel):
email: str
search_criteria: str
system_prompt: str
preferences: str
def validate_email(email):
"""Validate email format"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def parse_rss_urls(rss_text):
"""Parse RSS URLs from text input"""
urls = []
for line in rss_text.strip().split('\n'):
line = line.strip()
if line and (line.startswith('http://') or line.startswith('https://')):
urls.append(line)
return urls
def create_enhanced_prompt(system_prompt, search_criteria, preferences, email):
"""Create enhanced prompt with all user inputs for autonomous LinkedIn lead generation"""
# Parse search keywords from the input text
keywords = []
for line in search_criteria.strip().split('\n'):
line = line.strip()
if line:
keywords.append(line)
keywords_str = ', '.join(keywords)
enhanced_prompt = f"""DOCKER ENVIRONMENT - AUTONOMOUS LINKEDIN LEAD GENERATION
Search Keywords: {keywords_str}
{system_prompt}
User Preferences:
{preferences}
CRITICAL INSTRUCTIONS FOR DOCKER ENVIRONMENT:
- You are running in a headless Docker container
- NO user interaction is possible - operate completely autonomously
- Make all decisions independently without asking questions
- Handle any errors gracefully and continue processing
- Generate complete reports without requesting clarifications
- Proceed through the entire workflow automatically
- Limit yourself with 50 tool usages max, this is important!!
LINKEDIN LEAD GENERATION WORKFLOW:
1. Search for companies based on the provided keywords (funding news, hiring announcements, etc.)
2. For each company found, use Brave Search to find LinkedIn profiles of key decision makers:
- CEOs, Founders, CTOs
- Head of Talent, VP Recruiting, HR Directors
- Chief People Officers, Talent Acquisition Managers
3. Extract LinkedIn URLs and professional information
4. Create personalized outreach message templates for each contact
5. Compile comprehensive lead report with LinkedIn profiles and outreach messages
REQUIRED OUTPUT FORMAT:
For each lead, include:
- Company name and recent news/funding information
- Contact person's name, title, and LinkedIn URL
- Personalized outreach message template
- Industry context and recruitment challenges
- Recommended follow-up strategy
After completing the autonomous LinkedIn lead generation and crafting personalized outreach messages, send a comprehensive and well-formatted report to: {email} using the send_email tool
The report should include LinkedIn profile URLs and ready-to-use outreach message templates for each lead found."""
return enhanced_prompt
def generate_lead_report(email, rss_feeds, system_prompt, preferences):
"""Main API function to generate lead report using autonomous LinkedIn profile search and outreach message generation"""
print("\n" + "="*60)
print("🎯 STARTING LEAD GENERATION PROCESS")
print("="*60)
try:
print(f"πŸ“§ Input Email: {email}")
print(f"πŸ” Search Keywords Length: {len(rss_feeds)}")
print(f"πŸ€– System Prompt Length: {len(system_prompt)}")
print(f"βš™οΈ Preferences Length: {len(preferences)}")
# Enhanced validation with better feedback
if not email or not validate_email(email):
print("❌ Email validation failed")
return {
"success": False,
"error": "Invalid email address",
"message": "Please provide a valid email address to receive your lead intelligence report."
}
print("βœ… Email validation passed")
# Check if email is authorized
if not settings.is_email_authorized(email):
print(f"❌ Email authorization failed: {email}")
return {
"success": False,
"error": "Unauthorized email",
"message": "This email address is not authorized to use this service. Please contact the administrator."
}
print("βœ… Email authorization passed")
# Parse search keywords from rss_feeds parameter (repurposed for search criteria)
search_keywords = []
for line in rss_feeds.strip().split('\n'):
line = line.strip()
if line:
search_keywords.append(line)
print(f"🏷️ Parsed keywords: {search_keywords}")
if not search_keywords:
print("❌ No search keywords provided")
return {
"success": False,
"error": "No search criteria",
"message": "At least one search keyword is required for autonomous LinkedIn lead generation."
}
print(f"βœ… Found {len(search_keywords)} search keywords")
if not system_prompt.strip():
print("❌ System prompt validation failed")
return {
"success": False,
"error": "Missing AI prompt",
"message": "AI system prompt is required for intelligent lead analysis."
}
print("βœ… System prompt validation passed")
print("\nπŸ€– Using refactored lead agent service...")
# Use the refactored lead agent service
print("\nπŸš€ RUNNING LEAD GENERATION - This may take several minutes...")
print("⏳ Lead agent is processing... Please wait...")
# Run lead generation using the refactored service
result = lead_agent.run_lead_generation(
email=email,
search_criteria=rss_feeds,
system_prompt=system_prompt,
preferences=preferences
)
if result["success"]:
print("\nπŸŽ‰ LEAD GENERATION COMPLETED SUCCESSFULLY")
print("="*60)
return {
"success": True,
"result": result["result"],
"message": result["message"],
"email": email,
"keywords_count": len(search_keywords)
}
else:
print(f"\n❌ LEAD GENERATION FAILED: {result.get('error', 'Unknown error')}")
print("="*60)
return {
"success": False,
"error": result.get("error", "Unknown error"),
"message": result.get("message", "Lead generation failed")
}
except Exception as e:
print(f"\n❌ ERROR IN LEAD GENERATION:")
print(f"Error type: {type(e).__name__}")
print(f"Error message: {str(e)}")
# More detailed error logging
import traceback
print("Full traceback:")
traceback.print_exc()
print("="*60)
return {
"success": False,
"error": str(e),
"message": "An error occurred during LinkedIn lead generation. Please check your configuration and try again."
}
def get_server_info():
"""API function to get server information"""
return {
"success": True,
"server_info": {
"name": "AI Lead Generation API",
"version": "1.0.0",
"description": "FastAPI server for lead generation with custom UI",
"endpoints": [
"/api/generate_lead_report",
"/api/save_preferences",
"/api/get_preferences",
"/api/update_schedule",
"/api/scheduler_status"
]
}
}
def validate_configuration(email, rss_feeds, system_prompt, preferences):
"""API function to validate configuration without running the agent"""
validation_results = {
"email_valid": validate_email(email) if email else False,
"rss_valid": len(parse_rss_urls(rss_feeds)) > 0,
"prompt_valid": bool(system_prompt.strip()),
"preferences_valid": bool(preferences.strip())
}
validation_results["all_valid"] = all(validation_results.values())
return {
"success": True,
"validation": validation_results,
"rss_count": len(parse_rss_urls(rss_feeds))
}
if __name__ == "__main__":
import uvicorn
print("πŸš€ Launching AI Lead Generation API Server...")
# Define lifespan event handler to replace deprecated on_event decorators
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan event handler for startup and shutdown"""
# Startup
print("πŸ“… Starting daily scheduler...")
try:
scheduler_started = daily_scheduler.start_scheduler()
if scheduler_started:
print("βœ… Daily scheduler started successfully")
else:
print("⚠️ Daily scheduler failed to start - scheduling features may not work")
except Exception as e:
print(f"❌ Error starting scheduler: {e}")
yield
# Shutdown
print("πŸ“… Stopping daily scheduler...")
try:
daily_scheduler.stop_scheduler()
print("βœ… Daily scheduler stopped successfully")
except Exception as e:
print(f"⚠️ Error stopping scheduler: {e}")
# Create FastAPI app with lifespan handler
app = FastAPI(title="AI Lead Generation API", version="1.0.0", lifespan=lifespan)
# Add CORS middleware
print("πŸ“‘ Configuring server with CORS support...")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount the custom UI as the default root path
@app.get("/")
async def serve_root():
return FileResponse("ai-agent-ui/index.html")
# Add FastAPI routes for preferences and scheduling
@app.post("/api/save_preferences")
async def save_preferences_endpoint(request_data: dict):
"""Save user preferences"""
try:
# Validate required fields
email = request_data.get("email")
search_criteria = request_data.get("search_criteria")
system_prompt = request_data.get("system_prompt")
preferences = request_data.get("preferences")
if not all([email, search_criteria, system_prompt, preferences]):
return {"success": False, "error": "Missing required fields"}
# Check if email is authorized
if not email or not settings.is_email_authorized(str(email)):
return {"success": False, "error": "Unauthorized email address"}
result = db_manager.save_user_preferences(
email=str(email),
search_criteria=str(search_criteria),
system_prompt=str(system_prompt),
preferences=str(preferences)
)
return result
except Exception as e:
return {"success": False, "error": str(e)}
@app.post("/api/get_preferences")
async def get_preferences_endpoint(request_data: dict):
"""Get user preferences by email"""
try:
email = request_data.get("email")
if not email:
return {"success": False, "error": "Email is required"}
# Check if email is authorized
if not settings.is_email_authorized(str(email)):
return {"success": False, "error": "Unauthorized email address"}
result = db_manager.get_user_preferences(str(email))
return result
except Exception as e:
return {"success": False, "error": str(e)}
@app.post("/api/update_schedule")
async def update_schedule_endpoint(request_data: dict):
"""Update user schedule setting"""
try:
email = request_data.get("email")
enabled = request_data.get("enabled", False)
if not email:
return {"success": False, "error": "Email is required"}
# First ensure user preferences exist
prefs_result = db_manager.get_user_preferences(str(email))
if not prefs_result["success"]:
return {"success": False, "error": "Please save your preferences first before enabling scheduling"}
# Update schedule status
result = db_manager.update_schedule_status(str(email), bool(enabled))
return result
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/api/scheduler_status")
async def scheduler_status_endpoint():
"""Get scheduler status"""
try:
status = daily_scheduler.get_scheduler_status()
return {"success": True, "data": status}
except Exception as e:
return {"success": False, "error": str(e)}
@app.post("/api/generate_lead_report")
async def generate_lead_report_endpoint(request_data: dict):
"""Generate lead report"""
try:
email = request_data.get("email")
search_criteria = request_data.get("search_criteria")
system_prompt = request_data.get("system_prompt")
preferences = request_data.get("preferences")
if not all([email, search_criteria, system_prompt, preferences]):
return {"success": False, "error": "Missing required fields"}
# Call the existing generate_lead_report function
result = generate_lead_report(
email=str(email),
rss_feeds=str(search_criteria),
system_prompt=str(system_prompt),
preferences=str(preferences)
)
return result
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/api/server_info")
async def server_info_endpoint():
"""Get server information"""
try:
return get_server_info()
except Exception as e:
return {"success": False, "error": str(e)}
@app.post("/api/validate_configuration")
async def validate_configuration_endpoint(request_data: dict):
"""Validate configuration without running the agent"""
try:
email = request_data.get("email", "")
search_criteria = request_data.get("search_criteria", "")
system_prompt = request_data.get("system_prompt", "")
preferences = request_data.get("preferences", "")
result = validate_configuration(email, search_criteria, system_prompt, preferences)
return result
except Exception as e:
return {"success": False, "error": str(e)}
# Mount the entire ai-agent-ui directory to serve all static files
app.mount("/", StaticFiles(directory="ai-agent-ui", html=True), name="frontend")
print("βœ… CORS configured successfully")
print("🌐 Server will be available at: http://localhost:7860")
print("🎨 Frontend available at: http://localhost:7860/")
print("πŸ“‹ API documentation at: http://localhost:7860/docs")
print("🎯 Starting server...")
# Launch server
uvicorn.run(app, host="0.0.0.0", port=7860)