| import uuid |
| import asyncio |
| import re |
| import json |
| from typing import Any |
| from sqlalchemy import select, func |
| from qdrant_client import QdrantClient |
| from qdrant_client.models import PointStruct |
|
|
| from .celery_app import celery_app |
| from ..config import get_settings |
| from ..database import AsyncSessionLocal |
| from ..models.candidate import Candidate |
| from ..models.session import Session |
| from ..models.jd import JobDescription |
| from ..ml.embedder import embed_texts, compute_text_hash |
| from ..ml.feature_builder import ( |
| build_candidate_text, |
| compute_growth_velocity, |
| compute_jd_quality, |
| parse_jd_requirements, |
| ) |
|
|
|
|
def _get_qdrant() -> QdrantClient:
    """Create a Qdrant client configured from the application settings.

    Returns:
        A new ``QdrantClient`` pointed at ``settings.qdrant_url`` and
        authenticated with ``settings.qdrant_api_key``.
    """
    cfg = get_settings()
    endpoint = cfg.qdrant_url
    token = cfg.qdrant_api_key
    return QdrantClient(url=endpoint, api_key=token)
|
|
|
|
| def _normalize_bool(val: Any) -> bool | None: |
| if val is None: |
| return None |
| if isinstance(val, bool): |
| return val |
| if isinstance(val, str): |
| return val.lower() in ("true", "yes", "1") |
| return bool(val) |
|
|
|
|
| def _normalize_float(val: Any) -> float | None: |
| try: |
| return float(val) if val is not None and str(val).strip() != "" else None |
| except (ValueError, TypeError): |
| return None |
|
|
|
|
def _parse_list(val: Any) -> list:
    """Turn a list or a delimited string into a list of items.

    Args:
        val: A list (returned unchanged), a string whose items are separated
            by ``,``, ``;`` or ``|``, or anything else.

    Returns:
        The input list itself, the non-empty trimmed pieces of the string,
        or an empty list for blank strings and unsupported types.
    """
    if isinstance(val, list):
        return val
    if not isinstance(val, str) or not val.strip():
        return []
    trimmed = (piece.strip() for piece in re.split(r"[,;|]", val))
    return [piece for piece in trimmed if piece]
|
|
|
|
def _run_async(coro):
    """Run a coroutine to completion from synchronous code.

    The current event loop is reused when one is available and still open;
    otherwise a new loop is created and installed as the current loop.

    Args:
        coro: The coroutine (or other awaitable accepted by
            ``run_until_complete``) to execute.

    Returns:
        Whatever the coroutine returns.
    """
    try:
        active_loop = asyncio.get_event_loop()
        reusable = not active_loop.is_closed()
    except RuntimeError:
        # No usable loop is associated with this thread.
        reusable = False

    if not reusable:
        active_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(active_loop)

    return active_loop.run_until_complete(coro)
