"""Async persistence helpers (CRUD queries) for the CV screening models."""

from typing import Optional, List
from uuid import UUID

from sqlalchemy import select, and_, or_, update, delete, func
from sqlalchemy.ext.asyncio import AsyncSession

from externals.databases.pg_models import (
    CVUser,
    CVTenant,
    CVFile,
    CVProfile,
    CVFilter,
    CVWeight,
    CVMatching,
    CVScore,
)
from externals.databases.schemas.tenant import TenantCreate
from externals.databases.schemas.user import UserCreate, UserResponse
from utils.decorator import retry_db
from utils.security import hash_password


# =========================
# USER
# =========================


@retry_db(retries=2, delay=2)
async def get_user_by_username(
    db: AsyncSession,
    username: str,
) -> CVUser | None:
    """Return the user whose ``username`` matches exactly, or ``None``."""
    result = await db.execute(
        select(CVUser).where(CVUser.username == username)
    )
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def create_user(
    db: AsyncSession,
    user_in: UserCreate,
) -> CVUser:
    """Insert a new active user built from ``user_in`` and commit it.

    The plain-text ``user_in.password`` is never stored; only the value
    returned by ``hash_password`` is written to ``hashed_password``.

    Returns:
        The persisted ``CVUser``, refreshed from the database after commit.
    """
    # Map the request schema onto the ORM model field by field.
    user = CVUser(
        username=user_in.username,
        hashed_password=hash_password(user_in.password),
        email=user_in.email,
        full_name=user_in.full_name,
        role=user_in.role,
        is_active=True,
        tenant_id=user_in.tenant_id,
        notes=user_in.notes,
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user


@retry_db(retries=2, delay=2)
async def deactivate_user(
    db: AsyncSession,
    username: str,
) -> CVUser | None:
    """Set ``is_active`` to ``False`` for the user named ``username``.

    Returns:
        The updated ``CVUser``, or ``None`` when no such user exists (in
        which case nothing is committed).
    """
    result = await db.execute(
        select(CVUser).where(CVUser.username == username)
    )
    user = result.scalar_one_or_none()
    if not user:
        return None

    user.is_active = False
    await db.commit()
    await db.refresh(user)
    return user


@retry_db(retries=2, delay=2)
async def get_user_by_email(
    db: AsyncSession,
    email: str,
) -> CVUser | None:
    """Return the user whose ``email`` matches exactly, or ``None``."""
    result = await db.execute(
        select(CVUser).where(CVUser.email == email)
    )
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def get_user_by_id(
    db: AsyncSession,
    user_id: str,
) -> CVUser | None:
    """Return the user whose ``user_id`` equals ``user_id``, or ``None``."""
    result = await db.execute(
        select(CVUser).where(CVUser.user_id == user_id)
    )
    return result.scalar_one_or_none()


# =========================
# TENANT
# =========================
@retry_db(retries=2, delay=2)
async def get_tenant_by_name(
    db: AsyncSession,
    tenant_name: str,
) -> CVTenant | None:
    """Return the tenant whose ``tenant_name`` matches exactly, or ``None``."""
    result = await db.execute(
        select(CVTenant).where(CVTenant.tenant_name == tenant_name)
    )
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def create_tenant(
    db: AsyncSession,
    tenant_in: TenantCreate,
) -> CVTenant:
    """Insert a tenant built from ``tenant_in``, commit, and return it refreshed."""
    # Map the request schema onto the ORM model field by field.
    tenant = CVTenant(
        tenant_name=tenant_in.tenant_name,
        notes=tenant_in.notes,
    )
    db.add(tenant)
    await db.commit()
    await db.refresh(tenant)
    return tenant


# =========================
# FILE
# =========================


@retry_db(retries=2, delay=2)
async def mark_file_extracted(
    db: AsyncSession,
    file_id: UUID,
    *,
    commit: bool = False,
) -> None:
    """Set ``is_extracted=True`` on the file identified by ``file_id``.

    This module previously defined this function twice; the later definition
    (which did not commit) shadowed the earlier one, so "no commit" is the
    behavior callers have actually been getting and remains the default.
    Like ``create_profile``, which only flushes, the UPDATE is left in the
    caller's transaction.

    Args:
        db: Session used to execute the UPDATE.
        file_id: Primary key of the ``CVFile`` row to update.
        commit: When ``True``, commit immediately after the UPDATE.
    """
    await db.execute(
        update(CVFile)
        .where(CVFile.file_id == file_id)
        .values(is_extracted=True)
    )
    if commit:
        await db.commit()


@retry_db(retries=2, delay=2)
async def create_cv_file(
    db: AsyncSession,
    *,
    user_id: str,
    filename: str,
    file_type: str,
    url: str,
) -> CVFile:
    """Insert a not-yet-extracted file record owned by ``user_id`` and commit.

    Returns:
        The persisted ``CVFile``, refreshed from the database after commit.
    """
    cv_file = CVFile(
        filename=filename,
        file_type=file_type,
        url=url,
        is_extracted=False,
        user_id=user_id,
    )
    db.add(cv_file)
    await db.commit()
    await db.refresh(cv_file)
    return cv_file


@retry_db(retries=2, delay=2)
async def delete_file_by_filename(
    db: AsyncSession,
    filename: str,
    user_id: UUID | None = None,
) -> bool:
    """Hard-delete file rows named ``filename`` and commit.

    Args:
        db: Session used to execute the DELETE.
        filename: Exact filename to delete.
        user_id: Optional owner. When given, only that user's rows are
            deleted. When omitted, every row with this filename is deleted
            regardless of owner (the original behavior).

    Returns:
        ``True`` if at least one row was deleted.
    """
    stmt = delete(CVFile).where(CVFile.filename == filename)
    if user_id is not None:
        stmt = stmt.where(CVFile.user_id == user_id)
    result = await db.execute(stmt)
    await db.commit()
    return result.rowcount > 0


@retry_db(retries=2, delay=2)
async def set_file_deleted_by_filename(
    db: AsyncSession,
    filename: str,
    user_id: UUID,
) -> bool:
    """Soft-delete (``is_deleted=True``) the user's file named ``filename``.

    Returns:
        ``True`` if at least one row was updated.
    """
    stmt = (
        update(CVFile)
        .where(CVFile.filename == filename, CVFile.user_id == user_id)
        .values(is_deleted=True)
    )
    result = await db.execute(stmt)
    await db.commit()
    return result.rowcount > 0


@retry_db(retries=2, delay=2)
async def get_file_by_user_id(
    db: AsyncSession,
    user_id: str,
) -> List[CVFile]:
    """Return every non-deleted file owned by ``user_id`` (possibly empty)."""
    result = await db.execute(
        select(CVFile).where(
            CVFile.user_id == user_id,
            CVFile.is_deleted == False,  # noqa: E712 - SQL expression, not a Python test
        )
    )
    return result.scalars().all()


@retry_db(retries=2, delay=2)
async def get_file_by_filename(
    db: AsyncSession,
    filename: str,
    user_id: UUID,
) -> CVFile | None:
    """Return the user's file named ``filename``, or ``None``.

    NOTE(review): unlike the other FILE queries this does not filter on
    ``is_deleted``, so soft-deleted rows are returned too, and
    ``scalar_one_or_none`` raises if several rows match - confirm that is
    intended.
    """
    result = await db.execute(
        select(CVFile).where(
            CVFile.filename == filename,
            CVFile.user_id == user_id,
        )
    )
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def count_files_by_user(
    db: AsyncSession,
    user_id: str,
) -> int:
    """Return the number of non-deleted CVFile rows belonging to a user."""
    result = await db.execute(
        select(func.count(CVFile.file_id)).where(
            CVFile.user_id == user_id,
            CVFile.is_deleted == False,  # noqa: E712 - SQL expression, not a Python test
        )
    )
    return int(result.scalar_one())


@retry_db(retries=2, delay=2)
async def count_profiles_by_user(
    db: AsyncSession,
    user_id: str,
) -> int:
    """Return the number of CVProfile rows attached to a user's non-deleted files."""
    result = await db.execute(
        select(func.count(CVProfile.profile_id))
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(
            CVFile.user_id == user_id,
            CVFile.is_deleted == False,  # noqa: E712 - SQL expression, not a Python test
        )
    )
    return int(result.scalar_one())


# =========================
# PROFILE
# =========================

from services.models.data_model import AIProfile

# Field names shared by CVProfile and CVFilter, grouped by how they are matched.
_GPA_FIELDS = ("gpa_edu_1", "gpa_edu_2", "gpa_edu_3")
_EDUCATION_LIST_FIELDS = (
    "univ_edu_1",
    "univ_edu_2",
    "univ_edu_3",
    "major_edu_1",
    "major_edu_2",
    "major_edu_3",
)
_SKILL_ARRAY_FIELDS = (
    "hardskills",
    "softskills",
    "certifications",
    "business_domain",
)


@retry_db(retries=2, delay=2)
async def create_profile(
    db: AsyncSession,
    filename: str,
    file_id: str,
    profile: AIProfile,
) -> CVProfile:
    """Stage a ``CVProfile`` built from the extracted ``profile`` mapping.

    Missing scalar keys become ``None`` and missing list keys become ``[]``.
    The row is flushed but NOT committed; the caller owns the transaction.

    Args:
        db: Session the new row is added to.
        filename: Name of the source file, stored on the profile.
        file_id: Id of the ``CVFile`` the profile was extracted from.
        profile: Mapping of extracted fields, read with ``.get``.

    Returns:
        The flushed ``CVProfile`` instance.
    """
    cv_profile = CVProfile(
        fullname=profile.get("fullname"),
        gpa_edu_1=profile.get("gpa_edu_1"),
        univ_edu_1=profile.get("univ_edu_1"),
        major_edu_1=profile.get("major_edu_1"),
        gpa_edu_2=profile.get("gpa_edu_2"),
        univ_edu_2=profile.get("univ_edu_2"),
        major_edu_2=profile.get("major_edu_2"),
        gpa_edu_3=profile.get("gpa_edu_3"),
        univ_edu_3=profile.get("univ_edu_3"),
        major_edu_3=profile.get("major_edu_3"),
        domicile=profile.get("domicile"),
        yoe=profile.get("yoe"),
        hardskills=profile.get("hardskills", []),
        softskills=profile.get("softskills", []),
        certifications=profile.get("certifications", []),
        business_domain=profile.get("business_domain", []),
        filename=filename,
        file_id=file_id,
    )
    db.add(cv_profile)
    await db.flush()
    return cv_profile


@retry_db(retries=2, delay=2)
async def get_profile_by_filename(
    db: AsyncSession,
    filename: str,
    current_user: CVUser,
) -> Optional[CVProfile]:
    """Return the profile named ``filename`` whose file belongs to ``current_user``."""
    stmt = (
        select(CVProfile)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(
            CVProfile.filename == filename,
            CVFile.user_id == current_user.user_id,
        )
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def get_profiles(
    db: AsyncSession,
) -> List[CVProfile]:
    """Return every ``CVProfile`` row, with no user or tenant restriction."""
    result = await db.execute(select(CVProfile))
    return result.scalars().all()


@retry_db(retries=2, delay=2)
async def get_profiles_by_user_id(
    db: AsyncSession,
    current_user: CVUser,
) -> List[CVProfile]:
    """Return all profiles whose source file belongs to ``current_user``.

    NOTE(review): files flagged ``is_deleted`` are not excluded here, unlike
    in ``count_profiles_by_user`` - confirm which behavior is wanted.
    """
    stmt = (
        select(CVProfile)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(CVFile.user_id == current_user.user_id)
    )
    result = await db.execute(stmt)
    return result.scalars().all()


from sqlalchemy import select, and_, func, cast, String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import ARRAY
from typing import List


@retry_db(retries=2, delay=2)
async def get_profiles_by_criteria_id(
    db: AsyncSession,
    criteria_id: str,
    current_user: CVUser,
) -> List[CVProfile]:
    """Return the tenant's profiles that satisfy the stored filter ``criteria_id``.

    Every populated field of the stored ``CVFilter`` becomes one AND-ed
    condition:

    * GPA and ``yoe``: profile value ``>=`` the filter value.
    * University / major: lower-cased profile value is one of the
      lower-cased filter values.
    * ``domicile``: case-insensitive equality.
    * Skill-like arrays: the profile array overlaps the filter array.

    Returns:
        Matching profiles limited to users in ``current_user``'s tenant, or
        ``[]`` when the filter does not exist or has no populated field.
    """
    criteria = await get_filter_by_id(db, criteria_id)
    if criteria is None:
        return []

    conditions = []

    # --- GPA (minimum thresholds) ---
    for name in _GPA_FIELDS:
        minimum = getattr(criteria, name)
        if minimum is not None:
            conditions.append(getattr(CVProfile, name) >= minimum)

    # --- University / major (case-insensitive, any match in list) ---
    for name in _EDUCATION_LIST_FIELDS:
        accepted = getattr(criteria, name)
        if accepted:
            conditions.append(
                func.lower(getattr(CVProfile, name)).in_(
                    [value.lower() for value in accepted]
                )
            )

    # --- Others ---
    if criteria.domicile:
        conditions.append(
            func.lower(CVProfile.domicile) == criteria.domicile.lower()
        )
    if criteria.yoe is not None:
        conditions.append(CVProfile.yoe >= criteria.yoe)

    # --- ARRAY fields (explicit cast so the bound list is typed as an array) ---
    for name in _SKILL_ARRAY_FIELDS:
        wanted = getattr(criteria, name)
        if wanted:
            conditions.append(
                getattr(CVProfile, name).overlap(cast(wanted, ARRAY(String)))
            )

    # An empty filter matches nothing rather than returning every profile.
    if not conditions:
        return []

    # Join through file -> user so results are restricted to the caller's tenant.
    stmt = (
        select(CVProfile)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .join(CVUser, CVFile.user_id == CVUser.user_id)
        .where(CVUser.tenant_id == current_user.tenant_id, *conditions)
    )
    result = await db.execute(stmt)
    return result.scalars().all()


@retry_db(retries=2, delay=2)
async def get_profile_by_id(
    db: AsyncSession,
    profile_id: str,
) -> Optional[dict]:
    """Return one profile as a plain dict including its file URL.

    Returns:
        A dict with one entry per ``CVProfile`` table column plus a ``"url"``
        key holding the joined ``CVFile.url``; ``None`` if no profile has
        this id.
    """
    stmt = (
        select(CVProfile, CVFile.url)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(CVProfile.profile_id == profile_id)
    )
    result = await db.execute(stmt)
    row = result.first()
    if not row:
        return None

    profile, url = row

    # Copy mapped columns into a dict so the caller gets a plain structure.
    profile_dict = {
        column.name: getattr(profile, column.name)
        for column in CVProfile.__table__.columns
    }
    profile_dict["url"] = url
    return profile_dict


@retry_db(retries=2, delay=2)
async def list_profiles(
    db: AsyncSession,
    limit: int = 50,
    offset: int = 0,
) -> List[CVProfile]:
    """Return one page of profiles: at most ``limit`` rows starting at ``offset``."""
    stmt = select(CVProfile).limit(limit).offset(offset)
    result = await db.execute(stmt)
    return result.scalars().all()


# =========================
# FILTER & WEIGHT
# =========================

from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession


@retry_db(retries=2, delay=2)
async def get_filter(
    db: AsyncSession,
    filter: CVFilter,
):
    """Look up a stored ``CVFilter`` matching the populated fields of ``filter``.

    All conditions are AND-ed: numeric fields use ``>=``, ``domicile`` uses
    equality, and list fields use array overlap (at least one shared value).

    Returns:
        The matching ``CVFilter``, or ``None`` when nothing matches or when
        ``filter`` has no populated field. ``scalar_one_or_none`` raises if
        more than one stored filter matches.
    """
    conditions = []

    # --- GPA ---
    for name in _GPA_FIELDS:
        minimum = getattr(filter, name)
        if minimum is not None:
            conditions.append(getattr(CVFilter, name) >= minimum)

    # --- University / major (array overlap) ---
    for name in _EDUCATION_LIST_FIELDS:
        values = getattr(filter, name)
        if values:
            conditions.append(getattr(CVFilter, name).overlap(values))

    # --- Others ---
    if filter.domicile:
        conditions.append(CVFilter.domicile == filter.domicile)
    if filter.yoe is not None:
        conditions.append(CVFilter.yoe >= filter.yoe)

    # --- ARRAY fields (overlap: at least one common element) ---
    for name in _SKILL_ARRAY_FIELDS:
        values = getattr(filter, name)
        if values:
            conditions.append(getattr(CVFilter, name).overlap(values))

    # Refuse to run an unconstrained query over the whole table.
    if not conditions:
        return None

    stmt = select(CVFilter).where(and_(*conditions))
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


from services.models.data_model import Criteria


@retry_db(retries=2, delay=2)
async def get_multi_filter(
    db: AsyncSession,
    filter: Criteria,
):
    """Look up a stored ``CVFilter`` from a ``Criteria`` mapping.

    Scalar fields (GPA, ``domicile``, ``yoe``) are AND-ed together. List
    fields are OR-ed among themselves and that group is AND-ed with the
    scalar conditions.

    Returns:
        The matching ``CVFilter``, or ``None`` when nothing matches or when
        ``filter`` has no populated field. ``scalar_one_or_none`` raises if
        more than one stored filter matches.
    """
    conditions = []
    list_conditions = []

    # --- GPA (scalar: AND) ---
    for name in _GPA_FIELDS:
        minimum = filter.get(name)
        if minimum is not None:
            conditions.append(getattr(CVFilter, name) >= minimum)

    # --- Others (scalar: AND) ---
    if filter.get("domicile"):
        conditions.append(CVFilter.domicile == filter.get("domicile"))
    if filter.get("yoe") is not None:
        conditions.append(CVFilter.yoe >= filter.get("yoe"))

    # --- University / major / skill arrays (list: OR) ---
    # These columns are matched with ``overlap`` in ``get_filter`` and are
    # iterated as lists in ``get_profiles_by_criteria_id``, so they are
    # handled as array columns here as well; ``in_`` would compare a whole
    # array against single strings.
    # NOTE(review): confirm the column types against the CVFilter model.
    for name in _EDUCATION_LIST_FIELDS + _SKILL_ARRAY_FIELDS:
        values = filter.get(name)
        if values:
            list_conditions.append(getattr(CVFilter, name).overlap(values))

    if list_conditions:
        conditions.append(or_(*list_conditions))

    # Refuse to run an unconstrained query over the whole table.
    if not conditions:
        return None

    stmt = select(CVFilter).where(and_(*conditions))
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def get_filter_by_id(
    db: AsyncSession,
    criteria_id: str,
) -> Optional[CVFilter]:
    """Return the ``CVFilter`` with the given ``criteria_id``, or ``None``."""
    stmt = select(CVFilter).where(CVFilter.criteria_id == criteria_id)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def create_filter(
    db: AsyncSession,
    filter: CVFilter,
):
    """Persist ``filter`` and commit; returns the same instance (not refreshed)."""
    db.add(filter)
    await db.commit()
    return filter


@retry_db(retries=2, delay=2)
async def get_filter_and_weight(
    db: AsyncSession,
    criteria_id: str,
):
    """Query the filter joined with its weight for ``criteria_id``.

    NOTE(review): the statement selects ``(CVFilter, CVWeight)`` but
    ``scalar_one_or_none`` returns only the first column of the row, so the
    caller receives the ``CVFilter`` alone and the ``CVWeight`` is dropped.
    If both objects are needed, switch to ``result.one_or_none()`` and
    update the callers; left unchanged here to keep the return shape.
    """
    stmt = (
        select(CVFilter, CVWeight)
        .join(CVWeight, CVFilter.criteria_id == CVWeight.criteria_id)
        .where(CVFilter.criteria_id == criteria_id)
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def get_weight_by_id(
    db: AsyncSession,
    weight_id: str,
):
    """Return the ``CVWeight`` with the given ``weight_id``, or ``None``."""
    stmt = select(CVWeight).where(CVWeight.weight_id == weight_id)
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


@retry_db(retries=2, delay=2)
async def create_filter_and_weight(
    db: AsyncSession,
    cv_filter: CVFilter,
    cv_weight: CVWeight,
):
    """Persist a filter and its weight together in a single commit."""
    db.add_all([cv_filter, cv_weight])
    await db.commit()


@retry_db(retries=2, delay=2)
async def create_weight(
    db: AsyncSession,
    cv_weight: CVWeight,
) -> CVWeight:
    """Persist ``cv_weight`` and commit; returns the same instance (not refreshed)."""
    db.add(cv_weight)
    await db.commit()
    return cv_weight


# =========================
# MATCHING
# =========================


@retry_db(retries=2, delay=2)
async def create_matching(
    db: AsyncSession,
    matching: CVMatching,
) -> CVMatching:
    """Persist one matching row, commit, and return it refreshed."""
    db.add(matching)
    await db.commit()
    await db.refresh(matching)
    return matching


@retry_db(retries=2, delay=2)
async def create_matchings(
    db: AsyncSession,
    matchings: list,
) -> list[CVMatching]:
    """Persist several matchings in one commit.

    Args:
        db: Session used for the insert.
        matchings: Keyword-argument mappings, each expanded into
            ``CVMatching(**matching)``.

    Returns:
        The created ``CVMatching`` objects, each refreshed after the commit.

    Raises:
        Exception: Any failure is printed and then re-raised unchanged.
    """
    try:
        orm_objects = [CVMatching(**matching) for matching in matchings]
        db.add_all(orm_objects)
        await db.commit()
        for obj in orm_objects:
            await db.refresh(obj)
        return orm_objects
    except Exception as exc:
        print(f"Error creating matchings: {exc}")
        raise


@retry_db(retries=2, delay=2)
async def get_matching_by_profile_and_criteria(
    db: AsyncSession,
    profile_id: str,
    criteria_id: str,
) -> Optional[CVMatching]:
    """Return the matching for this ``(profile_id, criteria_id)`` pair, or ``None``."""
    stmt = select(CVMatching).where(
        and_(
            CVMatching.profile_id == profile_id,
            CVMatching.criteria_id == criteria_id,
        )
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


# =========================
# SCORE
# =========================


@retry_db(retries=2, delay=2)
async def create_score(
    db: AsyncSession,
    score: CVScore,
) -> CVScore:
    """Persist one score row, commit, and return it refreshed."""
    db.add(score)
    await db.commit()
    await db.refresh(score)
    return score


@retry_db(retries=2, delay=2)
async def create_scores(
    db: AsyncSession,
    scores: list[CVScore],
) -> list[CVScore]:
    """Persist several scores in one commit and return them refreshed.

    Raises:
        Exception: Any failure is printed and then re-raised unchanged.
    """
    try:
        db.add_all(scores)
        await db.commit()
        for score in scores:
            await db.refresh(score)
        return scores
    except Exception as exc:
        print(f"Error creating scores: {exc}")
        raise


@retry_db(retries=2, delay=2)
async def get_scores_by_criteria(
    db: AsyncSession,
    criteria_id: str,
) -> List[CVScore]:
    """Return all scores for matchings of ``criteria_id``, highest score first."""
    stmt = (
        select(CVScore)
        .join(CVMatching, CVScore.matching_id == CVMatching.matching_id)
        .where(CVMatching.criteria_id == criteria_id)
        .order_by(CVScore.score.desc())
    )
    result = await db.execute(stmt)
    return result.scalars().all()