Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from openai import OpenAI | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| import os, random, tempfile, json | |
| import re | |
| import threading | |
| import uuid | |
| import csv | |
# --- API settings ---
API_KEY = os.getenv("API_KEY")
BASE_URL = "https://openrouter.ai/api/v1"
LOG_FILE = os.getenv("LOG_FILE", "logs.csv")

# Admin password for the log download. It must come from the environment:
# no fallback value is compiled in, because a default written in the source is
# readable by anyone who can read the code. When the variable is unset this is
# an empty string, and download_logs() rejects every request with its
# "ADMIN_PASSWORD is not set" message.
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "")

# NOTE(review): no use of `client` is visible in this file (answers are graded
# against the CSV) -- confirm whether it is still needed.
client = OpenAI(base_url=BASE_URL, api_key=API_KEY)

# --- Passage data, managed as an external CSV ---
# passage.csv columns, in order: passage_id, level, text, question, choices, answer
passages_df = pd.read_csv("passage.csv")
levels = [1, 2, 3, 4, 5]

# Guards concurrent writes to the log file (the log is shared by all users).
_log_lock = threading.Lock()
# =========================
# Log columns (in this exact order)
# =========================
# log_to_csv() uses this list as the CSV header and as the column order of
# every row it writes.
LOG_COLUMNS = [
    "user_id",          # state["user_id"], filled in by log_event()
    "question_number",  # graded-question counter at submit time; empty on the start row
    "reading_level",    # level of the question that was just answered
    "passage_id",       # id of the passage that was just answered
    "question",         # question text as displayed
    "choice",           # the answer the user submitted
    "correct",          # grading result for that answer
    "time",             # "YYYY-MM-DD HH:MM:SS JST" stamp produced by log_event()
    "actions",          # JSON list of choice changes, or "start test pushed"
]
# =========================
# Mojibake countermeasure (approach 2: ASCII normalization)
# =========================
def normalize_text_for_csv(text: str) -> str:
    """Swap typographic punctuation for plain ASCII before writing to CSV.

    Em/en dashes become "-", curly single quotes become "'", curly double
    quotes become '"', and a non-breaking space becomes a regular space.
    A falsy input (None or "") is handed back unchanged.
    """
    if text:
        ascii_table = str.maketrans({
            "—": "-",
            "–": "-",
            "’": "'",
            "‘": "'",
            "“": '"',
            "”": '"',
            "\u00a0": " ",  # NBSP
        })
        return text.translate(ascii_table)
    return text
| # --- セッション state 初期化 --- | |
| def _new_session_state(): | |
| return { | |
| "session_id": str(uuid.uuid4()), | |
| "used_passages": set(), | |
| "question_count": 0, # ✅ 採点済み問題数 | |
| "user_id": None, | |
| "action_log": [], # ✅ 1問中の選択変更履歴(submit時にまとめて保存) | |
| # ✅ CSVから読んだ正解(A/B/C/D)を保持 | |
| "current_answer": None, | |
| } | |
# --- Append to the log (CSV only, fixed column set) ---
def log_to_csv(entry: dict):
    """Append one row to the shared CSV log, writing the header first if needed.

    Args:
        entry: Mapping of column name to value. Only keys listed in
            LOG_COLUMNS are written; missing keys are written as empty cells.

    The whole check-and-append sequence runs under _log_lock so that
    concurrent callers cannot interleave rows or write the header twice.
    """
    with _log_lock:
        # A missing file and an existing-but-empty file both still need the
        # header row; checking existence alone would leave the latter headerless.
        needs_header = (
            not os.path.exists(LOG_FILE) or os.path.getsize(LOG_FILE) == 0
        )
        with open(LOG_FILE, mode="a", encoding="utf-8", newline="") as f:
            # Column order is guaranteed by LOG_COLUMNS.
            writer = csv.DictWriter(f, fieldnames=LOG_COLUMNS)
            if needs_header:
                writer.writeheader()
            writer.writerow({col: entry.get(col) for col in LOG_COLUMNS})
