# ishaq101 · update query score card · f1f72c7
# (repository page header captured together with the file; not Python code)
from typing import Optional, List
from uuid import UUID
from sqlalchemy import select, and_, or_, update, delete, func
from sqlalchemy.ext.asyncio import AsyncSession
from externals.databases.pg_models import (
CVUser,
CVTenant,
CVFile,
CVProfile,
CVFilter,
CVWeight,
CVMatching,
CVScore,
)
from utils.decorator import retry_db
# =========================
# USER
# =========================
from externals.databases.schemas.user import UserCreate, UserResponse
from utils.security import hash_password
@retry_db(retries=2, delay=2)
async def get_user_by_username(
    db: AsyncSession,
    username: str,
) -> CVUser | None:
    """Look up a user by exact ``username``.

    Returns the matching ``CVUser``, or ``None`` when no row matches.
    """
    query = select(CVUser).where(CVUser.username == username)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def create_user(
    db: AsyncSession,
    user_in: UserCreate,
) -> CVUser:
    """Persist a new, active user built from a ``UserCreate`` payload.

    The password on ``user_in`` is passed through ``hash_password`` before
    being stored. The row is committed and refreshed before it is returned.
    """
    # Map the input schema onto the ORM model; new users always start active.
    new_user = CVUser(
        tenant_id=user_in.tenant_id,
        username=user_in.username,
        email=user_in.email,
        full_name=user_in.full_name,
        role=user_in.role,
        notes=user_in.notes,
        hashed_password=hash_password(user_in.password),
        is_active=True,
    )
    db.add(new_user)
    await db.commit()
    # Reload so database-generated column values are present on the instance.
    await db.refresh(new_user)
    return new_user
@retry_db(retries=2, delay=2)
async def deactivate_user(
    db: AsyncSession,
    username: str,
) -> CVUser | None:
    """Set ``is_active`` to ``False`` for the user with this ``username``.

    Returns the updated, refreshed ``CVUser``, or ``None`` when no such user
    exists (in which case nothing is committed).
    """
    lookup = select(CVUser).where(CVUser.username == username)
    user = (await db.execute(lookup)).scalar_one_or_none()
    if user:
        user.is_active = False
        await db.commit()
        await db.refresh(user)
        return user
    return None
@retry_db(retries=2, delay=2)
async def get_user_by_email(db: AsyncSession, email: str) -> CVUser | None:
    """Look up a user by exact ``email``; ``None`` when no row matches."""
    query = select(CVUser).where(CVUser.email == email)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def get_user_by_id(db: AsyncSession, user_id: str) -> CVUser | None:
    """Look up a user by ``user_id``; ``None`` when no row matches."""
    query = select(CVUser).where(CVUser.user_id == user_id)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
# =========================
# TENANT
# =========================
from externals.databases.schemas.tenant import TenantCreate
@retry_db(retries=2, delay=2)
async def get_tenant_by_name(
    db: AsyncSession,
    tenant_name: str,
) -> CVTenant | None:
    """Look up a tenant by exact ``tenant_name``; ``None`` when absent."""
    query = select(CVTenant).where(CVTenant.tenant_name == tenant_name)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def create_tenant(
    db: AsyncSession,
    tenant_in: TenantCreate,
) -> CVTenant:
    """Persist a new tenant built from a ``TenantCreate`` payload.

    The row is committed and refreshed before it is returned.
    """
    # Map the input schema onto the ORM model.
    new_tenant = CVTenant(
        notes=tenant_in.notes,
        tenant_name=tenant_in.tenant_name,
    )
    db.add(new_tenant)
    await db.commit()
    await db.refresh(new_tenant)
    return new_tenant
# =========================
# FILE
# =========================
# NOTE(review): a second ``mark_file_extracted`` is defined later in this
# module and rebinds the name, so this committing variant is not the one
# callers get after import. Confirm which behaviour is intended.
@retry_db(retries=2, delay=2)
async def mark_file_extracted(
    db: AsyncSession,
    file_id: UUID,
) -> None:
    """Set ``is_extracted=True`` on the file with ``file_id`` and commit."""
    flag_extracted = (
        update(CVFile)
        .values(is_extracted=True)
        .where(CVFile.file_id == file_id)
    )
    await db.execute(flag_extracted)
    await db.commit()
