File size: 9,779 Bytes
3330b0f
 
 
5655f74
3330b0f
5655f74
96fe8d8
 
3330b0f
 
 
 
 
5655f74
3330b0f
5655f74
3330b0f
 
 
 
 
 
 
 
96fe8d8
3330b0f
96fe8d8
3330b0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b7d367d
 
 
 
 
 
 
 
 
 
 
 
3330b0f
5655f74
3330b0f
b7d367d
3330b0f
 
 
 
5655f74
3330b0f
96fe8d8
3330b0f
 
5655f74
3330b0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5655f74
 
3330b0f
 
 
 
27cbb3d
 
 
 
 
 
 
 
 
 
 
 
3330b0f
 
96fe8d8
3330b0f
 
 
 
 
 
 
5655f74
 
 
 
 
 
 
 
3330b0f
 
5655f74
3330b0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96fe8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3330b0f
 
 
 
96fe8d8
 
3330b0f
5655f74
 
 
 
 
 
 
 
 
 
 
 
3330b0f
 
 
 
 
b7d367d
3330b0f
 
 
 
 
 
 
 
 
5655f74
3330b0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import asyncio
import json
import math
import re
import uuid
from typing import Any

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from sqlalchemy import select, func

from .celery_app import celery_app
from ..config import get_settings
from ..database import AsyncSessionLocal
from ..models.candidate import Candidate
from ..models.session import Session
from ..models.jd import JobDescription
from ..ml.embedder import embed_texts, compute_text_hash
from ..ml.feature_builder import (
    build_candidate_text,
    compute_growth_velocity,
    compute_jd_quality,
    parse_jd_requirements,
)


def _get_qdrant() -> QdrantClient:
    """Return a new Qdrant client built from the application settings.

    The URL and API key are read from ``get_settings()`` on every call; no
    client instance is cached here.
    """
    cfg = get_settings()
    client = QdrantClient(url=cfg.qdrant_url, api_key=cfg.qdrant_api_key)
    return client


def _normalize_bool(val: Any) -> bool | None:
    if val is None:
        return None
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return val.lower() in ("true", "yes", "1")
    return bool(val)


def _normalize_float(val: Any) -> float | None:
    try:
        return float(val) if val is not None and str(val).strip() != "" else None
    except (ValueError, TypeError):
        return None


def _parse_list(val: Any) -> list:
    if isinstance(val, list):
        return val
    if isinstance(val, str) and val.strip():
        return [x.strip() for x in re.split(r"[,;|]", val) if x.strip()]
    return []


def _run_async(coro):
    loop = None
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
    return loop.run_until_complete(coro)

@celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
    """Celery task: ingest a batch of candidate rows.

    Runs ``_ingest_candidates_async`` synchronously through ``_run_async``.
    Any exception is handed to ``self.retry`` with a 30-second countdown;
    the task is registered with ``max_retries=3``.

    Args:
        rows: Candidate records to ingest.
        session_id: Optional id of the session the candidates belong to.

    Returns:
        The summary dict produced by ``_ingest_candidates_async``.
    """
    try:
        outcome = _run_async(_ingest_candidates_async(rows, session_id))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
    else:
        return outcome


