# rag/ingest.py
from __future__ import annotations
import io, uuid, re, os, traceback
from typing import List, Dict, Tuple, Optional
from irpr.deps import add_to_index
# =========================
# PDF → text (multi-stage fallback)
# =========================
def _extract_with_pypdf(pdf_bytes: bytes) -> str:
try:
from pypdf import PdfReader # type: ignore
reader = PdfReader(io.BytesIO(pdf_bytes))
texts = [(p.extract_text() or "") for p in reader.pages]
return "\n".join(texts)
except Exception:
try:
import PyPDF2 # type: ignore
reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
texts = [(p.extract_text() or "") for p in reader.pages]
return "\n".join(texts)
except Exception:
return ""
def _extract_with_pdfminer(pdf_bytes: bytes) -> str:
try:
from pdfminer.high_level import extract_text # type: ignore
return extract_text(io.BytesIO(pdf_bytes)) or ""
except Exception:
return ""
def _extract_with_pymupdf_text(pdf_bytes: bytes) -> Tuple[str, int, int]:
try:
import fitz # PyMuPDF
except Exception:
return "", 0, 0
try:
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
except Exception:
return "", 0, 0
buf: List[str] = []
text_chars = 0
for i in range(len(doc)):
try:
page = doc.load_page(i)
t = page.get_text("text") or ""
text_chars += len(t.strip())
buf.append(t)
except Exception:
buf.append("")
pages = len(buf)
doc.close()
return ("\n".join(buf), text_chars, pages)
def _ocr_with_tesseract_via_pymupdf(pdf_bytes: bytes, dpi_scale: float = 2.0) -> str:
"""
Tesseract OCR(任意)。pytesseract / Tesseract 本体が無い場合は空文字で返す。
Tesseract の未導入や言語データ欠如(jpn.traineddata 無し)による FileNotFoundError も
ここで握りつぶして空文字を返します(上位で「OCRが必要」として案内)。
"""
try:
import fitz # PyMuPDF
from PIL import Image
import pytesseract
except Exception:
return "" # OCR不可(依存未導入)
try:
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
except Exception:
return ""
lang = os.environ.get("TESSERACT_LANG", "jpn+eng") # 日本語+英語
text_buf: List[str] = []
for i in range(len(doc)):
try:
page = doc.load_page(i)
mat = fitz.Matrix(dpi_scale, dpi_scale)
pix = page.get_pixmap(matrix=mat, alpha=False)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
try:
t = pytesseract.image_to_string(img, lang=lang) or ""
except FileNotFoundError:
# tesseract バイナリ or lang データが無い
t = ""
text_buf.append(t)
except Exception:
text_buf.append("")
doc.close()
return "\n".join(text_buf).strip()
def _is_meaningful(text: str, min_len: int = 10) -> bool:
return bool(text and text.strip() and len(text.strip()) >= min_len)
def pdf_bytes_to_text(pdf_bytes: bytes) -> Tuple[str, Dict]:
    """Convert PDF bytes to text via a cascade of extractors.

    Returns (text, meta) where
        meta = {"method": "...", "scanned_likely": bool, "pages": int|None}

    FileNotFoundError from the OCR stage (missing tesseract binary or
    language data) is swallowed; if every stage fails, a RuntimeError with
    guidance is raised instead.
    """
    # Stages 1-2: pure-Python extractors that carry no page metadata.
    for extractor, method in (
        (_extract_with_pypdf, "pypdf"),
        (_extract_with_pdfminer, "pdfminer"),
    ):
        candidate = extractor(pdf_bytes)
        if _is_meaningful(candidate):
            return candidate, {"method": method, "scanned_likely": False, "pages": None}
    # Stage 3: PyMuPDF text layer (also gives us the page count for later).
    pm_text, _chars, pages = _extract_with_pymupdf_text(pdf_bytes)
    if _is_meaningful(pm_text):
        return pm_text, {"method": "pymupdf", "scanned_likely": False, "pages": pages}
    # Stage 4: OCR. Missing deps / traineddata come back as "" (or raise
    # FileNotFoundError, which we also treat as "no OCR text").
    try:
        ocr_text = _ocr_with_tesseract_via_pymupdf(pdf_bytes)
    except FileNotFoundError:
        ocr_text = ""
    if _is_meaningful(ocr_text):
        return ocr_text, {"method": "ocr", "scanned_likely": True, "pages": pages or None}
    # Stage 5: nothing worked — raise an explicit, actionable error.
    raise RuntimeError(
        "PDFテキスト抽出に失敗しました(pypdf/PyPDF2/pdfminer.six/PyMuPDF/OCR)。"
        "スキャンPDFの可能性が高いです。OCR を有効化するには "
        "『tesseract-ocr + pytesseract(必要なら tesseract-ocr-jpn)』を導入してください。"
    )
