Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from contextlib import asynccontextmanager | |
| import re | |
| import json | |
| import os | |
| import logging | |
| # Import refactored services | |
| from services.lead_agent import lead_agent | |
| from services.scheduler import daily_scheduler | |
| from database.supabase_manager import db_manager | |
| from config.settings import settings | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Pydantic models for API requests | |
| class SavePreferencesRequest(BaseModel): | |
| email: str | |
| search_criteria: str | |
| system_prompt: str | |
| preferences: str | |
| class GetPreferencesRequest(BaseModel): | |
| email: str | |
| class UpdateScheduleRequest(BaseModel): | |
| email: str | |
| enabled: bool | |
| class GenerateLeadReportRequest(BaseModel): | |
| email: str | |
| search_criteria: str | |
| system_prompt: str | |
| preferences: str | |
| def validate_email(email): | |
| """Validate email format""" | |
| pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' | |
| return re.match(pattern, email) is not None | |
| def parse_rss_urls(rss_text): | |
| """Parse RSS URLs from text input""" | |
| urls = [] | |
| for line in rss_text.strip().split('\n'): | |
| line = line.strip() | |
| if line and (line.startswith('http://') or line.startswith('https://')): | |
| urls.append(line) | |
| return urls | |
| def create_enhanced_prompt(system_prompt, search_criteria, preferences, email): | |
| """Create enhanced prompt with all user inputs for autonomous LinkedIn lead generation""" | |
| # Parse search keywords from the input text | |
| keywords = [] | |
| for line in search_criteria.strip().split('\n'): | |
| line = line.strip() | |
| if line: | |
| keywords.append(line) | |
| keywords_str = ', '.join(keywords) | |
| enhanced_prompt = f"""DOCKER ENVIRONMENT - AUTONOMOUS LINKEDIN LEAD GENERATION | |
| Search Keywords: {keywords_str} | |
| {system_prompt} | |
| User Preferences: | |
| {preferences} | |
| CRITICAL INSTRUCTIONS FOR DOCKER ENVIRONMENT: | |
| - You are running in a headless Docker container | |
| - NO user interaction is possible - operate completely autonomously | |
| - Make all decisions independently without asking questions | |
| - Handle any errors gracefully and continue processing | |
| - Generate complete reports without requesting clarifications | |
| - Proceed through the entire workflow automatically | |
| - Limit yourself with 50 tool usages max, this is important!! | |
| LINKEDIN LEAD GENERATION WORKFLOW: | |
| 1. Search for companies based on the provided keywords (funding news, hiring announcements, etc.) | |
| 2. For each company found, use Brave Search to find LinkedIn profiles of key decision makers: | |
| - CEOs, Founders, CTOs | |
| - Head of Talent, VP Recruiting, HR Directors | |
| - Chief People Officers, Talent Acquisition Managers | |
| 3. Extract LinkedIn URLs and professional information | |
| 4. Create personalized outreach message templates for each contact | |
| 5. Compile comprehensive lead report with LinkedIn profiles and outreach messages | |
| REQUIRED OUTPUT FORMAT: | |
| For each lead, include: | |
| - Company name and recent news/funding information | |
| - Contact person's name, title, and LinkedIn URL | |
| - Personalized outreach message template | |
| - Industry context and recruitment challenges | |
| - Recommended follow-up strategy | |
| After completing the autonomous LinkedIn lead generation and crafting personalized outreach messages, send a comprehensive and well-formatted report to: {email} using the send_email tool | |
| The report should include LinkedIn profile URLs and ready-to-use outreach message templates for each lead found.""" | |
| return enhanced_prompt | |
| def generate_lead_report(email, rss_feeds, system_prompt, preferences): | |
| """Main API function to generate lead report using autonomous LinkedIn profile search and outreach message generation""" | |
| print("\n" + "="*60) | |
| print("π― STARTING LEAD GENERATION PROCESS") | |
| print("="*60) | |
| try: | |
| print(f"π§ Input Email: {email}") | |
| print(f"π Search Keywords Length: {len(rss_feeds)}") | |
| print(f"π€ System Prompt Length: {len(system_prompt)}") | |
| print(f"βοΈ Preferences Length: {len(preferences)}") | |
| # Enhanced validation with better feedback | |
| if not email or not validate_email(email): | |
| print("β Email validation failed") | |
| return { | |
| "success": False, | |
| "error": "Invalid email address", | |
| "message": "Please provide a valid email address to receive your lead intelligence report." | |
| } | |
| print("β Email validation passed") | |
| # Check if email is authorized | |
| if not settings.is_email_authorized(email): | |
| print(f"β Email authorization failed: {email}") | |
| return { | |
| "success": False, | |
| "error": "Unauthorized email", | |
| "message": "This email address is not authorized to use this service. Please contact the administrator." | |
| } | |
| print("β Email authorization passed") | |
| # Parse search keywords from rss_feeds parameter (repurposed for search criteria) | |
| search_keywords = [] | |
| for line in rss_feeds.strip().split('\n'): | |
| line = line.strip() | |
| if line: | |
| search_keywords.append(line) | |
| print(f"π·οΈ Parsed keywords: {search_keywords}") | |
| if not search_keywords: | |
| print("β No search keywords provided") | |
| return { | |
| "success": False, | |
| "error": "No search criteria", | |
| "message": "At least one search keyword is required for autonomous LinkedIn lead generation." | |
| } | |
| print(f"β Found {len(search_keywords)} search keywords") | |
| if not system_prompt.strip(): | |
| print("β System prompt validation failed") | |
| return { | |
| "success": False, | |
| "error": "Missing AI prompt", | |
| "message": "AI system prompt is required for intelligent lead analysis." | |
| } | |
| print("β System prompt validation passed") | |
| print("\nπ€ Using refactored lead agent service...") | |
| # Use the refactored lead agent service | |
| print("\nπ RUNNING LEAD GENERATION - This may take several minutes...") | |
| print("β³ Lead agent is processing... Please wait...") | |
| # Run lead generation using the refactored service | |
| result = lead_agent.run_lead_generation( | |
| email=email, | |
| search_criteria=rss_feeds, | |
| system_prompt=system_prompt, | |
| preferences=preferences | |
| ) | |
| if result["success"]: | |
| print("\nπ LEAD GENERATION COMPLETED SUCCESSFULLY") | |
| print("="*60) | |
| return { | |
| "success": True, | |
| "result": result["result"], | |
| "message": result["message"], | |
| "email": email, | |
| "keywords_count": len(search_keywords) | |
| } | |
| else: | |
| print(f"\nβ LEAD GENERATION FAILED: {result.get('error', 'Unknown error')}") | |
| print("="*60) | |
| return { | |
| "success": False, | |
| "error": result.get("error", "Unknown error"), | |
| "message": result.get("message", "Lead generation failed") | |
| } | |
| except Exception as e: | |
| print(f"\nβ ERROR IN LEAD GENERATION:") | |
| print(f"Error type: {type(e).__name__}") | |
| print(f"Error message: {str(e)}") | |
| # More detailed error logging | |
| import traceback | |
| print("Full traceback:") | |
| traceback.print_exc() | |
| print("="*60) | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "message": "An error occurred during LinkedIn lead generation. Please check your configuration and try again." | |
| } | |
| def get_server_info(): | |
| """API function to get server information""" | |
| return { | |
| "success": True, | |
| "server_info": { | |
| "name": "AI Lead Generation API", | |
| "version": "1.0.0", | |
| "description": "FastAPI server for lead generation with custom UI", | |
| "endpoints": [ | |
| "/api/generate_lead_report", | |
| "/api/save_preferences", | |
| "/api/get_preferences", | |
| "/api/update_schedule", | |
| "/api/scheduler_status" | |
| ] | |
| } | |
| } | |
| def validate_configuration(email, rss_feeds, system_prompt, preferences): | |
| """API function to validate configuration without running the agent""" | |
| validation_results = { | |
| "email_valid": validate_email(email) if email else False, | |
| "rss_valid": len(parse_rss_urls(rss_feeds)) > 0, | |
| "prompt_valid": bool(system_prompt.strip()), | |
| "preferences_valid": bool(preferences.strip()) | |
| } | |
| validation_results["all_valid"] = all(validation_results.values()) | |
| return { | |
| "success": True, | |
| "validation": validation_results, | |
| "rss_count": len(parse_rss_urls(rss_feeds)) | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| print("π Launching AI Lead Generation API Server...") | |
| # Define lifespan event handler to replace deprecated on_event decorators | |
| async def lifespan(app: FastAPI): | |
| """Lifespan event handler for startup and shutdown""" | |
| # Startup | |
| print("π Starting daily scheduler...") | |
| try: | |
| scheduler_started = daily_scheduler.start_scheduler() | |
| if scheduler_started: | |
| print("β Daily scheduler started successfully") | |
| else: | |
| print("β οΈ Daily scheduler failed to start - scheduling features may not work") | |
| except Exception as e: | |
| print(f"β Error starting scheduler: {e}") | |
| yield | |
| # Shutdown | |
| print("π Stopping daily scheduler...") | |
| try: | |
| daily_scheduler.stop_scheduler() | |
| print("β Daily scheduler stopped successfully") | |
| except Exception as e: | |
| print(f"β οΈ Error stopping scheduler: {e}") | |
| # Create FastAPI app with lifespan handler | |
| app = FastAPI(title="AI Lead Generation API", version="1.0.0", lifespan=lifespan) | |
| # Add CORS middleware | |
| print("π‘ Configuring server with CORS support...") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Mount the custom UI as the default root path | |
| async def serve_root(): | |
| return FileResponse("ai-agent-ui/index.html") | |
| # Add FastAPI routes for preferences and scheduling | |
| async def save_preferences_endpoint(request_data: dict): | |
| """Save user preferences""" | |
| try: | |
| # Validate required fields | |
| email = request_data.get("email") | |
| search_criteria = request_data.get("search_criteria") | |
| system_prompt = request_data.get("system_prompt") | |
| preferences = request_data.get("preferences") | |
| if not all([email, search_criteria, system_prompt, preferences]): | |
| return {"success": False, "error": "Missing required fields"} | |
| # Check if email is authorized | |
| if not email or not settings.is_email_authorized(str(email)): | |
| return {"success": False, "error": "Unauthorized email address"} | |
| result = db_manager.save_user_preferences( | |
| email=str(email), | |
| search_criteria=str(search_criteria), | |
| system_prompt=str(system_prompt), | |
| preferences=str(preferences) | |
| ) | |
| return result | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| async def get_preferences_endpoint(request_data: dict): | |
| """Get user preferences by email""" | |
| try: | |
| email = request_data.get("email") | |
| if not email: | |
| return {"success": False, "error": "Email is required"} | |
| # Check if email is authorized | |
| if not settings.is_email_authorized(str(email)): | |
| return {"success": False, "error": "Unauthorized email address"} | |
| result = db_manager.get_user_preferences(str(email)) | |
| return result | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| async def update_schedule_endpoint(request_data: dict): | |
| """Update user schedule setting""" | |
| try: | |
| email = request_data.get("email") | |
| enabled = request_data.get("enabled", False) | |
| if not email: | |
| return {"success": False, "error": "Email is required"} | |
| # First ensure user preferences exist | |
| prefs_result = db_manager.get_user_preferences(str(email)) | |
| if not prefs_result["success"]: | |
| return {"success": False, "error": "Please save your preferences first before enabling scheduling"} | |
| # Update schedule status | |
| result = db_manager.update_schedule_status(str(email), bool(enabled)) | |
| return result | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| async def scheduler_status_endpoint(): | |
| """Get scheduler status""" | |
| try: | |
| status = daily_scheduler.get_scheduler_status() | |
| return {"success": True, "data": status} | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| async def generate_lead_report_endpoint(request_data: dict): | |
| """Generate lead report""" | |
| try: | |
| email = request_data.get("email") | |
| search_criteria = request_data.get("search_criteria") | |
| system_prompt = request_data.get("system_prompt") | |
| preferences = request_data.get("preferences") | |
| if not all([email, search_criteria, system_prompt, preferences]): | |
| return {"success": False, "error": "Missing required fields"} | |
| # Call the existing generate_lead_report function | |
| result = generate_lead_report( | |
| email=str(email), | |
| rss_feeds=str(search_criteria), | |
| system_prompt=str(system_prompt), | |
| preferences=str(preferences) | |
| ) | |
| return result | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| async def server_info_endpoint(): | |
| """Get server information""" | |
| try: | |
| return get_server_info() | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| async def validate_configuration_endpoint(request_data: dict): | |
| """Validate configuration without running the agent""" | |
| try: | |
| email = request_data.get("email", "") | |
| search_criteria = request_data.get("search_criteria", "") | |
| system_prompt = request_data.get("system_prompt", "") | |
| preferences = request_data.get("preferences", "") | |
| result = validate_configuration(email, search_criteria, system_prompt, preferences) | |
| return result | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| # Mount the entire ai-agent-ui directory to serve all static files | |
| app.mount("/", StaticFiles(directory="ai-agent-ui", html=True), name="frontend") | |
| print("β CORS configured successfully") | |
| print("π Server will be available at: http://localhost:7860") | |
| print("π¨ Frontend available at: http://localhost:7860/") | |
| print("π API documentation at: http://localhost:7860/docs") | |
| print("π― Starting server...") | |
| # Launch server | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |