coderound / backend /src /workers /explain.py
ketannnn's picture
feat: stabilize celery loops, add redis caching, and scale stage 2 neural limit to 250
b7d367d
import asyncio
import datetime
import logging
from typing import Any

from sqlalchemy import select

from .celery_app import celery_app
from ..database import AsyncSessionLocal
from ..models.match_result import MatchResult
from ..models.candidate import Candidate
from ..models.jd import JobDescription
from ..routers.matching import _build_jd_dict
from ..matching.llm_explainer import generate_explanation
def _run_async(coro):
    """Run *coro* to completion on a synchronous call path and return its result.

    The current event loop is reused when one exists and is still open;
    otherwise a fresh loop is created and installed as the current loop.
    The loop is intentionally left open afterwards so a later call can
    pick it up again.
    """
    try:
        active_loop = asyncio.get_event_loop()
    except RuntimeError:
        # No usable current loop for this thread.
        active_loop = None

    if active_loop is None or active_loop.is_closed():
        active_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(active_loop)

    return active_loop.run_until_complete(coro)
@celery_app.task(bind=True, name="generate_top_explanations", max_retries=1)
def generate_top_explanations(self, match_result_ids: list[str]) -> dict:
    """Celery entry point that generates explanations for the given match results.

    Drives the async implementation to completion via ``_run_async`` and
    returns its summary dict. Any exception is handed to ``self.retry`` with
    a 10 second countdown; the task is registered with ``max_retries=1``.
    """
    try:
        summary = _run_async(_generate_top_explanations_async(match_result_ids))
    except Exception as exc:
        # Hand the original error to Celery's retry machinery.
        raise self.retry(exc=exc, countdown=10)
    return summary
async def _generate_top_explanations_async(match_result_ids: list[str]) -> dict:
    """Generate and persist LLM explanations for the given match results.

    Loads every requested ``MatchResult`` joined with its ``Candidate`` and
    ``JobDescription``, skips rows that already have an explanation, and for
    each remaining row calls ``generate_explanation`` and commits the result
    immediately, so earlier successes are kept if a later row fails.

    A failure on an individual row is logged with its traceback and does not
    abort the rest of the batch.

    Args:
        match_result_ids: Primary keys of the ``MatchResult`` rows to process.

    Returns:
        ``{"attempted": <rows loaded>, "succeeded": <explanations stored>}``.
        ``attempted`` counts every row found, including rows skipped because
        an explanation already existed.
    """
    if not match_result_ids:
        # Nothing requested: skip opening a session and issuing an empty IN query.
        return {"attempted": 0, "succeeded": 0}

    logger = logging.getLogger(__name__)

    async with AsyncSessionLocal() as db:
        # Load all the match results alongside candidate and JD data.
        query = (
            select(MatchResult, Candidate, JobDescription)
            .join(Candidate, MatchResult.candidate_id == Candidate.id)
            .join(JobDescription, MatchResult.jd_id == JobDescription.id)
            .where(MatchResult.id.in_(match_result_ids))
        )
        rows = (await db.execute(query)).all()

        # NOTE(review): the loop keeps reading ORM attributes after earlier
        # commits; this presumably relies on AsyncSessionLocal being created
        # with expire_on_commit=False - confirm against the session factory.
        success_count = 0
        for mr, cand, jd in rows:
            # Skip if already generated.
            if mr.explanation:
                continue

            # Read the id before the risky section: after a failed commit the
            # ORM object may no longer be readable, and the failure handler
            # must not raise a second error while reporting the first.
            mr_id = mr.id

            jd_dict = _build_jd_dict(jd)
            cand_dict = {
                "parsed_summary": cand.parsed_summary,
                "parsed_skills": cand.parsed_skills,
                "years_of_experience": cand.years_of_experience,
                "programming_languages": cand.programming_languages or [],
                "backend_frameworks": cand.backend_frameworks or [],
                "frontend_technologies": cand.frontend_technologies or [],
                "most_recent_company": cand.most_recent_company,
                "growth_velocity": cand.growth_velocity,
            }

            try:
                exp = await generate_explanation(jd_dict, cand_dict, mr.gaps or [])
                mr.explanation = exp
                mr.explanation_generated_at = datetime.datetime.now(datetime.timezone.utc)
                # Commit per row so completed explanations are not lost if a
                # later row fails.
                await db.commit()
                success_count += 1
                # Short pause between successful calls to stay under the
                # explanation provider's rate limit.
                await asyncio.sleep(0.5)
            except Exception:
                # Best effort: record the failure with its traceback and move on.
                logger.exception("Failed to generate explanation for %s", mr_id)

        return {"attempted": len(rows), "succeeded": success_count}