Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| import hashlib | |
| import numpy as np | |
| from typing import List, Dict, Tuple | |
| from pathlib import Path | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| from langchain_core.documents import Document | |
| from config import Config | |
| class VectorStore: | |
| """FAISS 기반 벡터 데이터베이스 클래스""" | |
| def __init__(self, embedding_model: str = None, cache_dir: str = None): | |
| self.embedding_model_name = embedding_model or Config.EMBEDDING_MODEL | |
| self.cache_dir = cache_dir or Config.VECTOR_DB_PATH | |
| self.model = None | |
| self.index = None | |
| self.documents = [] | |
| self.doc_ids = [] | |
| self.documents_hash = None | |
| # 캐시 디렉토리 생성 | |
| Path(self.cache_dir).mkdir(parents=True, exist_ok=True) | |
| def load_embedding_model(self): | |
| """임베딩 모델 로드""" | |
| if self.model is None: | |
| print(f"📥 임베딩 모델 로드: {self.embedding_model_name}") | |
| try: | |
| self.model = SentenceTransformer(self.embedding_model_name) | |
| print("✅ 임베딩 모델 로드 완료") | |
| except Exception as e: | |
| print(f"❌ 임베딩 모델 로드 실패: {str(e)}") | |
| print("🔄 다국어 모델로 대체 시도...") | |
| self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') | |
| def create_embeddings(self, texts: List[str]) -> np.ndarray: | |
| """텍스트 목록에 대한 임베딩 생성""" | |
| if self.model is None: | |
| self.load_embedding_model() | |
| print(f"🔄 {len(texts)}개 텍스트 임베딩 생성 중...") | |
| embeddings = self.model.encode( | |
| texts, | |
| batch_size=32, | |
| show_progress_bar=True, | |
| convert_to_numpy=True, | |
| normalize_embeddings=True | |
| ) | |
| return embeddings | |
| def build_vector_index(self, documents: List[Document]) -> bool: | |
| """문서 목록으로부터 벡터 인덱스 구축""" | |
| if not documents: | |
| print("⚠️ 처리할 문서가 없습니다.") | |
| return False | |
| print(f"🏗️ {len(documents)}개 문서로 벡터 인덱스 구축 시작...") | |
| # 문서 저장 | |
| self.documents = documents | |
| self.documents_hash = self._compute_documents_hash(documents) | |
| # 텍스트 추출 | |
| texts = [doc.page_content for doc in documents] | |
| # 임베딩 생성 | |
| embeddings = self.create_embeddings(texts) | |
| # FAISS 인덱스 생성 | |
| dimension = embeddings.shape[1] | |
| self.index = faiss.IndexFlatIP(dimension) # 내적 기반 유사도 검색 | |
| # 임베딩 추가 | |
| self.index.add(embeddings.astype('float32')) | |
| # 문서 ID 생성 | |
| self.doc_ids = list(range(len(documents))) | |
| print(f"✅ 벡터 인덱스 구축 완료 (차원: {dimension}, 문서: {len(documents)})") | |
| # 인덱스 저장 | |
| self.save_index() | |
| return True | |
| def search_similar(self, query: str, k: int = 5) -> List[Tuple[Document, float]]: | |
| """유사 문서 검색""" | |
| if self.index is None: | |
| print("⚠️ 벡터 인덱스가 생성되지 않았습니다.") | |
| return [] | |
| if self.model is None: | |
| self.load_embedding_model() | |
| # 쿼리 임베딩 생성 | |
| query_embedding = self.model.encode([query], normalize_embeddings=True) | |
| query_embedding = query_embedding.astype('float32') | |
| # 검색 | |
| k = min(k, len(self.documents)) | |
| similarities, indices = self.index.search(query_embedding, k) | |
| # 결과 변환 | |
| results = [] | |
| for i in range(k): | |
| idx = indices[0][i] | |
| similarity = similarities[0][i] | |
| if 0 <= idx < len(self.documents): | |
| doc = self.documents[idx] | |
| results.append((doc, float(similarity))) | |
| return results | |
| def save_index(self): | |
| """벡터 인덱스 및 문서 저장""" | |
| if self.index is None: | |
| return | |
| try: | |
| # FAISS 인덱스 저장 | |
| index_path = os.path.join(self.cache_dir, "faiss_index.bin") | |
| faiss.write_index(self.index, index_path) | |
| # 문서 및 메타데이터 저장 | |
| metadata_path = os.path.join(self.cache_dir, "metadata.pkl") | |
| metadata = { | |
| 'documents': self.documents, | |
| 'doc_ids': self.doc_ids, | |
| 'embedding_model': self.embedding_model_name, | |
| 'total_documents': len(self.documents), | |
| 'documents_hash': self.documents_hash | |
| } | |
| with open(metadata_path, 'wb') as f: | |
| pickle.dump(metadata, f) | |
| print(f"💾 벡터 인덱스 저장 완료: {self.cache_dir}") | |
| except Exception as e: | |
| print(f"❌ 인덱스 저장 실패: {str(e)}") | |
| def load_index(self) -> bool: | |
| """저장된 벡터 인덱스 로드""" | |
| try: | |
| index_path = os.path.join(self.cache_dir, "faiss_index.bin") | |
| metadata_path = os.path.join(self.cache_dir, "metadata.pkl") | |
| if not os.path.exists(index_path) or not os.path.exists(metadata_path): | |
| return False | |
| # FAISS 인덱스 로드 | |
| self.index = faiss.read_index(index_path) | |
| # 메타데이터 로드 | |
| with open(metadata_path, 'rb') as f: | |
| metadata = pickle.load(f) | |
| self.documents = metadata['documents'] | |
| self.doc_ids = metadata['doc_ids'] | |
| self.embedding_model_name = metadata.get('embedding_model', Config.EMBEDDING_MODEL) | |
| self.documents_hash = metadata.get('documents_hash') | |
| # 임베딩 모델 로드 | |
| self.load_embedding_model() | |
| print(f"📖 벡터 인덱스 로드 완료 (문서: {len(self.documents)}개)") | |
| return True | |
| except Exception as e: | |
| print(f"❌ 인덱스 로드 실패: {str(e)}") | |
| return False | |
| def get_stats(self) -> Dict: | |
| """벡터 데이터베이스 통계 정보""" | |
| if self.index is None: | |
| return {"status": "no_index"} | |
| return { | |
| "total_documents": len(self.documents), | |
| "embedding_model": self.embedding_model_name, | |
| "index_dimension": self.index.d, | |
| "cache_directory": self.cache_dir, | |
| "is_trained": self.index.is_trained if hasattr(self.index, 'is_trained') else True | |
| } | |
| def rebuild_if_needed(self, documents: List[Document], force_rebuild: bool = False) -> bool: | |
| """필요시 인덱스 재구축""" | |
| new_hash = self._compute_documents_hash(documents) | |
| # 기존 인덱스가 있고 강제 재구축이 없는 경우 | |
| if not force_rebuild and self.load_index(): | |
| if self.documents_hash and self.documents_hash == new_hash: | |
| print("📦 기존 인덱스 재사용 (문서 해시 일치)") | |
| return True | |
| else: | |
| print("🔄 문서 변경을 감지하여 인덱스를 재구축합니다.") | |
| print("🔄 벡터 인덱스 재구축") | |
| return self.build_vector_index(documents) | |
| def add_documents(self, new_documents: List[Document]) -> bool: | |
| """새 문서 추가 (동적 업데이트)""" | |
| if not new_documents: | |
| return False | |
| # 임베딩 생성 | |
| new_texts = [doc.page_content for doc in new_documents] | |
| new_embeddings = self.create_embeddings(new_texts) | |
| if self.index is None: | |
| # 인덱스가 없으면 새로 생성 | |
| return self.build_vector_index(new_documents) | |
| # 기존 인덱스에 추가 | |
| self.index.add(new_embeddings.astype('float32')) | |
| # 문서 목록 업데이트 | |
| start_id = len(self.documents) | |
| self.documents.extend(new_documents) | |
| self.doc_ids.extend(range(start_id, start_id + len(new_documents))) | |
| self.documents_hash = self._compute_documents_hash(self.documents) | |
| print(f"➕ {len(new_documents)}개 문서 추가 완료") | |
| # 저장 | |
| self.save_index() | |
| return True | |
| def delete_document(self, doc_id: int) -> bool: | |
| """문서 삭제 (실제로는 인덱스 재구축 필요)""" | |
| if doc_id < 0 or doc_id >= len(self.documents): | |
| return False | |
| # 해당 문서 제외하고 재구축 | |
| remaining_docs = [doc for i, doc in enumerate(self.documents) if i != doc_id] | |
| return self.build_vector_index(remaining_docs) | |
| def _compute_documents_hash(self, documents: List[Document]) -> str: | |
| """문서 내용과 메타데이터 기반 해시 생성 (내용 변경 감지)""" | |
| hasher = hashlib.md5() | |
| for doc in documents: | |
| hasher.update(doc.page_content.encode("utf-8", errors="ignore")) | |
| hasher.update(str(doc.metadata).encode("utf-8", errors="ignore")) | |
| return hasher.hexdigest() | |
| # 테스트용 함수 | |
| def test_vector_store(): | |
| """벡터 데이터베이스 테스트""" | |
| from document_processor import DocumentProcessor | |
| # 문서 처리 | |
| processor = DocumentProcessor() | |
| documents = processor.load_documents_from_folder("documents") | |
| if not documents: | |
| print("⚠️ 테스트할 문서가 없습니다.") | |
| return | |
| # 벡터 데이터베이스 생성 | |
| vector_store = VectorStore() | |
| vector_store.build_vector_index(documents) | |
| # 검색 테스트 | |
| test_queries = [ | |
| "연차휴가 사용 방법", | |
| "근무시간은 어떻게 되나요?", | |
| "당직근무 절차" | |
| ] | |
| for query in test_queries: | |
| print(f"\n🔍 검색: {query}") | |
| results = vector_store.search_similar(query, k=3) | |
| for i, (doc, similarity) in enumerate(results): | |
| print(f" {i+1}. 유사도: {similarity:.4f}") | |
| print(f" 내용: {doc.page_content[:100]}...") | |
| if __name__ == "__main__": | |
| test_vector_store() | |