ketannnn committed on
Commit
3330b0f
·
1 Parent(s): bdaeeeb

feat: implement feature builder with growth velocity and JD quality scoring

Browse files
Files changed (1) hide show
  1. backend/src/workers/ingest.py +206 -0
backend/src/workers/ingest.py ADDED
@@ -0,0 +1,206 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uuid
2
+ import asyncio
3
+ import re
4
+ from typing import Any
5
+ from sqlalchemy import text, select
6
+ from sqlalchemy.dialects.postgresql import insert as pg_insert
7
+ from qdrant_client import QdrantClient
8
+ from qdrant_client.models import PointStruct
9
+
10
+ from .celery_app import celery_app
11
+ from ..config import get_settings
12
+ from ..database import AsyncSessionLocal
13
+ from ..models.candidate import Candidate
14
+ from ..models.jd import JobDescription
15
+ from ..ml.embedder import embed_texts, embed_query, compute_text_hash
16
+ from ..ml.feature_builder import (
17
+ build_candidate_text,
18
+ compute_growth_velocity,
19
+ compute_jd_quality,
20
+ parse_jd_requirements,
21
+ )
22
+
23
+
24
def _get_qdrant() -> QdrantClient:
    """Build a Qdrant client from the application settings.

    Reads ``qdrant_url`` and ``qdrant_api_key`` from ``get_settings()`` and
    constructs a new ``QdrantClient`` on every call.
    """
    cfg = get_settings()
    client = QdrantClient(url=cfg.qdrant_url, api_key=cfg.qdrant_api_key)
    return client
27
+
28
+
29
+ def _normalize_bool(val: Any) -> bool | None:
30
+ if val is None:
31
+ return None
32
+ if isinstance(val, bool):
33
+ return val
34
+ if isinstance(val, str):
35
+ return val.lower() in ("true", "yes", "1")
36
+ return bool(val)
37
+
38
+
39
+ def _normalize_float(val: Any) -> float | None:
40
+ try:
41
+ return float(val) if val is not None and str(val).strip() != "" else None
42
+ except (ValueError, TypeError):
43
+ return None
44
+
45
+
46
def _parse_list(val: Any) -> list:
    """Turn ``val`` into a list.

    Lists are returned unchanged (same object). Non-blank strings are split
    on ``,``, ``;`` or ``|`` with each piece stripped and empty pieces
    dropped. Anything else yields an empty list.
    """
    if isinstance(val, list):
        return val
    if not isinstance(val, str) or not val.strip():
        return []
    pieces = (chunk.strip() for chunk in re.split(r"[,;|]", val))
    return [piece for piece in pieces if piece]
52
+
53
+
54
@celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
def ingest_candidates_batch(self, rows: list[dict]) -> dict:
    """Celery task: ingest a batch of candidate rows.

    Runs ``_ingest_candidates_async`` to completion on a fresh event loop and
    returns its result. Any exception triggers a Celery retry after a
    30-second countdown (the task is registered with ``max_retries=3``).
    """
    try:
        summary = asyncio.run(_ingest_candidates_async(rows))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
    return summary
60
+
61
+
62
def _load_work_experience(raw: Any) -> Any:
    """Return ``raw`` decoded from JSON when it is a string, else unchanged.

    Falsy input and strings that fail to decode both yield an empty list.
    """
    import json  # json is not among the module-level imports

    if not raw:
        return []
    if isinstance(raw, str):
        try:
            return json.loads(raw)
        except (ValueError, RecursionError):
            # Malformed JSON is treated as "no work experience" (best effort).
            return []
    return raw


