Spaces:
Sleeping
Sleeping
| # rag/ingest.py | |
| from __future__ import annotations | |
| import io, uuid, re, os, traceback | |
| from typing import List, Dict, Tuple, Optional | |
| from irpr.deps import add_to_index | |
| # ========================= | |
| # PDF → テキスト(多段フォールバック) | |
| # ========================= | |
| def _extract_with_pypdf(pdf_bytes: bytes) -> str: | |
| try: | |
| from pypdf import PdfReader # type: ignore | |
| reader = PdfReader(io.BytesIO(pdf_bytes)) | |
| texts = [(p.extract_text() or "") for p in reader.pages] | |
| return "\n".join(texts) | |
| except Exception: | |
| try: | |
| import PyPDF2 # type: ignore | |
| reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes)) | |
| texts = [(p.extract_text() or "") for p in reader.pages] | |
| return "\n".join(texts) | |
| except Exception: | |
| return "" | |
| def _extract_with_pdfminer(pdf_bytes: bytes) -> str: | |
| try: | |
| from pdfminer.high_level import extract_text # type: ignore | |
| return extract_text(io.BytesIO(pdf_bytes)) or "" | |
| except Exception: | |
| return "" | |
| def _extract_with_pymupdf_text(pdf_bytes: bytes) -> Tuple[str, int, int]: | |
| try: | |
| import fitz # PyMuPDF | |
| except Exception: | |
| return "", 0, 0 | |
| try: | |
| doc = fitz.open(stream=pdf_bytes, filetype="pdf") | |
| except Exception: | |
| return "", 0, 0 | |
| buf: List[str] = [] | |
| text_chars = 0 | |
| for i in range(len(doc)): | |
| try: | |
| page = doc.load_page(i) | |
| t = page.get_text("text") or "" | |
| text_chars += len(t.strip()) | |
| buf.append(t) | |
| except Exception: | |
| buf.append("") | |
| pages = len(buf) | |
| doc.close() | |
| return ("\n".join(buf), text_chars, pages) | |
| def _ocr_with_tesseract_via_pymupdf(pdf_bytes: bytes, dpi_scale: float = 2.0) -> str: | |
| """ | |
| Tesseract OCR(任意)。pytesseract / Tesseract 本体が無い場合は空文字で返す。 | |
| Tesseract の未導入や言語データ欠如(jpn.traineddata 無し)による FileNotFoundError も | |
| ここで握りつぶして空文字を返します(上位で「OCRが必要」として案内)。 | |
| """ | |
| try: | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| import pytesseract | |
| except Exception: | |
| return "" # OCR不可(依存未導入) | |
| try: | |
| doc = fitz.open(stream=pdf_bytes, filetype="pdf") | |
| except Exception: | |
| return "" | |
| lang = os.environ.get("TESSERACT_LANG", "jpn+eng") # 日本語+英語 | |
| text_buf: List[str] = [] | |
| for i in range(len(doc)): | |
| try: | |
| page = doc.load_page(i) | |
| mat = fitz.Matrix(dpi_scale, dpi_scale) | |
| pix = page.get_pixmap(matrix=mat, alpha=False) | |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
| try: | |
| t = pytesseract.image_to_string(img, lang=lang) or "" | |
| except FileNotFoundError: | |
| # tesseract バイナリ or lang データが無い | |
| t = "" | |
| text_buf.append(t) | |
| except Exception: | |
| text_buf.append("") | |
| doc.close() | |
| return "\n".join(text_buf).strip() | |
| def _is_meaningful(text: str, min_len: int = 10) -> bool: | |
| return bool(text and text.strip() and len(text.strip()) >= min_len) | |
def pdf_bytes_to_text(pdf_bytes: bytes) -> Tuple[str, Dict]:
    """Convert PDF bytes to text, cascading through extractors.

    Returns (text, meta) with
        meta = {"method": str, "scanned_likely": bool, "pages": int | None}.
    A FileNotFoundError from the OCR stage (no tesseract install) is
    treated as "no text"; total failure raises RuntimeError.
    """
    # 1) pypdf / PyPDF2
    extracted = _extract_with_pypdf(pdf_bytes)
    if _is_meaningful(extracted):
        return extracted, {"method": "pypdf", "scanned_likely": False, "pages": None}

    # 2) pdfminer.six
    extracted = _extract_with_pdfminer(pdf_bytes)
    if _is_meaningful(extracted):
        return extracted, {"method": "pdfminer", "scanned_likely": False, "pages": None}

    # 3) PyMuPDF get_text
    extracted, _char_count, page_count = _extract_with_pymupdf_text(pdf_bytes)
    if _is_meaningful(extracted):
        return extracted, {"method": "pymupdf", "scanned_likely": False, "pages": page_count}

    # 4) OCR — missing binary / language data simply yields no text here
    try:
        extracted = _ocr_with_tesseract_via_pymupdf(pdf_bytes)
    except FileNotFoundError:
        extracted = ""
    if _is_meaningful(extracted):
        # page_count of 0 (PyMuPDF missing) is reported as None
        return extracted, {"method": "ocr", "scanned_likely": True, "pages": page_count or None}

    # 5) Everything failed -> explicit RuntimeError
    raise RuntimeError(
        "PDFテキスト抽出に失敗しました(pypdf/PyPDF2/pdfminer.six/PyMuPDF/OCR)。"
        "スキャンPDFの可能性が高いです。OCR を有効化するには "
        "『tesseract-ocr + pytesseract(必要なら tesseract-ocr-jpn)』を導入してください。"
    )
