Spaces:
Sleeping
Sleeping
File size: 7,428 Bytes
import asyncio
import logging
import os
from typing import Any, Dict, List
import pdfplumber
from docx import Document
from docx.document import Document as _Document
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table, _Cell
from docx.text.paragraph import Paragraph
from langchain_core.documents import Document as LangChainDocument
from utils.text_utils import clean_text
# Module-level logger, named after this module; used by the loaders below to report read failures.
logger = logging.getLogger(__name__)
def table_to_unrolled_text(data: List[List[str]], is_docx: bool = False) -> str:
    """Flatten a table into one ``- Header: value | Header: value`` line per data row.

    The first row of *data* is treated as the header row. Cells are
    normalised to stripped strings (falsy cells such as ``None`` become
    ``""``) on a private copy, so the caller's lists are never modified.

    Args:
        data: Table rows, header first. Rows may be shorter than the header;
            missing trailing cells are treated as empty.
        is_docx: When true, skip both forward-fill passes (merged-cell
            reconstruction) and only unroll the cells as given.

    Returns:
        ``""`` when the table has fewer than two rows or no row yields any
        ``Header: value`` pair; otherwise the bullet lines joined by newlines,
        wrapped as ``"\\n" + lines + "\\n\\n"``.
    """
    if not data or len(data) < 2:
        return ""

    # Normalise on a copy: falsy cells -> "", everything else -> stripped str.
    cleaned_data = [
        [str(cell).strip() if cell else "" for cell in (row or [])]
        for row in data
    ]

    headers = cleaned_data[0]
    num_cols = len(headers)

    # Pad short rows so every row can be indexed up to the header width
    # (a ragged row used to raise IndexError in the rowspan pass below).
    for row in cleaned_data[1:]:
        if len(row) < num_cols:
            row.extend([""] * (num_cols - len(row)))

    # Forward-fill is only applied when the source is not a Word file.
    if not is_docx:
        # Colspan: an empty header cell inherits the header to its left.
        for i in range(1, num_cols):
            if not headers[i] and headers[i - 1]:
                headers[i] = headers[i - 1]

        # Rowspan: an empty cell inherits the value directly above it.
        # Rows are filled top-down, so a value cascades through several rows.
        for above, current in zip(cleaned_data, cleaned_data[1:]):
            for c in range(num_cols):
                if not current[c] and above[c]:
                    current[c] = above[c]

    # Unroll: one bullet line per data row, pairing each cell with its header.
    unrolled_rows = []
    for row_values in cleaned_data[1:]:
        # Skip empty cells and cells that merely repeat the header text
        # (which is what a first-row cell becomes after the rowspan fill).
        row_text_parts = [
            f"{header_val}: {cell_val}"
            for header_val, cell_val in zip(headers, row_values)
            if cell_val and cell_val != header_val
        ]
        if row_text_parts:
            unrolled_rows.append("- " + " | ".join(row_text_parts))

    # Return a falsy value when nothing was extracted so that callers'
    # ``if unrolled_table:`` checks do not append whitespace-only blocks.
    if not unrolled_rows:
        return ""
    return "\n" + "\n".join(unrolled_rows) + "\n\n"
def read_pdf_with_tables(filepath: str) -> List[LangChainDocument]:
    """Read a PDF page by page into LangChain documents, appending unrolled tables.

    For every page the plain text from ``page.extract_text()`` is combined
    with the unrolled form of each table from ``page.extract_tables()``
    (forward-fill enabled, ``is_docx=False``). The table section, introduced
    by the ``[BANG DU LIEU TRICH XUAT]`` marker, is added only when at least
    one table produced text, so blank pages no longer become marker-only
    documents.

    Args:
        filepath: Path of the PDF file to open with pdfplumber.

    Returns:
        One document per non-empty page, with ``source`` (the path) and
        ``page`` (1-based index) in its metadata. If reading fails the error
        is logged and the pages collected so far are returned.
    """
    docs: List[LangChainDocument] = []
    try:
        with pdfplumber.open(filepath) as pdf:
            for page_index, page in enumerate(pdf.pages, 1):
                text = page.extract_text() or ""

                table_texts: List[str] = []
                for table in page.extract_tables() or []:
                    # PDF tables keep forward-fill to rebuild merged cells.
                    unrolled_table = table_to_unrolled_text(table, is_docx=False)
                    # Ignore tables that unroll to nothing but whitespace.
                    if unrolled_table and unrolled_table.strip():
                        table_texts.append(unrolled_table)

                full_content = text
                if table_texts:
                    # Only announce a table section when there is one.
                    full_content += "\n\n[BANG DU LIEU TRICH XUAT]:\n" + "\n".join(table_texts)

                if full_content.strip():
                    docs.append(
                        LangChainDocument(
                            page_content=full_content,
                            metadata={"source": filepath, "page": page_index},
                        )
                    )
    except Exception as error:
        # Best effort: report the failure and keep whatever was already read.
        logger.error("Lỗi đọc PDF %s: %s", os.path.basename(filepath), error)
    return docs