@retry_db(retries=2, delay=2)
async def create_cv_file(
    db: AsyncSession,
    *,
    user_id: str,
    filename: str,
    file_type: str,
    url: str,
) -> CVFile:
    """Persist a new ``CVFile`` row owned by ``user_id``.

    The file starts with ``is_extracted=False``. The row is committed and
    refreshed before it is returned. All fields are keyword-only.
    """
    new_file = CVFile(
        user_id=user_id,
        filename=filename,
        file_type=file_type,
        url=url,
        is_extracted=False,
    )
    db.add(new_file)
    await db.commit()
    await db.refresh(new_file)
    return new_file
@retry_db(retries=2, delay=2)
async def delete_file_by_filename(
    db: AsyncSession,
    filename: str,
) -> bool:
    """Hard-delete every ``CVFile`` row whose ``filename`` matches.

    The match is on ``filename`` only (no owner filter). Returns ``True``
    when at least one row was deleted.
    """
    outcome = await db.execute(
        delete(CVFile).where(CVFile.filename == filename)
    )
    await db.commit()
    deleted_rows = outcome.rowcount
    return deleted_rows > 0
@retry_db(retries=2, delay=2)
async def set_file_deleted_by_filename(
    db: AsyncSession,
    filename: str,
    user_id: UUID,
) -> bool:
    """Soft-delete a user's file by setting ``is_deleted=True``.

    Only rows matching both ``filename`` and ``user_id`` are updated.
    Returns ``True`` when at least one row was updated.
    """
    soft_delete = (
        update(CVFile)
        .where(CVFile.filename == filename)
        .where(CVFile.user_id == user_id)
        .values(is_deleted=True)
    )
    outcome = await db.execute(soft_delete)
    await db.commit()
    updated_rows = outcome.rowcount
    return updated_rows > 0
@retry_db(retries=2, delay=2)
async def get_file_by_user_id(
    db: AsyncSession,
    user_id: str,
) -> list[CVFile]:
    """Return all non-deleted files owned by ``user_id``.

    Despite the singular name, this returns a list (possibly empty), not a
    single ``CVFile``; the return annotation now reflects that.
    """
    result = await db.execute(
        select(CVFile).where(
            CVFile.user_id == user_id,
            # SQL expression, not a Python comparison: must stay ``== False``.
            CVFile.is_deleted == False,  # noqa: E712
        )
    )
    return result.scalars().all()
@retry_db(retries=2, delay=2)
async def get_file_by_filename(
    db: AsyncSession,
    filename: str,
    user_id: UUID,
) -> CVFile | None:
    """Fetch the file with this ``filename`` owned by ``user_id``.

    Soft-deleted rows are not excluded by this query. Returns ``None`` when
    no row matches.
    """
    query = (
        select(CVFile)
        .where(CVFile.filename == filename)
        .where(CVFile.user_id == user_id)
    )
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def count_files_by_user(
    db: AsyncSession,
    user_id: str,
) -> int:
    """Return the number of non-deleted CVFile rows belonging to a user."""
    counting = (
        select(func.count(CVFile.file_id))
        .where(CVFile.user_id == user_id)
        # SQL expression, not a Python comparison.
        .where(CVFile.is_deleted == False)  # noqa: E712
    )
    total = (await db.execute(counting)).scalar_one()
    return int(total)
@retry_db(retries=2, delay=2)
async def count_profiles_by_user(
    db: AsyncSession,
    user_id: str,
) -> int:
    """Return the number of CVProfile rows linked to a user's non-deleted files."""
    counting = (
        select(func.count(CVProfile.profile_id))
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(CVFile.user_id == user_id)
        # SQL expression, not a Python comparison.
        .where(CVFile.is_deleted == False)  # noqa: E712
    )
    total = (await db.execute(counting)).scalar_one()
    return int(total)
@retry_db(retries=2, delay=2)
async def mark_file_extracted(db: AsyncSession, file_id: UUID) -> None:
    """Set ``is_extracted=True`` on the file with ``file_id``.

    This variant does NOT commit; the caller is responsible for committing
    the surrounding transaction. It rebinds the earlier definition of the
    same name in this module.
    """
    flag_extracted = (
        update(CVFile)
        .values(is_extracted=True)
        .where(CVFile.file_id == file_id)
    )
    await db.execute(flag_extracted)
