# M_chatbot / rag/vectorstore.py
# minh-4T — 89c8b6a: unroll table and project restructuring
import asyncio
import logging
import os
from typing import Any, Dict, List
import pdfplumber
from docx import Document
from docx.document import Document as _Document
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from langchain_core.documents import Document as LangChainDocument
from utils.text_utils import clean_text
# Module-level logger used by the document loaders and sync-state helpers below.
logger = logging.getLogger(__name__)
def table_to_unrolled_text(data: List[List[str]], is_docx: bool = False) -> str:
    """Flatten a table into one ``"Header: value | ..."`` line per data row.

    The first row of ``data`` is treated as the header row. Every following
    row becomes a bullet line of ``"<header>: <cell>"`` pairs joined by
    ``" | "``. Empty cells, and cells whose text equals their column header
    (which also drops header rows repeated inside the data), are omitted.
    When a column has no header text, the cell value is emitted on its own.

    For non-Word tables (``is_docx=False``, e.g. PDF extraction), merged
    cells are approximated with forward-fill:

    * an empty header cell takes the header to its left (column span);
    * an empty data cell takes the value from the previous data row
      (row span).

    Forward-fill is skipped for Word tables. Presumably python-docx already
    reports merged cells with their text repeated (NOTE(review): confirm).

    Args:
        data: Table rows as lists of cell values. ``None`` cells are treated
            as empty. Rows may have different lengths.
        is_docx: ``True`` when the table was read with python-docx.

    Returns:
        The unrolled rows wrapped in newlines. Returns ``""`` when ``data``
        has fewer than two rows or when no data row produces any content,
        so callers can rely on a plain truthiness check.
    """
    if not data or len(data) < 2:
        return ""

    # Normalise cells: None -> "", anything else -> stripped string. This is
    # a fresh copy, so the caller's table is never mutated.
    cleaned_data = [
        ["" if cell is None else str(cell).strip() for cell in row]
        for row in data
    ]
    headers = cleaned_data[0]

    if not is_docx:
        # Column span in the header row: pull values from left to right.
        for i in range(1, len(headers)):
            if not headers[i] and headers[i - 1]:
                headers[i] = headers[i - 1]

        # Row span in the body: pull values down from the previous data row.
        # Start at the second data row so header text never leaks into the
        # first data row. Bound by both rows' lengths so ragged rows cannot
        # raise IndexError.
        for r in range(2, len(cleaned_data)):
            prev_row, row = cleaned_data[r - 1], cleaned_data[r]
            for c in range(min(len(headers), len(prev_row), len(row))):
                if not row[c] and prev_row[c]:
                    row[c] = prev_row[c]

    unrolled_rows: List[str] = []
    for row in cleaned_data[1:]:
        parts: List[str] = []
        # zip() stops at the shorter of header/row, so extra cells are ignored.
        for header_val, cell_val in zip(headers, row):
            if not cell_val or cell_val == header_val:
                continue
            parts.append(f"{header_val}: {cell_val}" if header_val else cell_val)
        if parts:
            unrolled_rows.append("- " + " | ".join(parts))

    if not unrolled_rows:
        return ""
    return "\n" + "\n".join(unrolled_rows) + "\n\n"