def log_event(state: dict, **kwargs):
    """Stamp an event with time and user id, then append it to the CSV log.

    Args:
        state: Session state; its "user_id" is written to the row.
        **kwargs: Column values for the row (see LOG_COLUMNS). String values
            of "question", "actions" and "choice" are passed through
            normalize_text_for_csv() just before writing.
    """
    # Local import: the file-level import only brings in datetime and timedelta.
    from datetime import timezone

    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted string is the same. The " JST" suffix turns the value into
    # text so Excel is less likely to render the cell as "#####".
    jst = timezone(timedelta(hours=9))
    now = datetime.now(jst).strftime("%Y-%m-%d %H:%M:%S") + " JST"

    # Mojibake countermeasure: normalize free-text columns right before writing.
    for field in ("question", "actions", "choice"):
        value = kwargs.get(field)
        if isinstance(value, str):
            kwargs[field] = normalize_text_for_csv(value)

    entry = {"time": now, "user_id": state.get("user_id"), **kwargs}
    log_to_csv(entry)
| # ========================= | |
| # ✅ choices列を柔軟にパース | |
| # - JSON配列: ["...", "...", "...", "..."] | |
| # - 改行区切り / | / || / ; などにも対応(できるだけ) | |
| # ========================= | |
| def _parse_choices(raw): | |
| if raw is None: | |
| return [] | |
| s = str(raw).strip() | |
| if s == "": | |
| return [] | |
| # JSONっぽい場合 | |
| if (s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}")): | |
| try: | |
| obj = json.loads(s) | |
| if isinstance(obj, list): | |
| return [str(x).strip() for x in obj] | |
| except Exception: | |
| pass | |
| # それ以外は区切り推定 | |
| # 優先: 改行 -> || -> | -> ; -> , | |
| if "\n" in s: | |
| parts = [p.strip() for p in s.split("\n") if p.strip()] | |
| return parts | |
| if "||" in s: | |
| parts = [p.strip() for p in s.split("||") if p.strip()] | |
| return parts | |
| if "|" in s: | |
| parts = [p.strip() for p in s.split("|") if p.strip()] | |
| return parts | |
| if ";" in s: | |
| parts = [p.strip() for p in s.split(";") if p.strip()] | |
| return parts | |
| if "," in s: | |
| parts = [p.strip() for p in s.split(",") if p.strip()] | |
| return parts | |
| return [s] | |
| def _format_question_block(question, choices_list): | |
| # choices を A-D に整形して question_display に入れる | |
| labels = ["A", "B", "C", "D"] | |
| # 4つ未満/超過でも崩れにくくする | |
| choices = list(choices_list)[:4] | |
| while len(choices) < 4: | |
| choices.append("") | |
| lines = [] | |
| q = "" if question is None else str(question).strip() | |
| lines.append(f"Q: {q}") | |
| for lab, opt in zip(labels, choices): | |
| opt_s = "" if opt is None else str(opt).strip() | |
| lines.append(f"{lab}. {opt_s}") | |
| return "\n".join(lines) | |
# --- Automatic difficulty adjustment ---
def adaptive_test(prev_level, prev_correct):
    """Pick the next reading level from the previous level and its result.

    A correct answer moves one step up in `levels`, an incorrect one moves
    one step down; at either end of the list the level stays where it is.
    """
    position = levels.index(prev_level)
    if prev_correct:
        if position < len(levels) - 1:
            return levels[position + 1]
    elif position > 0:
        return levels[position - 1]
    return prev_level
# --- Passage lookup (also returns question / choices / answer from the CSV) ---
def get_passage(level, used_passages):
    """Pick a random passage of the given level, preferring unseen ones.

    Args:
        level: Value matched against the "level" column of passages_df.
        used_passages: Collection of passage ids already shown; it is only
            read here, never modified.

    Returns:
        (passage_id, text, question_block, answer) where question_block is
        the formatted question with A-D choices and answer is the first
        character of the upper-cased "answer" cell.
    """
    level_rows = passages_df[passages_df["level"] == level]
    all_ids = list(level_rows["passage_id"])
    unseen_ids = [pid for pid in all_ids if pid not in used_passages]

    # Once every passage of this level has been shown, allow repeats.
    passage_id = random.choice(unseen_ids or all_ids)
    row = level_rows[level_rows["passage_id"] == passage_id].iloc[0]

    question_block = _format_question_block(
        row["question"], _parse_choices(row["choices"])
    )

    # The answer is expected to be A/B/C/D; tidy it up and keep one character.
    answer = row["answer"]
    ans = ("" if answer is None else str(answer).strip().upper())[:1]
    return passage_id, row["text"], question_block, ans
