Spaces:
Sleeping
Sleeping
File size: 15,490 Bytes
d7e4bc5 0dadbc6 d5de4d3 96d0ac4 f3f0f55 6b8059c f3f0f55 3a71817 2942f38 d2b3910 d5de4d3 e88a1cd 0dadbc6 f3f0f55 b476bed 64c4f49 0dadbc6 d5de4d3 8417298 f3f0f55 025a5e0 d5de4d3 0dadbc6 d395b26 0dadbc6 b476bed f3f0f55 e88a1cd b34e274 b476bed b34e274 7652c73 b476bed 7652c73 e305068 7652c73 b476bed 3a71817 b476bed 3a71817 b476bed 025a5e0 3a71817 b476bed 3a71817 87957a7 3a71817 b34e274 87957a7 b34e274 b476bed b949c97 7652c73 b476bed 7652c73 8f72d36 b476bed 3a71817 025a5e0 7ad1f4d 025a5e0 7ad1f4d 025a5e0 d5de4d3 b476bed d5de4d3 025a5e0 f3f0f55 d395b26 d5de4d3 025a5e0 d5de4d3 025a5e0 d5de4d3 b476bed f3f0f55 d5de4d3 f3f0f55 cd5e898 d5de4d3 a985d73 d5de4d3 e93fdee f3f0f55 6d2b55a d395b26 025a5e0 f3f0f55 e88a1cd 025a5e0 b476bed d537405 b476bed 3a71817 6e7eeb8 b34e274 b949c97 3a71817 d5de4d3 f3f0f55 025a5e0 a985d73 d5de4d3 e93fdee b476bed 9d6390b b476bed d5de4d3 3a71817 b476bed f3f0f55 d5de4d3 b476bed d5de4d3 f3f0f55 d5de4d3 b476bed d5de4d3 f3f0f55 d5de4d3 e93fdee e88a1cd b476bed f3f0f55 b476bed 7ad1f4d 9fe09fc 025a5e0 b476bed d5de4d3 b476bed 3a71817 b34e274 3a71817 b34e274 3a71817 d5de4d3 b476bed f3f0f55 b476bed d5de4d3 f3f0f55 b476bed d5de4d3 e88a1cd b476bed 025a5e0 f3f0f55 e88a1cd 025a5e0 b476bed f3f0f55 e88a1cd b476bed d5de4d3 f3f0f55 b476bed 025a5e0 b476bed d5de4d3 ea3af2a b476bed f3f0f55 b476bed f3f0f55 b476bed f3f0f55 b476bed 1d4a481 aef5fd9 d4e8635 1d4a481 b476bed f3f0f55 c284b75 b476bed d5de4d3 70c9ef9 f3f0f55 b476bed c284b75 d7e4bc5 b476bed 9fe09fc 7ad1f4d 9fe09fc d5de4d3 e93fdee b476bed 0dadbc6 d5de4d3 b476bed e93fdee d5de4d3 b476bed a985d73 b476bed d5de4d3 d2b3910 b476bed d5de4d3 b476bed d5de4d3 e93fdee d5de4d3 b476bed d5de4d3 8022600 b476bed d7e4bc5 b476bed f3f0f55 7ad1f4d 87957a7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 | import gradio as gr
from openai import OpenAI
from datetime import datetime, timedelta
import pandas as pd
import os, random, tempfile, json
import re
import threading
import uuid
import csv
# --- API settings ---
API_KEY = os.getenv("API_KEY")
BASE_URL = "https://openrouter.ai/api/v1"
LOG_FILE = os.getenv("LOG_FILE", "logs.csv")

# Password for the admin log download. It must be supplied through the
# ADMIN_PASSWORD environment variable: there is deliberately no built-in
# fallback, because a default committed to the source lets anyone who can read
# this file download the logs. When the variable is unset the value is "" and
# download_logs() answers that the password is not configured.
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "")

client = OpenAI(base_url=BASE_URL, api_key=API_KEY)

# --- Passage data, managed as an external CSV ---
# passage.csv columns, in order: passage_id,level,text,question,choices,answer
passages_df = pd.read_csv("passage.csv")
levels = [1, 2, 3, 4, 5]

# Guards concurrent writes to the log file (the log is shared by all users).
_log_lock = threading.Lock()
# =========================
# ✅ ログ列(指定順)
# =========================
# Column order of every row in the log CSV: who answered, which question and
# passage, what was chosen and whether it was right, then the timestamp and
# the recorded selection history.
LOG_COLUMNS = [
    "user_id", "question_number", "reading_level", "passage_id",
    "question", "choice", "correct",
    "time", "actions",
]
# =========================
# ✅ 文字化け対策(方法②:ASCII 正規化)
# =========================
# Typographic characters and the plain-ASCII stand-in written to the CSV.
_CSV_ASCII_FALLBACKS = str.maketrans({
    "—": "-",
    "–": "-",
    "’": "'",
    "‘": "'",
    "“": '"',
    "”": '"',
    "\u00a0": " ",  # non-breaking space
})


