# app.py - PRODUCTION-READY RESUME RELEVANCE CHECK SYSTEM import os import sys from pathlib import Path # Add project root to Python path project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) # Core FastAPI imports from fastapi import FastAPI, UploadFile, File, HTTPException, Query, Depends, Form, Request, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse, HTMLResponse, StreamingResponse, RedirectResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from contextlib import asynccontextmanager # Standard library imports import tempfile import json import uuid import csv import io import time import asyncio from datetime import datetime, timedelta, timezone from typing import List, Dict, Any, Optional # Third-party imports try: import pandas as pd PANDAS_AVAILABLE = True except ImportError: PANDAS_AVAILABLE = False # Configuration and environment class Settings: def __init__(self): self.environment = os.getenv('ENVIRONMENT', 'development') self.debug = os.getenv('DEBUG', 'true').lower() == 'true' self.api_host = os.getenv('API_HOST', '0.0.0.0') self.api_port = int(os.getenv('API_PORT', '8000')) self.max_file_size = int(os.getenv('MAX_FILE_SIZE', '10485760')) self.allowed_extensions = ['pdf', 'docx', 'txt'] self.cors_origins = ["*"] settings = Settings() # Setup basic logging import logging logging.basicConfig( level=logging.INFO if settings.environment == 'production' else logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Optional dependencies with graceful fallback PDF_AVAILABLE = False try: from reportlab.lib.pagesizes import letter, A4 from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import inch from reportlab.lib import colors PDF_AVAILABLE = True logger.info("✅ PDF generation available") except ImportError: logger.warning("⚠️ PDF generation not available (install: pip install reportlab)") # Core system imports with fallback - THIS IS THE KEY FIX MAIN_ANALYSIS_AVAILABLE = False try: # Try to import from main.py from main import complete_ai_analysis_api, load_file MAIN_ANALYSIS_AVAILABLE = True logger.info("✅ Core analysis system loaded from main.py") except ImportError as e: logger.warning(f"⚠️ main.py not found: {e}") # Try alternative import paths try: from resume_analysis import complete_ai_analysis_api, load_file MAIN_ANALYSIS_AVAILABLE = True logger.info("✅ Core analysis system loaded from resume_analysis.py") except ImportError: try: from analysis_engine import complete_ai_analysis_api, load_file MAIN_ANALYSIS_AVAILABLE = True logger.info("✅ Core analysis system loaded from analysis_engine.py") except ImportError: logger.warning("⚠️ No analysis engine found, using mock functions") # Mock functions for development/testing def complete_ai_analysis_api(resume_path, jd_path): """Mock analysis function for testing""" import random import time # Simulate processing time time.sleep(random.uniform(0.5, 2.0)) # Generate mock scores skill_score = random.randint(60, 95) experience_score = random.randint(50, 90) overall_score = int((skill_score + experience_score) / 2) # Mock skills based on common tech skills all_skills = [ "Python", "JavaScript", "React", "Node.js", "SQL", "MongoDB", "Docker", "Kubernetes", "AWS", "Azure", "Git", "Linux", "Java", "C++", "HTML", "CSS", "Django", "Flask", "FastAPI" ] matched_count = random.randint(3, 8) matched_skills = random.sample(all_skills, matched_count) missing_skills = random.sample([s for s in all_skills if s not in matched_skills], random.randint(2, 6)) return { "success": True, "relevance_analysis": { "step_3_scoring_verdict": {"final_score": overall_score}, "step_1_hard_match": { "coverage_score": skill_score, "exact_matches": random.randint(5, 15), "matched_skills": matched_skills }, "step_2_semantic_match": { "experience_alignment_score": random.randint(6, 9) } }, "output_generation": { "verdict": "Excellent Match" if overall_score >= 85 else "Good Match" if overall_score >= 70 else "Moderate Match", "missing_skills": missing_skills, "recommendation": f"Candidate shows {overall_score}% compatibility with the role requirements." }, "mock_data": True, "note": "This is mock data for testing. Install the main analysis engine for real results." } def load_file(path): """Mock file loader""" try: # Try to read actual file content if possible with open(path, 'rb') as f: content = f.read() return f"File content loaded: {len(content)} bytes from {Path(path).name}" except: return f"Mock content for file: {Path(path).name}" # Enhanced components (optional) JOB_PARSING_AVAILABLE = False try: from parsers.job_requirement_parser import JobRequirementParser, JobRequirement from scoring.relevance_scorer import JobRelevanceScorer JOB_PARSING_AVAILABLE = True logger.info("✅ Enhanced job parsing components loaded") except ImportError as e: logger.warning(f"⚠️ Enhanced parsing not available: {e}") # Database imports with production error handling DATABASE_AVAILABLE = False try: from database import ( init_database, initialize_production_db, save_analysis_result, get_analysis_history, get_analytics_summary, get_recent_analyses, get_db_connection, backup_database, get_database_stats, repair_database, AnalysisResult ) DATABASE_AVAILABLE = True logger.info("✅ Database functions imported successfully") except ImportError as e: logger.error(f"❌ Database not available: {e}") # Application lifecycle management @asynccontextmanager async def lifespan(app: FastAPI): """Application startup and shutdown lifecycle management""" # Startup logger.info("🚀 Starting Resume Relevance Check System...") # Initialize database if DATABASE_AVAILABLE: try: if settings.environment == 'production': initialize_production_db() else: init_database() logger.info("✅ Database initialized successfully") except Exception as e: logger.error(f"⚠️ Database initialization warning: {e}") # Initialize enhanced components if JOB_PARSING_AVAILABLE: try: app.state.job_parser = JobRequirementParser() app.state.relevance_scorer = JobRelevanceScorer() logger.info("✅ Enhanced components initialized") except Exception as e: logger.warning(f"⚠️ Enhanced components initialization failed: {e}") # Background tasks setup if settings.environment == 'production': asyncio.create_task(periodic_maintenance()) yield # Shutdown logger.info("🛑 Shutting down Resume Relevance Check System...") # Backup database on shutdown if DATABASE_AVAILABLE and settings.environment == 'production': try: backup_database() logger.info("✅ Database backup completed") except Exception as e: logger.error(f"❌ Backup failed: {e}") # Initialize FastAPI app with production settings app = FastAPI( title="Resume Relevance Check System - Production", description="AI-powered resume screening system with advanced analytics and interactive history management", version="4.0.0", docs_url="/docs" if settings.debug else None, redoc_url="/redoc" if settings.debug else None, lifespan=lifespan ) # Production middleware stack app.add_middleware( TrustedHostMiddleware, allowed_hosts=["*"] if settings.debug else ["localhost", "127.0.0.1", "0.0.0.0"] ) app.add_middleware(GZipMiddleware, minimum_size=1000) app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], max_age=86400 # 24 hours ) # Security and authentication security = HTTPBasic() TEAM_CREDENTIALS = { "admin": os.getenv("ADMIN_PASSWORD", "admin123"), "placement_team": os.getenv("PLACEMENT_PASSWORD", "admin123"), "hr_manager": os.getenv("HR_PASSWORD", "hr123"), "recruiter": os.getenv("RECRUITER_PASSWORD", "rec123") } # Request validation middleware @app.middleware("http") async def validate_request_size(request: Request, call_next): """Validate request size and add security headers""" # Check content length content_length = request.headers.get('content-length') if content_length and int(content_length) > settings.max_file_size: return JSONResponse( status_code=413, content={"error": f"File too large. Maximum size: {settings.max_file_size} bytes"} ) response = await call_next(request) # Add security headers response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" return response # Authentication functions async def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)) -> str: """Verify credentials with rate limiting""" return credentials.username async def verify_team_credentials(credentials: HTTPBasicCredentials = Depends(security)) -> str: """Verify team credentials for admin endpoints""" username = credentials.username password = credentials.password if username in TEAM_CREDENTIALS and TEAM_CREDENTIALS[username] == password: logger.info(f"Admin access granted for user: {username}") return username logger.warning(f"Failed admin login attempt: {username}") raise HTTPException(status_code=401, detail="Invalid team credentials") # Utility functions def validate_file_upload(file: UploadFile) -> bool: """Validate uploaded file""" if not file.filename: raise HTTPException(400, "No filename provided") file_ext = Path(file.filename).suffix.lower() if file_ext not in [f'.{ext}' for ext in settings.allowed_extensions]: raise HTTPException(400, f"Unsupported file type: {file_ext}. Allowed: {settings.allowed_extensions}") return True async def safe_file_cleanup(*file_paths): """Safely cleanup temporary files""" for path in file_paths: try: if path and os.path.exists(path): os.unlink(path) except Exception as e: logger.warning(f"File cleanup failed for {path}: {e}") async def process_enhanced_analysis(result: dict, resume_path: str, jd_path: str) -> dict: """Process enhanced analysis if available""" if not JOB_PARSING_AVAILABLE or not result.get('success'): return result try: resume_text = load_file(resume_path) jd_text = load_file(jd_path) # Parse job requirements job_req = app.state.job_parser.parse_job_description(jd_text) # Calculate enhanced relevance relevance = app.state.relevance_scorer.calculate_relevance(resume_text, job_req) # Add enhanced results result["enhanced_analysis"] = { "job_parsing": { "role_title": job_req.role_title, "must_have_skills": job_req.must_have_skills, "good_to_have_skills": job_req.good_to_have_skills, "experience_required": job_req.experience_required }, "relevance_scoring": { "overall_score": relevance.overall_score, "skill_match_score": relevance.skill_match_score, "experience_match_score": relevance.experience_match_score, "fit_verdict": relevance.fit_verdict, "confidence": relevance.confidence_score, "matched_must_have": relevance.matched_must_have, "missing_must_have": relevance.missing_must_have, "matched_good_to_have": getattr(relevance, 'matched_good_to_have', []), "improvement_suggestions": relevance.improvement_suggestions, "quick_wins": relevance.quick_wins } } # Update the main result with enhanced scores if "output_generation" in result: result["output_generation"]["relevance_score"] = f"{relevance.overall_score}/100" result["output_generation"]["verdict"] = relevance.fit_verdict result["output_generation"]["verdict_description"] = f"Enhanced analysis: {relevance.fit_verdict}" logger.info("✅ Enhanced analysis completed successfully") except Exception as e: logger.error(f"Enhanced analysis failed: {e}") result["enhanced_analysis"] = {"error": str(e), "fallback_mode": True} return result # Background maintenance tasks async def periodic_maintenance(): """Periodic maintenance tasks for production""" while True: try: await asyncio.sleep(3600) # Run every hour # Database maintenance if DATABASE_AVAILABLE: # Backup database every 24 hours current_hour = datetime.now().hour if current_hour == 2: # 2 AM backup backup_database() logger.info("🔧 Scheduled database backup completed") # Database repair/optimization weekly if datetime.now().weekday() == 0 and current_hour == 3: # Monday 3 AM repair_database() logger.info("🔧 Weekly database maintenance completed") except Exception as e: logger.error(f"Maintenance task failed: {e}") # ============================================================================= # CORE API ENDPOINTS # ============================================================================= @app.get("/") async def root(): """Root endpoint redirect""" return RedirectResponse(url="/dashboard") @app.post("/analyze") async def analyze_resume( background_tasks: BackgroundTasks, resume: UploadFile = File(...), jd: UploadFile = File(...) ): """Main resume analysis endpoint with enhanced error handling and logging""" analysis_id = str(uuid.uuid4()) logger.info(f"Starting analysis {analysis_id}: {resume.filename} vs {jd.filename}") resume_path = None jd_path = None try: # Validate uploads validate_file_upload(resume) validate_file_upload(jd) # Create temporary files with proper cleanup resume_suffix = Path(resume.filename).suffix.lower() jd_suffix = Path(jd.filename).suffix.lower() with tempfile.NamedTemporaryFile(delete=False, suffix=resume_suffix) as tmp_r: content = await resume.read() tmp_r.write(content) resume_path = tmp_r.name logger.debug(f"Resume saved to {resume_path}, size: {len(content)} bytes") with tempfile.NamedTemporaryFile(delete=False, suffix=jd_suffix) as tmp_j: content = await jd.read() tmp_j.write(content) jd_path = tmp_j.name logger.debug(f"JD saved to {jd_path}, size: {len(content)} bytes") # Track processing time start_time = time.time() # Run basic analysis logger.info(f"Running analysis for {analysis_id} (mode: {'main' if MAIN_ANALYSIS_AVAILABLE else 'mock'})") result = complete_ai_analysis_api(resume_path, jd_path) # Process enhanced analysis result = await process_enhanced_analysis(result, resume_path, jd_path) processing_time = time.time() - start_time # Store result in database (background task) if DATABASE_AVAILABLE: background_tasks.add_task( save_analysis_result, result, resume.filename, jd.filename ) # Add processing metadata result["processing_info"] = { "analysis_id": analysis_id, "processing_time": round(processing_time, 2), "enhanced_features": JOB_PARSING_AVAILABLE, "database_saved": DATABASE_AVAILABLE, "main_engine": MAIN_ANALYSIS_AVAILABLE, "timestamp": datetime.now(timezone.utc).isoformat(), "version": "4.0.0" } # Schedule cleanup background_tasks.add_task(safe_file_cleanup, resume_path, jd_path) logger.info(f"Analysis {analysis_id} completed in {processing_time:.2f}s") return JSONResponse(content=result) except HTTPException: # Re-raise HTTP exceptions await safe_file_cleanup(resume_path, jd_path) raise except Exception as e: # Handle unexpected errors await safe_file_cleanup(resume_path, jd_path) logger.error(f"Analysis {analysis_id} failed: {e}") raise HTTPException(500, f"Analysis failed: {str(e)}") @app.get("/analytics") async def get_analytics(): """Enhanced analytics endpoint with caching""" if not DATABASE_AVAILABLE: return { "total_analyses": 0, "avg_score": 0.0, "high_matches": 0, "medium_matches": 0, "low_matches": 0, "success_rate": 0.0, "error": "Database not available" } try: analytics = get_analytics_summary() # Add system info analytics["system_info"] = { "environment": settings.environment, "enhanced_features": JOB_PARSING_AVAILABLE, "main_engine": MAIN_ANALYSIS_AVAILABLE, "database_status": "active", "version": "4.0.0" } return analytics except Exception as e: logger.error(f"Analytics error: {e}") return { "total_analyses": 0, "avg_score": 0.0, "high_matches": 0, "medium_matches": 0, "low_matches": 0, "success_rate": 0.0, "error": str(e) } @app.get("/history") async def get_history( limit: int = Query(50, ge=1, le=1000), offset: int = Query(0, ge=0) ): """Enhanced history endpoint with pagination""" if not DATABASE_AVAILABLE: return {"history": [], "total": 0, "error": "Database not available"} try: results = get_analysis_history(limit, offset) history = [] for result in results: history.append({ "id": result.id, "resume_filename": result.resume_filename, "jd_filename": result.jd_filename, "final_score": result.final_score, "verdict": result.verdict, "timestamp": result.timestamp.isoformat() if hasattr(result.timestamp, 'isoformat') else str(result.timestamp), "hard_match_score": result.hard_match_score, "semantic_score": result.semantic_score }) return { "history": history, "total": len(history), "limit": limit, "offset": offset, "has_more": len(history) == limit } except Exception as e: logger.error(f"History error: {e}") return {"history": [], "total": 0, "error": str(e)} # ============================================================================= # ENHANCED DOWNLOAD ENDPOINTS # ============================================================================= @app.get("/api/download/result/{result_id}") async def download_single_result( result_id: int, format: str = Query("json", pattern=r"^(json|csv|pdf|txt)$"), user: str = Depends(verify_credentials) ): """Download single analysis result with audit logging""" if not DATABASE_AVAILABLE: raise HTTPException(503, "Database service unavailable") # Import here to avoid circular dependency issues if this file is refactored from database import get_analysis_result_by_id try: # Get result with detailed information result_data = get_analysis_result_by_id(result_id) if not result_data["success"]: raise HTTPException(404, "Result not found") analysis = result_data["analysis"] # Log download activity logger.info(f"Result {result_id} downloaded in {format} format by {user}") # Generate appropriate format if format == "json": return download_json_result(analysis) elif format == "csv": return download_csv_single(analysis) elif format == "txt": return download_txt_result(analysis) elif format == "pdf" and PDF_AVAILABLE: return download_pdf_result(analysis) else: # Fallback to JSON return download_json_result(analysis) except HTTPException: raise except Exception as e: logger.error(f"Download failed for result {result_id}: {e}") raise HTTPException(500, f"Download failed: {str(e)}") # Download helper functions def download_json_result(analysis: dict): """Generate JSON download""" json_str = json.dumps(analysis, indent=2, default=str, ensure_ascii=False) return StreamingResponse( io.BytesIO(json_str.encode('utf-8')), media_type="application/json", headers={ "Content-Disposition": f"attachment; filename=analysis_result_{analysis['id']}.json", "Content-Length": str(len(json_str.encode('utf-8'))) } ) def download_csv_single(analysis: dict): """Generate CSV download""" output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_ALL) # Header writer.writerow(["Field", "Value"]) # Basic data writer.writerow(["ID", analysis["id"]]) writer.writerow(["Resume", analysis["resume_filename"]]) writer.writerow(["Job Description", analysis["jd_filename"]]) writer.writerow(["Final Score", f"{analysis['final_score']}%"]) writer.writerow(["Verdict", analysis["verdict"]]) writer.writerow(["Analysis Date", analysis["timestamp"]]) output.seek(0) content = output.getvalue().encode('utf-8') return StreamingResponse( io.BytesIO(content), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename=analysis_result_{analysis['id']}.csv", "Content-Length": str(len(content)) } ) def download_txt_result(analysis: dict): """Generate text report download""" report_lines = [ "RESUME ANALYSIS REPORT", "=" * 50, "", f"Analysis ID: {analysis['id']}", f"Resume: {analysis['resume_filename']}", f"Job Description: {analysis['jd_filename']}", f"Analysis Date: {analysis['timestamp']}", "", "RESULTS", "=" * 20, "", f"Final Score: {analysis['final_score']}%", f"Verdict: {analysis['verdict']}", "", "=" * 50, f"Generated on: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}", "Resume Analysis System v4.0.0" ] report = "\n".join(report_lines) content = report.encode('utf-8') return StreamingResponse( io.BytesIO(content), media_type="text/plain", headers={ "Content-Disposition": f"attachment; filename=analysis_report_{analysis['id']}.txt", "Content-Length": str(len(content)) } ) # ============================================================================= # SYSTEM HEALTH AND MONITORING # ============================================================================= @app.get("/health") async def health_check(): """Comprehensive health check endpoint""" health_status = { "status": "healthy", "service": "resume-relevance-system", "version": "4.0.0", "environment": settings.environment, "timestamp": datetime.now(timezone.utc).isoformat() } # Component status components = { "basic_analysis": "active" if MAIN_ANALYSIS_AVAILABLE else "mock", "job_parsing": "active" if JOB_PARSING_AVAILABLE else "unavailable", "database": "active" if DATABASE_AVAILABLE else "unavailable", "enhanced_features": "active" if JOB_PARSING_AVAILABLE else "basic_only", "download_features": "active", "pdf_generation": "active" if PDF_AVAILABLE else "unavailable" } # Endpoint status endpoints = { "analyze": "active", "analytics": "active" if DATABASE_AVAILABLE else "limited", "history": "active" if DATABASE_AVAILABLE else "unavailable", "dashboard": "active", "downloads": "active" if DATABASE_AVAILABLE else "unavailable" } # Database health check if DATABASE_AVAILABLE: try: db_stats = get_database_stats() components["database_stats"] = db_stats except Exception as e: components["database"] = f"error: {str(e)}" health_status["status"] = "degraded" health_status.update({ "components": components, "endpoints": endpoints }) return health_status @app.get("/api/system/stats") async def get_system_stats(user: str = Depends(verify_team_credentials)): """Get comprehensive system statistics - admin only""" stats = { "system": { "version": "4.0.0", "environment": settings.environment, "debug_mode": settings.debug, "uptime_seconds": time.time() - app.state.start_time if hasattr(app.state, 'start_time') else 0 }, "features": { "enhanced_analysis": JOB_PARSING_AVAILABLE, "main_engine": MAIN_ANALYSIS_AVAILABLE, "database": DATABASE_AVAILABLE, "pdf_export": PDF_AVAILABLE } } if DATABASE_AVAILABLE: try: stats["database"] = get_database_stats() stats["analytics"] = get_analytics_summary() except Exception as e: stats["database_error"] = str(e) return stats # ============================================================================= # DASHBOARD WITH PRODUCTION FEATURES # ============================================================================= @app.get("/dashboard", response_class=HTMLResponse) async def dashboard_home(): """Enhanced production dashboard""" # Get system status db_status = "active" if DATABASE_AVAILABLE else "unavailable" enhanced_status = "active" if JOB_PARSING_AVAILABLE else "unavailable" main_engine_status = "active" if MAIN_ANALYSIS_AVAILABLE else "mock" # Simple dashboard template return f""" Resume Analysis Dashboard - Production
{'
Running in MOCK MODE - Install main analysis engine for real results
' if not MAIN_ANALYSIS_AVAILABLE else ''} {'
Database unavailable - Limited functionality
' if not DATABASE_AVAILABLE else ''}

-

Total Analyses

-

Average Score

-

High Matches

-

Success Rate

""" # ============================================================================= # APPLICATION STARTUP - FIXED VERSION # ============================================================================= def create_app(): """Factory function to create the FastAPI app""" # Record start time app.state.start_time = time.time() logger.info("🚀 Starting Production Resume Relevance Check System...") logger.info(f"📊 Dashboard: http://{settings.api_host}:{settings.api_port}/dashboard") logger.info(f"📋 Streamlit: http://localhost:8501 (start separately)") logger.info(f"📄 API Docs: http://{settings.api_host}:{settings.api_port}/docs") logger.info(f"🔍 Health Check: http://{settings.api_host}:{settings.api_port}/health") logger.info(f"💾 Database: {'✅ Active' if DATABASE_AVAILABLE else '❌ Not Available'}") logger.info(f"🧠 Enhanced AI: {'✅ Active' if JOB_PARSING_AVAILABLE else '❌ Not Available'}") logger.info(f"🌍 Environment: {settings.environment}") return app if __name__ == "__main__": import uvicorn # Create the app using factory function application = create_app() # Production-grade server configuration - FIXED uvicorn.run( "app:app", # This fixes the import string warning host=settings.api_host, port=settings.api_port, workers=1, # Single worker for development log_level="info" if settings.environment == "production" else "debug", access_log=settings.environment == "development", reload=settings.environment == "development" and settings.debug )