import logging
import os
from datetime import datetime, timezone
from typing import List

from backend.error_detection.base import ErrorDetector, DetectionResult
from backend.utils.insforge_db import get_db, execute_one

logger = logging.getLogger(__name__)


class CertificationErrorDetector(ErrorDetector):
    """
    Ultra-deep analysis for the certification pipeline.

    Checks for:
    1. Active events with missing template IDs.
    2. Jobs stuck in processing.
    3. Failed jobs.
    4. Jobs with missing emails.
    5. Missing Apps Script configuration.
    """

    page = "certification"

    # A job left in 'processing' for longer than this many minutes is
    # reported as stuck.
    STUCK_THRESHOLD_MINUTES = 15

    # (check name, message) pairs reported when a check ran and found
    # nothing wrong.
    _OK_MESSAGES = (
        ("template_config", "All active events have valid templates."),
        ("failed_jobs", "No failed certificate jobs found."),
        ("stuck_jobs", "No stuck certificate jobs found."),
    )

    def __init__(self, db_path: str):
        # db_path is ignored for InsForge integration
        self.db_path = db_path

    def run(self) -> List[DetectionResult]:
        """Run every certification check and return the collected results.

        Database failures are caught and reported as a single critical
        ``db_connection`` result instead of being raised. An OK result is
        only emitted for a check that actually executed and produced no
        failing result, so an unreachable database or a missing
        ``certificate_jobs`` table is never reported as healthy.
        """
        results: List[DetectionResult] = []
        completed = set()  # names of the checks that ran to completion

        try:
            with get_db() as conn:
                with conn.cursor() as cursor:
                    results.extend(self._analyze_template_configuration(cursor))
                    completed.add("template_config")

                    # Ensure certificate_jobs exists before querying
                    cursor.execute(
                        "SELECT table_name FROM information_schema.tables "
                        "WHERE table_schema='public' AND table_name='certificate_jobs'"
                    )
                    if cursor.fetchone():
                        results.extend(self._analyze_failed_jobs(cursor))
                        completed.add("failed_jobs")
                        results.extend(self._analyze_stuck_jobs(cursor))
                        completed.add("stuck_jobs")
                        results.extend(self._analyze_missing_emails(cursor))
                    else:
                        results.append(
                            self._warn("table_exists", "certificate_jobs table does not exist")
                        )
        except Exception as e:
            # Top-level boundary: one failing query must not abort the report.
            logger.exception("Certification checks aborted by a database error")
            results.append(
                self._critical("db_connection", f"Certification DB Error: {str(e)}")
            )

        results.extend(self._analyze_apps_script_connectivity())

        # Add an OK result for each category that ran and reported no problem.
        for check, message in self._OK_MESSAGES:
            if check not in completed:
                continue
            if not any(r.check == check and not r.ok for r in results):
                results.append(self._ok(check, message))

        return results

    def _analyze_template_configuration(self, cursor) -> List[DetectionResult]:
        """Report events that send certificates but have no template ID.

        Rows are read by column name, so ``cursor`` must yield mapping-style
        rows.
        """
        cursor.execute('''
            SELECT id, speaker_name, template_id
            FROM events
            WHERE send_certificates = true
              AND (template_id IS NULL OR template_id = '')
        ''')
        return [
            self._critical(
                "template_config",
                f"Event #{ev['id']} ({ev['speaker_name']}) has certificates active but NO template ID.",
                "Please edit the event and paste a valid Google Slides Template ID."
            )
            for ev in cursor.fetchall()
        ]

    def _analyze_failed_jobs(self, cursor) -> List[DetectionResult]:
        """Report every certificate job whose status is 'failed'."""
        cursor.execute('''
            SELECT j.id, s.name as student_name, s.email as student_email,
                   j.error_log as error_message, e.speaker_name
            FROM certificate_jobs j
            LEFT JOIN events e ON j.event_id = e.id
            LEFT JOIN students s ON j.student_id = s.id
            WHERE j.status = 'failed'
        ''')
        results = []
        for job in cursor.fetchall():
            # error_log may be NULL or empty; fall back to a generic message.
            msg = job['error_message'] or 'Unknown error'
            results.append(self._critical(
                "failed_jobs",
                f"Certificate generation failed for {job['student_name']} ({job['student_email']}).",
                f"Event: {job['speaker_name']}. Error: {msg}"
            ))
        return results

    def _analyze_stuck_jobs(self, cursor) -> List[DetectionResult]:
        """Warn about jobs that have been 'processing' for too long.

        A job's age is measured from its ``generated_at`` column. Jobs
        without a timestamp are skipped because their age cannot be
        computed. Jobs whose timestamp cannot be interpreted are logged and
        skipped rather than failing the whole check.
        """
        cursor.execute('''
            SELECT j.id, s.name as student_name, j.generated_at as updated_at
            FROM certificate_jobs j
            LEFT JOIN students s ON j.student_id = s.id
            WHERE j.status = 'processing'
        ''')
        jobs = cursor.fetchall()

        results = []
        now = datetime.now(timezone.utc)
        for job in jobs:
            raw_timestamp = job['updated_at']
            if not raw_timestamp:
                continue

            try:
                updated_at = self._to_utc(raw_timestamp)
            except (TypeError, ValueError) as exc:
                logger.warning(
                    "Skipping stuck-job check for job %s: unusable timestamp %r (%s)",
                    job['id'], raw_timestamp, exc,
                )
                continue

            age_minutes = (now - updated_at).total_seconds() / 60
            if age_minutes > self.STUCK_THRESHOLD_MINUTES:
                results.append(self._warn(
                    "stuck_jobs",
                    f"Job #{job['id']} for {job['student_name']} is stuck in 'processing' state.",
                    f"Stuck for {age_minutes:.1f} minutes. Consider resetting the status to 'pending'."
                ))
        return results

    @staticmethod
    def _to_utc(value) -> datetime:
        """Convert a ``datetime`` or timestamp string to an aware UTC datetime.

        Naive values are interpreted as UTC; aware values are converted to
        UTC so that a non-zero offset is honoured.

        Raises:
            ValueError: if a string cannot be parsed as a timestamp.
            TypeError: if ``value`` is neither a string nor a ``datetime``.
        """
        if isinstance(value, str):
            try:
                # Handles 'T'- and space-separated ISO forms, including
                # fractional seconds and offsets; 'Z' is spelled out
                # because older Python versions reject it.
                value = datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError:
                value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")

        if not isinstance(value, datetime):
            raise TypeError(f"unsupported timestamp type: {type(value).__name__}")

        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)

    def _analyze_missing_emails(self, cursor) -> List[DetectionResult]:
        """Warn about non-completed jobs whose student has no email address."""
        cursor.execute('''
            SELECT j.id, s.name as student_name, j.status
            FROM certificate_jobs j
            LEFT JOIN students s ON j.student_id = s.id
            WHERE (s.email IS NULL OR s.email = '')
              AND j.status != 'completed'
        ''')
        return [
            self._warn(
                "missing_email",
                f"Job #{job['id']} ({job['student_name']}) has no email address.",
                "The certificate cannot be delivered without an email."
            )
            for job in cursor.fetchall()
        ]

    def _analyze_apps_script_connectivity(self) -> List[DetectionResult]:
        """Check that the APPS_SCRIPT_URL environment variable is set.

        Only the presence of the variable is checked; no request is made to
        the URL.
        """
        if not os.getenv('APPS_SCRIPT_URL'):
            return [self._critical(
                "apps_script_config",
                "APPS_SCRIPT_URL is missing from environment/config.",
                "Certificate generation cannot proceed without the Apps Script webhook."
            )]
        return [self._ok("apps_script_config", "APPS_SCRIPT_URL is configured.")]