from typing import Any, Dict, List, Optional from postgrest.types import CountMethod from supabase.client import create_client, Client, ClientOptions from postgrest.exceptions import APIError from loguru import logger import re import httpx from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) from .utils import timing_decorator_sync from .constants import ( VEHICLE_KEYWORD_TO_COLUMN, VIETNAMESE_STOP_WORDS, VIETNAMESE_STOP_PHRASES, ) from .config import get_settings # --- Cơ chế retry mạnh mẽ và có thể tái sử dụng --- retry_on_supabase_error = retry( stop=stop_after_attempt(4), # 1 lần gọi gốc + 3 lần thử lại wait=wait_exponential(multiplier=5, min=10, max=60), # Chờ 10s, 20s, 40s retry=retry_if_exception_type((httpx.HTTPError, APIError)), before_sleep=lambda retry_state: logger.warning( f"Supabase call failed, retrying... Attempt: {retry_state.attempt_number}, Error: {retry_state.outcome.exception()}" ), ) def remove_stop_phrases(text, stop_phrases): for phrase in stop_phrases: # Sửa: Không escape dấu cách trong phrase, chỉ escape các ký tự đặc biệt khác # Loại bỏ cụm từ, chỉ xóa khi là từ nguyên vẹn pattern = rf"\b{phrase}\b" text = re.sub(pattern, " ", text) return text class SupabaseClient: def __init__(self, url: str, key: str): """ Khởi tạo SupabaseClient với url và key. Input: url (str), key (str) Output: SupabaseClient instance. """ # Tăng thời gian timeout mặc định của client để xử lý các truy vấn nặng opts = ClientOptions(postgrest_client_timeout=60.0) self.client: Client = create_client(url, key, options=opts) settings = get_settings() self.default_match_count = settings.match_count @timing_decorator_sync @retry_on_supabase_error def get_page_token(self, page_id: str): """ Lấy access token của Facebook page từ Supabase. Input: page_id (str) Output: access_token (str) hoặc None nếu không có. """ try: response = ( self.client.table("PageToken") .select("token") .eq("id", page_id) .execute() ) if response.data and len(response.data) > 0: return response.data[0]["token"] return None except (httpx.HTTPError, APIError) as e: logger.error(f"Error getting page token after retries: {e}") raise # Ném lại lỗi để tenacity có thể bắt và retry except Exception as e: # Bắt các lỗi không mong muốn khác logger.exception( f"An unexpected error occurred while getting page token: {e}" ) return None # Không retry với lỗi không mong muốn @timing_decorator_sync @retry_on_supabase_error def match_documents( self, embedding: List[float], match_count: Optional[int] = None, vehicle_keywords: Optional[List[str]] = None, user_question: str = "", keyword_threshold: float = 0.01, vector_threshold: float = 0.3, rrf_k: int = 60, ): """ Truy vấn vector similarity search qua RPC match_documents. Input: embedding (list[float]), match_count (int), vehicle_keywords (list[str] hoặc None) Output: list[dict] kết quả truy vấn. """ # Sử dụng match_count từ config nếu không được truyền vào if match_count is None: match_count = self.default_match_count # Chuẩn bị chuỗi truy vấn trong Python # Tách từ và nối bằng '|' """ Xử lý câu hỏi thô: tách từ, loại bỏ stop words, và trả về chuỗi text sạch để truyền vào RPC. """ # Lọc bỏ các từ có trong danh sách stop words và nối thành chuỗi với dấu cách # 1. Loại bỏ stop phrase (từ ghép) cleaned_text = remove_stop_phrases( user_question.lower(), VIETNAMESE_STOP_PHRASES ) # 2. Loại bỏ ký tự đặc biệt (chỉ giữ lại chữ cái, số, dấu cách) cleaned_text = re.sub(r"[^\w\s]", " ", cleaned_text) # 3. Tách từ và loại bỏ stop word đơn lẻ words = cleaned_text.split() or_query_tsquery = " ".join( [word for word in words if word not in VIETNAMESE_STOP_WORDS] ) logger.debug(f"[DEBUG][RPC]: or_query_tsquery: {or_query_tsquery}") logger.debug(f"[DEBUG][RPC]: embedding: {embedding}") payload = { "query_text": or_query_tsquery, "query_embedding": embedding, "vehicle_filters": None, } if vehicle_keywords: vehicle_columns = [ VEHICLE_KEYWORD_TO_COLUMN[k] for k in vehicle_keywords if k in VEHICLE_KEYWORD_TO_COLUMN ] if vehicle_columns: payload["vehicle_filters"] = vehicle_columns try: response = self.client.rpc("match_documents", payload).execute() return response.data or [] except (httpx.HTTPError, APIError) as e: logger.error(f"Error matching documents after retries: {e}") raise except Exception as e: logger.exception(f"An unexpected error occurred in match_documents: {e}") return [] @timing_decorator_sync @retry_on_supabase_error def store_embedding( self, text: str, embedding: List[float], metadata: Dict[str, Any] ): """ Lưu embedding vào Supabase. Input: text (str), embedding (list[float]), metadata (dict) Output: bool (True nếu thành công, False nếu lỗi) """ try: response = ( self.client.table("embeddings") .insert({"content": text, "embedding": embedding, "metadata": metadata}) .execute() ) return bool(response.data) except (httpx.HTTPError, APIError) as e: logger.error(f"Error storing embedding after retries: {e}") raise except Exception as e: logger.exception( f"An unexpected error occurred while storing embedding: {e}" ) return False @timing_decorator_sync @retry_on_supabase_error def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool: """ Lưu document chunk vào Supabase. Input: chunk_data (dict) - chứa tất cả thông tin chunk Output: bool (True nếu thành công, False nếu lỗi) """ try: # Xử lý các giá trị null/empty cho integer fields processed_data = chunk_data.copy() # Giữ lại embedding để lưu vào database if "embedding" in processed_data: processed_data["embedding"] = processed_data["embedding"] # Xử lý article_number - chỉ gửi nếu có giá trị hợp lệ if "article_number" in processed_data: if ( processed_data["article_number"] is None or processed_data["article_number"] == "" ): processed_data["article_number"] = None elif isinstance(processed_data["article_number"], str): try: processed_data["article_number"] = int( processed_data["article_number"] ) except (ValueError, TypeError): processed_data["article_number"] = None # Xử lý vanbanid - đảm bảo là integer if "vanbanid" in processed_data: if isinstance(processed_data["vanbanid"], str): try: processed_data["vanbanid"] = int(processed_data["vanbanid"]) except (ValueError, TypeError): logger.error(f"Invalid vanbanid: {processed_data['vanbanid']}") return False # Xử lý các trường text - chuyển empty string thành None text_fields = [ "document_title", "article_title", "clause_number", "sub_clause_letter", "context_summary", ] for field in text_fields: if field in processed_data and processed_data[field] == "": processed_data[field] = None # Xử lý cha field - chuyển empty string thành None if "cha" in processed_data and processed_data["cha"] == "": processed_data["cha"] = None response = ( self.client.table("document_chunks").insert(processed_data).execute() ) if response.data: logger.debug( f"Successfully stored chunk {processed_data.get('id', 'unknown')}" ) return True else: logger.error( f"Failed to store chunk {processed_data.get('id', 'unknown')}" ) return False except (httpx.HTTPError, APIError) as e: logger.error(f"Error storing document chunk after retries: {e}") raise except Exception as e: logger.exception( f"An unexpected error occurred while storing document chunk: {e}" ) return False @timing_decorator_sync @retry_on_supabase_error def delete_all_document_chunks(self) -> bool: """ Xóa toàn bộ bảng document_chunks. Output: bool (True nếu thành công, False nếu lỗi) """ try: # Xóa tất cả records trong bảng response = self.client.table("document_chunks").delete().execute() logger.info(f"Successfully deleted all document chunks") return True except (httpx.HTTPError, APIError) as e: logger.error(f"Error deleting all document chunks after retries: {e}") raise except Exception as e: logger.exception( f"An unexpected error occurred while deleting all document chunks: {e}" ) return False @timing_decorator_sync @retry_on_supabase_error def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]: """ Lấy tất cả chunks của một văn bản theo vanbanid. Input: vanbanid (int) Output: List[Dict] - danh sách chunks """ try: response = ( self.client.table("document_chunks") .select("*") .eq("vanbanid", vanbanid) .execute() ) if response.data: logger.debug( f"Found {len(response.data)} chunks for vanbanid {vanbanid}" ) return response.data return [] except (httpx.HTTPError, APIError) as e: logger.error( f"Error getting document chunks for vanbanid {vanbanid} after retries: {e}" ) raise except Exception as e: logger.exception( f"An unexpected error occurred while getting document chunks for vanbanid {vanbanid}: {e}" ) return [] @timing_decorator_sync @retry_on_supabase_error def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool: """ Xóa tất cả chunks của một văn bản theo vanbanid. Input: vanbanid (int) Output: bool (True nếu thành công, False nếu lỗi) """ try: response = ( self.client.table("document_chunks") .delete() .eq("vanbanid", vanbanid) .execute() ) logger.debug(f"Successfully deleted all chunks for vanbanid {vanbanid}") return True except (httpx.HTTPError, APIError) as e: logger.error( f"Error deleting chunks for vanbanid {vanbanid} after retries: {e}" ) raise except Exception as e: logger.exception( f"An unexpected error occurred while deleting chunks for vanbanid {vanbanid}: {e}" ) return False @timing_decorator_sync @retry_on_supabase_error def get_all_document_chunks(self) -> List[Dict[str, Any]]: """ Lấy toàn bộ dữ liệu từ bảng document_chunks. Output: List[Dict] - danh sách tất cả chunks """ try: logger.info("[SUPABASE] Fetching all document chunks") # Đếm tổng số records trước count_response = ( self.client.table("document_chunks") .select("*", count=CountMethod.exact) .execute() ) total_count = ( count_response.count if hasattr(count_response, "count") else "unknown" ) logger.info(f"[SUPABASE] Total records in table: {total_count}") all_chunks = [] page_size = 1000 last_id = 0 page_count = 0 while True: page_count += 1 # Sử dụng cursor-based pagination với id if last_id == 0: # Lần đầu: lấy từ đầu response = ( self.client.table("document_chunks") .select("*") .order("id") .limit(page_size) .execute() ) else: # noqa # Các lần sau: lấy từ id > last_id response = ( self.client.table("document_chunks") .select("*") .order("id") .gt("id", last_id) .limit(page_size) .execute() ) actual_count = len(response.data) if response.data else 0 # noqa logger.debug( f"[SUPABASE] Page {page_count}: last_id={last_id}, requested={page_size}, actual={actual_count}" ) if not response.data: logger.debug(f"[SUPABASE] No more data after id {last_id}") break all_chunks.extend(response.data) # Cập nhật last_id cho page tiếp theo if response.data: last_id = max(chunk.get("id", 0) for chunk in response.data) if actual_count < page_size: # noqa logger.debug(f"[SUPABASE] Last page with {actual_count} records") break logger.info( f"[SUPABASE] Cursor-based pagination fetched {len(all_chunks)} document chunks (expected: {total_count})" ) logger.info( f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}" ) return all_chunks except (httpx.HTTPError, APIError) as e: logger.error( f"[SUPABASE] Error fetching document chunks after retries: {e}" ) raise except Exception as e: logger.exception( f"An unexpected error occurred while fetching document chunks: {e}" ) return []