import os import json import numpy as np from typing import List, Dict, Tuple, Optional from supabase import create_client, Client from langchain_core.documents import Document from sentence_transformers import SentenceTransformer import openai from config import Config class SupabaseVectorStore: """Supabase pgvector 기반 벡터 데이터베이스 클래스""" def __init__(self, embedding_model: str = None): self.embedding_model_name = embedding_model or Config.EMBEDDING_MODEL self.model = None self.supabase: Optional[Client] = None self.table_name = "documents" # Supabase 클라이언트 초기화 self._init_supabase() def _init_supabase(self): """Supabase 클라이언트 초기화""" try: if not Config.SUPABASE_URL or not Config.SUPABASE_KEY: raise ValueError("Supabase URL과 Key가 필요합니다.") self.supabase = create_client(Config.SUPABASE_URL, Config.SUPABASE_KEY) print("✅ Supabase 클라이언트 연결 성공") # 테이블이 없으면 생성 (필요시) self._create_table_if_not_exists() except Exception as e: print(f"❌ Supabase 연결 실패: {str(e)}") raise def _create_table_if_not_exists(self): """테이블 생성 (SQL 실행 필요시 관리자에서 직접 실행)""" # 아래 SQL은 Supabase SQL 에디터에서 직접 실행해야 함 create_table_sql = f""" -- Enable pgvector extension CREATE EXTENSION IF NOT EXISTS vector; -- Create documents table CREATE TABLE IF NOT EXISTS {self.table_name} ( id SERIAL PRIMARY KEY, content TEXT NOT NULL, metadata JSONB, embedding vector(1536), -- OpenAI embedding 차원 source_file VARCHAR(255), created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); -- Create index for vector similarity search CREATE INDEX IF NOT EXISTS documents_embedding_idx ON {self.table_name} USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); -- Create full-text search index CREATE INDEX IF NOT EXISTS documents_content_idx ON {self.table_name} USING gin(to_tsvector('korean', content)); """ print(f"📝 아래 SQL을 Supabase SQL 에디터에서 실행해주세요:") print(create_table_sql) def load_embedding_model(self, use_openai: bool = True): """임베딩 모델 로드""" if self.model is not None: return if use_openai and Config.OPENAI_API_KEY: print("📥 OpenAI 임베딩 모델 사용") self.model = "openai" else: print(f"📥 임베딩 모델 로드: {self.embedding_model_name}") try: self.model = SentenceTransformer(self.embedding_model_name) print("✅ 임베딩 모델 로드 완료") except Exception as e: print(f"❌ 임베딩 모델 로드 실패: {str(e)}") print("🔄 다국어 모델로 대체 시도...") self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') def create_embeddings(self, texts: List[str], use_openai: bool = True) -> np.ndarray: """텍스트 목록에 대한 임베딩 생성""" if self.model is None: self.load_embedding_model(use_openai) print(f"🔄 {len(texts)}개 텍스트 임베딩 생성 중...") if self.model == "openai" and Config.OPENAI_API_KEY: # OpenAI API 사용 client = openai.OpenAI(api_key=Config.OPENAI_API_KEY) embeddings = [] for i in range(0, len(texts), 100): # 배치 사이즈 100 batch_texts = texts[i:i+100] response = client.embeddings.create( model=Config.OPENAI_EMBEDDING_MODEL, input=batch_texts ) batch_embeddings = [item.embedding for item in response.data] embeddings.extend(batch_embeddings) return np.array(embeddings) else: # 로컬 모델 사용 return self.model.encode( texts, batch_size=32, show_progress_bar=True, convert_to_numpy=True, normalize_embeddings=True ) def add_documents(self, documents: List[Document], use_openai: bool = True) -> bool: """문서 추가""" if not documents: print("⚠️ 추가할 문서가 없습니다.") return False print(f"📝 {len(documents)}개 문서를 Supabase에 추가 중...") try: # 임베딩 생성 texts = [doc.page_content for doc in documents] embeddings = self.create_embeddings(texts, use_openai) embedding_dim = len(embeddings[0]) if len(embeddings) else 0 # Supabase 테이블 스키마가 vector(1536)으로 고정되어 있어 차원 불일치 시 실패 방지 if embedding_dim and embedding_dim != 1536: raise ValueError( f"임베딩 차원({embedding_dim})이 Supabase 테이블 vector(1536)와 다릅니다. " "OpenAI 임베딩을 사용하거나 테이블 스키마/임베딩 모델을 맞춰주세요." ) # 문서 데이터 준비 documents_data = [] for i, doc in enumerate(documents): doc_data = { 'content': doc.page_content, 'metadata': doc.metadata, 'embedding': embeddings[i].tolist() } documents_data.append(doc_data) # 배치로 데이터 삽입 batch_size = 100 for i in range(0, len(documents_data), batch_size): batch = documents_data[i:i+batch_size] result = self.supabase.table(self.table_name).insert(batch).execute() if not result.data: print(f"❌ 배치 삽입 실패 (배치 {i//batch_size + 1})") return False print(f"✅ {len(documents)}개 문서 추가 완료") return True except Exception as e: print(f"❌ 문서 추가 실패: {str(e)}") return False def search_similar(self, query: str, k: int = 5, use_openai: bool = True) -> List[Tuple[Document, float]]: """유사 문서 검색""" if self.supabase is None: print("⚠️ Supabase 클라이언트가 초기화되지 않았습니다.") return [] if self.model is None: self.load_embedding_model(use_openai) try: # 쿼리 임베딩 생성 if self.model == "openai" and Config.OPENAI_API_KEY: client = openai.OpenAI(api_key=Config.OPENAI_API_KEY) response = client.embeddings.create( model=Config.OPENAI_EMBEDDING_MODEL, input=[query] ) query_embedding = response.data[0].embedding else: query_embedding = self.model.encode([query], normalize_embeddings=True)[0] query_embedding = query_embedding.tolist() # 유사도 검색 SQL match_threshold = 0.5 match_count = k search_sql = f""" SELECT content, metadata, source_file, 1 - (embedding <=> '[{','.join(map(str, query_embedding))}]') as similarity FROM {self.table_name} WHERE 1 - (embedding <=> '[{','.join(map(str, query_embedding))}]') > {match_threshold} ORDER BY similarity DESC LIMIT {match_count} """ # Supabase RPC 호출 result = self.supabase.rpc('search_similar_documents', { 'query_embedding': query_embedding, 'match_threshold': match_threshold, 'match_count': match_count }).execute() if not result.data: # RPC가 없으면 직접 SQL 실행 (권한 필요) result = self.supabase.table(self.table_name).select( "content, metadata, source_file" ).execute() # 클라이언트 측에서 유사도 계산 if result.data: similarities = [] for row in result.data: # 저장된 임베딩이 없으면 스킵 if not row.get('embedding'): continue similarity = self._cosine_similarity(query_embedding, row['embedding']) if similarity > match_threshold: similarities.append((row, similarity)) # 유사도로 정렬 similarities.sort(key=lambda x: x[1], reverse=True) result.data = [item[0] for item in similarities[:k]] # 결과 변환 results = [] for row in result.data[:k]: doc = Document( page_content=row['content'], metadata=row.get('metadata', {}), id=row.get('id') ) similarity = row.get('similarity', 1.0) # 기본값 1.0 results.append((doc, float(similarity))) return results except Exception as e: print(f"❌ 검색 실패: {str(e)}") return [] def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: """코사인 유사도 계산""" vec1 = np.array(vec1) vec2 = np.array(vec2) dot_product = np.dot(vec1, vec2) norm1 = np.linalg.norm(vec1) norm2 = np.linalg.norm(vec2) if norm1 == 0 or norm2 == 0: return 0.0 return dot_product / (norm1 * norm2) def delete_all_documents(self) -> bool: """모든 문서 삭제""" try: result = self.supabase.table(self.table_name).delete().execute() print("✅ 모든 문서 삭제 완료") return True except Exception as e: print(f"❌ 문서 삭제 실패: {str(e)}") return False def get_stats(self) -> Dict: """벡터 데이터베이스 통계 정보""" try: result = self.supabase.table(self.table_name).select("count", count="exact").execute() total_docs = result.count if hasattr(result, 'count') else 0 return { "total_documents": total_docs, "embedding_model": self.embedding_model_name, "database_type": "supabase", "table_name": self.table_name } except Exception as e: print(f"❌ 통계 정보 조회 실패: {str(e)}") return {"status": "error", "message": str(e)} def rebuild_index(self, documents: List[Document], force_rebuild: bool = False, use_openai: bool = True) -> bool: """인덱스 재구축""" if force_rebuild: print("🔄 기존 데이터 삭제 후 재구축...") self.delete_all_documents() return self.add_documents(documents, use_openai) # 테스트용 함수 def test_supabase_vector_store(): """Supabase 벡터 데이터베이스 테스트""" from document_processor import DocumentProcessor # 문서 처리 processor = DocumentProcessor() documents = processor.load_documents_from_folder("documents") if not documents: print("⚠️ 테스트할 문서가 없습니다.") return # 벡터 데이터베이스 생성 vector_store = SupabaseVectorStore() # 문서 추가 success = vector_store.add_documents(documents[:5]) # 테스트용으로 5개만 if not success: print("❌ 문서 추가 실패") return # 검색 테스트 test_queries = [ "연차휴가 사용 방법", "근무시간은 어떻게 되나요?", "당직근무 절차" ] for query in test_queries: print(f"\n🔍 검색: {query}") results = vector_store.search_similar(query, k=3) for i, (doc, similarity) in enumerate(results): print(f" {i+1}. 유사도: {similarity:.4f}") print(f" 내용: {doc.page_content[:100]}...") if __name__ == "__main__": test_supabase_vector_store()