# =========================
# PROFILE
# =========================
# async def create_profile(
# db: AsyncSession,
# profile: CVProfile,
# ) -> CVProfile:
# db.add(profile)
# await db.commit()
# await db.refresh(profile)
# return profile
from services.models.data_model import AIProfile
@retry_db(retries=2, delay=2)
async def create_profile(
    db: AsyncSession,
    filename: str,
    file_id: str,
    profile: AIProfile,
) -> CVProfile:
    """Build a ``CVProfile`` from an extracted profile mapping and stage it.

    ``profile`` is read through ``.get``: missing scalar fields become
    ``None`` and missing list fields become ``[]``. The new row is added and
    flushed but NOT committed, so the caller must commit the transaction.
    """
    scalar_fields = (
        "gpa_edu_1", "univ_edu_1", "major_edu_1",
        "gpa_edu_2", "univ_edu_2", "major_edu_2",
        "gpa_edu_3", "univ_edu_3", "major_edu_3",
        "domicile", "yoe",
    )
    list_fields = (
        "hardskills", "softskills", "certifications", "business_domain",
    )

    values = {"fullname": profile.get("fullname")}
    values.update({name: profile.get(name, None) for name in scalar_fields})
    # A fresh ``[]`` is created per field, so rows never share a default list.
    values.update({name: profile.get(name, []) for name in list_fields})

    cv_profile = CVProfile(filename=filename, file_id=file_id, **values)
    db.add(cv_profile)
    await db.flush()
    return cv_profile
@retry_db(retries=2, delay=2)
async def get_profile_by_filename(
    db: AsyncSession,
    filename: str,
    current_user: CVUser,
) -> Optional[CVProfile]:
    """Fetch the profile with this ``filename`` among ``current_user``'s files.

    Ownership is enforced by joining to ``CVFile`` and matching its
    ``user_id``. Returns ``None`` when no row matches.
    """
    query = (
        select(CVProfile)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(CVProfile.filename == filename)
        .where(CVFile.user_id == current_user.user_id)
    )
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def get_profiles(
    db: AsyncSession,
) -> List[CVProfile]:
    """Return every ``CVProfile`` row, with no owner filter or pagination."""
    rows = await db.execute(select(CVProfile))
    return rows.scalars().all()
@retry_db(retries=2, delay=2)
async def get_profiles_by_user_id(
    db: AsyncSession,
    current_user: CVUser,
) -> List[CVProfile]:
    """Return all profiles attached to files owned by ``current_user``."""
    owned_profiles = select(CVProfile).join(
        CVFile, CVProfile.file_id == CVFile.file_id
    )
    owned_profiles = owned_profiles.where(
        CVFile.user_id == current_user.user_id
    )
    rows = await db.execute(owned_profiles)
    return rows.scalars().all()
from sqlalchemy import select, and_, func, cast, String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import ARRAY
from typing import List
@retry_db(retries=2, delay=2)
async def get_profiles_by_criteria_id(
    db: AsyncSession,
    criteria_id: str,
    current_user: CVUser,
) -> List[CVProfile]:
    """Return profiles in the caller's tenant that satisfy a stored filter.

    The ``CVFilter`` identified by ``criteria_id`` is loaded and each of its
    populated fields becomes one condition; all conditions are ANDed:

    * ``gpa_edu_*`` and ``yoe``: profile value must be >= the filter value.
    * ``univ_edu_*`` and ``major_edu_*``: case-insensitive membership in the
      filter's list of accepted values.
    * ``domicile``: case-insensitive equality.
    * ``hardskills``, ``softskills``, ``certifications``,
      ``business_domain``: array overlap (at least one shared element).

    Returns an empty list when the filter does not exist or has no populated
    field, so an empty filter never selects the whole table.
    """
    criteria = await get_filter_by_id(db, criteria_id)
    if not criteria:
        return []

    conditions = []

    # --- GPA: minimum per education slot ---
    for field in ("gpa_edu_1", "gpa_edu_2", "gpa_edu_3"):
        minimum = getattr(criteria, field)
        if minimum is not None:
            conditions.append(getattr(CVProfile, field) >= minimum)

    # --- University / major: case-insensitive, any match in list ---
    for field in (
        "univ_edu_1", "univ_edu_2", "univ_edu_3",
        "major_edu_1", "major_edu_2", "major_edu_3",
    ):
        accepted = getattr(criteria, field)
        if accepted:
            conditions.append(
                func.lower(getattr(CVProfile, field)).in_(
                    [value.lower() for value in accepted]
                )
            )

    # --- Others ---
    if criteria.domicile:
        conditions.append(
            func.lower(CVProfile.domicile) == criteria.domicile.lower()
        )
    if criteria.yoe is not None:
        conditions.append(CVProfile.yoe >= criteria.yoe)

    # --- ARRAY fields: explicit cast so Postgres sees a text array ---
    for field in (
        "hardskills", "softskills", "certifications", "business_domain",
    ):
        wanted = getattr(criteria, field)
        if wanted:
            conditions.append(
                getattr(CVProfile, field).overlap(cast(wanted, ARRAY(String)))
            )

    if not conditions:
        return []

    # Tenant-safe JOIN: only profiles whose file owner shares the caller's
    # tenant are visible.
    stmt = (
        select(CVProfile)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .join(CVUser, CVFile.user_id == CVUser.user_id)
        .where(
            CVUser.tenant_id == current_user.tenant_id,
            *conditions,
        )
    )
    # The former debug dump (compile with literal_binds + print) was removed:
    # it dereferenced ``db.bind`` and wrote filter values to stdout. Enable
    # engine-level SQL echo/logging when the emitted SQL needs inspecting.
    result = await db.execute(stmt)
    return result.scalars().all()
