Feedback1 / src /agent.py
jyotidabass91's picture
Update src/agent.py
714e68d verified
"""
FitScore Feedback Agent - Complete System for Hugging Face Deployment
"""
import os
import uuid
import time
import requests
import json
from datetime import datetime
from typing import Dict, Any, Optional, List
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from sqlalchemy.orm import Session
from .database import get_db, create_tables, CandidateEvaluation, Feedback
from .models import (
FitScoreRequest, CandidateRequest, FeedbackRequest,
FitScoreResponse, FeedbackResponse, RecalculateResponse,
ComparisonResponse, HealthResponse, RootResponse
)
from .feedback_system import AdaptiveFeedbackSystem
from .reinforcement_learning import ReinforcementLearningSystem
from .advanced_learning import AdvancedLearningSystem
from .adaptive_hiring import AdaptiveHiringSystem
from .synapse_ai import SynapseAISystem
class FitScoreFeedbackAgent:
"""
Complete FitScore Feedback Agent for Hugging Face deployment.
Includes all subsystems for comprehensive functionality.
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# FitScore API configuration
self.ANALYZE_URL = config.get("fitscore_api_url", "")
self.auth_email = config.get("auth_email", "")
self.auth_password = config.get("auth_password", "")
self.auth_login_url = config.get("auth_login_url", "")
# Global variable to store access token
self.access_token = None
self.app = FastAPI(
title="FitScore Feedback Agent",
description="Advanced feedback loop system for candidate evaluation and model improvement",
version="1.0.0"
)
# Initialize all subsystems
self.feedback_system = AdaptiveFeedbackSystem()
self.reinforcement_system = ReinforcementLearningSystem()
self.advanced_learning = AdvancedLearningSystem()
self.adaptive_hiring = AdaptiveHiringSystem()
self.synapse_ai = SynapseAISystem()
# Setup CORS
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add custom exception handlers
self._add_exception_handlers()
# Add JSON error middleware
self.app.middleware("http")(self._json_error_middleware)
# Register endpoints
self._register_endpoints()
# Initialize database
self._init_database()
def _get_access_token(self):
"""
Get access token for the external API with better error handling
"""
# If we already have a token, return it
if self.access_token:
return self.access_token
try:
login_data = {
"email": self.auth_email,
"password": self.auth_password
}
login_headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
# Add timeout to prevent hanging
login_response = requests.post(self.auth_login_url, headers=login_headers, json=login_data, timeout=None)
if login_response.status_code == 200:
login_result = login_response.json()
self.access_token = login_result.get('data', {}).get('tokens', {}).get('accessToken')
if self.access_token:
print("✅ Successfully obtained access token")
return self.access_token
else:
print("⚠️ Login successful but no access token found in response")
return None
else:
print(f"⚠️ Login failed with status {login_response.status_code}: {login_response.text}")
return None
except requests.exceptions.Timeout:
print("⚠️ Login request timed out")
return None
except requests.exceptions.RequestException as e:
print(f"⚠️ Network error during login: {e}")
return None
except Exception as e:
print(f"⚠️ Unexpected error getting access token: {e}")
return None
def _reset_access_token(self):
"""Reset the access token to force a new login"""
self.access_token = None
def _add_exception_handlers(self):
"""Add custom exception handlers for better error messages"""
@self.app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle validation errors with better messages"""
return JSONResponse(
status_code=422,
content={
"detail": "Request validation failed. Please check your JSON data for invalid characters or malformed content.",
"errors": exc.errors(),
"help": "Make sure your JSON is properly formatted and doesn't contain invalid control characters."
}
)
@self.app.exception_handler(json.JSONDecodeError)
async def json_decode_exception_handler(request: Request, exc: json.JSONDecodeError):
"""Handle JSON decode errors with helpful messages"""
return JSONResponse(
status_code=422,
content={
"detail": "Invalid JSON format",
"error": str(exc),
"help": "Please check your JSON syntax and remove any invalid control characters.",
"position": exc.pos,
"line": exc.lineno,
"column": exc.colno
}
)
async def _json_error_middleware(self, request: Request, call_next):
"""Middleware to handle JSON parsing errors"""
try:
# Try to read the body to catch JSON errors early
if request.method in ["POST", "PUT", "PATCH"]:
content_type = request.headers.get("content-type", "")
if "application/json" in content_type:
try:
body = await request.body()
if body:
json.loads(body.decode('utf-8'))
except json.JSONDecodeError as e:
return JSONResponse(
status_code=422,
content={
"detail": "Invalid JSON in request body",
"error": str(e),
"help": "Please check your JSON syntax and remove any invalid control characters.",
"position": e.pos
}
)
response = await call_next(request)
return response
except Exception as e:
return JSONResponse(
status_code=500,
content={
"detail": "Internal server error",
"error": str(e)
}
)
def _init_database(self):
"""Initialize database tables"""
try:
create_tables()
# Create initial global prompt
self.feedback_system.create_initial_global_prompt()
print("✅ Database initialized successfully!")
except Exception as e:
print(f"⚠️ Database initialization warning: {e}")
def _call_fitscore_api(self, candidate_data: Dict[str, Any]) -> Dict[str, Any]:
"""Call the FitScore API for analysis"""
try:
# Get access token for authentication
auth_token = self._get_access_token()
if not auth_token:
print("⚠️ Failed to obtain access token, using fallback evaluation")
return self._fallback_evaluation(candidate_data)
# Prepare the request data (form data, not JSON)
data = {
'job_id': candidate_data.get("job_id", str(uuid.uuid4())),
'jd_text': candidate_data.get("job_description", "Software Engineer position"),
'resume_text': candidate_data.get("resume_text", "")
}
# Validate required fields
if not data['resume_text']:
raise ValueError("resume_text must be provided")
# Prepare headers with Bearer token
headers = {
'accept': 'application/json',
'Authorization': f'Bearer {auth_token}'
}
# Make the API call
response = requests.post(self.ANALYZE_URL, headers=headers, data=data, timeout=None)
# If we get an authentication error, try to get a fresh token and retry once
if response.status_code == 401:
print("⚠️ Authentication failed, getting fresh token...")
self._reset_access_token()
new_token = self._get_access_token()
if new_token:
headers['Authorization'] = f'Bearer {new_token}'
response = requests.post(self.ANALYZE_URL, headers=headers, data=data, timeout=None)
else:
print("⚠️ Could not obtain fresh token, using fallback evaluation")
return self._fallback_evaluation(candidate_data)
# Use raise_for_status like your working code
response.raise_for_status()
# Parse and return the response
result = response.json()
return result
except requests.exceptions.RequestException as e:
print(f"⚠️ API request failed: {e}")
# Fallback to local evaluation
return self._fallback_evaluation(candidate_data)
except Exception as e:
print(f"⚠️ Unexpected error in FitScore API call: {e}")
# Fallback to local evaluation
return self._fallback_evaluation(candidate_data)
def _fallback_evaluation(self, candidate_data: Dict[str, Any]) -> Dict[str, Any]:
"""Fallback evaluation when FitScore API is unavailable"""
try:
# Use adaptive hiring system for local evaluation
candidate_info = {
"education_level": "Bachelors", # Simplified
"years_experience": 5, # Simplified
"skills": ["Python", "React"], # Simplified
"company_size": "Medium",
"location": candidate_data.get("location", "Unknown"),
"industry": "Technology"
}
evaluation_result = self.adaptive_hiring.evaluate_candidate(
candidate_info, candidate_data.get("job_id", "default_job")
)
return evaluation_result
except Exception as e:
print(f"⚠️ Fallback evaluation failed: {e}")
# Return a basic evaluation
return {
"fitscore": 7.5,
"verdict": "Review",
"confidence": 0.7,
"category_scores": {
"education": 0.8,
"career_trajectory": 0.7,
"company_relevance": 0.7,
"tenure": 0.7,
"skills": 0.8,
"bonus": 0.1
},
"justification": "Basic evaluation completed. Manual review recommended.",
"model_version": "v1.0-fallback"
}
def _register_endpoints(self):
"""Register all API endpoints"""
@self.app.get("/", response_model=RootResponse)
async def root():
"""Root endpoint"""
return {
"message": "FitScore Feedback Agent",
"version": "1.0.0",
"status": "running",
"endpoints": [
"POST /fitscore/calculate",
"POST /fitscore/simple",
"POST /fitscore/feedback",
"POST /fitscore/recalculate",
"GET /fitscore/compare/{candidate_id}/{job_id}",
"GET /analytics/feedback",
"GET /analytics/reinforcement",
"POST /reinforcement/submit",
"POST /reinforcement/outcome",
"POST /advanced-learning/event",
"GET /advanced-learning/analytics"
]
}
@self.app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"service": "FitScore Feedback Agent",
"version": "1.0.0",
"timestamp": datetime.utcnow().isoformat()
}
@self.app.post("/fitscore/simple", response_model=FitScoreResponse)
async def calculate_fitscore_simple(
request: FitScoreRequest,
db: Session = Depends(get_db)
):
"""Calculate FitScore with simplified request - only essential fields"""
try:
# Prepare candidate data for FitScore API
candidate_data = {
"job_id": request.job_id,
"job_description": request.jd_text,
"resume_text": request.resume_text
}
# Call FitScore API for evaluation
evaluation_result = self._call_fitscore_api(candidate_data)
# Create evaluation record
evaluation_id = str(uuid.uuid4())
evaluation = CandidateEvaluation(
evaluation_id=evaluation_id,
candidate_id=f"auto_{evaluation_id[:8]}", # Auto-generate candidate ID
job_id=request.job_id,
fitscore=evaluation_result['fitscore'],
verdict=evaluation_result['verdict'],
confidence=evaluation_result['confidence'],
category_scores=evaluation_result['category_scores'],
justification=evaluation_result['justification'],
model_version=evaluation_result['model_version']
)
db.add(evaluation)
db.commit()
return {
"success": True,
"evaluation_id": evaluation_id,
"fitscore": evaluation_result['fitscore'],
"verdict": evaluation_result['verdict'],
"confidence": evaluation_result['confidence'],
"category_scores": evaluation_result['category_scores'],
"justification": evaluation_result['justification'],
"model_version": evaluation_result['model_version'],
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error calculating FitScore: {str(e)}")
@self.app.post("/fitscore/calculate", response_model=FitScoreResponse)
async def calculate_fitscore(
request: CandidateRequest,
db: Session = Depends(get_db)
):
"""Calculate FitScore for candidate evaluation"""
try:
# Prepare candidate data for FitScore API
candidate_data = {
"candidate_id": request.candidate_id,
"job_id": request.job_id,
"recruiter_id": request.recruiter_id,
"name": request.name,
"email": request.email,
"phone": request.phone,
"location": request.location,
"resume_text": request.resume_text,
"job_description": request.job_description
}
# Call FitScore API for evaluation
evaluation_result = self._call_fitscore_api(candidate_data)
# Create evaluation record
evaluation_id = str(uuid.uuid4())
evaluation = CandidateEvaluation(
evaluation_id=evaluation_id,
candidate_id=request.candidate_id,
job_id=request.job_id,
# recruiter_id removed to match existing PostgreSQL schema
fitscore=evaluation_result['fitscore'],
verdict=evaluation_result['verdict'],
confidence=evaluation_result['confidence'],
category_scores=evaluation_result['category_scores'],
justification=evaluation_result['justification'],
model_version=evaluation_result['model_version']
)
db.add(evaluation)
db.commit()
return {
"success": True,
"evaluation_id": evaluation_id,
"fitscore": evaluation_result['fitscore'],
"verdict": evaluation_result['verdict'],
"confidence": evaluation_result['confidence'],
"category_scores": evaluation_result['category_scores'],
"justification": evaluation_result['justification'],
"model_version": evaluation_result['model_version'],
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error calculating FitScore: {str(e)}")
@self.app.post("/fitscore/feedback", response_model=FeedbackResponse)
async def submit_feedback(
request: FeedbackRequest,
db: Session = Depends(get_db)
):
"""Submit feedback for model improvement"""
try:
# Add feedback to system
feedback = self.feedback_system.add_feedback(
job_id=request.job_id,
company_id=request.company_id,
analysis_id=request.analysis_id,
feedback_type=request.feedback_type,
feedback_text=request.feedback_text,
feedback_category=request.feedback_category,
confidence_score=request.confidence_score,
email=request.email,
linkedin_url=request.linkedin_url
)
# Create learning event
learning_event_id = str(uuid.uuid4())
return {
"success": True,
"feedback_id": feedback.feedback_id,
"learning_event_id": learning_event_id,
"message": "Feedback recorded and learning event created"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error submitting feedback: {str(e)}")
@self.app.post("/fitscore/recalculate", response_model=RecalculateResponse)
async def recalculate_fitscore(
candidate_id: str,
job_id: str,
feedback_id: str,
db: Session = Depends(get_db)
):
"""Recalculate FitScore after feedback processing"""
try:
# Get original evaluation
original_evaluation = db.query(CandidateEvaluation).filter(
CandidateEvaluation.candidate_id == candidate_id,
CandidateEvaluation.job_id == job_id
).order_by(CandidateEvaluation.created_at.desc()).first()
if not original_evaluation:
raise HTTPException(status_code=404, detail="Original evaluation not found")
# Get feedback
feedback = db.query(Feedback).filter(Feedback.feedback_id == feedback_id).first()
if not feedback:
raise HTTPException(status_code=404, detail="Feedback not found")
# Recalculate with feedback
updated_result = self.adaptive_hiring.recalculate_with_feedback(
original_evaluation, feedback
)
# Create updated evaluation
updated_evaluation_id = str(uuid.uuid4())
updated_evaluation = CandidateEvaluation(
evaluation_id=updated_evaluation_id,
candidate_id=candidate_id,
job_id=job_id,
# recruiter_id removed to match existing PostgreSQL schema
fitscore=updated_result['fitscore'],
verdict=updated_result['verdict'],
confidence=updated_result['confidence'],
category_scores=updated_result['category_scores'],
justification=updated_result['justification'],
model_version=updated_result['model_version']
)
db.add(updated_evaluation)
db.commit()
score_change = updated_result['fitscore'] - original_evaluation.fitscore
return {
"success": True,
"original_evaluation_id": original_evaluation.evaluation_id,
"updated_evaluation_id": updated_evaluation_id,
"original_fitscore": original_evaluation.fitscore,
"updated_fitscore": updated_result['fitscore'],
"score_change": round(score_change, 2),
"original_verdict": original_evaluation.verdict,
"updated_verdict": updated_result['verdict'],
"verdict_changed": original_evaluation.verdict != updated_result['verdict'],
"model_version": updated_result['model_version'],
"timestamp": datetime.utcnow().isoformat()
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error recalculating FitScore: {str(e)}")
@self.app.get("/fitscore/compare/{candidate_id}/{job_id}", response_model=ComparisonResponse)
async def compare_results(
candidate_id: str,
job_id: str,
db: Session = Depends(get_db)
):
"""Compare original and updated FitScore results"""
try:
# Get evaluations for this candidate/job pair
evaluations = db.query(CandidateEvaluation).filter(
CandidateEvaluation.candidate_id == candidate_id,
CandidateEvaluation.job_id == job_id
).order_by(CandidateEvaluation.created_at).all()
if len(evaluations) < 2:
raise HTTPException(status_code=404, detail="No comparison data available")
original = evaluations[0]
updated = evaluations[-1]
# Calculate changes
score_change = updated.fitscore - original.fitscore
score_change_percentage = (score_change / original.fitscore * 100) if original.fitscore > 0 else 0
confidence_change = updated.confidence - original.confidence
# Calculate category changes
category_changes = {}
for category in original.category_scores:
if category in updated.category_scores:
category_changes[category] = updated.category_scores[category] - original.category_scores[category]
# Get feedback if available
feedback = db.query(Feedback).filter(
Feedback.job_id == job_id
).order_by(Feedback.created_at.desc()).first()
return {
"success": True,
"comparison": {
"original": {
"evaluation_id": original.evaluation_id,
"fitscore": original.fitscore,
"verdict": original.verdict,
"confidence": original.confidence,
"model_version": original.model_version,
"timestamp": original.created_at.isoformat(),
"category_scores": original.category_scores
},
"updated": {
"evaluation_id": updated.evaluation_id,
"fitscore": updated.fitscore,
"verdict": updated.verdict,
"confidence": updated.confidence,
"model_version": updated.model_version,
"timestamp": updated.created_at.isoformat(),
"category_scores": updated.category_scores
},
"changes": {
"score_change": round(score_change, 2),
"score_change_percentage": round(score_change_percentage, 1),
"verdict_changed": original.verdict != updated.verdict,
"confidence_change": round(confidence_change, 3),
"category_changes": category_changes
},
"feedback": {
"feedback_id": feedback.feedback_id if feedback else None,
"feedback_type": feedback.feedback_type if feedback else None,
"feedback_text": feedback.feedback_text if feedback else None,
"feedback_category": feedback.feedback_category if feedback else None,
"timestamp": feedback.created_at.isoformat() if feedback else None
},
"justification": f"FitScore changed from {original.fitscore:.2f} to {updated.fitscore:.2f} ({score_change:+.2f} points, {score_change_percentage:+.1f}%) after processing feedback."
}
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error comparing results: {str(e)}")
@self.app.get("/analytics/feedback")
async def get_feedback_analytics():
"""Get feedback analytics"""
try:
analytics = self.feedback_system.get_feedback_analytics()
return analytics
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting analytics: {str(e)}")
@self.app.get("/analytics/reinforcement")
async def get_reinforcement_analytics():
"""Get reinforcement learning analytics"""
try:
analytics = self.reinforcement_system.get_learning_analytics()
return analytics
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting reinforcement analytics: {str(e)}")
@self.app.post("/reinforcement/submit")
async def submit_candidate_reinforcement(
candidate_data: Dict[str, Any],
job_data: Dict[str, Any],
recruiter_id: str
):
"""Submit candidate for reinforcement learning"""
try:
submission_data = {
"candidate_id": candidate_data.get("candidate_id"),
"job_id": job_data.get("job_id"),
"recruiter_id": recruiter_id, # Keep for API but don't store in database
"candidate_data": candidate_data,
"job_data": job_data
}
result = self.reinforcement_system.submit_candidate(submission_data)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error submitting candidate: {str(e)}")
@self.app.post("/reinforcement/outcome")
async def record_outcome(
submission_id: str,
outcome: str,
notes: str = ""
):
"""Record outcome for reinforcement learning"""
try:
result = self.reinforcement_system.record_outcome(submission_id, outcome, notes)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error recording outcome: {str(e)}")
@self.app.post("/advanced-learning/event")
async def process_learning_event(
event_data: Dict[str, Any]
):
"""Process advanced learning event"""
try:
result = self.advanced_learning.process_learning_event(event_data)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing learning event: {str(e)}")
@self.app.get("/advanced-learning/analytics")
async def get_advanced_learning_analytics():
"""Get advanced learning analytics"""
try:
analytics = self.advanced_learning.get_learning_analytics()
return analytics
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error getting advanced learning analytics: {str(e)}")
def get_app(self) -> FastAPI:
"""Get the FastAPI application"""
return self.app
def run(self):
"""Run the application"""
import uvicorn
uvicorn.run(
self.app,
host=self.config.get("host", "0.0.0.0"),
port=self.config.get("port", 7860)
)