# LinkedinAgent — app/main.py (repo: Hydra-Bolt, commit 3856f78)
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import time
from typing import List, Optional, Dict, Any
import logging
from app.models import SourcingRequest, SourcingResponse, CandidateWithScore, CandidateProfile, ScoreBreakdown
from app.utils.config import Config
from app.services.linkedin_search import LinkedInSearchService
from app.services.scoring import ScoringService
from app.services.outreach import OutreachService
from pydantic import BaseModel
# Configure logging for the whole module; handlers default to stderr.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app with metadata shown in the auto-generated docs.
app = FastAPI(
    title="LinkedIn Sourcing Agent",
    description="AI-powered LinkedIn candidate sourcing and scoring system",
    version="1.0.0"
)

# Add CORS middleware — fully open policy.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm the intended
# CORS posture before deploying publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize services once at import time; these singletons are shared by
# every request handler below.
linkedin_search_service = LinkedInSearchService()
scoring_service = ScoringService()
outreach_service = OutreachService()
class SearchRequest(BaseModel):
    """Request body for POST /search."""

    job_description: str  # free-text job description used to build search queries
    location: Optional[str] = None  # optional location filter
    max_results: int = 10  # cap on number of profiles returned
class SearchResponse(BaseModel):
    """Response body for POST /search."""

    candidates: List[Dict[str, Any]]  # raw profile dicts from the search service
    total_found: int  # len(candidates)
    search_metadata: Dict[str, Any]  # cache stats and query-count diagnostics
class OutreachRequest(BaseModel):
    """Request body for POST /outreach."""

    candidate_profiles: List[Dict[str, Any]]  # profiles to draft messages for
    company_info: Dict[str, Any]  # company context injected into messages
    job_description: str  # role being recruited for
class OutreachResponse(BaseModel):
    """Response body for POST /outreach."""

    messages: List[Dict[str, Any]]  # generated outreach messages
    total_messages: int  # len(messages)
class CacheStatsResponse(BaseModel):
    """Response body for GET /cache/stats.

    Field names presumably mirror the keys returned by
    linkedin_search_service.get_cache_stats() — it is unpacked directly
    into this model; verify against that service's implementation.
    """

    cache_enabled: bool  # whether caching is active
    cache_ttl: int  # entry time-to-live, seconds
    cache_max_size: int  # configured capacity per cache
    search_cache_size: int  # max size of the search-results cache
    profile_cache_size: int  # max size of the profile cache
    query_cache_size: int  # max size of the query cache
    search_cache_currsize: int  # current entry count, search cache
    profile_cache_currsize: int  # current entry count, profile cache
    query_cache_currsize: int  # current entry count, query cache
class SourcingRequest(BaseModel):
    """Request body for POST /api/source-candidates.

    NOTE(review): this redefinition shadows the ``SourcingRequest`` imported
    from ``app.models`` at the top of this file — confirm which definition
    is intended before changing either.
    """

    job_description: str  # job description to source candidates against
    location: Optional[str] = None  # optional location filter
    max_candidates: int = 20  # cap on candidates searched and scored
    batch_size: Optional[int] = 5  # requested scoring batch size
@app.on_event("startup")
async def startup_event():
    """Validate required configuration before the app starts serving."""
    try:
        Config.validate_config()
    except ValueError as err:
        # Surface the misconfiguration and abort startup.
        logger.error(f"❌ Configuration error: {err}")
        raise
    logger.info("βœ… Configuration validated successfully")
@app.get("/")
async def root():
    """Describe the API and list its available endpoints."""
    endpoint_index = {
        "source-candidates": "/api/source-candidates",
        "search": "/search",
        "outreach": "/outreach",
        "cache_stats": "/cache/stats",
        "cache_clear": "/cache/clear",
        "health": "/health",
    }
    return {
        "message": "LinkedIn Sourcing Agent",
        "version": "1.0.0",
        "description": "AI-powered LinkedIn candidate sourcing and scoring system",
        "endpoints": endpoint_index,
    }
