"""Celery tasks that ingest candidate rows and job descriptions.

Candidates are stored in the relational DB (``Candidate``) and embedded into a
Qdrant collection; job descriptions are parsed and their structured fields are
written back onto an existing ``JobDescription`` row.
"""
import asyncio
import json
import logging
import re
import uuid
from typing import Any

from sqlalchemy import func, select
from qdrant_client import QdrantClient
from qdrant_client.models import PointIdsList, PointStruct

from .celery_app import celery_app
from ..config import get_settings
from ..database import AsyncSessionLocal
from ..models.candidate import Candidate
from ..models.session import Session
from ..models.jd import JobDescription
from ..ml.embedder import embed_texts, compute_text_hash
from ..ml.feature_builder import (
    build_candidate_text,
    compute_growth_velocity,
    compute_jd_quality,
    parse_jd_requirements,
)

logger = logging.getLogger(__name__)

# Errors that are deterministic for a given input (malformed UUIDs, embedding
# dimension mismatch raised below, ...). Retrying them only delays the same
# failure, so the task wrappers re-raise them instead of scheduling a retry.
_NON_RETRYABLE_ERRORS: tuple[type[Exception], ...] = (ValueError,)

# String spellings accepted as boolean "true" (compared lower-cased, stripped).
_TRUTHY_STRINGS = frozenset({"true", "yes", "1"})

# Separators accepted between items of a delimited list column.
_LIST_SEPARATORS = re.compile(r"[,;|]")


def _get_qdrant() -> QdrantClient:
    """Build a Qdrant client from the configured URL and API key."""
    settings = get_settings()
    return QdrantClient(url=settings.qdrant_url, api_key=settings.qdrant_api_key)


def _normalize_bool(val: Any) -> bool | None:
    """Coerce a loosely-typed input cell to ``bool`` or ``None``.

    ``None`` and blank/whitespace-only strings mean "unknown" and map to
    ``None`` (consistent with ``_normalize_float``). Other strings are
    stripped and compared case-insensitively against ``true``/``yes``/``1``;
    non-string values are truthiness-tested.
    """
    if val is None:
        return None
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        cleaned = val.strip().lower()
        if not cleaned:
            return None
        return cleaned in _TRUTHY_STRINGS
    return bool(val)


def _normalize_float(val: Any) -> float | None:
    """Convert *val* to ``float``; ``None``, blank or unparsable values give ``None``."""
    try:
        return float(val) if val is not None and str(val).strip() != "" else None
    except (ValueError, TypeError):
        return None


def _parse_list(val: Any) -> list:
    """Return *val* as a list.

    Lists are returned as-is; non-blank strings are split on ``,``, ``;`` or
    ``|`` with items stripped and empties dropped; anything else yields ``[]``.
    """
    if isinstance(val, list):
        return val
    if isinstance(val, str) and val.strip():
        return [x.strip() for x in _LIST_SEPARATORS.split(val) if x.strip()]
    return []


def _run_async(coro: Any) -> Any:
    """Run *coro* to completion on the thread's current event loop.

    The current loop is reused across task invocations when possible; a new
    loop is created and installed if none is available or it was closed.
    NOTE(review): reusing one loop presumably keeps pooled async DB connections
    bound to a single loop — confirm before changing this to ``asyncio.run``.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)


@celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
    """Celery entry point: ingest a batch of candidate rows.

    Args:
        rows: Raw candidate records (one dict per candidate).
        session_id: Optional session UUID string the candidates belong to.

    Returns:
        ``{"inserted": int, "total": int, "session_id": str | None}``.

    Deterministic errors (``ValueError``) fail the task immediately; any other
    exception is retried up to ``max_retries`` times, 30 seconds apart.
    """
    try:
        return _run_async(_ingest_candidates_async(rows, session_id))
    except _NON_RETRYABLE_ERRORS:
        raise
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)


def _load_work_experience(raw: Any) -> list:
    """Return the work-experience list, decoding it from JSON if it is a string.

    Anything that is not (or does not decode to) a list becomes ``[]`` so the
    feature builders always receive a list.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (ValueError, RecursionError):
            return []
    return raw if isinstance(raw, list) else []


def _prepare_row(row: dict) -> dict:
    """Compute the per-row features needed before embedding.

    Returns a dict with the original ``row``, the decoded ``work_exp``, the
    growth ``velocity``, the candidate ``text`` to embed and its ``hash``.
    """
    work_exp = _load_work_experience(row.get("parsed_work_experience"))
    is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))
    velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded))
    candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp})
    return {
        "row": row,
        "work_exp": work_exp,
        "velocity": velocity,
        "text": candidate_text,
        "hash": compute_text_hash(candidate_text),
    }


