| import asyncio |
| from typing import Any |
| from sqlalchemy import select |
| from .celery_app import celery_app |
| from ..database import AsyncSessionLocal |
| from ..models.match_result import MatchResult |
| from ..models.candidate import Candidate |
| from ..models.jd import JobDescription |
| from ..routers.matching import _build_jd_dict |
| from ..matching.llm_explainer import generate_explanation |
| import datetime |
|
|
| def _run_async(coro): |
| loop = None |
| try: |
| loop = asyncio.get_event_loop() |
| if loop.is_closed(): |
| raise RuntimeError |
| except RuntimeError: |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| |
| return loop.run_until_complete(coro) |
|
|
| @celery_app.task(bind=True, name="generate_top_explanations", max_retries=1) |
| def generate_top_explanations(self, match_result_ids: list[str]) -> dict: |
| try: |
| return _run_async(_generate_top_explanations_async(match_result_ids)) |
| except Exception as exc: |
| raise self.retry(exc=exc, countdown=10) |
|
|
| async def _generate_top_explanations_async(match_result_ids: list[str]) -> dict: |
| async with AsyncSessionLocal() as db: |
| |
| q = ( |
| select(MatchResult, Candidate, JobDescription) |
| .join(Candidate, MatchResult.candidate_id == Candidate.id) |
| .join(JobDescription, MatchResult.jd_id == JobDescription.id) |
| .where(MatchResult.id.in_(match_result_ids)) |
| ) |
| result = await db.execute(q) |
| rows = result.all() |
|
|
| success_count = 0 |
| for mr, cand, jd in rows: |
| |
| if mr.explanation: |
| continue |
| |
| jd_dict = _build_jd_dict(jd) |
| cand_dict = { |
| "parsed_summary": cand.parsed_summary, |
| "parsed_skills": cand.parsed_skills, |
| "years_of_experience": cand.years_of_experience, |
| "programming_languages": cand.programming_languages or [], |
| "backend_frameworks": cand.backend_frameworks or [], |
| "frontend_technologies": cand.frontend_technologies or [], |
| "most_recent_company": cand.most_recent_company, |
| "growth_velocity": cand.growth_velocity, |
| } |
| |
| try: |
| |
| exp = await generate_explanation(jd_dict, cand_dict, mr.gaps or []) |
| mr.explanation = exp |
| mr.explanation_generated_at = datetime.datetime.now(datetime.timezone.utc) |
| await db.commit() |
| success_count += 1 |
| |
| |
| await asyncio.sleep(0.5) |
| except Exception as e: |
| print(f"Failed to generate explanation for {mr.id}: {e}") |
| |
| return {"attempted": len(rows), "succeeded": success_count} |
|
|