Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import os | |
| from typing import Any, Dict, List | |
| import pdfplumber | |
| from docx import Document | |
| from docx.document import Document as _Document | |
| from docx.oxml.table import CT_Tbl | |
| from docx.oxml.text.paragraph import CT_P | |
| from docx.table import Table, _Cell | |
| from docx.text.paragraph import Paragraph | |
| from langchain_core.documents import Document as LangChainDocument | |
| from utils.text_utils import clean_text | |
# Module-level logger; the file readers and sync helpers in this module report
# read failures and sync-state errors through it.
logger = logging.getLogger(__name__)
def table_to_unrolled_text(data: List[List[str]], is_docx: bool = False) -> str:
    """Flatten a table into one ``- Header: value | Header: value`` line per data row.

    The first row of ``data`` is treated as the header row. Every following row
    becomes a single line pairing each non-empty cell with its column header,
    which is easier to search/embed than a grid layout.

    When ``is_docx`` is False (PDF tables), merged cells are reconstructed with a
    forward-fill before flattening:
      * header row: an empty header cell takes the header to its left (colspan);
      * data rows: an empty cell takes the value directly above it (rowspan).
    When ``is_docx`` is True (Word tables) the forward-fill is skipped.

    Args:
        data: Table rows. Cells may be ``None``; rows may have different lengths
            (cells beyond the header width are ignored).
        is_docx: True when ``data`` comes from a .docx table; disables forward-fill.

    Returns:
        The flattened lines joined by newlines, preceded by one newline and
        followed by two. Returns ``""`` when the table has fewer than two rows or
        when no data row contributes any text.
    """
    if not data or len(data) < 2:
        return ""

    # Normalize every cell to a stripped string; only None becomes "" so that
    # falsy but meaningful values such as 0 are kept.
    cleaned_data = [
        ["" if cell is None else str(cell).strip() for cell in row] for row in data
    ]
    headers = cleaned_data[0]
    num_cols = len(headers)

    if not is_docx:
        # Colspan: an empty header cell inherits the header text to its left
        # (cascades left to right across several empty cells).
        for col in range(1, num_cols):
            if not headers[col] and headers[col - 1]:
                headers[col] = headers[col - 1]

        # Rowspan: an empty cell inherits the value directly above it. The rows
        # are mutated in place, so a fill cascades down several rows. Bounding
        # the range by both rows' lengths keeps ragged rows from raising
        # IndexError.
        for prev_row, row in zip(cleaned_data, cleaned_data[1:]):
            for col in range(min(num_cols, len(row), len(prev_row))):
                if not row[col] and prev_row[col]:
                    row[col] = prev_row[col]

    unrolled_rows: List[str] = []
    for row in cleaned_data[1:]:
        # zip stops at the shorter sequence, so cells without a header are dropped.
        row_text_parts = [
            f"{header_val}: {cell_val}"
            for header_val, cell_val in zip(headers, row)
            # Skip empty cells and cells that merely repeat their header.
            if cell_val and cell_val != header_val
        ]
        if row_text_parts:
            unrolled_rows.append("- " + " | ".join(row_text_parts))

    # Nothing to report: return "" so callers' truthiness checks skip this table.
    if not unrolled_rows:
        return ""
    return "\n" + "\n".join(unrolled_rows) + "\n\n"