def read_pdf_with_tables(filepath: str) -> List[LangChainDocument]:
    """Read a PDF page by page, appending unrolled tables to each page's text.

    For every page, the text from ``page.extract_text()`` is kept. If the
    page has tables that unroll to non-blank text, a
    ``[BANG DU LIEU TRICH XUAT]`` section containing them is appended. Pages
    with neither text nor tables are skipped.

    Args:
        filepath: Path of the PDF to read. Stored as ``metadata["source"]``.

    Returns:
        One document per non-empty page, with a 1-based ``metadata["page"]``.
        On any error the failure is logged and the documents collected so
        far are returned, so the result may be partial.
    """
    docs: List[LangChainDocument] = []
    try:
        with pdfplumber.open(filepath) as pdf:
            for page_index, page in enumerate(pdf.pages, 1):
                text = page.extract_text() or ""

                table_texts: List[str] = []
                for table in page.extract_tables() or []:
                    # Forward-fill stays enabled for PDF tables.
                    unrolled_table = table_to_unrolled_text(table, is_docx=False)
                    if unrolled_table.strip():
                        table_texts.append(unrolled_table)

                # Only add the table section when there is something in it;
                # otherwise the marker alone made every page look non-empty.
                full_content = text
                if table_texts:
                    full_content += (
                        "\n\n[BANG DU LIEU TRICH XUAT]:\n" + "\n".join(table_texts)
                    )

                if full_content.strip():
                    docs.append(
                        LangChainDocument(
                            page_content=full_content,
                            metadata={"source": filepath, "page": page_index},
                        )
                    )
    except Exception as error:
        logger.error("Lỗi đọc PDF %s: %s", os.path.basename(filepath), error)
    return docs
def iter_block_items(parent):
    """Yield the paragraphs and tables directly under *parent*, in order.

    *parent* must be a python-docx ``Document`` or a table ``_Cell``. Each
    child element of type ``CT_P`` is wrapped as a ``Paragraph`` and each
    ``CT_Tbl`` as a ``Table``, both owned by *parent*. Any other child
    element is skipped.

    Raises:
        ValueError: When *parent* is neither a Document nor a Cell (raised
            on the first iteration, since this is a generator).
    """
    if isinstance(parent, _Document):
        container = parent.element.body
    elif isinstance(parent, _Cell):
        container = parent._tc
    else:
        raise ValueError("Chỉ hỗ trợ duyệt Document hoặc Cell")

    # Checked in this order: paragraph first, then table.
    wrappers = ((CT_P, Paragraph), (CT_Tbl, Table))
    for element in container.iterchildren():
        for xml_type, wrapper in wrappers:
            if isinstance(element, xml_type):
                yield wrapper(element, parent)
                break
def read_docx_with_tables(filepath: str) -> str:
    """Extract the text of a .docx file, unrolling tables in reading order.

    Paragraphs and tables under the document body are visited in order via
    ``iter_block_items``. Non-blank paragraphs are kept, stripped. Each
    table's cell texts go through ``clean_text`` and are then flattened with
    ``table_to_unrolled_text(..., is_docx=True)``, so Word tables get no
    forward-fill.

    Args:
        filepath: Path to the .docx file.

    Returns:
        The collected blocks joined with newlines (``""`` if nothing was
        found). Errors from opening or parsing the file propagate.
    """
    doc = Document(filepath)
    full_text: List[str] = []

    for block in iter_block_items(doc):
        if isinstance(block, Paragraph):
            paragraph_text = block.text.strip()
            if paragraph_text:
                full_text.append(paragraph_text)
        elif isinstance(block, Table):
            # Only each cell's text is used; tables nested inside cells are
            # not unrolled separately.
            table_data: List[List[str]] = [
                [clean_text(cell.text) for cell in row.cells]
                for row in block.rows
            ]
            # is_docx=True disables forward-fill for Word tables.
            unrolled_table = table_to_unrolled_text(table_data, is_docx=True)
            # Skip tables that unroll to whitespace only, so they do not pad
            # the document with blank lines.
            if unrolled_table.strip():
                full_text.append(f"\n{unrolled_table}\n")

    return "\n".join(full_text)
