"""FastAPI application for the LinkedIn Sourcing Agent.

Wires together three project services (search, scoring, outreach) behind a
small HTTP API: candidate sourcing, raw search, outreach generation, and
cache administration endpoints.
"""

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import time
from typing import List, Optional, Dict, Any
import logging

# NOTE: SourcingRequest is deliberately NOT imported from app.models — a local
# Pydantic model of the same name (defined below, with the extra `batch_size`
# field) is the one this API actually uses, and importing it here would only
# be shadowed.
from app.models import SourcingResponse, CandidateWithScore, CandidateProfile, ScoreBreakdown
from app.utils.config import Config
from app.services.linkedin_search import LinkedInSearchService
from app.services.scoring import ScoringService
from app.services.outreach import OutreachService
from pydantic import BaseModel

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="LinkedIn Sourcing Agent",
    description="AI-powered LinkedIn candidate sourcing and scoring system",
    version="1.0.0"
)

# Add CORS middleware (wide-open policy; acceptable for a demo service,
# tighten allow_origins before production use)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize service singletons shared by all requests
linkedin_search_service = LinkedInSearchService()
scoring_service = ScoringService()
outreach_service = OutreachService()


class SearchRequest(BaseModel):
    """Request body for /search."""
    job_description: str
    location: Optional[str] = None
    max_results: int = 10


class SearchResponse(BaseModel):
    """Response body for /search."""
    candidates: List[Dict[str, Any]]
    total_found: int
    search_metadata: Dict[str, Any]


class OutreachRequest(BaseModel):
    """Request body for /outreach."""
    candidate_profiles: List[Dict[str, Any]]
    company_info: Dict[str, Any]
    job_description: str


class OutreachResponse(BaseModel):
    """Response body for /outreach."""
    messages: List[Dict[str, Any]]
    total_messages: int


class CacheStatsResponse(BaseModel):
    """Response body for /cache/stats; mirrors LinkedInSearchService.get_cache_stats()."""
    cache_enabled: bool
    cache_ttl: int
    cache_max_size: int
    search_cache_size: int
    profile_cache_size: int
    query_cache_size: int
    search_cache_currsize: int
    profile_cache_currsize: int
    query_cache_currsize: int


class SourcingRequest(BaseModel):
    """Request body for /api/source-candidates (local model; supersedes app.models version)."""
    job_description: str
    location: Optional[str] = None
    max_candidates: int = 20
    batch_size: Optional[int] = 5


# NOTE(review): @app.on_event is deprecated in recent FastAPI in favor of the
# lifespan context manager; kept as-is to avoid restructuring the app object.
@app.on_event("startup")
async def startup_event():
    """Validate configuration on startup; abort startup on invalid config."""
    try:
        Config.validate_config()
        logger.info("✅ Configuration validated successfully")
    except ValueError as e:
        logger.error(f"❌ Configuration error: {e}")
        raise


@app.get("/")
async def root():
    """Root endpoint with API information."""
    return {
        "message": "LinkedIn Sourcing Agent",
        "version": "1.0.0",
        "description": "AI-powered LinkedIn candidate sourcing and scoring system",
        "endpoints": {
            "source-candidates": "/api/source-candidates",
            "search": "/search",
            "outreach": "/outreach",
            "cache_stats": "/cache/stats",
            "cache_clear": "/cache/clear",
            "health": "/health"
        }
    }


@app.get("/health")
async def health_check():
    """Health check endpoint: reports API-key presence, service status, and cache stats."""
    try:
        health_status = {
            "status": "healthy",
            "timestamp": time.time(),
            "services": {
                "google_search": "configured" if Config.GOOGLE_API_KEY else "missing",
                "gemini": "configured" if Config.GEMINI_API_KEY else "missing",
                "linkedin_search": "operational",
                "outreach": "operational",
                "cache": "operational"
            },
            "configuration": {
                "cache_enabled": Config.CACHE_ENABLED,
                "cache_ttl": Config.CACHE_TTL,
                "max_candidates": Config.MAX_CANDIDATES
            }
        }

        # Cache stats are best-effort: a failure here should not fail the
        # health check, so the error is embedded instead of raised.
        try:
            cache_stats = linkedin_search_service.get_cache_stats()
            health_status["cache_stats"] = cache_stats
        except Exception as e:
            health_status["cache_stats"] = {"error": str(e)}

        return health_status
    except Exception as e:
        logger.error(f"❌ Health check failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}")


