# IR_PR_PilotPro — rag/ingest.py
# Origin: Corin1998, "Update rag/ingest.py" (commit ab914af, verified)
# rag/ingest.py
from __future__ import annotations
import io, uuid, re, os, traceback
from typing import List, Dict, Tuple, Optional
from irpr.deps import add_to_index
# =========================
# PDF → テキスト(多段フォールバック)
# =========================
def _extract_with_pypdf(pdf_bytes: bytes) -> str:
try:
from pypdf import PdfReader # type: ignore
reader = PdfReader(io.BytesIO(pdf_bytes))
texts = [(p.extract_text() or "") for p in reader.pages]
return "\n".join(texts)
except Exception:
try:
import PyPDF2 # type: ignore
reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
texts = [(p.extract_text() or "") for p in reader.pages]
return "\n".join(texts)
except Exception:
return ""
def _extract_with_pdfminer(pdf_bytes: bytes) -> str:
try:
from pdfminer.high_level import extract_text # type: ignore
return extract_text(io.BytesIO(pdf_bytes)) or ""
except Exception:
return ""
def _extract_with_pymupdf_text(pdf_bytes: bytes) -> Tuple[str, int, int]:
try:
import fitz # PyMuPDF
except Exception:
return "", 0, 0
try:
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
except Exception:
return "", 0, 0
buf: List[str] = []
text_chars = 0
for i in range(len(doc)):
try:
page = doc.load_page(i)
t = page.get_text("text") or ""
text_chars += len(t.strip())
buf.append(t)
except Exception:
buf.append("")
pages = len(buf)
doc.close()
return ("\n".join(buf), text_chars, pages)
def _ocr_with_tesseract_via_pymupdf(pdf_bytes: bytes, dpi_scale: float = 2.0) -> str:
"""
Tesseract OCR(任意)。pytesseract / Tesseract 本体が無い場合は空文字で返す。
Tesseract の未導入や言語データ欠如(jpn.traineddata 無し)による FileNotFoundError も
ここで握りつぶして空文字を返します(上位で「OCRが必要」として案内)。
"""
try:
import fitz # PyMuPDF
from PIL import Image
import pytesseract
except Exception:
return "" # OCR不可(依存未導入)
try:
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
except Exception:
return ""
lang = os.environ.get("TESSERACT_LANG", "jpn+eng") # 日本語+英語
text_buf: List[str] = []
for i in range(len(doc)):
try:
page = doc.load_page(i)
mat = fitz.Matrix(dpi_scale, dpi_scale)
pix = page.get_pixmap(matrix=mat, alpha=False)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
try:
t = pytesseract.image_to_string(img, lang=lang) or ""
except FileNotFoundError:
# tesseract バイナリ or lang データが無い
t = ""
text_buf.append(t)
except Exception:
text_buf.append("")
doc.close()
return "\n".join(text_buf).strip()
def _is_meaningful(text: str, min_len: int = 10) -> bool:
return bool(text and text.strip() and len(text.strip()) >= min_len)
def pdf_bytes_to_text(pdf_bytes: bytes) -> Tuple[str, Dict]:
    """Convert a PDF byte string to text through a cascade of extractors.

    Order: pypdf/PyPDF2 → pdfminer.six → PyMuPDF → Tesseract OCR.
    Returns (text, meta) with meta = {"method", "scanned_likely", "pages"}.
    Raises RuntimeError when every backend yields no meaningful text;
    FileNotFoundError from a missing tesseract install never escapes.
    """
    # 1) pypdf / PyPDF2
    extracted = _extract_with_pypdf(pdf_bytes)
    if _is_meaningful(extracted):
        return extracted, {"method": "pypdf", "scanned_likely": False, "pages": None}
    # 2) pdfminer.six
    extracted = _extract_with_pdfminer(pdf_bytes)
    if _is_meaningful(extracted):
        return extracted, {"method": "pdfminer", "scanned_likely": False, "pages": None}
    # 3) PyMuPDF embedded text
    extracted, _char_count, page_count = _extract_with_pymupdf_text(pdf_bytes)
    if _is_meaningful(extracted):
        return extracted, {"method": "pymupdf", "scanned_likely": False, "pages": page_count}
    # 4) OCR — missing binary/language data surfaces as FileNotFoundError; treat as "no text"
    try:
        extracted = _ocr_with_tesseract_via_pymupdf(pdf_bytes)
    except FileNotFoundError:
        extracted = ""
    if _is_meaningful(extracted):
        # page_count == 0 (PyMuPDF unavailable) is reported as unknown (None)
        return extracted, {"method": "ocr", "scanned_likely": True, "pages": page_count or None}
    # 5) every backend failed — most likely a scanned PDF without OCR support
    raise RuntimeError(
        "PDFテキスト抽出に失敗しました(pypdf/PyPDF2/pdfminer.six/PyMuPDF/OCR)。"
        "スキャンPDFの可能性が高いです。OCR を有効化するには "
        "『tesseract-ocr + pytesseract(必要なら tesseract-ocr-jpn)』を導入してください。"
    )