def load_documents_from_file(filepath: str, filename: str) -> List[LangChainDocument]:
    """Load *filepath* as LangChain documents, choosing a reader by extension.

    The extension of *filename* is matched case-insensitively:

    * ``.pdf``: one document per page (see ``read_pdf_with_tables``);
    * ``.docx``: one document for the whole file (``read_docx_with_tables``);
    * ``.txt``: one document for the whole file, read as UTF-8 with
      undecodable bytes dropped.

    Any other extension yields ``[]``.

    Args:
        filepath: Path used to open the file. Stored as ``metadata["source"]``.
        filename: Name used for the extension check and in log messages.

    Returns:
        The loaded documents, or ``[]`` for unsupported, blank or unreadable
        files. Errors are logged (message truncated to 120 characters)
        instead of being raised.
    """
    docs: List[LangChainDocument] = []
    lower_name = filename.lower()
    try:
        if lower_name.endswith(".pdf"):
            docs = read_pdf_with_tables(filepath)
        else:
            text = ""
            if lower_name.endswith(".docx"):
                text = read_docx_with_tables(filepath)
            elif lower_name.endswith(".txt"):
                # errors="ignore" drops undecodable bytes instead of failing.
                with open(filepath, "r", encoding="utf-8", errors="ignore") as input_file:
                    text = input_file.read()
            # Apply the same blank check to .docx and .txt: whitespace-only
            # text is not turned into a document.
            if text and text.strip():
                docs = [LangChainDocument(page_content=text, metadata={"source": filepath})]
        if docs:
            logger.info("Da doc: %s", filename)
        return docs
    except Exception as error:
        logger.error("Loi doc %s: %s", filename, str(error)[:120])
        return []
async def build_vectorstore_improved(
    sync_coordinator: Any,
    startup_wait_seconds: int = 5,
) -> Dict[str, Any]:
    """Start the initial sync as a background task and wait for it briefly.

    Schedules ``sync_coordinator.run_sync(trigger="startup:initial_sync",
    queue_if_locked=False)`` with ``asyncio.create_task``, then waits up to
    *startup_wait_seconds* for it to finish. The wait goes through
    ``asyncio.shield``, so a timeout does not cancel the sync. The task keeps
    running and can be awaited later through the returned ``"task"`` entry.

    Args:
        sync_coordinator: Object exposing an async ``run_sync`` method.
        startup_wait_seconds: Maximum wait. A value ``<= 0`` returns at once.

    Returns:
        ``{"task": <asyncio.Task>, "initial_sync": <result or None>,
        "timed_out": <bool>}``. ``timed_out`` is ``True`` when the result was
        not obtained within the wait window (including when no wait happened).

    Raises:
        ValueError: If *sync_coordinator* is ``None``.
        Any exception raised by ``run_sync`` before the timeout propagates.
    """
    if sync_coordinator is None:
        raise ValueError("sync_coordinator is required")

    sync_task = asyncio.create_task(
        sync_coordinator.run_sync(trigger="startup:initial_sync", queue_if_locked=False)
    )
    # Pessimistic default; switched to a success result only if the wait completes.
    outcome: Dict[str, Any] = {
        "task": sync_task,
        "initial_sync": None,
        "timed_out": True,
    }

    if startup_wait_seconds <= 0:
        return outcome

    try:
        finished = await asyncio.wait_for(
            asyncio.shield(sync_task), timeout=startup_wait_seconds
        )
    except asyncio.TimeoutError:
        return outcome

    outcome["initial_sync"] = finished
    outcome["timed_out"] = False
    return outcome
def load_vectorstore_improved(sync_coordinator: Any) -> Dict[str, Any]:
    """Return the coordinator's health snapshot, or ``{}`` if unavailable.

    Args:
        sync_coordinator: Object exposing ``get_health_snapshot()``, or ``None``.

    Returns:
        The snapshot when it is a ``dict``. Returns ``{}`` when the
        coordinator is ``None``, when the snapshot is not a dict, or when
        ``get_health_snapshot`` raises (the exception is logged with its
        traceback, not re-raised).
    """
    if sync_coordinator is not None:
        try:
            snapshot = sync_coordinator.get_health_snapshot()
        except Exception:
            logger.exception("Khong the lay sync state tu coordinator")
        else:
            if isinstance(snapshot, dict):
                return snapshot
    return {}