Spaces:
Running
Running
| """ | |
| MathPulse AI — Class Analytics Engine | |
| Fetches real quiz data from Firestore, computes per-student and class-level | |
| metrics, generates AI insights via DeepSeek, and caches results. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any, Dict, List, Literal, Optional | |
| from pydantic import BaseModel, Field | |
# Module-level logger; everything in this file logs under the
# "mathpulse.class_analytics" name.
logger = logging.getLogger("mathpulse.class_analytics")
| # ─── Firestore helper ────────────────────────────────────────────────────── | |
# Lazily imported ``firebase_admin.firestore`` module (``None`` until the
# first successful import).
_firebase_firestore = None


def _get_firestore_client():
    """Return a Firestore client, or ``None`` when one cannot be obtained.

    The ``firebase_admin.firestore`` module is imported on first use and
    remembered in ``_firebase_firestore``.  Any failure -- the import itself
    or the ``client()`` call -- yields ``None`` instead of raising.
    """
    global _firebase_firestore
    try:
        if _firebase_firestore is None:
            from firebase_admin import firestore as firestore_module

            _firebase_firestore = firestore_module
        return _firebase_firestore.client()
    except Exception:
        return None
| # ─── Models ──────────────────────────────────────────────────────────────── | |
class StudentAnalyticsSummary(BaseModel):
    """Per-student analytics row included in a class report."""

    student_id: str
    student_name: str
    avatar_url: Optional[str] = None
    grade_level: str = ""
    section: str = ""
    # Mean quiz score.
    avg_score: float = 0.0
    quiz_attempt_count: int = 0
    # ISO-8601 timestamp string, or None when unknown.
    last_active: Optional[str] = None
    # ``classify_risk`` emits the WRI status bands ("safe", "watch",
    # "intervene", "critical", "at_risk", "pending_assessment").  They must be
    # accepted here, otherwise pydantic rejects every summary the engine
    # builds.  The older display labels and the "Unassessed" default are kept
    # so existing producers/consumers of those values keep working.
    risk_level: Literal[
        "safe",
        "watch",
        "intervene",
        "critical",
        "at_risk",
        "pending_assessment",
        "Low Risk",
        "Medium Risk",
        "High Risk",
        "Critical",
        "Unassessed",
    ] = "Unassessed"
    engagement_level: Literal["Low", "Medium", "High"] = "Low"
    weakest_topic: Optional[str] = None
    # topic name -> mean score for that topic
    accuracy_by_topic: Dict[str, float] = Field(default_factory=dict)
    completion_rate: float = 0.0
class TopicPerformance(BaseModel):
    """Class-wide performance figures for a single topic."""

    topic: str
    # Mean of the per-student scores collected for this topic.
    class_accuracy: float = 0.0
    # Number of students counted as struggling on the topic
    # (the engine in this file uses a score below 60).
    struggling_count: int = 0
    # Number of students counted as having mastered the topic
    # (the engine in this file uses a score of 80 or above).
    mastered_count: int = 0
class ClassInsights(BaseModel):
    """Narrative insights for a class plus the figures they were based on."""

    class_id: str
    # ISO-8601 timestamp string of when the insights were produced.
    generated_at: str = ""
    class_summary: str = ""
    top_weak_topics: List[str] = Field(default_factory=list)
    recommended_actions: List[str] = Field(default_factory=list)
    class_strengths: str = ""
    # risk status -> number of students with that status
    risk_distribution: Dict[str, int] = Field(default_factory=dict)
    topic_performance: List[TopicPerformance] = Field(default_factory=list)
class ClassAnalyticsReport(BaseModel):
    """Full analytics report for one class: metadata, metrics, students, insights."""

    class_id: str
    class_name: str = ""
    grade_level: str = ""
    section: str = ""
    teacher_id: str = ""
    student_count: int = 0
    # Mean of avg_score over students with at least one quiz attempt.
    class_average: float = 0.0
    # Percentage of students with at least one quiz attempt.
    completion_rate: float = 0.0
    # Percentage of students active within the last 7 days.
    participation_rate: float = 0.0
    # Number of students flagged as needing attention.
    attention_count: int = 0
    students: List[StudentAnalyticsSummary] = Field(default_factory=list)
    insights: Optional[ClassInsights] = None
    # ISO-8601 timestamp string of when the report was built.
    generated_at: str = ""
| # ─── Risk & Engagement Classification ───────────────────────────────────── | |
def classify_risk(avg_score: float, quiz_count: int, days_since_active: Optional[int]) -> str:
    """Estimate a WRI status band from the average quiz score.

    Args:
        avg_score: Mean quiz score.
        quiz_count: Number of quiz attempts; zero means nothing to assess.
        days_since_active: Accepted for interface compatibility; not used.

    Returns:
        ``"pending_assessment"`` when there are no attempts, otherwise one of
        ``"safe"``, ``"watch"``, ``"intervene"``, ``"critical"``, ``"at_risk"``.
    """
    if quiz_count == 0:
        return "pending_assessment"

    # Approximation only: (minimum score, band), checked from the highest
    # floor down.  The real WRI uses D, G, P weights.
    score_floors = (
        (88, "safe"),
        (80, "watch"),
        (75, "intervene"),
        (68, "critical"),
    )
    for floor, band in score_floors:
        if avg_score >= floor:
            return band
    return "at_risk"
def classify_engagement(days_since_active: Optional[int], recent_quiz_count: int) -> str:
    """Bucket a student's engagement as ``"High"``, ``"Medium"`` or ``"Low"``.

    Args:
        days_since_active: Whole days since the student was last active, or
            ``None`` when unknown.
        recent_quiz_count: Number of recent quiz attempts.

    Returns:
        ``"High"`` when active within 2 days with at least 5 recent quizzes,
        ``"Medium"`` when active within 7 days, otherwise ``"Low"``.
    """
    # Unknown or stale activity can never be better than "Low".
    if days_since_active is None or days_since_active > 7:
        return "Low"
    if days_since_active <= 2 and recent_quiz_count >= 5:
        return "High"
    return "Medium"
| # ─── Engine ──────────────────────────────────────────────────────────────── | |
class ClassAnalyticsEngine:
    """Compute, cache and persist class analytics built from Firestore data.

    Reports are cached in memory, keyed by ``class_id`` only, for
    ``_cache_ttl`` seconds and are written to the ``class_analytics``
    collection each time they are rebuilt.
    """

    # WRI status bands tracked in the risk distribution.
    _RISK_STATUSES = (
        "safe",
        "watch",
        "intervene",
        "critical",
        "at_risk",
        "pending_assessment",
    )
    # Bands that count towards ``attention_count``.
    _ATTENTION_STATUSES = ("intervene", "critical", "at_risk")

    def __init__(self):
        self._cache: Dict[str, tuple] = {}  # class_id -> (report, timestamp)
        self._cache_ttl = 1800  # seconds (30 min)

    async def get_class_analytics(
        self, class_id: str, teacher_id: str, force_refresh: bool = False
    ) -> ClassAnalyticsReport:
        """Return the analytics report for a class, rebuilding it when needed.

        Args:
            class_id: Document id of the class in the ``classrooms`` collection.
            teacher_id: Stored on the returned report.
            force_refresh: Skip the in-memory cache and rebuild the report.

        Returns:
            A cached report when one is still fresh, otherwise a newly built
            one.  When no Firestore client is available an empty report is
            returned (and not cached).
        """
        if not force_refresh:
            cached = self._cache.get(class_id)
            if cached is not None:
                report, cached_at = cached
                if time.time() - cached_at < self._cache_ttl:
                    return report

        db = _get_firestore_client()
        if not db:
            logger.error("Firestore client unavailable")
            return ClassAnalyticsReport(
                class_id=class_id, teacher_id=teacher_id, generated_at=_now_iso()
            )

        # TODO: a fast path reading denormalized student_summaries is not
        # implemented; the read whose result was never used has been dropped.

        # Class metadata.
        class_doc = db.collection("classrooms").document(class_id).get()
        class_data = (class_doc.to_dict() or {}) if class_doc.exists else {}
        class_name = class_data.get("name", "")
        grade_level = class_data.get("gradeLevel", class_data.get("grade", ""))
        section = class_data.get("section", "")

        # Roster: match on classroomId first, then fall back to classSectionId.
        managed_students = self._fetch_managed_students(db, "classroomId", class_id)
        if not managed_students:
            class_section_id = class_data.get("classSectionId", class_id)
            managed_students = self._fetch_managed_students(
                db, "classSectionId", class_section_id
            )

        student_summaries = await self._build_student_summaries(db, managed_students)

        # Class-level metrics.
        assessed_students = [s for s in student_summaries if s.quiz_attempt_count > 0]
        student_count = len(student_summaries)
        class_average = 0.0
        if assessed_students:
            class_average = sum(s.avg_score for s in assessed_students) / len(assessed_students)
        completion_rate = 0.0
        if student_count > 0:
            completion_rate = (len(assessed_students) / student_count) * 100

        # Participation = active in the last 7 days.
        active_count = sum(
            1
            for s in student_summaries
            if s.last_active and _days_since(s.last_active) <= 7
        )
        participation_rate = (active_count / student_count * 100) if student_count > 0 else 0.0

        # Attention = intervene + critical + at_risk.
        attention_count = sum(
            1 for s in student_summaries if s.risk_level in self._ATTENTION_STATUSES
        )

        topic_perf = self._compute_topic_performance(student_summaries)

        # Risk distribution.  A stored WRI status on the managedStudents
        # record wins over the score-based estimate.  The roster fetched above
        # already carries that field, so no per-student re-read is needed.
        stored_status_by_id = {
            m.get("id", ""): m.get("riskStatus") for m in managed_students
        }
        risk_dist = dict.fromkeys(self._RISK_STATUSES, 0)
        for s in student_summaries:
            stored_status = stored_status_by_id.get(s.student_id)
            if isinstance(stored_status, str) and stored_status in risk_dist:
                status = stored_status
            else:
                status = s.risk_level
            risk_dist[status] = risk_dist.get(status, 0) + 1

        insights = await self._generate_insights(
            class_id=class_id,
            class_name=class_name,
            grade_level=grade_level,
            section=section,
            student_count=student_count,
            class_average=class_average,
            completion_rate=completion_rate,
            participation_rate=participation_rate,
            risk_dist=risk_dist,
            topic_perf=topic_perf,
        )

        report = ClassAnalyticsReport(
            class_id=class_id,
            class_name=class_name,
            grade_level=grade_level,
            section=section,
            teacher_id=teacher_id,
            student_count=student_count,
            class_average=round(class_average, 1),
            completion_rate=round(completion_rate, 1),
            participation_rate=round(participation_rate, 1),
            attention_count=attention_count,
            students=student_summaries,
            insights=insights,
            generated_at=_now_iso(),
        )

        self._cache[class_id] = (report, time.time())

        # Persisting is best-effort: a failed write must not fail the request.
        try:
            db.collection("class_analytics").document(class_id).set(
                {**report.model_dump(), "cached_at": _now_iso()},
                merge=True,
            )
        except Exception as e:
            logger.warning("Failed to persist analytics cache: %s", e)

        return report

    @staticmethod
    def _fetch_managed_students(db: Any, field: str, value: Any) -> List[Dict]:
        """Return managedStudents docs where ``field == value`` as dicts with an ``id`` key."""
        query = db.collection("managedStudents").where(field, "==", value)
        return [{"id": snap.id, **(snap.to_dict() or {})} for snap in query.stream()]

    @staticmethod
    def _coerce_datetime(value: Any) -> Optional[datetime]:
        """Convert a stored timestamp into a timezone-aware ``datetime``.

        Handles objects exposing ``.seconds`` (epoch seconds), ``datetime``
        instances and ISO-8601 strings.  Naive values are assumed to be UTC.
        Returns ``None`` for empty or unrecognised input.
        """
        if not value:
            return None
        try:
            if hasattr(value, "seconds"):
                parsed = datetime.fromtimestamp(value.seconds, tz=timezone.utc)
            elif isinstance(value, datetime):
                parsed = value
            elif isinstance(value, str):
                parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            else:
                return None
        except (TypeError, ValueError, OverflowError, OSError):
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _attempt_score(attempt: Dict) -> float:
        """Return the attempt's score as a float; missing or invalid scores count as 0."""
        try:
            return float(attempt.get("score", 0))
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _fetch_quiz_attempts(db: Any, student: Dict) -> List[Dict]:
        """Load a student's quiz attempts, trying each known storage location in turn.

        Lookup order: ``progress/{id}``, ``progress/{lrn}``,
        ``progress/{accountUid}``, then the 20 most recent
        ``practice_results/{accountUid}/sessions``.  The first source that
        yields attempts wins.  Errors are logged and whatever was collected so
        far is returned.
        """
        student_id = student.get("id", "")
        account_uid = student.get("accountUid")
        attempts: List[Dict] = []
        try:
            for doc_key in (student_id, student.get("lrn"), account_uid):
                if attempts:
                    break
                if not doc_key:
                    continue
                # Document ids must be strings; an LRN may be stored as a number.
                progress_doc = db.collection("progress").document(str(doc_key)).get()
                if progress_doc.exists:
                    progress = progress_doc.to_dict() or {}
                    # A field stored as null yields None; normalise it to a list.
                    attempts = list(progress.get("quizAttempts") or [])

            if not attempts and account_uid:
                practice_sessions = (
                    db.collection("practice_results")
                    .document(str(account_uid))
                    .collection("sessions")
                    .order_by("submitted_at", direction="DESCENDING")
                    .limit(20)
                    .stream()
                )
                for sess in practice_sessions:
                    sd = sess.to_dict() or {}
                    attempts.append({
                        "quizId": sd.get("session_id", ""),
                        "score": sd.get("score_percent", 0),
                        "completedAt": sd.get("submitted_at"),
                        "answers": sd.get("per_question_feedback", []),
                    })
        except Exception as e:
            logger.debug("Error fetching progress for %s: %s", student_id, e)
        # Ignore malformed entries so one bad record cannot break the report.
        return [a for a in attempts if isinstance(a, dict)]

    async def _build_student_summaries(
        self, db: Any, managed_students: List[Dict]
    ) -> List[StudentAnalyticsSummary]:
        """Build one ``StudentAnalyticsSummary`` per managed student.

        Args:
            db: Firestore client.
            managed_students: Roster dicts; each carries the document id under
                ``"id"`` alongside the stored fields.

        Returns:
            Summaries in roster order.
        """
        summaries = []
        now = datetime.now(timezone.utc)
        for student in managed_students:
            student_id = student.get("id", "")
            avatar = student.get("avatar", "")

            quiz_attempts = self._fetch_quiz_attempts(db, student)
            quiz_count = len(quiz_attempts)
            scores = [self._attempt_score(q) for q in quiz_attempts]
            avg_score = sum(scores) / len(scores) if scores else 0.0

            # The quiz id is used as a topic proxy; each attempt's overall
            # score is credited to that topic.
            scores_by_topic: Dict[str, List[float]] = {}
            for attempt, score in zip(quiz_attempts, scores):
                topic = _extract_topic_from_quiz_id(str(attempt.get("quizId") or ""))
                if topic:
                    scores_by_topic.setdefault(topic, []).append(score)
            topic_avgs = {
                topic: sum(vals) / len(vals) for topic, vals in scores_by_topic.items() if vals
            }

            if topic_avgs:
                weakest_topic = min(topic_avgs, key=topic_avgs.get)
            else:
                weakest_topic = student.get("weakestTopic")
            if weakest_topic == "N/A":
                weakest_topic = None

            last_dt = self._coerce_datetime(student.get("lastActive"))
            last_active_str = last_dt.isoformat() if last_dt is not None else None
            days_since_active = (now - last_dt).days if last_dt is not None else None

            # Recent = completed within the last 14 days.  Attempts whose
            # timestamp cannot be interpreted are not counted as recent.
            recent_quiz_count = 0
            for attempt in quiz_attempts:
                completed_dt = self._coerce_datetime(attempt.get("completedAt"))
                if completed_dt is not None and (now - completed_dt).days <= 14:
                    recent_quiz_count += 1

            summaries.append(StudentAnalyticsSummary(
                student_id=student_id,
                student_name=student.get("name", "Unknown"),
                avatar_url=avatar or None,
                grade_level=student.get("gradeLevel", student.get("grade", "")),
                section=student.get("section", ""),
                avg_score=round(avg_score, 1),
                quiz_attempt_count=quiz_count,
                last_active=last_active_str,
                risk_level=classify_risk(avg_score, quiz_count, days_since_active),
                engagement_level=classify_engagement(days_since_active, recent_quiz_count),
                weakest_topic=weakest_topic,
                accuracy_by_topic=topic_avgs,
                # Five attempts count as full completion.
                completion_rate=min(quiz_count / 5 * 100, 100.0),
            ))
        return summaries

    def _compute_topic_performance(self, students: List[StudentAnalyticsSummary]) -> List[TopicPerformance]:
        """Aggregate per-student topic accuracy into class-level topic stats.

        Returns:
            At most 8 topics, ordered from lowest to highest class accuracy.
        """
        topic_data: Dict[str, Dict] = {}
        for s in students:
            for topic, accuracy in s.accuracy_by_topic.items():
                bucket = topic_data.setdefault(
                    topic, {"scores": [], "struggling": 0, "mastered": 0}
                )
                bucket["scores"].append(accuracy)
                if accuracy < 60:
                    bucket["struggling"] += 1
                elif accuracy >= 80:
                    bucket["mastered"] += 1

        # Also include weakest_topic from students without per-topic data,
        # using their overall average as the score for that topic.
        for s in students:
            if s.weakest_topic and s.weakest_topic not in topic_data and s.quiz_attempt_count > 0:
                topic_data[s.weakest_topic] = {
                    "scores": [s.avg_score],
                    "struggling": 1 if s.avg_score < 60 else 0,
                    "mastered": 0,
                }

        result = [
            TopicPerformance(
                topic=topic,
                class_accuracy=round(sum(data["scores"]) / len(data["scores"]), 1),
                struggling_count=data["struggling"],
                mastered_count=data["mastered"],
            )
            for topic, data in topic_data.items()
            if data["scores"]
        ]
        return sorted(result, key=lambda t: t.class_accuracy)[:8]

    async def _generate_insights(
        self,
        class_id: str,
        class_name: str,
        grade_level: str,
        section: str,
        student_count: int,
        class_average: float,
        completion_rate: float,
        participation_rate: float,
        risk_dist: Dict[str, int],
        topic_perf: List[TopicPerformance],
    ) -> ClassInsights:
        """Ask the chat model for class insights, falling back to a canned summary.

        Args:
            risk_dist: Student counts keyed by WRI status band
                (``safe`` ... ``pending_assessment``).
            topic_perf: Topic stats, weakest first.

        Returns:
            Model-written insights, or locally built fallback insights when
            the model call or response parsing fails.
        """
        topic_lines = "\n".join(
            f"  - {t.topic}: {t.class_accuracy}% (struggling: {t.struggling_count})"
            for t in topic_perf[:6]
        ) or "  No topic data available yet."
        weak_topics = [t.topic for t in topic_perf[:3]] if topic_perf else []
        pending_count = risk_dist.get("pending_assessment", 0)

        # The distribution is keyed by WRI status bands, so those keys (not
        # the older display labels) must be read here.
        prompt = f"""You are MathPulse AI analyzing a class's performance data for a Filipino K-12 teacher.

Class: Grade {grade_level} - {section} ({class_name})
Student Count: {student_count}
Class Average Score: {class_average:.1f}%
Completion Rate: {completion_rate:.1f}%
Participation Rate: {participation_rate:.1f}%

Risk Distribution (WRI status bands):
  - At Risk: {risk_dist.get('at_risk', 0)} students
  - Critical: {risk_dist.get('critical', 0)} students
  - Intervene: {risk_dist.get('intervene', 0)} students
  - Watch: {risk_dist.get('watch', 0)} students
  - Safe: {risk_dist.get('safe', 0)} students
  - Pending Assessment: {pending_count} students

Topic Performance (class accuracy):
{topic_lines}

Top Weakest Topics: {', '.join(weak_topics) if weak_topics else 'None identified yet'}

Generate a JSON response with these exact keys:
{{
  "class_summary": "2-3 sentence overview of class performance. Be honest but constructive.",
  "class_strengths": "1 sentence on what the class is doing well.",
  "top_weak_topics": ["topic1", "topic2", "topic3"],
  "recommended_actions": [
    "Specific action 1 (max 20 words)",
    "Specific action 2",
    "Specific action 3"
  ]
}}

Be specific to Filipino K-12 DepEd context. If data is limited, acknowledge it and suggest next steps."""

        try:
            from services.ai_client import get_deepseek_client, CHAT_MODEL

            client = get_deepseek_client()

            def _request_completion():
                return client.chat.completions.create(
                    model=CHAT_MODEL,
                    messages=[
                        {"role": "system", "content": "You are MathPulse AI, a class analytics assistant for Filipino K-12 math teachers. Respond only with valid JSON."},
                        {"role": "user", "content": prompt},
                    ],
                    temperature=0.3,
                    max_tokens=500,
                    response_format={"type": "json_object"},
                )

            # The client call blocks on network I/O; run it in the default
            # executor so the event loop stays responsive.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(None, _request_completion)

            content = response.choices[0].message.content or "{}"
            parsed = json.loads(content)
            if not isinstance(parsed, dict):
                raise ValueError("insights response is not a JSON object")
            return ClassInsights(
                class_id=class_id,
                generated_at=_now_iso(),
                class_summary=parsed.get("class_summary") or "Analytics data is being collected.",
                top_weak_topics=parsed.get("top_weak_topics") or weak_topics,
                recommended_actions=parsed.get("recommended_actions") or ["Encourage students to complete more quizzes."],
                class_strengths=parsed.get("class_strengths") or "Class is actively using the platform.",
                risk_distribution=risk_dist,
                topic_performance=topic_perf,
            )
        except Exception as e:
            logger.warning("DeepSeek insights generation failed: %s", e)

        # Fallback insights built without the model.
        return ClassInsights(
            class_id=class_id,
            generated_at=_now_iso(),
            class_summary=f"Class has {student_count} students with an average score of {class_average:.0f}%. {pending_count} students have not yet taken any quizzes.",
            top_weak_topics=weak_topics,
            recommended_actions=[
                "Encourage unassessed students to complete their first quiz.",
                "Review struggling topics in the next class session.",
                "Schedule one-on-one check-ins with Critical risk students.",
            ],
            class_strengths="Students are enrolled and the platform is ready for use." if class_average < 50 else f"Class maintains a {class_average:.0f}% average.",
            risk_distribution=risk_dist,
            topic_performance=topic_perf,
        )

    def invalidate_cache(self, class_id: str) -> None:
        """Drop the in-memory cached report for ``class_id``, if any."""
        self._cache.pop(class_id, None)
