#!/usr/bin/env python3
"""
Database management module.

Manages chat history and user data using SQLite.
"""

import json
import logging
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

logger = logging.getLogger(__name__)


class DatabaseManager:
    """SQLite-backed store for users, chat sessions, messages, documents and RAG queries.

    Every public method opens its own connection, runs inside a single
    transaction, and closes the connection before returning.  Apart from
    ``init_database`` (which re-raises), methods log failures and report them
    through their return value (``False``, ``None`` or ``[]``) instead of
    raising.
    """

    # DDL executed, in order, by ``init_database``.  Every statement uses
    # ``IF NOT EXISTS`` so initialisation can be repeated safely.
    _SCHEMA_STATEMENTS = (
        # Users table
        """
        CREATE TABLE IF NOT EXISTS users (
            user_id TEXT PRIMARY KEY,
            username TEXT,
            email TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_login TIMESTAMP,
            is_active BOOLEAN DEFAULT TRUE
        )
        """,
        # Chat sessions table
        """
        CREATE TABLE IF NOT EXISTS chat_sessions (
            session_id TEXT PRIMARY KEY,
            user_id TEXT,
            session_name TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_activity TIMESTAMP,
            is_active BOOLEAN DEFAULT TRUE,
            FOREIGN KEY (user_id) REFERENCES users (user_id)
        )
        """,
        # Chat messages table
        """
        CREATE TABLE IF NOT EXISTS chat_messages (
            message_id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id TEXT,
            user_id TEXT,
            message_type TEXT CHECK(message_type IN ('user', 'assistant', 'system')),
            content TEXT,
            metadata TEXT,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (session_id) REFERENCES chat_sessions (session_id),
            FOREIGN KEY (user_id) REFERENCES users (user_id)
        )
        """,
        # Document information table
        """
        CREATE TABLE IF NOT EXISTS documents (
            document_id TEXT PRIMARY KEY,
            user_id TEXT,
            filename TEXT,
            file_type TEXT,
            file_size INTEGER,
            upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_accessed TIMESTAMP,
            is_active BOOLEAN DEFAULT TRUE,
            metadata TEXT,
            FOREIGN KEY (user_id) REFERENCES users (user_id)
        )
        """,
        # RAG query history table
        """
        CREATE TABLE IF NOT EXISTS rag_queries (
            query_id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT,
            document_id TEXT,
            query_text TEXT,
            response_text TEXT,
            processing_time REAL,
            search_results INTEGER,
            query_type TEXT CHECK(query_type IN ('text', 'hybrid')),
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (user_id),
            FOREIGN KEY (document_id) REFERENCES documents (document_id)
        )
        """,
    )

    def __init__(self, db_path: str = "./lily_llm.db"):
        """Create the database file's directory if needed and set up the schema.

        Args:
            db_path: Location of the SQLite database file.

        Raises:
            Exception: Whatever ``init_database`` re-raises when the schema
                cannot be created.
        """
        self.db_path = Path(db_path)
        # parents=True so a path with several missing directory levels works.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.init_database()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed or rolled back, then closed.

        ``sqlite3.Connection`` used as a context manager only ends the
        transaction; it does not close the connection, so it is closed
        explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, rollback on exception
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _rows_as_dicts(cursor: sqlite3.Cursor) -> List[Dict[str, Any]]:
        """Fetch all remaining rows of ``cursor`` as column-name -> value dicts."""
        rows = cursor.fetchall()
        columns = [description[0] for description in cursor.description]
        return [dict(zip(columns, row)) for row in rows]

    @staticmethod
    def _encode_metadata(metadata: Optional[Dict[str, Any]]) -> Optional[str]:
        """Serialise ``metadata`` to JSON; ``None`` and empty dicts are stored as NULL."""
        return json.dumps(metadata) if metadata else None

    @staticmethod
    def _decode_metadata(record: Dict[str, Any]) -> Dict[str, Any]:
        """Replace a non-empty JSON ``metadata`` string in ``record`` with the parsed value."""
        if record['metadata']:
            record['metadata'] = json.loads(record['metadata'])
        return record

    def init_database(self):
        """Initialise the database, creating any tables that do not exist yet.

        Raises:
            Exception: Any error raised while creating the schema is logged
                and re-raised.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                for statement in self._SCHEMA_STATEMENTS:
                    cursor.execute(statement)
            logger.info("✅ 데이터베이스 초기화 완료")
        except Exception as e:
            logger.error("❌ 데이터베이스 초기화 실패: %s", e)
            raise

    def add_user(self, user_id: str, username: Optional[str] = None,
                 email: Optional[str] = None) -> bool:
        """Add a user, or refresh an existing user's details and ``last_login``.

        An existing row is updated in place so that its ``created_at`` value
        survives repeated calls; ``username`` and ``email`` are overwritten
        with the supplied values (including ``None``).

        Args:
            user_id: Primary key of the user.
            username: Display name to store.
            email: E-mail address to store.

        Returns:
            ``True`` on success, ``False`` if the operation failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                # Insert only when the user is new; INSERT OR REPLACE would
                # delete the old row and reset created_at.
                cursor.execute(
                    "INSERT OR IGNORE INTO users (user_id) VALUES (?)",
                    (user_id,),
                )
                cursor.execute("""
                    UPDATE users
                    SET username = ?, email = ?, last_login = CURRENT_TIMESTAMP
                    WHERE user_id = ?
                """, (username, email, user_id))
            logger.info("✅ 사용자 추가 완료: %s", user_id)
            return True
        except Exception as e:
            logger.error("❌ 사용자 추가 실패: %s", e)
            return False

    def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Look up a user.

        Args:
            user_id: Primary key of the user.

        Returns:
            The user's row as a dict keyed by column name, or ``None`` when
            the user does not exist or the lookup failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM users WHERE user_id = ?", (user_id,))
                row = cursor.fetchone()
                if row is None:
                    return None
                columns = [description[0] for description in cursor.description]
                return dict(zip(columns, row))
        except Exception as e:
            logger.error("❌ 사용자 조회 실패: %s", e)
            return None

    def create_chat_session(self, user_id: str,
                            session_name: Optional[str] = None) -> Optional[str]:
        """Create a chat session for a user.

        The generated ID has the form
        ``session_{user_id}_{YYYYmmdd_HHMMSS}_{8 hex chars}``.  The random
        suffix keeps IDs unique when one user opens several sessions within
        the same second.

        Args:
            user_id: Owner of the session.
            session_name: Optional human-readable name.

        Returns:
            The new session ID, or ``None`` if the session could not be created.
        """
        try:
            created = datetime.now().strftime('%Y%m%d_%H%M%S')
            session_id = f"session_{user_id}_{created}_{uuid.uuid4().hex[:8]}"
            with self._connect() as conn:
                conn.execute("""
                    INSERT INTO chat_sessions (session_id, user_id, session_name, last_activity)
                    VALUES (?, ?, ?, CURRENT_TIMESTAMP)
                """, (session_id, user_id, session_name))
            logger.info("✅ 채팅 세션 생성 완료: %s", session_id)
            return session_id
        except Exception as e:
            logger.error("❌ 채팅 세션 생성 실패: %s", e)
            return None

    def add_chat_message(self, session_id: str, user_id: str, message_type: str,
                         content: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Append a message to a session and bump the session's ``last_activity``.

        Args:
            session_id: Session the message belongs to.
            user_id: Author of the message.
            message_type: One of ``'user'``, ``'assistant'`` or ``'system'``
                (enforced by a CHECK constraint on the table).
            content: Message text.
            metadata: Optional JSON-serialisable dict stored alongside the message.

        Returns:
            ``True`` on success, ``False`` if the operation failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                # Insert the message.
                cursor.execute("""
                    INSERT INTO chat_messages (session_id, user_id, message_type, content, metadata)
                    VALUES (?, ?, ?, ?, ?)
                """, (session_id, user_id, message_type, content,
                      self._encode_metadata(metadata)))
                # Update the session's activity time.
                cursor.execute("""
                    UPDATE chat_sessions
                    SET last_activity = CURRENT_TIMESTAMP
                    WHERE session_id = ?
                """, (session_id,))
            logger.info("✅ 메시지 추가 완료: %s", message_type)
            return True
        except Exception as e:
            logger.error("❌ 메시지 추가 실패: %s", e)
            return False

    def get_chat_history(self, session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
        """Return the most recent messages of a session in chronological order.

        Args:
            session_id: Session to read.
            limit: Maximum number of messages to return (the newest ones).

        Returns:
            Message rows as dicts, oldest first, with ``metadata`` decoded
            from JSON; ``[]`` if the query failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                # CURRENT_TIMESTAMP has one-second resolution, so message_id
                # breaks ties between messages stored within the same second.
                cursor.execute("""
                    SELECT * FROM chat_messages
                    WHERE session_id = ?
                    ORDER BY timestamp DESC, message_id DESC
                    LIMIT ?
                """, (session_id, limit))
                newest_first = self._rows_as_dicts(cursor)
            # Reverse into chronological order.
            return [self._decode_metadata(message) for message in reversed(newest_first)]
        except Exception as e:
            logger.error("❌ 채팅 히스토리 조회 실패: %s", e)
            return []

    def add_document_record(self, document_id: str, user_id: str, filename: str,
                            file_type: str, file_size: int,
                            metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Record an uploaded document.

        Args:
            document_id: Primary key of the document.
            user_id: Owner of the document.
            filename: Stored as the ``filename`` column.
            file_type: Stored as the ``file_type`` column.
            file_size: Stored as the ``file_size`` column.
            metadata: Optional JSON-serialisable dict stored with the record.

        Returns:
            ``True`` on success, ``False`` if the operation failed (for
            example when ``document_id`` already exists).
        """
        try:
            with self._connect() as conn:
                conn.execute("""
                    INSERT INTO documents (document_id, user_id, filename, file_type, file_size, metadata)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (document_id, user_id, filename, file_type, file_size,
                      self._encode_metadata(metadata)))
            logger.info("✅ 문서 정보 추가 완료: %s", document_id)
            return True
        except Exception as e:
            logger.error("❌ 문서 정보 추가 실패: %s", e)
            return False

    def add_rag_query(self, user_id: str, document_id: str, query_text: str,
                      response_text: str, processing_time: float,
                      search_results: int, query_type: str = 'text') -> bool:
        """Append an entry to the RAG query history.

        Args:
            user_id: User who issued the query.
            document_id: Document the query was run against.
            query_text: Stored as the ``query_text`` column.
            response_text: Stored as the ``response_text`` column.
            processing_time: Stored as the ``processing_time`` column (REAL).
            search_results: Stored as the ``search_results`` column (INTEGER).
            query_type: ``'text'`` or ``'hybrid'`` (enforced by a CHECK
                constraint on the table).

        Returns:
            ``True`` on success, ``False`` if the operation failed.
        """
        try:
            with self._connect() as conn:
                conn.execute("""
                    INSERT INTO rag_queries (user_id, document_id, query_text, response_text,
                                             processing_time, search_results, query_type)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (user_id, document_id, query_text, response_text,
                      processing_time, search_results, query_type))
            logger.info("✅ RAG 쿼리 히스토리 추가 완료")
            return True
        except Exception as e:
            logger.error("❌ RAG 쿼리 히스토리 추가 실패: %s", e)
            return False

    def get_user_documents(self, user_id: str) -> List[Dict[str, Any]]:
        """List a user's active documents, most recently uploaded first.

        Args:
            user_id: Owner of the documents.

        Returns:
            Document rows as dicts with ``metadata`` decoded from JSON;
            ``[]`` if the query failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM documents
                    WHERE user_id = ? AND is_active = TRUE
                    ORDER BY upload_date DESC
                """, (user_id,))
                documents = self._rows_as_dicts(cursor)
            return [self._decode_metadata(doc) for doc in documents]
        except Exception as e:
            logger.error("❌ 사용자 문서 목록 조회 실패: %s", e)
            return []

    def get_user_sessions(self, user_id: str) -> List[Dict[str, Any]]:
        """List a user's active chat sessions, most recently active first.

        Args:
            user_id: Owner of the sessions.

        Returns:
            Session rows as dicts; ``[]`` if the query failed.
        """
        try:
            with self._connect() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM chat_sessions
                    WHERE user_id = ? AND is_active = TRUE
                    ORDER BY last_activity DESC
                """, (user_id,))
                return self._rows_as_dicts(cursor)
        except Exception as e:
            logger.error("❌ 사용자 세션 목록 조회 실패: %s", e)
            return []


# Global instance
db_manager = DatabaseManager()