# File size: 4,919 Bytes
# 3256847
from __future__ import annotations
import json
import random
import threading
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import contextvars
# All persistent state lives in JSON files inside a "data" directory that sits
# next to this module.
DATA_DIR = Path(__file__).parent.joinpath("data")
STUDENTS_PATH = DATA_DIR.joinpath("students.json")
TOPICS_PATH = DATA_DIR.joinpath("topics.json")
SESSIONS_PATH = DATA_DIR.joinpath("sessions.json")
RESULTS_PATH = DATA_DIR.joinpath("results.json")

# One process-wide lock shared by the JSON read and write helpers.
_FILE_LOCK = threading.Lock()

# Session id bound to the current execution context (None until one is set).
_current_session_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_session_id",
    default=None,
)

# Module-global copy of the most recently started session id; used as a
# fallback when the context variable is empty.
_LAST_SESSION_ID: Optional[str] = None
class StudentNotFoundError(Exception):
    """Raised by ``start_exam`` when no student record matches the given email."""
def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def _read_json(path: Path, default: Any) -> Any:
    """Load JSON from ``path``, creating the file from ``default`` if it is missing.

    Args:
        path: File to read.
        default: Value returned (and, for a missing file, written) when there
            is nothing to parse.

    Returns:
        The decoded JSON document, or ``default`` itself when the file did not
        exist or contained only whitespace.

    Raises:
        json.JSONDecodeError: If the file holds non-empty text that is not
            valid JSON.
    """
    with _FILE_LOCK:
        if path.exists():
            raw = path.read_text(encoding="utf-8").strip()
            # An empty file is treated like a missing one, but is not rewritten.
            return json.loads(raw) if raw else default

        # First access: seed the file so later reads find it on disk.
        path.parent.mkdir(parents=True, exist_ok=True)
        seed = json.dumps(default, ensure_ascii=False, indent=2)
        path.write_text(seed, encoding="utf-8")
        return default
def _write_json(path: Path, data: Any) -> None:
    """Serialise ``data`` as indented UTF-8 JSON and overwrite ``path`` with it.

    The parent directory is created when needed; the whole operation runs
    while holding ``_FILE_LOCK``.
    """
    with _FILE_LOCK:
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        path.write_text(payload, encoding="utf-8")
def ensure_data_files() -> None:
    """Make sure the four JSON data files exist, seeding missing ones with empty defaults."""
    # A fresh default object is built on every call; _read_json creates a file
    # from it only when that file is absent.
    seeds = (
        (STUDENTS_PATH, []),
        (TOPICS_PATH, []),
        (SESSIONS_PATH, {}),
        (RESULTS_PATH, []),
    )
    for target, empty in seeds:
        _read_json(target, empty)
@dataclass
class Message:
    """A single chat message: who sent it, what was said, and a timestamp string."""

    role: str
    content: str
    datetime: str

    def to_dict(self) -> Dict[str, str]:
        """Return the message as a plain dict with keys role, content, datetime."""
        return dict(
            role=self.role,
            content=self.content,
            datetime=self.datetime,
        )
def set_current_session(session_id: Optional[str]) -> None:
    """Bind ``session_id`` (or ``None``) to the ``_current_session_id`` context variable."""
    # The token returned by ContextVar.set() is deliberately discarded.
    _current_session_id.set(session_id)
    return None
def get_last_session_id() -> Optional[str]:
    """Return the module-global ``_LAST_SESSION_ID`` (``None`` if never set)."""
    last_id = _LAST_SESSION_ID
    return last_id
def _load_students() -> List[Dict[str, str]]:
    """Read the students file; a missing or empty file yields an empty list."""
    students = _read_json(STUDENTS_PATH, [])
    return students
def _load_topics() -> List[str]:
    """Read the topics file; a missing or empty file yields an empty list."""
    topics = _read_json(TOPICS_PATH, [])
    return topics
def _load_sessions() -> Dict[str, Any]:
    """Read the sessions file; a missing or empty file yields an empty dict."""
    sessions = _read_json(SESSIONS_PATH, {})
    return sessions
def _save_sessions(sessions: Dict[str, Any]) -> None:
    """Overwrite the sessions file with the given ``sessions`` mapping."""
    _write_json(path=SESSIONS_PATH, data=sessions)
def _append_result(record: Dict[str, Any]) -> None:
    """Append ``record`` to the list stored in the results file.

    If the file currently holds something other than a list, that content is
    discarded and the file is rewritten as a one-element list.
    """
    stored = _read_json(RESULTS_PATH, [])
    results = stored if isinstance(stored, list) else []
    results.append(record)
    _write_json(RESULTS_PATH, results)