@app.post("/api/source-candidates", response_model=SourcingResponse)
async def source_candidates(request: SourcingRequest):
    """
    Main endpoint to source and score LinkedIn candidates.

    Pipeline: search profiles -> score them -> generate outreach messages for
    the top candidates -> convert into the SourcingResponse schema.
    """
    start_time = time.time()

    try:
        logger.info(f"Starting candidate sourcing for job: {request.job_description[:100]}...")

        # Step 1: Search for LinkedIn profiles
        logger.info("🔍 Searching LinkedIn profiles...")
        candidates = linkedin_search_service.search_linkedin_profiles(
            job_description=request.job_description,
            location=request.location,
            max_results=request.max_candidates
        )

        if not candidates:
            logger.warning("No candidates found in search")
            return SourcingResponse(
                candidates=[],
                total_found=0,
                search_query=f"LinkedIn {request.job_description} {request.location or ''}",
                processing_time=time.time() - start_time
            )

        logger.info(f"Found {len(candidates)} candidates")

        # Step 2: Score candidates.
        # Honor the request's batch_size (previously hardcoded to 5); fall
        # back to 5 when the client explicitly sends null.
        logger.info("📊 Scoring candidates...")
        scored_candidates = scoring_service.score_candidates(
            candidates,
            request.job_description,
            batch_size=request.batch_size or 5
        )

        # Step 3: Generate outreach messages for top candidates.
        # NOTE(review): this call signature differs from the one used in the
        # /outreach endpoint (no company_info, extra max_messages) — confirm
        # OutreachService supports both forms.
        logger.info("💬 Generating outreach messages...")
        candidates_with_messages = outreach_service.generate_outreach_messages(
            scored_candidates,
            request.job_description,
            max_messages=min(5, len(scored_candidates))
        )

        # Step 4: Convert to response format
        response_candidates = []
        for candidate_data in candidates_with_messages:
            profile = candidate_data['profile']
            score_breakdown = candidate_data['score_breakdown']
            outreach_message = candidate_data.get('outreach_message', '')

            candidate_profile = CandidateProfile(
                name=profile.get('name', 'Unknown'),
                headline=profile.get('headline', ''),
                location=profile.get('location', ''),
                profile_url=profile.get('profile_url', ''),
                company=profile.get('company'),
                education=profile.get('education'),
                experience_summary=profile.get('experience_summary')
            )

            score_breakdown_model = ScoreBreakdown(
                education_score=score_breakdown.get('education_score', 0),
                career_trajectory_score=score_breakdown.get('career_trajectory_score', 0),
                company_relevance_score=score_breakdown.get('company_relevance_score', 0),
                experience_match_score=score_breakdown.get('experience_match_score', 0),
                location_score=score_breakdown.get('location_score', 0),
                tenure_score=score_breakdown.get('tenure_score', 0),
                total_score=score_breakdown.get('total_score', 0)
            )

            candidate_with_score = CandidateWithScore(
                profile=candidate_profile,
                score_breakdown=score_breakdown_model,
                outreach_message=outreach_message
            )
            response_candidates.append(candidate_with_score)

        processing_time = time.time() - start_time
        search_query = f"LinkedIn {request.job_description} {request.location or ''}"

        logger.info(f"✅ Completed sourcing in {processing_time:.2f}s. Found {len(response_candidates)} candidates.")

        return SourcingResponse(
            candidates=response_candidates,
            total_found=len(response_candidates),
            search_query=search_query,
            processing_time=processing_time
        )

    except Exception as e:
        logger.error(f"Error in source_candidates: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error sourcing candidates: {str(e)}")


@app.post("/search", response_model=SearchResponse)
async def search_linkedin_profiles(request: SearchRequest):
    """Search for LinkedIn profiles based on a job description."""
    try:
        logger.info(f"🔍 Received search request for: {request.job_description[:100]}...")

        # Validate configuration
        Config.validate_config()

        # Perform search
        candidates = linkedin_search_service.search_linkedin_profiles(
            job_description=request.job_description,
            location=request.location,
            max_results=request.max_results
        )

        # Get cache stats for metadata
        cache_stats = linkedin_search_service.get_cache_stats()

        response = SearchResponse(
            candidates=candidates,
            total_found=len(candidates),
            search_metadata={
                "cache_hit": cache_stats.get('search_cache_size', 0) > 0,
                "cache_stats": cache_stats,
                "search_queries_used": len(linkedin_search_service._build_multiple_search_queries(
                    request.job_description, request.location
                ))
            }
        )

        logger.info(f"✅ Search completed successfully. Found {len(candidates)} candidates.")
        return response

    except ValueError as e:
        logger.error(f"❌ Configuration error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}")
    except Exception as e:
        logger.error(f"❌ Search error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")


@app.post("/outreach", response_model=OutreachResponse)
async def generate_outreach_messages(request: OutreachRequest):
    """Generate personalized outreach messages for candidates."""
    try:
        logger.info(f"📧 Received outreach request for {len(request.candidate_profiles)} candidates")

        # Validate configuration
        Config.validate_config()

        # Generate outreach messages
        messages = outreach_service.generate_outreach_messages(
            candidate_profiles=request.candidate_profiles,
            company_info=request.company_info,
            job_description=request.job_description
        )

        response = OutreachResponse(
            messages=messages,
            total_messages=len(messages)
        )

        logger.info(f"✅ Outreach generation completed. Created {len(messages)} messages.")
        return response

    except ValueError as e:
        logger.error(f"❌ Configuration error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}")
    except Exception as e:
        logger.error(f"❌ Outreach generation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Outreach generation failed: {str(e)}")


@app.get("/cache/stats", response_model=CacheStatsResponse)
async def get_cache_stats():
    """Get cache statistics and usage information."""
    try:
        cache_stats = linkedin_search_service.get_cache_stats()
        return CacheStatsResponse(**cache_stats)
    except Exception as e:
        logger.error(f"❌ Error getting cache stats: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get cache stats: {str(e)}")


@app.delete("/cache/clear")
async def clear_cache(cache_type: str = "all"):
    """Clear the specified cache, or all caches (cache_type: all|search|profile|query)."""
    try:
        if cache_type not in ["all", "search", "profile", "query"]:
            raise HTTPException(status_code=400, detail="Invalid cache type. Use 'all', 'search', 'profile', or 'query'")

        linkedin_search_service.clear_cache(cache_type)
        return {"message": "Cache cleared successfully", "cache_type": cache_type}
    except HTTPException:
        # Re-raise the 400 above untouched; previously it was swallowed by the
        # generic handler below and surfaced to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Error clearing cache: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")


@app.post("/cache/cleanup")
async def cleanup_expired_cache():
    """Clean up expired cache entries."""
    try:
        linkedin_search_service.cleanup_expired_cache()
        return {"message": "Expired cache entries cleaned up successfully"}
    except Exception as e:
        logger.error(f"❌ Error cleaning up cache: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to cleanup cache: {str(e)}")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)