@app.get("/health")
async def health_check():
    """Report service health, key configuration flags, and cache statistics.

    Raises:
        HTTPException: 500 if assembling the report itself fails.
    """
    try:
        report = {
            "status": "healthy",
            "timestamp": time.time(),
            "services": {
                "google_search": "configured" if Config.GOOGLE_API_KEY else "missing",
                "gemini": "configured" if Config.GEMINI_API_KEY else "missing",
                "linkedin_search": "operational",
                "outreach": "operational",
                "cache": "operational",
            },
            "configuration": {
                "cache_enabled": Config.CACHE_ENABLED,
                "cache_ttl": Config.CACHE_TTL,
                "max_candidates": Config.MAX_CANDIDATES,
            },
        }
        # Cache stats are best-effort: report the failure rather than 500ing.
        try:
            report["cache_stats"] = linkedin_search_service.get_cache_stats()
        except Exception as cache_err:
            report["cache_stats"] = {"error": str(cache_err)}
        return report
    except Exception as e:
        logger.error(f"❌ Health check failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}")
def _build_candidate_response(candidate_data: Dict[str, Any]) -> CandidateWithScore:
    """Convert one scored-candidate dict into a CandidateWithScore model."""
    profile = candidate_data['profile']
    score_breakdown = candidate_data['score_breakdown']
    candidate_profile = CandidateProfile(
        name=profile.get('name', 'Unknown'),
        headline=profile.get('headline', ''),
        location=profile.get('location', ''),
        profile_url=profile.get('profile_url', ''),
        company=profile.get('company'),
        education=profile.get('education'),
        experience_summary=profile.get('experience_summary')
    )
    score_breakdown_model = ScoreBreakdown(
        education_score=score_breakdown.get('education_score', 0),
        career_trajectory_score=score_breakdown.get('career_trajectory_score', 0),
        company_relevance_score=score_breakdown.get('company_relevance_score', 0),
        experience_match_score=score_breakdown.get('experience_match_score', 0),
        location_score=score_breakdown.get('location_score', 0),
        tenure_score=score_breakdown.get('tenure_score', 0),
        total_score=score_breakdown.get('total_score', 0)
    )
    return CandidateWithScore(
        profile=candidate_profile,
        score_breakdown=score_breakdown_model,
        outreach_message=candidate_data.get('outreach_message', '')
    )


@app.post("/api/source-candidates", response_model=SourcingResponse)
async def source_candidates(request: SourcingRequest):
    """
    Source, score, and draft outreach for LinkedIn candidates.

    Pipeline:
      1. Search LinkedIn profiles matching the job description.
      2. Score each candidate against the job description.
      3. Generate outreach messages for the top candidates (at most 5).
      4. Convert results into the SourcingResponse schema.

    Raises:
        HTTPException: 500 if any pipeline step fails.
    """
    start_time = time.time()
    try:
        logger.info(f"Starting candidate sourcing for job: {request.job_description[:100]}...")

        # Step 1: Search for LinkedIn profiles
        logger.info("πŸ” Searching LinkedIn profiles...")
        candidates = linkedin_search_service.search_linkedin_profiles(
            job_description=request.job_description,
            location=request.location,
            max_results=request.max_candidates
        )
        if not candidates:
            logger.warning("No candidates found in search")
            return SourcingResponse(
                candidates=[],
                total_found=0,
                search_query=f"LinkedIn {request.job_description} {request.location or ''}",
                processing_time=time.time() - start_time
            )
        logger.info(f"Found {len(candidates)} candidates")

        # Step 2: Score candidates.
        # BUGFIX: honor the caller-supplied batch_size (was hard-coded to 5,
        # silently ignoring SourcingRequest.batch_size).
        logger.info("πŸ“Š Scoring candidates...")
        scored_candidates = scoring_service.score_candidates(
            candidates, request.job_description, batch_size=request.batch_size or 5
        )

        # Step 3: Generate outreach messages for the top candidates.
        logger.info("πŸ’¬ Generating outreach messages...")
        candidates_with_messages = outreach_service.generate_outreach_messages(
            scored_candidates,
            request.job_description,
            max_messages=min(5, len(scored_candidates))
        )

        # Step 4: Convert to response format.
        response_candidates = [
            _build_candidate_response(candidate_data)
            for candidate_data in candidates_with_messages
        ]

        processing_time = time.time() - start_time
        search_query = f"LinkedIn {request.job_description} {request.location or ''}"
        logger.info(f"βœ… Completed sourcing in {processing_time:.2f}s. Found {len(response_candidates)} candidates.")
        return SourcingResponse(
            candidates=response_candidates,
            total_found=len(response_candidates),
            search_query=search_query,
            processing_time=processing_time
        )
    except Exception as e:
        logger.error(f"Error in source_candidates: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error sourcing candidates: {str(e)}")