async def _ingest_candidates_async(rows: list[dict], session_id: str | None) -> dict:
    """Embed a batch of candidate rows and store them in the database and Qdrant.

    For every row a text is built with ``build_candidate_text`` and embedded
    with ``embed_texts``. Each row is then saved as a ``Candidate`` and
    mirrored as a Qdrant point whose id is the candidate's UUID. When
    ``session_id`` is given, the matching ``Session.candidate_count`` is
    refreshed from a row count afterwards.

    Args:
        rows: Raw candidate records keyed by column name.
        session_id: Optional UUID string of the owning session.

    Returns:
        A dict with the keys ``inserted``, ``total`` and ``session_id``.

    Raises:
        ValueError: If ``session_id`` is not a valid UUID string, or if the
            embedding dimension differs from ``settings.vector_size``.
    """
    settings = get_settings()
    qdrant = _get_qdrant()
    texts = []
    processed = []
    sess_uuid = uuid.UUID(session_id) if session_id else None

    for row in rows:
        work_exp = row.get("parsed_work_experience") or []
        if isinstance(work_exp, str):
            try:
                work_exp = json.loads(work_exp)
            except (ValueError, RecursionError):
                # Undecodable work history is treated as "no history" rather
                # than failing the whole batch.
                work_exp = []

        is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))
        velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded))
        candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp})

        processed.append({
            "row": row,
            "work_exp": work_exp,
            "is_funded": is_funded,
            "velocity": velocity,
            "hash": compute_text_hash(candidate_text),
        })
        texts.append(candidate_text)

    # An empty batch has nothing to embed; skipping the embedder also avoids
    # indexing ``shape[1]`` on an empty result.
    embeddings = []
    if texts:
        embeddings = embed_texts(texts)

        # Validation: Ensure generated embeddings match the configured vector_size for Qdrant
        actual_dim = embeddings.shape[1]
        expected_dim = settings.vector_size
        print(f"[INGEST] Generated {len(texts)} embeddings with dimension {actual_dim} (Expected: {expected_dim})")

        if actual_dim != expected_dim:
            raise ValueError(
                f"Vector size mismatch! Model '{settings.embedding_model}' produced dimension {actual_dim}, "
                f"but Qdrant collection '{settings.collection_name}' expects {expected_dim}. "
                f"Please check your EMBEDDING_MODEL and VECTOR_SIZE settings."
            )

    async with AsyncSessionLocal() as session:
        qdrant_points = []

        for i, p in enumerate(processed):
            row = p["row"]
            candidate_id = uuid.uuid4()
            qdrant_id = str(candidate_id)

            # Normalise once; the same values feed both the ORM row and the
            # Qdrant payload.
            languages = _parse_list(row.get("programming_languages"))
            backend = _parse_list(row.get("backend_frameworks"))
            frontend = _parse_list(row.get("frontend_technologies"))
            years = _normalize_float(row.get("years_of_experience"))
            gen_ai = _normalize_bool(row.get("gen_ai_experience"))
            employed = _normalize_bool(row.get("currently_employed"))
            is_funded = p["is_funded"]

            # ``+`` builds a new list, so extending it never mutates the
            # lists stored on the row.
            all_skills = languages + backend + frontend
            if row.get("parsed_skills"):
                all_skills.extend(_parse_list(row["parsed_skills"]))

            cand = Candidate(
                id=candidate_id,
                session_id=sess_uuid,
                external_id=str(row.get("id") or row.get("candidate_id") or ""),
                name=row.get("name"),
                email=row.get("email"),
                looking_for=row.get("looking_for"),
                currently_employed=employed,
                notice_period=str(row.get("notice_period") or "")[:100] or None,
                open_to_working_at=row.get("open_to_working_at"),
                role_type=row.get("role_type"),
                engineer_type=row.get("engineer_type"),
                years_of_experience=years,
                programming_languages=languages,
                backend_frameworks=backend,
                frontend_technologies=frontend,
                gen_ai_experience=gen_ai,
                recent_experience_type=row.get("recent_experience_type"),
                education_status=row.get("education_status"),
                degree=row.get("degree"),
                parsed_summary=row.get("parsed_summary"),
                parsed_skills=row.get("parsed_skills"),
                parsed_work_experience=p["work_exp"],
                most_recent_company=row.get("most_recent_company"),
                most_recent_company_description=row.get("most_recent_company_description"),
                most_recent_company_is_funded=is_funded,
                most_recent_company_is_product_company=_normalize_bool(row.get("most_recent_company_is_product_company")),
                most_recent_company_total_funding=_normalize_float(row.get("most_recent_company_total_funding")),
                most_recent_company_funding_status=row.get("most_recent_company_funding_status"),
                time_in_current_company=_normalize_float(row.get("time_in_current_company")),
                is_actively_or_passively_looking=row.get("is_actively_or_passively_looking"),
                growth_velocity=p["velocity"],
                embedding_hash=p["hash"],
                qdrant_id=qdrant_id,
            )
            session.add(cand)

            qdrant_points.append(
                PointStruct(
                    id=qdrant_id,
                    vector=embeddings[i].tolist(),
                    payload={
                        "candidate_id": qdrant_id,
                        "session_id": session_id or "",
                        "role_type": row.get("role_type"),
                        "engineer_type": row.get("engineer_type"),
                        "years_of_experience": years,
                        "looking_for": row.get("looking_for"),
                        "open_to_working_at": row.get("open_to_working_at"),
                        "skills": all_skills[:50],
                        "gen_ai_experience": gen_ai,
                        "growth_velocity": p["velocity"],
                        "is_funded": is_funded,
                        "currently_employed": employed,
                    },
                )
            )

        if qdrant_points:
            # Flush first so database errors surface before anything is
            # written to Qdrant, and commit only after the upsert succeeds.
            # If the upsert raises, nothing has been committed, so the
            # caller's retry cannot insert the same batch a second time
            # under fresh UUIDs.
            await session.flush()
            qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points)

        await session.commit()

        if sess_uuid:
            result = await session.execute(
                select(func.count()).select_from(Candidate).where(Candidate.session_id == sess_uuid)
            )
            count = result.scalar() or 0
            sess_result = await session.execute(select(Session).where(Session.id == sess_uuid))
            sess_obj = sess_result.scalar_one_or_none()
            if sess_obj:
                sess_obj.candidate_count = count
                await session.commit()

    return {"inserted": len(qdrant_points), "total": len(rows), "session_id": session_id}


