from sqlalchemy.orm import Session from typing import List, Optional import uuid import json from models.application import Application from schemas.application import ApplicationCreate, ApplicationUpdate from logging_config import get_logger # Create logger for this module logger = get_logger(__name__) def get_application(db: Session, application_id: str) -> Optional[Application]: """Get application by ID""" logger.debug(f"Retrieving application with ID: {application_id}") application = db.query(Application).filter(Application.id == application_id).first() if application: logger.debug(f"Found application: {application.id}") else: logger.debug(f"Application not found for ID: {application_id}") return application def get_application_by_user(db: Session, application_id: str, user_id: str) -> Optional[Application]: """Get application by ID and user ID""" logger.debug(f"Retrieving application with ID: {application_id} for user ID: {user_id}") application = db.query(Application).filter( Application.id == application_id, Application.user_id == user_id ).first() if application: logger.debug(f"Found application: {application.id} for user: {user_id}") else: logger.debug(f"Application not found for ID: {application_id} and user ID: {user_id}") return application def get_applications_by_job_and_assessment(db: Session, job_id: str, assessment_id: str, skip: int = 0, limit: int = 100) -> List[Application]: """Get list of applications by job and assessment IDs""" logger.debug(f"Retrieving applications for job ID: {job_id}, assessment ID: {assessment_id}, skip={skip}, limit={limit}") applications = db.query(Application).filter( Application.job_id == job_id, Application.assessment_id == assessment_id ).offset(skip).limit(limit).all() logger.debug(f"Retrieved {len(applications)} applications for job ID: {job_id}, assessment ID: {assessment_id}") return applications def get_applications_by_user(db: Session, user_id: str, skip: int = 0, limit: int = 100) -> List[Application]: """Get list of applications by user ID""" logger.debug(f"Retrieving applications for user ID: {user_id}, skip={skip}, limit={limit}") applications = db.query(Application).filter(Application.user_id == user_id).offset(skip).limit(limit).all() logger.debug(f"Retrieved {len(applications)} applications for user ID: {user_id}") return applications def create_application(db: Session, application: ApplicationCreate) -> Application: """Create a new application and calculate scores""" logger.info(f"Creating new application for job ID: {application.job_id}, assessment ID: {application.assessment_id}, user ID: {application.user_id}") # Calculate scores for the application score, question_scores = calculate_detailed_application_score(db, application) db_application = Application( id=str(uuid.uuid4()), job_id=application.job_id, assessment_id=application.assessment_id, user_id=application.user_id, answers=json.dumps([ans.dict() for ans in application.answers]), # Store as JSON string score=score, # Store the overall application score question_scores=json.dumps([qs.dict() for qs in question_scores]) # Store individual question scores as JSON ) db.add(db_application) db.commit() db.refresh(db_application) logger.info(f"Successfully created application with ID: {db_application.id} and overall score: {score}") return db_application def update_application(db: Session, application_id: str, **kwargs) -> Optional[Application]: """Update an application""" logger.info(f"Updating application with ID: {application_id}") db_application = get_application(db, application_id) if db_application: for key, value in kwargs.items(): if key == 'answers' and isinstance(value, list): setattr(db_application, key, json.dumps([ans.dict() if hasattr(ans, 'dict') else ans for ans in value])) else: setattr(db_application, key, value) db.commit() db.refresh(db_application) logger.info(f"Successfully updated application: {db_application.id}") return db_application logger.warning(f"Failed to update application - application not found: {application_id}") return None def delete_application(db: Session, application_id: str) -> bool: """Delete an application""" logger.info(f"Deleting application with ID: {application_id}") db_application = get_application(db, application_id) if db_application: db.delete(db_application) db.commit() logger.info(f"Successfully deleted application: {db_application.id}") return True logger.warning(f"Failed to delete application - application not found: {application_id}") return False def calculate_detailed_application_score(db: Session, application_create: ApplicationCreate): """Calculate detailed scores for an application including individual question scores""" from models.assessment import Assessment from schemas.application import ApplicationAnswerScore logger.debug(f"Calculating detailed scores for application - job ID: {application_create.job_id}, assessment ID: {application_create.assessment_id}") # Get the associated assessment to compare answers with correct answers assessment = db.query(Assessment).filter(Assessment.id == application_create.assessment_id).first() if not assessment: logger.warning(f"Assessment not found for ID: {application_create.assessment_id}") return 0.0, [] # Parse the questions import json try: questions = json.loads(assessment.questions) if assessment.questions else [] except json.JSONDecodeError: logger.error(f"Failed to parse questions for assessment ID: {application_create.assessment_id}") return 0.0, [] # Create a mapping of question_id to question for easy lookup question_map = {q['id']: q for q in questions} # Calculate the scores total_points = 0 earned_points = 0 question_scores = [] for answer in application_create.answers: question_id = answer.question_id if not question_id or question_id not in question_map: continue question_data = question_map[question_id] # Calculate weighted score question_weight = question_data.get('weight', 1) # Default weight is 1 total_points += question_weight # Initialize question score object question_score_obj = ApplicationAnswerScore( question_id=question_id, score=0.0, rationale="No rationale available" ) # For multiple choice questions, score directly without AI if question_data['type'] in ['choose_one', 'choose_many']: correct_options = set(question_data.get('correct_options', [])) selected_options = set(answer.options or []) # Check if the selected options match the correct options exactly if selected_options == correct_options: earned_points += question_weight # Full points for correct answer question_score_obj.score = 1.0 # Perfect score question_score_obj.rationale = "Correct answer" else: question_score_obj.score = 0.0 # No points for incorrect answer question_score_obj.rationale = f"Incorrect. Correct options: {list(correct_options)}, Selected: {list(selected_options)}" # For text-based questions, use AI to evaluate the answer elif question_data['type'] == 'text_based': # Convert the question data to an AssessmentQuestion object from schemas.assessment import AssessmentQuestion, AssessmentQuestionOption from schemas.enums import QuestionType question_obj = AssessmentQuestion( id=question_data['id'], text=question_data['text'], weight=question_data['weight'], skill_categories=question_data['skill_categories'], type=QuestionType(question_data['type']), options=[AssessmentQuestionOption(text=opt['text'], value=opt['value']) for opt in question_data.get('options', [])], correct_options=question_data.get('correct_options', []) ) # Use AI service to score the text-based answer from services.ai_service import score_answer score_result = score_answer( question=question_obj, answer_text=answer.text or '', selected_options=answer.options or [] ) earned_points += score_result['score'] * question_weight question_score_obj.score = score_result['score'] question_score_obj.rationale = score_result['rationale'] question_scores.append(question_score_obj) # Calculate percentage score if total_points > 0: overall_score = (earned_points / total_points) * 100 else: overall_score = 0.0 logger.debug(f"Calculated detailed scores: overall {overall_score}% ({earned_points}/{total_points} points), {len(question_scores)} questions scored") return round(overall_score, 2), question_scores def calculate_application_score(db: Session, application_id: str) -> float: """Calculate the score for an application""" logger.debug(f"Calculating score for application ID: {application_id}") # Get the application application = get_application(db, application_id) if not application: logger.warning(f"Application not found for ID: {application_id}") return 0.0 # Get the associated assessment to compare answers with correct answers from models.assessment import Assessment assessment = db.query(Assessment).filter(Assessment.id == application.assessment_id).first() if not assessment: logger.warning(f"Assessment not found for application ID: {application_id}") return 0.0 # Parse the answers and questions import json try: # Check if answers is already a list (parsed) or a string (needs parsing) if isinstance(application.answers, str): answers = json.loads(application.answers) if application.answers else [] else: # Assume it's already a list object answers = application.answers if application.answers else [] # Questions should always be a JSON string from the database questions = json.loads(assessment.questions) if assessment.questions else [] except json.JSONDecodeError: logger.error(f"Failed to parse answers or questions for application ID: {application_id}") return 0.0 # Create a mapping of question_id to question for easy lookup question_map = {q['id']: q for q in questions} # Calculate the score total_points = 0 earned_points = 0 for answer in answers: question_id = answer.get('question_id') if not question_id or question_id not in question_map: continue question_data = question_map[question_id] # Calculate weighted score question_weight = question_data.get('weight', 1) # Default weight is 1 total_points += question_weight # For multiple choice questions, score directly without AI if question_data['type'] in ['choose_one', 'choose_many']: correct_options = set(question_data.get('correct_options', [])) selected_options = set(answer.get('options', [])) # Check if the selected options match the correct options exactly if selected_options == correct_options: earned_points += question_weight # Full points for correct answer # Otherwise, 0 points for incorrect answer (no partial credit for multiple choice) # For text-based questions, use AI to evaluate the answer elif question_data['type'] == 'text_based': # Convert the question data to an AssessmentQuestion object from schemas.assessment import AssessmentQuestion, AssessmentQuestionOption from schemas.enums import QuestionType question_obj = AssessmentQuestion( id=question_data['id'], text=question_data['text'], weight=question_data['weight'], skill_categories=question_data['skill_categories'], type=QuestionType(question_data['type']), options=[AssessmentQuestionOption(text=opt['text'], value=opt['value']) for opt in question_data.get('options', [])], correct_options=question_data.get('correct_options', []) ) # Use AI service to score the text-based answer from services.ai_service import score_answer score_result = score_answer( question=question_obj, answer_text=answer.get('text', ''), selected_options=answer.get('options', []) ) earned_points += score_result['score'] * question_weight # Calculate percentage score if total_points > 0: score = (earned_points / total_points) * 100 else: score = 0.0 logger.debug(f"Calculated score for application ID {application_id}: {score}% ({earned_points}/{total_points} points)") return round(score, 2)