import re
from typing import Any, Dict, List, Optional

import httpx
from loguru import logger
from postgrest.exceptions import APIError
from postgrest.types import CountMethod
from supabase.client import Client, ClientOptions, create_client
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from .config import get_settings
from .constants import (
    VEHICLE_KEYWORD_TO_COLUMN,
    VIETNAMESE_STOP_PHRASES,
    VIETNAMESE_STOP_WORDS,
)
from .utils import timing_decorator_sync
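

# Shared retry policy for Supabase calls: up to 4 attempts with exponential
# backoff capped between 10s and 60s, retrying only transient transport errors
# (httpx.HTTPError) and PostgREST errors (APIError); anything else propagates.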
retry_on_supabase_error = retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=5, min=10, max=60),
    retry=retry_if_exception_type((httpx.HTTPError, APIError)),
    before_sleep=lambda retry_state: logger.warning(
        f"Supabase call failed, retrying... Attempt: {retry_state.attempt_number}, Error: {retry_state.outcome.exception()}"
    ),
)


def remove_stop_phrases(text: str, stop_phrases: List[str]) -> str:
    """Replace each whole-phrase occurrence of a stop phrase with a space."""
    for phrase in stop_phrases:
        # Escape the phrase so any regex metacharacters in it match literally.
        pattern = rf"\b{re.escape(phrase)}\b"
        text = re.sub(pattern, " ", text)
    return text
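
# For example, remove_stop_phrases("cho tôi hỏi mức phạt", ["cho tôi hỏi"])
# returns "  mức phạt"; callers collapse the leftover whitespace with
# str.split() before building queries.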


class SupabaseClient:
    def __init__(self, url: str, key: str):
        """
        Initialize the SupabaseClient with a Supabase url and key.

        Input: url (str), key (str)
        Output: SupabaseClient instance.
        """
        # 60s PostgREST timeout: bulk inserts and large selects can be slow.
        opts = ClientOptions(postgrest_client_timeout=60.0)
        self.client: Client = create_client(url, key, options=opts)

        settings = get_settings()
        self.default_match_count = settings.match_count
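
    # Construction sketch (environment variable names are hypothetical):
    #     sb = SupabaseClient(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])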

    @timing_decorator_sync
    @retry_on_supabase_error
    def get_page_token(self, page_id: str):
        """
        Fetch a Facebook page's access token from Supabase.

        Input: page_id (str)
        Output: access_token (str), or None if absent.
        """
        try:
            response = (
                self.client.table("PageToken")
                .select("token")
                .eq("id", page_id)
                .execute()
            )
            if response.data:
                return response.data[0]["token"]
            return None
        except (httpx.HTTPError, APIError) as e:
            logger.error(f"Error getting page token after retries: {e}")
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while getting page token: {e}"
            )
            return None

    @timing_decorator_sync
    @retry_on_supabase_error
    def match_documents(
        self,
        embedding: List[float],
        match_count: Optional[int] = None,
        vehicle_keywords: Optional[List[str]] = None,
        user_question: str = "",
        keyword_threshold: float = 0.01,
        vector_threshold: float = 0.3,
        rrf_k: int = 60,
    ):
        """
        Run hybrid search via the match_documents RPC.

        Input: embedding (list[float]), match_count (int or None),
            vehicle_keywords (list[str] or None), user_question (str).
        Output: list[dict] of matched rows.

        Note: match_count, keyword_threshold, vector_threshold, and rrf_k are
        accepted for tuning but are not currently forwarded in the RPC payload.
        """
        if match_count is None:
            match_count = self.default_match_count

        # Clean the raw question: strip stop phrases and punctuation, then
        # drop stop words to build the keyword-query text for the RPC.
        cleaned_text = remove_stop_phrases(
            user_question.lower(), VIETNAMESE_STOP_PHRASES
        )
        cleaned_text = re.sub(r"[^\w\s]", " ", cleaned_text)
        words = cleaned_text.split()
        or_query_tsquery = " ".join(
            word for word in words if word not in VIETNAMESE_STOP_WORDS
        )
        logger.debug(f"[DEBUG][RPC]: or_query_tsquery: {or_query_tsquery}")
        logger.debug(f"[DEBUG][RPC]: embedding dimension: {len(embedding)}")

        payload = {
            "query_text": or_query_tsquery,
            "query_embedding": embedding,
            "vehicle_filters": None,
        }
        # Map recognized vehicle keywords to their filter columns.
        if vehicle_keywords:
            vehicle_columns = [
                VEHICLE_KEYWORD_TO_COLUMN[k]
                for k in vehicle_keywords
                if k in VEHICLE_KEYWORD_TO_COLUMN
            ]
            if vehicle_columns:
                payload["vehicle_filters"] = vehicle_columns

        try:
            response = self.client.rpc("match_documents", payload).execute()
            return response.data or []
        except (httpx.HTTPError, APIError) as e:
            logger.error(f"Error matching documents after retries: {e}")
            raise
        except Exception as e:
            logger.exception(f"An unexpected error occurred in match_documents: {e}")
            return []
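
    # Usage sketch: given a query embedding from the upstream embedding model,
    #     rows = sb.match_documents(embedding, user_question="mức phạt nồng độ cồn")
    # returns the RPC's ranked rows, or [] when nothing matches.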

    @timing_decorator_sync
    @retry_on_supabase_error
    def store_embedding(
        self, text: str, embedding: List[float], metadata: Dict[str, Any]
    ):
        """
        Store an embedding row in Supabase.

        Input: text (str), embedding (list[float]), metadata (dict)
        Output: bool (True on success, False on error)
        """
        try:
            response = (
                self.client.table("embeddings")
                .insert({"content": text, "embedding": embedding, "metadata": metadata})
                .execute()
            )
            return bool(response.data)
        except (httpx.HTTPError, APIError) as e:
            logger.error(f"Error storing embedding after retries: {e}")
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while storing embedding: {e}"
            )
            return False
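
    # The embeddings.embedding column is assumed to be a pgvector column;
    # supabase-py sends the list of floats as-is and the cast happens server-side.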

    @timing_decorator_sync
    @retry_on_supabase_error
    def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool:
        """
        Store a document chunk in Supabase.

        Input: chunk_data (dict) - all fields of the chunk
        Output: bool (True on success, False on error)
        """
        try:
            processed_data = chunk_data.copy()

            # Normalize article_number: empty values become NULL, numeric
            # strings are cast to int, anything unparsable becomes NULL.
            if "article_number" in processed_data:
                value = processed_data["article_number"]
                if value is None or value == "":
                    processed_data["article_number"] = None
                elif isinstance(value, str):
                    try:
                        processed_data["article_number"] = int(value)
                    except (ValueError, TypeError):
                        processed_data["article_number"] = None

            # vanbanid must be an integer; reject the chunk otherwise.
            if "vanbanid" in processed_data and isinstance(
                processed_data["vanbanid"], str
            ):
                try:
                    processed_data["vanbanid"] = int(processed_data["vanbanid"])
                except (ValueError, TypeError):
                    logger.error(f"Invalid vanbanid: {processed_data['vanbanid']}")
                    return False

            # Empty strings in optional text columns are stored as NULL.
            text_fields = [
                "document_title",
                "article_title",
                "clause_number",
                "sub_clause_letter",
                "context_summary",
            ]
            for field in text_fields:
                if field in processed_data and processed_data[field] == "":
                    processed_data[field] = None

            if "cha" in processed_data and processed_data["cha"] == "":
                processed_data["cha"] = None

            response = (
                self.client.table("document_chunks").insert(processed_data).execute()
            )

            if response.data:
                logger.debug(
                    f"Successfully stored chunk {processed_data.get('id', 'unknown')}"
                )
                return True
            logger.error(
                f"Failed to store chunk {processed_data.get('id', 'unknown')}"
            )
            return False

        except (httpx.HTTPError, APIError) as e:
            logger.error(f"Error storing document chunk after retries: {e}")
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while storing document chunk: {e}"
            )
            return False
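
    # Expected chunk_data shape (field names taken from the normalization
    # above; values are illustrative):
    #     {"id": "...", "vanbanid": 168, "article_number": "5",
    #      "article_title": "...", "clause_number": "1", "sub_clause_letter": "a",
    #      "document_title": "...", "context_summary": "...", "cha": "...",
    #      "embedding": [...]}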

    @timing_decorator_sync
    @retry_on_supabase_error
    def delete_all_document_chunks(self) -> bool:
        """
        Delete every row in the document_chunks table.

        Output: bool (True on success, False on error)
        """
        try:
            # Note: some PostgREST setups reject an unfiltered delete; if this
            # call fails, a catch-all filter (e.g. .neq("id", 0)) may be needed.
            self.client.table("document_chunks").delete().execute()
            logger.info("Successfully deleted all document chunks")
            return True
        except (httpx.HTTPError, APIError) as e:
            logger.error(f"Error deleting all document chunks after retries: {e}")
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while deleting all document chunks: {e}"
            )
            return False

    @timing_decorator_sync
    @retry_on_supabase_error
    def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]:
        """
        Fetch all chunks of one document by its vanbanid.

        Input: vanbanid (int)
        Output: List[Dict] - the document's chunks
        """
        try:
            response = (
                self.client.table("document_chunks")
                .select("*")
                .eq("vanbanid", vanbanid)
                .execute()
            )
            if response.data:
                logger.debug(
                    f"Found {len(response.data)} chunks for vanbanid {vanbanid}"
                )
                return response.data
            return []
        except (httpx.HTTPError, APIError) as e:
            logger.error(
                f"Error getting document chunks for vanbanid {vanbanid} after retries: {e}"
            )
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while getting document chunks for vanbanid {vanbanid}: {e}"
            )
            return []

    @timing_decorator_sync
    @retry_on_supabase_error
    def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool:
        """
        Delete all chunks of one document by its vanbanid.

        Input: vanbanid (int)
        Output: bool (True on success, False on error)
        """
        try:
            self.client.table("document_chunks").delete().eq(
                "vanbanid", vanbanid
            ).execute()
            logger.debug(f"Successfully deleted all chunks for vanbanid {vanbanid}")
            return True
        except (httpx.HTTPError, APIError) as e:
            logger.error(
                f"Error deleting chunks for vanbanid {vanbanid} after retries: {e}"
            )
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while deleting chunks for vanbanid {vanbanid}: {e}"
            )
            return False

    @timing_decorator_sync
    @retry_on_supabase_error
    def get_all_document_chunks(self) -> List[Dict[str, Any]]:
        """
        Fetch every row from the document_chunks table.

        Output: List[Dict] - all chunks
        """
        try:
            logger.info("[SUPABASE] Fetching all document chunks")

            # Ask for the exact row count up front; limit(1) keeps this count
            # query from pulling the whole table.
            count_response = (
                self.client.table("document_chunks")
                .select("id", count=CountMethod.exact)
                .limit(1)
                .execute()
            )
            total_count = (
                count_response.count if hasattr(count_response, "count") else "unknown"
            )
            logger.info(f"[SUPABASE] Total records in table: {total_count}")

            # Cursor-based pagination on the primary key: each page selects
            # rows with id greater than the last id seen. This assumes
            # positive integer ids, as the max() below already does.
            all_chunks = []
            page_size = 1000
            last_id = 0
            page_count = 0

            while True:
                page_count += 1
                response = (
                    self.client.table("document_chunks")
                    .select("*")
                    .order("id")
                    .gt("id", last_id)
                    .limit(page_size)
                    .execute()
                )

                actual_count = len(response.data) if response.data else 0
                logger.debug(
                    f"[SUPABASE] Page {page_count}: last_id={last_id}, "
                    f"requested={page_size}, actual={actual_count}"
                )

                if not response.data:
                    logger.debug(f"[SUPABASE] No more data after id {last_id}")
                    break

                all_chunks.extend(response.data)
                last_id = max(chunk.get("id", 0) for chunk in response.data)

                if actual_count < page_size:
                    logger.debug(f"[SUPABASE] Last page with {actual_count} records")
                    break

            logger.info(
                f"[SUPABASE] Cursor-based pagination fetched {len(all_chunks)} "
                f"document chunks (expected: {total_count})"
            )
            logger.info(
                f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}"
            )
            return all_chunks

        except (httpx.HTTPError, APIError) as e:
            logger.error(
                f"[SUPABASE] Error fetching document chunks after retries: {e}"
            )
            raise
        except Exception as e:
            logger.exception(
                f"An unexpected error occurred while fetching document chunks: {e}"
            )
            return []