# coderound/backend/src/workers/ingest.py
# ketannnn's picture
# feat: add frontend system health diagnostics and mute HF Celery gossip
# 27cbb3d
import uuid
import asyncio
import re
import json
from typing import Any
from sqlalchemy import select, func
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from .celery_app import celery_app
from ..config import get_settings
from ..database import AsyncSessionLocal
from ..models.candidate import Candidate
from ..models.session import Session
from ..models.jd import JobDescription
from ..ml.embedder import embed_texts, compute_text_hash
from ..ml.feature_builder import (
build_candidate_text,
compute_growth_velocity,
compute_jd_quality,
parse_jd_requirements,
)
def _get_qdrant() -> QdrantClient:
    """Create a Qdrant client from the URL and API key held in the app settings."""
    cfg = get_settings()
    client = QdrantClient(
        url=cfg.qdrant_url,
        api_key=cfg.qdrant_api_key,
    )
    return client
def _normalize_bool(val: Any) -> bool | None:
if val is None:
return None
if isinstance(val, bool):
return val
if isinstance(val, str):
return val.lower() in ("true", "yes", "1")
return bool(val)
def _normalize_float(val: Any) -> float | None:
try:
return float(val) if val is not None and str(val).strip() != "" else None
except (ValueError, TypeError):
return None
def _parse_list(val: Any) -> list:
if isinstance(val, list):
return val
if isinstance(val, str) and val.strip():
return [x.strip() for x in re.split(r"[,;|]", val) if x.strip()]
return []
def _run_async(coro):
loop = None
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
raise RuntimeError
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
@celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
    """Celery entry point: ingest a batch of candidate rows.

    Drives the async ingest routine to completion synchronously. Any
    exception is handed to ``self.retry`` with a 30-second countdown; the
    task is registered with ``max_retries=3``.

    Args:
        rows: Candidate records as dictionaries.
        session_id: Optional session identifier (string form) to attach to.

    Returns:
        The summary dict produced by the async ingest routine.
    """
    try:
        outcome = _run_async(_ingest_candidates_async(rows, session_id))
    except Exception as failure:
        raise self.retry(exc=failure, countdown=30)
    else:
        return outcome