def iter_block_items(parent):
    """Yield the paragraphs and tables that are direct children of *parent*.

    *parent* must be a python-docx document or a table cell. Items are
    yielded in the order ``iterchildren()`` returns them; child elements that
    are neither paragraphs nor tables are skipped.

    Raises:
        ValueError: If *parent* is neither a document nor a cell. Because this
            is a generator, the error surfaces on first iteration.
    """
    if isinstance(parent, _Document):
        container = parent.element.body
    elif isinstance(parent, _Cell):
        container = parent._tc
    else:
        raise ValueError("Chỉ hỗ trợ duyệt Document hoặc Cell")

    for node in container.iterchildren():
        if isinstance(node, CT_P):
            yield Paragraph(node, parent)
            continue
        if isinstance(node, CT_Tbl):
            yield Table(node, parent)
def read_docx_with_tables(filepath: str) -> str:
    """Return the text of a .docx file with tables unrolled in place.

    Body blocks are visited via ``iter_block_items``. Non-blank paragraphs
    contribute their stripped text; each table is converted with
    ``table_to_unrolled_text(..., is_docx=True)`` (forward-fill disabled) and
    added, wrapped in newlines, when the result is non-empty. All pieces are
    joined with ``"\\n"``.
    """
    pieces: List[str] = []
    for item in iter_block_items(Document(filepath)):
        if isinstance(item, Paragraph):
            stripped = item.text.strip()
            if stripped:
                pieces.append(stripped)
        elif isinstance(item, Table):
            # One list of cleaned cell texts per table row.
            grid: List[List[str]] = [
                [clean_text(cell.text) for cell in row.cells]
                for row in item.rows
            ]
            # Forward-fill is switched off for Word tables via is_docx=True.
            rendered = table_to_unrolled_text(grid, is_docx=True)
            if rendered:
                pieces.append(f"\n{rendered}\n")
    return "\n".join(pieces)
def load_documents_from_file(filepath: str, filename: str) -> List[LangChainDocument]:
    """Load one file into LangChain documents based on its extension.

    The extension is taken from *filename* (case-insensitive):
    ``.pdf`` goes through ``read_pdf_with_tables``, ``.docx`` through
    ``read_docx_with_tables``, and ``.txt`` is read as UTF-8 with undecodable
    bytes ignored. Any other extension yields an empty list.

    Returns:
        The loaded documents, or ``[]`` when nothing was extracted or when
        any exception occurred while loading (the error is logged, truncated
        to 120 characters).
    """

    def _single_document(content: str) -> List[LangChainDocument]:
        # Wrap whole-file text in a one-element document list.
        return [LangChainDocument(page_content=content, metadata={"source": filepath})]

    loaded: List[LangChainDocument] = []
    normalized_name = filename.lower()
    try:
        if normalized_name.endswith(".pdf"):
            loaded = read_pdf_with_tables(filepath)
        elif normalized_name.endswith(".docx"):
            docx_text = read_docx_with_tables(filepath)
            if docx_text:
                loaded = _single_document(docx_text)
        elif normalized_name.endswith(".txt"):
            with open(filepath, "r", encoding="utf-8", errors="ignore") as handle:
                raw_text = handle.read()
            if raw_text and raw_text.strip():
                loaded = _single_document(raw_text)

        if loaded:
            logger.info("Da doc: %s", filename)
        return loaded
    except Exception as error:
        logger.error("Loi doc %s: %s", filename, str(error)[:120])
        return []
async def build_vectorstore_improved(
    sync_coordinator: Any,
    startup_wait_seconds: int = 5,
) -> Dict[str, Any]:
    """Start the initial sync as a background task and wait briefly for it.

    ``sync_coordinator.run_sync(trigger="startup:initial_sync",
    queue_if_locked=False)`` is scheduled as a task. If
    *startup_wait_seconds* is positive, the task is awaited for at most that
    long; it is shielded, so a timeout here does not cancel it.

    Returns:
        A dict with ``task`` (the background task), ``initial_sync`` (the
        sync result, or ``None`` if it was not awaited to completion) and
        ``timed_out`` (``True`` when no wait was requested or the wait
        expired).

    Raises:
        ValueError: If *sync_coordinator* is ``None``.
    """
    if sync_coordinator is None:
        raise ValueError("sync_coordinator is required")

    background_sync = asyncio.create_task(
        sync_coordinator.run_sync(
            trigger="startup:initial_sync",
            queue_if_locked=False,
        )
    )

    def _outcome(result: Any, timed_out: bool) -> Dict[str, Any]:
        # Single place that shapes the returned status dict.
        return {
            "task": background_sync,
            "initial_sync": result,
            "timed_out": timed_out,
        }

    # No waiting requested: hand the running task straight back.
    if startup_wait_seconds <= 0:
        return _outcome(None, True)

    try:
        # shield() keeps the sync running even if the wait times out.
        result = await asyncio.wait_for(
            asyncio.shield(background_sync),
            timeout=startup_wait_seconds,
        )
    except asyncio.TimeoutError:
        return _outcome(None, True)
    return _outcome(result, False)
def load_vectorstore_improved(sync_coordinator: Any) -> Dict[str, Any]:
    """Return the coordinator's health snapshot, or ``{}`` if unavailable.

    An empty dict is returned when *sync_coordinator* is ``None``, when
    ``get_health_snapshot()`` raises (the exception is logged), or when the
    snapshot is not a dict.
    """
    if sync_coordinator is None:
        return {}

    try:
        snapshot = sync_coordinator.get_health_snapshot()
    except Exception:
        logger.exception("Khong the lay sync state tu coordinator")
        return {}

    if isinstance(snapshot, dict):
        return snapshot
    return {}