| # ─── Helpers ─────────────────────────────────────────────────────────────── | |
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``+00:00`` offset."""
    current_utc = datetime.now(tz=timezone.utc)
    return current_utc.isoformat()
def _days_since(iso_str: str) -> int:
    """Return the whole days elapsed since an ISO-8601 timestamp.

    Args:
        iso_str: ISO-8601 timestamp; a trailing ``Z`` is accepted.  A value
            without a UTC offset is assumed to be UTC.

    Returns:
        Whole days between the timestamp and now (UTC), or ``999`` when the
        value is not a parseable timestamp string.
    """
    try:
        dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        return 999
    # Subtracting a naive datetime from an aware one raises TypeError, which
    # previously turned every offset-less timestamp into "999 days ago".
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - dt).days
def _extract_topic_from_quiz_id(quiz_id: str) -> Optional[str]:
    """Derive a display topic from a quiz id such as 'algebra-1' or 'geometry-basics'.

    The id is split on ``-``, ``_`` and ``.``.  When there are two or more
    segments, the first two are capitalized and joined with a space, skipping
    empty and all-digit segments.  An id with no separator is returned
    capitalized.

    Returns:
        The topic name, or ``None`` for an empty id or when no usable segment
        remains.
    """
    if not quiz_id:
        return None

    # Treat "_" and "." as equivalent to "-" before splitting.
    segments = quiz_id.translate(str.maketrans("_.", "--")).split("-")
    if len(segments) < 2:
        return quiz_id.capitalize()

    words = [seg.capitalize() for seg in segments[:2] if seg and not seg.isdigit()]
    return " ".join(words) or None
# Process-wide engine instance, created on first request.
_engine_instance: Optional[ClassAnalyticsEngine] = None


def get_class_analytics_engine() -> ClassAnalyticsEngine:
    """Return the shared ``ClassAnalyticsEngine``, creating it on first use."""
    global _engine_instance
    engine = _engine_instance
    if engine is None:
        engine = _engine_instance = ClassAnalyticsEngine()
    return engine