@celery_app.task(bind=True, name="ingest_jd", max_retries=3)
def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
    """Celery task: parse a job description and persist the parsed fields.

    Runs ``_ingest_jd_async`` synchronously through ``_run_async``. Any
    exception is handed to ``self.retry`` with a 30-second countdown; the
    task is registered with ``max_retries=3``.

    Args:
        jd_id: Id of the job description, forwarded unchanged.
        raw_text: Full text of the job description.
        title: Job title, forwarded unchanged.

    Returns:
        The summary dict produced by ``_ingest_jd_async``.
    """
    try:
        outcome = _run_async(_ingest_jd_async(jd_id, raw_text, title))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
    else:
        return outcome


async def _ingest_jd_async(jd_id: str, raw_text: str, title: str) -> dict:
    """Parse a job description and write the parsed fields to its database row.

    Args:
        jd_id: UUID string identifying the ``JobDescription`` row.
        raw_text: Full text of the job description.
        title: Accepted for interface compatibility; not used in this function.

    Returns:
        A dict with the keys ``jd_id``, ``status`` and ``jd_quality``.
        ``status`` is ``"ready"`` when the row was found and updated, and
        ``"not_found"`` when no row matches ``jd_id`` (nothing is written in
        that case).

    Raises:
        ValueError: If ``jd_id`` is not a valid UUID string.
    """
    parsed = parse_jd_requirements(raw_text)
    jd_quality = compute_jd_quality(raw_text, parsed)

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(JobDescription).where(JobDescription.id == uuid.UUID(jd_id))
        )
        jd = result.scalar_one_or_none()
        if jd is None:
            # No row was updated, so do not claim the JD is ready.
            return {"jd_id": jd_id, "status": "not_found", "jd_quality": jd_quality}

        jd.parsed_requirements = parsed
        jd.required_skills = parsed.get("required_skills", [])
        jd.min_yoe = parsed.get("min_yoe")
        jd.max_yoe = parsed.get("max_yoe")
        jd.role_type = parsed.get("role_type")
        jd.engineer_type = parsed.get("engineer_type")
        jd.location = parsed.get("location")
        jd.remote_allowed = parsed.get("remote_allowed")
        jd.jd_quality = jd_quality
        jd.status = "ready"
        await session.commit()

    return {"jd_id": jd_id, "status": "ready", "jd_quality": jd_quality}