def normalize_text_for_csv(text: str) -> str:
    """Replace typographic dashes, quotes and NBSP with ASCII equivalents.

    Falsy input (``None`` or an empty string) is handed back unchanged.
    Every other character is left as it is.
    """
    if not text:
        return text
    return text.translate(_CSV_ASCII_FALLBACKS)
# --- セッション state 初期化 ---
def _new_session_state():
    """Create the state dict for one test session.

    Each call returns a new dict with its own random ``session_id`` and its
    own empty containers.
    """
    fresh = {}
    fresh["session_id"] = str(uuid.uuid4())
    fresh["used_passages"] = set()
    # Number of questions graded so far.
    fresh["question_count"] = 0
    fresh["user_id"] = None
    # Selection changes made on the current question (stored on submit).
    fresh["action_log"] = []
    # Correct answer letter (A/B/C/D) read from the CSV for the current question.
    fresh["current_answer"] = None
    return fresh
# --- ログ追記(CSVのみ・固定カラムで統一) ---
def log_to_csv(entry: dict):
    """Append one row to the shared log CSV at ``LOG_FILE``.

    The row is built from ``LOG_COLUMNS`` only: keys of ``entry`` that are
    not log columns are dropped, and columns missing from ``entry`` are
    written as empty cells. The write happens under ``_log_lock``.

    Args:
        entry: Mapping of column name to value for the new row.
    """
    with _log_lock:
        # A header is needed when the file is missing *or* exists but is still
        # empty; checking existence alone would leave an empty, pre-created
        # log file without a header row forever.
        try:
            needs_header = os.path.getsize(LOG_FILE) == 0
        except OSError:
            needs_header = True

        with open(LOG_FILE, mode="a", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=LOG_COLUMNS)
            if needs_header:
                writer.writeheader()
            writer.writerow({col: entry.get(col) for col in LOG_COLUMNS})
def log_event(state: dict, **kwargs):
    """Stamp an event with the time and user, then append it to the log.

    Args:
        state: Session state; its ``user_id`` is written to the row.
        **kwargs: Column values for the row. String values of ``question``,
            ``actions`` and ``choice`` are ASCII-normalised just before
            writing. A ``time`` or ``user_id`` keyword overrides the value
            computed here.
    """
    # UTC shifted by nine hours, with a " JST" suffix so the cell is text
    # (the author's note: makes Excel less likely to render it as #####).
    jst_now = datetime.utcnow() + timedelta(hours=9)
    stamp = jst_now.strftime("%Y-%m-%d %H:%M:%S") + " JST"

    for field in ("question", "actions", "choice"):
        value = kwargs.get(field)
        if isinstance(value, str):
            kwargs[field] = normalize_text_for_csv(value)

    record = {"time": stamp, "user_id": state.get("user_id")}
    record.update(kwargs)
    log_to_csv(record)
# =========================
# ✅ choices列を柔軟にパース
# - JSON配列: ["...", "...", "...", "..."]
# - 改行区切り / | / || / ; などにも対応(できるだけ)
# =========================
def _parse_choices(raw):
    """Parse the raw ``choices`` cell into a list of option strings.

    Accepted forms, tried in this order:

    1. A JSON array such as ``["...", "...", "...", "..."]``.
    2. Text split on the first delimiter found, in priority order:
       newline, ``||``, ``|``, ``;``, ``,``. Empty pieces are dropped.
    3. Anything else is returned as a single-item list.

    Args:
        raw: Cell value; ``None``, NaN and blank text all mean "no choices".

    Returns:
        List of stripped option strings (possibly empty).
    """
    # An empty CSV cell comes out of pandas as float NaN rather than None.
    # NaN is the only float that is not equal to itself, so this detects it
    # without turning the cell into the bogus option "nan".
    if raw is None or (isinstance(raw, float) and raw != raw):
        return []

    s = str(raw).strip()
    if not s:
        return []

    # Looks like JSON: only a decoded list is accepted. Malformed JSON or a
    # JSON object falls through to the delimiter handling below.
    if (s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}")):
        try:
            parsed = json.loads(s)
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, list):
            return [str(item).strip() for item in parsed]

    # "||" must be tested before "|", otherwise it would split into empties.
    for delimiter in ("\n", "||", "|", ";", ","):
        if delimiter in s:
            return [part.strip() for part in s.split(delimiter) if part.strip()]

    return [s]