| # ========================= | |
| # テキスト整形・分割 | |
| # ========================= | |
_WS_RE = re.compile(r"[ \t\u3000]+")  # collapse runs of half-width / full-width spaces and tabs

def normalize_text(s: str) -> str:
    """Normalize newlines, collapse horizontal whitespace, squeeze blank runs.

    CRLF/CR become LF, space/tab runs (incl. U+3000) become one space,
    3+ consecutive newlines become exactly two, and the result is stripped.
    """
    unified = s.replace("\r\n", "\n").replace("\r", "\n")
    collapsed = _WS_RE.sub(" ", unified)
    squeezed = re.sub(r"\n{3,}", "\n\n", collapsed)
    return squeezed.strip()
def chunk_text(text: str, chunk_size: int = 1200, overlap: int = 200, min_chunk: int = 200) -> List[str]:
    """Split *text* into overlapping windows of at most *chunk_size* chars.

    Consecutive windows start *chunk_size - overlap* chars apart (at least 1,
    so the loop always advances). A trailing chunk shorter than *min_chunk*
    is folded into its predecessor. Empty input yields [].
    """
    text = text.strip()
    if not text:
        return []
    stride = max(1, chunk_size - overlap)
    total = len(text)
    pieces: List[str] = []
    start = 0
    while True:
        end = min(total, start + chunk_size)
        window = text[start:end].strip()
        if window:
            pieces.append(window)
        if end >= total:
            break
        start += stride
    # Merge an undersized tail into the previous chunk.
    if len(pieces) >= 2 and len(pieces[-1]) < min_chunk:
        tail = pieces.pop()
        pieces[-1] = (pieces[-1] + "\n" + tail).strip()
    return pieces
| # ========================= | |
| # 外部公開 API | |
| # ========================= | |
def ingest_pdf_bytes(title: str, source_url: str, pdf_bytes: bytes) -> int:
    """Parse PDF bytes and register the chunks in the index (writes no files).

    Failures are normalized to RuntimeError for the caller; a
    FileNotFoundError (e.g. missing tesseract binary) is converted
    rather than propagated. Returns the number of records indexed.
    """
    if not pdf_bytes:
        raise RuntimeError("empty pdf_bytes")
    try:
        raw, meta = pdf_bytes_to_text(pdf_bytes)
    except FileNotFoundError as e:
        raise RuntimeError(f"OCR 実行に必要なバイナリ/言語データが見つかりません: {e}") from e

    txt = normalize_text(raw)
    if not _is_meaningful(txt):
        raise RuntimeError("Parsed text is too short or empty after normalization")

    doc_id = str(uuid.uuid4())
    records: List[Dict] = [
        {
            "doc_id": doc_id,
            "chunk_id": f"{pos:04d}",
            "title": title,
            "source_url": source_url,
            "text": piece,
        }
        for pos, piece in enumerate(chunk_text(txt, chunk_size=1200, overlap=200, min_chunk=200))
    ]
    return int(add_to_index(records))
| # 互換ダミー | |
def ingest_edinet_for_company(edinet_code: str, date: Optional[str] = None) -> int:
    """Compatibility stub: EDINET ingestion is absent in this minimal build; always 0."""
    return 0
def download_edinet_pdf(*args, **kwargs):
    """Compatibility stub: unconditionally raises NotImplementedError."""
    raise NotImplementedError("download_edinet_pdf is not implemented in this minimal build.")