@app.post("/search", response_model=SearchResponse)
async def search_linkedin_profiles(request: SearchRequest):
    """Find LinkedIn profiles matching a job description.

    Raises:
        HTTPException: 500 on configuration or search failure.
    """
    try:
        logger.info(f"πŸ” Received search request for: {request.job_description[:100]}...")
        # Fail fast if required API keys are missing.
        Config.validate_config()

        found = linkedin_search_service.search_linkedin_profiles(
            job_description=request.job_description,
            location=request.location,
            max_results=request.max_results,
        )

        stats = linkedin_search_service.get_cache_stats()
        # NOTE(review): reaches into a private helper of the search service
        # to count queries — consider exposing a public accessor.
        queries = linkedin_search_service._build_multiple_search_queries(
            request.job_description, request.location
        )
        result = SearchResponse(
            candidates=found,
            total_found=len(found),
            search_metadata={
                "cache_hit": stats.get('search_cache_size', 0) > 0,
                "cache_stats": stats,
                "search_queries_used": len(queries),
            },
        )
        logger.info(f"βœ… Search completed successfully. Found {len(found)} candidates.")
        return result
    except ValueError as e:
        logger.error(f"❌ Configuration error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}")
    except Exception as e:
        logger.error(f"❌ Search error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
@app.post("/outreach", response_model=OutreachResponse)
async def generate_outreach_messages(request: OutreachRequest):
    """Generate personalized outreach messages for candidates.

    NOTE(review): this call site passes company_info by keyword while
    /api/source-candidates calls the same service positionally without it —
    confirm the service accepts both signatures.

    Raises:
        HTTPException: 500 on configuration or generation failure.
    """
    try:
        logger.info(f"πŸ“§ Received outreach request for {len(request.candidate_profiles)} candidates")
        # Fail fast if required API keys are missing.
        Config.validate_config()

        drafted = outreach_service.generate_outreach_messages(
            candidate_profiles=request.candidate_profiles,
            company_info=request.company_info,
            job_description=request.job_description,
        )
        result = OutreachResponse(messages=drafted, total_messages=len(drafted))
        logger.info(f"βœ… Outreach generation completed. Created {len(drafted)} messages.")
        return result
    except ValueError as e:
        logger.error(f"❌ Configuration error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Configuration error: {str(e)}")
    except Exception as e:
        logger.error(f"❌ Outreach generation error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Outreach generation failed: {str(e)}")
@app.get("/cache/stats", response_model=CacheStatsResponse)
async def get_cache_stats():
    """Return current cache usage statistics.

    Raises:
        HTTPException: 500 if stats cannot be fetched or validated.
    """
    try:
        stats = linkedin_search_service.get_cache_stats()
        return CacheStatsResponse(**stats)
    except Exception as e:
        logger.error(f"❌ Error getting cache stats: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get cache stats: {str(e)}")
@app.delete("/cache/clear")
async def clear_cache(cache_type: str = "all"):
    """Clear the specified cache ('all', 'search', 'profile', or 'query').

    Raises:
        HTTPException: 400 for an unknown cache_type, 500 on clear failure.
    """
    # BUGFIX: validate OUTSIDE the try block — previously the 400
    # HTTPException was caught by the generic `except Exception` below and
    # re-raised as a 500.
    if cache_type not in ("all", "search", "profile", "query"):
        raise HTTPException(status_code=400, detail="Invalid cache type. Use 'all', 'search', 'profile', or 'query'")
    try:
        linkedin_search_service.clear_cache(cache_type)
        return {"message": "Cache cleared successfully", "cache_type": cache_type}
    except Exception as e:
        logger.error(f"❌ Error clearing cache: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")
@app.post("/cache/cleanup")
async def cleanup_expired_cache():
    """Evict expired entries from all caches.

    Raises:
        HTTPException: 500 if cleanup fails.
    """
    try:
        linkedin_search_service.cleanup_expired_cache()
    except Exception as e:
        logger.error(f"❌ Error cleaning up cache: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to cleanup cache: {str(e)}")
    return {"message": "Expired cache entries cleaned up successfully"}
if __name__ == "__main__":
    # Local development entry point; in production run via an external
    # ASGI server instead.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)