def start_exam(email: str, name: str) -> List[str]:
    """Open a new exam session for a registered student and pick its topics.

    The student is looked up by e-mail (case-insensitive, surrounding
    whitespace ignored). Two or three topics are drawn at random from the
    topics file; if that file holds fewer than two topics, a built-in list of
    three generic topics is used instead.

    Side effects: a new session record is written to the sessions file, the
    module-global ``_LAST_SESSION_ID`` is updated, and the session id is bound
    to the current context via ``set_current_session``.

    Args:
        email: E-mail address identifying the student.
        name: Display name stored in the session record.

    Returns:
        The list of topics selected for this exam.

    Raises:
        StudentNotFoundError: If ``email`` is blank or matches no student.
    """
    global _LAST_SESSION_ID
    ensure_data_files()

    # Normalise the caller's e-mail once instead of once per student record.
    wanted = email.strip().lower()
    found = None
    # A blank e-mail must never match: otherwise it would "find" any student
    # record whose e-mail field is missing or empty.
    if wanted:
        found = next(
            (
                student
                for student in _load_students()
                # Skip malformed entries and tolerate a null e-mail field.
                if isinstance(student, dict)
                and str(student.get("email") or "").strip().lower() == wanted
            ),
            None,
        )
    if found is None:
        raise StudentNotFoundError(f"Student not found for email: {email}")

    topics = _load_topics()
    if len(topics) < 2:
        topics = ["General knowledge: topic A", "General knowledge: topic B", "General knowledge: topic C"]

    # At this point len(topics) >= 2, so k never exceeds the population size.
    k = 2 if len(topics) < 3 else random.choice([2, 3])
    selected = random.sample(topics, k=k)

    sessions = _load_sessions()
    session_id = str(uuid.uuid4())
    sessions[session_id] = {
        "session_id": session_id,
        "email": email,
        "name": name,
        "topics_queue": selected,
        "topics_all": selected,
        "started_at": _utc_now_iso(),
        "finished_at": None,
        "status": "active",
    }
    _save_sessions(sessions)

    # Remember the id both globally and in the current context; the global is
    # the fallback read by get_next_topic when the context variable is empty.
    _LAST_SESSION_ID = session_id
    set_current_session(session_id)
    return selected
def get_next_topic() -> str:
    """Pop and return the next queued topic of the current exam session.

    The session id is taken from the ``_current_session_id`` context variable.
    Gradio callbacks may run in a different context where that variable is
    empty, so the module-global ``_LAST_SESSION_ID`` is used as a fallback.

    Returns:
        The next topic as a string, or ``""`` when there is no session id, the
        session is unknown or not active, or its topic queue is empty.
    """
    ensure_data_files()

    active_id = _current_session_id.get()
    if not active_id:
        active_id = _LAST_SESSION_ID
    if not active_id:
        return ""

    all_sessions = _load_sessions()
    record = all_sessions.get(active_id)
    if not record:
        return ""
    if record.get("status") != "active":
        return ""

    pending = record.get("topics_queue", [])
    if not pending:
        return ""

    # pop() mutates the list held inside ``record`` (and therefore inside
    # ``all_sessions``), so saving the mapping persists the shortened queue.
    upcoming = pending.pop(0)
    _save_sessions(all_sessions)
    return str(upcoming)
def end_exam(email: str, score: float, history: List[Dict[str, str]]) -> None:
    """Close every active session of a student and record the exam result.

    All sessions whose stored e-mail matches ``email`` (case-insensitive,
    surrounding whitespace ignored) and whose status is ``"active"`` are
    marked ``"finished"`` with a UTC timestamp. A result record holding the
    e-mail, score, history and save time is then appended to the results file.

    Args:
        email: E-mail address identifying the student.
        score: Final score; converted with ``float()``.
        history: Conversation history stored verbatim in the result record.

    Raises:
        TypeError, ValueError: If ``score`` cannot be converted to ``float``.
            This is raised before any session is modified.
    """
    ensure_data_files()

    # Validate the score first: failing after the sessions were saved would
    # leave the exam closed with no result recorded.
    numeric_score = float(score)

    # Loop-invariant: normalise the caller's e-mail once.
    wanted = email.strip().lower()

    sessions = _load_sessions()
    for sess in sessions.values():
        # Skip malformed entries and sessions that are already closed.
        if not isinstance(sess, dict) or sess.get("status") != "active":
            continue
        # ``or ""`` tolerates a stored null e-mail.
        if str(sess.get("email") or "").strip().lower() != wanted:
            continue
        sess["status"] = "finished"
        sess["finished_at"] = _utc_now_iso()
    _save_sessions(sessions)

    record = {
        "email": email,
        "score": numeric_score,
        "history": history,
        "saved_at": _utc_now_iso(),
    }
    _append_result(record)