| from sqlalchemy.orm import Session |
| from typing import List, Optional |
| import uuid |
| import json |
|
|
| from models.assessment import Assessment |
| from schemas.assessment import AssessmentCreate, AssessmentUpdate |
| from logging_config import get_logger |
| from services.ai_service import generate_questions |
| from integrations.ai_integration.ai_factory import AIProvider |
|
|
| |
| logger = get_logger(__name__) |
|
|
| def get_assessment(db: Session, assessment_id: str) -> Optional[Assessment]: |
| """Get assessment by ID""" |
| logger.debug(f"Retrieving assessment with ID: {assessment_id}") |
| assessment = db.query(Assessment).filter(Assessment.id == assessment_id).first() |
| if assessment: |
| logger.debug(f"Found assessment: {assessment.id}") |
| else: |
| logger.debug(f"Assessment not found for ID: {assessment_id}") |
| return assessment |
|
|
| def get_assessments_by_job(db: Session, job_id: str, skip: int = 0, limit: int = 100) -> List[Assessment]: |
| """Get list of assessments by job ID""" |
| logger.debug(f"Retrieving assessments for job ID: {job_id}, skip={skip}, limit={limit}") |
| assessments = db.query(Assessment).filter(Assessment.job_id == job_id).offset(skip).limit(limit).all() |
| logger.debug(f"Retrieved {len(assessments)} assessments for job ID: {job_id}") |
| return assessments |
|
|
| def get_active_assessments_by_job(db: Session, job_id: str, skip: int = 0, limit: int = 100) -> List[Assessment]: |
| """Get list of active assessments by job ID""" |
| logger.debug(f"Retrieving active assessments for job ID: {job_id}, skip={skip}, limit={limit}") |
| assessments = db.query(Assessment).filter(Assessment.job_id == job_id, Assessment.active == True).offset(skip).limit(limit).all() |
| logger.debug(f"Retrieved {len(assessments)} active assessments for job ID: {job_id}") |
| return assessments |
|
|
| def create_assessment(db: Session, job_id: str, assessment: AssessmentCreate) -> Assessment: |
| """Create a new assessment""" |
| logger.info(f"Creating new assessment for job ID: {job_id}, title: {assessment.title}") |
|
|
| |
| from .job_service import get_job |
| job = get_job(db, job_id) |
| if not job: |
| logger.error(f"Job not found for ID: {job_id}") |
| raise ValueError(f"Job not found for ID: {job_id}") |
|
|
| |
| import json |
| job_info = { |
| "title": job.title, |
| "seniority": job.seniority, |
| "description": job.description, |
| "skill_categories": json.loads(job.skill_categories) if job.skill_categories else [] |
| } |
|
|
| |
| generated_questions = generate_questions( |
| title=assessment.title, |
| questions_types=[qt.value for qt in assessment.questions_types], |
| additional_note=assessment.additional_note, |
| job_info=job_info |
| ) |
|
|
| |
| questions_json = json.dumps([q.model_dump() for q in generated_questions]) |
|
|
| |
| from services.ai_service import estimate_assessment_duration |
| duration = estimate_assessment_duration( |
| title=assessment.title, |
| job_info=job_info, |
| questions=generated_questions, |
| additional_note=assessment.additional_note |
| ) |
|
|
| db_assessment = Assessment( |
| id=str(uuid.uuid4()), |
| job_id=job_id, |
| title=assessment.title, |
| duration=duration, |
| passing_score=assessment.passing_score, |
| questions=questions_json, |
| active=True |
| ) |
| db.add(db_assessment) |
| db.commit() |
| db.refresh(db_assessment) |
| logger.info(f"Successfully created assessment with ID: {db_assessment.id} for job ID: {job_id}") |
| return db_assessment |
|
|
| def update_assessment(db: Session, assessment_id: str, **kwargs) -> Optional[Assessment]: |
| """Update an assessment""" |
| logger.info(f"Updating assessment with ID: {assessment_id}") |
| db_assessment = get_assessment(db, assessment_id) |
| if db_assessment: |
| for key, value in kwargs.items(): |
| if key == 'questions': |
| if isinstance(value, list): |
| |
| setattr(db_assessment, key, json.dumps([q.model_dump() if hasattr(q, 'model_dump') else q for q in value])) |
| |
| |
| from services.ai_service import estimate_assessment_duration |
| from models.job import Job |
| import json |
| |
| |
| job = db.query(Job).filter(Job.id == db_assessment.job_id).first() |
| job_info = {} |
| if job: |
| job_info = { |
| "title": job.title, |
| "seniority": job.seniority, |
| "description": job.description, |
| "skill_categories": json.loads(job.skill_categories) if job.skill_categories else [] |
| } |
| |
| |
| questions = [q for q in value] |
| |
| |
| duration = estimate_assessment_duration( |
| title=db_assessment.title, |
| job_info=job_info, |
| questions=questions, |
| additional_note=None |
| ) |
| setattr(db_assessment, 'duration', duration) |
| elif isinstance(value, str): |
| |
| setattr(db_assessment, key, value) |
| |
| |
| try: |
| parsed_questions = json.loads(value) |
| from schemas.assessment import AssessmentQuestion |
| from services.ai_service import estimate_assessment_duration |
| from models.job import Job |
| |
| |
| job = db.query(Job).filter(Job.id == db_assessment.job_id).first() |
| job_info = {} |
| if job: |
| job_info = { |
| "title": job.title, |
| "seniority": job.seniority, |
| "description": job.description, |
| "skill_categories": json.loads(job.skill_categories) if job.skill_categories else [] |
| } |
| |
| |
| questions = [AssessmentQuestion(**q) for q in parsed_questions] |
| |
| |
| duration = estimate_assessment_duration( |
| title=db_assessment.title, |
| job_info=job_info, |
| questions=questions, |
| additional_note=None |
| ) |
| setattr(db_assessment, 'duration', duration) |
| except Exception as e: |
| logger.warning(f"Could not estimate duration from JSON questions: {str(e)}") |
| |
| else: |
| |
| setattr(db_assessment, key, json.dumps(value)) |
| elif key == 'duration': |
| |
| continue |
| else: |
| setattr(db_assessment, key, value) |
| db.commit() |
| db.refresh(db_assessment) |
| logger.info(f"Successfully updated assessment: {db_assessment.id}") |
| return db_assessment |
| logger.warning(f"Failed to update assessment - assessment not found: {assessment_id}") |
| return None |
|
|
| def regenerate_assessment(db: Session, assessment_id: str, **kwargs) -> Optional[Assessment]: |
| """Regenerate an assessment""" |
| logger.info(f"Regenerating assessment with ID: {assessment_id}") |
|
|
| |
| if 'questions_types' in kwargs and kwargs['questions_types'] is not None: |
| |
| assessment = get_assessment(db, assessment_id) |
| if not assessment: |
| logger.warning(f"Assessment not found for regeneration: {assessment_id}") |
| return None |
|
|
| |
| from .job_service import get_job |
| job = get_job(db, assessment.job_id) |
| if not job: |
| logger.error(f"Job not found for assessment ID: {assessment_id}") |
| raise ValueError(f"Job not found for assessment ID: {assessment_id}") |
|
|
| |
| import json |
| job_info = { |
| "title": job.title, |
| "seniority": job.seniority, |
| "description": job.description, |
| "skill_categories": json.loads(job.skill_categories) if job.skill_categories else [] |
| } |
|
|
| |
| additional_note = kwargs.get('additional_note', None) |
| generated_questions = generate_questions( |
| title=assessment.title, |
| questions_types=kwargs['questions_types'], |
| additional_note=additional_note, |
| job_info=job_info |
| ) |
|
|
| |
| questions_json = json.dumps([q.model_dump() for q in generated_questions]) |
|
|
| |
| kwargs['questions'] = questions_json |
| |
| |
| from services.ai_service import estimate_assessment_duration |
| |
| original_assessment = get_assessment(db, assessment_id) |
| duration = estimate_assessment_duration( |
| title=original_assessment.title, |
| job_info=job_info, |
| questions=generated_questions, |
| additional_note=additional_note |
| ) |
| kwargs['duration'] = duration |
|
|
| |
| del kwargs['questions_types'] |
|
|
| |
| result = update_assessment(db, assessment_id, **kwargs) |
| if result: |
| logger.info(f"Successfully regenerated assessment: {result.id}") |
| else: |
| logger.warning(f"Failed to regenerate assessment - assessment not found: {assessment_id}") |
| return result |
|
|
| def delete_assessment(db: Session, assessment_id: str) -> bool: |
| """Delete an assessment""" |
| logger.info(f"Deleting assessment with ID: {assessment_id}") |
| db_assessment = get_assessment(db, assessment_id) |
| if db_assessment: |
| db.delete(db_assessment) |
| db.commit() |
| logger.info(f"Successfully deleted assessment: {db_assessment.id}") |
| return True |
| logger.warning(f"Failed to delete assessment - assessment not found: {assessment_id}") |
| return False |