119_ChatBot / vector_store.py
김무영
Improve defaults, security, and index rebuild
cb2a4bd
import os
import pickle
import hashlib
import numpy as np
from typing import List, Dict, Tuple
from pathlib import Path
from sentence_transformers import SentenceTransformer
import faiss
from langchain_core.documents import Document
from config import Config
class VectorStore:
"""FAISS 기반 벡터 데이터베이스 클래스"""
def __init__(self, embedding_model: str = None, cache_dir: str = None):
self.embedding_model_name = embedding_model or Config.EMBEDDING_MODEL
self.cache_dir = cache_dir or Config.VECTOR_DB_PATH
self.model = None
self.index = None
self.documents = []
self.doc_ids = []
self.documents_hash = None
# 캐시 디렉토리 생성
Path(self.cache_dir).mkdir(parents=True, exist_ok=True)
def load_embedding_model(self):
"""임베딩 모델 로드"""
if self.model is None:
print(f"📥 임베딩 모델 로드: {self.embedding_model_name}")
try:
self.model = SentenceTransformer(self.embedding_model_name)
print("✅ 임베딩 모델 로드 완료")
except Exception as e:
print(f"❌ 임베딩 모델 로드 실패: {str(e)}")
print("🔄 다국어 모델로 대체 시도...")
self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
def create_embeddings(self, texts: List[str]) -> np.ndarray:
"""텍스트 목록에 대한 임베딩 생성"""
if self.model is None:
self.load_embedding_model()
print(f"🔄 {len(texts)}개 텍스트 임베딩 생성 중...")
embeddings = self.model.encode(
texts,
batch_size=32,
show_progress_bar=True,
convert_to_numpy=True,
normalize_embeddings=True
)
return embeddings
def build_vector_index(self, documents: List[Document]) -> bool:
"""문서 목록으로부터 벡터 인덱스 구축"""
if not documents:
print("⚠️ 처리할 문서가 없습니다.")
return False
print(f"🏗️ {len(documents)}개 문서로 벡터 인덱스 구축 시작...")
# 문서 저장
self.documents = documents
self.documents_hash = self._compute_documents_hash(documents)
# 텍스트 추출
texts = [doc.page_content for doc in documents]
# 임베딩 생성
embeddings = self.create_embeddings(texts)
# FAISS 인덱스 생성
dimension = embeddings.shape[1]
self.index = faiss.IndexFlatIP(dimension) # 내적 기반 유사도 검색
# 임베딩 추가
self.index.add(embeddings.astype('float32'))
# 문서 ID 생성
self.doc_ids = list(range(len(documents)))
print(f"✅ 벡터 인덱스 구축 완료 (차원: {dimension}, 문서: {len(documents)})")
# 인덱스 저장
self.save_index()
return True
def search_similar(self, query: str, k: int = 5) -> List[Tuple[Document, float]]:
"""유사 문서 검색"""
if self.index is None:
print("⚠️ 벡터 인덱스가 생성되지 않았습니다.")
return []
if self.model is None:
self.load_embedding_model()
# 쿼리 임베딩 생성
query_embedding = self.model.encode([query], normalize_embeddings=True)
query_embedding = query_embedding.astype('float32')
# 검색
k = min(k, len(self.documents))
similarities, indices = self.index.search(query_embedding, k)
# 결과 변환
results = []
for i in range(k):
idx = indices[0][i]
similarity = similarities[0][i]
if 0 <= idx < len(self.documents):
doc = self.documents[idx]
results.append((doc, float(similarity)))
return results
def save_index(self):
"""벡터 인덱스 및 문서 저장"""
if self.index is None:
return
try:
# FAISS 인덱스 저장
index_path = os.path.join(self.cache_dir, "faiss_index.bin")
faiss.write_index(self.index, index_path)
# 문서 및 메타데이터 저장
metadata_path = os.path.join(self.cache_dir, "metadata.pkl")
metadata = {
'documents': self.documents,
'doc_ids': self.doc_ids,
'embedding_model': self.embedding_model_name,
'total_documents': len(self.documents),
'documents_hash': self.documents_hash
}
with open(metadata_path, 'wb') as f:
pickle.dump(metadata, f)
print(f"💾 벡터 인덱스 저장 완료: {self.cache_dir}")
except Exception as e:
print(f"❌ 인덱스 저장 실패: {str(e)}")
def load_index(self) -> bool:
"""저장된 벡터 인덱스 로드"""
try:
index_path = os.path.join(self.cache_dir, "faiss_index.bin")
metadata_path = os.path.join(self.cache_dir, "metadata.pkl")
if not os.path.exists(index_path) or not os.path.exists(metadata_path):
return False
# FAISS 인덱스 로드
self.index = faiss.read_index(index_path)
# 메타데이터 로드
with open(metadata_path, 'rb') as f:
metadata = pickle.load(f)
self.documents = metadata['documents']
self.doc_ids = metadata['doc_ids']
self.embedding_model_name = metadata.get('embedding_model', Config.EMBEDDING_MODEL)
self.documents_hash = metadata.get('documents_hash')
# 임베딩 모델 로드
self.load_embedding_model()
print(f"📖 벡터 인덱스 로드 완료 (문서: {len(self.documents)}개)")
return True
except Exception as e:
print(f"❌ 인덱스 로드 실패: {str(e)}")
return False
def get_stats(self) -> Dict:
"""벡터 데이터베이스 통계 정보"""
if self.index is None:
return {"status": "no_index"}
return {
"total_documents": len(self.documents),
"embedding_model": self.embedding_model_name,
"index_dimension": self.index.d,
"cache_directory": self.cache_dir,
"is_trained": self.index.is_trained if hasattr(self.index, 'is_trained') else True
}
def rebuild_if_needed(self, documents: List[Document], force_rebuild: bool = False) -> bool:
"""필요시 인덱스 재구축"""
new_hash = self._compute_documents_hash(documents)
# 기존 인덱스가 있고 강제 재구축이 없는 경우
if not force_rebuild and self.load_index():
if self.documents_hash and self.documents_hash == new_hash:
print("📦 기존 인덱스 재사용 (문서 해시 일치)")
return True
else:
print("🔄 문서 변경을 감지하여 인덱스를 재구축합니다.")
print("🔄 벡터 인덱스 재구축")
return self.build_vector_index(documents)
def add_documents(self, new_documents: List[Document]) -> bool:
"""새 문서 추가 (동적 업데이트)"""
if not new_documents:
return False
# 임베딩 생성
new_texts = [doc.page_content for doc in new_documents]
new_embeddings = self.create_embeddings(new_texts)
if self.index is None:
# 인덱스가 없으면 새로 생성
return self.build_vector_index(new_documents)
# 기존 인덱스에 추가
self.index.add(new_embeddings.astype('float32'))
# 문서 목록 업데이트
start_id = len(self.documents)
self.documents.extend(new_documents)
self.doc_ids.extend(range(start_id, start_id + len(new_documents)))
self.documents_hash = self._compute_documents_hash(self.documents)
print(f"➕ {len(new_documents)}개 문서 추가 완료")
# 저장
self.save_index()
return True
def delete_document(self, doc_id: int) -> bool:
"""문서 삭제 (실제로는 인덱스 재구축 필요)"""
if doc_id < 0 or doc_id >= len(self.documents):
return False
# 해당 문서 제외하고 재구축
remaining_docs = [doc for i, doc in enumerate(self.documents) if i != doc_id]
return self.build_vector_index(remaining_docs)
def _compute_documents_hash(self, documents: List[Document]) -> str:
"""문서 내용과 메타데이터 기반 해시 생성 (내용 변경 감지)"""
hasher = hashlib.md5()
for doc in documents:
hasher.update(doc.page_content.encode("utf-8", errors="ignore"))
hasher.update(str(doc.metadata).encode("utf-8", errors="ignore"))
return hasher.hexdigest()
# 테스트용 함수
def test_vector_store():
"""벡터 데이터베이스 테스트"""
from document_processor import DocumentProcessor
# 문서 처리
processor = DocumentProcessor()
documents = processor.load_documents_from_folder("documents")
if not documents:
print("⚠️ 테스트할 문서가 없습니다.")
return
# 벡터 데이터베이스 생성
vector_store = VectorStore()
vector_store.build_vector_index(documents)
# 검색 테스트
test_queries = [
"연차휴가 사용 방법",
"근무시간은 어떻게 되나요?",
"당직근무 절차"
]
for query in test_queries:
print(f"\n🔍 검색: {query}")
results = vector_store.search_similar(query, k=3)
for i, (doc, similarity) in enumerate(results):
print(f" {i+1}. 유사도: {similarity:.4f}")
print(f" 내용: {doc.page_content[:100]}...")
if __name__ == "__main__":
test_vector_store()