Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import google.generativeai as genai | |
| import datetime | |
| import uuid | |
| import os | |
| import tempfile | |
| import pymongo | |
| import sqlite3 | |
| import certifi | |
| from bson.objectid import ObjectId | |
| # --- 1. ํ์ด๋ธ๋ฆฌ๋ DB ์ค์ --- | |
| MONGO_URI = os.environ.get("MONGO_URI") | |
| USE_MONGO = False # ๊ธฐ๋ณธ๊ฐ์ False, ์ฐ๊ฒฐ ์ฑ๊ณต ์ True๋ก ๋ณ๊ฒฝ | |
| # 1-1. MongoDB ์ฐ๊ฒฐ ์๋ | |
| if MONGO_URI: | |
| try: | |
| print("๐ MongoDB ์ฐ๊ฒฐ ์๋ ์ค...") | |
| client = pymongo.MongoClient( | |
| MONGO_URI, | |
| serverSelectionTimeoutMS=3000, # 3์ด ๋ด ์๋ต ์์ผ๋ฉด ํฌ๊ธฐ | |
| tls=True, | |
| tlsAllowInvalidCertificates=True, | |
| tlsAllowInvalidHostnames=True | |
| ) | |
| client.admin.command('ping') | |
| USE_MONGO = True | |
| print("โ MongoDB ์ฐ๊ฒฐ ์ฑ๊ณต! (์๊ฒฉ ์ ์ฅ์ ์ฌ์ฉ)") | |
| db = client.story_assistant_db | |
| interactions_col = db.interactions | |
| summaries_col = db.summaries | |
| except Exception as e: | |
| print(f"โ ๏ธ MongoDB ์ฐ๊ฒฐ ์คํจ: {e}") | |
| print("๐ SQLite ๋ก์ปฌ ์ ์ฅ์๋ก ์ ํํฉ๋๋ค.") | |
| USE_MONGO = False | |
| else: | |
| print("โน๏ธ MONGO_URI๊ฐ ์์ต๋๋ค. SQLite ๋ก์ปฌ ์ ์ฅ์๋ฅผ ์ฌ์ฉํฉ๋๋ค.") | |
| # 1-2. SQLite ์ค์ (MongoDB ์คํจ ์ ๋ฐฑ์ ์ฉ) | |
| if not USE_MONGO: | |
| # Hugging Face Spaces์ ์๊ตฌ ์ ์ฅ์ ๊ฒฝ๋ก(/data)๊ฐ ์์ผ๋ฉด ๊ฑฐ๊ธฐ ์ ์ฅ, ์์ผ๋ฉด ์ผ๋ฐ ํ์ผ | |
| DB_PATH = os.path.join("/data", "logs.db") if os.path.exists("/data") else "logs.db" | |
| print(f"๐ SQLite ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฒฝ๋ก: {DB_PATH}") | |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False) | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS interactions ( | |
| session_id TEXT, user_id TEXT, timestamp TEXT, role TEXT, content TEXT | |
| ) | |
| ''') | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS summaries ( | |
| session_id TEXT, user_id TEXT, timestamp TEXT, summary TEXT | |
| ) | |
| ''') | |
| conn.commit() | |
| # --- 2. ํตํฉ DB ํจ์๋ค (ํ์ด๋ธ๋ฆฌ๋) --- | |
| def log_interaction(session_id, user_id, role, content): | |
| timestamp = datetime.datetime.now().isoformat() | |
| if USE_MONGO: | |
| try: | |
| interactions_col.insert_one({ | |
| "session_id": session_id, "user_id": user_id, | |
| "timestamp": timestamp, "role": role, "content": content | |
| }) | |
| except Exception as e: print(f"DB Log Error (Mongo): {e}") | |
| else: | |
| try: | |
| cursor.execute("INSERT INTO interactions VALUES (?, ?, ?, ?, ?)", | |
| (session_id, user_id, timestamp, role, content)) | |
| conn.commit() | |
| except Exception as e: print(f"DB Log Error (SQLite): {e}") | |
| def log_summary(session_id, user_id, summary_text): | |
| timestamp = datetime.datetime.now().isoformat() | |
| if USE_MONGO: | |
| try: | |
| summaries_col.insert_one({ | |
| "session_id": session_id, "user_id": user_id, | |
| "timestamp": timestamp, "summary": summary_text | |
| }) | |
| except Exception as e: print(f"DB Summary Error (Mongo): {e}") | |
| else: | |
| try: | |
| cursor.execute("INSERT INTO summaries VALUES (?, ?, ?, ?)", | |
| (session_id, user_id, timestamp, summary_text)) | |
| conn.commit() | |
| except Exception as e: print(f"DB Summary Error (SQLite): {e}") | |
| def fetch_history(session_id): | |
| if USE_MONGO: | |
| try: | |
| cursor_mongo = interactions_col.find({"session_id": session_id}).sort("timestamp", 1) | |
| return [(doc["role"], doc["content"], doc["timestamp"]) for doc in cursor_mongo] | |
| except Exception as e: | |
| print(f"DB Fetch Error (Mongo): {e}") | |
| return [] | |
| else: | |
| try: | |
| cursor.execute("SELECT role, content, timestamp FROM interactions WHERE session_id=? ORDER BY timestamp", (session_id,)) | |
| return cursor.fetchall() | |
| except Exception as e: | |
| print(f"DB Fetch Error (SQLite): {e}") | |
| return [] | |
| # --- 3. LLM API ์ค์ --- | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| if not api_key: | |
| try: | |
| with open("api_key.txt", "r") as f: api_key = f.read().strip() | |
| except: pass | |
| if not api_key: print("โ ๏ธ GEMINI_API_KEY๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค.") | |
| genai.configure(api_key=api_key) | |
| try: | |
| with open("system_prompt.txt", "r", encoding="utf-8") as f: SYSTEM_PROMPT = f.read() | |
| except: SYSTEM_PROMPT = "๋น์ ์ ์น์ ํ ์ฑ๋ด์ ๋๋ค." | |
| # ๋ชจ๋ธ๋ช ์ฌํ์ธ (gemini-1.5-flash) | |
| model = genai.GenerativeModel(model_name='gemini-2.5-flash', system_instruction=SYSTEM_PROMPT) | |
| WELCOME_MESSAGE = ( | |
| "์๋ ํ์ธ์, ์ ์๋. '์ด์ผ๊ธฐ ๋น์'์ ๋๋ค.\n" | |
| "์ค๋ ์์์ ์ ์ฒซ ์ฑํฐ๋ฅผ ์ํด, **๊ฐ์ฅ ๋จผ์ ๋ ์ค๋ฅด๋ ์ด์ผ๊ธฐ**๋ฅผ ๋ค๋ ค์ฃผ์๊ฒ ์ด์?\n\n" | |
| "1. **์ ๋ ์์ **์ ์ถ์ต\n2. ์ธ์ **์ต๊ณ ์ ์์**\n3. **์ฒซ ์ง์ฅ**์ ๊ธฐ์ต\n4. **๋ฐฐ์ฐ์**์์ ์ฒซ ๋ง๋จ\n\n" | |
| "์ ์ฃผ์ ์ค ํ๋๋ฅผ ๊ณจ๋ผ์ฃผ์๊ฑฐ๋, ํธํ๊ฒ ๋ค๋ฅธ ๋ง์์ ํด์ฃผ์ ๋ ์ข์ต๋๋ค." | |
| ) | |
| # --- 4. ํต์ฌ ๋ก์ง --- | |
| def format_history_for_llm(history): | |
| llm_history = [] | |
| for msg in history: | |
| if msg['role'] == 'user': | |
| llm_history.append({"role": "user", "parts": [{"text": msg['content']}]}) | |
| elif msg['role'] == 'assistant': | |
| llm_history.append({"role": "model", "parts": [{"text": msg['content']}]}) | |
| return llm_history | |
| def chat(user_input, history, session_id, user_id): | |
| if not user_input.strip(): return "", history | |
| history.append({"role": "user", "content": user_input}) | |
| log_interaction(session_id, user_id, 'user', user_input) | |
| yield "", history | |
| llm_history = format_history_for_llm(history[:-1]) | |
| chat_session = model.start_chat(history=llm_history) | |
| try: | |
| response = chat_session.send_message(user_input, stream=True) | |
| full_response = "" | |
| history.append({"role": "assistant", "content": ""}) | |
| for chunk in response: | |
| full_response += chunk.text | |
| history[-1]['content'] = full_response | |
| yield "", history | |
| log_interaction(session_id, user_id, 'model', full_response) | |
| except Exception as e: | |
| history.append({"role": "assistant", "content": f"โ ๏ธ ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {str(e)}"}) | |
| yield "", history | |
| def change_topic(history, session_id, user_id): | |
| if not history: return history | |
| llm_history = format_history_for_llm(history) | |
| chat_session = model.start_chat(history=llm_history) | |
| summary_prompt = ( | |
| "์ง๊ธ๊น์ง ๋๋ ๋ํ ๋ด์ฉ์ '์ ์๋์ ~ ๊ฒฝํ์ ํ์ จ์ต๋๋ค'์ ๊ฐ์ ๋ฌธ์ฒด๋ก " | |
| "3๋ฌธ์ฅ ์ด๋ด๋ก ๊ฐ๋ตํ๊ฒ ์์ฝํด์ค. ์ด ์์ฝ์ ๊ธฐ๋ก์ ์ํ ๊ฒ์ด์ผ." | |
| ) | |
| try: | |
| response = chat_session.send_message(summary_prompt) | |
| summary_text = response.text | |
| log_summary(session_id, user_id, summary_text) | |
| end_message = f"\n\n[๊ธฐ๋ก ์๋ฃ] ์ง๊ธ๊น์ง์ ์ด์ผ๊ธฐ๋ ๋ค์๊ณผ ๊ฐ์ด ์ ๊ธฐ๋กํด ๋์์ต๋๋ค.\n\n๐ **์์ฝ:** {summary_text}\n\n์, ๊ทธ๋ผ ์ด์ ๋ค๋ฅธ ์ฃผ์ ๋ก ๋์ด๊ฐ ๋ณผ๊น์? ์ด๋ค ์ด์ผ๊ธฐ๋ฅผ ๋ ๋ค๋ ค์ฃผ์๊ฒ ์ด์?" | |
| history.append({"role": "assistant", "content": end_message}) | |
| return history | |
| except Exception as e: | |
| history.append({"role": "assistant", "content": f"โ ๏ธ ์์ฝ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {str(e)}"}) | |
| return history | |
| def export_chat_txt(session_id): | |
| data = fetch_history(session_id) | |
| if not data: return None | |
| text_content = f"์์์ ์ธํฐ๋ทฐ ๊ธฐ๋ก (Session: {session_id})\n์ ์ฅ ์ผ์: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" + "=" * 50 + "\n\n" | |
| for role, content, timestamp in data: | |
| speaker = "์ ์๋" if role == 'user' else "์ด์ผ๊ธฐ ๋น์" | |
| text_content += f"[{timestamp.split('T')[1][:5]}] {speaker}:\n{content}\n\n" + "-" * 20 + "\n\n" | |
| fd, path = tempfile.mkstemp(suffix=".txt", prefix=f"interview_{session_id[:8]}_") | |
| with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(text_content) | |
| return path | |
| def export_chat_md(session_id): | |
| data = fetch_history(session_id) | |
| if not data: return None | |
| md_content = f"# ๐ ์์์ ์ธํฐ๋ทฐ ๊ธฐ๋ก\n\n> **์ธ์ ID:** `{session_id}`<br>**์ ์ฅ ์ผ์:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n---\n\n" | |
| for role, content, timestamp in data: | |
| speaker = "๐งโ๐ซ ์ ์๋" if role == 'user' else "๐ค ์ด์ผ๊ธฐ ๋น์" | |
| md_content += f"### {speaker} <span style='color:gray;font-size:0.8em'>({timestamp.split('T')[1][:5]})</span>\n\n{content}\n\n" | |
| fd, path = tempfile.mkstemp(suffix=".md", prefix=f"interview_{session_id[:8]}_") | |
| with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(md_content) | |
| return path | |
| # --- 5. UI ๊ตฌ์ฑ --- | |
| def start_chat(user_id_val): | |
| if not user_id_val.strip(): | |
| return gr.update(visible=True), gr.update(visible=False), "โ ๏ธ ํ ์คํฐ ID๋ฅผ ์ ๋ ฅํด์ฃผ์ธ์!" | |
| return gr.update(visible=False), gr.update(visible=True), "" | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| session_id = gr.State(lambda: str(uuid.uuid4())) | |
| user_id = gr.State("") | |
| with gr.Group(visible=True) as login_view: | |
| gr.Markdown("# ๐ ํ์ํฉ๋๋ค! ์ด์ผ๊ธฐ ๋น์ ํ ์คํธ๋ฅผ ์์ํฉ๋๋ค.") | |
| gr.Markdown("์ํํ ๊ธฐ๋ก์ ์ํด ํ ์คํฐ ID(๋๋ค์)๋ฅผ ์ ๋ ฅํด์ฃผ์ธ์.") | |
| with gr.Row(): | |
| login_id_input = gr.Textbox(label="ํ ์คํฐ ID", placeholder="์: tester_hong, ํ๋ณตํํ๋ฃจ", scale=3, autofocus=True) | |
| login_btn = gr.Button("๐ ์์ํ๊ธฐ", variant="primary", scale=1) | |
| login_msg = gr.Markdown("", visible=True) | |
| with gr.Group(visible=False) as chat_view: | |
| gr.Markdown("# ๐ ์ด์ผ๊ธฐ ๋น์ (ํ๋กํ ํ์ )") | |
| # DB ์ํ ํ์ (๋๋ฒ๊น ์ฉ) | |
| db_status = "๐ข MongoDB ์ฐ๊ฒฐ๋จ" if USE_MONGO else "๐ SQLite ๋ก์ปฌ ๋ชจ๋ (์ฃผ์: ์ฌ์์ ์ ๋ฐ์ดํฐ ์ด๊ธฐํ ๊ฐ๋ฅ)" | |
| gr.Markdown(f"โน๏ธ ์์คํ ์ํ: {db_status}") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| chatbot = gr.Chatbot( | |
| value=[{"role": "assistant", "content": WELCOME_MESSAGE}], | |
| height=650, | |
| label="๋ํ์ฐฝ", | |
| type="messages", | |
| avatar_images=(None, "https://i.ibb.co/ZHrkBPm/ai-assistant.png") | |
| ) | |
| with gr.Row(): | |
| msg_input = gr.Textbox(scale=4, show_label=False, placeholder="์ฌ๊ธฐ์ ์ด์ผ๊ธฐ๋ฅผ ์ ๋ ฅํ์ธ์...", container=False) | |
| submit_btn = gr.Button("์ ์ก", scale=1, variant="primary") | |
| with gr.Row(): | |
| topic_change_btn = gr.Button("๐ ์ด ์ฃผ์ ๋ง๋ฌด๋ฆฌํ๊ณ ๋ค๋ฅธ ์ด์ผ๊ธฐ ํ๊ธฐ", variant="secondary", scale=1) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### ๐ ๏ธ ํ ์คํฐ ์ ๋ณด") | |
| user_id_display = gr.Textbox(label="ํ์ฌ ์ ์ ID", interactive=False) | |
| gr.Markdown("### ๐ก ์ฌ์ฉ ๊ฐ์ด๋") | |
| gr.Markdown("- **์ด์ผ๊ธฐ ์์:** ๋ด์ ์ง๋ฌธ์ ํธํ๊ฒ ๋ตํด์ฃผ์ธ์.\n- **์ฃผ์ ๋ณ๊ฒฝ:** ์ฑํ ์ฐฝ ์๋ **'๐ ์ด ์ฃผ์ ๋ง๋ฌด๋ฆฌ...'** ๋ฒํผ์ ๋๋ฌ์ฃผ์ธ์.") | |
| gr.Markdown("### ๐พ ๋ํ ๋ด์ฉ ๋ด๋ณด๋ด๊ธฐ") | |
| with gr.Row(): | |
| export_txt_btn = gr.Button("๐ TXT") | |
| export_md_btn = gr.Button("๐ MD") | |
| download_file = gr.File(label="๋ค์ด๋ก๋", interactive=False, height=100) | |
| login_btn.click(start_chat, inputs=[login_id_input], outputs=[login_view, chat_view, login_msg]).then( | |
| lambda id_val: (id_val, id_val), inputs=[login_id_input], outputs=[user_id, user_id_display] | |
| ) | |
| login_id_input.submit(start_chat, inputs=[login_id_input], outputs=[login_view, chat_view, login_msg]).then( | |
| lambda id_val: (id_val, id_val), inputs=[login_id_input], outputs=[user_id, user_id_display] | |
| ) | |
| msg_input.submit(chat, [msg_input, chatbot, session_id, user_id], [msg_input, chatbot]) | |
| submit_btn.click(chat, [msg_input, chatbot, session_id, user_id], [msg_input, chatbot]) | |
| topic_change_btn.click(change_topic, [chatbot, session_id, user_id], [chatbot]) | |
| export_txt_btn.click(export_chat_txt, inputs=[session_id], outputs=[download_file]) | |
| export_md_btn.click(export_chat_md, inputs=[session_id], outputs=[download_file]) | |
| # --- 6. ์ฑ ์คํ --- | |
| app_password = os.environ.get("APP_PASSWORD") | |
| if __name__ == "__main__": | |
| launch_kwargs = { | |
| "server_name": "0.0.0.0", | |
| "server_port": 7860, | |
| "ssr_mode": False | |
| } | |
| if app_password: | |
| launch_kwargs["auth"] = ("team", app_password) | |
| print(f"๐ Launching Gradio with settings: {launch_kwargs}") | |
| demo.launch(**launch_kwargs) |