# --- Start button handler (writes a log row only on start) ---
def start_test(student_id, state):
    """Begin a new test session and serve the first question.

    The incoming state is discarded and replaced by a fresh one. With a blank
    student id, a warning is returned and the id box and Start button stay
    enabled. Otherwise a level-3 passage is drawn, its answer is stored in the
    state, a "start test pushed" row is logged, and both controls are disabled.

    Returns:
        A 13-tuple in the order of the start button's outputs: state, passage
        text, question block, level, passage id, answer reset, feedback,
        test-visible flag, display time, question number, user id, and the
        updates for the student-id box and the Start button.
    """
    state = _new_session_state()
    cleaned_id = (student_id or "").strip()

    if cleaned_id == "":
        warning = "⚠️ 学生番号を入力してからテストを開始してください"
        return (
            state,
            "", "", 0, "", None,
            warning,
            False, "", "", "",
            gr.update(interactive=True),   # Student ID box: stays enabled
            gr.update(interactive=True),   # Start Test button: stays enabled
        )

    state["user_id"] = cleaned_id
    level = 3
    passage_id, text, question_block, ans = get_passage(level, state["used_passages"])
    state["used_passages"].add(passage_id)
    state["current_answer"] = ans  # correct answer for the question being shown

    displayed_time = datetime.utcnow() + timedelta(hours=9)

    # Start row: every question column is empty, only "actions" is filled.
    empty_fields = dict.fromkeys(
        ("question_number", "reading_level", "passage_id", "question", "choice", "correct")
    )
    log_event(state, actions="start test pushed", **empty_fields)

    return (
        state,
        text, question_block, level, passage_id, None,
        "", True, displayed_time.isoformat(), 1, state["user_id"],
        gr.update(interactive=False),  # Student ID box: disabled (until reload)
        gr.update(interactive=False),  # Start Test button: disabled (until reload)
    )
# --- Choice change (not written to the CSV; only accumulated in the action log) ---
def log_choice_change(choice, question_number, user_id, state):
    """Record a change of the selected answer in the session's action log.

    A non-empty choice is appended to state["action_log"] with a timestamp
    (UTC shifted by nine hours, ISO format). An empty choice records nothing.
    A state that is not a dict is replaced by a fresh session state.
    question_number and user_id are accepted but not used.

    Returns:
        The (possibly replaced) state.
    """
    session = state if isinstance(state, dict) else _new_session_state()
    if not choice:
        return session

    stamp = datetime.utcnow() + timedelta(hours=9)
    session["action_log"].append(
        {"action": "choice", "choice": choice, "time": stamp.isoformat()}
    )
    return session