def _check_embedding_dim(embeddings: Any, settings: Any, num_texts: int) -> None:
    """Ensure the embedding width matches the configured Qdrant ``vector_size``.

    Raises:
        ValueError: if the dimensions differ (a configuration error, so the
            task wrapper does not retry it).
    """
    actual_dim = embeddings.shape[1]
    expected_dim = settings.vector_size
    logger.info(
        "[INGEST] Generated %s embeddings with dimension %s (Expected: %s)",
        num_texts,
        actual_dim,
        expected_dim,
    )
    if actual_dim != expected_dim:
        raise ValueError(
            f"Vector size mismatch! Model '{settings.embedding_model}' produced dimension {actual_dim}, "
            f"but Qdrant collection '{settings.collection_name}' expects {expected_dim}. "
            f"Please check your EMBEDDING_MODEL and VECTOR_SIZE settings."
        )


def _build_candidate_and_point(
    prepared: dict,
    vector: Any,
    sess_uuid: uuid.UUID | None,
    session_id: str | None,
) -> tuple[Candidate, PointStruct]:
    """Create the ORM row and the matching Qdrant point for one prepared row.

    A fresh UUID is generated and used as the candidate's ``id``, its
    ``qdrant_id`` and the Qdrant point id, linking the two stores.
    """
    row = prepared["row"]
    candidate_id = uuid.uuid4()
    qdrant_id = str(candidate_id)

    programming_languages = _parse_list(row.get("programming_languages"))
    backend_frameworks = _parse_list(row.get("backend_frameworks"))
    frontend_technologies = _parse_list(row.get("frontend_technologies"))
    # ``+`` builds a new list, so extending it never mutates the inputs above.
    all_skills = programming_languages + backend_frameworks + frontend_technologies
    if row.get("parsed_skills"):
        all_skills.extend(_parse_list(row["parsed_skills"]))

    years_of_experience = _normalize_float(row.get("years_of_experience"))
    currently_employed = _normalize_bool(row.get("currently_employed"))
    gen_ai_experience = _normalize_bool(row.get("gen_ai_experience"))
    is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))

    candidate = Candidate(
        id=candidate_id,
        session_id=sess_uuid,
        external_id=str(row.get("id") or row.get("candidate_id") or ""),
        name=row.get("name"),
        email=row.get("email"),
        looking_for=row.get("looking_for"),
        currently_employed=currently_employed,
        notice_period=str(row.get("notice_period") or "")[:100] or None,
        open_to_working_at=row.get("open_to_working_at"),
        role_type=row.get("role_type"),
        engineer_type=row.get("engineer_type"),
        years_of_experience=years_of_experience,
        programming_languages=programming_languages,
        backend_frameworks=backend_frameworks,
        frontend_technologies=frontend_technologies,
        gen_ai_experience=gen_ai_experience,
        recent_experience_type=row.get("recent_experience_type"),
        education_status=row.get("education_status"),
        degree=row.get("degree"),
        parsed_summary=row.get("parsed_summary"),
        parsed_skills=row.get("parsed_skills"),
        parsed_work_experience=prepared["work_exp"],
        most_recent_company=row.get("most_recent_company"),
        most_recent_company_description=row.get("most_recent_company_description"),
        most_recent_company_is_funded=is_funded,
        most_recent_company_is_product_company=_normalize_bool(
            row.get("most_recent_company_is_product_company")
        ),
        most_recent_company_total_funding=_normalize_float(
            row.get("most_recent_company_total_funding")
        ),
        most_recent_company_funding_status=row.get("most_recent_company_funding_status"),
        time_in_current_company=_normalize_float(row.get("time_in_current_company")),
        is_actively_or_passively_looking=row.get("is_actively_or_passively_looking"),
        growth_velocity=prepared["velocity"],
        embedding_hash=prepared["hash"],
        qdrant_id=qdrant_id,
    )

    point = PointStruct(
        id=qdrant_id,
        vector=vector.tolist(),
        payload={
            "candidate_id": str(candidate_id),
            "session_id": session_id or "",
            "role_type": row.get("role_type"),
            "engineer_type": row.get("engineer_type"),
            "years_of_experience": years_of_experience,
            "looking_for": row.get("looking_for"),
            "open_to_working_at": row.get("open_to_working_at"),
            "skills": all_skills[:50],
            "gen_ai_experience": gen_ai_experience,
            "growth_velocity": prepared["velocity"],
            "is_funded": is_funded,
            "currently_employed": currently_employed,
        },
    )
    return candidate, point


def _discard_points(qdrant: QdrantClient, collection_name: str, point_ids: list) -> None:
    """Best-effort removal of Qdrant points whose DB rows failed to commit."""
    try:
        qdrant.delete(
            collection_name=collection_name,
            points_selector=PointIdsList(points=point_ids),
        )
    except Exception:
        logger.exception(
            "Failed to remove %d orphaned Qdrant points from %s", len(point_ids), collection_name
        )


