import gradio as gr import google.generativeai as genai import datetime import uuid import os import tempfile import pymongo import sqlite3 import certifi from bson.objectid import ObjectId # --- 1. 하이브리드 DB 설정 --- MONGO_URI = os.environ.get("MONGO_URI") USE_MONGO = False # 기본값은 False, 연결 성공 시 True로 변경 # 1-1. MongoDB 연결 시도 if MONGO_URI: try: print("🔌 MongoDB 연결 시도 중...") client = pymongo.MongoClient( MONGO_URI, serverSelectionTimeoutMS=3000, # 3초 내 응답 없으면 포기 tls=True, tlsAllowInvalidCertificates=True, tlsAllowInvalidHostnames=True ) client.admin.command('ping') USE_MONGO = True print("✅ MongoDB 연결 성공! (원격 저장소 사용)") db = client.story_assistant_db interactions_col = db.interactions summaries_col = db.summaries except Exception as e: print(f"⚠️ MongoDB 연결 실패: {e}") print("🔀 SQLite 로컬 저장소로 전환합니다.") USE_MONGO = False else: print("ℹ️ MONGO_URI가 없습니다. SQLite 로컬 저장소를 사용합니다.") # 1-2. SQLite 설정 (MongoDB 실패 시 백업용) if not USE_MONGO: # Hugging Face Spaces의 영구 저장소 경로(/data)가 있으면 거기 저장, 없으면 일반 파일 DB_PATH = os.path.join("/data", "logs.db") if os.path.exists("/data") else "logs.db" print(f"📂 SQLite 데이터베이스 경로: {DB_PATH}") conn = sqlite3.connect(DB_PATH, check_same_thread=False) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS interactions ( session_id TEXT, user_id TEXT, timestamp TEXT, role TEXT, content TEXT ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS summaries ( session_id TEXT, user_id TEXT, timestamp TEXT, summary TEXT ) ''') conn.commit() # --- 2. 통합 DB 함수들 (하이브리드) --- def log_interaction(session_id, user_id, role, content): timestamp = datetime.datetime.now().isoformat() if USE_MONGO: try: interactions_col.insert_one({ "session_id": session_id, "user_id": user_id, "timestamp": timestamp, "role": role, "content": content }) except Exception as e: print(f"DB Log Error (Mongo): {e}") else: try: cursor.execute("INSERT INTO interactions VALUES (?, ?, ?, ?, ?)", (session_id, user_id, timestamp, role, content)) conn.commit() except Exception as e: print(f"DB Log Error (SQLite): {e}") def log_summary(session_id, user_id, summary_text): timestamp = datetime.datetime.now().isoformat() if USE_MONGO: try: summaries_col.insert_one({ "session_id": session_id, "user_id": user_id, "timestamp": timestamp, "summary": summary_text }) except Exception as e: print(f"DB Summary Error (Mongo): {e}") else: try: cursor.execute("INSERT INTO summaries VALUES (?, ?, ?, ?)", (session_id, user_id, timestamp, summary_text)) conn.commit() except Exception as e: print(f"DB Summary Error (SQLite): {e}") def fetch_history(session_id): if USE_MONGO: try: cursor_mongo = interactions_col.find({"session_id": session_id}).sort("timestamp", 1) return [(doc["role"], doc["content"], doc["timestamp"]) for doc in cursor_mongo] except Exception as e: print(f"DB Fetch Error (Mongo): {e}") return [] else: try: cursor.execute("SELECT role, content, timestamp FROM interactions WHERE session_id=? ORDER BY timestamp", (session_id,)) return cursor.fetchall() except Exception as e: print(f"DB Fetch Error (SQLite): {e}") return [] # --- 3. LLM API 설정 --- api_key = os.environ.get("GEMINI_API_KEY") if not api_key: try: with open("api_key.txt", "r") as f: api_key = f.read().strip() except: pass if not api_key: print("⚠️ GEMINI_API_KEY가 설정되지 않았습니다.") genai.configure(api_key=api_key) try: with open("system_prompt.txt", "r", encoding="utf-8") as f: SYSTEM_PROMPT = f.read() except: SYSTEM_PROMPT = "당신은 친절한 챗봇입니다." # 모델명 재확인 (gemini-1.5-flash) model = genai.GenerativeModel(model_name='gemini-2.5-flash', system_instruction=SYSTEM_PROMPT) WELCOME_MESSAGE = ( "안녕하세요, 선생님. '이야기 비서'입니다.\n" "오늘 자서전의 첫 챕터를 위해, **가장 먼저 떠오르는 이야기**를 들려주시겠어요?\n\n" "1. **유년 시절**의 추억\n2. 인생 **최고의 음식**\n3. **첫 직장**의 기억\n4. **배우자**와의 첫 만남\n\n" "위 주제 중 하나를 골라주시거나, 편하게 다른 말씀을 해주셔도 좋습니다." ) # --- 4. 핵심 로직 --- def format_history_for_llm(history): llm_history = [] for msg in history: if msg['role'] == 'user': llm_history.append({"role": "user", "parts": [{"text": msg['content']}]}) elif msg['role'] == 'assistant': llm_history.append({"role": "model", "parts": [{"text": msg['content']}]}) return llm_history def chat(user_input, history, session_id, user_id): if not user_input.strip(): return "", history history.append({"role": "user", "content": user_input}) log_interaction(session_id, user_id, 'user', user_input) yield "", history llm_history = format_history_for_llm(history[:-1]) chat_session = model.start_chat(history=llm_history) try: response = chat_session.send_message(user_input, stream=True) full_response = "" history.append({"role": "assistant", "content": ""}) for chunk in response: full_response += chunk.text history[-1]['content'] = full_response yield "", history log_interaction(session_id, user_id, 'model', full_response) except Exception as e: history.append({"role": "assistant", "content": f"⚠️ 오류가 발생했습니다: {str(e)}"}) yield "", history def change_topic(history, session_id, user_id): if not history: return history llm_history = format_history_for_llm(history) chat_session = model.start_chat(history=llm_history) summary_prompt = ( "지금까지 나눈 대화 내용을 '선생님은 ~ 경험을 하셨습니다'와 같은 문체로 " "3문장 이내로 간략하게 요약해줘. 이 요약은 기록을 위한 것이야." ) try: response = chat_session.send_message(summary_prompt) summary_text = response.text log_summary(session_id, user_id, summary_text) end_message = f"\n\n[기록 완료] 지금까지의 이야기는 다음과 같이 잘 기록해 두었습니다.\n\n📝 **요약:** {summary_text}\n\n자, 그럼 이제 다른 주제로 넘어가 볼까요? 어떤 이야기를 더 들려주시겠어요?" history.append({"role": "assistant", "content": end_message}) return history except Exception as e: history.append({"role": "assistant", "content": f"⚠️ 요약 중 오류가 발생했습니다: {str(e)}"}) return history def export_chat_txt(session_id): data = fetch_history(session_id) if not data: return None text_content = f"자서전 인터뷰 기록 (Session: {session_id})\n저장 일시: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" + "=" * 50 + "\n\n" for role, content, timestamp in data: speaker = "선생님" if role == 'user' else "이야기 비서" text_content += f"[{timestamp.split('T')[1][:5]}] {speaker}:\n{content}\n\n" + "-" * 20 + "\n\n" fd, path = tempfile.mkstemp(suffix=".txt", prefix=f"interview_{session_id[:8]}_") with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(text_content) return path def export_chat_md(session_id): data = fetch_history(session_id) if not data: return None md_content = f"# 📜 자서전 인터뷰 기록\n\n> **세션 ID:** `{session_id}`
**저장 일시:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n---\n\n" for role, content, timestamp in data: speaker = "🧑‍🏫 선생님" if role == 'user' else "🤖 이야기 비서" md_content += f"### {speaker} ({timestamp.split('T')[1][:5]})\n\n{content}\n\n" fd, path = tempfile.mkstemp(suffix=".md", prefix=f"interview_{session_id[:8]}_") with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(md_content) return path # --- 5. UI 구성 --- def start_chat(user_id_val): if not user_id_val.strip(): return gr.update(visible=True), gr.update(visible=False), "⚠️ 테스터 ID를 입력해주세요!" return gr.update(visible=False), gr.update(visible=True), "" with gr.Blocks(theme=gr.themes.Soft()) as demo: session_id = gr.State(lambda: str(uuid.uuid4())) user_id = gr.State("") with gr.Group(visible=True) as login_view: gr.Markdown("# 👋 환영합니다! 이야기 비서 테스트를 시작합니다.") gr.Markdown("원활한 기록을 위해 테스터 ID(닉네임)를 입력해주세요.") with gr.Row(): login_id_input = gr.Textbox(label="테스터 ID", placeholder="예: tester_hong, 행복한하루", scale=3, autofocus=True) login_btn = gr.Button("🚀 시작하기", variant="primary", scale=1) login_msg = gr.Markdown("", visible=True) with gr.Group(visible=False) as chat_view: gr.Markdown("# 📝 이야기 비서 (프로토타입)") # DB 상태 표시 (디버깅용) db_status = "🟢 MongoDB 연결됨" if USE_MONGO else "🟠 SQLite 로컬 모드 (주의: 재시작 시 데이터 초기화 가능)" gr.Markdown(f"ℹ️ 시스템 상태: {db_status}") with gr.Row(): with gr.Column(scale=3): chatbot = gr.Chatbot( value=[{"role": "assistant", "content": WELCOME_MESSAGE}], height=650, label="대화창", type="messages", avatar_images=(None, "https://i.ibb.co/ZHrkBPm/ai-assistant.png") ) with gr.Row(): msg_input = gr.Textbox(scale=4, show_label=False, placeholder="여기에 이야기를 입력하세요...", container=False) submit_btn = gr.Button("전송", scale=1, variant="primary") with gr.Row(): topic_change_btn = gr.Button("🔄 이 주제 마무리하고 다른 이야기 하기", variant="secondary", scale=1) with gr.Column(scale=1): gr.Markdown("### 🛠️ 테스터 정보") user_id_display = gr.Textbox(label="현재 접속 ID", interactive=False) gr.Markdown("### 💡 사용 가이드") gr.Markdown("- **이야기 시작:** 봇의 질문에 편하게 답해주세요.\n- **주제 변경:** 채팅창 아래 **'🔄 이 주제 마무리...'** 버튼을 눌러주세요.") gr.Markdown("### 💾 대화 내용 내보내기") with gr.Row(): export_txt_btn = gr.Button("📄 TXT") export_md_btn = gr.Button("📝 MD") download_file = gr.File(label="다운로드", interactive=False, height=100) login_btn.click(start_chat, inputs=[login_id_input], outputs=[login_view, chat_view, login_msg]).then( lambda id_val: (id_val, id_val), inputs=[login_id_input], outputs=[user_id, user_id_display] ) login_id_input.submit(start_chat, inputs=[login_id_input], outputs=[login_view, chat_view, login_msg]).then( lambda id_val: (id_val, id_val), inputs=[login_id_input], outputs=[user_id, user_id_display] ) msg_input.submit(chat, [msg_input, chatbot, session_id, user_id], [msg_input, chatbot]) submit_btn.click(chat, [msg_input, chatbot, session_id, user_id], [msg_input, chatbot]) topic_change_btn.click(change_topic, [chatbot, session_id, user_id], [chatbot]) export_txt_btn.click(export_chat_txt, inputs=[session_id], outputs=[download_file]) export_md_btn.click(export_chat_md, inputs=[session_id], outputs=[download_file]) # --- 6. 앱 실행 --- app_password = os.environ.get("APP_PASSWORD") if __name__ == "__main__": launch_kwargs = { "server_name": "0.0.0.0", "server_port": 7860, "ssr_mode": False } if app_password: launch_kwargs["auth"] = ("team", app_password) print(f"🚀 Launching Gradio with settings: {launch_kwargs}") demo.launch(**launch_kwargs)