# --- Answer submission (writes a log row only on submit) ---
def next_step(prev_level, user_answer, question_text, passage_text,
              displayed_time, question_number, user_id, passage_id, state):
    """Grade the submitted answer, log it, and serve the next question or the result.

    Args:
        prev_level: Level of the question being answered.
        user_answer: Selected radio value ("A".."D" or "わからない"), or empty.
        question_text, passage_text: Texts currently displayed.
        displayed_time, question_number, user_id, passage_id: Hidden field
            values; passed back unchanged where no new value applies.
        state: Session state dict; replaced by a fresh one if it is not a dict.

    Returns:
        A 12-tuple in the order of the submit button's outputs: state,
        feedback, passage text, question block, level, answer reset, value
        for the unused hidden passage box, test-visible flag, display time,
        question number, user id, passage id.
    """
    if not isinstance(state, dict):
        state = _new_session_state()

    # Submit was pressed while no question is active (start_test has not run
    # for this session): there is no stored answer to grade against and
    # prev_level has not been set by start_test, so stop here instead of
    # counting the submission and failing inside adaptive_test().
    if state.get("current_answer") is None:
        return (
            state,
            "⚠️ Please start the test first!", passage_text, question_text, prev_level,
            None, "", False, displayed_time, question_number, user_id, passage_id
        )

    if not user_answer:
        return (
            state,
            "⚠️ Please select an answer!", passage_text, question_text, prev_level,
            None, "", True, displayed_time, question_number, user_id, passage_id
        )

    state["question_count"] += 1

    # "わからない" ("I don't know") is always graded as incorrect.
    if str(user_answer).strip() == "わからない":
        correct = False
    else:
        # Compare with the answer (A/B/C/D) read from the CSV; no AI grading.
        correct_answer = (state.get("current_answer") or "").strip().upper()
        correct = (str(user_answer).strip().upper() == correct_answer)

    new_level = adaptive_test(prev_level, correct)

    # Submit row: the only place where an answered question is logged.
    log_event(
        state,
        question_number=state["question_count"],
        reading_level=prev_level,
        passage_id=passage_id,
        question=question_text,
        choice=user_answer,
        correct=correct,
        actions=json.dumps(state["action_log"], ensure_ascii=False),
    )

    # After the fifth graded question, show only the final result.
    # The reported level is the current level when the last answer was
    # correct, otherwise the adjusted level (behaviour kept as specified).
    if state["question_count"] >= 5:
        final_level = prev_level if correct else new_level
        return (
            state,
            f"<h1>🎯 Your Reading level: <strong>Level {final_level}</strong></h1>",
            "", "", final_level,
            None, "", False, "", "", user_id, passage_id
        )

    # --- Move on to the next question ---
    next_passage_id, next_text, next_question_block, next_ans = get_passage(new_level, state["used_passages"])
    state["used_passages"].add(next_passage_id)
    state["current_answer"] = next_ans  # correct answer for the next question

    next_display_time = datetime.utcnow() + timedelta(hours=9)

    # Start the next question with an empty choice-change history.
    state["action_log"] = []

    feedback = "✅ Correct!" if correct else "❌ Incorrect."
    return (
        state,
        feedback + "\n➡️ Loading next question…",
        next_text, next_question_block, new_level,
        None, "", True, next_display_time.isoformat(), state["question_count"] + 1, user_id, next_passage_id
    )
# --- Admin download ---
def download_logs(admin_password):
    """Copy the log file to a timestamped temp file after a password check.

    Args:
        admin_password: Password typed into the admin box (may be None or empty).

    Returns:
        (path, message): path of the copy and a success message, or
        (None, message) when the password is not configured, the password is
        wrong, or the log file does not exist yet.
    """
    # Local imports: neither name is imported at file level.
    import hmac
    from datetime import timezone

    if not ADMIN_PASSWORD:
        return None, "⚠️ ADMIN_PASSWORD が設定されていません(環境変数で設定してください)"

    # Constant-time comparison so response timing does not reveal how much of
    # the password matched. Both sides are encoded because compare_digest()
    # only accepts str arguments that are pure ASCII.
    supplied = (admin_password or "").encode("utf-8")
    if not hmac.compare_digest(supplied, ADMIN_PASSWORD.encode("utf-8")):
        return None, "❌ Password incorrect."

    # Hold the log lock while copying so a concurrent append cannot land
    # in the middle of the copy.
    with _log_lock:
        if not os.path.exists(LOG_FILE):
            return None, "⚠️ logs.csv がまだ存在しません(ログが1件もありません)"
        tmp_dir = tempfile.mkdtemp()
        # Download file name carries the date and time (JST). Timezone-aware
        # replacement for the deprecated datetime.utcnow(); same output.
        ts = datetime.now(timezone(timedelta(hours=9))).strftime("%Y%m%d_%H%M")
        out_path = os.path.join(tmp_dir, f"test_logs_{ts}.csv")
        with open(LOG_FILE, "rb") as fsrc, open(out_path, "wb") as fdst:
            fdst.write(fsrc.read())
    return out_path, "✅ logs.csv is ready."