async def _ingest_candidates_async(rows: list[dict]) -> dict:
    """Embed, persist and index a batch of candidate rows.

    For every row this builds the candidate text, its hash and growth
    velocity, embeds all texts in one ``embed_texts`` call, adds a
    ``Candidate`` per row to a database session, commits, and then upserts
    one point per candidate into the Qdrant collection named by
    ``settings.collection_name``.

    Args:
        rows: Raw candidate records keyed by column name.

    Returns:
        ``{"inserted": <rows added>, "total": <rows received>}``.
    """
    # Nothing to do: skip the embedder, the Qdrant client and the DB session.
    if not rows:
        return {"inserted": 0, "total": 0}

    settings = get_settings()
    qdrant = _get_qdrant()

    processed = []
    for row in rows:
        work_exp = _load_work_experience(row.get("parsed_work_experience"))
        is_funded = _normalize_bool(row.get("most_recent_company_is_funded"))
        velocity = compute_growth_velocity(work_exp, is_funded=bool(is_funded))
        candidate_text = build_candidate_text({**row, "parsed_work_experience": work_exp})
        processed.append({
            "row": row,
            "work_exp": work_exp,
            "is_funded": is_funded,
            "velocity": velocity,
            "text": candidate_text,
            "hash": compute_text_hash(candidate_text),
        })

    embeddings = embed_texts([p["text"] for p in processed])

    qdrant_points = []
    inserted = 0

    async with AsyncSessionLocal() as session:
        for i, p in enumerate(processed):
            row = p["row"]
            candidate_id = uuid.uuid4()
            qdrant_id = str(candidate_id)

            # Normalise once; the same values feed both the DB row and the
            # Qdrant payload.
            is_funded = p["is_funded"]
            currently_employed = _normalize_bool(row.get("currently_employed"))
            gen_ai_experience = _normalize_bool(row.get("gen_ai_experience"))
            years_of_experience = _normalize_float(row.get("years_of_experience"))
            languages = _parse_list(row.get("programming_languages"))
            backend = _parse_list(row.get("backend_frameworks"))
            frontend = _parse_list(row.get("frontend_technologies"))

            session.add(
                Candidate(
                    id=candidate_id,
                    external_id=str(row.get("id") or row.get("candidate_id") or ""),
                    name=row.get("name"),
                    email=row.get("email"),
                    looking_for=row.get("looking_for"),
                    currently_employed=currently_employed,
                    # Truncated to 100 characters; empty becomes None.
                    notice_period=str(row.get("notice_period") or "")[:100] or None,
                    open_to_working_at=row.get("open_to_working_at"),
                    role_type=row.get("role_type"),
                    engineer_type=row.get("engineer_type"),
                    years_of_experience=years_of_experience,
                    programming_languages=languages,
                    backend_frameworks=backend,
                    frontend_technologies=frontend,
                    gen_ai_experience=gen_ai_experience,
                    recent_experience_type=row.get("recent_experience_type"),
                    education_status=row.get("education_status"),
                    degree=row.get("degree"),
                    parsed_summary=row.get("parsed_summary"),
                    parsed_skills=row.get("parsed_skills"),
                    parsed_work_experience=p["work_exp"],
                    most_recent_company=row.get("most_recent_company"),
                    most_recent_company_description=row.get("most_recent_company_description"),
                    most_recent_company_is_funded=is_funded,
                    most_recent_company_is_product_company=_normalize_bool(
                        row.get("most_recent_company_is_product_company")
                    ),
                    most_recent_company_total_funding=_normalize_float(
                        row.get("most_recent_company_total_funding")
                    ),
                    most_recent_company_funding_status=row.get("most_recent_company_funding_status"),
                    time_in_current_company=_normalize_float(row.get("time_in_current_company")),
                    is_actively_or_passively_looking=row.get("is_actively_or_passively_looking"),
                    growth_velocity=p["velocity"],
                    embedding_hash=p["hash"],
                    qdrant_id=qdrant_id,
                )
            )

            # Concatenation creates a new list, so the lists handed to the
            # Candidate above are never mutated by the extend below.
            all_skills = languages + backend + frontend
            all_skills.extend(_parse_list(row.get("parsed_skills")))

            qdrant_points.append(
                PointStruct(
                    id=qdrant_id,
                    vector=embeddings[i].tolist(),
                    payload={
                        "candidate_id": str(candidate_id),
                        "role_type": row.get("role_type"),
                        "engineer_type": row.get("engineer_type"),
                        "years_of_experience": years_of_experience,
                        "looking_for": row.get("looking_for"),
                        "open_to_working_at": row.get("open_to_working_at"),
                        "skills": all_skills[:50],  # payload keeps at most 50 skills
                        "gen_ai_experience": gen_ai_experience,
                        "growth_velocity": p["velocity"],
                        "is_funded": is_funded,
                        "currently_employed": currently_employed,
                    },
                )
            )
            inserted += 1

        # NOTE(review): rows are committed before the Qdrant upsert and ids
        # come from uuid4(), so if the upsert raises and the calling task
        # retries, the same candidates would be inserted again under new ids.
        # Confirm whether that is acceptable or needs deterministic ids.
        await session.commit()

    if qdrant_points:
        qdrant.upsert(collection_name=settings.collection_name, points=qdrant_points)

    return {"inserted": inserted, "total": len(rows)}
174
+
175
+
176
@celery_app.task(bind=True, name="ingest_jd", max_retries=3)
def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
    """Celery task: parse and score a single job description.

    Runs ``_ingest_jd_async`` to completion on a fresh event loop and returns
    its result. Any exception triggers a Celery retry after a 30-second
    countdown (the task is registered with ``max_retries=3``).
    """
    try:
        outcome = asyncio.run(_ingest_jd_async(jd_id, raw_text, title))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
    return outcome
182
+
183
+
184
async def _ingest_jd_async(jd_id: str, raw_text: str, title: str) -> dict:
    """Parse a job description, score it, and store the results on its row.

    Args:
        jd_id: String form of the ``JobDescription`` primary-key UUID.
        raw_text: Full job-description text to parse.
        title: Accepted for interface compatibility; not used in this function.

    Returns:
        ``{"jd_id", "status", "jd_quality"}``. ``status`` is ``"ready"`` when
        the row was found and updated, and ``"not_found"`` when no row matches
        ``jd_id`` (nothing is written in that case).

    Raises:
        ValueError: If ``jd_id`` is not a valid UUID string.
    """
    parsed = parse_jd_requirements(raw_text)
    jd_quality = compute_jd_quality(raw_text, parsed)

    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(JobDescription).where(JobDescription.id == uuid.UUID(jd_id))
        )
        jd = result.scalar_one_or_none()
        if jd is None:
            # Do not report "ready" for a job description that was never updated.
            return {"jd_id": jd_id, "status": "not_found", "jd_quality": jd_quality}

        jd.parsed_requirements = parsed
        jd.required_skills = parsed.get("required_skills", [])
        jd.min_yoe = parsed.get("min_yoe")
        jd.max_yoe = parsed.get("max_yoe")
        jd.role_type = parsed.get("role_type")
        jd.engineer_type = parsed.get("engineer_type")
        jd.location = parsed.get("location")
        jd.remote_allowed = parsed.get("remote_allowed")
        jd.jd_quality = jd_quality
        jd.status = "ready"
        await session.commit()

    return {"jd_id": jd_id, "status": "ready", "jd_quality": jd_quality}