# =========================
# Text normalization & chunking
# =========================
_WS_RE = re.compile(r"[ \t\u3000]+")  # runs of ASCII and full-width (U+3000) spaces

def normalize_text(s: str) -> str:
    """Normalize line endings to LF, collapse horizontal whitespace runs to a
    single space, and squeeze 3+ consecutive newlines into one blank line."""
    unified = s.replace("\r\n", "\n").replace("\r", "\n")
    collapsed = _WS_RE.sub(" ", unified)
    return re.sub(r"\n{3,}", "\n\n", collapsed).strip()
def chunk_text(text: str, chunk_size: int = 1200, overlap: int = 200, min_chunk: int = 200) -> List[str]:
    """Split *text* into overlapping character windows.

    Consecutive windows share *overlap* characters.  If the final chunk ends
    up shorter than *min_chunk*, it is folded into the previous one so no
    tiny fragment is emitted on its own.  Empty/whitespace input → [].
    """
    text = text.strip()
    if not text:
        return []
    stride = max(1, chunk_size - overlap)  # guard against overlap >= chunk_size
    pieces: List[str] = []
    start = 0
    total = len(text)
    while True:
        end = min(total, start + chunk_size)
        window = text[start:end].strip()
        if window:
            pieces.append(window)
        if end >= total:
            break
        start += stride
    # Merge a too-short tail chunk into its predecessor.
    if len(pieces) >= 2 and len(pieces[-1]) < min_chunk:
        tail = pieces.pop()
        pieces[-1] = (pieces[-1] + "\n" + tail).strip()
    return pieces
# =========================
# Public API
# =========================
def ingest_pdf_bytes(title: str, source_url: str, pdf_bytes: bytes) -> int:
    """Parse a PDF byte string and register its chunks in the index.

    Nothing is written to disk.  Failures are normalized to RuntimeError
    for the caller (FileNotFoundError from the OCR path included).
    Returns the number of records added to the index.
    """
    if not pdf_bytes:
        raise RuntimeError("empty pdf_bytes")
    try:
        raw, meta = pdf_bytes_to_text(pdf_bytes)
    except FileNotFoundError as e:
        raise RuntimeError(f"OCR 実行に必要なバイナリ/言語データが見つかりません: {e}") from e
    txt = normalize_text(raw)
    if not _is_meaningful(txt):
        raise RuntimeError("Parsed text is too short or empty after normalization")
    doc_id = str(uuid.uuid4())
    records: List[Dict] = [
        {
            "doc_id": doc_id,
            "chunk_id": f"{idx:04d}",  # zero-padded so lexical sort == chunk order
            "title": title,
            "source_url": source_url,
            "text": ck,
        }
        for idx, ck in enumerate(chunk_text(txt, chunk_size=1200, overlap=200, min_chunk=200))
    ]
    return int(add_to_index(records))
# Compatibility stubs
def ingest_edinet_for_company(edinet_code: str, date: Optional[str] = None) -> int:
    """Compatibility stub: EDINET ingestion is not part of this minimal build."""
    return 0
def download_edinet_pdf(*args, **kwargs):
    """Compatibility stub: always raises NotImplementedError."""
    message = "download_edinet_pdf is not implemented in this minimal build."
    raise NotImplementedError(message)
|