# =========================
# Enable/disable Start Test depending on the Student ID input
# =========================
def toggle_start_button(student_id):
    """Return an update that makes the Start button clickable only for a non-blank id."""
    if not student_id:
        has_id = False
    else:
        has_id = bool(student_id.strip())
    return gr.update(interactive=has_id)
# --- Gradio UI ---
# Builds the page: student-id entry, the reading test itself, and the admin
# log-download section, then wires each control to its handler.
with gr.Blocks() as demo:
    gr.Markdown("# 📘 Reading Level Test")
    # State object passed into and returned from every test handler.
    session_state = gr.State(_new_session_state())
    student_id_input = gr.Textbox(label="Student ID", placeholder="例: B123456")
    # Not clickable initially (the Student ID box is empty).
    start_btn = gr.Button("▶️ Start Test", interactive=False)
    # Make Start Test clickable once a Student ID has been typed.
    student_id_input.change(
        fn=toggle_start_button,
        inputs=[student_id_input],
        outputs=[start_btn]
    )
    text_display = gr.Textbox(label="Reading Passage", lines=15, interactive=False)
    question_display = gr.Textbox(label="Question", lines=6, interactive=False)
    # Common extra option "わからない" ("I don't know"); log columns are unaffected.
    user_answer = gr.Radio(["A", "B", "C", "D", "わからない"], label="Your Answer")
    submit_btn = gr.Button("Submit Answer")
    feedback_display = gr.Markdown()
    # Hidden fields that carry per-question values between handler calls.
    hidden_level = gr.Number(visible=False)
    hidden_passage = gr.Textbox(visible=False)  # kept for compatibility (unused)
    hidden_display_time = gr.Textbox(visible=False)
    hidden_question_number = gr.Number(visible=False)
    hidden_user_id = gr.Textbox(visible=False)
    hidden_passage_id = gr.Textbox(visible=False)
    # Flag driving toggle_visibility() below.
    test_visible = gr.State(False)
    start_btn.click(
        fn=start_test,
        inputs=[student_id_input, session_state],
        outputs=[
            session_state,
            text_display, question_display, hidden_level, hidden_passage_id, user_answer,
            feedback_display, test_visible, hidden_display_time,
            hidden_question_number, hidden_user_id,
            student_id_input,  # disabled after a successful start
            start_btn,  # disabled after a successful start
        ]
    )
    # Choice changes are not written to the CSV; they only accumulate in the action log.
    user_answer.change(
        fn=log_choice_change,
        inputs=[user_answer, hidden_question_number, hidden_user_id, session_state],
        outputs=[session_state]
    )
    submit_btn.click(
        fn=next_step,
        inputs=[hidden_level, user_answer, question_display, text_display,
                hidden_display_time, hidden_question_number, hidden_user_id, hidden_passage_id, session_state],
        outputs=[
            session_state,
            feedback_display, text_display, question_display, hidden_level,
            user_answer, hidden_passage, test_visible, hidden_display_time,
            hidden_question_number, hidden_user_id, hidden_passage_id
        ]
    )

    def toggle_visibility(show):
        # Show or hide the four test widgets together, following the flag.
        v = bool(show)
        return (
            gr.update(visible=v), gr.update(visible=v),
            gr.update(visible=v), gr.update(visible=v)
        )

    test_visible.change(
        fn=toggle_visibility,
        inputs=test_visible,
        outputs=[text_display, question_display, user_answer, submit_btn]
    )
    gr.Markdown("---")
    gr.Markdown("## 🔒 Admin: Download logs (since last restart)")
    admin_pw = gr.Textbox(label="Admin Password", type="password", placeholder="Enter admin password")
    dl_btn = gr.Button("⬇️ Download logs.csv")
    dl_file = gr.File(label="logs.csv (download)")
    dl_msg = gr.Markdown()
    dl_btn.click(
        fn=download_logs,
        inputs=[admin_pw],
        outputs=[dl_file, dl_msg]
    )
# Request queue with a bounded size (added for better tolerance of concurrent access).
demo.queue(max_size=64).launch()