| | import re |
| | import os |
| | import uuid |
| | from typing import List, Dict, Optional, Tuple, Any |
| | from dataclasses import dataclass |
| | from loguru import logger |
| | from .supabase_db import SupabaseClient |
| | from .embedding import EmbeddingClient |
| | from .config import get_settings |
| |
|
@dataclass
class ChunkMetadata:
    """Metadata describing a single chunk of a legal document.

    Only ``id``, ``content`` and ``vanbanid`` are required; every other
    field has an "empty" default (``None`` or ``""``).
    """

    # Unique identifier of the chunk.
    id: str
    # Text of the chunk.
    content: str
    # Identifier of the document the chunk belongs to.
    vanbanid: int
    # Identifier of the parent chunk; None for a root chunk.
    cha: Optional[str] = None
    # Title of the source document.
    document_title: str = ""
    # Article ("Điều") number, when known.
    article_number: Optional[int] = None
    # Heading line of the article.
    article_title: str = ""
    # Clause ("Khoản") number, e.g. "1" or "1.2".
    clause_number: str = ""
    # Point ("Điểm") label, e.g. "a" or "1.2.3".
    sub_clause_letter: str = ""
    # Human-readable summary of the chunk's position/context.
    context_summary: str = ""
| |
|
class LawDocumentChunker:
    """Chunk Vietnamese legal documents by structure and store them in Supabase.

    A document is scanned line by line. Lines that look like structural
    headings (Phần / Phụ lục / Chương / Mục / Điều / Khoản / Điểm) start a
    new chunk; other lines are appended to the current chunk. Chunks that
    grow beyond ``CHUNK_SIZE`` characters are split with an overlap of
    ``CHUNK_OVERLAP`` characters. Each chunk is then embedded and stored.
    """

    # Heading patterns. They are matched against a stripped line.
    # The trailing ``\b`` on PHAN/CHUONG prevents a single letter of an
    # ordinary word (matched by ``[IVXLCDM]+`` under IGNORECASE) from being
    # taken as a Roman numeral, e.g. "Phần còn lại ..." is not a heading.
    PHAN_REGEX = r"^(Phần|PHẦN|Phần thứ)\s+(\d+|[IVXLCDM]+|nhất|hai|ba|tư|năm|sáu|bảy|tám|chín|mười)\b\.?\s*(.*)"
    PHU_LUC_REGEX = r"^(Phụ lục|PHỤ LỤC)\s+(\d+|[A-Z]+)\.?\s*(.*)"
    CHUONG_REGEX = r"^(Chương|CHƯƠNG)\s+(\d+|[IVXLCDM]+)\b\.?\s*(.*)"
    MUC_REGEX = r"^(Mục|MỤC)\s+(\d+)\.?\s*(.*)"
    DIEU_REGEX = r"^Điều\s+(\d+)\.\s*(.*)"
    KHOAN_REGEX = r"^\s*(\d+(\.\d+)*)\.\s*(.*)"
    DIEM_REGEX_A = r"^\s*([a-zđ])\)\s*(.*)"
    DIEM_REGEX_NUM = r"^\s*(\d+\.\d+\.\d+)\.\s*(.*)"

    # Maximum chunk length and overlap between consecutive sub-chunks,
    # both measured in characters.
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 100

    # Length of the zero vector stored when no embedding is returned.
    EMBEDDING_DIM = 768

    # Smaller number = higher (outer) level in the document hierarchy.
    _LEVEL_PRIORITY = {
        "PHAN": 1,
        "PHU_LUC": 1,
        "CHUONG": 2,
        "MUC": 3,
        "DIEU": 4,
        "KHOAN": 5,
        "DIEM": 6,
        "CONTENT": 7,
    }

    # For these levels only the listed levels are accepted as parent.
    _VALID_PARENTS = {
        "MUC": ["CHUONG", "PHAN"],
        "DIEU": ["MUC", "CHUONG", "PHAN"],
        "CHUONG": ["PHAN"],
    }

    def __init__(self):
        """Create the Supabase and embedding clients from the settings."""
        settings = get_settings()
        self.supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key)
        self.embedding_client = EmbeddingClient()
        # Optional LLM client; when left as None no semantic summary is made.
        self.llm_client: Optional[Any] = None

        logger.info("[CHUNKER] Initialized LawDocumentChunker")

    def _create_data_directory(self):
        """Create the ``data`` directory if it does not exist; return its name."""
        data_dir = "data"
        if not os.path.exists(data_dir):
            # exist_ok guards against another process creating the directory
            # between the check above and this call.
            os.makedirs(data_dir, exist_ok=True)
            logger.info(f"[CHUNKER] Created directory: {data_dir}")
        return data_dir

    def _extract_document_title(self, file_path: str) -> str:
        """Derive a document title from the file name.

        The extension is dropped, underscores become spaces and the result
        is title-cased.
        """
        filename = os.path.basename(file_path)
        name_without_ext = os.path.splitext(filename)[0]
        title = name_without_ext.replace('_', ' ').title()
        logger.info(f"[CHUNKER] Extracted document title: {title}")
        return title

    def _read_document(self, file_path: str) -> str:
        """Read and return the UTF-8 text of ``file_path``.

        Any error is logged and re-raised.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            logger.info(f"[CHUNKER] Read document: {file_path}, length: {len(content)}")
            return content
        except Exception as e:
            logger.error(f"[CHUNKER] Error reading file {file_path}: {e}")
            raise

    def _detect_structure_level(self, line: str) -> Tuple[str, Optional[str], Optional[str]]:
        """Classify a line by structural level.

        Returns:
            A tuple ``(level, value, title)``. ``level`` is one of "PHAN",
            "PHU_LUC", "CHUONG", "MUC", "DIEU", "KHOAN", "DIEM" or
            "CONTENT". For headings, ``value`` is the number/label of the
            heading and ``title`` the text following it; for "CONTENT" both
            are None.
        """
        line = line.strip()

        # Levels whose pattern is (keyword, number, title) and which are
        # matched case-insensitively, in order of precedence.
        keyword_headings = (
            ("PHAN", self.PHAN_REGEX),
            ("PHU_LUC", self.PHU_LUC_REGEX),
            ("CHUONG", self.CHUONG_REGEX),
            ("MUC", self.MUC_REGEX),
        )

        try:
            for level, pattern in keyword_headings:
                match = re.match(pattern, line, re.IGNORECASE)
                if match:
                    # group(1) is the keyword itself; the value callers need
                    # is the number (group 2) and the title (group 3).
                    return level, match.group(2), match.group(3)

            match = re.match(self.DIEU_REGEX, line)
            if match:
                return "DIEU", match.group(1), match.group(2)

            match = re.match(self.KHOAN_REGEX, line)
            if match:
                clause_num = match.group(1)
                # "1." and "1.2." are clauses; three or more components
                # ("1.2.3.") are treated as points below.
                if len(clause_num.split('.')) < 3:
                    return "KHOAN", clause_num, match.group(3)

            match = re.match(self.DIEM_REGEX_A, line)
            if match:
                return "DIEM", match.group(1), match.group(2)

            match = re.match(self.DIEM_REGEX_NUM, line)
            if match:
                return "DIEM", match.group(1), match.group(2)

            return "CONTENT", None, None

        except Exception as e:
            # Best effort: a line that cannot be classified is plain content.
            logger.error(f"[CHUNKER] Error in _detect_structure_level for line '{line}': {e}")
            return "CONTENT", None, None

    def _build_structure_summary(self, article_number, clause_number, sub_clause_letter):
        """Return a citation such as "Điểm a Khoản 1 Điều 5".

        The most specific form whose parts are all present is used; an
        empty string is returned when there is no article number.
        """
        if not article_number:
            return ""
        if clause_number and sub_clause_letter:
            return f"Điểm {sub_clause_letter} Khoản {clause_number} Điều {article_number}"
        if clause_number:
            return f"Khoản {clause_number} Điều {article_number}"
        return f"Điều {article_number}"

    def _create_chunk_metadata(self, content: str, level: str, level_value: Optional[str],
                               parent_id: Optional[str], vanbanid: int,
                               document_title: str,
                               chunk_stack: List[Tuple[str, str, Optional[str], str]],
                               chunk_dict: dict) -> "ChunkMetadata":
        """Build the metadata record for one chunk.

        Args:
            content: Text of the chunk.
            level: Structural level of the chunk ("DIEU", "KHOAN", ...).
            level_value: Number/label of the heading that opened the chunk.
            parent_id: Id of the parent chunk, or None for a root chunk.
            vanbanid: Id of the document.
            document_title: Title of the document.
            chunk_stack: Chunks emitted so far (used here only for logging).
            chunk_dict: Mapping chunk id -> metadata, used to inherit
                article/clause/point information from the parent.

        Returns:
            The new metadata with a fresh UUID and a structural
            ``context_summary``.
        """
        chunk_id = str(uuid.uuid4())
        metadata = ChunkMetadata(
            id=chunk_id,
            content=content,
            vanbanid=vanbanid,
            cha=parent_id,
            document_title=document_title,
        )

        if level_value:
            if level == "DIEU":
                metadata.article_number = int(level_value) if level_value.isdigit() else None
                # The first line of an article chunk is its heading.
                metadata.article_title = content.split('\n')[0].strip() if content else ""
            elif level == "KHOAN":
                metadata.clause_number = level_value
            elif level == "DIEM":
                metadata.sub_clause_letter = level_value

        logger.debug(f"[CHUNKER] Creating chunk with level: {level}, parent_id: {parent_id}, stack_size: {len(chunk_stack)}")
        if chunk_dict is not None and parent_id:
            self._fill_metadata_from_parents(metadata, parent_id, chunk_dict)
        else:
            logger.debug(f"[CHUNKER] Skipping metadata fill - no parent_id or chunk_dict")

        metadata.context_summary = self._build_structure_summary(
            metadata.article_number, metadata.clause_number, metadata.sub_clause_letter
        )
        logger.debug(f"[CHUNKER] Final metadata for chunk {chunk_id[:8]}... - Level: {level}, Article: {metadata.article_number}, Clause: {metadata.clause_number}, Point: {metadata.sub_clause_letter}")
        return metadata

    def _fill_metadata_from_parents(self, metadata: "ChunkMetadata", parent_id: str,
                                    chunk_dict: Dict[str, "ChunkMetadata"]):
        """Copy missing article/clause/point fields from the parent chunk.

        Fields already set on ``metadata`` are kept. If the parent is a
        clause and the article number is still unknown, the grandparent is
        consulted for the article number and title. ``metadata`` is
        modified in place.
        """
        parent = chunk_dict.get(parent_id)
        if not parent:
            logger.warning(f"[CHUNKER] Parent chunk {parent_id} not found in chunk_dict")
            return

        if parent.article_number and not metadata.article_number:
            metadata.article_number = parent.article_number
        if parent.article_title and not metadata.article_title:
            metadata.article_title = parent.article_title
        if parent.clause_number and not metadata.clause_number:
            metadata.clause_number = parent.clause_number
        if parent.sub_clause_letter and not metadata.sub_clause_letter:
            metadata.sub_clause_letter = parent.sub_clause_letter

        if parent.clause_number and not metadata.article_number:
            grandparent = chunk_dict.get(parent.cha) if parent.cha else None
            if grandparent and grandparent.article_number:
                metadata.article_number = grandparent.article_number
            if grandparent and grandparent.article_title:
                metadata.article_title = grandparent.article_title

    def _split_into_chunks(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Split ``text`` into overlapping pieces of at most ``chunk_size`` chars.

        A piece that is not the last one is cut after the last '.' or
        newline inside its window, provided that break lies in the final
        30% of the window; otherwise it is cut at ``chunk_size``. The next
        piece starts ``overlap`` characters before the end of the previous
        one.

        Raises:
            ValueError: if ``chunk_size`` is not positive or ``overlap`` is
                negative.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if overlap < 0:
            raise ValueError("overlap must not be negative")

        pieces: List[str] = []
        text_len = len(text)
        start = 0

        while start < text_len:
            end = start + chunk_size
            piece = text[start:end]

            if end < text_len:
                # best_break is an index relative to the window, so it is
                # compared against a fraction of the window size.
                best_break = max(piece.rfind('.'), piece.rfind('\n'))
                if best_break > chunk_size * 0.7:
                    end = start + best_break + 1
                    piece = text[start:end]

            pieces.append(piece)

            # The window reached the end of the text: stepping back by
            # `overlap` would only produce a duplicate of the tail.
            if end >= text_len:
                break

            next_start = end - overlap
            # Guarantee forward progress even when overlap >= window length.
            if next_start <= start:
                next_start = end
            start = next_start

        return pieces

    def _process_document_recursive(self, content: str, vanbanid: int,
                                    document_title: str) -> List["ChunkMetadata"]:
        """Split a document into chunks following its heading hierarchy.

        Everything before the first heading becomes one "CONTENT" preamble
        chunk. Each heading line then flushes the chunk being built and
        starts a new one whose parent is looked up among the chunks already
        emitted. A chunk that exceeds ``CHUNK_SIZE`` characters is split
        into overlapping sub-chunks that share level, value and parent.

        Returns:
            The chunks in document order.
        """
        chunks: List["ChunkMetadata"] = []
        chunk_stack: List[Tuple[str, str, Optional[str], str]] = []
        chunk_dict: Dict[str, "ChunkMetadata"] = {}
        level_priority = self._LEVEL_PRIORITY

        def emit(text: str, level: str, level_value: Optional[str],
                 parent_id: Optional[str]) -> None:
            """Register ``text`` as a chunk unless it is blank."""
            stripped = text.strip()
            if not stripped:
                return
            metadata = self._create_chunk_metadata(
                stripped, level, level_value, parent_id,
                vanbanid, document_title, chunk_stack, chunk_dict,
            )
            chunks.append(metadata)
            chunk_stack.append((metadata.id, level, level_value, stripped))
            chunk_dict[metadata.id] = metadata

        buffer = ""
        current_level: Optional[str] = None
        current_level_value: Optional[str] = None
        current_parent: Optional[str] = None
        preamble_done = False

        for line in content.split('\n'):
            level, level_value, _ = self._detect_structure_level(line)
            is_heading = level != "CONTENT" and bool(level_value)

            if not preamble_done:
                if not is_heading:
                    # Still in the preamble: keep collecting.
                    buffer += line + "\n"
                    current_level = "CONTENT"
                    current_level_value = None
                    current_parent = None
                    continue

                # First heading: close the preamble and open the first
                # structural chunk.
                emit(buffer, "CONTENT", None, None)
                preamble_done = True
                buffer = line + "\n"
                current_level = level
                current_level_value = level_value
                current_parent = self._find_parent_for_level(chunk_stack, level, level_priority)
                continue

            if is_heading:
                # Any heading closes the chunk in progress. The parent is
                # looked up after the flush so that the chunk just closed
                # can itself be the parent.
                if current_level is not None:
                    emit(buffer, str(current_level), current_level_value, current_parent)
                current_parent = self._find_parent_for_level(chunk_stack, level, level_priority)
                buffer = line + "\n"
                current_level = level
                current_level_value = level_value
            else:
                buffer += line + "\n"

            # Enforce the size limit after every line, so that an over-long
            # heading line is split as well.
            if len(buffer) > self.CHUNK_SIZE and current_level is not None:
                for sub_chunk in self._split_into_chunks(buffer, self.CHUNK_SIZE, self.CHUNK_OVERLAP):
                    emit(sub_chunk, str(current_level), current_level_value, current_parent)
                buffer = ""

        if current_level is not None:
            emit(buffer, str(current_level), current_level_value, current_parent)

        root_count = sum(1 for chunk in chunks if chunk.cha is None)
        logger.info(f"[CHUNKER] Created {len(chunks)} chunks, {root_count} root chunks")
        for i, chunk in enumerate(chunks[:10]):
            logger.debug(f"[CHUNKER] Chunk {i+1}: {chunk.content[:100]}... -> Parent: {chunk.cha}")
        if len(chunks) > 10:
            logger.debug(f"[CHUNKER] ... and {len(chunks) - 10} more chunks")
        return chunks

    def _find_parent_for_level(self, chunk_stack: List[Tuple[str, str, Optional[str], str]],
                               current_level: str, level_priority: Dict[str, int]) -> Optional[str]:
        """Return the id of the nearest preceding chunk that can be the parent.

        The stack is scanned from the most recent entry backwards for a
        chunk of a higher level (lower priority number). For MUC, DIEU and
        CHUONG the candidate's level must also be one of the allowed parent
        levels; candidates of other levels are skipped.

        Returns:
            The parent chunk id, or None when there is no suitable chunk.
        """
        current_priority = level_priority.get(current_level, 999)
        allowed_levels = self._VALID_PARENTS.get(current_level)

        for chunk_id, level, _level_value, _content in reversed(chunk_stack):
            if level_priority.get(level, 999) >= current_priority:
                continue
            if allowed_levels is None or level in allowed_levels:
                return chunk_id
        return None

    async def _create_embeddings_for_chunks(self, chunks: List["ChunkMetadata"]) -> int:
        """Embed every chunk and store it in Supabase right away.

        A failure on one chunk is logged and does not stop the others. If
        the embedding client returns None, a zero vector of length
        ``EMBEDDING_DIM`` is stored instead.

        Returns:
            The number of chunks stored successfully.
        """
        logger.info(f"[CHUNKER] Creating embeddings and storing {len(chunks)} chunks")

        success_count = 0
        failed_count = 0

        logger.info("[CHUNKER] === DETAILED METADATA ANALYSIS ===")
        for i, chunk in enumerate(chunks[:20]):
            logger.info(f"[CHUNKER] Chunk {i+1}:")
            logger.info(f" - ID: {chunk.id[:8]}...")
            logger.info(f" - Content: {chunk.content[:100]}...")
            logger.info(f" - Parent: {chunk.cha}")
            logger.info(f" - Article: {chunk.article_number}")
            logger.info(f" - Article Title: {chunk.article_title}")
            logger.info(f" - Clause: {chunk.clause_number}")
            logger.info(f" - Point: {chunk.sub_clause_letter}")
            logger.info(f" - Document: {chunk.document_title}")
            logger.info(" ---")

        total = len(chunks)
        for i, chunk in enumerate(chunks, 1):
            try:
                embedding = await self.embedding_client.create_embedding(chunk.content, task_type="retrieval_document")
                semantic_summary = await self._create_semantic_summary_with_llm(chunk.content)

                if embedding is None:
                    # Keep the row storable, but make the fallback visible.
                    logger.warning(f"[CHUNKER] No embedding returned for chunk {chunk.id}, storing zero vector")
                    embedding = [0.0] * self.EMBEDDING_DIM

                record = {
                    'id': chunk.id,
                    'content': chunk.content,
                    'embedding': embedding,
                    'vanbanid': chunk.vanbanid,
                    'cha': chunk.cha,
                    'document_title': chunk.document_title,
                    'article_number': chunk.article_number,
                    'article_title': chunk.article_title,
                    'clause_number': chunk.clause_number,
                    'sub_clause_letter': chunk.sub_clause_letter,
                    'context_summary': f"Structure: {chunk.context_summary}|Semantic: {semantic_summary}",
                }

                if self.supabase_client.store_document_chunk(record):
                    success_count += 1
                    if i % 100 == 0:
                        logger.info(f"[CHUNKER] Stored chunk {i}/{total}: {chunk.id[:8]}...")
                else:
                    failed_count += 1
                    logger.error(f"[CHUNKER] Failed to store chunk {chunk.id}")

            except Exception as e:
                failed_count += 1
                logger.error(f"[CHUNKER] Error processing chunk {chunk.id}: {e}")
                continue

        logger.info(f"[CHUNKER] Successfully processed {success_count}/{total} chunks, {failed_count} failed")
        return success_count

    async def _store_chunks_to_supabase(self, chunk_data: List[Dict]) -> bool:
        """Deprecated no-op kept for compatibility; always returns True."""
        logger.warning("[CHUNKER] _store_chunks_to_supabase is deprecated, use _create_embeddings_for_chunks instead")
        return True

    async def process_law_document(self, file_path: str, document_id: int) -> bool:
        """Chunk, embed and store one legal document.

        Args:
            file_path: Path of the text file to process.
            document_id: Id stored with every chunk of the document.

        Returns:
            True when at least one chunk was stored; False when the file is
            missing, no chunk was produced, no chunk could be stored, or an
            error occurred (errors are logged, not raised).
        """
        try:
            logger.info(f"[CHUNKER] Starting processing for file: {file_path}, document_id: {document_id}")

            self._create_data_directory()

            if not os.path.exists(file_path):
                logger.error(f"[CHUNKER] File not found: {file_path}")
                return False

            content = self._read_document(file_path)
            document_title = self._extract_document_title(file_path)
            chunks = self._process_document_recursive(content, document_id, document_title)

            if not chunks:
                logger.warning(f"[CHUNKER] No chunks created for document {document_id}")
                return False

            success_count = await self._create_embeddings_for_chunks(chunks)

            if success_count == 0:
                logger.error(f"[CHUNKER] No embeddings created for document {document_id}")
                return False

            logger.info(f"[CHUNKER] Successfully processed document {document_id} with {success_count} chunks")
            return True

        except Exception as e:
            logger.error(f"[CHUNKER] Error processing document {document_id}: {e}")
            return False

    async def _create_semantic_summary_with_llm(self, chunk_content: str) -> str:
        """Ask the LLM client for a short summary of ``chunk_content``.

        Returns an empty string when no ``llm_client`` is set, when the
        client returns nothing, or when the call fails (the error is
        logged).
        """
        # getattr keeps this safe for instances created without __init__.
        llm_client = getattr(self, "llm_client", None)
        if llm_client is None:
            logger.warning("[CHUNKER] llm_client chưa được gán, bỏ qua semantic summary.")
            return ""
        prompt = (
            "Tóm tắt thật ngắn gọn, súc tích nội dung luật sau (1-2 câu, không lặp lại tiêu đề, không giải thích):\n"
            f"{chunk_content.strip()}"
        )
        try:
            summary = await llm_client.generate_text(prompt)
            return summary.strip() if summary else ""
        except Exception as e:
            logger.error(f"[CHUNKER] Lỗi khi sinh semantic summary bằng LLM: {e}")
            return ""