Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import numpy as np | |
| from typing import List, Dict, Tuple, Optional | |
| from supabase import create_client, Client | |
| from langchain_core.documents import Document | |
| from sentence_transformers import SentenceTransformer | |
| import openai | |
| from config import Config | |
| class SupabaseVectorStore: | |
| """Supabase pgvector 기반 벡터 데이터베이스 클래스""" | |
| def __init__(self, embedding_model: str = None): | |
| self.embedding_model_name = embedding_model or Config.EMBEDDING_MODEL | |
| self.model = None | |
| self.supabase: Optional[Client] = None | |
| self.table_name = "documents" | |
| # Supabase 클라이언트 초기화 | |
| self._init_supabase() | |
| def _init_supabase(self): | |
| """Supabase 클라이언트 초기화""" | |
| try: | |
| if not Config.SUPABASE_URL or not Config.SUPABASE_KEY: | |
| raise ValueError("Supabase URL과 Key가 필요합니다.") | |
| self.supabase = create_client(Config.SUPABASE_URL, Config.SUPABASE_KEY) | |
| print("✅ Supabase 클라이언트 연결 성공") | |
| # 테이블이 없으면 생성 (필요시) | |
| self._create_table_if_not_exists() | |
| except Exception as e: | |
| print(f"❌ Supabase 연결 실패: {str(e)}") | |
| raise | |
| def _create_table_if_not_exists(self): | |
| """테이블 생성 (SQL 실행 필요시 관리자에서 직접 실행)""" | |
| # 아래 SQL은 Supabase SQL 에디터에서 직접 실행해야 함 | |
| create_table_sql = f""" | |
| -- Enable pgvector extension | |
| CREATE EXTENSION IF NOT EXISTS vector; | |
| -- Create documents table | |
| CREATE TABLE IF NOT EXISTS {self.table_name} ( | |
| id SERIAL PRIMARY KEY, | |
| content TEXT NOT NULL, | |
| metadata JSONB, | |
| embedding vector(1536), -- OpenAI embedding 차원 | |
| source_file VARCHAR(255), | |
| created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), | |
| updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() | |
| ); | |
| -- Create index for vector similarity search | |
| CREATE INDEX IF NOT EXISTS documents_embedding_idx ON {self.table_name} | |
| USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); | |
| -- Create full-text search index | |
| CREATE INDEX IF NOT EXISTS documents_content_idx ON {self.table_name} | |
| USING gin(to_tsvector('korean', content)); | |
| """ | |
| print(f"📝 아래 SQL을 Supabase SQL 에디터에서 실행해주세요:") | |
| print(create_table_sql) | |
| def load_embedding_model(self, use_openai: bool = True): | |
| """임베딩 모델 로드""" | |
| if self.model is not None: | |
| return | |
| if use_openai and Config.OPENAI_API_KEY: | |
| print("📥 OpenAI 임베딩 모델 사용") | |
| self.model = "openai" | |
| else: | |
| print(f"📥 임베딩 모델 로드: {self.embedding_model_name}") | |
| try: | |
| self.model = SentenceTransformer(self.embedding_model_name) | |
| print("✅ 임베딩 모델 로드 완료") | |
| except Exception as e: | |
| print(f"❌ 임베딩 모델 로드 실패: {str(e)}") | |
| print("🔄 다국어 모델로 대체 시도...") | |
| self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') | |
| def create_embeddings(self, texts: List[str], use_openai: bool = True) -> np.ndarray: | |
| """텍스트 목록에 대한 임베딩 생성""" | |
| if self.model is None: | |
| self.load_embedding_model(use_openai) | |
| print(f"🔄 {len(texts)}개 텍스트 임베딩 생성 중...") | |
| if self.model == "openai" and Config.OPENAI_API_KEY: | |
| # OpenAI API 사용 | |
| client = openai.OpenAI(api_key=Config.OPENAI_API_KEY) | |
| embeddings = [] | |
| for i in range(0, len(texts), 100): # 배치 사이즈 100 | |
| batch_texts = texts[i:i+100] | |
| response = client.embeddings.create( | |
| model=Config.OPENAI_EMBEDDING_MODEL, | |
| input=batch_texts | |
| ) | |
| batch_embeddings = [item.embedding for item in response.data] | |
| embeddings.extend(batch_embeddings) | |
| return np.array(embeddings) | |
| else: | |
| # 로컬 모델 사용 | |
| return self.model.encode( | |
| texts, | |
| batch_size=32, | |
| show_progress_bar=True, | |
| convert_to_numpy=True, | |
| normalize_embeddings=True | |
| ) | |
| def add_documents(self, documents: List[Document], use_openai: bool = True) -> bool: | |
| """문서 추가""" | |
| if not documents: | |
| print("⚠️ 추가할 문서가 없습니다.") | |
| return False | |
| print(f"📝 {len(documents)}개 문서를 Supabase에 추가 중...") | |
| try: | |
| # 임베딩 생성 | |
| texts = [doc.page_content for doc in documents] | |
| embeddings = self.create_embeddings(texts, use_openai) | |
| embedding_dim = len(embeddings[0]) if len(embeddings) else 0 | |
| # Supabase 테이블 스키마가 vector(1536)으로 고정되어 있어 차원 불일치 시 실패 방지 | |
| if embedding_dim and embedding_dim != 1536: | |
| raise ValueError( | |
| f"임베딩 차원({embedding_dim})이 Supabase 테이블 vector(1536)와 다릅니다. " | |
| "OpenAI 임베딩을 사용하거나 테이블 스키마/임베딩 모델을 맞춰주세요." | |
| ) | |
| # 문서 데이터 준비 | |
| documents_data = [] | |
| for i, doc in enumerate(documents): | |
| doc_data = { | |
| 'content': doc.page_content, | |
| 'metadata': doc.metadata, | |
| 'embedding': embeddings[i].tolist() | |
| } | |
| documents_data.append(doc_data) | |
| # 배치로 데이터 삽입 | |
| batch_size = 100 | |
| for i in range(0, len(documents_data), batch_size): | |
| batch = documents_data[i:i+batch_size] | |
| result = self.supabase.table(self.table_name).insert(batch).execute() | |
| if not result.data: | |
| print(f"❌ 배치 삽입 실패 (배치 {i//batch_size + 1})") | |
| return False | |
| print(f"✅ {len(documents)}개 문서 추가 완료") | |
| return True | |
| except Exception as e: | |
| print(f"❌ 문서 추가 실패: {str(e)}") | |
| return False | |
| def search_similar(self, query: str, k: int = 5, use_openai: bool = True) -> List[Tuple[Document, float]]: | |
| """유사 문서 검색""" | |
| if self.supabase is None: | |
| print("⚠️ Supabase 클라이언트가 초기화되지 않았습니다.") | |
| return [] | |
| if self.model is None: | |
| self.load_embedding_model(use_openai) | |
| try: | |
| # 쿼리 임베딩 생성 | |
| if self.model == "openai" and Config.OPENAI_API_KEY: | |
| client = openai.OpenAI(api_key=Config.OPENAI_API_KEY) | |
| response = client.embeddings.create( | |
| model=Config.OPENAI_EMBEDDING_MODEL, | |
| input=[query] | |
| ) | |
| query_embedding = response.data[0].embedding | |
| else: | |
| query_embedding = self.model.encode([query], normalize_embeddings=True)[0] | |
| query_embedding = query_embedding.tolist() | |
| # 유사도 검색 SQL | |
| match_threshold = 0.5 | |
| match_count = k | |
| search_sql = f""" | |
| SELECT content, metadata, source_file, 1 - (embedding <=> '[{','.join(map(str, query_embedding))}]') as similarity | |
| FROM {self.table_name} | |
| WHERE 1 - (embedding <=> '[{','.join(map(str, query_embedding))}]') > {match_threshold} | |
| ORDER BY similarity DESC | |
| LIMIT {match_count} | |
| """ | |
| # Supabase RPC 호출 | |
| result = self.supabase.rpc('search_similar_documents', { | |
| 'query_embedding': query_embedding, | |
| 'match_threshold': match_threshold, | |
| 'match_count': match_count | |
| }).execute() | |
| if not result.data: | |
| # RPC가 없으면 직접 SQL 실행 (권한 필요) | |
| result = self.supabase.table(self.table_name).select( | |
| "content, metadata, source_file" | |
| ).execute() | |
| # 클라이언트 측에서 유사도 계산 | |
| if result.data: | |
| similarities = [] | |
| for row in result.data: | |
| # 저장된 임베딩이 없으면 스킵 | |
| if not row.get('embedding'): | |
| continue | |
| similarity = self._cosine_similarity(query_embedding, row['embedding']) | |
| if similarity > match_threshold: | |
| similarities.append((row, similarity)) | |
| # 유사도로 정렬 | |
| similarities.sort(key=lambda x: x[1], reverse=True) | |
| result.data = [item[0] for item in similarities[:k]] | |
| # 결과 변환 | |
| results = [] | |
| for row in result.data[:k]: | |
| doc = Document( | |
| page_content=row['content'], | |
| metadata=row.get('metadata', {}), | |
| id=row.get('id') | |
| ) | |
| similarity = row.get('similarity', 1.0) # 기본값 1.0 | |
| results.append((doc, float(similarity))) | |
| return results | |
| except Exception as e: | |
| print(f"❌ 검색 실패: {str(e)}") | |
| return [] | |
| def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: | |
| """코사인 유사도 계산""" | |
| vec1 = np.array(vec1) | |
| vec2 = np.array(vec2) | |
| dot_product = np.dot(vec1, vec2) | |
| norm1 = np.linalg.norm(vec1) | |
| norm2 = np.linalg.norm(vec2) | |
| if norm1 == 0 or norm2 == 0: | |
| return 0.0 | |
| return dot_product / (norm1 * norm2) | |
| def delete_all_documents(self) -> bool: | |
| """모든 문서 삭제""" | |
| try: | |
| result = self.supabase.table(self.table_name).delete().execute() | |
| print("✅ 모든 문서 삭제 완료") | |
| return True | |
| except Exception as e: | |
| print(f"❌ 문서 삭제 실패: {str(e)}") | |
| return False | |
| def get_stats(self) -> Dict: | |
| """벡터 데이터베이스 통계 정보""" | |
| try: | |
| result = self.supabase.table(self.table_name).select("count", count="exact").execute() | |
| total_docs = result.count if hasattr(result, 'count') else 0 | |
| return { | |
| "total_documents": total_docs, | |
| "embedding_model": self.embedding_model_name, | |
| "database_type": "supabase", | |
| "table_name": self.table_name | |
| } | |
| except Exception as e: | |
| print(f"❌ 통계 정보 조회 실패: {str(e)}") | |
| return {"status": "error", "message": str(e)} | |
| def rebuild_index(self, documents: List[Document], force_rebuild: bool = False, use_openai: bool = True) -> bool: | |
| """인덱스 재구축""" | |
| if force_rebuild: | |
| print("🔄 기존 데이터 삭제 후 재구축...") | |
| self.delete_all_documents() | |
| return self.add_documents(documents, use_openai) | |
| # 테스트용 함수 | |
| def test_supabase_vector_store(): | |
| """Supabase 벡터 데이터베이스 테스트""" | |
| from document_processor import DocumentProcessor | |
| # 문서 처리 | |
| processor = DocumentProcessor() | |
| documents = processor.load_documents_from_folder("documents") | |
| if not documents: | |
| print("⚠️ 테스트할 문서가 없습니다.") | |
| return | |
| # 벡터 데이터베이스 생성 | |
| vector_store = SupabaseVectorStore() | |
| # 문서 추가 | |
| success = vector_store.add_documents(documents[:5]) # 테스트용으로 5개만 | |
| if not success: | |
| print("❌ 문서 추가 실패") | |
| return | |
| # 검색 테스트 | |
| test_queries = [ | |
| "연차휴가 사용 방법", | |
| "근무시간은 어떻게 되나요?", | |
| "당직근무 절차" | |
| ] | |
| for query in test_queries: | |
| print(f"\n🔍 검색: {query}") | |
| results = vector_store.search_similar(query, k=3) | |
| for i, (doc, similarity) in enumerate(results): | |
| print(f" {i+1}. 유사도: {similarity:.4f}") | |
| print(f" 내용: {doc.page_content[:100]}...") | |
| if __name__ == "__main__": | |
| test_supabase_vector_store() | |