Score / core /extract.py
Corin1998's picture
Update core/extract.py
c37aeda verified
# core/extract.py
from __future__ import annotations
import os, io, re, json, base64, shutil
from typing import List, Dict, Any, Tuple
import pandas as pd
from pdf2image import convert_from_path
import pdfplumber
# OpenAI SDK v1系を想定(requirements側で httpx==0.27.2 を厳格指定してください)
from openai import OpenAI
OPENAI_MODEL_VISION = os.environ.get("OPENAI_VISION_MODEL", "gpt-4o-mini")
OPENAI_MODEL_TEXT = os.environ.get("OPENAI_TEXT_MODEL", "gpt-4o-mini")
# ---------- 内部ユーティリティ ----------
def _b64(img: bytes) -> str:
return base64.b64encode(img).decode("utf-8")
def _client() -> OpenAI:
# httpxバージョンの相性チェック(0.28系だとproxies引数でコケる)
try:
import httpx
if not httpx.__version__.startswith("0.27."):
raise RuntimeError(
f"httpx==0.27.x を利用してください(現在: {httpx.__version__})。"
" requirements.txt に `httpx==0.27.2` を明記。"
)
except Exception as e:
# ここで例外にしてUIに表示する(診断しやすくする)
raise e
key = os.environ.get("OPENAI_API_KEY")
if not key:
raise RuntimeError("OPENAI_API_KEY が未設定です。Spaces の Secrets に追加してください。")
return OpenAI(api_key=key, timeout=60)
def _coerce_filepaths(files) -> List[str]:
"""Gradioから渡るfilesを確実にパス配列へ正規化"""
paths: List[str] = []
if not files:
return []
if isinstance(files, str):
return [files] if files.lower().endswith(".pdf") and os.path.exists(files) else []
for f in files:
if isinstance(f, str):
p = f
elif isinstance(f, dict) and "name" in f:
p = f["name"]
elif hasattr(f, "name"):
p = getattr(f, "name")
elif isinstance(f, tuple) and f and isinstance(f[0], str):
p = f[0]
else:
p = None
if p and p.lower().endswith(".pdf") and os.path.exists(p):
paths.append(p)
return paths
# ---------- PDF -> 画像 / テキスト ----------
def pdf_to_images(pdf_path: str, dpi: int = 220, max_pages: int = 6) -> List[bytes]:
images = convert_from_path(pdf_path, dpi=dpi, fmt="png")
out: List[bytes] = []
for i, im in enumerate(images):
if i >= max_pages:
break
buf = io.BytesIO()
im.save(buf, format="PNG")
out.append(buf.getvalue())
return out
def pdf_to_text(pdf_path: str, max_chars: int = 15000) -> str:
chunks: List[str] = []
with pdfplumber.open(pdf_path) as pdf:
for i, page in enumerate(pdf.pages):
t = (page.extract_text() or "").strip()
if t:
chunks.append(f"[page {i+1}]\n{t}")
if sum(len(c) for c in chunks) > max_chars:
break
return "\n\n".join(chunks)[:max_chars]
# ---------- 単位推定 ----------
_UNIT_MAP = {
"円": 1,
"千円": 1_000,
"万円": 10_000,
"百万円": 1_000_000,
"million yen": 1_000_000,
"thousand yen": 1_000,
"yen": 1,
}
_UNIT_PATTERNS = [
r"単位\s*[::]?\s*(百万円|千円|万円|円)",
r"単位\s*[((]\s*(百万円|千円|万円|円)\s*[))]",
r"(unit|units)\s*[::]?\s*(million yen|thousand yen|yen)",
]
def detect_unit(text: str) -> Tuple[str, int, list[str]]:
"""
PDFテキストから単位を推定。最頻ヒットを採用。無ければデフォルト百万円。
戻り値: (label, scale, hits[])
"""
hits: list[str] = []
for pat in _UNIT_PATTERNS:
for m in re.finditer(pat, text, flags=re.I):
g = m.group(1).lower()
# 日本語はそのまま、英語は小文字のまま map
if g in ["百万円","千円","万円","円"]:
hits.append(g)
elif g in ["million yen","thousand yen","yen"]:
hits.append(g)
if hits:
# 最頻値
from collections import Counter
label = Counter(hits).most_common(1)[0][0]
# 表示は日本語優先
disp = {"million yen":"百万円","thousand yen":"千円","yen":"円"}.get(label, label)
scale = _UNIT_MAP[label]
return disp, scale, hits
# 「千円未満切捨て」などの補助ヒント
if re.search(r"千円.*切[捨下]", text):
return "千円", 1_000, ["補助ヒント: 千円未満切捨て"]
if re.search(r"百万円.*切[捨下]", text):
return "百万円", 1_000_000, ["補助ヒント: 百万円切捨て"]
# 何も見つからなければ百万円を既定
return "百万円", 1_000_000, []
# ---------- OpenAI で表読み取り ----------
SYSTEM_JSON = """あなたは有能な財務アナリストです。
与えられた決算書(画像またはテキスト)から、次の厳密な JSON 構造のみを日本語の単位なし・半角数値で返してください。分からない項目は null。
{
"company": {"name": null},
"period": {"start_date": null, "end_date": null},
"balance_sheet": {
"total_assets": null, "total_liabilities": null, "total_equity": null,
"current_assets": null, "fixed_assets": null,
"current_liabilities": null, "long_term_liabilities": null
},
"income_statement": {
"sales": null, "cost_of_sales": null, "gross_profit": null,
"operating_expenses": null, "operating_income": null,
"ordinary_income": null, "net_income": null
},
"cash_flows": {
"operating_cash_flow": null, "investing_cash_flow": null, "financing_cash_flow": null
}
}
"""
def _extract_with_vision(images: List[bytes], company_hint: str = "") -> Dict[str, Any]:
client = _client()
content = [{"type": "text", "text": SYSTEM_JSON}]
if company_hint:
content.append({"type": "text", "text": f"会社名の候補: {company_hint}"})
for im in images:
content.append({"type": "input_image", "image_url": f"data:image/png;base64,{_b64(im)}"})
resp = client.chat.completions.create(
model=OPENAI_MODEL_VISION,
messages=[
{"role": "system", "content": "返答は必ず有効な JSON オブジェクトのみ。説明を含めない。"},
{"role": "user", "content": content},
],
response_format={"type": "json_object"},
temperature=0.1,
)
return json.loads(resp.choices[0].message.content)
def _extract_with_text(text: str, company_hint: str = "") -> Dict[str, Any]:
client = _client()
prompt = f"{SYSTEM_JSON}\n\n以下は決算書のテキストです。上記の JSON だけを返してください。\n\n{text or ''}"
resp = client.chat.completions.create(
model=OPENAI_MODEL_TEXT,
messages=[
{"role": "system", "content": "返答は必ず有効な JSON オブジェクトのみ。"},
{"role": "user", "content": prompt},
],
response_format={"type": "json_object"},
temperature=0.1,
)
return json.loads(resp.choices[0].message.content)
# ---------- JSON<->DataFrame 変換とスケーリング ----------
def fin_to_df(fin: Dict[str, Any]) -> pd.DataFrame:
rows = []
def add(cat, d):
for k, v in (d or {}).items():
rows.append({"category": cat, "item": k, "value": v})
add("balance_sheet", fin.get("balance_sheet"))
add("income_statement", fin.get("income_statement"))
add("cash_flows", fin.get("cash_flows"))
return pd.DataFrame(rows, columns=["category", "item", "value"])
def _scale_fin(fin: Dict[str, Any], scale: float) -> Dict[str, Any]:
def sc_val(v):
if v in (None, "", "null"):
return None
try:
return float(v) * scale
except Exception:
return None
out = json.loads(json.dumps(fin)) # shallow copy
for sec in ("balance_sheet", "income_statement", "cash_flows"):
if sec in out and isinstance(out[sec], dict):
for k, v in out[sec].items():
out[sec][k] = sc_val(v)
return out
# ---------- 入口:PDF解析 ----------
def parse_pdf(files, company: str = "", use_vision: bool = True) -> Tuple[Dict[str,Any], "pd.DataFrame", Dict[str,Any], str]:
"""
返り値: (fin_scaled, df_scaled, meta, log)
meta: {"unit_label","unit_scale","unit_hits":[...],"warnings":[...]}
"""
logs = []
paths = _coerce_filepaths(files)
if not paths:
raise RuntimeError("PDF をアップロードしてください。")
# 1) テキスト連結(単位推定の根拠に使用)
all_text = ""
for p in paths:
t = pdf_to_text(p)
all_text += ("\n\n" + t) if all_text else t
unit_label, unit_scale, unit_hits = detect_unit(all_text)
logs.append(f"[unit] 推定: {unit_label}{unit_scale:,}) / hits: {unit_hits[:5]}{'...' if len(unit_hits)>5 else ''}")
# 2) 画像化 + Vision → ダメならテキストへ
fin_raw: Dict[str, Any]
if use_vision:
try:
all_images: List[bytes] = []
for p in paths:
all_images += pdf_to_images(p, dpi=220, max_pages=6)
fin_raw = _extract_with_vision(all_images, company)
logs.append("[extract] Vision 解析に成功")
except Exception as e:
logs.append(f"[extract] Vision 失敗→textへ: {e}")
fin_raw = _extract_with_text(all_text, company)
else:
fin_raw = _extract_with_text(all_text, company)
# 3) 単位スケーリング
fin_scaled = _scale_fin(fin_raw, unit_scale)
df_scaled = fin_to_df(fin_scaled)
# 4) メタ情報
meta = {
"unit_label": unit_label,
"unit_scale": unit_scale,
"unit_hits": unit_hits,
"warnings": [],
}
# 5) ログ
log = "\n".join(logs)
return fin_scaled, df_scaled, meta, log