"""Celery task that generates LLM explanations for selected match results."""

import asyncio
import datetime
import logging
from collections.abc import Coroutine
from typing import Any

from sqlalchemy import select

from .celery_app import celery_app
from ..database import AsyncSessionLocal
from ..models.match_result import MatchResult
from ..models.candidate import Candidate
from ..models.jd import JobDescription
from ..routers.matching import _build_jd_dict
from ..matching.llm_explainer import generate_explanation

logger = logging.getLogger(__name__)

# Pause after each successful explanation; the original comment states this is
# to stay within the Groq rate limits.
_PAUSE_BETWEEN_CALLS_SECONDS = 0.5


def _run_async(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run *coro* to completion on this thread's event loop and return its result.

    The current event loop is reused when one is available and still open;
    otherwise a new loop is created and installed as the current one. The loop
    is intentionally left open after the call so later invocations reuse it.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)


@celery_app.task(bind=True, name="generate_top_explanations", max_retries=1)
def generate_top_explanations(self, match_result_ids: list[str]) -> dict:
    """Generate and store explanations for the given match results.

    Args:
        match_result_ids: Primary keys of the ``MatchResult`` rows to process.

    Returns:
        ``{"attempted": <rows loaded>, "succeeded": <explanations stored>}``.

    Raises:
        celery.exceptions.Retry: When the run fails as a whole; the task is
            re-queued after 10 seconds (at most one retry).
    """
    try:
        return _run_async(_generate_top_explanations_async(match_result_ids))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=10)


def _candidate_payload(cand: Candidate) -> dict[str, Any]:
    """Build the plain-dict view of a candidate that is passed to the explainer."""
    return {
        "parsed_summary": cand.parsed_summary,
        "parsed_skills": cand.parsed_skills,
        "years_of_experience": cand.years_of_experience,
        "programming_languages": cand.programming_languages or [],
        "backend_frameworks": cand.backend_frameworks or [],
        "frontend_technologies": cand.frontend_technologies or [],
        "most_recent_company": cand.most_recent_company,
        "growth_velocity": cand.growth_velocity,
    }


async def _generate_top_explanations_async(match_result_ids: list[str]) -> dict[str, int]:
    """Load the requested match results and store an explanation for each.

    Rows that already have an explanation are skipped. A failure on one row is
    logged and does not stop the remaining rows from being processed.

    Args:
        match_result_ids: Primary keys of the ``MatchResult`` rows to process.

    Returns:
        ``{"attempted": n, "succeeded": m}`` where ``n`` is the number of rows
        loaded (including rows skipped because they already had an
        explanation) and ``m`` is the number of explanations committed.
    """
    if not match_result_ids:
        # Nothing to do: avoid opening a session and issuing an empty IN query.
        return {"attempted": 0, "succeeded": 0}

    async with AsyncSessionLocal() as db:
        # Load every match result together with its candidate and JD.
        query = (
            select(MatchResult, Candidate, JobDescription)
            .join(Candidate, MatchResult.candidate_id == Candidate.id)
            .join(JobDescription, MatchResult.jd_id == JobDescription.id)
            .where(MatchResult.id.in_(match_result_ids))
        )
        rows = (await db.execute(query)).all()

        # Snapshot everything that has to be READ from the ORM objects before
        # the first commit/rollback. Those calls can expire loaded instances,
        # and reading an expired attribute on an AsyncSession would attempt
        # implicit IO. After this point the loop only WRITES to ``mr``.
        pending = [
            (mr, mr.id, _build_jd_dict(jd), _candidate_payload(cand), mr.gaps or [])
            for mr, cand, jd in rows
            if not mr.explanation  # skip rows that were already explained
        ]

        success_count = 0
        for mr, mr_id, jd_dict, cand_dict, gaps in pending:
            try:
                explanation = await generate_explanation(jd_dict, cand_dict, gaps)
            except Exception as exc:
                # Best effort per row: record the failure and move on.
                logger.exception(
                    "Failed to generate explanation for %s: %s", mr_id, exc
                )
                continue

            mr.explanation = explanation
            mr.explanation_generated_at = datetime.datetime.now(datetime.timezone.utc)
            try:
                await db.commit()
            except Exception as exc:
                logger.exception(
                    "Failed to generate explanation for %s: %s", mr_id, exc
                )
                # A failed commit leaves the session unusable until it is
                # rolled back; without this every later commit would fail too.
                await db.rollback()
                continue

            success_count += 1
            await asyncio.sleep(_PAUSE_BETWEEN_CALLS_SECONDS)

    return {"attempted": len(rows), "succeeded": success_count}