OCR_RAG-AX650N / ocr_loader.py
H022329's picture
Upload folder using huggingface_hub
0378e25 verified
Raw
History Blame Contribute Delete
28.8 kB
"""
============================================================
PaddleOCR-VL-1.5 文档加载器
============================================================
模型: PaddleOCR-VL-1.5 (0.9B 视觉语言模型, OmniDocBench v1.5 94.5% 精度)
支持格式: PDF / PNG / JPG / JPEG / BMP / TIF / TIFF
功能:
1. 文档 (PDF/图片) → PaddleOCR-VL-1.5 端到端识别
2. 输出 Markdown/JSON 结构化结果 (含版面/表格/公式/印章)
3. 转换为 LangChain Document 对象
"""
import gc
import time
import warnings
from pathlib import Path
from typing import List, Optional, Iterator, Dict, Any, Union
from dataclasses import dataclass, field
import fitz # PyMuPDF: PDF 页面渲染和元数据提取
import numpy as np
from PIL import Image
from langchain_core.documents import Document
from loguru import logger
import config
warnings.filterwarnings("ignore")
# ============================================================
# PaddleOCR-VL-1.5 全局单例
# ============================================================
_ocr_vl_pipeline = None
def _get_ocr_vl_pipeline():
"""懒加载 PaddleOCR-VL-1.5 模型 (单例)"""
global _ocr_vl_pipeline
if _ocr_vl_pipeline is None:
from paddleocr import PaddleOCRVL
logger.info(
f"正在初始化 PaddleOCR-VL-1.5 模型 "
f"(backend={config.OCR_VL_BACKEND})..."
)
kwargs = dict(
use_layout_detection=config.OCR_USE_LAYOUT,
use_chart_recognition=config.OCR_USE_CHART,
merge_layout_blocks=True,
layout_threshold=config.OCR_LAYOUT_THRESHOLD,
)
if config.OCR_VL_BACKEND == "vllm-server":
kwargs["vl_rec_backend"] = "vllm-server"
kwargs["vl_rec_server_url"] = config.OCR_VL_SERVER_URL
elif config.OCR_VL_BACKEND == "llama-cpp-server":
kwargs["vl_rec_backend"] = "llama-cpp-server"
kwargs["vl_rec_server_url"] = config.OCR_VL_SERVER_URL
_ocr_vl_pipeline = PaddleOCRVL(**kwargs)
logger.info("PaddleOCR-VL-1.5 模型初始化完成 ✓")
return _ocr_vl_pipeline
# ============================================================
# 数据结构
# ============================================================
@dataclass
class OCRResult:
"""单页/单图 OCR 结果"""
page_num: int = 0
markdown_text: str = ""
json_data: Optional[Dict[str, Any]] = None
text_blocks: List[Dict[str, Any]] = field(default_factory=list)
tables: List[Dict[str, Any]] = field(default_factory=list)
formulas: List[Dict[str, Any]] = field(default_factory=list)
images_in_page: List[Dict[str, Any]] = field(default_factory=list)
layout_regions: List[Dict[str, Any]] = field(default_factory=list)
ocr_time_ms: float = 0.0
source_format: str = "" # pdf / png / jpg / ...
# ============================================================
# PaddleOCR-VL-1.5 文本提取器
# ============================================================
class VLOCRExtractor:
"""使用 PaddleOCR-VL-1.5 从文档中提取结构化内容"""
@staticmethod
def extract(image_or_path: Union[str, Path, np.ndarray]) -> List[OCRResult]:
"""
对单张图片或 PDF 执行 OCR 识别
Args:
image_or_path: 图片路径 / PDF路径 / numpy 数组
Returns:
OCRResult 列表 (PDF 为多页, 图片为单页)
"""
pipeline = _get_ocr_vl_pipeline()
start_time = time.time()
logger.info("PaddleOCR-VL 正在推理中 (首次调用较慢, CPU 约 30-60s/页) ...")
raw_output = pipeline.predict(image_or_path)
logger.info(f"推理完成, 耗时 {time.time() - start_time:.1f}s")
results = []
for i, res in enumerate(raw_output):
page_result = OCRResult(
page_num=i + 1,
ocr_time_ms=(time.time() - start_time) * 1000 / len(raw_output),
)
# 尝试获取 structured JSON
try:
json_data = res.json
if json_data:
page_result.json_data = json_data
# 解析结构化内容
page_result.text_blocks = VLOCRExtractor._parse_text_blocks(json_data)
page_result.tables = VLOCRExtractor._parse_tables(json_data)
page_result.formulas = VLOCRExtractor._parse_formulas(json_data)
except Exception as e:
logger.debug(f"JSON 解析跳过: {e}")
# 获取 Markdown 文本
try:
md = res.markdown
if isinstance(md, dict):
page_result.markdown_text = md.get("text", "") or ""
elif isinstance(md, str):
page_result.markdown_text = md
else:
page_result.markdown_text = str(md) if md else ""
except Exception:
page_result.markdown_text = ""
# 回退: markdown 为空时从 JSON blocks 构建文本
if not page_result.markdown_text and page_result.json_data:
page_result.markdown_text = VLOCRExtractor._build_text_from_blocks(
page_result.json_data
)
results.append(page_result)
return results
@staticmethod
def extract_text(image_or_path: Union[str, Path, np.ndarray]) -> str:
"""便捷方法: 只返回纯文本 (合并所有页)"""
results = VLOCRExtractor.extract(image_or_path)
return "\n\n".join(r.markdown_text for r in results if r.markdown_text)
@staticmethod
def extract_to_markdown(image_or_path: Union[str, Path, np.ndarray]) -> str:
"""返回完整的 Markdown 格式文本"""
return VLOCRExtractor.extract_text(image_or_path)
@staticmethod
def extract_to_json(
image_or_path: Union[str, Path, np.ndarray],
save_path: Optional[str] = None,
) -> Dict[str, Any]:
"""返回结构化 JSON 或保存到文件"""
results = VLOCRExtractor.extract(image_or_path)
output = {
"pages": [],
"total_pages": len(results),
}
for r in results:
page_data = {
"page_num": r.page_num,
"markdown": r.markdown_text,
"json": r.json_data,
"tables": r.tables,
"formulas": r.formulas,
}
output["pages"].append(page_data)
if save_path:
import json
save_path = Path(save_path)
save_path.parent.mkdir(parents=True, exist_ok=True)
with open(save_path, "w", encoding="utf-8") as f:
json.dump(output, f, ensure_ascii=False, indent=2)
logger.info(f"OCR 结果已保存: {save_path}")
return output
# ---- 结构化解析辅助 ----
@staticmethod
def _get_parsing_list(json_data: Dict) -> List[Dict]:
"""从 PaddleOCR-VL JSON 中提取 parsing_res_list"""
res = json_data.get("res", json_data)
return res.get("parsing_res_list", [])
@staticmethod
def _parse_text_blocks(json_data: Dict) -> List[Dict[str, Any]]:
"""从 parsing_res_list 中提取文本块"""
blocks = []
for item in VLOCRExtractor._get_parsing_list(json_data):
label = item.get("block_label", "")
content = item.get("block_content", "")
bbox = item.get("block_bbox", [])
if content and label not in ("image",):
blocks.append({
"type": label,
"text": content,
"bbox": bbox,
})
return blocks
@staticmethod
def _parse_tables(json_data: Dict) -> List[Dict[str, Any]]:
"""从 parsing_res_list 中提取表格"""
tables = []
for item in VLOCRExtractor._get_parsing_list(json_data):
if item.get("block_label") == "table":
tables.append({
"text": item.get("block_content", ""),
"html": item.get("block_html", ""),
"markdown": item.get("block_markdown", ""),
"bbox": item.get("block_bbox", []),
})
return tables
@staticmethod
def _parse_formulas(json_data: Dict) -> List[Dict[str, Any]]:
"""从 parsing_res_list 中提取公式"""
formulas = []
for item in VLOCRExtractor._get_parsing_list(json_data):
if item.get("block_label") == "formula":
formulas.append({
"latex": item.get("block_latex", ""),
"text": item.get("block_content", ""),
"bbox": item.get("block_bbox", []),
})
return formulas
@staticmethod
def _build_text_from_blocks(json_data: Dict) -> str:
"""从 parsing_res_list 构建纯文本"""
lines = []
for item in VLOCRExtractor._get_parsing_list(json_data):
label = item.get("block_label", "")
content = item.get("block_content", "")
if not content:
continue
if label == "table":
lines.append(f"[表格] {content}")
elif label == "formula":
lines.append(f"[公式] {content}")
elif label in ("paragraph_title", "header"):
lines.append(f"## {content}")
elif label == "image":
continue # 跳过纯图片块
else:
lines.append(content)
return "\n\n".join(lines)
# ============================================================
# OCR API 提取器 (OpenAI 兼容格式, 无需本地推理)
# ============================================================
_ocr_api_client = None
def _get_ocr_api_client():
"""懒加载 OCR API 客户端"""
global _ocr_api_client
if _ocr_api_client is None:
from openai import OpenAI
_ocr_api_client = OpenAI(
api_key=config.OCR_API_KEY,
base_url=config.OCR_API_BASE,
)
logger.info(
f"OCR API 连接: model={config.OCR_API_MODEL}, "
f"base_url={config.OCR_API_BASE}"
)
return _ocr_api_client
class OCRApiExtractor:
"""
基于 OpenAI 兼容 API 的 PaddleOCR-VL-1.5 提取器
通过 vLLM 或其他 OpenAI 兼容服务调用, 无需本地 GPU 推理。
支持任务: ocr / table / formula / chart / spotting / seal
"""
PROMPTS = {
"ocr": "OCR:",
"table": "Table Recognition:",
"formula": "Formula Recognition:",
"chart": "Chart Recognition:",
"spotting": "Spotting:",
"seal": "Seal Recognition:",
}
@staticmethod
def extract(
image_or_path: Union[str, Path, np.ndarray],
task: Optional[str] = None,
max_new_tokens: int = 2048,
) -> List[OCRResult]:
"""
通过 API 执行 OCR 识别
Args:
image_or_path: 图片路径 / numpy 数组
task: 任务类型
max_new_tokens: 最大生成 token 数
Returns:
OCRResult 列表
"""
import base64
import io
task = task or config.OCR_TASK
client = _get_ocr_api_client()
start_time = time.time()
logger.info(f"OCR API 推理中 (task={task}) ...")
# 图片 → base64 data URL
if isinstance(image_or_path, (str, Path)):
with open(image_or_path, "rb") as f:
img_bytes = f.read()
elif isinstance(image_or_path, np.ndarray):
img = Image.fromarray(image_or_path).convert("RGB")
buf = io.BytesIO()
img.save(buf, format="PNG")
img_bytes = buf.getvalue()
else:
img_bytes = image_or_path
b64 = base64.b64encode(img_bytes).decode("utf-8")
image_url = f"data:image/png;base64,{b64}"
messages = [{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": OCRApiExtractor.PROMPTS[task]},
],
}]
response = client.chat.completions.create(
model=config.OCR_API_MODEL,
messages=messages,
max_tokens=max_new_tokens,
)
result_text = response.choices[0].message.content.strip()
elapsed = (time.time() - start_time) * 1000
result = OCRResult(
page_num=1,
markdown_text=result_text,
ocr_time_ms=elapsed,
source_format="image",
text_blocks=[{"type": task, "text": result_text, "bbox": []}],
)
logger.info(f"OCR API 完成, 耗时 {elapsed:.0f}ms, {len(result_text)} 字符")
return [result]
@staticmethod
def extract_text(
image_or_path: Union[str, Path, np.ndarray],
task: Optional[str] = None,
) -> str:
"""便捷方法: 只返回识别文本"""
results = OCRApiExtractor.extract(image_or_path, task=task)
return "\n".join(r.markdown_text for r in results)
# ============================================================
# 统一提取器入口
# ============================================================
def _extract_ocr(image_or_path: Union[str, Path, np.ndarray]) -> List[OCRResult]:
"""根据配置选择 OCR 引擎并执行识别"""
if config.OCR_ENGINE == "api":
return OCRApiExtractor.extract(image_or_path)
else:
return VLOCRExtractor.extract(image_or_path)
# ============================================================
# PDF 工具
# ============================================================
class PDFUtils:
"""PDF 处理工具: 渲染和元数据提取"""
@staticmethod
def render_page_to_image(page: fitz.Page, dpi: int = 300) -> np.ndarray:
"""将 PyMuPDF 页面渲染为 numpy 图片数组 (RGB)"""
zoom = dpi / 72.0
matrix = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=matrix)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
return np.array(img)
@staticmethod
def get_page_count(pdf_path: Path) -> int:
"""获取 PDF 页数"""
doc = fitz.open(str(pdf_path))
count = len(doc)
doc.close()
return count
@staticmethod
def is_scanned_pdf(pdf_path: Path, sample_pages: int = 3) -> bool:
"""
检测 PDF 是否为扫描版 (图片型 PDF)
通过检查前几页是否包含可提取的文本层来判断
"""
doc = fitz.open(str(pdf_path))
text_chars = 0
pages_to_check = min(sample_pages, len(doc))
for i in range(pages_to_check):
text_chars += len(doc[i].get_text().strip())
doc.close()
# 如果前几页几乎没有文本, 认为是扫描版
return text_chars < 100 * pages_to_check
@staticmethod
def extract_text_layer(pdf_path: Path) -> List[Dict[str, Any]]:
"""
提取 PDF 内嵌文本层 (非 OCR, 用于数字原生 PDF)
返回每页的文本和元数据
"""
doc = fitz.open(str(pdf_path))
pages = []
for i in range(len(doc)):
page = doc[i]
text = page.get_text("text")
if text.strip():
pages.append({
"page_num": i + 1,
"text": text,
"char_count": len(text),
"has_text_layer": True,
})
doc.close()
return pages
# ============================================================
# LangChain PaddleOCR-VL-1.5 文档加载器
# ============================================================
class PaddleOCRLoader:
"""
LangChain 兼容的 PaddleOCR-VL-1.5 文档加载器
支持格式: PDF / PNG / JPG / JPEG / BMP / TIF / TIFF
用法:
# 加载 PDF
loader = PaddleOCRLoader("document.pdf")
documents = loader.load()
# 加载图片
loader = PaddleOCRLoader("scan.png")
documents = loader.load()
# 延迟加载 (大文件推荐)
for doc in loader.lazy_load():
process(doc)
"""
def __init__(
self,
file_path: Union[str, Path],
dpi: int = config.PDF_RENDER_DPI,
verbose: bool = True,
):
self.file_path = Path(file_path)
if not self.file_path.exists():
raise FileNotFoundError(f"文件不存在: {self.file_path}")
self.suffix = self.file_path.suffix.lower()
if self.suffix not in config.SUPPORTED_FORMATS:
raise ValueError(
f"不支持的文件格式: {self.suffix}. "
f"支持: {config.SUPPORTED_FORMATS}"
)
self.dpi = dpi
self.verbose = verbose
self._doc_name = self.file_path.stem
self._is_pdf = (self.suffix == ".pdf")
def load(self) -> List[Document]:
"""完整加载文档, 返回 LangChain Document 列表"""
return list(self.lazy_load())
def lazy_load(self) -> Iterator[Document]:
"""逐页延迟加载"""
if self._is_pdf:
yield from self._load_pdf()
else:
yield from self._load_image()
def _load_pdf(self) -> Iterator[Document]:
"""加载 PDF 文件"""
total_start = time.time()
page_count = PDFUtils.get_page_count(self.file_path)
self._log(f"开始处理 PDF: {self.file_path.name} ({page_count} 页, DPI={self.dpi})")
pdf_doc = fitz.open(str(self.file_path))
for page_idx in range(page_count):
page_start = time.time()
# 渲染页面为高清图片
page = pdf_doc[page_idx]
image = PDFUtils.render_page_to_image(page, dpi=self.dpi)
# PaddleOCR-VL-1.5 识别
results = _extract_ocr(image)
# 释放页面图像内存 (高DPI图片可能占用数百MB)
del image
ocr_time = (time.time() - page_start) * 1000
for ocr_result in results:
ocr_result.page_num = page_idx + 1
ocr_result.source_format = "pdf"
text = ocr_result.markdown_text
if not text and ocr_result.json_data:
text = self._extract_text_from_json(ocr_result.json_data)
if isinstance(text, dict):
text = text.get("text", "") or ""
if not text or not str(text).strip():
self._log(f" 第 {page_idx + 1} 页: 未检测到文本")
continue
# 构建元数据
metadata = {
"source": str(self.file_path),
"document_name": self._doc_name,
"page": page_idx + 1,
"total_pages": page_count,
"ocr_text_length": len(text),
"ocr_time_ms": round(ocr_time, 1),
"dpi": self.dpi,
"source_format": "pdf",
"tables_count": len(ocr_result.tables),
"formulas_count": len(ocr_result.formulas),
"text_blocks_count": len(ocr_result.text_blocks),
}
# 附加表格/公式数据
if ocr_result.tables:
metadata["tables_markdown"] = [
t.get("markdown", "") for t in ocr_result.tables
]
metadata["tables_html"] = [
t.get("html", "") for t in ocr_result.tables
]
if ocr_result.formulas:
metadata["formulas_latex"] = [
f.get("latex", "") for f in ocr_result.formulas
]
doc = Document(page_content=text, metadata=metadata)
self._log(
f" 第 {page_idx + 1}/{page_count} 页: "
f"{len(text)} 字符, "
f"表格={metadata['tables_count']}, "
f"公式={metadata['formulas_count']}, "
f"耗时 {ocr_time:.0f}ms"
)
yield doc
pdf_doc.close()
gc.collect() # 强制回收页面渲染残留内存
self._log(f"PDF 处理完成, 总耗时 {time.time() - total_start:.1f}s")
def _load_image(self) -> Iterator[Document]:
"""加载单张图片"""
total_start = time.time()
self._log(f"开始处理图片: {self.file_path.name}")
# 验证图片可读
try:
img = Image.open(self.file_path)
img.verify()
img = Image.open(self.file_path) # verify 后需重新打开
except Exception as e:
raise ValueError(f"无法读取图片文件: {e}")
# PaddleOCR-VL-1.5 可以直接接受图片路径
results = _extract_ocr(str(self.file_path))
ocr_time = (time.time() - total_start) * 1000
for ocr_result in results:
ocr_result.source_format = self.suffix.lstrip(".")
# print("ocr_result: ",ocr_result)
text = ocr_result.markdown_text
if not text and ocr_result.json_data:
text = self._extract_text_from_json(ocr_result.json_data)
if isinstance(text, dict):
text = text.get("text", "") or ""
if not text or not str(text).strip():
self._log(" 未检测到文本")
continue
metadata = {
"source": str(self.file_path),
"document_name": self._doc_name,
"page": 1,
"total_pages": 1,
"ocr_text_length": len(text),
"ocr_time_ms": round(ocr_time, 1),
"dpi": self.dpi,
"source_format": self.suffix.lstrip("."),
"image_width": img.width,
"image_height": img.height,
"tables_count": len(ocr_result.tables),
"formulas_count": len(ocr_result.formulas),
"text_blocks_count": len(ocr_result.text_blocks),
}
if ocr_result.tables:
metadata["tables_markdown"] = [
t.get("markdown", "") for t in ocr_result.tables
]
metadata["tables_html"] = [
t.get("html", "") for t in ocr_result.tables
]
if ocr_result.formulas:
metadata["formulas_latex"] = [
f.get("latex", "") for f in ocr_result.formulas
]
doc = Document(page_content=text, metadata=metadata)
yield doc
self._log(f"图片处理完成, 耗时 {time.time() - total_start:.1f}s")
def load_with_ocr_results(self) -> List[OCRResult]:
"""返回 OCRResult 对象列表 (包含更丰富的结构化信息)"""
if self._is_pdf:
pdf_doc = fitz.open(str(self.file_path))
all_results = []
for page_idx in range(len(pdf_doc)):
page = pdf_doc[page_idx]
image = PDFUtils.render_page_to_image(page, dpi=self.dpi)
results = _extract_ocr(image)
for r in results:
r.page_num = page_idx + 1
r.source_format = "pdf"
all_results.extend(results)
pdf_doc.close()
return all_results
else:
results = _extract_ocr(str(self.file_path))
for r in results:
r.source_format = self.suffix.lstrip(".")
return results
@staticmethod
def _extract_text_from_json(json_data: Dict) -> str:
"""从 PaddleOCR-VL JSON 结构中提取所有文本"""
return VLOCRExtractor._build_text_from_blocks(json_data)
def _log(self, msg: str):
if self.verbose:
logger.info(msg)
# ============================================================
# 批量加载器
# ============================================================
class PaddleOCRDirectoryLoader:
"""批量加载目录下的所有支持的文档文件"""
def __init__(
self,
directory: Union[str, Path],
glob_patterns: Optional[List[str]] = None,
**loader_kwargs,
):
self.directory = Path(directory)
self.glob_patterns = glob_patterns or [
"**/*.pdf", "**/*.png", "**/*.jpg", "**/*.jpeg",
"**/*.bmp", "**/*.tif", "**/*.tiff",
]
self.loader_kwargs = loader_kwargs
def load(self) -> List[Document]:
"""加载目录下所有支持的文档"""
all_docs = []
files = []
for pattern in self.glob_patterns:
files.extend(self.directory.glob(pattern))
files = sorted(set(files))
if not files:
logger.warning(f"目录 {self.directory} 中未找到支持的文档文件")
return all_docs
logger.info(f"在 {self.directory} 中找到 {len(files)} 个文件")
for file_path in files:
try:
loader = PaddleOCRLoader(file_path, **self.loader_kwargs)
docs = loader.load()
all_docs.extend(docs)
logger.info(f" ✓ {file_path.name}: {len(docs)} 页/块")
except Exception as e:
logger.error(f" ✗ {file_path.name}: {e}")
logger.info(f"批量加载完成, 共 {len(all_docs)} 个文档块")
return all_docs
def lazy_load(self) -> Iterator[Document]:
"""延迟加载"""
files = []
for pattern in self.glob_patterns:
files.extend(self.directory.glob(pattern))
files = sorted(set(files))
for file_path in files:
try:
loader = PaddleOCRLoader(file_path, **self.loader_kwargs)
yield from loader.lazy_load()
except Exception as e:
logger.error(f"加载失败 {file_path.name}: {e}")
# ============================================================
# 便捷函数
# ============================================================
def load_document(file_path: Union[str, Path], **kwargs) -> List[Document]:
"""便捷函数: 加载单个文档 (自动识别格式)"""
loader = PaddleOCRLoader(file_path, **kwargs)
return loader.load()
def load_directory(directory: Union[str, Path], **kwargs) -> List[Document]:
"""便捷函数: 加载目录下所有文档"""
loader = PaddleOCRDirectoryLoader(directory, **kwargs)
return loader.load()
def ocr_to_markdown(file_path: Union[str, Path]) -> str:
"""便捷函数: OCR 识别并返回 Markdown"""
return VLOCRExtractor.extract_to_markdown(file_path)
def ocr_to_json(file_path: Union[str, Path], save_path: Optional[str] = None) -> Dict:
"""便捷函数: OCR 识别并返回 JSON"""
return VLOCRExtractor.extract_to_json(file_path, save_path)
# ============================================================
# 测试入口
# ============================================================
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"用法: python {__file__} <file_path> [--json] [--md]")
print(f"支持格式: {config.SUPPORTED_FORMATS}")
sys.exit(1)
file_path = sys.argv[1]
output_mode = "doc" # doc / json / md
if "--json" in sys.argv:
output_mode = "json"
elif "--md" in sys.argv:
output_mode = "md"
loader = PaddleOCRLoader(file_path, verbose=True)
if output_mode == "json":
result = ocr_to_json(file_path)
import json
print(json.dumps(result, ensure_ascii=False, indent=2)[:5000])
elif output_mode == "md":
md = ocr_to_markdown(file_path)
print(md[:5000])
else:
documents = loader.load()
print(f"\n{'='*60}")
print(f"共加载 {len(documents)} 页/文档")
print(f"{'='*60}")
for i, doc in enumerate(documents):
print(f"\n--- 第 {doc.metadata.get('page', '?')} 页 "
f"({len(doc.page_content)} 字符) ---")
print(doc.page_content[:500])
if len(doc.page_content) > 500:
print("...")
print(f" 元数据: source={doc.metadata.get('document_name')}, "
f"tables={doc.metadata.get('tables_count', 0)}, "
f"formulas={doc.metadata.get('formulas_count', 0)}")