| from typing import Any, Dict, List, Optional |
| from postgrest.types import CountMethod |
| from supabase.client import create_client, Client |
| from loguru import logger |
| import re |
|
|
| from .utils import timing_decorator_sync |
| from .constants import VEHICLE_KEYWORD_TO_COLUMN, VIETNAMESE_STOP_WORDS, VIETNAMESE_STOP_PHRASES |
| from .config import get_settings |
|
|
def remove_stop_phrases(text: str, stop_phrases) -> str:
    """
    Remove every stop phrase from *text*, replacing each occurrence with a
    single space.

    Args:
        text: Input text to clean.
        stop_phrases: Iterable of literal phrases to strip out.

    Returns:
        The text with all whole-word occurrences of each phrase replaced
        by a single space (surrounding characters are preserved).
    """
    for phrase in stop_phrases:
        # re.escape guards against phrases containing regex metacharacters
        # (the original interpolated the phrase raw into the pattern);
        # \b anchors avoid clipping substrings of longer words.
        pattern = rf"\b{re.escape(phrase)}\b"
        text = re.sub(pattern, " ", text)
    return text
|
|
class SupabaseClient:
    """Thin wrapper around the Supabase client: page tokens, embedding
    storage, document-chunk CRUD and the hybrid `match_documents` RPC."""

    def __init__(self, url: str, key: str):
        """
        Initialize the SupabaseClient.

        Args:
            url: Supabase project URL.
            key: Supabase API key.
        """
        self.client: Client = create_client(url, key)
        settings = get_settings()
        # Fallback result count used when callers omit match_count.
        self.default_match_count = settings.match_count

    @timing_decorator_sync
    def get_page_token(self, page_id: str) -> Optional[str]:
        """
        Fetch the access token of a Facebook page from Supabase.

        Args:
            page_id: Page id (matched against PageToken.id).

        Returns:
            The access token string, or None when not found or on error.
        """
        try:
            response = self.client.table('PageToken').select('token').eq('id', page_id).execute()
            if response.data and len(response.data) > 0:
                return response.data[0]['token']
            return None
        except Exception as e:
            logger.error(f"Error getting page token: {e}")
            return None

    @timing_decorator_sync
    def match_documents(self, embedding: List[float], match_count: Optional[int] = None, vehicle_keywords: Optional[List[str]] = None, user_question: str = '', min_rank_threshold: float = 0.001, rrf_k: int = 60) -> List[Dict[str, Any]]:
        """
        Hybrid search via the `match_documents` RPC (vector similarity plus
        full-text rank, fused server-side with reciprocal rank fusion).

        Args:
            embedding: Query embedding vector.
            match_count: Max results; defaults to settings.match_count.
            vehicle_keywords: Optional keywords mapped to vehicle filter
                columns via VEHICLE_KEYWORD_TO_COLUMN; unknown keys ignored.
            user_question: Raw user question; cleaned into the tsquery text.
            min_rank_threshold: Minimum full-text rank passed to the RPC.
            rrf_k: RRF constant passed to the RPC.

        Returns:
            Matching rows as a list of dicts, [] on error or no match.
        """
        if match_count is None:
            match_count = self.default_match_count

        # Normalize the raw question into tsquery text: lowercase, drop
        # multi-word stop phrases, strip punctuation, drop single stop words.
        cleaned_text = remove_stop_phrases(user_question.lower(), VIETNAMESE_STOP_PHRASES)
        cleaned_text = re.sub(r"[^\w\s]", " ", cleaned_text)
        words = cleaned_text.split()
        or_query_tsquery = " ".join(word for word in words if word not in VIETNAMESE_STOP_WORDS)
        logger.info(f"[DEBUG][RPC]: or_query_tsquery: {or_query_tsquery}")
        logger.info(f"[DEBUG][RPC]: embedding: {embedding[:5]}...{embedding[-5:]}")

        try:
            payload = {
                'query_text': or_query_tsquery,
                'query_embedding': embedding,
                'match_count': match_count,
                'min_rank_threshold': min_rank_threshold,
                'vehicle_filters': None,
                'rrf_k': rrf_k,
            }
            if vehicle_keywords:
                # Only keywords with a known column mapping become filters.
                vehicle_columns = [VEHICLE_KEYWORD_TO_COLUMN[k] for k in vehicle_keywords if k in VEHICLE_KEYWORD_TO_COLUMN]
                if vehicle_columns:
                    payload['vehicle_filters'] = vehicle_columns
            response = self.client.rpc(
                'match_documents',
                payload
            ).execute()

            return response.data if response.data else []
        except Exception as e:
            logger.error(f"Error matching documents: {e}")
            return []

    @timing_decorator_sync
    def store_embedding(self, text: str, embedding: List[float], metadata: Dict[str, Any]) -> bool:
        """
        Insert one row into the `embeddings` table.

        Args:
            text: Content the embedding was computed from.
            embedding: Embedding vector.
            metadata: Arbitrary metadata stored alongside the row.

        Returns:
            True if the insert returned data, False on error.
        """
        try:
            response = self.client.table('embeddings').insert({
                'content': text,
                'embedding': embedding,
                'metadata': metadata
            }).execute()

            return bool(response.data)
        except Exception as e:
            logger.error(f"Error storing embedding: {e}")
            return False

    @timing_decorator_sync
    def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool:
        """
        Insert one chunk into `document_chunks`, normalizing fields first:
        empty strings become None and numeric id fields are coerced to int.

        Args:
            chunk_data: All chunk fields to persist (not mutated).

        Returns:
            True on success, False on error.
        """
        try:
            # Shallow copy so the caller's dict is never mutated.
            processed_data = chunk_data.copy()

            # article_number: empty/None -> None; numeric string -> int;
            # unparsable string -> None (non-fatal).
            if 'article_number' in processed_data:
                if processed_data['article_number'] is None or processed_data['article_number'] == "":
                    processed_data['article_number'] = None
                elif isinstance(processed_data['article_number'], str):
                    try:
                        processed_data['article_number'] = int(processed_data['article_number'])
                    except (ValueError, TypeError):
                        processed_data['article_number'] = None

            # vanbanid (document id): must coerce to int; an unparsable
            # value is a hard error and aborts the insert.
            if 'vanbanid' in processed_data and isinstance(processed_data['vanbanid'], str):
                try:
                    processed_data['vanbanid'] = int(processed_data['vanbanid'])
                except (ValueError, TypeError):
                    logger.error(f"Invalid vanbanid: {processed_data['vanbanid']}")
                    return False

            # Normalize empty-string text columns to NULL.
            text_fields = ['document_title', 'article_title', 'clause_number', 'sub_clause_letter', 'context_summary']
            for field in text_fields:
                if field in processed_data and processed_data[field] == "":
                    processed_data[field] = None

            # 'cha' (parent reference): empty string means no parent.
            if 'cha' in processed_data and processed_data['cha'] == "":
                processed_data['cha'] = None

            response = self.client.table('document_chunks').insert(processed_data).execute()

            if response.data:
                logger.info(f"Successfully stored chunk {processed_data.get('id', 'unknown')}")
                return True
            logger.error(f"Failed to store chunk {processed_data.get('id', 'unknown')}")
            return False

        except Exception as e:
            logger.error(f"Error storing document chunk: {e}")
            return False

    @timing_decorator_sync
    def delete_all_document_chunks(self) -> bool:
        """
        Delete every row of the document_chunks table.

        Returns:
            True on success, False on error.
        """
        try:
            # NOTE(review): an unfiltered delete may be rejected by PostgREST
            # depending on server configuration — confirm against the target
            # instance (a broad filter such as .neq('id', 0) is the usual
            # workaround if it is).
            self.client.table('document_chunks').delete().execute()
            logger.info("Successfully deleted all document chunks")
            return True
        except Exception as e:
            logger.error(f"Error deleting all document chunks: {e}")
            return False

    @timing_decorator_sync
    def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]:
        """
        Fetch all chunks of one document by its vanbanid.

        Args:
            vanbanid: Document id.

        Returns:
            List of chunk dicts, [] when none found or on error.
        """
        try:
            response = self.client.table('document_chunks').select('*').eq('vanbanid', vanbanid).execute()
            if response.data:
                logger.info(f"Found {len(response.data)} chunks for vanbanid {vanbanid}")
                return response.data
            return []
        except Exception as e:
            logger.error(f"Error getting document chunks for vanbanid {vanbanid}: {e}")
            return []

    @timing_decorator_sync
    def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool:
        """
        Delete all chunks of one document by its vanbanid.

        Args:
            vanbanid: Document id.

        Returns:
            True on success, False on error.
        """
        try:
            self.client.table('document_chunks').delete().eq('vanbanid', vanbanid).execute()
            logger.info(f"Successfully deleted all chunks for vanbanid {vanbanid}")
            return True
        except Exception as e:
            logger.error(f"Error deleting chunks for vanbanid {vanbanid}: {e}")
            return False

    @timing_decorator_sync
    def get_all_document_chunks(self) -> List[Dict[str, Any]]:
        """
        Fetch every row of document_chunks with cursor-based pagination
        (strictly-increasing id cursor, page_size rows at a time).

        Returns:
            All chunks as a list of dicts, [] on error.
        """
        try:
            logger.info("[SUPABASE] Fetching all document chunks")

            # Exact total used only to sanity-check pagination. limit(1)
            # keeps the payload tiny — PostgREST still reports the
            # full-table count in the response (the original fetched the
            # whole table here just to count it).
            count_response = self.client.table('document_chunks').select('id', count=CountMethod.exact).limit(1).execute()
            total_count = count_response.count if hasattr(count_response, 'count') else 'unknown'
            logger.info(f"[SUPABASE] Total records in table: {total_count}")

            all_chunks: List[Dict[str, Any]] = []
            page_size = 1000
            last_id = 0
            page_count = 0

            while True:
                page_count += 1

                # Cursor pagination by id: a strictly-greater-than filter on
                # the last seen id avoids the drift offset pagination suffers
                # when rows change mid-scan. last_id == 0 means first page.
                query = self.client.table('document_chunks').select('*').order('id').limit(page_size)
                if last_id:
                    query = query.gt('id', last_id)
                response = query.execute()

                actual_count = len(response.data) if response.data else 0
                logger.info(f"[SUPABASE] Page {page_count}: last_id={last_id}, requested={page_size}, actual={actual_count}")

                if not response.data:
                    logger.info(f"[SUPABASE] No more data after id {last_id}")
                    break

                all_chunks.extend(response.data)
                # Advance the cursor past everything we just received.
                last_id = max(chunk.get('id', 0) for chunk in response.data)

                if actual_count < page_size:
                    logger.info(f"[SUPABASE] Last page with {actual_count} records")
                    break

            logger.info(f"[SUPABASE] Cursor-based pagination fetched {len(all_chunks)} document chunks (expected: {total_count})")
            logger.info(f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}")
            return all_chunks

        except Exception as e:
            logger.error(f"[SUPABASE] Error fetching document chunks: {e}")
            return []