ketannnn commited on
Commit
b7d367d
·
1 Parent(s): 5ff9d40

feat: stabilize celery loops, add redis caching, and scale stage 2 neural limit to 250

Browse files
backend/src/matching/stage2.py CHANGED
@@ -93,4 +93,4 @@ async def stage2_rerank(jd: dict, shortlist: list[dict]) -> list[dict]:
93
  for cand in results:
94
  cand["gaps"] = _compute_gaps(jd, cand)
95
 
96
- return results[:20]
 
93
  for cand in results:
94
  cand["gaps"] = _compute_gaps(jd, cand)
95
 
96
+ return results[:250]
backend/src/ml/embedder.py CHANGED
@@ -1,8 +1,13 @@
1
  import hashlib
2
  import numpy as np
3
  from sentence_transformers import SentenceTransformer
 
 
4
  from ..config import get_settings
5
 
 
 
 
6
  _model: SentenceTransformer | None = None
7
  INSTRUCTION = "Represent this candidate profile for matching job descriptions: "
8
 
 
1
  import hashlib
2
  import numpy as np
3
  from sentence_transformers import SentenceTransformer
4
+ import os
5
+ import logging
6
  from ..config import get_settings
7
 
8
+ os.environ["TOKENIZERS_PARALLELISM"] = "false"
9
+ logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
10
+
11
  _model: SentenceTransformer | None = None
12
  INSTRUCTION = "Represent this candidate profile for matching job descriptions: "
13
 
backend/src/routers/matching.py CHANGED
@@ -1,10 +1,13 @@
1
  import uuid
 
 
2
  from datetime import datetime, timezone
3
  from fastapi import APIRouter, Depends, HTTPException, Request, Query
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
  from sqlalchemy import select, delete
6
 
7
  from ..database import get_db
 
8
  from ..models.jd import JobDescription
9
  from ..models.candidate import Candidate
10
  from ..models.match_result import MatchResult