def read_pdf_with_tables(filepath: str) -> List[LangChainDocument]:
    """Read a PDF into one LangChain document per page, with tables flattened to text.

    Each page's content is the output of ``page.extract_text()``. When at least
    one table on the page yields text, the marker ``[BANG DU LIEU TRICH XUAT]:``
    ("extracted data table") is appended, followed by the tables flattened with
    ``table_to_unrolled_text`` (forward-fill enabled via ``is_docx=False``).
    Each document's metadata is ``{"source": filepath, "page": <1-based page>}``.

    Pages with neither text nor table content produce no document. An error on
    a single page is logged and only that page is skipped. An error opening or
    iterating the file is logged and the pages read so far are returned.

    Args:
        filepath: Path of the PDF file to read.

    Returns:
        The per-page documents, possibly empty. Read errors are logged, not raised.
    """
    docs: List[LangChainDocument] = []
    try:
        with pdfplumber.open(filepath) as pdf:
            for page_index, page in enumerate(pdf.pages, 1):
                try:
                    text = page.extract_text() or ""
                    table_texts: List[str] = []
                    for table in page.extract_tables() or []:
                        # Forward-fill stays on for PDFs to rebuild merged cells,
                        # which show up as empty cells in the extracted grid.
                        unrolled_table = table_to_unrolled_text(table, is_docx=False)
                        if unrolled_table:
                            table_texts.append(unrolled_table)
                except Exception as error:
                    # One malformed page should not discard the rest of the file.
                    logger.warning(
                        "Loi doc trang %d cua PDF %s: %s",
                        page_index,
                        os.path.basename(filepath),
                        error,
                    )
                    continue

                full_content = text
                # Append the table section only when it has content. Otherwise
                # every page, even a blank one, would carry the marker and pass
                # the emptiness check below.
                if table_texts:
                    full_content += "\n\n[BANG DU LIEU TRICH XUAT]:\n" + "\n".join(table_texts)
                if full_content.strip():
                    docs.append(
                        LangChainDocument(
                            page_content=full_content,
                            metadata={"source": filepath, "page": page_index},
                        )
                    )
    except Exception as error:
        logger.error("Lỗi đọc PDF %s: %s", os.path.basename(filepath), error)
    return docs
def iter_block_items(parent):
    """Yield the paragraphs and tables that are direct children of ``parent``, in order.

    ``parent`` must be a python-docx document (``_Document``) or table cell
    (``_Cell``). For a document, the children of ``parent.element.body`` are
    walked; for a cell, the children of its underlying ``_tc`` element. Child
    elements that are neither ``CT_P`` (paragraph) nor ``CT_Tbl`` (table) are
    skipped, so content wrapped in other elements is not visited.

    Raises:
        ValueError: when ``parent`` is neither a document nor a cell (raised on
            the first iteration, since this is a generator).
    """
    if not isinstance(parent, (_Document, _Cell)):
        raise ValueError("Chỉ hỗ trợ duyệt Document hoặc Cell")
    container = parent.element.body if isinstance(parent, _Document) else parent._tc

    # Each supported XML element type paired with the python-docx proxy that wraps it.
    proxies = ((CT_P, Paragraph), (CT_Tbl, Table))
    for child in container.iterchildren():
        for element_type, proxy_cls in proxies:
            if isinstance(child, element_type):
                yield proxy_cls(child, parent)
                break
def read_docx_with_tables(filepath: str) -> str:
    """Return the text of a .docx file, with its tables flattened to text lines.

    Top-level body blocks are visited in document order via ``iter_block_items``:
      * a paragraph contributes its stripped text when that text is non-empty;
      * a table is converted to a grid of ``clean_text(cell.text)`` values and
        flattened with ``table_to_unrolled_text(..., is_docx=True)``, i.e. without
        forward-fill. A non-empty result is added wrapped in extra newlines.
    The pieces are joined with newlines. Errors from opening the file
    (``Document(filepath)``) are not handled here.
    """
    document = Document(filepath)
    pieces: List[str] = []
    for item in iter_block_items(document):
        if isinstance(item, Table):
            grid = [[clean_text(cell.text) for cell in row.cells] for row in item.rows]
            # is_docx=True skips the forward-fill used for PDF tables.
            flattened = table_to_unrolled_text(grid, is_docx=True)
            if flattened:
                pieces.append(f"\n{flattened}\n")
            continue
        if isinstance(item, Paragraph):
            paragraph_text = item.text.strip()
            if paragraph_text:
                pieces.append(paragraph_text)
    return "\n".join(pieces)
