import gradio as gr from openai import OpenAI from datetime import datetime, timedelta import pandas as pd import os, random, tempfile, json import re import threading import uuid import csv # --- API設定 --- API_KEY = os.getenv("API_KEY") BASE_URL = "https://openrouter.ai/api/v1" LOG_FILE = os.getenv("LOG_FILE", "logs.csv") # ✅ 管理者DL用パスワード(環境変数で設定) ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "0421") client = OpenAI(base_url=BASE_URL, api_key=API_KEY) # --- 外部CSVとして管理する passage データ --- # passage.csv: passage_id,level,text,question,choices,answer の順 passages_df = pd.read_csv("passage.csv") levels = [1, 2, 3, 4, 5] # ✅ ログの同時書き込みガード(ログは全ユーザーで共有) _log_lock = threading.Lock() # ========================= # ✅ ログ列(指定順) # ========================= LOG_COLUMNS = [ "user_id", "question_number", "reading_level", "passage_id", "question", "choice", "correct", "time", "actions", ] # ========================= # ✅ 文字化け対策(方法②:ASCII 正規化) # ========================= def normalize_text_for_csv(text: str) -> str: if not text: return text return ( text.replace("—", "-") .replace("–", "-") .replace("’", "'") .replace("‘", "'") .replace("“", '"') .replace("”", '"') .replace("\u00a0", " ") # NBSP 対策 ) # --- セッション state 初期化 --- def _new_session_state(): return { "session_id": str(uuid.uuid4()), "used_passages": set(), "question_count": 0, # ✅ 採点済み問題数 "user_id": None, "action_log": [], # ✅ 1問中の選択変更履歴(submit時にまとめて保存) # ✅ CSVから読んだ正解(A/B/C/D)を保持 "current_answer": None, } # --- ログ追記(CSVのみ・固定カラムで統一) --- def log_to_csv(entry: dict): # ✅ pandas -> csv.writer に置換(列順は LOG_COLUMNS で保証) with _log_lock: file_exists = os.path.exists(LOG_FILE) with open(LOG_FILE, mode="a", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=LOG_COLUMNS) if not file_exists: writer.writeheader() row = {col: entry.get(col, None) for col in LOG_COLUMNS} writer.writerow(row) def log_event(state: dict, **kwargs): # ✅ Excelで#####になりにくいよう「JST」を付けてテキスト化 now = (datetime.utcnow() + timedelta(hours=9)).strftime("%Y-%m-%d %H:%M:%S") + " JST" # ✅ 文字化け対策(CSVに書く直前で正規化) if "question" in kwargs and isinstance(kwargs["question"], str): kwargs["question"] = normalize_text_for_csv(kwargs["question"]) if "actions" in kwargs and isinstance(kwargs["actions"], str): kwargs["actions"] = normalize_text_for_csv(kwargs["actions"]) if "choice" in kwargs and isinstance(kwargs["choice"], str): kwargs["choice"] = normalize_text_for_csv(kwargs["choice"]) entry = {"time": now, "user_id": state.get("user_id"), **kwargs} log_to_csv(entry) # ========================= # ✅ choices列を柔軟にパース # - JSON配列: ["...", "...", "...", "..."] # - 改行区切り / | / || / ; などにも対応(できるだけ) # ========================= def _parse_choices(raw): if raw is None: return [] s = str(raw).strip() if s == "": return [] # JSONっぽい場合 if (s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}")): try: obj = json.loads(s) if isinstance(obj, list): return [str(x).strip() for x in obj] except Exception: pass # それ以外は区切り推定 # 優先: 改行 -> || -> | -> ; -> , if "\n" in s: parts = [p.strip() for p in s.split("\n") if p.strip()] return parts if "||" in s: parts = [p.strip() for p in s.split("||") if p.strip()] return parts if "|" in s: parts = [p.strip() for p in s.split("|") if p.strip()] return parts if ";" in s: parts = [p.strip() for p in s.split(";") if p.strip()] return parts if "," in s: parts = [p.strip() for p in s.split(",") if p.strip()] return parts return [s] def _format_question_block(question, choices_list): # choices を A-D に整形して question_display に入れる labels = ["A", "B", "C", "D"] # 4つ未満/超過でも崩れにくくする choices = list(choices_list)[:4] while len(choices) < 4: choices.append("") lines = [] q = "" if question is None else str(question).strip() lines.append(f"Q: {q}") for lab, opt in zip(labels, choices): opt_s = "" if opt is None else str(opt).strip() lines.append(f"{lab}. {opt_s}") return "\n".join(lines) # --- 自動難易度調整 --- def adaptive_test(prev_level, prev_correct): idx = levels.index(prev_level) if prev_correct and idx < len(levels) - 1: return levels[idx + 1] elif not prev_correct and idx > 0: return levels[idx - 1] return prev_level # --- passage取得(✅ CSVから question/choices/answer も返す) --- def get_passage(level, used_passages): subset = passages_df[passages_df["level"] == level] available = [pid for pid in subset["passage_id"] if pid not in used_passages] if not available: available = list(subset["passage_id"]) passage_id = random.choice(available) row = subset[subset["passage_id"] == passage_id].iloc[0] text = row["text"] question = row["question"] choices_raw = row["choices"] answer = row["answer"] choices_list = _parse_choices(choices_raw) question_block = _format_question_block(question, choices_list) # answer は A/B/C/D を想定(念のため整形) ans = "" if answer is None else str(answer).strip().upper() ans = ans[:1] # "A" など1文字に寄せる return passage_id, text, question_block, ans # --- 開始ボタン動作(✅ start時だけログ) --- def start_test(student_id, state): state = _new_session_state() if not student_id or student_id.strip() == "": return ( state, "", "", 0, "", None, "⚠️ 学生番号を入力してからテストを開始してください", False, "", "", "", gr.update(interactive=True), # Student ID 入力:有効のまま gr.update(interactive=True), # Start Test ボタン:有効のまま ) state["user_id"] = student_id.strip() level = 3 passage_id, text, question_block, ans = get_passage(level, state["used_passages"]) state["used_passages"].add(passage_id) # ✅ この問題の正解を state に保持 state["current_answer"] = ans displayed_time = datetime.utcnow() + timedelta(hours=9) # ✅ startログ:actions列に "start test pushed" を入れる log_event( state, question_number=None, reading_level=None, passage_id=None, question=None, choice=None, correct=None, actions="start test pushed", ) return ( state, text, question_block, level, passage_id, None, "", True, displayed_time.isoformat(), 1, state["user_id"], gr.update(interactive=False), # ✅ Student ID 入力:無効化(リロードまで) gr.update(interactive=False), # ✅ Start Test ボタン:無効化(リロードまで) ) # --- 選択肢変更(✅ CSVには書かない。actionsにだけ溜める) --- def log_choice_change(choice, question_number, user_id, state): if not isinstance(state, dict): state = _new_session_state() if choice: t_iso = (datetime.utcnow() + timedelta(hours=9)).isoformat() state["action_log"].append({"action": "choice", "choice": choice, "time": t_iso}) return state # --- 回答送信(✅ submit時だけログ) --- def next_step(prev_level, user_answer, question_text, passage_text, displayed_time, question_number, user_id, passage_id, state): if not isinstance(state, dict): state = _new_session_state() if not user_answer: return ( state, "⚠️ Please select an answer!", passage_text, question_text, prev_level, None, "", True, displayed_time, question_number, user_id, passage_id ) submit_time = datetime.utcnow() + timedelta(hours=9) state["question_count"] += 1 # ✅ 変更点①:「わからない」は例外なく不正解 if str(user_answer).strip() == "わからない": correct = False else: # ✅ CSVから読んだ正解(A/B/C/D)と比較(AI採点はしない) correct_answer = (state.get("current_answer") or "").strip().upper() correct = (str(user_answer).strip().upper() == correct_answer) new_level = adaptive_test(prev_level, correct) # ✅ submitログ(ここだけ記録) log_event( state, question_number=state["question_count"], reading_level=prev_level, passage_id=passage_id, question=question_text, choice=user_answer, correct=correct, actions=json.dumps(state["action_log"], ensure_ascii=False), ) # ✅ 最終問題なら結果だけ大きく表示(仕様そのまま) if state["question_count"] >= 5: final_level = prev_level if correct else new_level return ( state, f"