File size: 3,060 Bytes
a0f1a88 b7d367d a0f1a88 b7d367d a0f1a88 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | import asyncio
from typing import Any
from sqlalchemy import select
from .celery_app import celery_app
from ..database import AsyncSessionLocal
from ..models.match_result import MatchResult
from ..models.candidate import Candidate
from ..models.jd import JobDescription
from ..routers.matching import _build_jd_dict
from ..matching.llm_explainer import generate_explanation
import datetime
def _run_async(coro):
loop = None
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
raise RuntimeError
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
@celery_app.task(bind=True, name="generate_top_explanations", max_retries=1)
def generate_top_explanations(self, match_result_ids: list[str]) -> dict:
try:
return _run_async(_generate_top_explanations_async(match_result_ids))
except Exception as exc:
raise self.retry(exc=exc, countdown=10)
async def _generate_top_explanations_async(match_result_ids: list[str]) -> dict:
async with AsyncSessionLocal() as db:
# Load all the match results alongside candidate and JD data
q = (
select(MatchResult, Candidate, JobDescription)
.join(Candidate, MatchResult.candidate_id == Candidate.id)
.join(JobDescription, MatchResult.jd_id == JobDescription.id)
.where(MatchResult.id.in_(match_result_ids))
)
result = await db.execute(q)
rows = result.all()
success_count = 0
for mr, cand, jd in rows:
# Skip if already generated
if mr.explanation:
continue
jd_dict = _build_jd_dict(jd)
cand_dict = {
"parsed_summary": cand.parsed_summary,
"parsed_skills": cand.parsed_skills,
"years_of_experience": cand.years_of_experience,
"programming_languages": cand.programming_languages or [],
"backend_frameworks": cand.backend_frameworks or [],
"frontend_technologies": cand.frontend_technologies or [],
"most_recent_company": cand.most_recent_company,
"growth_velocity": cand.growth_velocity,
}
try:
# Dynamically utilize next Groq key in cycler and run the prompt
exp = await generate_explanation(jd_dict, cand_dict, mr.gaps or [])
mr.explanation = exp
mr.explanation_generated_at = datetime.datetime.now(datetime.timezone.utc)
await db.commit()
success_count += 1
# Sleep briefly 0.5s to absolutely guarantee Groq global rate limits remain pristine
await asyncio.sleep(0.5)
except Exception as e:
print(f"Failed to generate explanation for {mr.id}: {e}")
return {"attempted": len(rows), "succeeded": success_count}
|