import gradio as gr from openai import OpenAI from datetime import datetime, timedelta import pandas as pd import os, random, tempfile, json import re import threading import uuid import csv # --- API設定 --- API_KEY = os.getenv("API_KEY") BASE_URL = "https://openrouter.ai/api/v1" LOG_FILE = os.getenv("LOG_FILE", "logs.csv") # ✅ 管理者DL用パスワード(環境変数で設定) ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "0421") client = OpenAI(base_url=BASE_URL, api_key=API_KEY) # --- 外部CSVとして管理する passage データ --- # passage.csv: passage_id,level,text,question,choices,answer の順 passages_df = pd.read_csv("passage.csv") levels = [1, 2, 3, 4, 5] # ✅ ログの同時書き込みガード(ログは全ユーザーで共有) _log_lock = threading.Lock() # ========================= # ✅ ログ列(指定順) # ========================= LOG_COLUMNS = [ "user_id", "question_number", "reading_level", "passage_id", "question", "choice", "correct", "time", "actions", ] # ========================= # ✅ 文字化け対策(方法②:ASCII 正規化) # ========================= def normalize_text_for_csv(text: str) -> str: if not text: return text return ( text.replace("—", "-") .replace("–", "-") .replace("’", "'") .replace("‘", "'") .replace("“", '"') .replace("”", '"') .replace("\u00a0", " ") # NBSP 対策 ) # --- セッション state 初期化 --- def _new_session_state(): return { "session_id": str(uuid.uuid4()), "used_passages": set(), "question_count": 0, # ✅ 採点済み問題数 "user_id": None, "action_log": [], # ✅ 1問中の選択変更履歴(submit時にまとめて保存) # ✅ CSVから読んだ正解(A/B/C/D)を保持 "current_answer": None, } # --- ログ追記(CSVのみ・固定カラムで統一) --- def log_to_csv(entry: dict): # ✅ pandas -> csv.writer に置換(列順は LOG_COLUMNS で保証) with _log_lock: file_exists = os.path.exists(LOG_FILE) with open(LOG_FILE, mode="a", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=LOG_COLUMNS) if not file_exists: writer.writeheader() row = {col: entry.get(col, None) for col in LOG_COLUMNS} writer.writerow(row) def log_event(state: dict, **kwargs): # ✅ Excelで#####になりにくいよう「JST」を付けてテキスト化 now = (datetime.utcnow() + timedelta(hours=9)).strftime("%Y-%m-%d %H:%M:%S") + " JST" # ✅ 文字化け対策(CSVに書く直前で正規化) if "question" in kwargs and isinstance(kwargs["question"], str): kwargs["question"] = normalize_text_for_csv(kwargs["question"]) if "actions" in kwargs and isinstance(kwargs["actions"], str): kwargs["actions"] = normalize_text_for_csv(kwargs["actions"]) if "choice" in kwargs and isinstance(kwargs["choice"], str): kwargs["choice"] = normalize_text_for_csv(kwargs["choice"]) entry = {"time": now, "user_id": state.get("user_id"), **kwargs} log_to_csv(entry) # ========================= # ✅ choices列を柔軟にパース # - JSON配列: ["...", "...", "...", "..."] # - 改行区切り / | / || / ; などにも対応(できるだけ) # ========================= def _parse_choices(raw): if raw is None: return [] s = str(raw).strip() if s == "": return [] # JSONっぽい場合 if (s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}")): try: obj = json.loads(s) if isinstance(obj, list): return [str(x).strip() for x in obj] except Exception: pass # それ以外は区切り推定 # 優先: 改行 -> || -> | -> ; -> , if "\n" in s: parts = [p.strip() for p in s.split("\n") if p.strip()] return parts if "||" in s: parts = [p.strip() for p in s.split("||") if p.strip()] return parts if "|" in s: parts = [p.strip() for p in s.split("|") if p.strip()] return parts if ";" in s: parts = [p.strip() for p in s.split(";") if p.strip()] return parts if "," in s: parts = [p.strip() for p in s.split(",") if p.strip()] return parts return [s] def _format_question_block(question, choices_list): # choices を A-D に整形して question_display に入れる labels = ["A", "B", "C", "D"] # 4つ未満/超過でも崩れにくくする choices = list(choices_list)[:4] while len(choices) < 4: choices.append("") lines = [] q = "" if question is None else str(question).strip() lines.append(f"Q: {q}") for lab, opt in zip(labels, choices): opt_s = "" if opt is None else str(opt).strip() lines.append(f"{lab}. {opt_s}") return "\n".join(lines) # --- 自動難易度調整 --- def adaptive_test(prev_level, prev_correct): idx = levels.index(prev_level) if prev_correct and idx < len(levels) - 1: return levels[idx + 1] elif not prev_correct and idx > 0: return levels[idx - 1] return prev_level # --- passage取得(✅ CSVから question/choices/answer も返す) --- def get_passage(level, used_passages): subset = passages_df[passages_df["level"] == level] available = [pid for pid in subset["passage_id"] if pid not in used_passages] if not available: available = list(subset["passage_id"]) passage_id = random.choice(available) row = subset[subset["passage_id"] == passage_id].iloc[0] text = row["text"] question = row["question"] choices_raw = row["choices"] answer = row["answer"] choices_list = _parse_choices(choices_raw) question_block = _format_question_block(question, choices_list) # answer は A/B/C/D を想定(念のため整形) ans = "" if answer is None else str(answer).strip().upper() ans = ans[:1] # "A" など1文字に寄せる return passage_id, text, question_block, ans # --- 開始ボタン動作(✅ start時だけログ) --- def start_test(student_id, state): state = _new_session_state() if not student_id or student_id.strip() == "": return ( state, "", "", 0, "", None, "⚠️ 学生番号を入力してからテストを開始してください", False, "", "", "", gr.update(interactive=True), # Student ID 入力:有効のまま gr.update(interactive=True), # Start Test ボタン:有効のまま ) state["user_id"] = student_id.strip() level = 3 passage_id, text, question_block, ans = get_passage(level, state["used_passages"]) state["used_passages"].add(passage_id) # ✅ この問題の正解を state に保持 state["current_answer"] = ans displayed_time = datetime.utcnow() + timedelta(hours=9) # ✅ startログ:actions列に "start test pushed" を入れる log_event( state, question_number=None, reading_level=None, passage_id=None, question=None, choice=None, correct=None, actions="start test pushed", ) return ( state, text, question_block, level, passage_id, None, "", True, displayed_time.isoformat(), 1, state["user_id"], gr.update(interactive=False), # ✅ Student ID 入力:無効化(リロードまで) gr.update(interactive=False), # ✅ Start Test ボタン:無効化(リロードまで) ) # --- 選択肢変更(✅ CSVには書かない。actionsにだけ溜める) --- def log_choice_change(choice, question_number, user_id, state): if not isinstance(state, dict): state = _new_session_state() if choice: t_iso = (datetime.utcnow() + timedelta(hours=9)).isoformat() state["action_log"].append({"action": "choice", "choice": choice, "time": t_iso}) return state # --- 回答送信(✅ submit時だけログ) --- def next_step(prev_level, user_answer, question_text, passage_text, displayed_time, question_number, user_id, passage_id, state): if not isinstance(state, dict): state = _new_session_state() if not user_answer: return ( state, "⚠️ Please select an answer!", passage_text, question_text, prev_level, None, "", True, displayed_time, question_number, user_id, passage_id ) submit_time = datetime.utcnow() + timedelta(hours=9) state["question_count"] += 1 # ✅ 変更点①:「わからない」は例外なく不正解 if str(user_answer).strip() == "わからない": correct = False else: # ✅ CSVから読んだ正解(A/B/C/D)と比較(AI採点はしない) correct_answer = (state.get("current_answer") or "").strip().upper() correct = (str(user_answer).strip().upper() == correct_answer) new_level = adaptive_test(prev_level, correct) # ✅ submitログ(ここだけ記録) log_event( state, question_number=state["question_count"], reading_level=prev_level, passage_id=passage_id, question=question_text, choice=user_answer, correct=correct, actions=json.dumps(state["action_log"], ensure_ascii=False), ) # ✅ 最終問題なら結果だけ大きく表示(仕様そのまま) if state["question_count"] >= 5: final_level = prev_level if correct else new_level return ( state, f"

