# LinkedIn Sourcing Agent — FastAPI application entry point.
# (Removed non-Python page-scrape residue: "Spaces: / Sleeping / Sleeping".)
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import time | |
| from typing import List, Optional, Dict, Any | |
| import logging | |
| from app.models import SourcingRequest, SourcingResponse, CandidateWithScore, CandidateProfile, ScoreBreakdown | |
| from app.utils.config import Config | |
| from app.services.linkedin_search import LinkedInSearchService | |
| from app.services.scoring import ScoringService | |
| from app.services.outreach import OutreachService | |
| from pydantic import BaseModel | |
# Configure module-level logging; all endpoints log through this logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(
    title="LinkedIn Sourcing Agent",
    description="AI-powered LinkedIn candidate sourcing and scoring system",
    version="1.0.0"
)

# Add CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec and is overly permissive for
# production — confirm the intended origin list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize services as module-level singletons shared by every request.
linkedin_search_service = LinkedInSearchService()
scoring_service = ScoringService()
outreach_service = OutreachService()
class SearchRequest(BaseModel):
    """Request body for the /search endpoint."""
    job_description: str            # free-text job description used to build search queries
    location: Optional[str] = None  # optional location filter
    max_results: int = 10           # cap on returned candidate profiles
class SearchResponse(BaseModel):
    """Response body for the /search endpoint."""
    candidates: List[Dict[str, Any]]   # raw candidate profile dicts from the search service
    total_found: int                   # len(candidates)
    search_metadata: Dict[str, Any]    # cache stats and query info for debugging
class OutreachRequest(BaseModel):
    """Request body for the /outreach endpoint."""
    candidate_profiles: List[Dict[str, Any]]  # candidates to write messages for
    company_info: Dict[str, Any]              # company context injected into messages
    job_description: str                      # role being recruited for
class OutreachResponse(BaseModel):
    """Response body for the /outreach endpoint."""
    messages: List[Dict[str, Any]]  # generated outreach messages, one dict per candidate
    total_messages: int             # len(messages)
class CacheStatsResponse(BaseModel):
    """Response body for /cache/stats — mirrors the dict returned by
    linkedin_search_service.get_cache_stats() (keys must match exactly,
    since the endpoint constructs this with **cache_stats)."""
    cache_enabled: bool
    cache_ttl: int
    cache_max_size: int
    # *_size: configured capacity per cache; *_currsize: current entry count.
    search_cache_size: int
    profile_cache_size: int
    query_cache_size: int
    search_cache_currsize: int
    profile_cache_currsize: int
    query_cache_currsize: int
class SourcingRequest(BaseModel):
    """Request body for /api/source-candidates.

    NOTE(review): this redefinition shadows the SourcingRequest imported
    from app.models at the top of the file — the local class is the one
    actually used by the endpoint. Confirm whether the import or this
    local definition is the intended model and remove the duplicate.
    """
    job_description: str
    location: Optional[str] = None
    max_candidates: int = 20
    # batch_size is declared here but the endpoint currently scores with a
    # hard-coded batch size — see source_candidates.
    batch_size: Optional[int] = 5
@app.on_event("startup")
async def startup_event():
    """Validate configuration on startup.

    Fix: the @app.on_event("startup") decorator was missing, so FastAPI
    never invoked this hook. Re-raises ValueError so a misconfigured
    deployment fails fast instead of serving broken endpoints.
    """
    try:
        Config.validate_config()
        logger.info("β Configuration validated successfully")
    except ValueError as e:
        logger.error(f"β Configuration error: {e}")
        raise
