| from typing import Dict, List |
|
|
| import numpy as np |
|
|
|
|
def build_student_documents(student_info: Dict) -> List[Dict]:
    """Flatten a student record into text documents, one fact per chunk.

    Each returned dict has three keys: ``source`` (which kind of fact it is),
    ``course_code`` (empty for facts not tied to a course) and ``chunk``
    (an English sentence describing the fact).

    Documents are emitted in this order:

    1. one ``student_profile`` overview,
    2. one ``attendance`` document per entry in ``attendance["subjects"]``,
    3. for every entry in ``results["semesters"]``: one ``results_semester``
       summary followed by one ``results_subject`` document per subject of
       that semester.

    Args:
        student_info: Mapping that may contain ``"attendance"`` and
            ``"results"`` sections. Sections or lists that are missing or
            ``None`` are treated as empty.

    Returns:
        The list of documents; it always contains at least the profile
        overview.
    """
    # `.get(key, {})` only covers a *missing* key; `or` also covers an
    # explicit None (e.g. a JSON null), which would otherwise crash below.
    info = student_info or {}
    attendance = info.get("attendance") or {}
    results = info.get("results") or {}

    # Render absent overall figures with the same placeholders the
    # per-subject and per-semester chunks use, instead of the text "None".
    cgpa = results.get("current_cgpa")
    overall_pct = attendance.get("overall_pct")
    overall_status = attendance.get("overall_status")
    cgpa_text = "NA" if cgpa is None else cgpa
    pct_text = "NA" if overall_pct is None else overall_pct
    status_text = "unknown" if overall_status is None else overall_status

    docs: List[Dict] = [
        {
            "source": "student_profile",
            "course_code": "",
            "chunk": (
                f"Student profile overview: current CGPA is {cgpa_text}. "
                f"Overall attendance is {pct_text}% with status {status_text}."
            ),
        }
    ]

    for subject in attendance.get("subjects") or []:
        code = subject.get("course_code", "")
        docs.append(
            {
                "source": "attendance",
                "course_code": str(code),
                "chunk": (
                    f"Attendance for {subject.get('subject', '')} ({code}): "
                    f"{subject.get('attended_classes', 0)}/{subject.get('total_classes', 0)} classes, "
                    f"{subject.get('attendance_pct', 0)}%, status {subject.get('status', 'unknown')}."
                ),
            }
        )

    for sem in results.get("semesters") or []:
        sem_no = sem.get("semester")
        docs.append(
            {
                "source": "results_semester",
                "course_code": "",
                "chunk": (
                    f"Semester {sem_no} performance summary: SGPA {sem.get('sgpa', 'NA')}, "
                    f"CGPA {sem.get('cgpa', 'NA')}."
                ),
            }
        )

        # Subject results directly follow the summary of their own semester.
        for subject in sem.get("subjects") or []:
            code = subject.get("course_code", "")
            docs.append(
                {
                    "source": "results_subject",
                    "course_code": str(code),
                    "chunk": (
                        f"Result for {subject.get('subject', '')} ({code}) "
                        f"in semester {sem_no}: total score {subject.get('total', 'NA')}"
                    ),
                }
            )

    return docs
|
|
|
|
def mmr_select(
    query_embedding: List[float],
    candidates: List[Dict],
    top_k: int = 8,
    lambda_param: float = 0.7,
) -> List[Dict]:
    """Pick up to ``top_k`` candidates by Maximal Marginal Relevance (MMR).

    The first pick is the candidate with the highest cosine similarity to the
    query. Every later pick maximises::

        lambda_param * cos(candidate, query)
            - (1 - lambda_param) * max(cos(candidate, s) for s in selected)

    Args:
        query_embedding: Query vector.
        candidates: Dicts that carry their vector under the ``"embedding"``
            key (a sequence of numbers or a NumPy array). Candidates whose
            embedding is missing, empty, all-zero or contains NaN/inf are
            ignored.
        top_k: Maximum number of candidates to return; values ``<= 0``
            yield an empty list.
        lambda_param: Weight of query relevance versus redundancy with the
            candidates already selected.

    Returns:
        The selected candidate dicts (the same objects that were passed in),
        in selection order. If the query vector has zero or non-finite norm
        no ranking is possible, so the first ``top_k`` usable candidates are
        returned in their input order.

    Raises:
        ValueError: If the usable embeddings do not all share one shape, or
            their dimension does not match the query's (raised by NumPy).
    """
    # Without this guard the unconditional first pick below would return one
    # item even when the caller asked for none.
    if not candidates or top_k <= 0:
        return []

    usable: List[Dict] = []
    unit_rows = []
    for cand in candidates:
        emb = cand.get("embedding")
        if emb is None:
            continue
        # ndarray truthiness is ambiguous (raises), so test its size instead.
        is_empty = emb.size == 0 if isinstance(emb, np.ndarray) else not emb
        if is_empty:
            continue
        vec = np.asarray(emb, dtype=np.float32)
        norm = np.linalg.norm(vec)
        # A NaN/inf vector would win every argmax and turn all later scores
        # into NaN, so it is dropped together with zero vectors.
        if norm == 0 or not np.isfinite(norm):
            continue
        usable.append(cand)
        unit_rows.append(vec / norm)

    if not usable:
        return []

    query = np.asarray(query_embedding, dtype=np.float32)
    query_norm = np.linalg.norm(query)
    if query_norm == 0 or not np.isfinite(query_norm):
        return usable[:top_k]

    # Rows are unit length, so a dot product is a cosine similarity.
    unit = np.stack(unit_rows)
    relevance = (unit @ (query / query_norm)).astype(np.float64)

    count = len(usable)
    # redundancy[i] = highest similarity of candidate i to anything selected
    # so far; kept as a running maximum so each round costs one
    # matrix-vector product instead of re-comparing against every pick.
    redundancy = np.full(count, -np.inf)
    available = np.ones(count, dtype=bool)
    order: List[int] = []

    pick = int(np.argmax(relevance))
    while True:
        order.append(pick)
        available[pick] = False
        if len(order) >= top_k or not available.any():
            break
        redundancy = np.maximum(redundancy, unit @ unit[pick])
        scores = lambda_param * relevance - (1.0 - lambda_param) * redundancy
        # Already-selected rows must never win again.
        scores[~available] = -np.inf
        pick = int(np.argmax(scores))

    return [usable[i] for i in order]
|
|