github-actions[bot] commited on
Commit
b739f9d
ยท
1 Parent(s): d3d533f

๐Ÿš€ Auto-deploy backend from GitHub (8e7094e)

Browse files
main.py CHANGED
@@ -104,6 +104,9 @@ from routes.risk_router import router as risk_router
104
  from routes.tutor_checkin import router as tutor_checkin_router
105
  from routes.practice import router as practice_router
106
  from routes.ai_monitoring import router as ai_monitoring_router
 
 
 
107
 
108
  # Rate limiting (slowapi)
109
  try:
@@ -379,6 +382,19 @@ ROLE_POLICIES: Dict[str, Set[str]] = {
379
  "/api/predict-risk/batch": TEACHER_OR_ADMIN,
380
  "/api/learning-path": ALL_APP_ROLES,
381
  "/api/analytics/daily-insight": TEACHER_OR_ADMIN,
 
 
 
 
 
 
 
 
 
 
 
 
 
382
  "/api/upload/class-records": TEACHER_OR_ADMIN,
383
  "/api/class-section/{class_section_id}": TEACHER_OR_ADMIN,
384
  "/api/upload/class-records/risk-refresh/recent": TEACHER_OR_ADMIN,
@@ -386,6 +402,7 @@ ROLE_POLICIES: Dict[str, Set[str]] = {
386
  "/api/import/student-accounts/commit": TEACHER_OR_ADMIN,
387
  "/api/admin/users": ADMIN_ONLY,
388
  "/api/admin/users/bulk-action": ADMIN_ONLY,
 
389
  "/api/teacher/create-student-account": TEACHER_OR_ADMIN,
390
  "/api/upload/course-materials": TEACHER_OR_ADMIN,
391
  "/api/upload/course-materials/recent": TEACHER_OR_ADMIN,
@@ -1149,6 +1166,9 @@ app.include_router(risk_router)
1149
  app.include_router(tutor_checkin_router)
1150
  app.include_router(practice_router)
1151
  app.include_router(ai_monitoring_router)
 
 
 
1152
 
1153
 
1154
  # โ”€โ”€โ”€ Global Exception Handler โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
 
104
  from routes.tutor_checkin import router as tutor_checkin_router
105
  from routes.practice import router as practice_router
106
  from routes.ai_monitoring import router as ai_monitoring_router
107
+ from routes.class_analytics_routes import router as class_analytics_router
108
+ from routes.intervention_routes import router as intervention_router
109
+ from routes.pipeline_routes import router as pipeline_router
110
 
111
  # Rate limiting (slowapi)
112
  try:
 
382
  "/api/predict-risk/batch": TEACHER_OR_ADMIN,
383
  "/api/learning-path": ALL_APP_ROLES,
384
  "/api/analytics/daily-insight": TEACHER_OR_ADMIN,
385
+ "/api/analytics/class/{class_id}": TEACHER_OR_ADMIN,
386
+ "/api/analytics/class/{class_id}/students": TEACHER_OR_ADMIN,
387
+ "/api/analytics/class/{class_id}/topics": TEACHER_OR_ADMIN,
388
+ "/api/analytics/class/{class_id}/refresh-insights": TEACHER_OR_ADMIN,
389
+ "/api/analytics/class/{class_id}/invalidate-cache": ALL_APP_ROLES,
390
+ "/api/intervention/generate": TEACHER_OR_ADMIN,
391
+ "/api/intervention/{student_id}": TEACHER_OR_ADMIN,
392
+ "/api/intervention/{student_id}/step/{step_number}/complete": ALL_APP_ROLES,
393
+ "/api/intervention/{student_id}/export-pdf": TEACHER_OR_ADMIN,
394
+ "/api/pipeline/event": ALL_APP_ROLES,
395
+ "/api/pipeline/profile/{student_id}": ALL_APP_ROLES,
396
+ "/api/pipeline/profile/{student_id}/recompute": TEACHER_OR_ADMIN,
397
+ "/api/pipeline/admin/backfill": ADMIN_ONLY,
398
  "/api/upload/class-records": TEACHER_OR_ADMIN,
399
  "/api/class-section/{class_section_id}": TEACHER_OR_ADMIN,
400
  "/api/upload/class-records/risk-refresh/recent": TEACHER_OR_ADMIN,
 
402
  "/api/import/student-accounts/commit": TEACHER_OR_ADMIN,
403
  "/api/admin/users": ADMIN_ONLY,
404
  "/api/admin/users/bulk-action": ADMIN_ONLY,
405
+ "/api/admin/school-analytics": ADMIN_ONLY,
406
  "/api/teacher/create-student-account": TEACHER_OR_ADMIN,
407
  "/api/upload/course-materials": TEACHER_OR_ADMIN,
408
  "/api/upload/course-materials/recent": TEACHER_OR_ADMIN,
 
1166
  app.include_router(tutor_checkin_router)
1167
  app.include_router(practice_router)
1168
  app.include_router(ai_monitoring_router)
1169
+ app.include_router(class_analytics_router)
1170
+ app.include_router(intervention_router)
1171
+ app.include_router(pipeline_router)
1172
 
1173
 
1174
  # โ”€โ”€โ”€ Global Exception Handler โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
routes/admin_routes.py CHANGED
@@ -204,3 +204,112 @@ async def delete_uploaded_file(
204
  except Exception as e:
205
  logger.error(f"Failed to delete file: {e}")
206
  raise HTTPException(status_code=500, detail=f"Failed to delete file: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  except Exception as e:
205
  logger.error(f"Failed to delete file: {e}")
206
  raise HTTPException(status_code=500, detail=f"Failed to delete file: {e}")
207
+
208
+
209
+
210
+ # โ”€โ”€โ”€ School-Wide Analytics โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
211
+
212
+ @router.get("/school-analytics")
213
+ def get_school_analytics(request: Request):
214
+ """School-wide WRI aggregation for admin dashboard."""
215
+ require_admin(request)
216
+
217
+ try:
218
+ import firebase_admin
219
+ from firebase_admin import firestore as fs
220
+ db = fs.client()
221
+ except Exception:
222
+ raise HTTPException(status_code=503, detail="Firestore unavailable")
223
+
224
+ from datetime import datetime, timezone, timedelta
225
+ from collections import defaultdict
226
+
227
+ # Read all student_profiles
228
+ profiles = list(db.collection("student_profiles").stream())
229
+
230
+ total = len(profiles)
231
+ if total == 0:
232
+ # Fallback: read from managedStudents
233
+ profiles = list(db.collection("managedStudents").stream())
234
+ total = len(profiles)
235
+
236
+ wri_dist = {"safe": 0, "watch": 0, "intervene": 0, "critical": 0, "at_risk": 0, "pending_assessment": 0}
237
+ wri_values = []
238
+ grade_wri: dict = defaultdict(list)
239
+ class_wri: dict = defaultdict(list)
240
+ weak_topics_counter: dict = defaultdict(int)
241
+ recent_escalations = []
242
+ pending_count = 0
243
+ now = datetime.now(timezone.utc)
244
+
245
+ for doc in profiles:
246
+ data = doc.to_dict()
247
+ status = data.get("risk_status") or data.get("riskStatus") or "pending_assessment"
248
+
249
+ # Normalize status
250
+ if status in wri_dist:
251
+ wri_dist[status] += 1
252
+ else:
253
+ wri_dist["pending_assessment"] += 1
254
+
255
+ wri = data.get("wri")
256
+ if wri is not None:
257
+ wri_values.append(wri)
258
+ grade = str(data.get("grade_level") or data.get("gradeLevel") or data.get("grade", "?"))
259
+ grade_wri[grade].append(wri)
260
+ class_id = data.get("class_id") or data.get("classroomId") or ""
261
+ if class_id:
262
+ class_wri[class_id].append(wri)
263
+
264
+ if status == "pending_assessment" or data.get("diagnosticScore") is None:
265
+ pending_count += 1
266
+
267
+ # Weak topics
268
+ weak = data.get("quiz_performance", {}).get("lowest_accuracy_topics") or []
269
+ if not weak:
270
+ weak = [data.get("weakestTopic")] if data.get("weakestTopic") and data.get("weakestTopic") != "N/A" else []
271
+ for t in weak[:2]:
272
+ if t:
273
+ weak_topics_counter[t] += 1
274
+
275
+ # Recent escalations (last 24h)
276
+ updated = data.get("wri_updated_at") or data.get("riskUpdatedAt")
277
+ if updated and status in ("critical", "at_risk"):
278
+ try:
279
+ if hasattr(updated, "seconds"):
280
+ dt = datetime.fromtimestamp(updated.seconds, tz=timezone.utc)
281
+ elif isinstance(updated, str):
282
+ dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
283
+ else:
284
+ dt = None
285
+ if dt and (now - dt).total_seconds() < 86400:
286
+ recent_escalations.append({
287
+ "student_id": doc.id,
288
+ "student_name": data.get("display_name") or data.get("name", "Unknown"),
289
+ "risk_status": status,
290
+ "wri": wri,
291
+ "teacher_id": data.get("teacher_id") or data.get("teacherId", ""),
292
+ "escalated_at": dt.isoformat(),
293
+ })
294
+ except Exception:
295
+ pass
296
+
297
+ school_avg = round(sum(wri_values) / len(wri_values), 1) if wri_values else 0.0
298
+ avg_by_grade = {g: round(sum(v) / len(v), 1) for g, v in grade_wri.items() if v}
299
+ classes_ranked = sorted(
300
+ [{"class_id": c, "avg_wri": round(sum(v) / len(v), 1), "student_count": len(v)} for c, v in class_wri.items() if v],
301
+ key=lambda x: x["avg_wri"]
302
+ )
303
+ top_weak = sorted(weak_topics_counter.items(), key=lambda x: -x[1])[:10]
304
+
305
+ return {
306
+ "total_students": total,
307
+ "wri_distribution": wri_dist,
308
+ "school_avg_wri": school_avg,
309
+ "avg_wri_by_grade": avg_by_grade,
310
+ "classes_ranked": classes_ranked[:20],
311
+ "top_weak_topics_school": [t for t, _ in top_weak],
312
+ "recent_escalations": recent_escalations[:20],
313
+ "pending_assessment_count": pending_count,
314
+ "generated_at": now.isoformat(),
315
+ }
routes/class_analytics_routes.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Class Analytics API routes."""
2
+
3
+ import time
4
+ import logging
5
+ from typing import Optional
6
+ from fastapi import APIRouter, HTTPException, Request
7
+
8
+ logger = logging.getLogger("mathpulse.class_analytics_routes")
9
+
10
+ router = APIRouter(prefix="/api/analytics/class", tags=["class-analytics"])
11
+
12
+ # Rate limit: 1 refresh per 5 min per class
13
+ _refresh_timestamps: dict[str, float] = {}
14
+
15
+
16
+ def _require_teacher(request: Request):
17
+ user = getattr(request.state, "user", None)
18
+ if not user:
19
+ raise HTTPException(status_code=401, detail="Authentication required")
20
+ if user.role not in ("teacher", "admin"):
21
+ raise HTTPException(status_code=403, detail="Teacher or admin access required")
22
+ return user
23
+
24
+
25
+ @router.get("/{class_id}")
26
+ async def get_class_analytics(class_id: str, request: Request, refresh: bool = False):
27
+ """Get full class analytics report. Cached for 30 min unless refresh=true."""
28
+ user = _require_teacher(request)
29
+
30
+ from services.class_analytics_engine import get_class_analytics_engine
31
+ engine = get_class_analytics_engine()
32
+ report = await engine.get_class_analytics(class_id, user.uid, force_refresh=refresh)
33
+ return report.model_dump()
34
+
35
+
36
+ @router.get("/{class_id}/students")
37
+ async def get_class_students(
38
+ class_id: str, request: Request, filter: Optional[str] = "all"
39
+ ):
40
+ """Get student summaries for a class with optional filtering."""
41
+ user = _require_teacher(request)
42
+
43
+ from services.class_analytics_engine import get_class_analytics_engine
44
+ engine = get_class_analytics_engine()
45
+ report = await engine.get_class_analytics(class_id, user.uid)
46
+
47
+ students = report.students
48
+ if filter == "top_performers":
49
+ students = sorted(
50
+ [s for s in students if s.quiz_attempt_count > 0],
51
+ key=lambda s: s.avg_score,
52
+ reverse=True,
53
+ )[:10]
54
+ elif filter == "needs_attention":
55
+ students = sorted(
56
+ [s for s in students if s.risk_level in ("High Risk", "Critical", "Unassessed")],
57
+ key=lambda s: s.avg_score,
58
+ )
59
+
60
+ return [s.model_dump() for s in students]
61
+
62
+
63
+ @router.get("/{class_id}/topics")
64
+ async def get_class_topics(class_id: str, request: Request):
65
+ """Get topic performance sorted by accuracy (worst first)."""
66
+ user = _require_teacher(request)
67
+
68
+ from services.class_analytics_engine import get_class_analytics_engine
69
+ engine = get_class_analytics_engine()
70
+ report = await engine.get_class_analytics(class_id, user.uid)
71
+
72
+ topics = report.insights.topic_performance if report.insights else []
73
+ return [t.model_dump() for t in topics]
74
+
75
+
76
+ @router.post("/{class_id}/refresh-insights")
77
+ async def refresh_class_insights(class_id: str, request: Request):
78
+ """Force regeneration of AI insights. Rate limited: 1 per 5 min per class."""
79
+ user = _require_teacher(request)
80
+
81
+ last_refresh = _refresh_timestamps.get(class_id, 0)
82
+ if time.time() - last_refresh < 300:
83
+ raise HTTPException(
84
+ status_code=429,
85
+ detail="Insights can only be refreshed once every 5 minutes.",
86
+ )
87
+
88
+ from services.class_analytics_engine import get_class_analytics_engine
89
+ engine = get_class_analytics_engine()
90
+ engine.invalidate_cache(class_id)
91
+ report = await engine.get_class_analytics(class_id, user.uid, force_refresh=True)
92
+ _refresh_timestamps[class_id] = time.time()
93
+
94
+ if report.insights:
95
+ return report.insights.model_dump()
96
+ return {"error": "Failed to generate insights"}
97
+
98
+
99
+ @router.post("/{class_id}/invalidate-cache")
100
+ async def invalidate_class_cache(class_id: str, request: Request):
101
+ """Invalidate cached analytics for a class (called after quiz completion)."""
102
+ # Allow any authenticated user (student completing quiz triggers this)
103
+ user = getattr(request.state, "user", None)
104
+ if not user:
105
+ raise HTTPException(status_code=401, detail="Authentication required")
106
+
107
+ from services.class_analytics_engine import get_class_analytics_engine
108
+ engine = get_class_analytics_engine()
109
+ engine.invalidate_cache(class_id)
110
+ return {"status": "cache_invalidated", "class_id": class_id}
routes/intervention_routes.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Intervention API routes."""
2
+
3
+ import logging
4
+ from fastapi import APIRouter, HTTPException, Request
5
+ from pydantic import BaseModel
6
+ from typing import Optional
7
+
8
+ logger = logging.getLogger("mathpulse.intervention_routes")
9
+
10
+ router = APIRouter(prefix="/api/intervention", tags=["intervention"])
11
+
12
+
13
+ class GenerateRequest(BaseModel):
14
+ student_id: str
15
+
16
+
17
+ class CompleteStepRequest(BaseModel):
18
+ score: float = 0.0
19
+ time_spent_minutes: int = 0
20
+
21
+
22
+ def _require_auth(request: Request):
23
+ user = getattr(request.state, "user", None)
24
+ if not user:
25
+ raise HTTPException(status_code=401, detail="Authentication required")
26
+ return user
27
+
28
+
29
+ @router.post("/generate")
30
+ async def generate_intervention(body: GenerateRequest, request: Request):
31
+ """Generate a full intervention plan for a student."""
32
+ _require_auth(request)
33
+
34
+ from services.intervention_engine import get_intervention_engine
35
+ engine = get_intervention_engine()
36
+ plan = await engine.generate_full_intervention(body.student_id, force=True)
37
+ return plan.model_dump()
38
+
39
+
40
+ @router.get("/{student_id}")
41
+ async def get_intervention(student_id: str, request: Request):
42
+ """Get latest intervention plan. Returns persisted doc if exists, otherwise generates."""
43
+ user = _require_auth(request)
44
+
45
+ # Try to return persisted plan first (fast, no AI cost)
46
+ try:
47
+ from firebase_admin import firestore as fs
48
+ db = fs.client()
49
+ doc = db.collection("intervention_plans").document(student_id).get()
50
+ if doc.exists:
51
+ return doc.to_dict()
52
+ except Exception:
53
+ pass
54
+
55
+ # No persisted plan โ€” generate one
56
+ from services.intervention_engine import get_intervention_engine
57
+ engine = get_intervention_engine()
58
+ plan = await engine.generate_full_intervention(student_id, force=False)
59
+ return plan.model_dump()
60
+
61
+
62
+ @router.post("/{student_id}/step/{step_number}/complete")
63
+ async def complete_step(student_id: str, step_number: int, body: CompleteStepRequest, request: Request):
64
+ """Mark a learning step as completed."""
65
+ _require_auth(request)
66
+
67
+ from services.intervention_engine import get_intervention_engine
68
+ engine = get_intervention_engine()
69
+ result = await engine.complete_step(student_id, step_number, body.score, body.time_spent_minutes)
70
+
71
+ if "error" in result:
72
+ raise HTTPException(status_code=400, detail=result["error"])
73
+ return result
74
+
75
+
76
+ @router.get("/{student_id}/export-pdf")
77
+ async def export_pdf_data(student_id: str, request: Request):
78
+ """Get all data needed for PDF export."""
79
+ _require_auth(request)
80
+
81
+ from services.intervention_engine import get_intervention_engine
82
+ engine = get_intervention_engine()
83
+ plan = await engine.generate_full_intervention(student_id, force=False)
84
+ return plan.model_dump()
routes/pipeline_routes.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Student Intelligence Pipeline API routes."""
2
+
3
+ import logging
4
+ from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
5
+ from pydantic import BaseModel
6
+ from typing import Any, Dict, Literal
7
+
8
+ logger = logging.getLogger("mathpulse.pipeline_routes")
9
+
10
+ router = APIRouter(prefix="/api/pipeline", tags=["pipeline"])
11
+
12
+
13
+ class PipelineEventPayload(BaseModel):
14
+ student_id: str
15
+ event_type: Literal["diagnostic", "quiz", "battle", "lesson", "module", "session"]
16
+ event_data: Dict[str, Any] = {}
17
+ occurred_at: str
18
+ class_id: str = ""
19
+ teacher_id: str = ""
20
+
21
+
22
+ def _require_auth(request: Request):
23
+ user = getattr(request.state, "user", None)
24
+ if not user:
25
+ raise HTTPException(status_code=401, detail="Authentication required")
26
+ return user
27
+
28
+
29
+ async def _run_pipeline(payload: PipelineEventPayload):
30
+ """Background task to run the pipeline."""
31
+ try:
32
+ from services.student_intelligence_pipeline import get_pipeline, StudentActivityEvent
33
+ pipeline = get_pipeline()
34
+ event = StudentActivityEvent(**payload.model_dump())
35
+ await pipeline.process_event(event)
36
+ except Exception as e:
37
+ logger.error(f"Pipeline background task failed: {e}", exc_info=True)
38
+
39
+
40
+ @router.post("/event", status_code=202)
41
+ async def receive_event(payload: PipelineEventPayload, background_tasks: BackgroundTasks, request: Request):
42
+ """Universal intake endpoint. Returns 202 immediately, processes async."""
43
+ user = _require_auth(request)
44
+ # Students can only emit events for themselves
45
+ if user.role == "student" and payload.student_id != user.uid:
46
+ raise HTTPException(status_code=403, detail="Students can only emit events for their own ID")
47
+ background_tasks.add_task(_run_pipeline, payload)
48
+ return {"status": "accepted", "student_id": payload.student_id}
49
+
50
+
51
+ @router.get("/profile/{student_id}")
52
+ async def get_profile(student_id: str, request: Request):
53
+ """Get full student profile."""
54
+ _require_auth(request)
55
+
56
+ _firebase_firestore = None
57
+ try:
58
+ from firebase_admin import firestore as ff
59
+ _firebase_firestore = ff
60
+ except Exception:
61
+ raise HTTPException(status_code=503, detail="Firestore unavailable")
62
+
63
+ db = _firebase_firestore.client()
64
+ doc = db.collection("student_profiles").document(student_id).get()
65
+ if not doc.exists:
66
+ raise HTTPException(status_code=404, detail="Profile not found")
67
+ return doc.to_dict()
68
+
69
+
70
+ @router.post("/profile/{student_id}/recompute")
71
+ async def recompute_profile(student_id: str, background_tasks: BackgroundTasks, request: Request):
72
+ """Force full profile rebuild from raw Firestore data."""
73
+ user = _require_auth(request)
74
+ if user.role not in ("teacher", "admin"):
75
+ raise HTTPException(status_code=403, detail="Teacher or admin required")
76
+
77
+ async def _recompute():
78
+ from services.student_intelligence_pipeline import get_pipeline, StudentActivityEvent
79
+ pipeline = get_pipeline()
80
+ # Trigger a synthetic diagnostic event to force full recompute
81
+ event = StudentActivityEvent(
82
+ student_id=student_id,
83
+ event_type="session",
84
+ event_data={"event": "force_recompute"},
85
+ occurred_at=__import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat(),
86
+ class_id="",
87
+ teacher_id=user.uid,
88
+ )
89
+ await pipeline.process_event(event)
90
+
91
+ background_tasks.add_task(_recompute)
92
+ return {"status": "recompute_queued", "student_id": student_id}
93
+
94
+
95
+ @router.post("/admin/backfill", status_code=202)
96
+ async def backfill_all(background_tasks: BackgroundTasks, request: Request):
97
+ """One-time migration: rebuild student_profiles for ALL existing students."""
98
+ user = _require_auth(request)
99
+ if user.role != "admin":
100
+ raise HTTPException(status_code=403, detail="Admin only")
101
+
102
+ async def _backfill():
103
+ try:
104
+ from services.student_intelligence_pipeline import get_pipeline, StudentActivityEvent
105
+ from firebase_admin import firestore as ff
106
+ db = ff.client()
107
+ pipeline = get_pipeline()
108
+
109
+ students = db.collection("managedStudents").stream()
110
+ count = 0
111
+ for student_doc in students:
112
+ sid = student_doc.id
113
+ data = student_doc.to_dict()
114
+ event = StudentActivityEvent(
115
+ student_id=sid,
116
+ event_type="session",
117
+ event_data={"event": "backfill"},
118
+ occurred_at=__import__("datetime").datetime.now(__import__("datetime").timezone.utc).isoformat(),
119
+ class_id=data.get("classroomId", ""),
120
+ teacher_id=data.get("teacherId", ""),
121
+ )
122
+ await pipeline.process_event(event)
123
+ count += 1
124
+ if count % 10 == 0:
125
+ logger.info(f"Backfilled {count} students")
126
+ logger.info(f"Backfill complete: {count} students processed")
127
+ except Exception as e:
128
+ logger.error(f"Backfill failed: {e}", exc_info=True)
129
+
130
+ background_tasks.add_task(_backfill)
131
+ return {"status": "backfill_started"}
scripts/backfill_student_profiles.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MathPulse AI โ€” Backfill Student Profiles
3
+
4
+ One-time migration script to build student_profiles for ALL existing students
5
+ who already have data in Firestore but no unified profile.
6
+
7
+ Usage:
8
+ cd backend
9
+ python -m scripts.backfill_student_profiles
10
+ """
11
+
12
+ import asyncio
13
+ import logging
14
+ import os
15
+ import sys
16
+
17
+ # Add backend to path
18
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
19
+
20
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
21
+ logger = logging.getLogger("backfill")
22
+
23
+
24
+ async def backfill_all_profiles():
25
+ """Rebuild student_profiles for all existing managedStudents."""
26
+ # Initialize Firebase
27
+ import firebase_admin
28
+ from firebase_admin import firestore
29
+
30
+ if not firebase_admin._apps:
31
+ sa_path = os.environ.get("FIREBASE_SERVICE_ACCOUNT_FILE")
32
+ sa_json = os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON")
33
+ if sa_path and os.path.exists(sa_path):
34
+ from firebase_admin import credentials
35
+ cred = credentials.Certificate(sa_path)
36
+ firebase_admin.initialize_app(cred)
37
+ elif sa_json:
38
+ import json
39
+ from firebase_admin import credentials
40
+ cred = credentials.Certificate(json.loads(sa_json))
41
+ firebase_admin.initialize_app(cred)
42
+ else:
43
+ firebase_admin.initialize_app()
44
+
45
+ db = firestore.client()
46
+
47
+ from services.student_intelligence_pipeline import get_pipeline, StudentActivityEvent
48
+ from datetime import datetime, timezone
49
+
50
+ pipeline = get_pipeline()
51
+
52
+ # Fetch all managed students
53
+ logger.info("Fetching all managedStudents...")
54
+ students = list(db.collection("managedStudents").stream())
55
+ total = len(students)
56
+ logger.info(f"Found {total} students to backfill")
57
+
58
+ success = 0
59
+ errors = 0
60
+
61
+ for i, student_doc in enumerate(students):
62
+ sid = student_doc.id
63
+ data = student_doc.to_dict()
64
+
65
+ try:
66
+ # Create a synthetic event to trigger full profile build
67
+ event = StudentActivityEvent(
68
+ student_id=sid,
69
+ event_type="session",
70
+ event_data={"event": "backfill", "source": "migration_script"},
71
+ occurred_at=datetime.now(timezone.utc).isoformat(),
72
+ class_id=data.get("classroomId", ""),
73
+ teacher_id=data.get("teacherId", ""),
74
+ )
75
+
76
+ # Set basic profile fields from managed student data
77
+ profile_ref = db.collection("student_profiles").document(sid)
78
+ profile_doc = profile_ref.get()
79
+ if not profile_doc.exists:
80
+ # Pre-seed with identity data
81
+ profile_ref.set({
82
+ "student_id": sid,
83
+ "display_name": data.get("name", ""),
84
+ "grade_level": data.get("gradeLevel", data.get("grade", "")),
85
+ "section": data.get("section", ""),
86
+ "class_id": data.get("classroomId", ""),
87
+ "teacher_id": data.get("teacherId", ""),
88
+ "diagnostic_score": data.get("diagnosticScore"),
89
+ "external_grades_avg": data.get("externalGradesAvg"),
90
+ "wri": data.get("wri"),
91
+ "risk_status": data.get("riskStatus", "pending_assessment"),
92
+ "wri_weights": data.get("weights", {"w1": 0.30, "w2": 0.40, "w3": 0.30}),
93
+ "profile_version": 0,
94
+ }, merge=True)
95
+
96
+ # Run pipeline to compute P and update everything
97
+ await pipeline.process_event(event)
98
+ success += 1
99
+
100
+ except Exception as e:
101
+ logger.error(f"Error backfilling {sid}: {e}")
102
+ errors += 1
103
+
104
+ if (i + 1) % 10 == 0:
105
+ logger.info(f"Progress: {i + 1}/{total} (success={success}, errors={errors})")
106
+
107
+ logger.info(f"Backfill complete: {success} success, {errors} errors out of {total} total")
108
+
109
+
110
+ if __name__ == "__main__":
111
+ asyncio.run(backfill_all_profiles())
services/class_analytics_engine.py ADDED
@@ -0,0 +1,555 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MathPulse AI โ€” Class Analytics Engine
3
+
4
+ Fetches real quiz data from Firestore, computes per-student and class-level
5
+ metrics, generates AI insights via DeepSeek, and caches results.
6
+ """
7
+
8
+ import asyncio
9
+ import json
10
+ import logging
11
+ import time
12
+ from datetime import datetime, timedelta, timezone
13
+ from typing import Any, Dict, List, Literal, Optional
14
+
15
+ from pydantic import BaseModel, Field
16
+
17
+ logger = logging.getLogger("mathpulse.class_analytics")
18
+
19
+ # โ”€โ”€โ”€ Firestore helper โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
20
+
21
+ _firebase_firestore = None
22
+
23
+
24
+ def _get_firestore_client():
25
+ global _firebase_firestore
26
+ if _firebase_firestore is None:
27
+ try:
28
+ from firebase_admin import firestore as ff
29
+ _firebase_firestore = ff
30
+ except Exception:
31
+ return None
32
+ try:
33
+ return _firebase_firestore.client()
34
+ except Exception:
35
+ return None
36
+
37
+
38
+ # โ”€โ”€โ”€ Models โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
39
+
40
+ class StudentAnalyticsSummary(BaseModel):
41
+ student_id: str
42
+ student_name: str
43
+ avatar_url: Optional[str] = None
44
+ grade_level: str = ""
45
+ section: str = ""
46
+ avg_score: float = 0.0
47
+ quiz_attempt_count: int = 0
48
+ last_active: Optional[str] = None
49
+ risk_level: Literal["Low Risk", "Medium Risk", "High Risk", "Critical", "Unassessed"] = "Unassessed"
50
+ engagement_level: Literal["Low", "Medium", "High"] = "Low"
51
+ weakest_topic: Optional[str] = None
52
+ accuracy_by_topic: Dict[str, float] = Field(default_factory=dict)
53
+ completion_rate: float = 0.0
54
+
55
+
56
+ class TopicPerformance(BaseModel):
57
+ topic: str
58
+ class_accuracy: float = 0.0
59
+ struggling_count: int = 0
60
+ mastered_count: int = 0
61
+
62
+
63
+ class ClassInsights(BaseModel):
64
+ class_id: str
65
+ generated_at: str = ""
66
+ class_summary: str = ""
67
+ top_weak_topics: List[str] = Field(default_factory=list)
68
+ recommended_actions: List[str] = Field(default_factory=list)
69
+ class_strengths: str = ""
70
+ risk_distribution: Dict[str, int] = Field(default_factory=dict)
71
+ topic_performance: List[TopicPerformance] = Field(default_factory=list)
72
+
73
+
74
+ class ClassAnalyticsReport(BaseModel):
75
+ class_id: str
76
+ class_name: str = ""
77
+ grade_level: str = ""
78
+ section: str = ""
79
+ teacher_id: str = ""
80
+ student_count: int = 0
81
+ class_average: float = 0.0
82
+ completion_rate: float = 0.0
83
+ participation_rate: float = 0.0
84
+ attention_count: int = 0
85
+ students: List[StudentAnalyticsSummary] = Field(default_factory=list)
86
+ insights: Optional[ClassInsights] = None
87
+ generated_at: str = ""
88
+
89
+
90
+ # โ”€โ”€โ”€ Risk & Engagement Classification โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
91
+
92
+ def classify_risk(avg_score: float, quiz_count: int, days_since_active: Optional[int]) -> str:
93
+ """Map to WRI status bands. When no WRI is available, estimate from avg_score."""
94
+ if quiz_count == 0:
95
+ return "pending_assessment"
96
+ # Approximate WRI bands from avg_score (actual WRI uses D, G, P weights)
97
+ if avg_score >= 88:
98
+ return "safe"
99
+ if avg_score >= 80:
100
+ return "watch"
101
+ if avg_score >= 75:
102
+ return "intervene"
103
+ if avg_score >= 68:
104
+ return "critical"
105
+ return "at_risk"
106
+
107
+
108
+ def classify_engagement(days_since_active: Optional[int], recent_quiz_count: int) -> str:
109
+ if days_since_active is not None and days_since_active <= 2 and recent_quiz_count >= 5:
110
+ return "High"
111
+ if days_since_active is not None and days_since_active <= 7:
112
+ return "Medium"
113
+ return "Low"
114
+
115
+
116
+ # โ”€โ”€โ”€ Engine โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
117
+
118
+ class ClassAnalyticsEngine:
119
+ """Computes class analytics from Firestore data."""
120
+
121
+ def __init__(self):
122
+ self._cache: Dict[str, tuple] = {} # class_id -> (report, timestamp)
123
+ self._cache_ttl = 1800 # 30 min
124
+
125
+ async def get_class_analytics(
126
+ self, class_id: str, teacher_id: str, force_refresh: bool = False
127
+ ) -> ClassAnalyticsReport:
128
+ # Check cache
129
+ if not force_refresh and class_id in self._cache:
130
+ report, cached_at = self._cache[class_id]
131
+ if time.time() - cached_at < self._cache_ttl:
132
+ return report
133
+
134
+ db = _get_firestore_client()
135
+ if not db:
136
+ logger.error("Firestore client unavailable")
137
+ return ClassAnalyticsReport(class_id=class_id, teacher_id=teacher_id, generated_at=_now_iso())
138
+
139
+ # Fast path: try reading from denormalized student_summaries (written by pipeline)
140
+ summaries_snap = list(db.collection("classes").document(class_id).collection("student_summaries").stream())
141
+ use_fast_path = len(summaries_snap) > 0 and not force_refresh
142
+
143
+ # Fetch class info
144
+ class_doc = db.collection("classrooms").document(class_id).get()
145
+ class_data = class_doc.to_dict() if class_doc.exists else {}
146
+
147
+ class_name = class_data.get("name", "")
148
+ grade_level = class_data.get("gradeLevel", class_data.get("grade", ""))
149
+ section = class_data.get("section", "")
150
+
151
+ # Fetch students in this class from managedStudents
152
+ students_query = db.collection("managedStudents").where("classroomId", "==", class_id).stream()
153
+ managed_students = []
154
+ for doc_snap in students_query:
155
+ managed_students.append({"id": doc_snap.id, **doc_snap.to_dict()})
156
+
157
+ # Also try classSectionId match
158
+ if not managed_students:
159
+ class_section_id = class_data.get("classSectionId", class_id)
160
+ students_query2 = db.collection("managedStudents").where("classSectionId", "==", class_section_id).stream()
161
+ for doc_snap in students_query2:
162
+ managed_students.append({"id": doc_snap.id, **doc_snap.to_dict()})
163
+
164
+ # Fetch quiz data for each student from progress collection
165
+ student_summaries = await self._build_student_summaries(db, managed_students)
166
+
167
+ # Compute class-level metrics
168
+ assessed_students = [s for s in student_summaries if s.quiz_attempt_count > 0]
169
+ student_count = len(student_summaries)
170
+
171
+ class_average = 0.0
172
+ if assessed_students:
173
+ class_average = sum(s.avg_score for s in assessed_students) / len(assessed_students)
174
+
175
+ completion_rate = 0.0
176
+ if student_count > 0:
177
+ completion_rate = (len(assessed_students) / student_count) * 100
178
+
179
+ # Participation = active in last 7 days
180
+ now = datetime.now(timezone.utc)
181
+ active_count = sum(1 for s in student_summaries if s.last_active and _days_since(s.last_active) <= 7)
182
+ participation_rate = (active_count / student_count * 100) if student_count > 0 else 0.0
183
+
184
+ # Attention = intervene + critical + at_risk
185
+ attention_count = sum(1 for s in student_summaries if s.risk_level in ("intervene", "critical", "at_risk"))
186
+
187
+ # Topic performance
188
+ topic_perf = self._compute_topic_performance(student_summaries)
189
+
190
+ # Risk distribution
191
+ risk_dist = {"safe": 0, "watch": 0, "intervene": 0, "critical": 0, "at_risk": 0, "pending_assessment": 0}
192
+ for s in student_summaries:
193
+ # Prefer stored WRI status from managedStudents if available
194
+ stored_status = None
195
+ try:
196
+ ms_doc = db.collection("managedStudents").document(s.student_id).get()
197
+ if ms_doc.exists:
198
+ stored_status = ms_doc.to_dict().get("riskStatus")
199
+ except Exception:
200
+ pass
201
+ status = stored_status if stored_status in risk_dist else s.risk_level
202
+ risk_dist[status] = risk_dist.get(status, 0) + 1
203
+
204
+ # Generate AI insights
205
+ insights = await self._generate_insights(
206
+ class_id=class_id,
207
+ class_name=class_name,
208
+ grade_level=grade_level,
209
+ section=section,
210
+ student_count=student_count,
211
+ class_average=class_average,
212
+ completion_rate=completion_rate,
213
+ participation_rate=participation_rate,
214
+ risk_dist=risk_dist,
215
+ topic_perf=topic_perf,
216
+ )
217
+
218
+ report = ClassAnalyticsReport(
219
+ class_id=class_id,
220
+ class_name=class_name,
221
+ grade_level=grade_level,
222
+ section=section,
223
+ teacher_id=teacher_id,
224
+ student_count=student_count,
225
+ class_average=round(class_average, 1),
226
+ completion_rate=round(completion_rate, 1),
227
+ participation_rate=round(participation_rate, 1),
228
+ attention_count=attention_count,
229
+ students=student_summaries,
230
+ insights=insights,
231
+ generated_at=_now_iso(),
232
+ )
233
+
234
+ # Cache
235
+ self._cache[class_id] = (report, time.time())
236
+
237
+ # Persist to Firestore
238
+ try:
239
+ db.collection("class_analytics").document(class_id).set(
240
+ {**report.model_dump(), "cached_at": _now_iso()},
241
+ merge=True,
242
+ )
243
+ except Exception as e:
244
+ logger.warning(f"Failed to persist analytics cache: {e}")
245
+
246
+ return report
247
+
248
+ async def _build_student_summaries(
249
+ self, db: Any, managed_students: List[Dict]
250
+ ) -> List[StudentAnalyticsSummary]:
251
+ summaries = []
252
+ now = datetime.now(timezone.utc)
253
+
254
+ for student in managed_students:
255
+ student_id = student.get("id", "")
256
+ student_name = student.get("name", "Unknown")
257
+ avatar = student.get("avatar", "")
258
+
259
+ # Try to fetch quiz data from progress/{student_id}
260
+ quiz_attempts = []
261
+ try:
262
+ # Try by student ID first
263
+ progress_doc = db.collection("progress").document(student_id).get()
264
+ if progress_doc.exists:
265
+ pdata = progress_doc.to_dict()
266
+ quiz_attempts = pdata.get("quizAttempts", [])
267
+
268
+ # Also try by LRN if available
269
+ if not quiz_attempts and student.get("lrn"):
270
+ progress_doc2 = db.collection("progress").document(student["lrn"]).get()
271
+ if progress_doc2.exists:
272
+ pdata2 = progress_doc2.to_dict()
273
+ quiz_attempts = pdata2.get("quizAttempts", [])
274
+
275
+ # Also try accountUid
276
+ if not quiz_attempts and student.get("accountUid"):
277
+ progress_doc3 = db.collection("progress").document(student["accountUid"]).get()
278
+ if progress_doc3.exists:
279
+ pdata3 = progress_doc3.to_dict()
280
+ quiz_attempts = pdata3.get("quizAttempts", [])
281
+
282
+ # Also check practice_results subcollection
283
+ if not quiz_attempts and student.get("accountUid"):
284
+ practice_sessions = (
285
+ db.collection("practice_results")
286
+ .document(student["accountUid"])
287
+ .collection("sessions")
288
+ .order_by("submitted_at", direction="DESCENDING")
289
+ .limit(20)
290
+ .stream()
291
+ )
292
+ for sess in practice_sessions:
293
+ sd = sess.to_dict()
294
+ quiz_attempts.append({
295
+ "quizId": sd.get("session_id", ""),
296
+ "score": sd.get("score_percent", 0),
297
+ "completedAt": sd.get("submitted_at"),
298
+ "answers": sd.get("per_question_feedback", []),
299
+ })
300
+ except Exception as e:
301
+ logger.debug(f"Error fetching progress for {student_id}: {e}")
302
+
303
+ # Compute metrics from quiz attempts
304
+ quiz_count = len(quiz_attempts)
305
+ avg_score = 0.0
306
+ accuracy_by_topic: Dict[str, List[float]] = {}
307
+
308
+ if quiz_count > 0:
309
+ scores = [float(q.get("score", 0)) for q in quiz_attempts]
310
+ avg_score = sum(scores) / len(scores)
311
+
312
+ # Extract per-question topic accuracy if available
313
+ for attempt in quiz_attempts:
314
+ answers = attempt.get("answers", [])
315
+ quiz_id = attempt.get("quizId", "")
316
+ # Use quizId as topic proxy if no per-question topic
317
+ topic = _extract_topic_from_quiz_id(quiz_id)
318
+ if topic:
319
+ if topic not in accuracy_by_topic:
320
+ accuracy_by_topic[topic] = []
321
+ accuracy_by_topic[topic].append(float(attempt.get("score", 0)))
322
+
323
+ # Compute topic averages
324
+ topic_avgs = {t: sum(scores) / len(scores) for t, scores in accuracy_by_topic.items() if scores}
325
+ weakest_topic = min(topic_avgs, key=topic_avgs.get) if topic_avgs else student.get("weakestTopic")
326
+ if weakest_topic == "N/A":
327
+ weakest_topic = None
328
+
329
+ # Last active
330
+ last_active_ts = student.get("lastActive")
331
+ last_active_str = None
332
+ days_since_active = None
333
+ if last_active_ts:
334
+ try:
335
+ if hasattr(last_active_ts, "seconds"):
336
+ last_dt = datetime.fromtimestamp(last_active_ts.seconds, tz=timezone.utc)
337
+ else:
338
+ last_dt = last_active_ts
339
+ last_active_str = last_dt.isoformat()
340
+ days_since_active = (now - last_dt).days
341
+ except Exception:
342
+ pass
343
+
344
+ # Recent quiz count (last 14 days)
345
+ recent_quiz_count = 0
346
+ for q in quiz_attempts:
347
+ completed = q.get("completedAt")
348
+ if completed:
349
+ try:
350
+ if hasattr(completed, "seconds"):
351
+ q_dt = datetime.fromtimestamp(completed.seconds, tz=timezone.utc)
352
+ else:
353
+ q_dt = completed if isinstance(completed, datetime) else datetime.now(timezone.utc)
354
+ if (now - q_dt).days <= 14:
355
+ recent_quiz_count += 1
356
+ except Exception:
357
+ pass
358
+
359
+ risk_level = classify_risk(avg_score, quiz_count, days_since_active)
360
+ engagement = classify_engagement(days_since_active, recent_quiz_count)
361
+
362
+ summaries.append(StudentAnalyticsSummary(
363
+ student_id=student_id,
364
+ student_name=student_name,
365
+ avatar_url=avatar or None,
366
+ grade_level=student.get("gradeLevel", student.get("grade", "")),
367
+ section=student.get("section", ""),
368
+ avg_score=round(avg_score, 1),
369
+ quiz_attempt_count=quiz_count,
370
+ last_active=last_active_str,
371
+ risk_level=risk_level,
372
+ engagement_level=engagement,
373
+ weakest_topic=weakest_topic,
374
+ accuracy_by_topic=topic_avgs,
375
+ completion_rate=min(quiz_count / 5 * 100, 100) if quiz_count > 0 else 0.0,
376
+ ))
377
+
378
+ return summaries
379
+
380
+ def _compute_topic_performance(self, students: List[StudentAnalyticsSummary]) -> List[TopicPerformance]:
381
+ topic_data: Dict[str, Dict] = {}
382
+
383
+ for s in students:
384
+ for topic, accuracy in s.accuracy_by_topic.items():
385
+ if topic not in topic_data:
386
+ topic_data[topic] = {"scores": [], "struggling": 0, "mastered": 0}
387
+ topic_data[topic]["scores"].append(accuracy)
388
+ if accuracy < 60:
389
+ topic_data[topic]["struggling"] += 1
390
+ if accuracy >= 80:
391
+ topic_data[topic]["mastered"] += 1
392
+
393
+ # Also include weakest_topic from students without per-topic data
394
+ for s in students:
395
+ if s.weakest_topic and s.weakest_topic not in topic_data and s.quiz_attempt_count > 0:
396
+ topic_data[s.weakest_topic] = {
397
+ "scores": [s.avg_score],
398
+ "struggling": 1 if s.avg_score < 60 else 0,
399
+ "mastered": 0,
400
+ }
401
+
402
+ result = []
403
+ for topic, data in topic_data.items():
404
+ if not data["scores"]:
405
+ continue
406
+ result.append(TopicPerformance(
407
+ topic=topic,
408
+ class_accuracy=round(sum(data["scores"]) / len(data["scores"]), 1),
409
+ struggling_count=data["struggling"],
410
+ mastered_count=data["mastered"],
411
+ ))
412
+
413
+ return sorted(result, key=lambda t: t.class_accuracy)[:8]
414
+
415
+ async def _generate_insights(
416
+ self,
417
+ class_id: str,
418
+ class_name: str,
419
+ grade_level: str,
420
+ section: str,
421
+ student_count: int,
422
+ class_average: float,
423
+ completion_rate: float,
424
+ participation_rate: float,
425
+ risk_dist: Dict[str, int],
426
+ topic_perf: List[TopicPerformance],
427
+ ) -> ClassInsights:
428
+ # Format topic performance for prompt
429
+ topic_lines = "\n".join(
430
+ f" - {t.topic}: {t.class_accuracy}% (struggling: {t.struggling_count})"
431
+ for t in topic_perf[:6]
432
+ ) or " No topic data available yet."
433
+
434
+ weak_topics = [t.topic for t in topic_perf[:3]] if topic_perf else []
435
+
436
+ prompt = f"""You are MathPulse AI analyzing a class's performance data for a Filipino K-12 teacher.
437
+
438
+ Class: Grade {grade_level} - {section} ({class_name})
439
+ Student Count: {student_count}
440
+ Class Average Score: {class_average:.1f}%
441
+ Completion Rate: {completion_rate:.1f}%
442
+ Participation Rate: {participation_rate:.1f}%
443
+
444
+ Risk Distribution:
445
+ - Critical: {risk_dist.get('Critical', 0)} students
446
+ - High Risk: {risk_dist.get('High Risk', 0)} students
447
+ - Medium Risk: {risk_dist.get('Medium Risk', 0)} students
448
+ - Low Risk: {risk_dist.get('Low Risk', 0)} students
449
+ - Unassessed: {risk_dist.get('Unassessed', 0)} students
450
+
451
+ Topic Performance (class accuracy):
452
+ {topic_lines}
453
+
454
+ Top Weakest Topics: {', '.join(weak_topics) if weak_topics else 'None identified yet'}
455
+
456
+ Generate a JSON response with these exact keys:
457
+ {{
458
+ "class_summary": "2-3 sentence overview of class performance. Be honest but constructive.",
459
+ "class_strengths": "1 sentence on what the class is doing well.",
460
+ "top_weak_topics": ["topic1", "topic2", "topic3"],
461
+ "recommended_actions": [
462
+ "Specific action 1 (max 20 words)",
463
+ "Specific action 2",
464
+ "Specific action 3"
465
+ ]
466
+ }}
467
+
468
+ Be specific to Filipino K-12 DepEd context. If data is limited, acknowledge it and suggest next steps."""
469
+
470
+ try:
471
+ from services.ai_client import get_deepseek_client, CHAT_MODEL
472
+
473
+ client = get_deepseek_client()
474
+ response = client.chat.completions.create(
475
+ model=CHAT_MODEL,
476
+ messages=[
477
+ {"role": "system", "content": "You are MathPulse AI, a class analytics assistant for Filipino K-12 math teachers. Respond only with valid JSON."},
478
+ {"role": "user", "content": prompt},
479
+ ],
480
+ temperature=0.3,
481
+ max_tokens=500,
482
+ response_format={"type": "json_object"},
483
+ )
484
+
485
+ content = response.choices[0].message.content or "{}"
486
+ parsed = json.loads(content)
487
+
488
+ return ClassInsights(
489
+ class_id=class_id,
490
+ generated_at=_now_iso(),
491
+ class_summary=parsed.get("class_summary", "Analytics data is being collected."),
492
+ top_weak_topics=parsed.get("top_weak_topics", weak_topics),
493
+ recommended_actions=parsed.get("recommended_actions", ["Encourage students to complete more quizzes."]),
494
+ class_strengths=parsed.get("class_strengths", "Class is actively using the platform."),
495
+ risk_distribution=risk_dist,
496
+ topic_performance=topic_perf,
497
+ )
498
+ except Exception as e:
499
+ logger.warning(f"DeepSeek insights generation failed: {e}")
500
+ # Return fallback insights
501
+ return ClassInsights(
502
+ class_id=class_id,
503
+ generated_at=_now_iso(),
504
+ class_summary=f"Class has {student_count} students with an average score of {class_average:.0f}%. {risk_dist.get('Unassessed', 0)} students have not yet taken any quizzes.",
505
+ top_weak_topics=weak_topics,
506
+ recommended_actions=[
507
+ "Encourage unassessed students to complete their first quiz.",
508
+ "Review struggling topics in the next class session.",
509
+ "Schedule one-on-one check-ins with Critical risk students.",
510
+ ],
511
+ class_strengths="Students are enrolled and the platform is ready for use." if class_average < 50 else f"Class maintains a {class_average:.0f}% average.",
512
+ risk_distribution=risk_dist,
513
+ topic_performance=topic_perf,
514
+ )
515
+
516
+ def invalidate_cache(self, class_id: str) -> None:
517
+ self._cache.pop(class_id, None)
518
+
519
+
520
+ # โ”€โ”€โ”€ Helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
521
+
522
+ def _now_iso() -> str:
523
+ return datetime.now(timezone.utc).isoformat()
524
+
525
+
526
+ def _days_since(iso_str: str) -> int:
527
+ try:
528
+ dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
529
+ return (datetime.now(timezone.utc) - dt).days
530
+ except Exception:
531
+ return 999
532
+
533
+
534
+ def _extract_topic_from_quiz_id(quiz_id: str) -> Optional[str]:
535
+ """Extract topic name from quiz ID patterns like 'algebra-1', 'geometry-basics'."""
536
+ if not quiz_id:
537
+ return None
538
+ # Common patterns: subject-topic, module_quiz, etc.
539
+ parts = quiz_id.replace("_", "-").replace(".", "-").split("-")
540
+ if len(parts) >= 2:
541
+ # Capitalize and join meaningful parts
542
+ topic = " ".join(p.capitalize() for p in parts[:2] if p and not p.isdigit())
543
+ return topic if topic else None
544
+ return quiz_id.capitalize() if quiz_id else None
545
+
546
+
547
+ # Singleton
548
+ _engine_instance: Optional[ClassAnalyticsEngine] = None
549
+
550
+
551
+ def get_class_analytics_engine() -> ClassAnalyticsEngine:
552
+ global _engine_instance
553
+ if _engine_instance is None:
554
+ _engine_instance = ClassAnalyticsEngine()
555
+ return _engine_instance
services/intervention_engine.py ADDED
@@ -0,0 +1,587 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MathPulse AI โ€” Intervention Engine
3
+
4
+ Generates personalized intervention plans for at-risk students using
5
+ real quiz data from Firestore and DeepSeek AI for learning path generation.
6
+ """
7
+
8
+ import json
9
+ import logging
10
+ import time
11
+ from datetime import datetime, timezone
12
+ from typing import Any, Dict, List, Literal, Optional
13
+
14
+ from pydantic import BaseModel, Field
15
+
16
+ logger = logging.getLogger("mathpulse.intervention_engine")
17
+
18
+ # โ”€โ”€โ”€ Firestore helper โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
19
+
20
+ _firebase_firestore = None
21
+
22
+
23
+ def _get_firestore_client():
24
+ global _firebase_firestore
25
+ if _firebase_firestore is None:
26
+ try:
27
+ from firebase_admin import firestore as ff
28
+ _firebase_firestore = ff
29
+ except Exception:
30
+ return None
31
+ try:
32
+ return _firebase_firestore.client()
33
+ except Exception:
34
+ return None
35
+
36
+
37
+ # โ”€โ”€โ”€ Models โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
38
+
39
+ class LearningStep(BaseModel):
40
+ step_number: int
41
+ type: Literal["video_lesson", "practice", "assessment", "chat_session", "review"] = "practice"
42
+ title: str
43
+ description: str = ""
44
+ duration_minutes: int = 10
45
+ num_items: Optional[int] = None
46
+ topic: str = ""
47
+ competency_tag: str = ""
48
+ difficulty: Literal["easy", "medium", "hard"] = "easy"
49
+ is_completed: bool = False
50
+ completion_score: Optional[float] = None
51
+
52
+
53
+ class LearningPath(BaseModel):
54
+ student_id: str
55
+ generated_at: str = ""
56
+ methodology_tags: List[str] = Field(default_factory=lambda: ["Interactive", "Video", "Practice", "Quiz"])
57
+ steps: List[LearningStep] = Field(default_factory=list)
58
+ estimated_duration_days: int = 7
59
+ primary_weak_topic: str = ""
60
+ all_weak_topics: List[str] = Field(default_factory=list)
61
+ ai_rationale: str = ""
62
+
63
+
64
+ class InterventionPlan(BaseModel):
65
+ student_id: str
66
+ student_name: str = ""
67
+ grade_level: str = ""
68
+ section: str = ""
69
+ risk_level: Literal["Low Risk", "Medium Risk", "High Risk", "Critical", "Unassessed"] = "Unassessed"
70
+ avg_score: float = 0.0
71
+ engagement_level: Literal["Low", "Medium", "High"] = "Low"
72
+ last_active: Optional[str] = None
73
+ weakest_topic: str = ""
74
+ weak_topics: List[str] = Field(default_factory=list)
75
+ accuracy_by_topic: Dict[str, float] = Field(default_factory=dict)
76
+ learning_strengths: str = ""
77
+ next_steps_summary: str = ""
78
+ learning_path: Optional[LearningPath] = None
79
+ generated_at: str = ""
80
+ teacher_recommendations: List[str] = Field(default_factory=list)
81
+
82
+
83
+ # โ”€โ”€โ”€ Risk & Engagement Classification โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
84
+
85
+ def _classify_risk(avg_score: float, quiz_count: int, days_since_active: Optional[int]) -> str:
86
+ if quiz_count == 0:
87
+ return "Unassessed"
88
+ engagement_low = (days_since_active is None or days_since_active > 7) or quiz_count < 3
89
+ if avg_score < 50 and engagement_low:
90
+ return "Critical"
91
+ if avg_score < 60 or (avg_score < 75 and engagement_low):
92
+ return "High Risk"
93
+ if avg_score < 75:
94
+ return "Medium Risk"
95
+ return "Low Risk"
96
+
97
+
98
+ def _classify_engagement(days_since_active: Optional[int], recent_quiz_count: int) -> str:
99
+ if days_since_active is not None and days_since_active <= 2 and recent_quiz_count >= 5:
100
+ return "High"
101
+ if days_since_active is not None and days_since_active <= 7:
102
+ return "Medium"
103
+ return "Low"
104
+
105
+
106
+ # โ”€โ”€โ”€ Engine โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
107
+
108
+ class InterventionEngine:
109
+ """Generates full intervention plans for at-risk students."""
110
+
111
+ def __init__(self):
112
+ self._cache: Dict[str, tuple] = {} # student_id -> (plan, timestamp)
113
+ self._cache_ttl = 3600 # 1 hour
114
+
115
+ async def generate_full_intervention(self, student_id: str, force: bool = False) -> InterventionPlan:
116
+ # Check cache
117
+ if not force and student_id in self._cache:
118
+ plan, cached_at = self._cache[student_id]
119
+ if time.time() - cached_at < self._cache_ttl:
120
+ return plan
121
+
122
+ db = _get_firestore_client()
123
+ if not db:
124
+ logger.error("Firestore client unavailable")
125
+ return InterventionPlan(student_id=student_id, generated_at=_now_iso())
126
+
127
+ # Fetch student data from managedStudents
128
+ student_data = await self._fetch_student_data(db, student_id)
129
+ if not student_data:
130
+ return InterventionPlan(
131
+ student_id=student_id,
132
+ student_name="Unknown",
133
+ generated_at=_now_iso(),
134
+ learning_strengths="No assessment data available yet.",
135
+ next_steps_summary="Assign a diagnostic quiz to begin intervention planning.",
136
+ )
137
+
138
+ # Fetch quiz attempts
139
+ quiz_attempts = await self._fetch_quiz_attempts(db, student_id, student_data)
140
+
141
+ # Compute metrics
142
+ now = datetime.now(timezone.utc)
143
+ quiz_count = len(quiz_attempts)
144
+ avg_score = 0.0
145
+ accuracy_by_topic: Dict[str, List[float]] = {}
146
+
147
+ if quiz_count > 0:
148
+ scores = [float(q.get("score", 0)) for q in quiz_attempts]
149
+ avg_score = sum(scores) / len(scores)
150
+
151
+ for attempt in quiz_attempts:
152
+ topic = self._extract_topic(attempt)
153
+ if topic:
154
+ if topic not in accuracy_by_topic:
155
+ accuracy_by_topic[topic] = []
156
+ accuracy_by_topic[topic].append(float(attempt.get("score", 0)))
157
+
158
+ topic_avgs = {t: round(sum(s) / len(s), 1) for t, s in accuracy_by_topic.items() if s}
159
+ weak_topics = [t for t, s in sorted(topic_avgs.items(), key=lambda x: x[1]) if s < 70][:5]
160
+ strong_topics = [t for t, s in topic_avgs.items() if s >= 70]
161
+ weakest_topic = weak_topics[0] if weak_topics else student_data.get("weakestTopic", "Foundational Skills")
162
+ if weakest_topic == "N/A":
163
+ weakest_topic = "Foundational Skills"
164
+
165
+ # Last active
166
+ days_since_active = None
167
+ last_active_str = None
168
+ last_active_ts = student_data.get("lastActive")
169
+ if last_active_ts:
170
+ try:
171
+ if hasattr(last_active_ts, "seconds"):
172
+ last_dt = datetime.fromtimestamp(last_active_ts.seconds, tz=timezone.utc)
173
+ else:
174
+ last_dt = last_active_ts
175
+ last_active_str = last_dt.isoformat()
176
+ days_since_active = (now - last_dt).days
177
+ except Exception:
178
+ pass
179
+
180
+ # Recent quiz count (last 14 days)
181
+ recent_count = sum(1 for q in quiz_attempts if self._is_recent(q, now, 14))
182
+
183
+ risk_level = _classify_risk(avg_score, quiz_count, days_since_active)
184
+ engagement = _classify_engagement(days_since_active, recent_count)
185
+
186
+ # Generate AI insights
187
+ insights = await self._generate_insights(
188
+ grade_level=student_data.get("gradeLevel", student_data.get("grade", "11")),
189
+ section=student_data.get("section", ""),
190
+ risk_level=risk_level,
191
+ avg_score=avg_score,
192
+ engagement=engagement,
193
+ strong_topics=strong_topics,
194
+ weak_topics=weak_topics,
195
+ quiz_count=quiz_count,
196
+ )
197
+
198
+ # Generate learning path
199
+ learning_path = await self._generate_learning_path(
200
+ student_id=student_id,
201
+ grade_level=student_data.get("gradeLevel", student_data.get("grade", "11")),
202
+ risk_level=risk_level,
203
+ engagement=engagement,
204
+ weak_topics=weak_topics,
205
+ weakest_topic=weakest_topic,
206
+ topic_avgs=topic_avgs,
207
+ )
208
+
209
+ # Generate teacher recommendations
210
+ recommendations = await self._generate_recommendations(
211
+ grade_level=student_data.get("gradeLevel", student_data.get("grade", "11")),
212
+ risk_level=risk_level,
213
+ weak_topics=weak_topics,
214
+ avg_score=avg_score,
215
+ )
216
+
217
+ plan = InterventionPlan(
218
+ student_id=student_id,
219
+ student_name=student_data.get("name", "Unknown"),
220
+ grade_level=student_data.get("gradeLevel", student_data.get("grade", "")),
221
+ section=student_data.get("section", ""),
222
+ risk_level=risk_level,
223
+ avg_score=round(avg_score, 1),
224
+ engagement_level=engagement,
225
+ last_active=last_active_str,
226
+ weakest_topic=weakest_topic,
227
+ weak_topics=weak_topics,
228
+ accuracy_by_topic=topic_avgs,
229
+ learning_strengths=insights.get("learning_strengths", "Shows potential for improvement with guided support."),
230
+ next_steps_summary=insights.get("next_steps_summary", f"Focus on {weakest_topic} with structured practice."),
231
+ learning_path=learning_path,
232
+ generated_at=_now_iso(),
233
+ teacher_recommendations=recommendations,
234
+ )
235
+
236
+ # Cache
237
+ self._cache[student_id] = (plan, time.time())
238
+
239
+ # Persist to Firestore
240
+ try:
241
+ db.collection("intervention_plans").document(student_id).set(
242
+ plan.model_dump(), merge=True
243
+ )
244
+ except Exception as e:
245
+ logger.warning(f"Failed to persist intervention plan: {e}")
246
+
247
+ return plan
248
+
249
+ async def _fetch_student_data(self, db: Any, student_id: str) -> Optional[Dict]:
250
+ """Fetch student from managedStudents or users collection."""
251
+ try:
252
+ doc = db.collection("managedStudents").document(student_id).get()
253
+ if doc.exists:
254
+ return {"id": doc.id, **doc.to_dict()}
255
+
256
+ # Try users collection
257
+ doc2 = db.collection("users").document(student_id).get()
258
+ if doc2.exists:
259
+ return {"id": doc2.id, **doc2.to_dict()}
260
+ except Exception as e:
261
+ logger.debug(f"Error fetching student {student_id}: {e}")
262
+ return None
263
+
264
+ async def _fetch_quiz_attempts(self, db: Any, student_id: str, student_data: Dict) -> List[Dict]:
265
+ """Fetch quiz attempts from progress collection and practice_results."""
266
+ attempts = []
267
+
268
+ # Try progress/{student_id}
269
+ for lookup_id in [student_id, student_data.get("lrn"), student_data.get("accountUid")]:
270
+ if not lookup_id:
271
+ continue
272
+ try:
273
+ progress_doc = db.collection("progress").document(lookup_id).get()
274
+ if progress_doc.exists:
275
+ pdata = progress_doc.to_dict()
276
+ quiz_data = pdata.get("quizAttempts", [])
277
+ if quiz_data:
278
+ attempts = quiz_data
279
+ break
280
+ except Exception:
281
+ pass
282
+
283
+ # Also check practice_results
284
+ account_uid = student_data.get("accountUid") or student_id
285
+ try:
286
+ sessions = (
287
+ db.collection("practice_results")
288
+ .document(account_uid)
289
+ .collection("sessions")
290
+ .order_by("submitted_at", direction="DESCENDING")
291
+ .limit(30)
292
+ .stream()
293
+ )
294
+ for sess in sessions:
295
+ sd = sess.to_dict()
296
+ attempts.append({
297
+ "quizId": sd.get("session_id", ""),
298
+ "score": sd.get("score_percent", 0),
299
+ "completedAt": sd.get("submitted_at"),
300
+ "answers": sd.get("per_question_feedback", []),
301
+ "subject": sd.get("subject", ""),
302
+ })
303
+ except Exception:
304
+ pass
305
+
306
+ return attempts[:30]
307
+
308
+ def _extract_topic(self, attempt: Dict) -> Optional[str]:
309
+ """Extract topic from quiz attempt."""
310
+ # Check subject field first
311
+ if attempt.get("subject"):
312
+ return attempt["subject"]
313
+ # Try to extract from quizId
314
+ quiz_id = attempt.get("quizId", "")
315
+ if not quiz_id:
316
+ return None
317
+ parts = quiz_id.replace("_", "-").replace(".", "-").split("-")
318
+ if len(parts) >= 2:
319
+ topic = " ".join(p.capitalize() for p in parts[:2] if p and not p.isdigit())
320
+ return topic if topic else None
321
+ return quiz_id.capitalize() if quiz_id else None
322
+
323
+ def _is_recent(self, attempt: Dict, now: datetime, days: int) -> bool:
324
+ completed = attempt.get("completedAt")
325
+ if not completed:
326
+ return False
327
+ try:
328
+ if hasattr(completed, "seconds"):
329
+ q_dt = datetime.fromtimestamp(completed.seconds, tz=timezone.utc)
330
+ elif isinstance(completed, datetime):
331
+ q_dt = completed
332
+ else:
333
+ return False
334
+ return (now - q_dt).days <= days
335
+ except Exception:
336
+ return False
337
+
338
+ async def _generate_insights(self, **kwargs) -> Dict[str, str]:
339
+ """Generate learning_strengths and next_steps_summary via DeepSeek."""
340
+ prompt = f"""You are MathPulse AI analyzing a Filipino K-12 student's performance data.
341
+
342
+ Student: Grade {kwargs['grade_level']}, Section {kwargs['section']}
343
+ Risk Level: {kwargs['risk_level']}
344
+ Average Score: {kwargs['avg_score']:.1f}%
345
+ Engagement: {kwargs['engagement']}
346
+ Strong Topics (accuracy > 70%): {', '.join(kwargs['strong_topics'][:3]) or 'None identified yet'}
347
+ Weak Topics (accuracy < 60%): {', '.join(kwargs['weak_topics'][:3]) or 'None identified yet'}
348
+ Quiz Attempt Count (last 30 days): {kwargs['quiz_count']}
349
+
350
+ Generate two SHORT insights (max 20 words each):
351
+ 1. LEARNING STRENGTHS: What the student excels at or shows potential in. Be specific and encouraging.
352
+ 2. NEXT STEPS: The single most important action for the teacher/student right now.
353
+
354
+ Return as JSON:
355
+ {{"learning_strengths": "...", "next_steps_summary": "..."}}"""
356
+
357
+ try:
358
+ from services.ai_client import get_deepseek_client, CHAT_MODEL
359
+ client = get_deepseek_client()
360
+ response = client.chat.completions.create(
361
+ model=CHAT_MODEL,
362
+ messages=[
363
+ {"role": "system", "content": "You are MathPulse AI. Respond only with valid JSON."},
364
+ {"role": "user", "content": prompt},
365
+ ],
366
+ temperature=0.3,
367
+ max_tokens=200,
368
+ response_format={"type": "json_object"},
369
+ )
370
+ content = response.choices[0].message.content or "{}"
371
+ return json.loads(content)
372
+ except Exception as e:
373
+ logger.warning(f"DeepSeek insights failed: {e}")
374
+ return {
375
+ "learning_strengths": "Shows willingness to engage with the platform." if kwargs['quiz_count'] > 0 else "No assessment data yet โ€” potential to be discovered.",
376
+ "next_steps_summary": f"Begin with foundational practice in {kwargs['weak_topics'][0] if kwargs['weak_topics'] else 'core topics'}.",
377
+ }
378
+
379
+ async def _generate_learning_path(self, **kwargs) -> LearningPath:
380
+ """Generate a structured learning path via DeepSeek."""
381
+ student_id = kwargs["student_id"]
382
+ weak_topics = kwargs["weak_topics"]
383
+ weakest_topic = kwargs["weakest_topic"]
384
+ engagement = kwargs["engagement"]
385
+ risk_level = kwargs["risk_level"]
386
+ grade_level = kwargs["grade_level"]
387
+ topic_avgs = kwargs["topic_avgs"]
388
+
389
+ style_hint = "shorter steps (5-8 min), gamified" if engagement == "Low" else "standard pacing (10-15 min)"
390
+ estimated_days = 5 if risk_level == "Critical" else 7
391
+
392
+ prompt = f"""Create a personalized intervention learning path for a Filipino K-12 math student.
393
+
394
+ Student Profile:
395
+ - Grade Level: {grade_level}
396
+ - Risk Level: {risk_level}
397
+ - Primary Weak Topic: {weakest_topic}
398
+ - All Weak Topics: {', '.join(weak_topics[:4]) or weakest_topic}
399
+ - Accuracy by Topic: {json.dumps(topic_avgs)}
400
+ - Engagement: {engagement} โ†’ {style_hint}
401
+
402
+ Create a 4-6 step learning path that:
403
+ 1. Starts with the MOST CRITICAL weak topic (lowest accuracy)
404
+ 2. Uses varied methodology: video โ†’ practice โ†’ assessment โ†’ review cycle
405
+ 3. Scales difficulty: start easy, progress to grade-level
406
+ 4. Total estimated time: {estimated_days} days
407
+
408
+ Return ONLY valid JSON:
409
+ {{
410
+ "methodology_tags": ["Interactive", "Video", "Practice", "Quiz"],
411
+ "estimated_duration_days": {estimated_days},
412
+ "ai_rationale": "1 sentence explaining why this path was chosen",
413
+ "steps": [
414
+ {{
415
+ "step_number": 1,
416
+ "type": "video_lesson",
417
+ "title": "Topic - Concept Name",
418
+ "description": "Brief description of what student will learn",
419
+ "duration_minutes": 8,
420
+ "num_items": null,
421
+ "topic": "Topic Name",
422
+ "competency_tag": "M11GM-Ia-1",
423
+ "difficulty": "easy"
424
+ }}
425
+ ]
426
+ }}"""
427
+
428
+ try:
429
+ from services.ai_client import get_deepseek_client, CHAT_MODEL
430
+ client = get_deepseek_client()
431
+ response = client.chat.completions.create(
432
+ model=CHAT_MODEL,
433
+ messages=[
434
+ {"role": "system", "content": "You are a curriculum designer for Filipino K-12 DepEd math. Respond only with valid JSON."},
435
+ {"role": "user", "content": prompt},
436
+ ],
437
+ temperature=0.4,
438
+ max_tokens=800,
439
+ response_format={"type": "json_object"},
440
+ )
441
+ content = response.choices[0].message.content or "{}"
442
+ parsed = json.loads(content)
443
+
444
+ steps = []
445
+ for s in parsed.get("steps", []):
446
+ steps.append(LearningStep(
447
+ step_number=s.get("step_number", len(steps) + 1),
448
+ type=s.get("type", "practice"),
449
+ title=s.get("title", "Practice Step"),
450
+ description=s.get("description", ""),
451
+ duration_minutes=s.get("duration_minutes", 10),
452
+ num_items=s.get("num_items"),
453
+ topic=s.get("topic", weakest_topic),
454
+ competency_tag=s.get("competency_tag", ""),
455
+ difficulty=s.get("difficulty", "easy"),
456
+ ))
457
+
458
+ return LearningPath(
459
+ student_id=student_id,
460
+ generated_at=_now_iso(),
461
+ methodology_tags=parsed.get("methodology_tags", ["Interactive", "Video", "Practice", "Quiz"]),
462
+ steps=steps,
463
+ estimated_duration_days=parsed.get("estimated_duration_days", estimated_days),
464
+ primary_weak_topic=weakest_topic,
465
+ all_weak_topics=weak_topics,
466
+ ai_rationale=parsed.get("ai_rationale", f"Focused on {weakest_topic} as the primary area needing improvement."),
467
+ )
468
+ except Exception as e:
469
+ logger.warning(f"DeepSeek learning path generation failed: {e}")
470
+ # Fallback: generate a basic path
471
+ return self._fallback_learning_path(student_id, weakest_topic, weak_topics)
472
+
473
+ def _fallback_learning_path(self, student_id: str, weakest_topic: str, weak_topics: List[str]) -> LearningPath:
474
+ """Generate a basic learning path without AI."""
475
+ steps = [
476
+ LearningStep(step_number=1, type="video_lesson", title=f"{weakest_topic} - Fundamentals",
477
+ description="Review core concepts", duration_minutes=8, topic=weakest_topic, difficulty="easy"),
478
+ LearningStep(step_number=2, type="practice", title=f"{weakest_topic} - Guided Practice",
479
+ description="Work through examples", duration_minutes=12, num_items=10, topic=weakest_topic, difficulty="easy"),
480
+ LearningStep(step_number=3, type="practice", title=f"{weakest_topic} - Independent Practice",
481
+ description="Solve problems independently", duration_minutes=15, num_items=10, topic=weakest_topic, difficulty="medium"),
482
+ LearningStep(step_number=4, type="assessment", title=f"{weakest_topic} - Mastery Check",
483
+ description="Demonstrate understanding", duration_minutes=10, num_items=5, topic=weakest_topic, difficulty="medium"),
484
+ ]
485
+ if len(weak_topics) > 1:
486
+ steps.append(LearningStep(step_number=5, type="review", title=f"{weak_topics[1]} - Review",
487
+ description="Brief review of secondary weak area", duration_minutes=10, topic=weak_topics[1], difficulty="easy"))
488
+
489
+ return LearningPath(
490
+ student_id=student_id,
491
+ generated_at=_now_iso(),
492
+ steps=steps,
493
+ estimated_duration_days=7,
494
+ primary_weak_topic=weakest_topic,
495
+ all_weak_topics=weak_topics,
496
+ ai_rationale=f"Structured path focusing on {weakest_topic} with progressive difficulty.",
497
+ )
498
+
499
+ async def _generate_recommendations(self, **kwargs) -> List[str]:
500
+ """Generate teacher recommendations via DeepSeek."""
501
+ prompt = f"""Generate 3-5 concise, actionable recommendations for a teacher working with this at-risk student.
502
+
503
+ Student: Grade {kwargs['grade_level']}, Risk: {kwargs['risk_level']}
504
+ Weak Topics: {', '.join(kwargs['weak_topics'][:3]) or 'Foundational Skills'}
505
+ Avg Score: {kwargs['avg_score']:.0f}%
506
+
507
+ Return as a JSON array of strings. Each recommendation max 25 words. Be specific to the weak topics."""
508
+
509
+ try:
510
+ from services.ai_client import get_deepseek_client, CHAT_MODEL
511
+ client = get_deepseek_client()
512
+ response = client.chat.completions.create(
513
+ model=CHAT_MODEL,
514
+ messages=[
515
+ {"role": "system", "content": "You are a K-12 math education advisor. Respond only with a JSON array of strings."},
516
+ {"role": "user", "content": prompt},
517
+ ],
518
+ temperature=0.3,
519
+ max_tokens=300,
520
+ response_format={"type": "json_object"},
521
+ )
522
+ content = response.choices[0].message.content or "[]"
523
+ parsed = json.loads(content)
524
+ if isinstance(parsed, list):
525
+ return parsed[:5]
526
+ if isinstance(parsed, dict):
527
+ return parsed.get("recommendations", parsed.get("actions", []))[:5]
528
+ except Exception as e:
529
+ logger.warning(f"DeepSeek recommendations failed: {e}")
530
+
531
+ return [
532
+ f"Schedule 1-on-1 review session for {kwargs['weak_topics'][0] if kwargs['weak_topics'] else 'foundational skills'}.",
533
+ "Assign additional practice problems at reduced difficulty level.",
534
+ "Monitor quiz completion and provide immediate feedback.",
535
+ ]
536
+
537
+ async def complete_step(self, student_id: str, step_number: int, score: float, time_spent: int) -> Dict:
538
+ """Mark a learning step as completed."""
539
+ db = _get_firestore_client()
540
+ if not db:
541
+ return {"error": "Firestore unavailable"}
542
+
543
+ try:
544
+ plan_ref = db.collection("intervention_plans").document(student_id)
545
+ plan_doc = plan_ref.get()
546
+ if not plan_doc.exists:
547
+ return {"error": "No intervention plan found"}
548
+
549
+ plan_data = plan_doc.to_dict()
550
+ learning_path = plan_data.get("learning_path", {})
551
+ steps = learning_path.get("steps", [])
552
+
553
+ for step in steps:
554
+ if step.get("step_number") == step_number:
555
+ step["is_completed"] = True
556
+ step["completion_score"] = score
557
+ break
558
+
559
+ plan_ref.update({"learning_path.steps": steps})
560
+
561
+ # Invalidate cache
562
+ self._cache.pop(student_id, None)
563
+
564
+ return {"status": "completed", "step_number": step_number, "score": score}
565
+ except Exception as e:
566
+ logger.error(f"Failed to complete step: {e}")
567
+ return {"error": str(e)}
568
+
569
+ def invalidate_cache(self, student_id: str) -> None:
570
+ self._cache.pop(student_id, None)
571
+
572
+
573
+ # โ”€โ”€โ”€ Helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
574
+
575
+ def _now_iso() -> str:
576
+ return datetime.now(timezone.utc).isoformat()
577
+
578
+
579
+ # Singleton
580
+ _engine_instance: Optional[InterventionEngine] = None
581
+
582
+
583
+ def get_intervention_engine() -> InterventionEngine:
584
+ global _engine_instance
585
+ if _engine_instance is None:
586
+ _engine_instance = InterventionEngine()
587
+ return _engine_instance
services/student_intelligence_pipeline.py ADDED
@@ -0,0 +1,546 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MathPulse AI โ€” Student Intelligence Pipeline
3
+
4
+ Central event processor that:
5
+ - Intercepts every student activity completion
6
+ - Recomputes P (systemPerformanceAvg) from all accumulated scores
7
+ - Calls existing compute_wri() with updated D, G, P
8
+ - Writes denormalized student_profiles and class summaries
9
+ - Triggers DeepSeek AI context generation when meaningful
10
+ """
11
+
12
+ import json
13
+ import logging
14
+ import os
15
+ import time
16
+ from datetime import datetime, timedelta, timezone
17
+ from typing import Any, Dict, List, Literal, Optional
18
+
19
+ from pydantic import BaseModel, Field
20
+
21
+ logger = logging.getLogger("mathpulse.pipeline")
22
+
23
+ # โ”€โ”€โ”€ Firestore โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
24
+
25
+ _firebase_firestore = None
26
+
27
+
28
+ def _get_db():
29
+ global _firebase_firestore
30
+ if _firebase_firestore is None:
31
+ try:
32
+ from firebase_admin import firestore as ff
33
+ _firebase_firestore = ff
34
+ except Exception:
35
+ return None
36
+ try:
37
+ return _firebase_firestore.client()
38
+ except Exception:
39
+ return None
40
+
41
+
42
+ # โ”€โ”€โ”€ Models โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
43
+
44
+ class StudentActivityEvent(BaseModel):
45
+ student_id: str
46
+ event_type: Literal["diagnostic", "quiz", "battle", "lesson", "module", "session"]
47
+ event_data: Dict[str, Any] = Field(default_factory=dict)
48
+ occurred_at: str # ISO string
49
+ class_id: str = ""
50
+ teacher_id: str = ""
51
+
52
+
53
+ class ProfileUpdateResult(BaseModel):
54
+ student_id: str
55
+ profile_updated: bool = False
56
+ p_updated: bool = False
57
+ new_p: Optional[float] = None
58
+ wri_recomputed: bool = False
59
+ new_wri: Optional[float] = None
60
+ new_risk_status: str = "pending_assessment"
61
+ risk_status_changed: bool = False
62
+ previous_risk_status: Optional[str] = None
63
+ ai_regenerated: bool = False
64
+
65
+
66
+ # โ”€โ”€โ”€ Source weights and recency multipliers for P computation โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
67
+
68
+ SOURCE_WEIGHTS = {
69
+ "practice": 1.0,
70
+ "lesson_quiz": 1.0,
71
+ "module_quiz": 1.2,
72
+ "assessment": 1.2,
73
+ "battle": 0.8,
74
+ "intervention_quiz": 1.3,
75
+ "diagnostic": 1.0,
76
+ }
77
+
78
+
79
+ def _recency_multiplier(occurred_at: datetime) -> float:
80
+ now = datetime.now(timezone.utc)
81
+ days_ago = (now - occurred_at).days
82
+ if days_ago <= 7:
83
+ return 1.5
84
+ if days_ago <= 30:
85
+ return 1.0
86
+ return 0.6
87
+
88
+
89
+ # โ”€โ”€โ”€ Pipeline โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
90
+
91
+ class StudentIntelligencePipeline:
92
+
93
+ async def process_event(self, event: StudentActivityEvent) -> ProfileUpdateResult:
94
+ """Master entry point. Called after every student activity."""
95
+ db = _get_db()
96
+ if not db:
97
+ logger.error("Firestore unavailable")
98
+ return ProfileUpdateResult(student_id=event.student_id)
99
+
100
+ result = ProfileUpdateResult(student_id=event.student_id)
101
+
102
+ try:
103
+ # 1. Load or create profile
104
+ profile = self._load_profile(db, event.student_id)
105
+
106
+ # 2. Load managed student data (source of D, G, weights)
107
+ managed = self._load_managed_student(db, event.student_id)
108
+
109
+ # 3. Update profile section from event
110
+ self._update_profile_section(profile, event)
111
+
112
+ # 4. Compute P from all activity (skip for session events)
113
+ if event.event_type != "session":
114
+ d_score = managed.get("diagnosticScore") or profile.get("diagnostic", {}).get("overall_score")
115
+ new_p = self._compute_system_performance_avg(db, event.student_id, event, diagnostic_score=d_score)
116
+ profile["system_performance_avg"] = new_p
117
+ result.p_updated = True
118
+ result.new_p = new_p
119
+ else:
120
+ new_p = profile.get("system_performance_avg")
121
+
122
+ # 5. Recompute WRI using existing compute_wri function
123
+ d = managed.get("diagnosticScore") or profile.get("diagnostic", {}).get("overall_score")
124
+ g = managed.get("externalGradesAvg") or profile.get("external_grades_avg")
125
+ weights = managed.get("weights") or profile.get("wri_weights") or {"w1": 0.30, "w2": 0.40, "w3": 0.30}
126
+
127
+ if d is not None and event.event_type != "session":
128
+ from services.wri_service import compute_wri
129
+ wri_result = compute_wri(d=d, g=g, p=new_p, weights=weights)
130
+
131
+ previous_status = profile.get("risk_status", "pending_assessment")
132
+ new_status = wri_result["risk_status"]
133
+
134
+ result.wri_recomputed = True
135
+ result.new_wri = wri_result["wri"]
136
+ result.new_risk_status = new_status
137
+ result.previous_risk_status = previous_status
138
+ result.risk_status_changed = previous_status != new_status
139
+
140
+ # Update profile
141
+ profile["wri"] = wri_result["wri"]
142
+ profile["risk_status"] = new_status
143
+ profile["previous_risk_status"] = previous_status
144
+ profile["system_performance_avg"] = new_p
145
+ profile["external_grades_avg"] = g
146
+ profile["diagnostic_score"] = d
147
+ profile["wri_weights"] = weights
148
+ profile["g_fallback"] = wri_result["g_fallback"]
149
+ profile["p_fallback"] = wri_result["p_fallback"]
150
+ profile["wri_updated_at"] = _now_iso()
151
+
152
+ # Compute risk trend
153
+ profile["risk_trend"] = self._compute_risk_trend(profile)
154
+
155
+ # 6. Write to managedStudents (update P, WRI, riskStatus)
156
+ self._update_managed_student(db, event.student_id, wri_result, new_p)
157
+
158
+ # 7. AI context generation (cost-controlled)
159
+ if self._should_regenerate_ai(event, profile, result):
160
+ ai_ctx = await self._generate_ai_context(profile, event)
161
+ if ai_ctx:
162
+ profile["ai_context"] = ai_ctx
163
+ result.ai_regenerated = True
164
+
165
+ # 8. Update metadata
166
+ profile["last_updated_at"] = _now_iso()
167
+ profile["last_event_type"] = event.event_type
168
+ profile["last_event_at"] = event.occurred_at
169
+ profile["profile_version"] = profile.get("profile_version", 0) + 1
170
+
171
+ # 9. Write student_profiles/{id}
172
+ db.collection("student_profiles").document(event.student_id).set(profile, merge=True)
173
+ result.profile_updated = True
174
+
175
+ # 10. Write denormalized summary
176
+ if event.class_id:
177
+ self._write_summary(db, event.class_id, event.student_id, profile)
178
+
179
+ # 11. Invalidate class analytics cache
180
+ if event.class_id:
181
+ self._invalidate_class_cache(db, event.class_id)
182
+
183
+ except Exception as e:
184
+ logger.error(f"Pipeline error for {event.student_id}: {e}", exc_info=True)
185
+
186
+ return result
187
+
188
+ # โ”€โ”€โ”€ Profile loading โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
189
+
190
+ def _load_profile(self, db, student_id: str) -> Dict:
191
+ doc = db.collection("student_profiles").document(student_id).get()
192
+ if doc.exists:
193
+ return doc.to_dict()
194
+ return {"student_id": student_id, "profile_version": 0, "risk_status": "pending_assessment"}
195
+
196
+ def _load_managed_student(self, db, student_id: str) -> Dict:
197
+ doc = db.collection("managedStudents").document(student_id).get()
198
+ if doc.exists:
199
+ return doc.to_dict()
200
+ # Try users collection
201
+ doc2 = db.collection("users").document(student_id).get()
202
+ return doc2.to_dict() if doc2.exists else {}
203
+
204
+ # โ”€โ”€โ”€ Profile section updates โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
205
+
206
+ def _update_profile_section(self, profile: Dict, event: StudentActivityEvent):
207
+ ed = event.event_data
208
+
209
+ if event.event_type == "diagnostic":
210
+ profile.setdefault("diagnostic", {})
211
+ profile["diagnostic"]["completed"] = True
212
+ profile["diagnostic"]["completed_at"] = event.occurred_at
213
+ profile["diagnostic"]["overall_score"] = ed.get("overall_score", 0)
214
+ profile["diagnostic"]["per_topic_scores"] = ed.get("per_topic_scores", {})
215
+ profile["diagnostic"]["weak_topics"] = [
216
+ t for t, s in ed.get("per_topic_scores", {}).items() if s < 60
217
+ ]
218
+ profile["diagnostic"]["strong_topics"] = [
219
+ t for t, s in ed.get("per_topic_scores", {}).items() if s >= 75
220
+ ]
221
+ profile["diagnostic_score"] = ed.get("overall_score", 0)
222
+
223
+ elif event.event_type in ("quiz", "battle"):
224
+ qp = profile.setdefault("quiz_performance", {
225
+ "total_attempts": 0, "avg_score_all_time": None,
226
+ "recent_attempts": [], "accuracy_by_topic": {},
227
+ })
228
+ qp["total_attempts"] = qp.get("total_attempts", 0) + 1
229
+
230
+ score = ed.get("score", 0)
231
+ topic = ed.get("topic", "")
232
+
233
+ # Update rolling average
234
+ prev_avg = qp.get("avg_score_all_time") or 0
235
+ prev_count = qp["total_attempts"] - 1
236
+ if prev_count > 0:
237
+ qp["avg_score_all_time"] = round(
238
+ (prev_avg * prev_count + score) / qp["total_attempts"], 1
239
+ )
240
+ else:
241
+ qp["avg_score_all_time"] = score
242
+
243
+ # Update per-topic accuracy (exponential moving average)
244
+ if topic:
245
+ acc = qp.setdefault("accuracy_by_topic", {})
246
+ prev = acc.get(topic, score)
247
+ acc[topic] = round(prev * 0.7 + score * 0.3, 1)
248
+
249
+ # Add to recent attempts (keep last 10)
250
+ recent = qp.setdefault("recent_attempts", [])
251
+ recent.insert(0, {
252
+ "quiz_id": ed.get("quiz_id", ""),
253
+ "topic": topic,
254
+ "competency_tag": ed.get("competency_tag", ""),
255
+ "score": score,
256
+ "source": ed.get("source", event.event_type),
257
+ "attempted_at": event.occurred_at,
258
+ })
259
+ qp["recent_attempts"] = recent[:10]
260
+
261
+ # Compute lowest/highest topics
262
+ if qp.get("accuracy_by_topic"):
263
+ sorted_topics = sorted(qp["accuracy_by_topic"].items(), key=lambda x: x[1])
264
+ qp["lowest_accuracy_topics"] = [t for t, _ in sorted_topics[:5]]
265
+ qp["highest_accuracy_topics"] = [t for t, _ in sorted_topics[-5:]]
266
+
267
+ # Battle-specific
268
+ if event.event_type == "battle":
269
+ bp = profile.setdefault("battle_performance", {"total_battles": 0, "battles_won": 0})
270
+ bp["total_battles"] = bp.get("total_battles", 0) + 1
271
+ if ed.get("won"):
272
+ bp["battles_won"] = bp.get("battles_won", 0) + 1
273
+ bp["win_rate"] = round(bp["battles_won"] / bp["total_battles"] * 100, 1) if bp["total_battles"] > 0 else 0
274
+ bp["avg_battle_score"] = score
275
+ bp["last_battle_at"] = event.occurred_at
276
+
277
+ elif event.event_type == "lesson":
278
+ ce = profile.setdefault("content_engagement", {"lessons_completed": 0, "modules_completed": 0, "topics_studied": []})
279
+ if ed.get("is_completed"):
280
+ ce["lessons_completed"] = ce.get("lessons_completed", 0) + 1
281
+ topic = ed.get("topic", "")
282
+ if topic and topic not in ce.get("topics_studied", []):
283
+ ce.setdefault("topics_studied", []).append(topic)
284
+ ce["last_content_at"] = event.occurred_at
285
+
286
+ elif event.event_type == "module":
287
+ ce = profile.setdefault("content_engagement", {"lessons_completed": 0, "modules_completed": 0, "topics_studied": []})
288
+ if ed.get("is_completed"):
289
+ ce["modules_completed"] = ce.get("modules_completed", 0) + 1
290
+ ce["last_content_at"] = event.occurred_at
291
+
292
+ elif event.event_type == "session":
293
+ eng = profile.setdefault("engagement", {"total_sessions": 0, "login_streak": 0})
294
+ eng["total_sessions"] = eng.get("total_sessions", 0) + 1
295
+ eng["last_active_at"] = event.occurred_at
296
+ eng["days_since_last_active"] = 0
297
+
298
+ # โ”€โ”€โ”€ P computation โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
299
+
300
+ def _compute_system_performance_avg(
301
+ self, db, student_id: str, current_event: StudentActivityEvent, diagnostic_score: Optional[float] = None
302
+ ) -> float:
303
+ """Compute P from ALL in-platform scores with source weights and recency."""
304
+ scores = []
305
+
306
+ # Fetch from quizSubmissions
307
+ try:
308
+ subs = db.collection("quizSubmissions").where("lrn", "==", student_id).order_by(
309
+ "submittedAt", direction="DESCENDING"
310
+ ).limit(50).stream()
311
+ for s in subs:
312
+ d = s.to_dict()
313
+ score = d.get("score", 0)
314
+ source = d.get("source", "practice")
315
+ submitted = d.get("submittedAt")
316
+ if submitted and hasattr(submitted, "seconds"):
317
+ dt = datetime.fromtimestamp(submitted.seconds, tz=timezone.utc)
318
+ else:
319
+ dt = datetime.now(timezone.utc) - timedelta(days=15)
320
+ scores.append((score, source, dt))
321
+ except Exception as e:
322
+ logger.debug(f"quizSubmissions fetch failed for {student_id}: {e}")
323
+
324
+ # Fetch from progress/{id}.quizAttempts
325
+ try:
326
+ for lookup_id in [student_id]:
327
+ prog = db.collection("progress").document(lookup_id).get()
328
+ if prog.exists:
329
+ attempts = prog.to_dict().get("quizAttempts", [])
330
+ for a in attempts:
331
+ score = a.get("score", 0)
332
+ completed = a.get("completedAt")
333
+ if completed and hasattr(completed, "seconds"):
334
+ dt = datetime.fromtimestamp(completed.seconds, tz=timezone.utc)
335
+ else:
336
+ dt = datetime.now(timezone.utc) - timedelta(days=15)
337
+ scores.append((score, "practice", dt))
338
+ break
339
+ except Exception as e:
340
+ logger.debug(f"progress fetch failed for {student_id}: {e}")
341
+
342
+ # Include current event score
343
+ if current_event.event_type in ("quiz", "battle", "diagnostic"):
344
+ event_score = current_event.event_data.get("score") or current_event.event_data.get("overall_score", 0)
345
+ source = current_event.event_data.get("source", current_event.event_type)
346
+ try:
347
+ dt = datetime.fromisoformat(current_event.occurred_at.replace("Z", "+00:00"))
348
+ except Exception:
349
+ dt = datetime.now(timezone.utc)
350
+ scores.append((event_score, source, dt))
351
+
352
+ if not scores:
353
+ # Fallback to D (diagnostic score)
354
+ return float(diagnostic_score) if diagnostic_score is not None else 0.0
355
+
356
+ # Weighted average
357
+ weighted_sum = 0.0
358
+ weight_sum = 0.0
359
+ for score, source, dt in scores:
360
+ sw = SOURCE_WEIGHTS.get(source, 1.0)
361
+ rm = _recency_multiplier(dt)
362
+ weighted_sum += score * sw * rm
363
+ weight_sum += sw * rm
364
+
365
+ return round(weighted_sum / weight_sum, 2) if weight_sum > 0 else 0.0
366
+
367
+ # โ”€โ”€โ”€ WRI update to managedStudents โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
368
+
369
+ def _update_managed_student(self, db, student_id: str, wri_result: Dict, new_p: float):
370
+ """Write updated WRI data to managedStudents/{id}."""
371
+ try:
372
+ ref = db.collection("managedStudents").document(student_id)
373
+ if not ref.get().exists:
374
+ return
375
+ ref.update({
376
+ "wri": wri_result["wri"],
377
+ "riskStatus": wri_result["risk_status"],
378
+ "systemPerformanceAvg": new_p,
379
+ "riskUpdatedAt": datetime.now(timezone.utc),
380
+ })
381
+ except Exception as e:
382
+ logger.warning(f"Failed to update managedStudents/{student_id}: {e}")
383
+
384
+ # โ”€โ”€โ”€ Risk trend โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
385
+
386
+ def _compute_risk_trend(self, profile: Dict) -> str:
387
+ qp = profile.get("quiz_performance", {})
388
+ avg_all = qp.get("avg_score_all_time")
389
+ if avg_all is None:
390
+ return "insufficient_data"
391
+ # Compute 7-day avg from recent_attempts
392
+ recent = qp.get("recent_attempts", [])
393
+ now = datetime.now(timezone.utc)
394
+ scores_7d = []
395
+ for a in recent:
396
+ try:
397
+ at = a.get("attempted_at", "")
398
+ if isinstance(at, str):
399
+ dt = datetime.fromisoformat(at.replace("Z", "+00:00"))
400
+ elif hasattr(at, "seconds"):
401
+ dt = datetime.fromtimestamp(at.seconds, tz=timezone.utc)
402
+ else:
403
+ continue
404
+ if (now - dt).days <= 7:
405
+ scores_7d.append(a.get("score", 0))
406
+ except Exception:
407
+ continue
408
+ if len(scores_7d) < 2:
409
+ return "insufficient_data"
410
+ avg_7d = sum(scores_7d) / len(scores_7d)
411
+ if avg_7d > avg_all + 5:
412
+ return "improving"
413
+ if avg_7d < avg_all - 5:
414
+ return "worsening"
415
+ return "stable"
416
+
417
+ # โ”€โ”€โ”€ AI context generation โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
418
+
419
+ def _should_regenerate_ai(self, event: StudentActivityEvent, profile: Dict, result: ProfileUpdateResult) -> bool:
420
+ if event.event_type == "session":
421
+ return False
422
+ if event.event_type == "diagnostic":
423
+ return True
424
+ if result.risk_status_changed:
425
+ return True
426
+ ai_ctx = profile.get("ai_context", {})
427
+ if not ai_ctx.get("generated_at"):
428
+ return True
429
+ # Rate limit: max once per 6 hours
430
+ try:
431
+ last_gen = datetime.fromisoformat(ai_ctx["generated_at"].replace("Z", "+00:00"))
432
+ if (datetime.now(timezone.utc) - last_gen).total_seconds() < 21600:
433
+ return False
434
+ except Exception:
435
+ pass
436
+ return event.event_type in ("quiz", "battle") and result.new_risk_status in ("critical", "at_risk")
437
+
438
+ async def _generate_ai_context(self, profile: Dict, event: StudentActivityEvent) -> Optional[Dict]:
439
+ """Call DeepSeek for AI context generation."""
440
+ try:
441
+ from services.ai_client import get_deepseek_client, CHAT_MODEL
442
+
443
+ weak_topics = profile.get("quiz_performance", {}).get("lowest_accuracy_topics", [])[:3]
444
+ diag = profile.get("diagnostic", {})
445
+ qp = profile.get("quiz_performance", {})
446
+
447
+ prompt = f"""Analyze this student's FULL learning history and generate insights.
448
+
449
+ Student: Grade {profile.get('grade_level', '?')}, Section {profile.get('section', '?')}
450
+ WRI: {profile.get('wri', 'N/A')} | Status: {profile.get('risk_status', 'pending_assessment')}
451
+ Previous Status: {profile.get('previous_risk_status', 'N/A')}
452
+ Risk Trend: {profile.get('risk_trend', 'insufficient_data')}
453
+
454
+ Diagnostic Score: {diag.get('overall_score', 'N/A')}%
455
+ System Performance (P): {profile.get('system_performance_avg', 'N/A')}%
456
+ External Grades (G): {profile.get('external_grades_avg', 'N/A')}%
457
+
458
+ Quiz Performance: {qp.get('total_attempts', 0)} attempts, avg {qp.get('avg_score_all_time', 'N/A')}%
459
+ Weakest Topics: {', '.join(weak_topics) or 'None identified'}
460
+ Strongest Topics: {', '.join(qp.get('highest_accuracy_topics', [])[:3]) or 'None identified'}
461
+
462
+ Latest Event: {event.event_type} โ€” score {event.event_data.get('score', event.event_data.get('overall_score', 'N/A'))}%
463
+
464
+ Generate JSON:
465
+ {{"ai_summary": "1 sentence: current status with WRI context",
466
+ "ai_strengths": "specific topics/skills they excel at",
467
+ "ai_concerns": "specific gaps needing attention",
468
+ "ai_recommendation": "top 1 action for teacher this week (max 20 words)",
469
+ "notable_change": "what changed most since last update",
470
+ "rag_topics_used": []}}"""
471
+
472
+ client = get_deepseek_client()
473
+ response = client.chat.completions.create(
474
+ model=CHAT_MODEL,
475
+ messages=[
476
+ {"role": "system", "content": "You are MathPulse AI analyzing Filipino K-12 math student data. Respond only with valid JSON."},
477
+ {"role": "user", "content": prompt},
478
+ ],
479
+ temperature=0.3,
480
+ max_tokens=400,
481
+ response_format={"type": "json_object"},
482
+ )
483
+ content = response.choices[0].message.content or "{}"
484
+ parsed = json.loads(content)
485
+ parsed["generated_at"] = _now_iso()
486
+ parsed["based_on_wri_status"] = profile.get("risk_status", "pending_assessment")
487
+ return parsed
488
+ except Exception as e:
489
+ logger.warning(f"DeepSeek AI context generation failed: {e}")
490
+ return None
491
+
492
+ # โ”€โ”€โ”€ Denormalized summary โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
493
+
494
+ def _write_summary(self, db, class_id: str, student_id: str, profile: Dict):
495
+ """Write lightweight summary for Teacher Dashboard reads."""
496
+ qp = profile.get("quiz_performance", {})
497
+ ai = profile.get("ai_context", {})
498
+ eng = profile.get("engagement", {})
499
+
500
+ summary = {
501
+ "student_id": student_id,
502
+ "display_name": profile.get("display_name", ""),
503
+ "wri": profile.get("wri"),
504
+ "risk_status": profile.get("risk_status", "pending_assessment"),
505
+ "previous_risk_status": profile.get("previous_risk_status"),
506
+ "risk_trend": profile.get("risk_trend", "insufficient_data"),
507
+ "avg_score_all_time": qp.get("avg_score_all_time"),
508
+ "avg_score_last_7_days": qp.get("avg_score_last_7_days"),
509
+ "score_trend": qp.get("score_trend", "insufficient_data"),
510
+ "last_active_at": eng.get("last_active_at"),
511
+ "days_since_last_active": eng.get("days_since_last_active", 999),
512
+ "weakest_topic": (qp.get("lowest_accuracy_topics") or [None])[0],
513
+ "ai_summary": ai.get("ai_summary", ""),
514
+ "has_intervention_plan": profile.get("intervention", {}).get("has_active_plan", False),
515
+ "lessons_completed": profile.get("content_engagement", {}).get("lessons_completed", 0),
516
+ "modules_completed": profile.get("content_engagement", {}).get("modules_completed", 0),
517
+ "updated_at": _now_iso(),
518
+ }
519
+
520
+ try:
521
+ db.collection("classes").document(class_id).collection("student_summaries").document(student_id).set(summary, merge=True)
522
+ except Exception as e:
523
+ logger.warning(f"Failed to write summary for {student_id} in class {class_id}: {e}")
524
+
525
+ def _invalidate_class_cache(self, db, class_id: str):
526
+ try:
527
+ db.collection("class_analytics").document(class_id).update({"cache_valid": False})
528
+ except Exception:
529
+ pass
530
+
531
+
532
+ # โ”€โ”€โ”€ Helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
533
+
534
+ def _now_iso() -> str:
535
+ return datetime.now(timezone.utc).isoformat()
536
+
537
+
538
+ # Singleton
539
+ _pipeline: Optional[StudentIntelligencePipeline] = None
540
+
541
+
542
+ def get_pipeline() -> StudentIntelligencePipeline:
543
+ global _pipeline
544
+ if _pipeline is None:
545
+ _pipeline = StudentIntelligencePipeline()
546
+ return _pipeline