File size: 13,616 Bytes
358dfff 3bc61a5 358dfff 5ae03a4 358dfff 5ae03a4 358dfff 5ae03a4 358dfff 5ae03a4 358dfff 5ae03a4 358dfff 8039e71 358dfff | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 | from sqlalchemy.orm import Session
from typing import List, Optional
import uuid
import json
from models.application import Application
from schemas.application import ApplicationCreate, ApplicationUpdate
from logging_config import get_logger
# Create logger for this module
logger = get_logger(__name__)
def get_application(db: Session, application_id: str) -> Optional[Application]:
"""Get application by ID"""
logger.debug(f"Retrieving application with ID: {application_id}")
application = db.query(Application).filter(Application.id == application_id).first()
if application:
logger.debug(f"Found application: {application.id}")
else:
logger.debug(f"Application not found for ID: {application_id}")
return application
def get_application_by_user(db: Session, application_id: str, user_id: str) -> Optional[Application]:
"""Get application by ID and user ID"""
logger.debug(f"Retrieving application with ID: {application_id} for user ID: {user_id}")
application = db.query(Application).filter(
Application.id == application_id,
Application.user_id == user_id
).first()
if application:
logger.debug(f"Found application: {application.id} for user: {user_id}")
else:
logger.debug(f"Application not found for ID: {application_id} and user ID: {user_id}")
return application
def get_applications_by_job_and_assessment(db: Session, job_id: str, assessment_id: str, skip: int = 0, limit: int = 100) -> List[Application]:
"""Get list of applications by job and assessment IDs"""
logger.debug(f"Retrieving applications for job ID: {job_id}, assessment ID: {assessment_id}, skip={skip}, limit={limit}")
applications = db.query(Application).filter(
Application.job_id == job_id,
Application.assessment_id == assessment_id
).offset(skip).limit(limit).all()
logger.debug(f"Retrieved {len(applications)} applications for job ID: {job_id}, assessment ID: {assessment_id}")
return applications
def get_applications_by_user(db: Session, user_id: str, skip: int = 0, limit: int = 100) -> List[Application]:
"""Get list of applications by user ID"""
logger.debug(f"Retrieving applications for user ID: {user_id}, skip={skip}, limit={limit}")
applications = db.query(Application).filter(Application.user_id == user_id).offset(skip).limit(limit).all()
logger.debug(f"Retrieved {len(applications)} applications for user ID: {user_id}")
return applications
def create_application(db: Session, application: ApplicationCreate) -> Application:
"""Create a new application and calculate scores"""
logger.info(f"Creating new application for job ID: {application.job_id}, assessment ID: {application.assessment_id}, user ID: {application.user_id}")
# Calculate scores for the application
score, question_scores = calculate_detailed_application_score(db, application)
db_application = Application(
id=str(uuid.uuid4()),
job_id=application.job_id,
assessment_id=application.assessment_id,
user_id=application.user_id,
answers=json.dumps([ans.dict() for ans in application.answers]), # Store as JSON string
score=score, # Store the overall application score
question_scores=json.dumps([qs.dict() for qs in question_scores]) # Store individual question scores as JSON
)
db.add(db_application)
db.commit()
db.refresh(db_application)
logger.info(f"Successfully created application with ID: {db_application.id} and overall score: {score}")
return db_application
def update_application(db: Session, application_id: str, **kwargs) -> Optional[Application]:
"""Update an application"""
logger.info(f"Updating application with ID: {application_id}")
db_application = get_application(db, application_id)
if db_application:
for key, value in kwargs.items():
if key == 'answers' and isinstance(value, list):
setattr(db_application, key, json.dumps([ans.dict() if hasattr(ans, 'dict') else ans for ans in value]))
else:
setattr(db_application, key, value)
db.commit()
db.refresh(db_application)
logger.info(f"Successfully updated application: {db_application.id}")
return db_application
logger.warning(f"Failed to update application - application not found: {application_id}")
return None
def delete_application(db: Session, application_id: str) -> bool:
"""Delete an application"""
logger.info(f"Deleting application with ID: {application_id}")
db_application = get_application(db, application_id)
if db_application:
db.delete(db_application)
db.commit()
logger.info(f"Successfully deleted application: {db_application.id}")
return True
logger.warning(f"Failed to delete application - application not found: {application_id}")
return False
def calculate_detailed_application_score(db: Session, application_create: ApplicationCreate):
"""Calculate detailed scores for an application including individual question scores"""
from models.assessment import Assessment
from schemas.application import ApplicationAnswerScore
logger.debug(f"Calculating detailed scores for application - job ID: {application_create.job_id}, assessment ID: {application_create.assessment_id}")
# Get the associated assessment to compare answers with correct answers
assessment = db.query(Assessment).filter(Assessment.id == application_create.assessment_id).first()
if not assessment:
logger.warning(f"Assessment not found for ID: {application_create.assessment_id}")
return 0.0, []
# Parse the questions
import json
try:
questions = json.loads(assessment.questions) if assessment.questions else []
except json.JSONDecodeError:
logger.error(f"Failed to parse questions for assessment ID: {application_create.assessment_id}")
return 0.0, []
# Create a mapping of question_id to question for easy lookup
question_map = {q['id']: q for q in questions}
# Calculate the scores
total_points = 0
earned_points = 0
question_scores = []
for answer in application_create.answers:
question_id = answer.question_id
if not question_id or question_id not in question_map:
continue
question_data = question_map[question_id]
# Calculate weighted score
question_weight = question_data.get('weight', 1) # Default weight is 1
total_points += question_weight
# Initialize question score object
question_score_obj = ApplicationAnswerScore(
question_id=question_id,
score=0.0,
rationale="No rationale available"
)
# For multiple choice questions, score directly without AI
if question_data['type'] in ['choose_one', 'choose_many']:
correct_options = set(question_data.get('correct_options', []))
selected_options = set(answer.options or [])
# Check if the selected options match the correct options exactly
if selected_options == correct_options:
earned_points += question_weight # Full points for correct answer
question_score_obj.score = 1.0 # Perfect score
question_score_obj.rationale = "Correct answer"
else:
question_score_obj.score = 0.0 # No points for incorrect answer
question_score_obj.rationale = f"Incorrect. Correct options: {list(correct_options)}, Selected: {list(selected_options)}"
# For text-based questions, use AI to evaluate the answer
elif question_data['type'] == 'text_based':
# Convert the question data to an AssessmentQuestion object
from schemas.assessment import AssessmentQuestion, AssessmentQuestionOption
from schemas.enums import QuestionType
question_obj = AssessmentQuestion(
id=question_data['id'],
text=question_data['text'],
weight=question_data['weight'],
skill_categories=question_data['skill_categories'],
type=QuestionType(question_data['type']),
options=[AssessmentQuestionOption(text=opt['text'], value=opt['value']) for opt in question_data.get('options', [])],
correct_options=question_data.get('correct_options', [])
)
# Use AI service to score the text-based answer
from services.ai_service import score_answer
score_result = score_answer(
question=question_obj,
answer_text=answer.text or '',
selected_options=answer.options or []
)
earned_points += score_result['score'] * question_weight
question_score_obj.score = score_result['score']
question_score_obj.rationale = score_result['rationale']
question_scores.append(question_score_obj)
# Calculate percentage score
if total_points > 0:
overall_score = (earned_points / total_points) * 100
else:
overall_score = 0.0
logger.debug(f"Calculated detailed scores: overall {overall_score}% ({earned_points}/{total_points} points), {len(question_scores)} questions scored")
return round(overall_score, 2), question_scores
def calculate_application_score(db: Session, application_id: str) -> float:
"""Calculate the score for an application"""
logger.debug(f"Calculating score for application ID: {application_id}")
# Get the application
application = get_application(db, application_id)
if not application:
logger.warning(f"Application not found for ID: {application_id}")
return 0.0
# Get the associated assessment to compare answers with correct answers
from models.assessment import Assessment
assessment = db.query(Assessment).filter(Assessment.id == application.assessment_id).first()
if not assessment:
logger.warning(f"Assessment not found for application ID: {application_id}")
return 0.0
# Parse the answers and questions
import json
try:
# Check if answers is already a list (parsed) or a string (needs parsing)
if isinstance(application.answers, str):
answers = json.loads(application.answers) if application.answers else []
else:
# Assume it's already a list object
answers = application.answers if application.answers else []
# Questions should always be a JSON string from the database
questions = json.loads(assessment.questions) if assessment.questions else []
except json.JSONDecodeError:
logger.error(f"Failed to parse answers or questions for application ID: {application_id}")
return 0.0
# Create a mapping of question_id to question for easy lookup
question_map = {q['id']: q for q in questions}
# Calculate the score
total_points = 0
earned_points = 0
for answer in answers:
question_id = answer.get('question_id')
if not question_id or question_id not in question_map:
continue
question_data = question_map[question_id]
# Calculate weighted score
question_weight = question_data.get('weight', 1) # Default weight is 1
total_points += question_weight
# For multiple choice questions, score directly without AI
if question_data['type'] in ['choose_one', 'choose_many']:
correct_options = set(question_data.get('correct_options', []))
selected_options = set(answer.get('options', []))
# Check if the selected options match the correct options exactly
if selected_options == correct_options:
earned_points += question_weight # Full points for correct answer
# Otherwise, 0 points for incorrect answer (no partial credit for multiple choice)
# For text-based questions, use AI to evaluate the answer
elif question_data['type'] == 'text_based':
# Convert the question data to an AssessmentQuestion object
from schemas.assessment import AssessmentQuestion, AssessmentQuestionOption
from schemas.enums import QuestionType
question_obj = AssessmentQuestion(
id=question_data['id'],
text=question_data['text'],
weight=question_data['weight'],
skill_categories=question_data['skill_categories'],
type=QuestionType(question_data['type']),
options=[AssessmentQuestionOption(text=opt['text'], value=opt['value']) for opt in question_data.get('options', [])],
correct_options=question_data.get('correct_options', [])
)
# Use AI service to score the text-based answer
from services.ai_service import score_answer
score_result = score_answer(
question=question_obj,
answer_text=answer.get('text', ''),
selected_options=answer.get('options', [])
)
earned_points += score_result['score'] * question_weight
# Calculate percentage score
if total_points > 0:
score = (earned_points / total_points) * 100
else:
score = 0.0
logger.debug(f"Calculated score for application ID {application_id}: {score}% ({earned_points}/{total_points} points)")
return round(score, 2) |