🎯 Your Reading level: Level {final_level}

", "", "", final_level, None, "", False, "", "", user_id, passage_id ) # --- 次の問題へ --- next_passage_id, next_text, next_question_block, next_ans = get_passage(new_level, state["used_passages"]) state["used_passages"].add(next_passage_id) # ✅ 次の問題の正解を state に保持 state["current_answer"] = next_ans next_display_time = datetime.utcnow() + timedelta(hours=9) # ✅ 次の問題のために選択変更履歴をリセット state["action_log"] = [] feedback = "✅ Correct!" if correct else "❌ Incorrect." return ( state, feedback + "\n➡️ Loading next question…", next_text, next_question_block, new_level, None, "", True, next_display_time.isoformat(), state["question_count"] + 1, user_id, next_passage_id ) # --- 管理者DL --- def download_logs(admin_password): if not ADMIN_PASSWORD: return None, "⚠️ ADMIN_PASSWORD が設定されていません(環境変数で設定してください)" if (admin_password or "") != ADMIN_PASSWORD: return None, "❌ Password incorrect." with _log_lock: if not os.path.exists(LOG_FILE): return None, "⚠️ logs.csv がまだ存在しません(ログが1件もありません)" tmp_dir = tempfile.mkdtemp() # ✅ 変更点:DL用ファイル名を日付・時間付きにする(JST) ts = (datetime.utcnow() + timedelta(hours=9)).strftime("%Y%m%d_%H%M") out_path = os.path.join(tmp_dir, f"test_logs_{ts}.csv") with open(LOG_FILE, "rb") as fsrc, open(out_path, "wb") as fdst: fdst.write(fsrc.read()) return out_path, "✅ logs.csv is ready." # ========================= # ✅ 追加:Student ID の入力に応じて Start Test の有効/無効を切替 # ========================= def toggle_start_button(student_id): ok = bool((student_id or "").strip()) return gr.update(interactive=ok) # --- Gradio UI --- with gr.Blocks() as demo: gr.Markdown("# 📘 Reading Level Test") session_state = gr.State(_new_session_state()) student_id_input = gr.Textbox(label="Student ID", placeholder="例: B123456") # ✅ 初期状態は押せない(Student ID が空だから) start_btn = gr.Button("▶️ Start Test", interactive=False) # ✅ Student ID が入力されたら Start Test を押せるようにする student_id_input.change( fn=toggle_start_button, inputs=[student_id_input], outputs=[start_btn] ) text_display = gr.Textbox(label="Reading Passage", lines=15, interactive=False) question_display = gr.Textbox(label="Question", lines=6, interactive=False) # ✅ 変更点②:共通選択肢「わからない」を追加(ログ列は触らない) user_answer = gr.Radio(["A", "B", "C", "D", "わからない"], label="Your Answer") submit_btn = gr.Button("Submit Answer") feedback_display = gr.Markdown() hidden_level = gr.Number(visible=False) hidden_passage = gr.Textbox(visible=False) # 互換用(未使用) hidden_display_time = gr.Textbox(visible=False) hidden_question_number = gr.Number(visible=False) hidden_user_id = gr.Textbox(visible=False) hidden_passage_id = gr.Textbox(visible=False) test_visible = gr.State(False) start_btn.click( fn=start_test, inputs=[student_id_input, session_state], outputs=[ session_state, text_display, question_display, hidden_level, hidden_passage_id, user_answer, feedback_display, test_visible, hidden_display_time, hidden_question_number, hidden_user_id, student_id_input, # ✅ 追加:Start後に入力欄を無効化 start_btn, # ✅ 追加:Start後にボタンを無効化 ] ) # ✅ 選択肢変更はCSVに書かず、actionsにだけ溜める user_answer.change( fn=log_choice_change, inputs=[user_answer, hidden_question_number, hidden_user_id, session_state], outputs=[session_state] ) submit_btn.click( fn=next_step, inputs=[hidden_level, user_answer, question_display, text_display, hidden_display_time, hidden_question_number, hidden_user_id, hidden_passage_id, session_state], outputs=[ session_state, feedback_display, text_display, question_display, hidden_level, user_answer, hidden_passage, test_visible, hidden_display_time, hidden_question_number, hidden_user_id, hidden_passage_id ] ) def toggle_visibility(show): v = bool(show) return ( gr.update(visible=v), gr.update(visible=v), gr.update(visible=v), gr.update(visible=v) ) test_visible.change( fn=toggle_visibility, inputs=test_visible, outputs=[text_display, question_display, user_answer, submit_btn] ) gr.Markdown("---") gr.Markdown("## 🔒 Admin: Download logs (since last restart)") admin_pw = gr.Textbox(label="Admin Password", type="password", placeholder="Enter admin password") dl_btn = gr.Button("⬇️ Download logs.csv") dl_file = gr.File(label="logs.csv (download)") dl_msg = gr.Markdown() dl_btn.click( fn=download_logs, inputs=[admin_pw], outputs=[dl_file, dl_msg] ) # ✅ 追加:queue(同時アクセス耐性UP) demo.queue(max_size=64).launch()