# =========================
# テキスト整形・分割
# =========================
# Collapses runs of ASCII space, tab, and ideographic space (U+3000).
_WS_RE = re.compile(r"[ \t\u3000]+")


def normalize_text(s: str) -> str:
    """Unify newlines, collapse horizontal whitespace, squeeze 3+ blank lines to one."""
    unified = s.replace("\r\n", "\n").replace("\r", "\n")
    collapsed = _WS_RE.sub(" ", unified)
    squeezed = re.sub(r"\n{3,}", "\n\n", collapsed)
    return squeezed.strip()
def chunk_text(text: str, chunk_size: int = 1200, overlap: int = 200, min_chunk: int = 200) -> List[str]:
    """Split *text* into overlapping chunks.

    Consecutive chunks share *overlap* characters; a trailing chunk shorter
    than *min_chunk* is folded into its predecessor. Blank input yields [].
    """
    body = text.strip()
    if not body:
        return []
    stride = max(1, chunk_size - overlap)
    total = len(body)
    pieces: List[str] = []
    start = 0
    while True:
        end = min(total, start + chunk_size)
        piece = body[start:end].strip()
        if piece:
            pieces.append(piece)
        if end >= total:
            break
        start += stride
    # Avoid a tiny tail: merge it into the previous chunk.
    if len(pieces) >= 2 and len(pieces[-1]) < min_chunk:
        pieces[-2] = (pieces[-2] + "\n" + pieces[-1]).strip()
        pieces.pop()
    return pieces
# =========================
# 外部公開 API
# =========================
def ingest_pdf_bytes(title: str, source_url: str, pdf_bytes: bytes) -> int:
    """Parse a PDF byte string and register its chunks in the index.

    Nothing is written to disk. Failures are normalized to RuntimeError
    (FileNotFoundError from missing OCR binaries is converted, not leaked).

    Args:
        title: display title stored on every chunk record.
        source_url: provenance URL stored on every chunk record.
        pdf_bytes: raw PDF content; must be non-empty.

    Returns:
        Number of records added to the index.

    Raises:
        RuntimeError: empty input, extraction failure, or text too short
        after normalization.
    """
    if not pdf_bytes:
        raise RuntimeError("empty pdf_bytes")
    try:
        # meta was previously bound but never used — discard it explicitly.
        raw, _meta = pdf_bytes_to_text(pdf_bytes)
    except FileNotFoundError as e:
        raise RuntimeError(f"OCR 実行に必要なバイナリ/言語データが見つかりません: {e}") from e
    normalized = normalize_text(raw)
    if not _is_meaningful(normalized):
        raise RuntimeError("Parsed text is too short or empty after normalization")
    doc_id = str(uuid.uuid4())
    records: List[Dict] = [
        {
            "doc_id": doc_id,
            "chunk_id": f"{seq:04d}",
            "title": title,
            "source_url": source_url,
            "text": piece,
        }
        for seq, piece in enumerate(
            chunk_text(normalized, chunk_size=1200, overlap=200, min_chunk=200)
        )
    ]
    return int(add_to_index(records))
# 互換ダミー
def ingest_edinet_for_company(edinet_code: str, date: Optional[str] = None) -> int:
    """Compatibility stub: EDINET ingestion is disabled in this minimal build; always 0."""
    return 0
def download_edinet_pdf(*args, **kwargs):
    """Compatibility stub: unconditionally raises NotImplementedError."""
    raise NotImplementedError("download_edinet_pdf is not implemented in this minimal build.")