import os import re import gc import sys import signal import logging from datetime import datetime from pathlib import Path from docling.document_converter import DocumentConverter, FormatOption from docling.datamodel.base_models import InputFormat from docling.datamodel.pipeline_options import PdfPipelineOptions, TableStructureOptions, EasyOcrOptions, TableFormerMode from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline # Thêm project root vào path để import HashProcessor PROJECT_ROOT = Path(__file__).resolve().parents[2] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from core.hash_file.hash_file import HashProcessor class DoclingProcessor: """Chuyển đổi PDF sang Markdown bằng Docling.""" def __init__(self, output_dir: str, use_ocr: bool = True, timeout: int = 300, images_scale: float = 3.0): """Khởi tạo processor với cấu hình OCR và table extraction.""" self.output_dir = output_dir self.timeout = timeout self.logger = logging.getLogger(__name__) self.hasher = HashProcessor(verbose=False) os.makedirs(output_dir, exist_ok=True) # File lưu hash index self.hash_index_path = Path(output_dir) / "docling_hash_index.json" self.hash_index = self.hasher.load_processed_index(str(self.hash_index_path)) # Cấu hình pipeline PDF opts = PdfPipelineOptions(do_ocr=use_ocr, do_table_structure=True) opts.table_structure_options = TableStructureOptions(do_cell_matching=True, mode=TableFormerMode.ACCURATE) opts.images_scale = images_scale # Cấu hình OCR tiếng Việt if use_ocr: ocr = EasyOcrOptions() ocr.lang = ["vi"] ocr.force_full_page_ocr = False opts.ocr_options = ocr self.converter = DocumentConverter(format_options={ InputFormat.PDF: FormatOption(backend=PyPdfiumDocumentBackend, pipeline_cls=StandardPdfPipeline, pipeline_options=opts) }) self.logger.info(f"Docling | OCR={use_ocr} | Table=accurate | Scale={images_scale} | timeout={timeout}s") def clean_markdown(self, text: str) -> str: """Xóa số trang và khoảng trắng thừa.""" text = re.sub(r'\n\s*Trang\s+\d+\s*\n', '\n', text) return re.sub(r'\n{3,}', '\n\n', text).strip() def _should_process(self, pdf_path: str, output_path: Path) -> bool: """Kiểm tra xem file PDF có cần xử lý lại không (dựa trên hash).""" # Nếu output chưa tồn tại -> cần xử lý if not output_path.exists(): return True # Tính hash file PDF hiện tại current_hash = self.hasher.get_file_hash(pdf_path) if not current_hash: return True # So sánh với hash đã lưu saved_hash = self.hash_index.get(pdf_path, {}).get("hash") return current_hash != saved_hash def _save_hash(self, pdf_path: str, file_hash: str) -> None: """Lưu hash của file đã xử lý vào index.""" self.hash_index[pdf_path] = { "hash": file_hash, "processed_at": self.hasher.get_current_timestamp() } def parse_document(self, file_path: str) -> str | None: """Chuyển đổi 1 file PDF sang Markdown với timeout.""" if not os.path.exists(file_path): return None filename = os.path.basename(file_path) try: # Đặt timeout để tránh treo signal.signal(signal.SIGALRM, lambda s, f: (_ for _ in ()).throw(TimeoutError())) signal.alarm(self.timeout) result = self.converter.convert(file_path) md = result.document.export_to_markdown(image_placeholder="") signal.alarm(0) md = self.clean_markdown(md) # Thêm frontmatter metadata return f"---\nfilename: {filename}\nfilepath: {file_path}\npage_count: {len(result.document.pages)}\nprocessed_at: {datetime.now().isoformat()}\n---\n\n{md}" except TimeoutError: self.logger.warning(f"Timeout: {filename}") signal.alarm(0) return None except Exception as e: self.logger.error(f"Lỗi: {filename}: {e}") signal.alarm(0) return None def parse_directory(self, source_dir: str) -> dict: """Xử lý toàn bộ thư mục PDF, bỏ qua file không thay đổi (dựa trên hash).""" source_path = Path(source_dir) pdf_files = list(source_path.rglob("*.pdf")) self.logger.info(f"Tìm thấy {len(pdf_files)} file PDF trong {source_dir}") results = {"total": len(pdf_files), "parsed": 0, "skipped": 0, "errors": 0} for i, fp in enumerate(pdf_files): try: rel = fp.relative_to(source_path) except ValueError: rel = Path(fp.name) out = Path(self.output_dir) / rel.with_suffix(".md") out.parent.mkdir(parents=True, exist_ok=True) pdf_path = str(fp) # Kiểm tra hash để quyết định có cần xử lý không if not self._should_process(pdf_path, out): results["skipped"] += 1 continue # Tính hash trước khi xử lý file_hash = self.hasher.get_file_hash(pdf_path) md = self.parse_document(pdf_path) if md: out.write_text(md, encoding="utf-8") results["parsed"] += 1 # Lưu hash sau khi xử lý thành công if file_hash: self._save_hash(pdf_path, file_hash) else: results["errors"] += 1 # Dọn memory mỗi 10 files if (i + 1) % 10 == 0: gc.collect() self.logger.info(f"{i+1}/{len(pdf_files)} (bỏ qua: {results['skipped']})") # Lưu hash index sau khi xử lý xong self.hasher.save_processed_index(str(self.hash_index_path), self.hash_index) self.logger.info(f"Xong: {results['parsed']} đã xử lý, {results['skipped']} bỏ qua, {results['errors']} lỗi") return results