def _format_question_block(question, choices_list):
    """Render the question and exactly four options as display text.

    The result is ``Q: <question>`` followed by one line each for A, B, C
    and D. Extra options beyond four are ignored and missing ones are shown
    blank, so the layout stays stable for any number of choices.
    """
    def _clean(value):
        # None is shown as blank; everything else is stringified and trimmed.
        return "" if value is None else str(value).strip()

    four_options = (list(choices_list) + [""] * 4)[:4]
    rows = [f"Q: {_clean(question)}"]
    rows.extend(
        f"{label}. {_clean(option)}"
        for label, option in zip(["A", "B", "C", "D"], four_options)
    )
    return "\n".join(rows)
# --- 自動難易度調整 ---
def adaptive_test(prev_level, prev_correct):
    """Pick the next level: one up after a correct answer, one down otherwise.

    The level is unchanged when it is already at the relevant end of
    ``levels``. Raises ``ValueError`` if ``prev_level`` is not in ``levels``.
    """
    position = levels.index(prev_level)
    if prev_correct:
        step = 1 if position < len(levels) - 1 else 0
    else:
        step = -1 if position > 0 else 0
    return levels[position + step] if step else prev_level
# --- passage取得(✅ CSVから question/choices/answer も返す) ---
def get_passage(level, used_passages):
    """Pick a random passage of ``level`` and return it ready for display.

    Passages whose id is in ``used_passages`` are skipped; once every passage
    of the level has been used, the whole level becomes eligible again.

    Returns:
        ``(passage_id, text, question_block, ans)`` where ``question_block``
        is the formatted question with its A-D options and ``ans`` is the
        first character of the upper-cased answer cell (A/B/C/D expected).
    """
    pool = passages_df[passages_df["level"] == level]
    all_ids = list(pool["passage_id"])
    unseen_ids = [pid for pid in all_ids if pid not in used_passages]
    passage_id = random.choice(unseen_ids or all_ids)

    record = pool[pool["passage_id"] == passage_id].iloc[0]
    question_block = _format_question_block(
        record["question"], _parse_choices(record["choices"])
    )

    # The answer is expected to be a single letter; trim it to one character.
    raw_answer = record["answer"]
    ans = "" if raw_answer is None else str(raw_answer).strip().upper()
    return passage_id, record["text"], question_block, ans[:1]
# --- 開始ボタン動作(✅ start時だけログ) ---
def start_test(student_id, state):
    """Handle the Start button: open a new session and show question 1.

    The incoming ``state`` is discarded; every call begins from a fresh
    session state. With a blank student ID nothing is logged and a warning is
    shown instead. Otherwise a level-3 passage is drawn, the start is logged,
    and the Student ID box and Start button are disabled.

    Returns:
        A 13-tuple in the order of the Start button's outputs: state,
        passage text, question block, level, passage id, answer selection,
        feedback, test-visible flag, display time, question number, user id,
        Student ID box update, Start button update.
    """
    state = _new_session_state()

    cleaned_id = student_id.strip() if student_id else ""
    if not cleaned_id:
        return (
            state,
            "", "", 0, "", None,
            "⚠️ 学生番号を入力してからテストを開始してください",
            False, "", "", "",
            gr.update(interactive=True),  # Student ID box stays editable
            gr.update(interactive=True),  # Start button stays enabled
        )

    state["user_id"] = cleaned_id

    first_level = 3
    passage_id, passage_text, question_block, answer_key = get_passage(
        first_level, state["used_passages"]
    )
    state["used_passages"].add(passage_id)
    # Remember the correct answer for grading on submit.
    state["current_answer"] = answer_key

    shown_at = datetime.utcnow() + timedelta(hours=9)

    # The start row carries only the marker text in the actions column.
    log_event(
        state,
        question_number=None,
        reading_level=None,
        passage_id=None,
        question=None,
        choice=None,
        correct=None,
        actions="start test pushed",
    )

    return (
        state,
        passage_text, question_block, first_level, passage_id, None,
        "", True, shown_at.isoformat(), 1, state["user_id"],
        gr.update(interactive=False),  # lock the Student ID box
        gr.update(interactive=False),  # lock the Start button
    )
