import gradio as gr
import google.generativeai as genai
import datetime
import uuid
import os
import tempfile
import pymongo
import sqlite3
import certifi
from bson.objectid import ObjectId
# --- 1. 하이브리드 DB 설정 ---
MONGO_URI = os.environ.get("MONGO_URI")
USE_MONGO = False # 기본값은 False, 연결 성공 시 True로 변경
# 1-1. MongoDB 연결 시도
if MONGO_URI:
try:
print("🔌 MongoDB 연결 시도 중...")
client = pymongo.MongoClient(
MONGO_URI,
serverSelectionTimeoutMS=3000, # 3초 내 응답 없으면 포기
tls=True,
tlsAllowInvalidCertificates=True,
tlsAllowInvalidHostnames=True
)
client.admin.command('ping')
USE_MONGO = True
print("✅ MongoDB 연결 성공! (원격 저장소 사용)")
db = client.story_assistant_db
interactions_col = db.interactions
summaries_col = db.summaries
except Exception as e:
print(f"⚠️ MongoDB 연결 실패: {e}")
print("🔀 SQLite 로컬 저장소로 전환합니다.")
USE_MONGO = False
else:
print("ℹ️ MONGO_URI가 없습니다. SQLite 로컬 저장소를 사용합니다.")
# 1-2. SQLite 설정 (MongoDB 실패 시 백업용)
if not USE_MONGO:
# Hugging Face Spaces의 영구 저장소 경로(/data)가 있으면 거기 저장, 없으면 일반 파일
DB_PATH = os.path.join("/data", "logs.db") if os.path.exists("/data") else "logs.db"
print(f"📂 SQLite 데이터베이스 경로: {DB_PATH}")
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS interactions (
session_id TEXT, user_id TEXT, timestamp TEXT, role TEXT, content TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS summaries (
session_id TEXT, user_id TEXT, timestamp TEXT, summary TEXT
)
''')
conn.commit()
# --- 2. 통합 DB 함수들 (하이브리드) ---
def log_interaction(session_id, user_id, role, content):
timestamp = datetime.datetime.now().isoformat()
if USE_MONGO:
try:
interactions_col.insert_one({
"session_id": session_id, "user_id": user_id,
"timestamp": timestamp, "role": role, "content": content
})
except Exception as e: print(f"DB Log Error (Mongo): {e}")
else:
try:
cursor.execute("INSERT INTO interactions VALUES (?, ?, ?, ?, ?)",
(session_id, user_id, timestamp, role, content))
conn.commit()
except Exception as e: print(f"DB Log Error (SQLite): {e}")
def log_summary(session_id, user_id, summary_text):
timestamp = datetime.datetime.now().isoformat()
if USE_MONGO:
try:
summaries_col.insert_one({
"session_id": session_id, "user_id": user_id,
"timestamp": timestamp, "summary": summary_text
})
except Exception as e: print(f"DB Summary Error (Mongo): {e}")
else:
try:
cursor.execute("INSERT INTO summaries VALUES (?, ?, ?, ?)",
(session_id, user_id, timestamp, summary_text))
conn.commit()
except Exception as e: print(f"DB Summary Error (SQLite): {e}")
def fetch_history(session_id):
if USE_MONGO:
try:
cursor_mongo = interactions_col.find({"session_id": session_id}).sort("timestamp", 1)
return [(doc["role"], doc["content"], doc["timestamp"]) for doc in cursor_mongo]
except Exception as e:
print(f"DB Fetch Error (Mongo): {e}")
return []
else:
try:
cursor.execute("SELECT role, content, timestamp FROM interactions WHERE session_id=? ORDER BY timestamp", (session_id,))
return cursor.fetchall()
except Exception as e:
print(f"DB Fetch Error (SQLite): {e}")
return []
# --- 3. LLM API 설정 ---
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
try:
with open("api_key.txt", "r") as f: api_key = f.read().strip()
except: pass
if not api_key: print("⚠️ GEMINI_API_KEY가 설정되지 않았습니다.")
genai.configure(api_key=api_key)
try:
with open("system_prompt.txt", "r", encoding="utf-8") as f: SYSTEM_PROMPT = f.read()
except: SYSTEM_PROMPT = "당신은 친절한 챗봇입니다."
# 모델명 재확인 (gemini-1.5-flash)
model = genai.GenerativeModel(model_name='gemini-2.5-flash', system_instruction=SYSTEM_PROMPT)
WELCOME_MESSAGE = (
"안녕하세요, 선생님. '이야기 비서'입니다.\n"
"오늘 자서전의 첫 챕터를 위해, **가장 먼저 떠오르는 이야기**를 들려주시겠어요?\n\n"
"1. **유년 시절**의 추억\n2. 인생 **최고의 음식**\n3. **첫 직장**의 기억\n4. **배우자**와의 첫 만남\n\n"
"위 주제 중 하나를 골라주시거나, 편하게 다른 말씀을 해주셔도 좋습니다."
)
# --- 4. 핵심 로직 ---
def format_history_for_llm(history):
llm_history = []
for msg in history:
if msg['role'] == 'user':
llm_history.append({"role": "user", "parts": [{"text": msg['content']}]})
elif msg['role'] == 'assistant':
llm_history.append({"role": "model", "parts": [{"text": msg['content']}]})
return llm_history
def chat(user_input, history, session_id, user_id):
if not user_input.strip(): return "", history
history.append({"role": "user", "content": user_input})
log_interaction(session_id, user_id, 'user', user_input)
yield "", history
llm_history = format_history_for_llm(history[:-1])
chat_session = model.start_chat(history=llm_history)
try:
response = chat_session.send_message(user_input, stream=True)
full_response = ""
history.append({"role": "assistant", "content": ""})
for chunk in response:
full_response += chunk.text
history[-1]['content'] = full_response
yield "", history
log_interaction(session_id, user_id, 'model', full_response)
except Exception as e:
history.append({"role": "assistant", "content": f"⚠️ 오류가 발생했습니다: {str(e)}"})
yield "", history
def change_topic(history, session_id, user_id):
if not history: return history
llm_history = format_history_for_llm(history)
chat_session = model.start_chat(history=llm_history)
summary_prompt = (
"지금까지 나눈 대화 내용을 '선생님은 ~ 경험을 하셨습니다'와 같은 문체로 "
"3문장 이내로 간략하게 요약해줘. 이 요약은 기록을 위한 것이야."
)
try:
response = chat_session.send_message(summary_prompt)
summary_text = response.text
log_summary(session_id, user_id, summary_text)
end_message = f"\n\n[기록 완료] 지금까지의 이야기는 다음과 같이 잘 기록해 두었습니다.\n\n📝 **요약:** {summary_text}\n\n자, 그럼 이제 다른 주제로 넘어가 볼까요? 어떤 이야기를 더 들려주시겠어요?"
history.append({"role": "assistant", "content": end_message})
return history
except Exception as e:
history.append({"role": "assistant", "content": f"⚠️ 요약 중 오류가 발생했습니다: {str(e)}"})
return history
def export_chat_txt(session_id):
data = fetch_history(session_id)
if not data: return None
text_content = f"자서전 인터뷰 기록 (Session: {session_id})\n저장 일시: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" + "=" * 50 + "\n\n"
for role, content, timestamp in data:
speaker = "선생님" if role == 'user' else "이야기 비서"
text_content += f"[{timestamp.split('T')[1][:5]}] {speaker}:\n{content}\n\n" + "-" * 20 + "\n\n"
fd, path = tempfile.mkstemp(suffix=".txt", prefix=f"interview_{session_id[:8]}_")
with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(text_content)
return path
def export_chat_md(session_id):
data = fetch_history(session_id)
if not data: return None
md_content = f"# 📜 자서전 인터뷰 기록\n\n> **세션 ID:** `{session_id}`
**저장 일시:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n---\n\n"
for role, content, timestamp in data:
speaker = "🧑🏫 선생님" if role == 'user' else "🤖 이야기 비서"
md_content += f"### {speaker} ({timestamp.split('T')[1][:5]})\n\n{content}\n\n"
fd, path = tempfile.mkstemp(suffix=".md", prefix=f"interview_{session_id[:8]}_")
with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(md_content)
return path
# --- 5. UI 구성 ---
def start_chat(user_id_val):
if not user_id_val.strip():
return gr.update(visible=True), gr.update(visible=False), "⚠️ 테스터 ID를 입력해주세요!"
return gr.update(visible=False), gr.update(visible=True), ""
with gr.Blocks(theme=gr.themes.Soft()) as demo:
session_id = gr.State(lambda: str(uuid.uuid4()))
user_id = gr.State("")
with gr.Group(visible=True) as login_view:
gr.Markdown("# 👋 환영합니다! 이야기 비서 테스트를 시작합니다.")
gr.Markdown("원활한 기록을 위해 테스터 ID(닉네임)를 입력해주세요.")
with gr.Row():
login_id_input = gr.Textbox(label="테스터 ID", placeholder="예: tester_hong, 행복한하루", scale=3, autofocus=True)
login_btn = gr.Button("🚀 시작하기", variant="primary", scale=1)
login_msg = gr.Markdown("", visible=True)
with gr.Group(visible=False) as chat_view:
gr.Markdown("# 📝 이야기 비서 (프로토타입)")
# DB 상태 표시 (디버깅용)
db_status = "🟢 MongoDB 연결됨" if USE_MONGO else "🟠 SQLite 로컬 모드 (주의: 재시작 시 데이터 초기화 가능)"
gr.Markdown(f"ℹ️ 시스템 상태: {db_status}")
with gr.Row():
with gr.Column(scale=3):
chatbot = gr.Chatbot(
value=[{"role": "assistant", "content": WELCOME_MESSAGE}],
height=650,
label="대화창",
type="messages",
avatar_images=(None, "https://i.ibb.co/ZHrkBPm/ai-assistant.png")
)
with gr.Row():
msg_input = gr.Textbox(scale=4, show_label=False, placeholder="여기에 이야기를 입력하세요...", container=False)
submit_btn = gr.Button("전송", scale=1, variant="primary")
with gr.Row():
topic_change_btn = gr.Button("🔄 이 주제 마무리하고 다른 이야기 하기", variant="secondary", scale=1)
with gr.Column(scale=1):
gr.Markdown("### 🛠️ 테스터 정보")
user_id_display = gr.Textbox(label="현재 접속 ID", interactive=False)
gr.Markdown("### 💡 사용 가이드")
gr.Markdown("- **이야기 시작:** 봇의 질문에 편하게 답해주세요.\n- **주제 변경:** 채팅창 아래 **'🔄 이 주제 마무리...'** 버튼을 눌러주세요.")
gr.Markdown("### 💾 대화 내용 내보내기")
with gr.Row():
export_txt_btn = gr.Button("📄 TXT")
export_md_btn = gr.Button("📝 MD")
download_file = gr.File(label="다운로드", interactive=False, height=100)
login_btn.click(start_chat, inputs=[login_id_input], outputs=[login_view, chat_view, login_msg]).then(
lambda id_val: (id_val, id_val), inputs=[login_id_input], outputs=[user_id, user_id_display]
)
login_id_input.submit(start_chat, inputs=[login_id_input], outputs=[login_view, chat_view, login_msg]).then(
lambda id_val: (id_val, id_val), inputs=[login_id_input], outputs=[user_id, user_id_display]
)
msg_input.submit(chat, [msg_input, chatbot, session_id, user_id], [msg_input, chatbot])
submit_btn.click(chat, [msg_input, chatbot, session_id, user_id], [msg_input, chatbot])
topic_change_btn.click(change_topic, [chatbot, session_id, user_id], [chatbot])
export_txt_btn.click(export_chat_txt, inputs=[session_id], outputs=[download_file])
export_md_btn.click(export_chat_md, inputs=[session_id], outputs=[download_file])
# --- 6. 앱 실행 ---
app_password = os.environ.get("APP_PASSWORD")
if __name__ == "__main__":
launch_kwargs = {
"server_name": "0.0.0.0",
"server_port": 7860,
"ssr_mode": False
}
if app_password:
launch_kwargs["auth"] = ("team", app_password)
print(f"🚀 Launching Gradio with settings: {launch_kwargs}")
demo.launch(**launch_kwargs)