mathpulse-api-v3test / services /class_analytics_engine.py
github-actions[bot]
๐Ÿš€ Auto-deploy backend from GitHub (8e7094e)
b739f9d
"""
MathPulse AI โ€” Class Analytics Engine
Fetches real quiz data from Firestore, computes per-student and class-level
metrics, generates AI insights via DeepSeek, and caches results.
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
logger = logging.getLogger("mathpulse.class_analytics")
# โ”€โ”€โ”€ Firestore helper โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
_firebase_firestore = None
def _get_firestore_client():
global _firebase_firestore
if _firebase_firestore is None:
try:
from firebase_admin import firestore as ff
_firebase_firestore = ff
except Exception:
return None
try:
return _firebase_firestore.client()
except Exception:
return None
# โ”€โ”€โ”€ Models โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class StudentAnalyticsSummary(BaseModel):
student_id: str
student_name: str
avatar_url: Optional[str] = None
grade_level: str = ""
section: str = ""
avg_score: float = 0.0
quiz_attempt_count: int = 0
last_active: Optional[str] = None
risk_level: Literal["Low Risk", "Medium Risk", "High Risk", "Critical", "Unassessed"] = "Unassessed"
engagement_level: Literal["Low", "Medium", "High"] = "Low"
weakest_topic: Optional[str] = None
accuracy_by_topic: Dict[str, float] = Field(default_factory=dict)
completion_rate: float = 0.0
class TopicPerformance(BaseModel):
topic: str
class_accuracy: float = 0.0
struggling_count: int = 0
mastered_count: int = 0
class ClassInsights(BaseModel):
class_id: str
generated_at: str = ""
class_summary: str = ""
top_weak_topics: List[str] = Field(default_factory=list)
recommended_actions: List[str] = Field(default_factory=list)
class_strengths: str = ""
risk_distribution: Dict[str, int] = Field(default_factory=dict)
topic_performance: List[TopicPerformance] = Field(default_factory=list)
class ClassAnalyticsReport(BaseModel):
class_id: str
class_name: str = ""
grade_level: str = ""
section: str = ""
teacher_id: str = ""
student_count: int = 0
class_average: float = 0.0
completion_rate: float = 0.0
participation_rate: float = 0.0
attention_count: int = 0
students: List[StudentAnalyticsSummary] = Field(default_factory=list)
insights: Optional[ClassInsights] = None
generated_at: str = ""
# โ”€โ”€โ”€ Risk & Engagement Classification โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def classify_risk(avg_score: float, quiz_count: int, days_since_active: Optional[int]) -> str:
"""Map to WRI status bands. When no WRI is available, estimate from avg_score."""
if quiz_count == 0:
return "pending_assessment"
# Approximate WRI bands from avg_score (actual WRI uses D, G, P weights)
if avg_score >= 88:
return "safe"
if avg_score >= 80:
return "watch"
if avg_score >= 75:
return "intervene"
if avg_score >= 68:
return "critical"
return "at_risk"
def classify_engagement(days_since_active: Optional[int], recent_quiz_count: int) -> str:
if days_since_active is not None and days_since_active <= 2 and recent_quiz_count >= 5:
return "High"
if days_since_active is not None and days_since_active <= 7:
return "Medium"
return "Low"
# โ”€โ”€โ”€ Engine โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class ClassAnalyticsEngine:
"""Computes class analytics from Firestore data."""
def __init__(self):
self._cache: Dict[str, tuple] = {} # class_id -> (report, timestamp)
self._cache_ttl = 1800 # 30 min
async def get_class_analytics(
self, class_id: str, teacher_id: str, force_refresh: bool = False
) -> ClassAnalyticsReport:
# Check cache
if not force_refresh and class_id in self._cache:
report, cached_at = self._cache[class_id]
if time.time() - cached_at < self._cache_ttl:
return report
db = _get_firestore_client()
if not db:
logger.error("Firestore client unavailable")
return ClassAnalyticsReport(class_id=class_id, teacher_id=teacher_id, generated_at=_now_iso())
# Fast path: try reading from denormalized student_summaries (written by pipeline)
summaries_snap = list(db.collection("classes").document(class_id).collection("student_summaries").stream())
use_fast_path = len(summaries_snap) > 0 and not force_refresh
# Fetch class info
class_doc = db.collection("classrooms").document(class_id).get()
class_data = class_doc.to_dict() if class_doc.exists else {}
class_name = class_data.get("name", "")
grade_level = class_data.get("gradeLevel", class_data.get("grade", ""))
section = class_data.get("section", "")
# Fetch students in this class from managedStudents
students_query = db.collection("managedStudents").where("classroomId", "==", class_id).stream()
managed_students = []
for doc_snap in students_query:
managed_students.append({"id": doc_snap.id, **doc_snap.to_dict()})
# Also try classSectionId match
if not managed_students:
class_section_id = class_data.get("classSectionId", class_id)
students_query2 = db.collection("managedStudents").where("classSectionId", "==", class_section_id).stream()
for doc_snap in students_query2:
managed_students.append({"id": doc_snap.id, **doc_snap.to_dict()})
# Fetch quiz data for each student from progress collection
student_summaries = await self._build_student_summaries(db, managed_students)
# Compute class-level metrics
assessed_students = [s for s in student_summaries if s.quiz_attempt_count > 0]
student_count = len(student_summaries)
class_average = 0.0
if assessed_students:
class_average = sum(s.avg_score for s in assessed_students) / len(assessed_students)
completion_rate = 0.0
if student_count > 0:
completion_rate = (len(assessed_students) / student_count) * 100
# Participation = active in last 7 days
now = datetime.now(timezone.utc)
active_count = sum(1 for s in student_summaries if s.last_active and _days_since(s.last_active) <= 7)
participation_rate = (active_count / student_count * 100) if student_count > 0 else 0.0
# Attention = intervene + critical + at_risk
attention_count = sum(1 for s in student_summaries if s.risk_level in ("intervene", "critical", "at_risk"))
# Topic performance
topic_perf = self._compute_topic_performance(student_summaries)
# Risk distribution
risk_dist = {"safe": 0, "watch": 0, "intervene": 0, "critical": 0, "at_risk": 0, "pending_assessment": 0}
for s in student_summaries:
# Prefer stored WRI status from managedStudents if available
stored_status = None
try:
ms_doc = db.collection("managedStudents").document(s.student_id).get()
if ms_doc.exists:
stored_status = ms_doc.to_dict().get("riskStatus")
except Exception:
pass
status = stored_status if stored_status in risk_dist else s.risk_level
risk_dist[status] = risk_dist.get(status, 0) + 1
# Generate AI insights
insights = await self._generate_insights(
class_id=class_id,
class_name=class_name,
grade_level=grade_level,
section=section,
student_count=student_count,
class_average=class_average,
completion_rate=completion_rate,
participation_rate=participation_rate,
risk_dist=risk_dist,
topic_perf=topic_perf,
)
report = ClassAnalyticsReport(
class_id=class_id,
class_name=class_name,
grade_level=grade_level,
section=section,
teacher_id=teacher_id,
student_count=student_count,
class_average=round(class_average, 1),
completion_rate=round(completion_rate, 1),
participation_rate=round(participation_rate, 1),
attention_count=attention_count,
students=student_summaries,
insights=insights,
generated_at=_now_iso(),
)
# Cache
self._cache[class_id] = (report, time.time())
# Persist to Firestore
try:
db.collection("class_analytics").document(class_id).set(
{**report.model_dump(), "cached_at": _now_iso()},
merge=True,
)
except Exception as e:
logger.warning(f"Failed to persist analytics cache: {e}")
return report
async def _build_student_summaries(
self, db: Any, managed_students: List[Dict]
) -> List[StudentAnalyticsSummary]:
summaries = []
now = datetime.now(timezone.utc)
for student in managed_students:
student_id = student.get("id", "")
student_name = student.get("name", "Unknown")
avatar = student.get("avatar", "")
# Try to fetch quiz data from progress/{student_id}
quiz_attempts = []
try:
# Try by student ID first
progress_doc = db.collection("progress").document(student_id).get()
if progress_doc.exists:
pdata = progress_doc.to_dict()
quiz_attempts = pdata.get("quizAttempts", [])
# Also try by LRN if available
if not quiz_attempts and student.get("lrn"):
progress_doc2 = db.collection("progress").document(student["lrn"]).get()
if progress_doc2.exists:
pdata2 = progress_doc2.to_dict()
quiz_attempts = pdata2.get("quizAttempts", [])
# Also try accountUid
if not quiz_attempts and student.get("accountUid"):
progress_doc3 = db.collection("progress").document(student["accountUid"]).get()
if progress_doc3.exists:
pdata3 = progress_doc3.to_dict()
quiz_attempts = pdata3.get("quizAttempts", [])
# Also check practice_results subcollection
if not quiz_attempts and student.get("accountUid"):
practice_sessions = (
db.collection("practice_results")
.document(student["accountUid"])
.collection("sessions")
.order_by("submitted_at", direction="DESCENDING")
.limit(20)
.stream()
)
for sess in practice_sessions:
sd = sess.to_dict()
quiz_attempts.append({
"quizId": sd.get("session_id", ""),
"score": sd.get("score_percent", 0),
"completedAt": sd.get("submitted_at"),
"answers": sd.get("per_question_feedback", []),
})
except Exception as e:
logger.debug(f"Error fetching progress for {student_id}: {e}")
# Compute metrics from quiz attempts
quiz_count = len(quiz_attempts)
avg_score = 0.0
accuracy_by_topic: Dict[str, List[float]] = {}
if quiz_count > 0:
scores = [float(q.get("score", 0)) for q in quiz_attempts]
avg_score = sum(scores) / len(scores)
# Extract per-question topic accuracy if available
for attempt in quiz_attempts:
answers = attempt.get("answers", [])
quiz_id = attempt.get("quizId", "")
# Use quizId as topic proxy if no per-question topic
topic = _extract_topic_from_quiz_id(quiz_id)
if topic:
if topic not in accuracy_by_topic:
accuracy_by_topic[topic] = []
accuracy_by_topic[topic].append(float(attempt.get("score", 0)))
# Compute topic averages
topic_avgs = {t: sum(scores) / len(scores) for t, scores in accuracy_by_topic.items() if scores}
weakest_topic = min(topic_avgs, key=topic_avgs.get) if topic_avgs else student.get("weakestTopic")
if weakest_topic == "N/A":
weakest_topic = None
# Last active
last_active_ts = student.get("lastActive")
last_active_str = None
days_since_active = None
if last_active_ts:
try:
if hasattr(last_active_ts, "seconds"):
last_dt = datetime.fromtimestamp(last_active_ts.seconds, tz=timezone.utc)
else:
last_dt = last_active_ts
last_active_str = last_dt.isoformat()
days_since_active = (now - last_dt).days
except Exception:
pass
# Recent quiz count (last 14 days)
recent_quiz_count = 0
for q in quiz_attempts:
completed = q.get("completedAt")
if completed:
try:
if hasattr(completed, "seconds"):
q_dt = datetime.fromtimestamp(completed.seconds, tz=timezone.utc)
else:
q_dt = completed if isinstance(completed, datetime) else datetime.now(timezone.utc)
if (now - q_dt).days <= 14:
recent_quiz_count += 1
except Exception:
pass
risk_level = classify_risk(avg_score, quiz_count, days_since_active)
engagement = classify_engagement(days_since_active, recent_quiz_count)
summaries.append(StudentAnalyticsSummary(
student_id=student_id,
student_name=student_name,
avatar_url=avatar or None,
grade_level=student.get("gradeLevel", student.get("grade", "")),
section=student.get("section", ""),
avg_score=round(avg_score, 1),
quiz_attempt_count=quiz_count,
last_active=last_active_str,
risk_level=risk_level,
engagement_level=engagement,
weakest_topic=weakest_topic,
accuracy_by_topic=topic_avgs,
completion_rate=min(quiz_count / 5 * 100, 100) if quiz_count > 0 else 0.0,
))
return summaries
def _compute_topic_performance(self, students: List[StudentAnalyticsSummary]) -> List[TopicPerformance]:
topic_data: Dict[str, Dict] = {}
for s in students:
for topic, accuracy in s.accuracy_by_topic.items():
if topic not in topic_data:
topic_data[topic] = {"scores": [], "struggling": 0, "mastered": 0}
topic_data[topic]["scores"].append(accuracy)
if accuracy < 60:
topic_data[topic]["struggling"] += 1
if accuracy >= 80:
topic_data[topic]["mastered"] += 1
# Also include weakest_topic from students without per-topic data
for s in students:
if s.weakest_topic and s.weakest_topic not in topic_data and s.quiz_attempt_count > 0:
topic_data[s.weakest_topic] = {
"scores": [s.avg_score],
"struggling": 1 if s.avg_score < 60 else 0,
"mastered": 0,
}
result = []
for topic, data in topic_data.items():
if not data["scores"]:
continue
result.append(TopicPerformance(
topic=topic,
class_accuracy=round(sum(data["scores"]) / len(data["scores"]), 1),
struggling_count=data["struggling"],
mastered_count=data["mastered"],
))
return sorted(result, key=lambda t: t.class_accuracy)[:8]
async def _generate_insights(
self,
class_id: str,
class_name: str,
grade_level: str,
section: str,
student_count: int,
class_average: float,
completion_rate: float,
participation_rate: float,
risk_dist: Dict[str, int],
topic_perf: List[TopicPerformance],
) -> ClassInsights:
# Format topic performance for prompt
topic_lines = "\n".join(
f" - {t.topic}: {t.class_accuracy}% (struggling: {t.struggling_count})"
for t in topic_perf[:6]
) or " No topic data available yet."
weak_topics = [t.topic for t in topic_perf[:3]] if topic_perf else []
prompt = f"""You are MathPulse AI analyzing a class's performance data for a Filipino K-12 teacher.
Class: Grade {grade_level} - {section} ({class_name})
Student Count: {student_count}
Class Average Score: {class_average:.1f}%
Completion Rate: {completion_rate:.1f}%
Participation Rate: {participation_rate:.1f}%
Risk Distribution:
- Critical: {risk_dist.get('Critical', 0)} students
- High Risk: {risk_dist.get('High Risk', 0)} students
- Medium Risk: {risk_dist.get('Medium Risk', 0)} students
- Low Risk: {risk_dist.get('Low Risk', 0)} students
- Unassessed: {risk_dist.get('Unassessed', 0)} students
Topic Performance (class accuracy):
{topic_lines}
Top Weakest Topics: {', '.join(weak_topics) if weak_topics else 'None identified yet'}
Generate a JSON response with these exact keys:
{{
"class_summary": "2-3 sentence overview of class performance. Be honest but constructive.",
"class_strengths": "1 sentence on what the class is doing well.",
"top_weak_topics": ["topic1", "topic2", "topic3"],
"recommended_actions": [
"Specific action 1 (max 20 words)",
"Specific action 2",
"Specific action 3"
]
}}
Be specific to Filipino K-12 DepEd context. If data is limited, acknowledge it and suggest next steps."""
try:
from services.ai_client import get_deepseek_client, CHAT_MODEL
client = get_deepseek_client()
response = client.chat.completions.create(
model=CHAT_MODEL,
messages=[
{"role": "system", "content": "You are MathPulse AI, a class analytics assistant for Filipino K-12 math teachers. Respond only with valid JSON."},
{"role": "user", "content": prompt},
],
temperature=0.3,
max_tokens=500,
response_format={"type": "json_object"},
)
content = response.choices[0].message.content or "{}"
parsed = json.loads(content)
return ClassInsights(
class_id=class_id,
generated_at=_now_iso(),
class_summary=parsed.get("class_summary", "Analytics data is being collected."),
top_weak_topics=parsed.get("top_weak_topics", weak_topics),
recommended_actions=parsed.get("recommended_actions", ["Encourage students to complete more quizzes."]),
class_strengths=parsed.get("class_strengths", "Class is actively using the platform."),
risk_distribution=risk_dist,
topic_performance=topic_perf,
)
except Exception as e:
logger.warning(f"DeepSeek insights generation failed: {e}")
# Return fallback insights
return ClassInsights(
class_id=class_id,
generated_at=_now_iso(),
class_summary=f"Class has {student_count} students with an average score of {class_average:.0f}%. {risk_dist.get('Unassessed', 0)} students have not yet taken any quizzes.",
top_weak_topics=weak_topics,
recommended_actions=[
"Encourage unassessed students to complete their first quiz.",
"Review struggling topics in the next class session.",
"Schedule one-on-one check-ins with Critical risk students.",
],
class_strengths="Students are enrolled and the platform is ready for use." if class_average < 50 else f"Class maintains a {class_average:.0f}% average.",
risk_distribution=risk_dist,
topic_performance=topic_perf,
)
def invalidate_cache(self, class_id: str) -> None:
self._cache.pop(class_id, None)
# โ”€โ”€โ”€ Helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _days_since(iso_str: str) -> int:
try:
dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
return (datetime.now(timezone.utc) - dt).days
except Exception:
return 999
def _extract_topic_from_quiz_id(quiz_id: str) -> Optional[str]:
"""Extract topic name from quiz ID patterns like 'algebra-1', 'geometry-basics'."""
if not quiz_id:
return None
# Common patterns: subject-topic, module_quiz, etc.
parts = quiz_id.replace("_", "-").replace(".", "-").split("-")
if len(parts) >= 2:
# Capitalize and join meaningful parts
topic = " ".join(p.capitalize() for p in parts[:2] if p and not p.isdigit())
return topic if topic else None
return quiz_id.capitalize() if quiz_id else None
# Singleton
_engine_instance: Optional[ClassAnalyticsEngine] = None
def get_class_analytics_engine() -> ClassAnalyticsEngine:
global _engine_instance
if _engine_instance is None:
_engine_instance = ClassAnalyticsEngine()
return _engine_instance