File size: 4,919 Bytes
3256847
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
from __future__ import annotations

import json
import random
import threading
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import contextvars

DATA_DIR = Path(__file__).parent / "data"
STUDENTS_PATH = DATA_DIR / "students.json"
TOPICS_PATH = DATA_DIR / "topics.json"
SESSIONS_PATH = DATA_DIR / "sessions.json"
RESULTS_PATH = DATA_DIR / "results.json"

_FILE_LOCK = threading.Lock()

_current_session_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_session_id", default=None
)

_LAST_SESSION_ID: Optional[str] = None


class StudentNotFoundError(Exception):
    pass


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _read_json(path: Path, default: Any) -> Any:
    with _FILE_LOCK:
        if not path.exists():
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(json.dumps(default, ensure_ascii=False, indent=2), encoding="utf-8")
            return default
        txt = path.read_text(encoding="utf-8").strip()
        if not txt:
            return default
        return json.loads(txt)


def _write_json(path: Path, data: Any) -> None:
    with _FILE_LOCK:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")


def ensure_data_files() -> None:
    _read_json(STUDENTS_PATH, [])
    _read_json(TOPICS_PATH, [])
    _read_json(SESSIONS_PATH, {})
    _read_json(RESULTS_PATH, [])


@dataclass
class Message:
    role: str  
    content: str
    datetime: str

    def to_dict(self) -> Dict[str, str]:
        return {"role": self.role, "content": self.content, "datetime": self.datetime}


def set_current_session(session_id: Optional[str]) -> None:
    _current_session_id.set(session_id)


def get_last_session_id() -> Optional[str]:
    return _LAST_SESSION_ID


def _load_students() -> List[Dict[str, str]]:
    return _read_json(STUDENTS_PATH, [])


def _load_topics() -> List[str]:
    return _read_json(TOPICS_PATH, [])


def _load_sessions() -> Dict[str, Any]:
    return _read_json(SESSIONS_PATH, {})


def _save_sessions(sessions: Dict[str, Any]) -> None:
    _write_json(SESSIONS_PATH, sessions)


def _append_result(record: Dict[str, Any]) -> None:
    results = _read_json(RESULTS_PATH, [])
    if not isinstance(results, list):
        results = []
    results.append(record)
    _write_json(RESULTS_PATH, results)


def start_exam(email: str, name: str) -> List[str]:
    global _LAST_SESSION_ID

    ensure_data_files()

    students = _load_students()
    found = next((s for s in students if s.get("email", "").strip().lower() == email.strip().lower()), None)
    if not found:
        raise StudentNotFoundError(f"Student not found for email: {email}")

    topics = _load_topics()
    if len(topics) < 2:
        topics = ["General knowledge: topic A", "General knowledge: topic B", "General knowledge: topic C"]

    k = 2 if len(topics) < 3 else random.choice([2, 3])
    selected = random.sample(topics, k=min(k, len(topics)))

    sessions = _load_sessions()
    session_id = str(uuid.uuid4())
    sessions[session_id] = {
        "session_id": session_id,
        "email": email,
        "name": name,
        "topics_queue": selected,
        "topics_all": selected,
        "started_at": _utc_now_iso(),
        "finished_at": None,
        "status": "active",
    }
    _save_sessions(sessions)

    _LAST_SESSION_ID = session_id
    set_current_session(session_id)

    return selected


def get_next_topic() -> str:
    """
    IMPORTANT FIX:
    Gradio callbacks may run in a different context, so contextvar can be empty.
    We fallback to _LAST_SESSION_ID.
    """
    ensure_data_files()

    session_id = _current_session_id.get() or _LAST_SESSION_ID
    if not session_id:
        return ""

    sessions = _load_sessions()
    sess = sessions.get(session_id)
    if not sess or sess.get("status") != "active":
        return ""

    queue = sess.get("topics_queue", [])
    if not queue:
        return ""

    next_topic = queue.pop(0)
    sess["topics_queue"] = queue
    sessions[session_id] = sess
    _save_sessions(sessions)
    return str(next_topic)


def end_exam(email: str, score: float, history: List[Dict[str, str]]) -> None:
    ensure_data_files()

    sessions = _load_sessions()
    for sid, sess in list(sessions.items()):
        if sess.get("email", "").strip().lower() == email.strip().lower() and sess.get("status") == "active":
            sess["status"] = "finished"
            sess["finished_at"] = _utc_now_iso()
            sessions[sid] = sess
    _save_sessions(sessions)

    record = {
        "email": email,
        "score": float(score),
        "history": history,
        "saved_at": _utc_now_iso(),
    }
    _append_result(record)