# --- 選択肢変更(✅ CSVには書かない。actionsにだけ溜める) ---
def log_choice_change(choice, question_number, user_id, state):
    """Record a change of the selected answer in the session's action log.

    Nothing is written to the CSV here; the entry is only appended to
    ``state["action_log"]``. An empty selection is ignored. ``question_number``
    and ``user_id`` are accepted but not used.

    Returns:
        The session state (a new one if ``state`` was not a dict).
    """
    session = state if isinstance(state, dict) else _new_session_state()
    if not choice:
        return session

    jst_now = datetime.utcnow() + timedelta(hours=9)
    session["action_log"].append(
        {"action": "choice", "choice": choice, "time": jst_now.isoformat()}
    )
    return session
# --- 回答送信(✅ submit時だけログ) ---
def next_step(prev_level, user_answer, question_text, passage_text,
              displayed_time, question_number, user_id, passage_id, state):
    """Grade the submitted answer, log it, and serve the next question.

    Grading compares the selected letter with ``state["current_answer"]``;
    the "わからない" (don't know) option is always incorrect. One row is logged
    per graded answer, including the selection history collected for that
    question. After the fifth graded answer the final level is shown instead
    of another question.

    Returns:
        A 12-tuple in the order of the Submit button's outputs: state,
        feedback, passage text, question block, level, answer selection,
        (unused) passage box, test-visible flag, display time, question
        number, user id, passage id.
    """
    if not isinstance(state, dict):
        state = _new_session_state()

    if not user_answer:
        return (
            state,
            "⚠️ Please select an answer!", passage_text, question_text, prev_level,
            None, "", True, displayed_time, question_number, user_id, passage_id
        )

    # Submit can be pressed while no test is running. The level is then empty,
    # or 0 (which start_test returns when the student ID is blank), and
    # adaptive_test() would raise ValueError on it. Answer with the same
    # warning start_test uses and leave the counters and the log untouched.
    if not prev_level:
        return (
            state,
            "⚠️ 学生番号を入力してからテストを開始してください", passage_text, question_text, prev_level,
            None, "", True, displayed_time, question_number, user_id, passage_id
        )

    state["question_count"] += 1

    answer_text = str(user_answer).strip()
    if answer_text == "わからない":
        # "Don't know" is incorrect without exception.
        correct = False
    else:
        # Compare with the answer letter read from the CSV (no AI grading).
        correct_answer = (state.get("current_answer") or "").strip().upper()
        correct = answer_text.upper() == correct_answer

    new_level = adaptive_test(prev_level, correct)

    # The only place a graded answer is logged.
    log_event(
        state,
        question_number=state["question_count"],
        reading_level=prev_level,
        passage_id=passage_id,
        question=question_text,
        choice=user_answer,
        correct=correct,
        actions=json.dumps(state["action_log"], ensure_ascii=False),
    )

    # Last question: show only the result, in large type.
    if state["question_count"] >= 5:
        # A correct last answer keeps the level just answered; an incorrect
        # one reports the stepped-down level.
        final_level = prev_level if correct else new_level
        return (
            state,
            f"<h1>🎯 Your Reading level: <strong>Level {final_level}</strong></h1>",
            "", "", final_level,
            None, "", False, "", "", user_id, passage_id
        )

    # --- Move on to the next question ---
    next_passage_id, next_text, next_question_block, next_ans = get_passage(
        new_level, state["used_passages"]
    )
    state["used_passages"].add(next_passage_id)
    # Remember the correct answer of the question now being shown.
    state["current_answer"] = next_ans

    next_display_time = datetime.utcnow() + timedelta(hours=9)

    # Selection history starts empty for the next question.
    state["action_log"] = []

    feedback = "✅ Correct!" if correct else "❌ Incorrect."
    return (
        state,
        feedback + "\n➡️ Loading next question…",
        next_text, next_question_block, new_level,
        None, "", True, next_display_time.isoformat(),
        state["question_count"] + 1, user_id, next_passage_id
    )
# --- 管理者DL ---
def download_logs(admin_password):
    """Copy the log CSV to a fresh temp file for an authorised admin.

    Args:
        admin_password: Password typed into the admin box (may be ``None``).

    Returns:
        ``(path, message)``. ``path`` is the timestamped copy on success and
        ``None`` when the password is unset, the password is wrong, or the
        log file does not exist yet.
    """
    import hmac  # local import: only this handler needs it

    if not ADMIN_PASSWORD:
        return None, "⚠️ ADMIN_PASSWORD が設定されていません(環境変数で設定してください)"

    # Constant-time comparison, so response time does not reveal how much of
    # a guess was right. Compared as UTF-8 bytes because compare_digest()
    # rejects str values containing non-ASCII characters.
    supplied = (admin_password or "").encode("utf-8")
    expected = ADMIN_PASSWORD.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        return None, "❌ Password incorrect."

    # Hold the log lock so the copy never sees a half-written row.
    with _log_lock:
        if not os.path.exists(LOG_FILE):
            return None, "⚠️ logs.csv がまだ存在しません(ログが1件もありません)"

        # NOTE(review): this directory is never removed here; each download
        # leaves one behind.
        tmp_dir = tempfile.mkdtemp()

        # Download name carries the JST date and time.
        ts = (datetime.utcnow() + timedelta(hours=9)).strftime("%Y%m%d_%H%M")
        out_path = os.path.join(tmp_dir, f"test_logs_{ts}.csv")

        with open(LOG_FILE, "rb") as fsrc, open(out_path, "wb") as fdst:
            fdst.write(fsrc.read())

    return out_path, "✅ logs.csv is ready."