async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict:
settings = get_settings()
qdrant = _get_qdrant()
texts = []
processed = []
sess_uuid = uuid.UUID(session_id) if session_id else None
for row in rows:
work_exp = row.get("parsed_work_experience") or []
if isinstance(work_exp, str):
try:
work_exp = json.loads(work_exp)
except Exception:
work_exp = []
is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))
velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded))
candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp})
text_hash = compute_text_hash(candidate_text)
processed.append({
"row": row, "work_exp": work_exp, "velocity": velocity,
"text": candidate_text, "hash": text_hash,
})
texts.append(candidate_text)
embeddings = embed_texts(texts)
# Validation: Ensure generated embeddings match the configured vector_size for Qdrant
actual_dim = embeddings.shape[1]
expected_dim = settings.vector_size
print(f"[INGEST] Generated {len(texts)} embeddings with dimension {actual_dim} (Expected: {expected_dim})")
if actual_dim != expected_dim:
raise ValueError(
f"Vector size mismatch! Model '{settings.embedding_model}' produced dimension {actual_dim}, "
f"but Qdrant collection '{settings.collection_name}' expects {expected_dim}. "
f"Please check your EMBEDDING_MODEL and VECTOR_SIZE settings."
)
async with AsyncSessionLocal() as session:
qdrant_points = []
inserted = 0
for i, p in enumerate(processed):
row = p["row"]
candidate_id = uuid.uuid4()
qdrant_id = str(candidate_id)
all_skills = (
_parse_list(row.get("programming_languages"))
+ _parse_list(row.get("backend_frameworks"))
+ _parse_list(row.get("frontend_technologies"))
)
if row.get("parsed_skills"):
all_skills.extend(_parse_list(row["parsed_skills"]))
cand = Candidate(
id=candidate_id,
session_id=sess_uuid,
external_id=str(row.get("id") or row.get("candidate_id") or ""),
name=row.get("name"),
email=row.get("email"),
looking_for=row.get("looking_for"),
currently_employed=_normalize_bool(row.get("currently_employed")),
notice_period=str(row.get("notice_period") or "")[:100] or None,
open_to_working_at=row.get("open_to_working_at"),
role_type=row.get("role_type"),
engineer_type=row.get("engineer_type"),
years_of_experience=_normalize_float(row.get("years_of_experience")),
programming_languages=_parse_list(row.get("programming_languages")),
backend_frameworks=_parse_list(row.get("backend_frameworks")),
frontend_technologies=_parse_list(row.get("frontend_technologies")),
gen_ai_experience=_normalize_bool(row.get("gen_ai_experience")),
recent_experience_type=row.get("recent_experience_type"),
education_status=row.get("education_status"),
degree=row.get("degree"),
parsed_summary=row.get("parsed_summary"),
parsed_skills=row.get("parsed_skills"),
parsed_work_experience=p["work_exp"],
most_recent_company=row.get("most_recent_company"),
most_recent_company_description=row.get("most_recent_company_description"),
most_recent_company_is_funded=_normalize_bool(row.get("most_recent_company_is_funded")),
most_recent_company_is_product_company=_normalize_bool(row.get("most_recent_company_is_product_company")),
most_recent_company_total_funding=_normalize_float(row.get("most_recent_company_total_funding")),
most_recent_company_funding_status=row.get("most_recent_company_funding_status"),
time_in_current_company=_normalize_float(row.get("time_in_current_company")),
is_actively_or_passively_looking=row.get("is_actively_or_passively_looking"),
growth_velocity=p["velocity"],
embedding_hash=p["hash"],
qdrant_id=qdrant_id,
)
session.add(cand)
qdrant_points.append(
PointStruct(
id=qdrant_id,
vector=embeddings[i].tolist(),
payload={
"candidate_id": str(candidate_id),
"session_id": session_id or "",
"role_type": row.get("role_type"),
"engineer_type": row.get("engineer_type"),
"years_of_experience": _normalize_float(row.get("years_of_experience")),
"looking_for": row.get("looking_for"),
"open_to_working_at": row.get("open_to_working_at"),
"skills": all_skills[:50],
"gen_ai_experience": _normalize_bool(row.get("gen_ai_experience")),
"growth_velocity": p["velocity"],
"is_funded": _normalize_bool(row.get("most_recent_company_is_funded")),
"currently_employed": _normalize_bool(row.get("currently_employed")),
},
)
)
inserted += 1
await session.commit()
if qdrant_points:
qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points)
if sess_uuid:
result = await session.execute(
select(func.count()).select_from(Candidate).where(Candidate.session_id == sess_uuid)
)
count = result.scalar() or 0
sess_result = await session.execute(select(Session).where(Session.id == sess_uuid))
sess_obj = sess_result.scalar_one_or_none()
if sess_obj:
sess_obj.candidate_count = count
await session.commit()
return {"inserted": inserted, "total": len(rows), "session_id": session_id}
@celery_app.task(bind=True, name="ingest_jd", max_retries=3)
def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
    """Celery entry point: parse a job description and persist the result.

    Drives the async JD routine to completion synchronously. Any exception is
    handed to ``self.retry`` with a 30-second countdown; the task is
    registered with ``max_retries=3``.

    Args:
        jd_id: Job description UUID in string form.
        raw_text: Raw job description text.
        title: Job title, forwarded to the async routine.

    Returns:
        The summary dict produced by the async JD routine.
    """
    try:
        outcome = _run_async(_ingest_jd_async(jd_id, raw_text, title))
    except Exception as failure:
        raise self.retry(exc=failure, countdown=30)
    else:
        return outcome
async def _ingest_jd_async(jd_id: str, raw_text: str, title: str) -> dict:
    """Parse a job description and store the extracted fields on its DB row.

    Args:
        jd_id: Job description UUID in string form.
        raw_text: Raw job description text to parse and score.
        title: Accepted for interface compatibility; not used in this function.

    Returns:
        ``{"jd_id", "status", "jd_quality"}``. ``status`` is ``"ready"`` when
        the row was found and updated, and ``"not_found"`` when no row with
        that id exists (nothing is written in that case).

    Raises:
        ValueError: If ``jd_id`` is not a valid UUID string.
    """
    parsed = parse_jd_requirements(raw_text)
    jd_quality = compute_jd_quality(raw_text, parsed)

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(JobDescription).where(JobDescription.id == uuid.UUID(jd_id))
        )
        jd = result.scalar_one_or_none()
        if jd is None:
            # Do not claim "ready" for a row that was never updated.
            return {"jd_id": jd_id, "status": "not_found", "jd_quality": jd_quality}

        jd.parsed_requirements = parsed
        jd.required_skills = parsed.get("required_skills", [])
        jd.min_yoe = parsed.get("min_yoe")
        jd.max_yoe = parsed.get("max_yoe")
        jd.role_type = parsed.get("role_type")
        jd.engineer_type = parsed.get("engineer_type")
        jd.location = parsed.get("location")
        jd.remote_allowed = parsed.get("remote_allowed")
        jd.jd_quality = jd_quality
        jd.status = "ready"
        await session.commit()

    return {"jd_id": jd_id, "status": "ready", "jd_quality": jd_quality}