Chatopus / app /supabase_db.py
VietCat's picture
update log to debug level
96d59ac
from typing import Any, Dict, List, Optional
from postgrest.types import CountMethod
from supabase.client import create_client, Client, ClientOptions
from postgrest.exceptions import APIError
from loguru import logger
import re
import httpx
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from .utils import timing_decorator_sync
from .constants import (
VEHICLE_KEYWORD_TO_COLUMN,
VIETNAMESE_STOP_WORDS,
VIETNAMESE_STOP_PHRASES,
)
from .config import get_settings
# --- Cơ chế retry mạnh mẽ và có thể tái sử dụng ---
retry_on_supabase_error = retry(
stop=stop_after_attempt(4), # 1 lần gọi gốc + 3 lần thử lại
wait=wait_exponential(multiplier=5, min=10, max=60), # Chờ 10s, 20s, 40s
retry=retry_if_exception_type((httpx.HTTPError, APIError)),
before_sleep=lambda retry_state: logger.warning(
f"Supabase call failed, retrying... Attempt: {retry_state.attempt_number}, Error: {retry_state.outcome.exception()}"
),
)
def remove_stop_phrases(text, stop_phrases):
for phrase in stop_phrases:
# Sửa: Không escape dấu cách trong phrase, chỉ escape các ký tự đặc biệt khác
# Loại bỏ cụm từ, chỉ xóa khi là từ nguyên vẹn
pattern = rf"\b{phrase}\b"
text = re.sub(pattern, " ", text)
return text
class SupabaseClient:
def __init__(self, url: str, key: str):
"""
Khởi tạo SupabaseClient với url và key.
Input: url (str), key (str)
Output: SupabaseClient instance.
"""
# Tăng thời gian timeout mặc định của client để xử lý các truy vấn nặng
opts = ClientOptions(postgrest_client_timeout=60.0)
self.client: Client = create_client(url, key, options=opts)
settings = get_settings()
self.default_match_count = settings.match_count
@timing_decorator_sync
@retry_on_supabase_error
def get_page_token(self, page_id: str):
"""
Lấy access token của Facebook page từ Supabase.
Input: page_id (str)
Output: access_token (str) hoặc None nếu không có.
"""
try:
response = (
self.client.table("PageToken")
.select("token")
.eq("id", page_id)
.execute()
)
if response.data and len(response.data) > 0:
return response.data[0]["token"]
return None
except (httpx.HTTPError, APIError) as e:
logger.error(f"Error getting page token after retries: {e}")
raise # Ném lại lỗi để tenacity có thể bắt và retry
except Exception as e: # Bắt các lỗi không mong muốn khác
logger.exception(
f"An unexpected error occurred while getting page token: {e}"
)
return None # Không retry với lỗi không mong muốn
@timing_decorator_sync
@retry_on_supabase_error
def match_documents(
self,
embedding: List[float],
match_count: Optional[int] = None,
vehicle_keywords: Optional[List[str]] = None,
user_question: str = "",
keyword_threshold: float = 0.01,
vector_threshold: float = 0.3,
rrf_k: int = 60,
):
"""
Truy vấn vector similarity search qua RPC match_documents.
Input: embedding (list[float]), match_count (int), vehicle_keywords (list[str] hoặc None)
Output: list[dict] kết quả truy vấn.
"""
# Sử dụng match_count từ config nếu không được truyền vào
if match_count is None:
match_count = self.default_match_count
# Chuẩn bị chuỗi truy vấn trong Python
# Tách từ và nối bằng '|'
"""
Xử lý câu hỏi thô: tách từ, loại bỏ stop words,
và trả về chuỗi text sạch để truyền vào RPC.
"""
# Lọc bỏ các từ có trong danh sách stop words và nối thành chuỗi với dấu cách
# 1. Loại bỏ stop phrase (từ ghép)
cleaned_text = remove_stop_phrases(
user_question.lower(), VIETNAMESE_STOP_PHRASES
)
# 2. Loại bỏ ký tự đặc biệt (chỉ giữ lại chữ cái, số, dấu cách)
cleaned_text = re.sub(r"[^\w\s]", " ", cleaned_text)
# 3. Tách từ và loại bỏ stop word đơn lẻ
words = cleaned_text.split()
or_query_tsquery = " ".join(
[word for word in words if word not in VIETNAMESE_STOP_WORDS]
)
logger.debug(f"[DEBUG][RPC]: or_query_tsquery: {or_query_tsquery}")
logger.debug(f"[DEBUG][RPC]: embedding: {embedding}")
payload = {
"query_text": or_query_tsquery,
"query_embedding": embedding,
"vehicle_filters": None,
}
if vehicle_keywords:
vehicle_columns = [
VEHICLE_KEYWORD_TO_COLUMN[k]
for k in vehicle_keywords
if k in VEHICLE_KEYWORD_TO_COLUMN
]
if vehicle_columns:
payload["vehicle_filters"] = vehicle_columns
try:
response = self.client.rpc("match_documents", payload).execute()
return response.data or []
except (httpx.HTTPError, APIError) as e:
logger.error(f"Error matching documents after retries: {e}")
raise
except Exception as e:
logger.exception(f"An unexpected error occurred in match_documents: {e}")
return []
@timing_decorator_sync
@retry_on_supabase_error
def store_embedding(
self, text: str, embedding: List[float], metadata: Dict[str, Any]
):
"""
Lưu embedding vào Supabase.
Input: text (str), embedding (list[float]), metadata (dict)
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
response = (
self.client.table("embeddings")
.insert({"content": text, "embedding": embedding, "metadata": metadata})
.execute()
)
return bool(response.data)
except (httpx.HTTPError, APIError) as e:
logger.error(f"Error storing embedding after retries: {e}")
raise
except Exception as e:
logger.exception(
f"An unexpected error occurred while storing embedding: {e}"
)
return False
@timing_decorator_sync
@retry_on_supabase_error
def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool:
"""
Lưu document chunk vào Supabase.
Input: chunk_data (dict) - chứa tất cả thông tin chunk
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
# Xử lý các giá trị null/empty cho integer fields
processed_data = chunk_data.copy()
# Giữ lại embedding để lưu vào database
if "embedding" in processed_data:
processed_data["embedding"] = processed_data["embedding"]
# Xử lý article_number - chỉ gửi nếu có giá trị hợp lệ
if "article_number" in processed_data:
if (
processed_data["article_number"] is None
or processed_data["article_number"] == ""
):
processed_data["article_number"] = None
elif isinstance(processed_data["article_number"], str):
try:
processed_data["article_number"] = int(
processed_data["article_number"]
)
except (ValueError, TypeError):
processed_data["article_number"] = None
# Xử lý vanbanid - đảm bảo là integer
if "vanbanid" in processed_data:
if isinstance(processed_data["vanbanid"], str):
try:
processed_data["vanbanid"] = int(processed_data["vanbanid"])
except (ValueError, TypeError):
logger.error(f"Invalid vanbanid: {processed_data['vanbanid']}")
return False
# Xử lý các trường text - chuyển empty string thành None
text_fields = [
"document_title",
"article_title",
"clause_number",
"sub_clause_letter",
"context_summary",
]
for field in text_fields:
if field in processed_data and processed_data[field] == "":
processed_data[field] = None
# Xử lý cha field - chuyển empty string thành None
if "cha" in processed_data and processed_data["cha"] == "":
processed_data["cha"] = None
response = (
self.client.table("document_chunks").insert(processed_data).execute()
)
if response.data:
logger.debug(
f"Successfully stored chunk {processed_data.get('id', 'unknown')}"
)
return True
else:
logger.error(
f"Failed to store chunk {processed_data.get('id', 'unknown')}"
)
return False
except (httpx.HTTPError, APIError) as e:
logger.error(f"Error storing document chunk after retries: {e}")
raise
except Exception as e:
logger.exception(
f"An unexpected error occurred while storing document chunk: {e}"
)
return False
@timing_decorator_sync
@retry_on_supabase_error
def delete_all_document_chunks(self) -> bool:
"""
Xóa toàn bộ bảng document_chunks.
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
# Xóa tất cả records trong bảng
response = self.client.table("document_chunks").delete().execute()
logger.info(f"Successfully deleted all document chunks")
return True
except (httpx.HTTPError, APIError) as e:
logger.error(f"Error deleting all document chunks after retries: {e}")
raise
except Exception as e:
logger.exception(
f"An unexpected error occurred while deleting all document chunks: {e}"
)
return False
@timing_decorator_sync
@retry_on_supabase_error
def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]:
"""
Lấy tất cả chunks của một văn bản theo vanbanid.
Input: vanbanid (int)
Output: List[Dict] - danh sách chunks
"""
try:
response = (
self.client.table("document_chunks")
.select("*")
.eq("vanbanid", vanbanid)
.execute()
)
if response.data:
logger.debug(
f"Found {len(response.data)} chunks for vanbanid {vanbanid}"
)
return response.data
return []
except (httpx.HTTPError, APIError) as e:
logger.error(
f"Error getting document chunks for vanbanid {vanbanid} after retries: {e}"
)
raise
except Exception as e:
logger.exception(
f"An unexpected error occurred while getting document chunks for vanbanid {vanbanid}: {e}"
)
return []
@timing_decorator_sync
@retry_on_supabase_error
def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool:
"""
Xóa tất cả chunks của một văn bản theo vanbanid.
Input: vanbanid (int)
Output: bool (True nếu thành công, False nếu lỗi)
"""
try:
response = (
self.client.table("document_chunks")
.delete()
.eq("vanbanid", vanbanid)
.execute()
)
logger.debug(f"Successfully deleted all chunks for vanbanid {vanbanid}")
return True
except (httpx.HTTPError, APIError) as e:
logger.error(
f"Error deleting chunks for vanbanid {vanbanid} after retries: {e}"
)
raise
except Exception as e:
logger.exception(
f"An unexpected error occurred while deleting chunks for vanbanid {vanbanid}: {e}"
)
return False
@timing_decorator_sync
@retry_on_supabase_error
def get_all_document_chunks(self) -> List[Dict[str, Any]]:
"""
Lấy toàn bộ dữ liệu từ bảng document_chunks.
Output: List[Dict] - danh sách tất cả chunks
"""
try:
logger.info("[SUPABASE] Fetching all document chunks")
# Đếm tổng số records trước
count_response = (
self.client.table("document_chunks")
.select("*", count=CountMethod.exact)
.execute()
)
total_count = (
count_response.count if hasattr(count_response, "count") else "unknown"
)
logger.info(f"[SUPABASE] Total records in table: {total_count}")
all_chunks = []
page_size = 1000
last_id = 0
page_count = 0
while True:
page_count += 1
# Sử dụng cursor-based pagination với id
if last_id == 0:
# Lần đầu: lấy từ đầu
response = (
self.client.table("document_chunks")
.select("*")
.order("id")
.limit(page_size)
.execute()
)
else: # noqa
# Các lần sau: lấy từ id > last_id
response = (
self.client.table("document_chunks")
.select("*")
.order("id")
.gt("id", last_id)
.limit(page_size)
.execute()
)
actual_count = len(response.data) if response.data else 0 # noqa
logger.debug(
f"[SUPABASE] Page {page_count}: last_id={last_id}, requested={page_size}, actual={actual_count}"
)
if not response.data:
logger.debug(f"[SUPABASE] No more data after id {last_id}")
break
all_chunks.extend(response.data)
# Cập nhật last_id cho page tiếp theo
if response.data:
last_id = max(chunk.get("id", 0) for chunk in response.data)
if actual_count < page_size: # noqa
logger.debug(f"[SUPABASE] Last page with {actual_count} records")
break
logger.info(
f"[SUPABASE] Cursor-based pagination fetched {len(all_chunks)} document chunks (expected: {total_count})"
)
logger.info(
f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}"
)
return all_chunks
except (httpx.HTTPError, APIError) as e:
logger.error(
f"[SUPABASE] Error fetching document chunks after retries: {e}"
)
raise
except Exception as e:
logger.exception(
f"An unexpected error occurred while fetching document chunks: {e}"
)
return []