from __future__ import annotations import os import re import uuid from pathlib import Path from typing import List, Tuple, Dict, Any, Optional import yaml from openai import OpenAI from llama_index.core import Document from llama_index.core.node_parser import MarkdownNodeParser, SentenceSplitter from llama_index.core.schema import BaseNode, TextNode # Cấu hình chunking CHUNK_SIZE = 1500 CHUNK_OVERLAP = 150 MIN_CHUNK_SIZE = 200 TABLE_ROWS_PER_CHUNK = 15 # Cấu hình Small-to-Big ENABLE_TABLE_SUMMARY = True MIN_TABLE_ROWS_FOR_SUMMARY = 0 SUMMARY_MODEL = "openai/gpt-oss-120b" GROQ_BASE_URL = "https://api.groq.com/openai/v1" # Regex patterns COURSE_PATTERN = re.compile(r"Học\s*phần\s+(.+?)\s*\(\s*m[ãa]\s+([^\)]+)\)", re.I | re.DOTALL) TABLE_PLACEHOLDER = re.compile(r"__TBL_(\d+)__") HEADER_KEYWORDS = {'TT', 'STT', 'MÃ', 'TÊN', 'KHỐI', 'SỐ', 'ID', 'NO', '#'} FRONTMATTER_PATTERN = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL) TABLE_TITLE_PATTERN = re.compile(r"(?:^|\n)#+\s*(?:Bảng|BẢNG)\s*(\d+(?:\.\d+)?)\s*[.:]*\s*(.+?)(?:\n|$)", re.IGNORECASE) def _is_table_row(line: str) -> bool: """Kiểm tra dòng có phải là hàng trong bảng Markdown không.""" s = line.strip() return s.startswith("|") and s.endswith("|") and s.count("|") >= 2 def _is_separator(line: str) -> bool: """Kiểm tra dòng có phải là separator của bảng (|---|---|).""" if not _is_table_row(line): return False return not line.strip().replace("|", "").replace("-", "").replace(":", "").replace(" ", "") def _is_header(line: str) -> bool: """Kiểm tra dòng có phải là header của bảng không.""" if not _is_table_row(line): return False cells = [c.strip() for c in line.split("|") if c.strip()] if not cells or cells[0].isdigit(): return False return any(k in cells[0].upper() for k in HEADER_KEYWORDS) or len(cells[0].split()) <= 3 def _extract_tables(text: str) -> Tuple[List[Tuple[str, List[str]]], str]: """Trích xuất bảng từ text và thay bằng placeholder.""" lines, tables, last_header, i = text.split("\n"), [], None, 0 while i < len(lines) - 1: if _is_table_row(lines[i]) and _is_separator(lines[i + 1]): if _is_header(lines[i]): header = f"{lines[i]}\n{lines[i + 1]}\n" last_header, start = header, i + 2 else: header = last_header or f"| {'|'.join(['Col'] * (lines[i].count('|') - 1))} |\n|{'|'.join(['---'] * (lines[i].count('|') - 1))}|\n" start = i rows, j = [], start while j < len(lines) and (_is_table_row(lines[j]) or _is_separator(lines[j])): if not _is_separator(lines[j]): rows.append(lines[j]) j += 1 if rows: tables.append((header, rows)) i = j else: i += 1 # Thay bảng bằng placeholder result, tbl_idx, i = [], 0, 0 while i < len(lines): if tbl_idx < len(tables) and i < len(lines) - 1 and _is_table_row(lines[i]) and _is_separator(lines[i + 1]): j = i while j < len(lines) and (_is_table_row(lines[j]) or _is_separator(lines[j])): j += 1 result.append(f"__TBL_{tbl_idx}__") tbl_idx, i = tbl_idx + 1, j else: result.append(lines[i]) i += 1 return tables, "\n".join(result) def _split_table(header: str, rows: List[str], max_rows: int = TABLE_ROWS_PER_CHUNK) -> List[str]: """Chia bảng lớn thành nhiều chunks nhỏ.""" if len(rows) <= max_rows: return [header + "\n".join(rows)] chunks = [] for i in range(0, len(rows), max_rows): chunk_rows = rows[i:i + max_rows] chunks.append(chunk_rows) # Gộp chunk cuối nếu quá nhỏ (< 5 dòng) if len(chunks) > 1 and len(chunks[-1]) < 5: chunks[-2].extend(chunks[-1]) chunks.pop() return [header + "\n".join(r) for r in chunks] _summary_client: Optional[OpenAI] = None def _get_summary_client() -> Optional[OpenAI]: """Lấy Groq client để tóm tắt bảng.""" global _summary_client if _summary_client is not None: return _summary_client api_key = os.getenv("GROQ_API_KEY", "").strip() if not api_key: print("Chưa đặt GROQ_API_KEY. Tắt tính năng tóm tắt bảng.") return None _summary_client = OpenAI(api_key=api_key, base_url=GROQ_BASE_URL) return _summary_client def _summarize_table( table_text: str, context_hint: str = "", table_number: str = "", table_title: str = "", source_file: str = "", max_retries: int = 5, base_delay: float = 2.0 ) -> str: """Tóm tắt bảng bằng LLM với retry logic.""" import time if not ENABLE_TABLE_SUMMARY: raise RuntimeError("Tính năng tóm tắt bảng đã tắt. Đặt ENABLE_TABLE_SUMMARY = True") client = _get_summary_client() if client is None: raise RuntimeError("Chưa đặt GROQ_API_KEY. Không thể tóm tắt bảng.") # Tạo chuỗi định danh bảng table_id_parts = [] if table_number: table_id_parts.append(f"Bảng {table_number}") if table_title: table_id_parts.append(f'"{table_title}"') if source_file: table_id_parts.append(f"từ file {source_file}") table_identifier = " - ".join(table_id_parts) if table_id_parts else "Bảng không xác định" prompt = f"""Tóm tắt ngắn gọn nội dung bảng sau bằng tiếng Việt. {f"**Thông tin bảng:** {table_identifier}" if table_identifier else ""} {f"**Ngữ cảnh:** {context_hint}" if context_hint else ""} **YÊU CẦU QUAN TRỌNG:** - Bắt đầu tóm tắt bằng việc nêu rõ đây là {f"Bảng {table_number}" if table_number else "bảng nào"}{f' với tiêu đề "{table_title}"' if table_title else ""}{f" thuộc file {source_file}" if source_file else ""} - Ghi rõ bảng này liệt kê/quy định về cái gì - Nêu các cột chính trong bảng - Thông tin quan trọng (nếu có số liệu cụ thể thì nêu ví dụ) Bảng: {table_text[:3000]} """ last_error = None for attempt in range(max_retries): try: response = client.chat.completions.create( model=SUMMARY_MODEL, messages=[{"role": "user", "content": prompt}], temperature=0.3, max_tokens=1000, ) summary = response.choices[0].message.content or "" if summary.strip(): return summary.strip() else: raise ValueError("API trả về summary rỗng") except Exception as e: last_error = e delay = base_delay * (2 ** attempt) # Exponential backoff: 2, 4, 8, 16, 32 giây print(f"Thử lại {attempt + 1}/{max_retries} cho {table_identifier}: {e}") print(f" Đợi {delay:.1f}s trước khi thử lại...") time.sleep(delay) # Tất cả retry đều thất bại raise RuntimeError(f"Không thể tóm tắt {table_identifier} sau {max_retries} lần thử. Lỗi cuối: {last_error}") def _create_table_nodes( table_text: str, metadata: dict, context_hint: str = "", table_number: str = "", table_title: str = "", source_file: str = "" ) -> List[TextNode]: """Tạo nodes cho bảng. Bảng lớn sẽ có parent + summary node.""" # Đếm số dòng để quyết định có cần tóm tắt không row_count = table_text.count("\n") # Thêm thông tin bảng vào metadata table_meta = {**metadata} if table_number: table_meta["table_number"] = table_number if table_title: table_meta["table_title"] = table_title if row_count < MIN_TABLE_ROWS_FOR_SUMMARY: # Bảng quá nhỏ, không cần tóm tắt return [TextNode(text=table_text, metadata={**table_meta, "is_table": True})] # Kiểm tra có thể tóm tắt không (cần API key) if _get_summary_client() is None: # Không có API key -> trả về node bảng đơn giản, không tóm tắt return [TextNode(text=table_text, metadata={**table_meta, "is_table": True})] # Tạo summary với retry logic summary = _summarize_table( table_text, context_hint, table_number=table_number, table_title=table_title, source_file=source_file ) # Tạo parent node (bảng gốc - KHÔNG embed) parent_id = str(uuid.uuid4()) parent_node = TextNode( text=table_text, metadata={ **table_meta, "is_table": True, "is_parent": True, # Flag để bỏ qua embedding "node_id": parent_id, } ) parent_node.id_ = parent_id # Tạo summary node (SẼ được embed để search) summary_node = TextNode( text=summary, metadata={ **table_meta, "is_table_summary": True, "parent_id": parent_id, # Link tới parent } ) table_id = f"Bảng {table_number}" if table_number else "bảng" print(f"Đã tạo summary cho {table_id} ({row_count} dòng)") return [parent_node, summary_node] def _enrich_metadata(node: BaseNode, source_path: Path | None) -> None: """Bổ sung metadata từ source path và trích xuất thông tin học phần.""" if source_path: node.metadata.update({"source_path": str(source_path), "source_file": source_path.name}) if "Học phần" in (text := node.get_content()) and (m := COURSE_PATTERN.search(text)): node.metadata.update({"course_name": " ".join(m.group(1).split()), "course_code": " ".join(m.group(2).split())}) def _chunk_text(text: str, metadata: dict) -> List[BaseNode]: """Chia text thành chunks theo kích thước cấu hình.""" if len(text) <= CHUNK_SIZE: return [TextNode(text=text, metadata=metadata.copy())] return SentenceSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP).get_nodes_from_documents( [Document(text=text, metadata=metadata.copy())] ) def _extract_frontmatter(text: str) -> Tuple[Dict[str, Any], str]: """Trích xuất YAML frontmatter từ đầu file.""" match = FRONTMATTER_PATTERN.match(text) if not match: return {}, text try: frontmatter = yaml.safe_load(match.group(1)) or {} remaining_text = text[match.end():].lstrip() return frontmatter, remaining_text except yaml.YAMLError: return {}, text def chunk_markdown(text: str, source_path: str | Path | None = None) -> List[BaseNode]: """Chunk một file Markdown thành các nodes.""" if not text or not text.strip(): return [] path = Path(source_path) if source_path else None # Trích xuất YAML frontmatter làm metadata (không chunk) frontmatter_meta, text = _extract_frontmatter(text) tables, text_with_placeholders = _extract_tables(text) # Metadata cơ bản từ frontmatter + source path base_meta = {**frontmatter_meta} if path: base_meta.update({"source_path": str(path), "source_file": path.name}) # Parse theo headings doc = Document(text=text_with_placeholders, metadata=base_meta.copy()) heading_nodes = MarkdownNodeParser().get_nodes_from_documents([doc]) nodes: List[BaseNode] = [] for node in heading_nodes: content, meta = node.get_content(), node.metadata.copy() matches = list(TABLE_PLACEHOLDER.finditer(content)) if not matches: nodes.extend(_chunk_text(content, meta) if len(content) > CHUNK_SIZE else [TextNode(text=content, metadata=meta)]) continue last_end = 0 for match in matches: # Text trước bảng before_text = content[last_end:match.start()].strip() # Trích xuất số bảng và tiêu đề từ text trước bảng table_number = "" table_title = "" if before_text: title_match = TABLE_TITLE_PATTERN.search(before_text) if title_match: table_number = title_match.group(1).strip() table_title = title_match.group(2).strip() if before_text and len(before_text) >= MIN_CHUNK_SIZE: nodes.extend(_chunk_text(before_text, meta) if len(before_text) > CHUNK_SIZE else [TextNode(text=before_text, metadata=meta.copy())]) # Chunk bảng - sử dụng Small-to-Big pattern if (idx := int(match.group(1))) < len(tables): header, rows = tables[idx] table_chunks = _split_table(header, rows) # Lấy context hint từ header path context_hint = meta.get("Header 1", "") or meta.get("section", "") # Lấy source file cho summary source_file = meta.get("source_file", "") or (path.name if path else "") for i, chunk in enumerate(table_chunks): chunk_meta = {**meta} if len(table_chunks) > 1: chunk_meta["table_part"] = f"{i+1}/{len(table_chunks)}" # Tạo parent + summary nodes nếu cần table_nodes = _create_table_nodes( chunk, chunk_meta, context_hint, table_number=table_number, table_title=table_title, source_file=source_file ) nodes.extend(table_nodes) last_end = match.end() # Text sau bảng if (after := content[last_end:].strip()) and len(after) >= MIN_CHUNK_SIZE: nodes.extend(_chunk_text(after, meta) if len(after) > CHUNK_SIZE else [TextNode(text=after, metadata=meta.copy())]) # Gộp các node nhỏ với node kế tiếp final: List[BaseNode] = [] i = 0 while i < len(nodes): curr = nodes[i] curr_content = curr.get_content() curr_is_table = curr.metadata.get("is_table") # Bỏ qua node rỗng if not curr_content.strip(): i += 1 continue # Nếu node hiện tại nhỏ và không phải bảng -> gộp với node sau if not curr_is_table and len(curr_content) < MIN_CHUNK_SIZE and i + 1 < len(nodes): next_node = nodes[i + 1] next_is_table = next_node.metadata.get("is_table") if next_is_table: merged_text = curr_content.strip() + "\n\n" + next_node.get_content() merged_meta = {**curr.metadata, **next_node.metadata} final.append(TextNode(text=merged_text, metadata=merged_meta)) i += 2 else: merged_text = curr_content + "\n\n" + next_node.get_content() merged_meta = {**curr.metadata, **next_node.metadata} final.append(TextNode(text=merged_text, metadata=merged_meta)) i += 2 else: final.append(curr) i += 1 for idx, node in enumerate(final): _enrich_metadata(node, path) node.metadata["chunk_index"] = idx return final def chunk_markdown_file(path: str | Path) -> List[BaseNode]: """Đọc và chunk một file Markdown.""" p = Path(path) if not p.exists(): raise FileNotFoundError(f"Không tìm thấy file: {p}") return chunk_markdown(p.read_text(encoding="utf-8"), source_path=p)