Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import uuid | |
| import os | |
| import csv | |
| import datetime | |
| from datetime import timezone, timedelta | |
| import re | |
| from openai import OpenAI | |
| import sys | |
# --- Japan Standard Time (UTC+9) ---
# Fixed-offset timezone used for every timestamp recorded by this module.
JST = timezone(timedelta(hours=9), 'JST')

# --- API Key and Client Initialization ---
# The key is read from the environment; a missing key only produces a warning
# here so that the module can still be imported.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
if not OPENROUTER_API_KEY:
    print("Warning: OpenRouter API Key not found.", file=sys.stderr)

# Bind the name before attempting construction: if the constructor raises,
# `client` stays defined (as None) instead of being absent from the module
# namespace, which would surface later as an unrelated NameError.
client = None
try:
    client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=OPENROUTER_API_KEY,
    )
    print("OpenAI client initialized.")
except Exception as e:
    print(f"Failed to initialize OpenAI client: {e}", file=sys.stderr)
| # --- Helper Functions --- | |
def generate_user_id():
    """Return a newly generated random (version 4) UUID as a string."""
    new_id = uuid.uuid4()
    return str(new_id)
def load_prompt(level):
    """Return the system prompt text for a difficulty level.

    The prompt is read from ``<level lower-cased>_prompt.txt``, a relative
    path (so it is resolved against the current working directory).

    Args:
        level: Difficulty name, e.g. ``"Beginner"``.

    Returns:
        The file's contents, or the generic default prompt when the file is
        missing or cannot be read for any reason. This function never raises.
    """
    default_prompt = "You are a helpful AI assistant."
    try:
        filename = f"{level.lower()}_prompt.txt"
        # Open directly instead of checking for existence first: this avoids
        # a race between the check and the open.
        with open(filename, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        # A missing prompt file is an expected situation: fall back quietly.
        return default_prompt
    except Exception as e:
        # Anything else (bad level value, permissions, decoding, ...) still
        # falls back to the default, but is reported instead of being hidden.
        print(f"Failed to load prompt for level {level!r}: {e}", file=sys.stderr)
        return default_prompt
| # --- Core Logic Functions --- | |
def chat_process(message, display_history, full_logs, prompt_level):
    """Process one chat turn: record the user message, query the model, record the reply.

    Args:
        message: Text submitted by the user.
        display_history: Messages currently shown in the chat window
            (``None`` is treated as an empty list).
        full_logs: Every message recorded so far, kept for export
            (``None`` is treated as an empty list).
        prompt_level: Difficulty name passed to ``load_prompt`` to select the
            system prompt.

    Returns:
        A 4-tuple ``(input_text, chatbot_messages, display_history, full_logs)``.
        ``input_text`` is always ``""`` so the input box is cleared;
        ``chatbot_messages`` and ``display_history`` are the same list. The
        input lists are not mutated; new lists are returned. API failures
        are reported as an assistant message rather than raised.
    """
    if display_history is None:
        display_history = []
    if full_logs is None:
        full_logs = []

    # Ignore empty / whitespace-only submissions: do not log an empty user
    # turn and do not spend an API call on it.
    if not message or not message.strip():
        return "", display_history, display_history, full_logs

    # 1. Build the user message (timestamp in JST).
    current_time_user = datetime.datetime.now(JST).strftime("%Y-%m-%d %H:%M:%S")
    user_msg_data = {
        "role": "user",
        "content": message,
        "timestamp": current_time_user,
    }
    temp_display = display_history + [user_msg_data]
    temp_logs = full_logs + [user_msg_data]

    # Build the API request: the system prompt followed by the displayed
    # conversation. Only role/content are sent; timestamps are dropped.
    system_prompt = load_prompt(prompt_level)
    messages_for_api = [{"role": "system", "content": system_prompt}]
    messages_for_api.extend(
        {"role": msg["role"], "content": msg["content"]} for msg in temp_display
    )

    ai_response = ""
    try:
        model_name = "google/gemini-2.5-flash"
        response = client.chat.completions.create(
            model=model_name,
            messages=messages_for_api,
            temperature=0.7,
            max_tokens=500,
        )
        content = response.choices[0].message.content
        ai_response = content if content is not None else ""
        if not ai_response:
            # Show a placeholder instead of an empty chat bubble.
            ai_response = "(応答が空でした)"
    except Exception as e:
        # Any failure while calling the API is shown to the user as the
        # assistant's reply so the conversation flow is not interrupted.
        print(f"API Error: {e}", file=sys.stderr)
        ai_response = f"システムエラーが発生しました: {e}"

    # 2. Build the assistant message (timestamp in JST).
    current_time_ai = datetime.datetime.now(JST).strftime("%Y-%m-%d %H:%M:%S")
    ai_msg_data = {
        "role": "assistant",
        "content": ai_response,
        "timestamp": current_time_ai,
    }
    new_display = temp_display + [ai_msg_data]
    new_logs = temp_logs + [ai_msg_data]
    return "", new_display, new_display, new_logs
def change_difficulty_logic(new_level, full_logs):
    """Handle a difficulty change.

    Builds a system-notification message (role ``assistant``, timestamped in
    JST) announcing the new level. The displayed conversation is reset to
    just that notification, while the notification is appended to the full
    log.

    Args:
        new_level: The newly selected difficulty name.
        full_logs: Messages recorded so far (``None`` is treated as empty).

    Returns:
        ``(display_messages, display_messages, updated_full_logs, new_level)``;
        the first two items are the same single-element list.
    """
    prior_logs = [] if full_logs is None else full_logs

    notice = {
        "role": "assistant",
        "content": (
            f"**[システム通知]**\n"
            f"難易度を **{new_level}** に変更しました。\n"
            f"解答開始と入力すれば指導が始まります。"
        ),
        "timestamp": datetime.datetime.now(JST).strftime("%Y-%m-%d %H:%M:%S"),
    }

    # The visible chat restarts from the notification only.
    fresh_display = [notice]
    return fresh_display, fresh_display, prior_logs + [notice], new_level
def export_csv_logic(user_id, user_name, full_logs):
    """Write the full message log to a CSV file and return its filename.

    The file is named ``chat_history_<name>_<YYYYmmdd_HHMMSS>.csv`` (JST
    time), where ``<name>`` is ``user_name`` with the characters
    ``\\ / * ? : " < > |`` removed, surrounding whitespace stripped and
    spaces replaced by underscores. It is written with a UTF-8 BOM
    (``utf-8-sig``) and has the columns Timestamp, Role, Content.

    Args:
        user_id: Session identifier; only used in the log line printed here.
        user_name: Name entered by the user.
        full_logs: Sequence of message dicts to export.

    Returns:
        The filename on success; ``None`` (after a ``gr.Warning``) when the
        name is blank, there is nothing to export, or writing fails.
    """
    print(f"Exporting CSV for UserID: {user_id}, Name: '{user_name}'")

    name_is_blank = not user_name or not user_name.strip()
    if name_is_blank:
        gr.Warning("名前が正しく取得できませんでした")
        return None
    if not full_logs:
        gr.Warning("保存する履歴がまだありません。")
        return None

    stamp = datetime.datetime.now(JST).strftime("%Y%m%d_%H%M%S")
    cleaned = re.sub(r'[\\/*?:"<>|]', "", user_name).strip().replace(" ", "_")
    filename = f"chat_history_{cleaned}_{stamp}.csv"

    try:
        with open(filename, "w", newline="", encoding="utf-8-sig") as out:
            writer = csv.writer(out)
            writer.writerow(["Timestamp", "Role", "Content"])
            # Missing keys are exported as empty cells.
            writer.writerows(
                [
                    entry.get("timestamp", ""),
                    entry.get("role", ""),
                    entry.get("content", ""),
                ]
                for entry in full_logs
            )
        print(f"Successfully saved to {filename}")
        return filename
    except Exception as e:
        print(f"CSV Export Error: {e}", file=sys.stderr)
        gr.Warning(f"CSV保存エラー: {e}")
        return None
def start_app_logic(user_name):
    """Validate the entered name and switch from the start screen to the main screen.

    Args:
        user_name: Text from the name input box.

    Returns:
        Two ``gr.update`` objects: the first hides a component
        (``visible=False``), the second shows one (``visible=True``).

    Raises:
        gr.Error: If ``user_name`` is empty or whitespace-only.
    """
    has_name = bool(user_name) and bool(user_name.strip())
    if has_name:
        hide_start = gr.update(visible=False)
        show_main = gr.update(visible=True)
        return hide_start, show_main
    raise gr.Error("お名前を入力してください。")
| # --- UI Definition --- | |
with gr.Blocks() as demo:
    # --- State definitions ---
    # user_id_state is given the generator function itself as its default.
    user_id_state = gr.State(generate_user_id)
    display_history_state = gr.State([])  # messages shown on screen
    full_logs_state = gr.State([])        # every message, kept for CSV export
    prompt_level_state = gr.State("Beginner")
    # Fixed first message sent on the user's behalf when the session starts.
    start_msg_constant = gr.State("解答開始")

    # =========================================
    # 1. Start screen
    # =========================================
    with gr.Column(visible=True) as start_screen:
        gr.Markdown("# AI Chatbot System")
        gr.Markdown("学習を始めるには、お名前を入力して「解答開始」を押してください。")
        user_name_input = gr.Textbox(
            label="お名前",
            placeholder="山田 太郎",
            scale=2
        )
        start_btn = gr.Button("解答開始", variant="primary", size="lg")

    # =========================================
    # 2. Main chat screen (hidden until the start button succeeds)
    # =========================================
    with gr.Column(visible=False) as main_screen:
        gr.Markdown("# Multi-Level AI Chatbot")
        with gr.Row():
            export_btn = gr.DownloadButton("📥 会話をCSVで保存", scale=1)
        with gr.Row():
            gr.Markdown("## 問題の難易度を選択してください(初級から順番に取り組んでください)")
        with gr.Row():
            level_radio = gr.Radio(
                ["Beginner", "Intermediate", "Advanced"],
                label="Difficulty Level",
                value="Beginner",
                interactive=True
            )
        # LaTeX delimiters recognised in chat messages.
        chatbot = gr.Chatbot(
            type="messages",
            height=500,
            latex_delimiters=[
                {"left": "$$", "right": "$$", "display": True},    # display (block) math
                {"left": "$", "right": "$", "display": False},     # inline math
                {"left": "\\(", "right": "\\)", "display": False},
                {"left": "\\[", "right": "\\]", "display": True},
            ]
        )
        msg_input = gr.Textbox(
            placeholder="メッセージを入力してください... (Enterで送信)",
            label="Chat Input",
            lines=1
        )
        send_btn = gr.Button("送信", variant="primary")

    # --- Event Handling ---
    # A. Start button: validate the name and switch screens, then send the
    #    opening message. `.success()` is used (not `.then()`) so the opening
    #    message is only sent when validation passed; `.then()` would run the
    #    chat step, and call the API, even after start_app_logic raised
    #    gr.Error for a blank name.
    start_btn.click(
        fn=start_app_logic,
        inputs=[user_name_input],
        outputs=[start_screen, main_screen]
    ).success(
        fn=chat_process,
        inputs=[start_msg_constant, display_history_state, full_logs_state, prompt_level_state],
        outputs=[msg_input, chatbot, display_history_state, full_logs_state]
    )

    # B. Sending a message (Enter in the textbox or the send button).
    chat_event_args = {
        "fn": chat_process,
        "inputs": [msg_input, display_history_state, full_logs_state, prompt_level_state],
        "outputs": [msg_input, chatbot, display_history_state, full_logs_state]
    }
    msg_input.submit(**chat_event_args)
    send_btn.click(**chat_event_args)

    # C. Difficulty change: resets the visible chat and stores the new level.
    level_radio.change(
        fn=change_difficulty_logic,
        inputs=[level_radio, full_logs_state],
        outputs=[chatbot, display_history_state, full_logs_state, prompt_level_state]
    )

    # D. CSV export: the returned filename is handed to the download button.
    export_btn.click(
        fn=export_csv_logic,
        inputs=[user_id_state, user_name_input, full_logs_state],
        outputs=[export_btn]
    )

if __name__ == "__main__":
    demo.launch(debug=True, share=False)