|
|
@celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
    """Celery task: ingest one batch of candidate rows.

    Runs ``_ingest_candidates_async`` to completion on an event loop via
    ``_run_async``.

    Args:
        rows: Raw candidate records to ingest.
        session_id: Optional UUID string of the session the rows belong to.

    Returns:
        The summary dict produced by ``_ingest_candidates_async``.

    Raises:
        ValueError: Propagated immediately, without retrying.
        celery.exceptions.Retry: For any other failure, scheduling a retry in
            30 seconds (at most 3 retries).
    """
    try:
        return _run_async(_ingest_candidates_async(rows, session_id))
    except ValueError:
        # ValueError here signals deterministic problems (a malformed
        # session UUID, an embedding-size/configuration mismatch). Retrying
        # would only repeat the embedding work and fail the same way.
        raise
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
|
|
|
|
| async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict: |
| settings = get_settings() |
| qdrant = _get_qdrant() |
| texts = [] |
| processed = [] |
| sess_uuid = uuid.UUID(session_id) if session_id else None |
|
|
| for row in rows: |
| work_exp = row.get("parsed_work_experience") or [] |
| if isinstance(work_exp, str): |
| try: |
| work_exp = json.loads(work_exp) |
| except Exception: |
| work_exp = [] |
|
|
| is_funded = _normalize_bool(row.get("most_recent_company_is_funded")) |
| velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded)) |
| candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp}) |
| text_hash = compute_text_hash(candidate_text) |
|
|
| processed.append({ |
| "row": row, "work_exp": work_exp, "velocity": velocity, |
| "text": candidate_text, "hash": text_hash, |
| }) |
| texts.append(candidate_text) |
|
|
| embeddings = embed_texts(texts) |
| |
| |
| actual_dim = embeddings.shape[1] |
| expected_dim = settings.vector_size |
| print(f"[INGEST] Generated {len(texts)} embeddings with dimension {actual_dim} (Expected: {expected_dim})") |
| |
| if actual_dim != expected_dim: |
| raise ValueError( |
| f"Vector size mismatch! Model '{settings.embedding_model}' produced dimension {actual_dim}, " |
| f"but Qdrant collection '{settings.collection_name}' expects {expected_dim}. " |
| f"Please check your EMBEDDING_MODEL and VECTOR_SIZE settings." |
| ) |
|
|
| async with AsyncSessionLocal() as session: |
| qdrant_points = [] |
| inserted = 0 |
|
|
| for i, p in enumerate(processed): |
| row = p["row"] |
| candidate_id = uuid.uuid4() |
| qdrant_id = str(candidate_id) |
|
|
| all_skills = ( |
| _parse_list(row.get("programming_languages")) |
| + _parse_list(row.get("backend_frameworks")) |
| + _parse_list(row.get("frontend_technologies")) |
| ) |
| if row.get("parsed_skills"): |
| all_skills.extend(_parse_list(row["parsed_skills"])) |
|
|
| cand = Candidate( |
| id=candidate_id, |
| session_id=sess_uuid, |
| external_id=str(row.get("id") or row.get("candidate_id") or ""), |
| name=row.get("name"), |
| email=row.get("email"), |
| looking_for=row.get("looking_for"), |
| currently_employed=_normalize_bool(row.get("currently_employed")), |
| notice_period=str(row.get("notice_period") or "")[:100] or None, |
| open_to_working_at=row.get("open_to_working_at"), |
| role_type=row.get("role_type"), |
| engineer_type=row.get("engineer_type"), |
| years_of_experience=_normalize_float(row.get("years_of_experience")), |
| programming_languages=_parse_list(row.get("programming_languages")), |
| backend_frameworks=_parse_list(row.get("backend_frameworks")), |
| frontend_technologies=_parse_list(row.get("frontend_technologies")), |
| gen_ai_experience=_normalize_bool(row.get("gen_ai_experience")), |
| recent_experience_type=row.get("recent_experience_type"), |
| education_status=row.get("education_status"), |
| degree=row.get("degree"), |
| parsed_summary=row.get("parsed_summary"), |
| parsed_skills=row.get("parsed_skills"), |
| parsed_work_experience=p["work_exp"], |
| most_recent_company=row.get("most_recent_company"), |
| most_recent_company_description=row.get("most_recent_company_description"), |
| most_recent_company_is_funded=_normalize_bool(row.get("most_recent_company_is_funded")), |
| most_recent_company_is_product_company=_normalize_bool(row.get("most_recent_company_is_product_company")), |
| most_recent_company_total_funding=_normalize_float(row.get("most_recent_company_total_funding")), |
| most_recent_company_funding_status=row.get("most_recent_company_funding_status"), |
| time_in_current_company=_normalize_float(row.get("time_in_current_company")), |
| is_actively_or_passively_looking=row.get("is_actively_or_passively_looking"), |
| growth_velocity=p["velocity"], |
| embedding_hash=p["hash"], |
| qdrant_id=qdrant_id, |
| ) |
| session.add(cand) |
|
|
| qdrant_points.append( |
| PointStruct( |
| id=qdrant_id, |
| vector=embeddings[i].tolist(), |
| payload={ |
| "candidate_id": str(candidate_id), |
| "session_id": session_id or "", |
| "role_type": row.get("role_type"), |
| "engineer_type": row.get("engineer_type"), |
| "years_of_experience": _normalize_float(row.get("years_of_experience")), |
| "looking_for": row.get("looking_for"), |
| "open_to_working_at": row.get("open_to_working_at"), |
| "skills": all_skills[:50], |
| "gen_ai_experience": _normalize_bool(row.get("gen_ai_experience")), |
| "growth_velocity": p["velocity"], |
| "is_funded": _normalize_bool(row.get("most_recent_company_is_funded")), |
| "currently_employed": _normalize_bool(row.get("currently_employed")), |
| }, |
| ) |
| ) |
| inserted += 1 |
|
|
| await session.commit() |
|
|
| if qdrant_points: |
| qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points) |
|
|
| if sess_uuid: |
| result = await session.execute( |
| select(func.count()).select_from(Candidate).where(Candidate.session_id == sess_uuid) |
| ) |
| count = result.scalar() or 0 |
| sess_result = await session.execute(select(Session).where(Session.id == sess_uuid)) |
| sess_obj = sess_result.scalar_one_or_none() |
| if sess_obj: |
| sess_obj.candidate_count = count |
| await session.commit() |
|
|
| return {"inserted": inserted, "total": len(rows), "session_id": session_id} |
|
|
|
|
@celery_app.task(bind=True, name="ingest_jd", max_retries=3)
def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
    """Celery task: parse a job description and store the parsed fields.

    Runs ``_ingest_jd_async`` to completion on an event loop via
    ``_run_async``.

    Args:
        jd_id: UUID string of the ``JobDescription`` row to update.
        raw_text: Raw job-description text to parse.
        title: Job title, forwarded unchanged to ``_ingest_jd_async``.

    Returns:
        The summary dict produced by ``_ingest_jd_async``.

    Raises:
        ValueError: Propagated immediately, without retrying.
        celery.exceptions.Retry: For any other failure, scheduling a retry in
            30 seconds (at most 3 retries).
    """
    try:
        return _run_async(_ingest_jd_async(jd_id, raw_text, title))
    except ValueError:
        # A malformed ``jd_id`` (or other invalid input) fails identically on
        # every attempt, so retrying only delays the failure.
        raise
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
|
|
|
|
async def _ingest_jd_async(jd_id: str, raw_text: str, title: str) -> dict:
    """Parse a job description and write the parsed fields to its database row.

    Args:
        jd_id: UUID string identifying the ``JobDescription`` row.
        raw_text: Raw job-description text handed to ``parse_jd_requirements``.
        title: Accepted for interface compatibility; not used by this function.

    Returns:
        ``{"jd_id": jd_id, "status": ..., "jd_quality": ...}`` where ``status``
        is ``"ready"`` once the row has been updated, or ``"not_found"`` when
        no row with that id exists (nothing is written in that case).

    Raises:
        ValueError: If ``jd_id`` is not a valid UUID.
    """
    # Validate the id before any parsing work so bad input fails fast.
    jd_uuid = uuid.UUID(jd_id)

    parsed = parse_jd_requirements(raw_text)
    jd_quality = compute_jd_quality(raw_text, parsed)

    async with AsyncSessionLocal() as session:
        result = await session.execute(select(JobDescription).where(JobDescription.id == jd_uuid))
        jd = result.scalar_one_or_none()
        if jd is None:
            # Report the miss instead of claiming the JD is ready.
            return {"jd_id": jd_id, "status": "not_found", "jd_quality": jd_quality}

        jd.parsed_requirements = parsed
        jd.required_skills = parsed.get("required_skills", [])
        jd.min_yoe = parsed.get("min_yoe")
        jd.max_yoe = parsed.get("max_yoe")
        jd.role_type = parsed.get("role_type")
        jd.engineer_type = parsed.get("engineer_type")
        jd.location = parsed.get("location")
        jd.remote_allowed = parsed.get("remote_allowed")
        jd.jd_quality = jd_quality
        jd.status = "ready"
        await session.commit()

    return {"jd_id": jd_id, "status": "ready", "jd_quality": jd_quality}
|
|