# async def get_profile_by_id(
# db: AsyncSession,
# profile_id: str,
# ) -> Optional[CVProfile]:
# stmt = select(CVProfile).where(CVProfile.profile_id == profile_id)
# result = await db.execute(stmt)
# return result.scalar_one_or_none()
# from typing import Tuple
# async def get_profile_by_id(
# db: AsyncSession,
# profile_id: str,
# ) -> Optional[Tuple[CVProfile, str]]:
# stmt = (
# select(CVProfile, CVFile.url)
# .join(CVFile, CVProfile.file_id == CVFile.file_id)
# .where(CVProfile.profile_id == profile_id)
# )
# result = await db.execute(stmt)
# result = result.scalar_one_or_none()
# if not result:
# return None
# return result
@retry_db(retries=2, delay=2)
async def get_profile_by_id(
    db: AsyncSession,
    profile_id: str,
) -> Optional[dict]:
    """Return one profile as a plain dict, plus the URL of its source file.

    The dict has one key per ``CVProfile`` table column and an extra ``url``
    key taken from the joined ``CVFile``. Returns ``None`` when no profile
    has this ``profile_id``.
    """
    query = (
        select(CVProfile, CVFile.url)
        .join(CVFile, CVProfile.file_id == CVFile.file_id)
        .where(CVProfile.profile_id == profile_id)
    )
    row = (await db.execute(query)).first()
    if not row:
        return None

    profile, url = row
    # Copy column values off the ORM instance into a detached mapping.
    payload = {}
    for column in CVProfile.__table__.columns:
        payload[column.name] = getattr(profile, column.name)
    payload["url"] = url
    return payload
@retry_db(retries=2, delay=2)
async def list_profiles(
    db: AsyncSession,
    limit: int = 50,
    offset: int = 0,
) -> List[CVProfile]:
    """Return one page of ``CVProfile`` rows.

    ``limit`` caps the page size (default 50) and ``offset`` skips that many
    rows (default 0). No ordering is applied by this query.
    """
    page = select(CVProfile).limit(limit).offset(offset)
    rows = await db.execute(page)
    return rows.scalars().all()
# =========================
# FILTER & WEIGHT
# =========================
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
@retry_db(retries=2, delay=2)
async def get_filter(
    db: AsyncSession,
    filter: CVFilter,
):
    """Find a stored ``CVFilter`` matching the populated fields of ``filter``.

    Each populated field adds one condition and all conditions are ANDed:
    ``gpa_edu_*`` and ``yoe`` use ``>=``, ``domicile`` uses equality, and
    every list-valued field uses array overlap. Returns ``None`` when
    ``filter`` has no populated field or when nothing matches.
    """
    conditions = []

    # --- GPA: stored value must be at least the requested one ---
    for name in ("gpa_edu_1", "gpa_edu_2", "gpa_edu_3"):
        requested = getattr(filter, name)
        if requested is not None:
            conditions.append(getattr(CVFilter, name) >= requested)

    # --- University / major: overlap with the requested values ---
    for name in (
        "univ_edu_1", "univ_edu_2", "univ_edu_3",
        "major_edu_1", "major_edu_2", "major_edu_3",
    ):
        requested = getattr(filter, name)
        if requested:
            conditions.append(getattr(CVFilter, name).overlap(requested))

    # --- Others ---
    if filter.domicile:
        conditions.append(CVFilter.domicile == filter.domicile)
    if filter.yoe is not None:
        conditions.append(CVFilter.yoe >= filter.yoe)

    # --- ARRAY fields: overlap (any shared element), not exact match ---
    for name in (
        "hardskills", "softskills", "certifications", "business_domain",
    ):
        requested = getattr(filter, name)
        if requested:
            conditions.append(getattr(CVFilter, name).overlap(requested))

    # Prevent an unfiltered query over the whole table.
    if not conditions:
        return None

    rows = await db.execute(select(CVFilter).where(and_(*conditions)))
    return rows.scalar_one_or_none()
