119_ChatBot / supabase_vector_store.py
김무영
Improve defaults, security, and index rebuild
cb2a4bd
import os
import json
import numpy as np
from typing import List, Dict, Tuple, Optional
from supabase import create_client, Client
from langchain_core.documents import Document
from sentence_transformers import SentenceTransformer
import openai
from config import Config
class SupabaseVectorStore:
"""Supabase pgvector 기반 벡터 데이터베이스 클래스"""
def __init__(self, embedding_model: str = None):
self.embedding_model_name = embedding_model or Config.EMBEDDING_MODEL
self.model = None
self.supabase: Optional[Client] = None
self.table_name = "documents"
# Supabase 클라이언트 초기화
self._init_supabase()
def _init_supabase(self):
"""Supabase 클라이언트 초기화"""
try:
if not Config.SUPABASE_URL or not Config.SUPABASE_KEY:
raise ValueError("Supabase URL과 Key가 필요합니다.")
self.supabase = create_client(Config.SUPABASE_URL, Config.SUPABASE_KEY)
print("✅ Supabase 클라이언트 연결 성공")
# 테이블이 없으면 생성 (필요시)
self._create_table_if_not_exists()
except Exception as e:
print(f"❌ Supabase 연결 실패: {str(e)}")
raise
def _create_table_if_not_exists(self):
"""테이블 생성 (SQL 실행 필요시 관리자에서 직접 실행)"""
# 아래 SQL은 Supabase SQL 에디터에서 직접 실행해야 함
create_table_sql = f"""
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create documents table
CREATE TABLE IF NOT EXISTS {self.table_name} (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
metadata JSONB,
embedding vector(1536), -- OpenAI embedding 차원
source_file VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create index for vector similarity search
CREATE INDEX IF NOT EXISTS documents_embedding_idx ON {self.table_name}
USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
-- Create full-text search index
CREATE INDEX IF NOT EXISTS documents_content_idx ON {self.table_name}
USING gin(to_tsvector('korean', content));
"""
print(f"📝 아래 SQL을 Supabase SQL 에디터에서 실행해주세요:")
print(create_table_sql)
def load_embedding_model(self, use_openai: bool = True):
"""임베딩 모델 로드"""
if self.model is not None:
return
if use_openai and Config.OPENAI_API_KEY:
print("📥 OpenAI 임베딩 모델 사용")
self.model = "openai"
else:
print(f"📥 임베딩 모델 로드: {self.embedding_model_name}")
try:
self.model = SentenceTransformer(self.embedding_model_name)
print("✅ 임베딩 모델 로드 완료")
except Exception as e:
print(f"❌ 임베딩 모델 로드 실패: {str(e)}")
print("🔄 다국어 모델로 대체 시도...")
self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
def create_embeddings(self, texts: List[str], use_openai: bool = True) -> np.ndarray:
"""텍스트 목록에 대한 임베딩 생성"""
if self.model is None:
self.load_embedding_model(use_openai)
print(f"🔄 {len(texts)}개 텍스트 임베딩 생성 중...")
if self.model == "openai" and Config.OPENAI_API_KEY:
# OpenAI API 사용
client = openai.OpenAI(api_key=Config.OPENAI_API_KEY)
embeddings = []
for i in range(0, len(texts), 100): # 배치 사이즈 100
batch_texts = texts[i:i+100]
response = client.embeddings.create(
model=Config.OPENAI_EMBEDDING_MODEL,
input=batch_texts
)
batch_embeddings = [item.embedding for item in response.data]
embeddings.extend(batch_embeddings)
return np.array(embeddings)
else:
# 로컬 모델 사용
return self.model.encode(
texts,
batch_size=32,
show_progress_bar=True,
convert_to_numpy=True,
normalize_embeddings=True
)
def add_documents(self, documents: List[Document], use_openai: bool = True) -> bool:
"""문서 추가"""
if not documents:
print("⚠️ 추가할 문서가 없습니다.")
return False
print(f"📝 {len(documents)}개 문서를 Supabase에 추가 중...")
try:
# 임베딩 생성
texts = [doc.page_content for doc in documents]
embeddings = self.create_embeddings(texts, use_openai)
embedding_dim = len(embeddings[0]) if len(embeddings) else 0
# Supabase 테이블 스키마가 vector(1536)으로 고정되어 있어 차원 불일치 시 실패 방지
if embedding_dim and embedding_dim != 1536:
raise ValueError(
f"임베딩 차원({embedding_dim})이 Supabase 테이블 vector(1536)와 다릅니다. "
"OpenAI 임베딩을 사용하거나 테이블 스키마/임베딩 모델을 맞춰주세요."
)
# 문서 데이터 준비
documents_data = []
for i, doc in enumerate(documents):
doc_data = {
'content': doc.page_content,
'metadata': doc.metadata,
'embedding': embeddings[i].tolist()
}
documents_data.append(doc_data)
# 배치로 데이터 삽입
batch_size = 100
for i in range(0, len(documents_data), batch_size):
batch = documents_data[i:i+batch_size]
result = self.supabase.table(self.table_name).insert(batch).execute()
if not result.data:
print(f"❌ 배치 삽입 실패 (배치 {i//batch_size + 1})")
return False
print(f"✅ {len(documents)}개 문서 추가 완료")
return True
except Exception as e:
print(f"❌ 문서 추가 실패: {str(e)}")
return False
def search_similar(self, query: str, k: int = 5, use_openai: bool = True) -> List[Tuple[Document, float]]:
"""유사 문서 검색"""
if self.supabase is None:
print("⚠️ Supabase 클라이언트가 초기화되지 않았습니다.")
return []
if self.model is None:
self.load_embedding_model(use_openai)
try:
# 쿼리 임베딩 생성
if self.model == "openai" and Config.OPENAI_API_KEY:
client = openai.OpenAI(api_key=Config.OPENAI_API_KEY)
response = client.embeddings.create(
model=Config.OPENAI_EMBEDDING_MODEL,
input=[query]
)
query_embedding = response.data[0].embedding
else:
query_embedding = self.model.encode([query], normalize_embeddings=True)[0]
query_embedding = query_embedding.tolist()
# 유사도 검색 SQL
match_threshold = 0.5
match_count = k
search_sql = f"""
SELECT content, metadata, source_file, 1 - (embedding <=> '[{','.join(map(str, query_embedding))}]') as similarity
FROM {self.table_name}
WHERE 1 - (embedding <=> '[{','.join(map(str, query_embedding))}]') > {match_threshold}
ORDER BY similarity DESC
LIMIT {match_count}
"""
# Supabase RPC 호출
result = self.supabase.rpc('search_similar_documents', {
'query_embedding': query_embedding,
'match_threshold': match_threshold,
'match_count': match_count
}).execute()
if not result.data:
# RPC가 없으면 직접 SQL 실행 (권한 필요)
result = self.supabase.table(self.table_name).select(
"content, metadata, source_file"
).execute()
# 클라이언트 측에서 유사도 계산
if result.data:
similarities = []
for row in result.data:
# 저장된 임베딩이 없으면 스킵
if not row.get('embedding'):
continue
similarity = self._cosine_similarity(query_embedding, row['embedding'])
if similarity > match_threshold:
similarities.append((row, similarity))
# 유사도로 정렬
similarities.sort(key=lambda x: x[1], reverse=True)
result.data = [item[0] for item in similarities[:k]]
# 결과 변환
results = []
for row in result.data[:k]:
doc = Document(
page_content=row['content'],
metadata=row.get('metadata', {}),
id=row.get('id')
)
similarity = row.get('similarity', 1.0) # 기본값 1.0
results.append((doc, float(similarity)))
return results
except Exception as e:
print(f"❌ 검색 실패: {str(e)}")
return []
def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""코사인 유사도 계산"""
vec1 = np.array(vec1)
vec2 = np.array(vec2)
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
def delete_all_documents(self) -> bool:
"""모든 문서 삭제"""
try:
result = self.supabase.table(self.table_name).delete().execute()
print("✅ 모든 문서 삭제 완료")
return True
except Exception as e:
print(f"❌ 문서 삭제 실패: {str(e)}")
return False
def get_stats(self) -> Dict:
"""벡터 데이터베이스 통계 정보"""
try:
result = self.supabase.table(self.table_name).select("count", count="exact").execute()
total_docs = result.count if hasattr(result, 'count') else 0
return {
"total_documents": total_docs,
"embedding_model": self.embedding_model_name,
"database_type": "supabase",
"table_name": self.table_name
}
except Exception as e:
print(f"❌ 통계 정보 조회 실패: {str(e)}")
return {"status": "error", "message": str(e)}
def rebuild_index(self, documents: List[Document], force_rebuild: bool = False, use_openai: bool = True) -> bool:
"""인덱스 재구축"""
if force_rebuild:
print("🔄 기존 데이터 삭제 후 재구축...")
self.delete_all_documents()
return self.add_documents(documents, use_openai)
# 테스트용 함수
def test_supabase_vector_store():
"""Supabase 벡터 데이터베이스 테스트"""
from document_processor import DocumentProcessor
# 문서 처리
processor = DocumentProcessor()
documents = processor.load_documents_from_folder("documents")
if not documents:
print("⚠️ 테스트할 문서가 없습니다.")
return
# 벡터 데이터베이스 생성
vector_store = SupabaseVectorStore()
# 문서 추가
success = vector_store.add_documents(documents[:5]) # 테스트용으로 5개만
if not success:
print("❌ 문서 추가 실패")
return
# 검색 테스트
test_queries = [
"연차휴가 사용 방법",
"근무시간은 어떻게 되나요?",
"당직근무 절차"
]
for query in test_queries:
print(f"\n🔍 검색: {query}")
results = vector_store.search_similar(query, k=3)
for i, (doc, similarity) in enumerate(results):
print(f" {i+1}. 유사도: {similarity:.4f}")
print(f" 내용: {doc.page_content[:100]}...")
if __name__ == "__main__":
test_supabase_vector_store()