# =========================
# ✅ 追加:Student ID の入力に応じて Start Test の有効/無効を切替
# =========================
def toggle_start_button(student_id):
    """Enable the Start button only while the Student ID box is non-blank."""
    typed = student_id if student_id else ""
    return gr.update(interactive=len(typed.strip()) > 0)
# --- Gradio UI ---
with gr.Blocks() as demo:
    gr.Markdown("# 📘 Reading Level Test")

    session_state = gr.State(_new_session_state())

    # --- Student ID and Start ---
    student_id_input = gr.Textbox(label="Student ID", placeholder="例: B123456")
    # Start is disabled at load because the Student ID box is still empty;
    # typing an ID enables it.
    start_btn = gr.Button("▶️ Start Test", interactive=False)
    student_id_input.change(toggle_start_button, [student_id_input], [start_btn])

    # --- Quiz widgets ---
    # NOTE(review): these four are created visible while test_visible starts
    # as False, so they are on screen before Start is pressed - confirm that
    # is intended.
    text_display = gr.Textbox(label="Reading Passage", lines=15, interactive=False)
    question_display = gr.Textbox(label="Question", lines=6, interactive=False)
    # "わからない" (don't know) is offered alongside A-D for every question.
    user_answer = gr.Radio(["A", "B", "C", "D", "わからない"], label="Your Answer")
    submit_btn = gr.Button("Submit Answer")

    feedback_display = gr.Markdown()

    # --- Hidden carriers passed between handlers ---
    hidden_level = gr.Number(visible=False)
    hidden_passage = gr.Textbox(visible=False)  # kept for compatibility (unused)
    hidden_display_time = gr.Textbox(visible=False)
    hidden_question_number = gr.Number(visible=False)
    hidden_user_id = gr.Textbox(visible=False)
    hidden_passage_id = gr.Textbox(visible=False)
    test_visible = gr.State(False)

    # The last two outputs lock the Student ID box and the Start button once
    # the test has started.
    start_outputs = [
        session_state,
        text_display, question_display, hidden_level, hidden_passage_id, user_answer,
        feedback_display, test_visible, hidden_display_time,
        hidden_question_number, hidden_user_id,
        student_id_input,
        start_btn,
    ]
    start_btn.click(start_test, [student_id_input, session_state], start_outputs)

    # Selection changes are only collected in the session state, not written
    # to the CSV.
    user_answer.change(
        log_choice_change,
        [user_answer, hidden_question_number, hidden_user_id, session_state],
        [session_state],
    )

    submit_inputs = [
        hidden_level, user_answer, question_display, text_display,
        hidden_display_time, hidden_question_number, hidden_user_id,
        hidden_passage_id, session_state,
    ]
    submit_outputs = [
        session_state,
        feedback_display, text_display, question_display, hidden_level,
        user_answer, hidden_passage, test_visible, hidden_display_time,
        hidden_question_number, hidden_user_id, hidden_passage_id,
    ]
    submit_btn.click(next_step, submit_inputs, submit_outputs)

    def toggle_visibility(show):
        """Return one visibility update per quiz widget (four in total)."""
        shown = bool(show)
        return tuple(gr.update(visible=shown) for _ in range(4))

    test_visible.change(
        fn=toggle_visibility,
        inputs=test_visible,
        outputs=[text_display, question_display, user_answer, submit_btn],
    )

    # --- Admin section ---
    gr.Markdown("---")
    gr.Markdown("## 🔒 Admin: Download logs (since last restart)")
    admin_pw = gr.Textbox(label="Admin Password", type="password", placeholder="Enter admin password")
    dl_btn = gr.Button("⬇️ Download logs.csv")
    dl_file = gr.File(label="logs.csv (download)")
    dl_msg = gr.Markdown()
    dl_btn.click(download_logs, [admin_pw], [dl_file, dl_msg])

# Queue requests (added for better behaviour under concurrent access), then
# start the server.
demo.queue(max_size=64).launch()
|