from services.models.data_model import Criteria
@retry_db(retries=2, delay=2)
async def get_multi_filter(
    db: AsyncSession,
    filter: Criteria,
):
    """Find a stored ``CVFilter`` matching a ``Criteria`` mapping.

    Scalar fields (``gpa_edu_*``, ``domicile``, ``yoe``) are ANDed together.
    List-valued fields (university, major, skills, certifications, business
    domain) are combined with OR, and that OR group is ANDed with the scalar
    conditions. Returns ``None`` when ``filter`` has no populated field or
    when nothing matches.

    The university and major columns are matched with array overlap, the
    same operator ``get_filter`` applies to them; the previous ``in_()``
    compared an array column against individual strings.
    """
    conditions = []
    list_conditions = []

    # --- GPA (scalar: AND) ---
    for name in ("gpa_edu_1", "gpa_edu_2", "gpa_edu_3"):
        minimum = filter.get(name)
        if minimum is not None:
            conditions.append(getattr(CVFilter, name) >= minimum)

    # --- Others (scalar: AND) ---
    if filter.get("domicile"):
        conditions.append(CVFilter.domicile == filter.get("domicile"))
    if filter.get("yoe") is not None:
        conditions.append(CVFilter.yoe >= filter.get("yoe"))

    # --- University, major and other ARRAY fields (list: OR) ---
    for name in (
        "univ_edu_1", "univ_edu_2", "univ_edu_3",
        "major_edu_1", "major_edu_2", "major_edu_3",
        "hardskills", "softskills", "certifications", "business_domain",
    ):
        requested = filter.get(name)
        if requested:
            list_conditions.append(getattr(CVFilter, name).overlap(requested))

    if list_conditions:
        conditions.append(or_(*list_conditions))

    # Prevent an unfiltered query over the whole table.
    if not conditions:
        return None

    stmt = select(CVFilter).where(and_(*conditions))
    result = await db.execute(stmt)
    return result.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def get_filter_by_id(
    db: AsyncSession,
    criteria_id: str,
) -> Optional[CVFilter]:
    """Fetch the ``CVFilter`` with this ``criteria_id``; ``None`` when absent."""
    rows = await db.execute(
        select(CVFilter).where(CVFilter.criteria_id == criteria_id)
    )
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def create_filter(
    db: AsyncSession,
    filter: CVFilter,
):
    """Add ``filter`` to the session, commit, and return the same instance.

    The instance is not refreshed after the commit.
    """
    new_filter = filter
    db.add(new_filter)
    await db.commit()
    return new_filter
@retry_db(retries=2, delay=2)
async def get_filter_and_weight(
    db: AsyncSession,
    criteria_id: str,
):
    """Query the filter joined to its weight for ``criteria_id``.

    Returns ``None`` when no filter/weight pair exists for ``criteria_id``.

    NOTE(review): the statement selects both ``CVFilter`` and ``CVWeight``,
    but ``scalar_one_or_none()`` yields only the first column of the row, so
    the ``CVWeight`` is never returned. Left unchanged because returning the
    full row would change the shape callers receive; confirm the intent.
    """
    paired = select(CVFilter, CVWeight).join(
        CVWeight, CVFilter.criteria_id == CVWeight.criteria_id
    )
    paired = paired.where(CVFilter.criteria_id == criteria_id)
    rows = await db.execute(paired)
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def get_weight_by_id(
    db: AsyncSession,
    weight_id: str,
):
    """Fetch the ``CVWeight`` with this ``weight_id``; ``None`` when absent."""
    rows = await db.execute(
        select(CVWeight).where(CVWeight.weight_id == weight_id)
    )
    return rows.scalar_one_or_none()