@app.get("/")
async def root():
    """Root endpoint with API information.

    Fix: the @app.get("/") decorator was missing, so this handler was
    never registered as a route. The paths advertised below match the
    routes restored on the other handlers in this file.
    """
    return {
        "message": "LinkedIn Sourcing Agent",
        "version": "1.0.0",
        "description": "AI-powered LinkedIn candidate sourcing and scoring system",
        "endpoints": {
            "source-candidates": "/api/source-candidates",
            "search": "/search",
            "outreach": "/outreach",
            "cache_stats": "/cache/stats",
            "cache_clear": "/cache/clear",
            "health": "/health"
        }
    }
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Fix: restored the missing @app.get("/health") decorator (the root
    payload advertises this path). Reports per-service status flags,
    active configuration, and — best-effort — cache statistics.
    """
    try:
        # Basic health checks: "configured"/"missing" reflects only whether
        # the API keys are present, not whether they are valid.
        health_status = {
            "status": "healthy",
            "timestamp": time.time(),
            "services": {
                "google_search": "configured" if Config.GOOGLE_API_KEY else "missing",
                "gemini": "configured" if Config.GEMINI_API_KEY else "missing",
                "linkedin_search": "operational",
                "outreach": "operational",
                "cache": "operational"
            },
            "configuration": {
                "cache_enabled": Config.CACHE_ENABLED,
                "cache_ttl": Config.CACHE_TTL,
                "max_candidates": Config.MAX_CANDIDATES
            }
        }
        # Cache stats are best-effort: a failure here must not fail the
        # health check, so the error is embedded in the payload instead.
        try:
            cache_stats = linkedin_search_service.get_cache_stats()
            health_status["cache_stats"] = cache_stats
        except Exception as e:
            health_status["cache_stats"] = {"error": str(e)}
        return health_status
    except Exception as e:
        logger.error(f"β Health check failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}")
@app.post("/api/source-candidates", response_model=SourcingResponse)
async def source_candidates(request: SourcingRequest):
    """
    Main endpoint to source and score LinkedIn candidates.

    Pipeline: search LinkedIn profiles -> score them against the job
    description -> generate outreach messages for the top candidates ->
    convert everything into the SourcingResponse model.

    Fixes:
    - Restored the missing @app.post("/api/source-candidates") decorator;
      the handler was never registered as a route.
    - Scoring previously used a hard-coded batch_size=5, silently ignoring
      the batch_size field declared on SourcingRequest; the request value
      is now honored (falling back to 5 when it is None).
    """
    start_time = time.time()
    try:
        logger.info(f"Starting candidate sourcing for job: {request.job_description[:100]}...")
        # Step 1: Search for LinkedIn profiles
        logger.info("π Searching LinkedIn profiles...")
        candidates = linkedin_search_service.search_linkedin_profiles(
            job_description=request.job_description,
            location=request.location,
            max_results=request.max_candidates
        )
        # Empty search result: return a well-formed empty response rather
        # than running the scoring/outreach steps on nothing.
        if not candidates:
            logger.warning("No candidates found in search")
            return SourcingResponse(
                candidates=[],
                total_found=0,
                search_query=f"LinkedIn {request.job_description} {request.location or ''}",
                processing_time=time.time() - start_time
            )
        logger.info(f"Found {len(candidates)} candidates")
        # Step 2: Score candidates (batch size now taken from the request)
        logger.info("π Scoring candidates...")
        scored_candidates = scoring_service.score_candidates(
            candidates, request.job_description, batch_size=request.batch_size or 5
        )
        # Step 3: Generate outreach messages for (at most 5) top candidates.
        # NOTE(review): this call passes scored candidates and job description
        # positionally, while the /outreach endpoint calls the same service
        # with candidate_profiles/company_info keywords — confirm the service
        # supports both signatures.
        logger.info("π¬ Generating outreach messages...")
        candidates_with_messages = outreach_service.generate_outreach_messages(
            scored_candidates,
            request.job_description,
            max_messages=min(5, len(scored_candidates))
        )
        # Step 4: Convert service dicts into the typed response models.
        response_candidates = []
        for candidate_data in candidates_with_messages:
            profile = candidate_data['profile']
            score_breakdown = candidate_data['score_breakdown']
            outreach_message = candidate_data.get('outreach_message', '')
            # Create CandidateProfile (missing fields default to empty/None)
            candidate_profile = CandidateProfile(
                name=profile.get('name', 'Unknown'),
                headline=profile.get('headline', ''),
                location=profile.get('location', ''),
                profile_url=profile.get('profile_url', ''),
                company=profile.get('company'),
                education=profile.get('education'),
                experience_summary=profile.get('experience_summary')
            )
            # Create ScoreBreakdown (missing sub-scores default to 0)
            score_breakdown_model = ScoreBreakdown(
                education_score=score_breakdown.get('education_score', 0),
                career_trajectory_score=score_breakdown.get('career_trajectory_score', 0),
                company_relevance_score=score_breakdown.get('company_relevance_score', 0),
                experience_match_score=score_breakdown.get('experience_match_score', 0),
                location_score=score_breakdown.get('location_score', 0),
                tenure_score=score_breakdown.get('tenure_score', 0),
                total_score=score_breakdown.get('total_score', 0)
            )
            # Create CandidateWithScore
            candidate_with_score = CandidateWithScore(
                profile=candidate_profile,
                score_breakdown=score_breakdown_model,
                outreach_message=outreach_message
            )
            response_candidates.append(candidate_with_score)
        processing_time = time.time() - start_time
        search_query = f"LinkedIn {request.job_description} {request.location or ''}"
        logger.info(f"β Completed sourcing in {processing_time:.2f}s. Found {len(response_candidates)} candidates.")
        return SourcingResponse(
            candidates=response_candidates,
            total_found=len(response_candidates),
            search_query=search_query,
            processing_time=processing_time
        )
    except Exception as e:
        logger.error(f"Error in source_candidates: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error sourcing candidates: {str(e)}")
@app.post("/search", response_model=SearchResponse)
async def search_linkedin_profiles(request: SearchRequest):
    """Search for LinkedIn profiles based on job description.

    Fix: restored the missing @app.post("/search") decorator advertised by
    the root payload; the handler was never registered as a route.
    """
    try:
        logger.info(f"π Received search request for: {request.job_description[:100]}...")
        # Validate configuration up front so a misconfigured deployment
        # surfaces as a clear error instead of a deep service failure.
        Config.validate_config()
        # Perform search
        candidates = linkedin_search_service.search_linkedin_profiles(
            job_description=request.job_description,
            location=request.location,
            max_results=request.max_results
        )
        # Cache stats are attached as metadata for client-side debugging.
        cache_stats = linkedin_search_service.get_cache_stats()
        response = SearchResponse(
            candidates=candidates,
            total_found=len(candidates),
            search_metadata={
                # NOTE(review): "cache_hit" only checks that the search cache
                # is non-empty, not that THIS request hit the cache — confirm
                # whether a per-request hit flag is available.
                "cache_hit": cache_stats.get('search_cache_size', 0) > 0,
                "cache_stats": cache_stats,
                # NOTE(review): reaches into a private method of the search
                # service; consider exposing a public query-count accessor.
                "search_queries_used": len(linkedin_search_service._build_multiple_search_queries(
                    request.job_description, request.location
                ))
            }
        )
        logger.info(f"β Search completed successfully. Found {len(candidates)} candidates.")
        return response
    except ValueError as e:
        logger.error(f"β Configuration error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}")
    except Exception as e:
        logger.error(f"β Search error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
@app.post("/outreach", response_model=OutreachResponse)
async def generate_outreach_messages(request: OutreachRequest):
    """Generate personalized outreach messages for candidates.

    Fix: restored the missing @app.post("/outreach") decorator advertised
    by the root payload; the handler was never registered as a route.
    """
    try:
        logger.info(f"π§ Received outreach request for {len(request.candidate_profiles)} candidates")
        # Validate configuration before doing any LLM work.
        Config.validate_config()
        # Generate outreach messages via the shared outreach service.
        messages = outreach_service.generate_outreach_messages(
            candidate_profiles=request.candidate_profiles,
            company_info=request.company_info,
            job_description=request.job_description
        )
        response = OutreachResponse(
            messages=messages,
            total_messages=len(messages)
        )
        logger.info(f"β Outreach generation completed. Created {len(messages)} messages.")
        return response
    except ValueError as e:
        # Configuration problems are reported distinctly from generic failures.
        logger.error(f"β Configuration error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}")
    except Exception as e:
        logger.error(f"β Outreach generation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Outreach generation failed: {str(e)}")
@app.get("/cache/stats", response_model=CacheStatsResponse)
async def get_cache_stats():
    """Get cache statistics and usage information.

    Fix: restored the missing @app.get("/cache/stats") decorator advertised
    by the root payload. The service dict is unpacked directly into
    CacheStatsResponse, so its keys must match the model's fields exactly.
    """
    try:
        cache_stats = linkedin_search_service.get_cache_stats()
        return CacheStatsResponse(**cache_stats)
    except Exception as e:
        logger.error(f"β Error getting cache stats: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get cache stats: {str(e)}")
| async def clear_cache(cache_type: str = "all"): | |
| """Clear specified cache or all caches""" | |
| try: | |
| if cache_type not in ["all", "search", "profile", "query"]: | |
| raise HTTPException(status_code=400, detail="Invalid cache type. Use 'all', 'search', 'profile', or 'query'") | |
| linkedin_search_service.clear_cache(cache_type) | |
| return {"message": f"Cache cleared successfully", "cache_type": cache_type} | |
| except Exception as e: | |
| logger.error(f"β Error clearing cache: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}") | |
@app.post("/cache/cleanup")
async def cleanup_expired_cache():
    """Clean up expired cache entries.

    Fix: restored a route decorator — every other handler in this file lost
    its decorator in the same way. NOTE(review): this path is not listed in
    the root endpoint's payload; confirm "/cache/cleanup" against the
    original routing.
    """
    try:
        linkedin_search_service.cleanup_expired_cache()
        return {"message": "Expired cache entries cleaned up successfully"}
    except Exception as e:
        logger.error(f"β Error cleaning up cache: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to cleanup cache: {str(e)}")
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) |