|
|
""" |
|
|
Database Query Router. |
|
|
|
|
|
Flexible endpoints for querying any table in the recruitment database. |
|
|
""" |
|
|
|
|
|
from typing import Any, Optional |
|
|
from uuid import UUID |
|
|
|
|
|
from fastapi import APIRouter, HTTPException, Query |
|
|
from sqlalchemy import asc, desc |
|
|
from sqlalchemy.orm import joinedload |
|
|
|
|
|
from src.backend.api.schemas.database import ( |
|
|
TableName, |
|
|
SortOrder, |
|
|
QueryRequest, |
|
|
QueryResponse, |
|
|
SingleRecordResponse, |
|
|
) |
|
|
from src.backend.database.candidates.client import SessionLocal |
|
|
from src.backend.database.candidates.models import ( |
|
|
Candidate, |
|
|
CVScreeningResult, |
|
|
VoiceScreeningResult, |
|
|
InterviewScheduling, |
|
|
FinalDecision, |
|
|
) |
|
|
|
|
|
|
|
|
# Router that every endpoint in this module registers itself on.
router = APIRouter()


# Lookup from the public table identifier accepted in requests to the ORM
# model class that is queried for it.
TABLE_MAP: dict[TableName, type] = {
    TableName.candidates: Candidate,
    TableName.cv_screening_results: CVScreeningResult,
    TableName.voice_screening_results: VoiceScreeningResult,
    TableName.interview_scheduling: InterviewScheduling,
    TableName.final_decision: FinalDecision,
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def model_to_dict(obj: Any, fields: Optional[list[str]] = None) -> Optional[dict[str, Any]]:
    """
    Convert a SQLAlchemy model instance to a dictionary.

    Args:
        obj: SQLAlchemy model instance, or None.
        fields: Optional list of column names to include. If None, every
            column is included. Names that match no column are ignored.

    Returns:
        Mapping of column name to value, or None when ``obj`` is None.
        UUID values are rendered as strings, and values exposing a ``value``
        attribute (e.g. enum members) are replaced by that attribute. All
        other values are passed through unchanged.
    """
    if obj is None:
        return None

    # A set keeps the per-column membership check O(1) for long field lists.
    wanted = None if fields is None else set(fields)

    result: dict[str, Any] = {}
    for column in obj.__table__.columns:
        key = column.name
        if wanted is not None and key not in wanted:
            continue

        value = getattr(obj, key)

        # Detect UUIDs by type. Probing for a ``hex`` attribute also matches
        # float and bytes (both define ``.hex()``), which turned numeric
        # values such as scores into strings.
        if isinstance(value, UUID):
            value = str(value)
        elif hasattr(value, "value"):
            value = value.value
        result[key] = value
    return result
|
|
|
|
|
|
|
|
def serialize_relation(relation_data: Any, is_list: bool = True) -> Any:
    """
    Serialize relationship data loaded on a model.

    Args:
        relation_data: Related object, iterable of related objects, or None.
        is_list: When True, ``relation_data`` is iterated and each item is
            converted; when False, it is converted as a single object.

    Returns:
        None if ``relation_data`` is None, otherwise a list of dictionaries
        (``is_list=True``) or a single dictionary (``is_list=False``).
    """
    if relation_data is None:
        return None
    if not is_list:
        return model_to_dict(relation_data)
    return [model_to_dict(entry) for entry in relation_data]
|
|
|
|
|
|
|
|
# Operator token -> function that builds the criterion for (column, operand).
_FILTER_OPERATORS = {
    "$eq": lambda col, operand: col == operand,
    "$ne": lambda col, operand: col != operand,
    "$gt": lambda col, operand: col > operand,
    "$gte": lambda col, operand: col >= operand,
    "$lt": lambda col, operand: col < operand,
    "$lte": lambda col, operand: col <= operand,
    "$in": lambda col, operand: col.in_(operand),
    "$nin": lambda col, operand: ~col.in_(operand),
    "$like": lambda col, operand: col.like(operand),
    "$ilike": lambda col, operand: col.ilike(operand),
}


def apply_filters(query, model, filters: dict[str, Any]):
    """
    Apply filters to a SQLAlchemy query.

    Supports:
    - Simple equality: {"field": "value"}
    - Comparison operators: {"field": {"$gt": 5, "$lte": 10}}
    - List membership: {"field": {"$in": [1, 2, 3]}}

    Recognised operators are $eq, $ne, $gt, $gte, $lt, $lte, $in, $nin,
    $like and $ilike. Fields that are not attributes of ``model`` and
    operators that are not recognised are skipped without raising.

    Args:
        query: Query object exposing ``filter``.
        model: Model class whose attributes are looked up by field name.
        filters: Mapping of field name to a plain value or an operator dict.

    Returns:
        The query with one ``filter`` call applied per accepted condition.
    """
    for field, condition in filters.items():
        if not hasattr(model, field):
            continue
        column = getattr(model, field)

        # A bare (non-dict) value means plain equality.
        if not isinstance(condition, dict):
            query = query.filter(column == condition)
            continue

        for op, operand in condition.items():
            build = _FILTER_OPERATORS.get(op)
            if build is not None:
                query = query.filter(build(column, operand))

    return query
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/query", response_model=QueryResponse)
async def query_table(request: QueryRequest) -> QueryResponse:
    """
    Flexible query endpoint for any table.

    Supports filtering, field selection, pagination, and sorting.
    Related records are only attached when `include_relations` is set and
    the table is `candidates`.

    Example request body:
    ```json
    {
        "table": "candidates",
        "filters": {"status": "applied"},
        "fields": ["id", "full_name", "email"],
        "limit": 10,
        "offset": 0,
        "sort_by": "created_at",
        "sort_order": "desc"
    }
    ```
    """
    model = TABLE_MAP.get(request.table)
    if not model:
        raise HTTPException(status_code=400, detail=f"Unknown table: {request.table}")

    try:
        with SessionLocal() as session:
            # Relations are only loaded and serialized for the candidates table.
            wants_relations = request.include_relations and request.table == TableName.candidates

            query = session.query(model)
            if wants_relations:
                query = query.options(
                    joinedload(Candidate.cv_screening_results),
                    joinedload(Candidate.voice_screening_results),
                    joinedload(Candidate.interview_scheduling),
                    joinedload(Candidate.final_decision),
                )

            if request.filters:
                query = apply_filters(query, model, request.filters)

            # Counted before offset/limit so it covers every matching row.
            total_count = query.count()

            # A sort_by that is not an attribute of the model is ignored.
            if request.sort_by and hasattr(model, request.sort_by):
                direction = asc if request.sort_order == SortOrder.asc else desc
                query = query.order_by(direction(getattr(model, request.sort_by)))

            rows = query.offset(request.offset).limit(request.limit).all()

            data = []
            for row in rows:
                record = model_to_dict(row, request.fields)
                if wants_relations:
                    for relation in (
                        "cv_screening_results",
                        "voice_screening_results",
                        "interview_scheduling",
                    ):
                        record[relation] = serialize_relation(getattr(row, relation))
                    # final_decision is serialized as a single object, not a list.
                    record["final_decision"] = serialize_relation(row.final_decision, is_list=False)
                data.append(record)

            return QueryResponse(
                success=True,
                table=request.table.value,
                total_count=total_count,
                returned_count=len(data),
                offset=request.offset,
                data=data,
            )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Query failed: {str(e)}")
|
|
|
|
|
|
|
|
@router.get("/candidates", response_model=QueryResponse)
async def list_candidates(
    status: Optional[str] = Query(default=None, description="Filter by status"),
    limit: int = Query(default=100, ge=1, le=1000, description="Max records"),
    offset: int = Query(default=0, ge=0, description="Offset for pagination"),
    include_relations: bool = Query(default=False, description="Include related screening data"),
) -> QueryResponse:
    """
    List all candidates with optional filtering.

    Convenience endpoint for the most common query. Results are sorted by
    `created_at` in descending order.
    """
    # Only a non-empty status becomes a filter; otherwise no filters are sent.
    status_filter = {"status": status} if status else None

    return await query_table(
        QueryRequest(
            table=TableName.candidates,
            filters=status_filter,
            include_relations=include_relations,
            limit=limit,
            offset=offset,
            sort_by="created_at",
            sort_order=SortOrder.desc,
        )
    )
|
|
|
|
|
|
|
|
@router.get("/candidates/{candidate_id}", response_model=SingleRecordResponse)
async def get_candidate(
    candidate_id: UUID,
    include_relations: bool = Query(default=True, description="Include related screening data"),
) -> SingleRecordResponse:
    """
    Get a single candidate by ID with all related data.

    When no candidate matches, the response has `success` set to false, no
    data, and a message naming the requested ID.
    """
    try:
        with SessionLocal() as session:
            lookup = session.query(Candidate).filter(Candidate.id == candidate_id)
            if include_relations:
                lookup = lookup.options(
                    joinedload(Candidate.cv_screening_results),
                    joinedload(Candidate.voice_screening_results),
                    joinedload(Candidate.interview_scheduling),
                    joinedload(Candidate.final_decision),
                )

            candidate = lookup.first()
            if not candidate:
                return SingleRecordResponse(
                    success=False,
                    table="candidates",
                    data=None,
                    message=f"Candidate with ID {candidate_id} not found",
                )

            data = model_to_dict(candidate)
            if include_relations:
                data.update(
                    {
                        "cv_screening_results": serialize_relation(candidate.cv_screening_results),
                        "voice_screening_results": serialize_relation(candidate.voice_screening_results),
                        "interview_scheduling": serialize_relation(candidate.interview_scheduling),
                        # final_decision is serialized as a single object, not a list.
                        "final_decision": serialize_relation(candidate.final_decision, is_list=False),
                    }
                )

            return SingleRecordResponse(success=True, table="candidates", data=data)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch candidate: {str(e)}")
|
|
|
|
|
|
|
|
@router.get("/candidates/email/{email}", response_model=SingleRecordResponse)
async def get_candidate_by_email(
    email: str,
    include_relations: bool = Query(default=True, description="Include related screening data"),
) -> SingleRecordResponse:
    """
    Get a candidate by email address.

    When no candidate matches, the response has `success` set to false, no
    data, and a message naming the requested email.
    """
    try:
        with SessionLocal() as session:
            lookup = session.query(Candidate).filter(Candidate.email == email)
            if include_relations:
                lookup = lookup.options(
                    joinedload(Candidate.cv_screening_results),
                    joinedload(Candidate.voice_screening_results),
                    joinedload(Candidate.interview_scheduling),
                    joinedload(Candidate.final_decision),
                )

            candidate = lookup.first()
            if not candidate:
                return SingleRecordResponse(
                    success=False,
                    table="candidates",
                    data=None,
                    message=f"Candidate with email '{email}' not found",
                )

            data = model_to_dict(candidate)
            if include_relations:
                data.update(
                    {
                        "cv_screening_results": serialize_relation(candidate.cv_screening_results),
                        "voice_screening_results": serialize_relation(candidate.voice_screening_results),
                        "interview_scheduling": serialize_relation(candidate.interview_scheduling),
                        # final_decision is serialized as a single object, not a list.
                        "final_decision": serialize_relation(candidate.final_decision, is_list=False),
                    }
                )

            return SingleRecordResponse(success=True, table="candidates", data=data)

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch candidate: {str(e)}")
|
|
|
|
|
|
|
|
@router.get("/cv-screening", response_model=QueryResponse)
async def list_cv_screenings(
    candidate_id: Optional[UUID] = Query(default=None, description="Filter by candidate ID"),
    min_score: Optional[float] = Query(default=None, ge=0, le=1, description="Minimum overall fit score"),
    limit: int = Query(default=100, ge=1, le=1000),
    offset: int = Query(default=0, ge=0),
) -> QueryResponse:
    """
    List CV screening results with optional filtering.

    Results are sorted by `timestamp` in descending order.
    """
    criteria: dict[str, Any] = {}
    if candidate_id:
        # The ID is passed on in its string form.
        criteria["candidate_id"] = str(candidate_id)
    if min_score is not None:
        # Explicit None check so that a minimum of 0 is still applied.
        criteria["overall_fit_score"] = {"$gte": min_score}

    return await query_table(
        QueryRequest(
            table=TableName.cv_screening_results,
            filters=criteria or None,
            limit=limit,
            offset=offset,
            sort_by="timestamp",
            sort_order=SortOrder.desc,
        )
    )
|
|
|
|
|
|
|
|
@router.get("/voice-screening", response_model=QueryResponse)
async def list_voice_screenings(
    candidate_id: Optional[UUID] = Query(default=None, description="Filter by candidate ID"),
    limit: int = Query(default=100, ge=1, le=1000),
    offset: int = Query(default=0, ge=0),
) -> QueryResponse:
    """
    List voice screening results with optional filtering.

    Results are sorted by `timestamp` in descending order.
    """
    # The ID is passed on in its string form; without one, no filters are sent.
    candidate_filter = {"candidate_id": str(candidate_id)} if candidate_id else None

    return await query_table(
        QueryRequest(
            table=TableName.voice_screening_results,
            filters=candidate_filter,
            limit=limit,
            offset=offset,
            sort_by="timestamp",
            sort_order=SortOrder.desc,
        )
    )
|
|
|
|
|
|
|
|
@router.get("/interviews", response_model=QueryResponse)
async def list_interviews(
    candidate_id: Optional[UUID] = Query(default=None, description="Filter by candidate ID"),
    status: Optional[str] = Query(default=None, description="Filter by interview status"),
    limit: int = Query(default=100, ge=1, le=1000),
    offset: int = Query(default=0, ge=0),
) -> QueryResponse:
    """
    List interview scheduling records with optional filtering.

    Results are sorted by `start_time` in descending order.
    """
    criteria: dict[str, Any] = {}
    if candidate_id:
        # The ID is passed on in its string form.
        criteria["candidate_id"] = str(candidate_id)
    if status:
        criteria["status"] = status

    return await query_table(
        QueryRequest(
            table=TableName.interview_scheduling,
            filters=criteria or None,
            limit=limit,
            offset=offset,
            sort_by="start_time",
            sort_order=SortOrder.desc,
        )
    )
|
|
|
|
|
|
|
|
@router.get("/decisions", response_model=QueryResponse)
async def list_decisions(
    decision: Optional[str] = Query(default=None, description="Filter by decision (e.g., 'hired', 'rejected')"),
    min_score: Optional[float] = Query(default=None, ge=0, le=1, description="Minimum overall score"),
    limit: int = Query(default=100, ge=1, le=1000),
    offset: int = Query(default=0, ge=0),
) -> QueryResponse:
    """
    List final decisions with optional filtering.

    Results are sorted by `timestamp` in descending order.
    """
    criteria: dict[str, Any] = {}
    if decision:
        criteria["decision"] = decision
    if min_score is not None:
        # Explicit None check so that a minimum of 0 is still applied.
        criteria["overall_score"] = {"$gte": min_score}

    return await query_table(
        QueryRequest(
            table=TableName.final_decision,
            filters=criteria or None,
            limit=limit,
            offset=offset,
            sort_by="timestamp",
            sort_order=SortOrder.desc,
        )
    )
|
|
|
|
|
|
|
|
@router.get("/stats")
async def get_database_stats() -> dict:
    """
    Get summary statistics for all tables.

    Reports the total row count of each table and, for candidates, the
    number of rows per status.
    """
    try:
        with SessionLocal() as session:
            from sqlalchemy import func

            counted_tables = (
                ("candidates", Candidate),
                ("cv_screening_results", CVScreeningResult),
                ("voice_screening_results", VoiceScreeningResult),
                ("interview_scheduling", InterviewScheduling),
                ("final_decision", FinalDecision),
            )
            stats = {
                name: {"total": session.query(model).count()}
                for name, model in counted_tables
            }

            grouped = (
                session.query(Candidate.status, func.count(Candidate.id))
                .group_by(Candidate.status)
                .all()
            )

            by_status = {}
            for status, count in grouped:
                # Use the `.value` of a status when it has one; otherwise
                # the status itself. Either way the key is a string.
                label = status.value if hasattr(status, 'value') else status
                by_status[str(label)] = count
            stats["candidates"]["by_status"] = by_status

            return {"success": True, "stats": stats}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get stats: {str(e)}")
|
|
|
|
|
|
|
|
@router.get("/health")
async def database_health():
    """
    Health check for database router.

    Runs `SELECT 1` in a fresh session. Any exception is reported in the
    returned payload as an unhealthy status instead of being raised.
    """
    try:
        with SessionLocal() as session:
            from sqlalchemy import text

            session.execute(text("SELECT 1"))
    except Exception as e:
        return {"status": "unhealthy", "service": "database", "error": str(e)}
    else:
        return {"status": "healthy", "service": "database", "connection": "ok"}
|
|
|
|
|
|