@retry_db(retries=2, delay=2)
async def create_filter_and_weight(
    db: AsyncSession,
    cv_filter: CVFilter,
    cv_weight: CVWeight,
):
    """Persist a filter and its weight together in a single commit.

    Returns nothing; the passed instances are not refreshed.
    """
    pair = [cv_filter, cv_weight]
    db.add_all(pair)
    await db.commit()
@retry_db(retries=2, delay=2)
async def create_weight(
    db: AsyncSession,
    cv_weight: CVWeight,
) -> CVWeight:
    """Add ``cv_weight`` to the session, commit, and return the same instance.

    The instance is not refreshed after the commit.
    """
    new_weight = cv_weight
    db.add(new_weight)
    await db.commit()
    return new_weight
# =========================
# MATCHING
# =========================
@retry_db(retries=2, delay=2)
async def create_matching(
    db: AsyncSession,
    matching: CVMatching,
) -> CVMatching:
    """Persist one ``CVMatching`` row; commit, refresh, and return it."""
    record = matching
    db.add(record)
    await db.commit()
    await db.refresh(record)
    return record
@retry_db(retries=2, delay=2)
async def create_matchings(
    db: AsyncSession,
    matchings: list,
) -> list[CVMatching]:
    """Persist several ``CVMatching`` rows in a single commit.

    Each item of ``matchings`` is a mapping of keyword arguments for the
    ``CVMatching`` constructor. The created rows are refreshed one by one and
    returned in input order.

    On any failure the error is printed, the session is rolled back so it is
    usable again (for instance by a retry on the same session), and the
    original exception is re-raised.
    """
    try:
        orm_objects = [CVMatching(**matching) for matching in matchings]
        db.add_all(orm_objects)
        await db.commit()
        for obj in orm_objects:
            await db.refresh(obj)
        return orm_objects
    except Exception as exc:
        print(f"Error creating matchings: {exc}")
        # Discard the failed transaction and the pending objects; without
        # this the session stays in a failed state for the next attempt.
        await db.rollback()
        raise
@retry_db(retries=2, delay=2)
async def get_matching_by_profile_and_criteria(
    db: AsyncSession,
    profile_id: str,
    criteria_id: str,
) -> Optional[CVMatching]:
    """Fetch the matching for a given profile/criteria pair.

    Both ``profile_id`` and ``criteria_id`` must match. Returns ``None`` when
    no such matching exists.
    """
    same_pair = and_(
        CVMatching.profile_id == profile_id,
        CVMatching.criteria_id == criteria_id,
    )
    rows = await db.execute(select(CVMatching).where(same_pair))
    return rows.scalar_one_or_none()
# =========================
# SCORE
# =========================
@retry_db(retries=2, delay=2)
async def create_score(
    db: AsyncSession,
    score: CVScore,
) -> CVScore:
    """Persist one ``CVScore`` row; commit, refresh, and return it."""
    record = score
    db.add(record)
    await db.commit()
    await db.refresh(record)
    return record
@retry_db(retries=2, delay=2)
async def create_scores(
    db: AsyncSession,
    scores: list[CVScore],
) -> list[CVScore]:
    """Persist several ``CVScore`` rows in a single commit.

    The rows are refreshed one by one and the same list is returned.

    On any failure the error is printed, the session is rolled back so it is
    usable again (for instance by a retry on the same session), and the
    original exception is re-raised.
    """
    try:
        db.add_all(scores)
        await db.commit()
        for score in scores:
            await db.refresh(score)
        return scores
    except Exception as exc:
        print(f"Error creating scores: {exc}")
        # Discard the failed transaction; without this the session stays in
        # a failed state for the next attempt.
        await db.rollback()
        raise
@retry_db(retries=2, delay=2)
async def get_scores_by_criteria(
    db: AsyncSession,
    criteria_id: str,
) -> List[CVScore]:
    """Return all scores for a criteria, highest score first.

    Scores are linked to the criteria through their ``CVMatching`` row.
    """
    ranked = select(CVScore).join(
        CVMatching, CVScore.matching_id == CVMatching.matching_id
    )
    ranked = ranked.where(CVMatching.criteria_id == criteria_id)
    ranked = ranked.order_by(CVScore.score.desc())
    rows = await db.execute(ranked)
    return rows.scalars().all()