async def _refresh_session_count(session: Any, sess_uuid: uuid.UUID) -> None:
    """Recompute ``Session.candidate_count`` from the committed candidates.

    This runs after the candidates are committed. A failure here is logged
    rather than raised: propagating it would make Celery retry the task and
    insert the already-stored batch a second time.
    """
    try:
        result = await session.execute(
            select(func.count()).select_from(Candidate).where(Candidate.session_id == sess_uuid)
        )
        count = result.scalar() or 0
        sess_result = await session.execute(select(Session).where(Session.id == sess_uuid))
        sess_obj = sess_result.scalar_one_or_none()
        if sess_obj is not None:
            sess_obj.candidate_count = count
            await session.commit()
    except Exception:
        logger.exception("Failed to refresh candidate_count for session %s", sess_uuid)


async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict:
    """Embed *rows*, store them as ``Candidate`` rows and Qdrant points.

    Write ordering matters because the Celery wrapper retries failures and
    every attempt generates fresh candidate ids:

    1. DB inserts are flushed (not committed) so DB errors surface first.
    2. Points are upserted to Qdrant; a failure here leaves nothing committed.
    3. The DB transaction is committed; if that fails, the just-written
       Qdrant points are deleted (best effort) before re-raising.
    4. The session's ``candidate_count`` is refreshed (non-fatal).

    Raises:
        ValueError: for a malformed *session_id* or an embedding dimension
            mismatch.
    """
    settings = get_settings()
    sess_uuid = uuid.UUID(session_id) if session_id else None

    if not rows:
        # Nothing to embed; avoid calling the embedder with an empty batch.
        return {"inserted": 0, "total": 0, "session_id": session_id}

    prepared = [_prepare_row(row) for row in rows]
    embeddings = embed_texts([p["text"] for p in prepared])
    _check_embedding_dim(embeddings, settings, num_texts=len(prepared))

    qdrant = _get_qdrant()

    async with AsyncSessionLocal() as session:
        points: list[PointStruct] = []
        for p, vector in zip(prepared, embeddings):
            candidate, point = _build_candidate_and_point(p, vector, sess_uuid, session_id)
            session.add(candidate)
            points.append(point)

        await session.flush()
        qdrant.upsert(collection_name=settings.collection_name, points=points)
        try:
            await session.commit()
        except Exception:
            _discard_points(qdrant, settings.collection_name, [pt.id for pt in points])
            raise

        if sess_uuid is not None:
            await _refresh_session_count(session, sess_uuid)

    return {"inserted": len(points), "total": len(rows), "session_id": session_id}


@celery_app.task(bind=True, name="ingest_jd", max_retries=3)
def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
    """Celery entry point: parse a job description and store the results.

    Deterministic errors (``ValueError``, e.g. a malformed *jd_id*) fail the
    task immediately; any other exception is retried up to ``max_retries``
    times, 30 seconds apart.
    """
    try:
        return _run_async(_ingest_jd_async(jd_id, raw_text, title))
    except _NON_RETRYABLE_ERRORS:
        raise
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)


async def _ingest_jd_async(jd_id: str, raw_text: str, title: str) -> dict:
    """Parse *raw_text* and write the structured fields onto JD *jd_id*.

    *title* is accepted for interface compatibility but is not used here.

    Returns:
        ``{"jd_id", "status", "jd_quality"}``, where ``status`` is ``"ready"``
        when the row was updated or ``"not_found"`` when no ``JobDescription``
        with that id exists.

    Raises:
        ValueError: if *jd_id* is not a valid UUID string.
    """
    jd_uuid = uuid.UUID(jd_id)
    parsed = parse_jd_requirements(raw_text)
    jd_quality = compute_jd_quality(raw_text, parsed)

    async with AsyncSessionLocal() as session:
        result = await session.execute(select(JobDescription).where(JobDescription.id == jd_uuid))
        jd = result.scalar_one_or_none()
        if jd is None:
            # Previously this path still reported "ready", hiding the miss.
            logger.warning("Job description %s not found; parsed requirements discarded", jd_id)
            return {"jd_id": jd_id, "status": "not_found", "jd_quality": jd_quality}

        jd.parsed_requirements = parsed
        jd.required_skills = parsed.get("required_skills", [])
        jd.min_yoe = parsed.get("min_yoe")
        jd.max_yoe = parsed.get("max_yoe")
        jd.role_type = parsed.get("role_type")
        jd.engineer_type = parsed.get("engineer_type")
        jd.location = parsed.get("location")
        jd.remote_allowed = parsed.get("remote_allowed")
        jd.jd_quality = jd_quality
        jd.status = "ready"
        await session.commit()

    return {"jd_id": jd_id, "status": "ready", "jd_quality": jd_quality}