Spaces:
Sleeping
Sleeping
File size: 6,610 Bytes
547bb9a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | import os
from typing import List
from datetime import datetime
from backend.error_detection.base import ErrorDetector, DetectionResult
from backend.utils.insforge_db import get_db, execute_one
class CertificationErrorDetector(ErrorDetector):
    """
    Health checks for the certification pipeline.

    Checks for:
    1. Active events with missing template IDs.
    2. Jobs stuck in processing.
    3. Failed jobs.
    4. Jobs with missing emails.
    5. Missing Apps Script configuration.

    Every ``_analyze_*`` method that takes a cursor expects rows that can be
    indexed by column name (e.g. ``row['id']``).
    """

    page = "certification"

    # A job that has been in 'processing' longer than this many minutes is
    # reported as stuck.
    STUCK_THRESHOLD_MINUTES = 15

    # Checks that get an explicit OK result when they ran and found nothing,
    # in the order the OK results are appended.
    _OK_MESSAGES = (
        ("template_config", "All active events have valid templates."),
        ("failed_jobs", "No failed certificate jobs found."),
        ("stuck_jobs", "No stuck certificate jobs found."),
    )

    def __init__(self, db_path: str):
        # db_path is ignored for InsForge integration
        self.db_path = db_path

    def run(self) -> List[DetectionResult]:
        """Run every certification check and return the collected results.

        Database failures are reported as a single ``db_connection`` critical
        result instead of being raised. An OK result is only emitted for a
        check whose query actually completed, so a failed connection or a
        missing ``certificate_jobs`` table is never reported as healthy.
        """
        results: List[DetectionResult] = []
        completed = set()  # names of checks whose query ran to completion

        try:
            with get_db() as conn:
                with conn.cursor() as cursor:
                    results.extend(self._analyze_template_configuration(cursor))
                    completed.add("template_config")

                    # Ensure certificate_jobs exists before querying
                    cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_name='certificate_jobs'")
                    if cursor.fetchone():
                        results.extend(self._analyze_failed_jobs(cursor))
                        completed.add("failed_jobs")
                        results.extend(self._analyze_stuck_jobs(cursor))
                        completed.add("stuck_jobs")
                        results.extend(self._analyze_missing_emails(cursor))
                    else:
                        results.append(self._warn("table_exists", "certificate_jobs table does not exist"))
        except Exception as e:
            # Top-level boundary: surface the failure as a result, don't crash.
            results.append(self._critical("db_connection", f"Certification DB Error: {str(e)}"))

        results.extend(self._analyze_apps_script_connectivity())

        # Add an OK result for each check that ran and produced no failing result.
        for check, message in self._OK_MESSAGES:
            if check not in completed:
                continue
            if not any(r.check == check and not r.ok for r in results):
                results.append(self._ok(check, message))

        return results

    def _analyze_template_configuration(self, cursor) -> List[DetectionResult]:
        """Report a critical result for each event that has certificates
        enabled but a NULL or empty ``template_id``."""
        cursor.execute('''
            SELECT id, speaker_name, template_id
            FROM events
            WHERE send_certificates = true
            AND (template_id IS NULL OR template_id = '')
        ''')
        return [
            self._critical(
                "template_config",
                f"Event #{ev['id']} ({ev['speaker_name']}) has certificates active but NO template ID.",
                "Please edit the event and paste a valid Google Slides Template ID."
            )
            for ev in cursor.fetchall()
        ]

    def _analyze_failed_jobs(self, cursor) -> List[DetectionResult]:
        """Report a critical result for each certificate job whose status is
        'failed', including the stored error log when there is one."""
        cursor.execute('''
            SELECT j.id, s.name as student_name, s.email as student_email, j.error_log as error_message, e.speaker_name
            FROM certificate_jobs j
            LEFT JOIN events e ON j.event_id = e.id
            LEFT JOIN students s ON j.student_id = s.id
            WHERE j.status = 'failed'
        ''')
        results = []
        for job in cursor.fetchall():
            msg = job['error_message'] or 'Unknown error'
            results.append(self._critical(
                "failed_jobs",
                f"Certificate generation failed for {job['student_name']} ({job['student_email']}).",
                f"Event: {job['speaker_name']}. Error: {msg}"
            ))
        return results

    @staticmethod
    def _to_utc(value):
        """Normalize a DB timestamp to a timezone-aware UTC ``datetime``.

        Accepts a ``datetime`` (naive or aware) or a string in ISO 8601 or
        ``YYYY-MM-DD HH:MM:SS`` form. Returns ``None`` for an empty value.
        Raises ``ValueError`` for an unparseable string and ``TypeError`` for
        any other type.
        """
        # Function-scope import: the file's top-level import only provides `datetime`.
        from datetime import timezone

        if not value:
            return None
        if isinstance(value, str):
            try:
                value = datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError:
                # Fall back to the more lenient legacy format.
                value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
        if not isinstance(value, datetime):
            raise TypeError(f"unsupported timestamp type: {type(value).__name__}")
        if value.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC, as the
            # previous utcnow()-based comparison did; confirm against the schema.
            return value.replace(tzinfo=timezone.utc)
        # Convert rather than strip the offset so non-UTC values age correctly.
        return value.astimezone(timezone.utc)

    def _analyze_stuck_jobs(self, cursor) -> List[DetectionResult]:
        """Warn about jobs that have been in 'processing' for longer than
        ``STUCK_THRESHOLD_MINUTES``, measured from ``generated_at``.

        Jobs with no timestamp, or one that cannot be parsed, are skipped
        because no age can be computed for them.
        """
        from datetime import timezone

        cursor.execute('''
            SELECT j.id, s.name as student_name, j.generated_at as updated_at
            FROM certificate_jobs j
            LEFT JOIN students s ON j.student_id = s.id
            WHERE j.status = 'processing'
        ''')
        jobs = cursor.fetchall()
        now = datetime.now(timezone.utc)

        results = []
        for job in jobs:
            try:
                updated_at = self._to_utc(job['updated_at'])
            except (TypeError, ValueError):
                # Best effort: one bad timestamp must not abort the whole check.
                continue
            if updated_at is None:
                continue

            age_minutes = (now - updated_at).total_seconds() / 60
            if age_minutes > self.STUCK_THRESHOLD_MINUTES:
                results.append(self._warn(
                    "stuck_jobs",
                    f"Job #{job['id']} for {job['student_name']} is stuck in 'processing' state.",
                    f"Stuck for {age_minutes:.1f} minutes. Consider resetting the status to 'pending'."
                ))
        return results

    def _analyze_missing_emails(self, cursor) -> List[DetectionResult]:
        """Warn about each job not yet 'completed' whose student has a NULL or
        empty email (including jobs with no matching student row)."""
        cursor.execute('''
            SELECT j.id, s.name as student_name, j.status
            FROM certificate_jobs j
            LEFT JOIN students s ON j.student_id = s.id
            WHERE (s.email IS NULL OR s.email = '') AND j.status != 'completed'
        ''')
        return [
            self._warn(
                "missing_email",
                f"Job #{job['id']} ({job['student_name']}) has no email address.",
                "The certificate cannot be delivered without an email."
            )
            for job in cursor.fetchall()
        ]

    def _analyze_apps_script_connectivity(self) -> List[DetectionResult]:
        """Check that the ``APPS_SCRIPT_URL`` environment variable is set.

        Only the presence of the variable is verified; no request is made.
        """
        if os.getenv('APPS_SCRIPT_URL'):
            return [self._ok("apps_script_config", "APPS_SCRIPT_URL is configured.")]
        return [self._critical(
            "apps_script_config",
            "APPS_SCRIPT_URL is missing from environment/config.",
            "Certificate generation cannot proceed without the Apps Script webhook."
        )]
|