@@ -107,10 +110,10 @@ async def trigger_match(
107
 
108
  await db.commit()
109
 
110
- # Pre-generate LLM explanations async for the top 60 matches
111
- top_60_ids = [str(mr.id) for mr in inserted_mrs[:60]]
112
- if top_60_ids:
113
- generate_top_explanations.delay(top_60_ids)
114
 
115
  results = [_to_matched_candidate(item, i + 1) for i, item in enumerate(final_ranked)]
116
  return MatchResponse(
@@ -128,6 +131,17 @@ async def get_match_results(
128
  session_id: uuid.UUID | None = Query(None),
129
  db: AsyncSession = Depends(get_db),
130
  ):
 
 
 
 
 
 
 
 
 
 
 
131
  jd = await _load_jd(jd_id, db)
132
 
133
  q = (
@@ -249,27 +263,11 @@ async def get_candidate_detail(
249
  if not cand:
250
  raise HTTPException(status_code=404, detail="Candidate not found")
251
 
252
- if not mr.explanation:
253
- jd_dict = _build_jd_dict(jd)
254
- cand_dict = {
255
- "parsed_summary": cand.parsed_summary, "parsed_skills": cand.parsed_skills,
256
- "years_of_experience": cand.years_of_experience,
257
- "programming_languages": cand.programming_languages or [],
258
- "backend_frameworks": cand.backend_frameworks or [],
259
- "frontend_technologies": cand.frontend_technologies or [],
260
- "most_recent_company": cand.most_recent_company,
261
- "growth_velocity": cand.growth_velocity,
262
- }
263
- explanation = await generate_explanation(jd_dict, cand_dict, mr.gaps or [])
264
- mr.explanation = explanation
265
- mr.explanation_generated_at = datetime.now(timezone.utc)
266
- await db.commit()
267
-
268
  return CandidateDetailResponse(
269
  jd_id=jd_id, candidate_id=candidate_id, rank=mr.rank,
270
  final_score=mr.final_score,
271
- component_scores=ComponentScores(**(mr.component_scores or {})),
272
- gaps=[GapItem(**g) for g in (mr.gaps or [])],
273
  explanation=mr.explanation,
274
  candidate={
275
  "name": cand.name, "email": cand.email, "role_type": cand.role_type,
 
1
  import uuid
2
+ import json
3
+ import redis.asyncio as redis
4
  from datetime import datetime, timezone
5
  from fastapi import APIRouter, Depends, HTTPException, Request, Query
6
  from sqlalchemy.ext.asyncio import AsyncSession
7
  from sqlalchemy import select, delete
8
 
9
  from ..database import get_db
10
+ from ..config import get_settings
11
  from ..models.jd import JobDescription
12
  from ..models.candidate import Candidate
13
  from ..models.match_result import MatchResult
 
110
 
111
  await db.commit()
112
 
113
+ # Pre-generate LLM explanations async for the top 20 matches implicitly in background
114
+ top_20_ids = [str(mr.id) for mr in inserted_mrs[:20]]
115
+ if top_20_ids:
116
+ generate_top_explanations.delay(top_20_ids)
117
 
118
  results = [_to_matched_candidate(item, i + 1) for i, item in enumerate(final_ranked)]
119
  return MatchResponse(
 
131
  session_id: uuid.UUID | None = Query(None),
132
  db: AsyncSession = Depends(get_db),
133
  ):
134
+ settings = get_settings()
135
+ r = redis.Redis.from_url(settings.redis_url)
136
+ cache_key = f"match_v2:{jd_id}:{session_id or 'none'}"
137
+
138
+ try:
139
+ cached = await r.get(cache_key)
140
+ if cached:
141
+ return json.loads(cached)
142
+ except Exception:
143
+ pass
144
+
145
  jd = await _load_jd(jd_id, db)
146
 
147
  q = (
 
263
  if not cand:
264
  raise HTTPException(status_code=404, detail="Candidate not found")
265
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
  return CandidateDetailResponse(
267
  jd_id=jd_id, candidate_id=candidate_id, rank=mr.rank,
268
  final_score=mr.final_score,
269
+ component_scores=mr.component_scores or {},
270
+ gaps=mr.gaps or [],
271
  explanation=mr.explanation,
272
  candidate={
273
  "name": cand.name, "email": cand.email, "role_type": cand.role_type,
backend/src/workers/explain.py CHANGED
@@ -10,10 +10,22 @@ from ..routers.matching import _build_jd_dict
10
  from ..matching.llm_explainer import generate_explanation
11
  import datetime
12
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  @celery_app.task(bind=True, name="generate_top_explanations", max_retries=1)
14
  def generate_top_explanations(self, match_result_ids: list[str]) -> dict:
15
  try:
16
- return asyncio.run(_generate_top_explanations_async(match_result_ids))
17
  except Exception as exc:
18
  raise self.retry(exc=exc, countdown=10)
19
 
 
10
  from ..matching.llm_explainer import generate_explanation
11
  import datetime
12
 
13
+ def _run_async(coro):
14
+ loop = None
15
+ try:
16
+ loop = asyncio.get_event_loop()
17
+ if loop.is_closed():
18
+ raise RuntimeError
19
+ except RuntimeError:
20
+ loop = asyncio.new_event_loop()
21
+ asyncio.set_event_loop(loop)
22
+
23
+ return loop.run_until_complete(coro)
24
+
25
  @celery_app.task(bind=True, name="generate_top_explanations", max_retries=1)
26
  def generate_top_explanations(self, match_result_ids: list[str]) -> dict:
27
  try:
28
+ return _run_async(_generate_top_explanations_async(match_result_ids))
29
  except Exception as exc:
30
  raise self.retry(exc=exc, countdown=10)
31
 
backend/src/workers/ingest.py CHANGED
@@ -52,10 +52,22 @@ def _parse_list(val: Any) -> list:
52
  return []
53
 
54
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  @celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
56
  def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
57
  try:
58
- return asyncio.run(_ingest_candidates_async(rows, session_id))
59
  except Exception as exc:
60
  raise self.retry(exc=exc, countdown=30)
61
 
@@ -186,7 +198,7 @@ async def _ingest_candidates_async(rows: list[dict], session_id: str | None) ->
186
  @celery_app.task(bind=True, name="ingest_jd", max_retries=3)
187
  def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
188
  try:
189
- return asyncio.run(_ingest_jd_async(jd_id, raw_text, title))
190
  except Exception as exc:
191
  raise self.retry(exc=exc, countdown=30)
192
 
 
52
  return []
53
 
54
 
55
+ def _run_async(coro):
56
+ loop = None
57
+ try:
58
+ loop = asyncio.get_event_loop()
59
+ if loop.is_closed():
60
+ raise RuntimeError
61
+ except RuntimeError:
62
+ loop = asyncio.new_event_loop()
63
+ asyncio.set_event_loop(loop)
64
+
65
+ return loop.run_until_complete(coro)
66
+
67
  @celery_app.task(bind=True, name="ingest_candidates_batch", max_retries=3)
68
  def ingest_candidates_batch(self, rows: list[dict], session_id: str | None = None) -> dict:
69
  try:
70
+ return _run_async(_ingest_candidates_async(rows, session_id))
71
  except Exception as exc:
72
  raise self.retry(exc=exc, countdown=30)
73
 
 
198
  @celery_app.task(bind=True, name="ingest_jd", max_retries=3)
199
  def ingest_jd(self, jd_id: str, raw_text: str, title: str) -> dict:
200
  try:
201
+ return _run_async(_ingest_jd_async(jd_id, raw_text, title))
202
  except Exception as exc:
203
  raise self.retry(exc=exc, countdown=30)
204