def load_documents_from_file(filepath: str, filename: str) -> List[LangChainDocument]:
    """Load ``filepath`` into LangChain documents, picking a reader by extension.

    The extension is taken from ``filename`` (case-insensitive), not from
    ``filepath``:
      * ``.pdf``  -> the per-page documents from ``read_pdf_with_tables``;
      * ``.docx`` -> one document holding ``read_docx_with_tables`` output;
      * ``.txt``  -> one document holding the file read as UTF-8, with
        undecodable bytes dropped.
    Any other extension yields ``[]``. For .docx and .txt, text that is empty
    or whitespace-only produces no document. Single-document results carry
    ``{"source": filepath}`` metadata.

    Returns:
        The loaded documents, or ``[]`` when nothing was loaded or reading
        failed. Failures are logged with the message truncated to 120 characters.
    """
    lower_name = filename.lower()
    try:
        docs: List[LangChainDocument] = []
        # Text of the single-document formats (.docx / .txt); stays "" otherwise.
        text = ""
        if lower_name.endswith(".pdf"):
            docs = read_pdf_with_tables(filepath)
        elif lower_name.endswith(".docx"):
            text = read_docx_with_tables(filepath)
        elif lower_name.endswith(".txt"):
            # errors="ignore": undecodable bytes are dropped instead of failing the file.
            with open(filepath, "r", encoding="utf-8", errors="ignore") as input_file:
                text = input_file.read()
        else:
            logger.debug("Bo qua file khong ho tro: %s", filename)

        # Same rule for .docx and .txt: whitespace-only text is not a document.
        # A .docx holding only empty tables yields just newlines, which the old
        # `if text:` check turned into a blank document.
        if text and text.strip():
            docs = [LangChainDocument(page_content=text, metadata={"source": filepath})]

        if docs:
            logger.info("Da doc: %s", filename)
        return docs
    except Exception as error:
        logger.error("Loi doc %s: %s", filename, str(error)[:120])
        return []
async def build_vectorstore_improved(
    sync_coordinator: Any,
    startup_wait_seconds: int = 5,
) -> Dict[str, Any]:
    """Start the initial sync in the background and wait a bounded time for it.

    Despite its name, this function does not build anything itself. It schedules
    ``sync_coordinator.run_sync(trigger="startup:initial_sync",
    queue_if_locked=False)`` as an asyncio task and waits up to
    ``startup_wait_seconds`` for its result. The wait goes through
    ``asyncio.shield``, so hitting the timeout does not cancel the sync.

    Args:
        sync_coordinator: Object exposing an async ``run_sync(trigger=...,
            queue_if_locked=...)``.
        startup_wait_seconds: Maximum wait in seconds. A value ``<= 0`` returns
            immediately without waiting.

    Returns:
        ``{"task": <the sync task>, "initial_sync": <result or None>,
        "timed_out": <bool>}``. ``timed_out`` is True both when the wait expired
        and when no wait was attempted.

    Raises:
        ValueError: if ``sync_coordinator`` is None.
        Any exception raised by ``run_sync`` before the wait expires.
    """
    if sync_coordinator is None:
        raise ValueError("sync_coordinator is required")

    sync_job = asyncio.create_task(
        sync_coordinator.run_sync(trigger="startup:initial_sync", queue_if_locked=False)
    )

    outcome = None
    timed_out = True
    if startup_wait_seconds > 0:
        try:
            # shield: a timeout cancels only the waiter, never the sync task.
            outcome = await asyncio.wait_for(
                asyncio.shield(sync_job), timeout=startup_wait_seconds
            )
        except asyncio.TimeoutError:
            pass
        else:
            timed_out = False

    return {"task": sync_job, "initial_sync": outcome, "timed_out": timed_out}
def load_vectorstore_improved(sync_coordinator: Any) -> Dict[str, Any]:
    """Return the coordinator's health snapshot as a dict.

    Despite its name, this only calls ``sync_coordinator.get_health_snapshot()``.
    The snapshot object is returned unchanged when it is a dict. An empty dict
    is returned when the coordinator is None, when the snapshot is not a dict,
    or when the call raises; a raised exception is logged with its traceback
    and not propagated.
    """
    snapshot: Any = {}
    if sync_coordinator is not None:
        try:
            snapshot = sync_coordinator.get_health_snapshot()
        except Exception:
            logger.exception("Khong the lay sync state tu coordinator")
            snapshot = {}
    return snapshot if isinstance(snapshot, dict) else {}