# core/extract.py from __future__ import annotations import os, io, re, json, base64, shutil from typing import List, Dict, Any, Tuple import pandas as pd from pdf2image import convert_from_path import pdfplumber # OpenAI SDK v1系を想定(requirements側で httpx==0.27.2 を厳格指定してください) from openai import OpenAI OPENAI_MODEL_VISION = os.environ.get("OPENAI_VISION_MODEL", "gpt-4o-mini") OPENAI_MODEL_TEXT = os.environ.get("OPENAI_TEXT_MODEL", "gpt-4o-mini") # ---------- 内部ユーティリティ ---------- def _b64(img: bytes) -> str: return base64.b64encode(img).decode("utf-8") def _client() -> OpenAI: # httpxバージョンの相性チェック(0.28系だとproxies引数でコケる) try: import httpx if not httpx.__version__.startswith("0.27."): raise RuntimeError( f"httpx==0.27.x を利用してください(現在: {httpx.__version__})。" " requirements.txt に `httpx==0.27.2` を明記。" ) except Exception as e: # ここで例外にしてUIに表示する(診断しやすくする) raise e key = os.environ.get("OPENAI_API_KEY") if not key: raise RuntimeError("OPENAI_API_KEY が未設定です。Spaces の Secrets に追加してください。") return OpenAI(api_key=key, timeout=60) def _coerce_filepaths(files) -> List[str]: """Gradioから渡るfilesを確実にパス配列へ正規化""" paths: List[str] = [] if not files: return [] if isinstance(files, str): return [files] if files.lower().endswith(".pdf") and os.path.exists(files) else [] for f in files: if isinstance(f, str): p = f elif isinstance(f, dict) and "name" in f: p = f["name"] elif hasattr(f, "name"): p = getattr(f, "name") elif isinstance(f, tuple) and f and isinstance(f[0], str): p = f[0] else: p = None if p and p.lower().endswith(".pdf") and os.path.exists(p): paths.append(p) return paths # ---------- PDF -> 画像 / テキスト ---------- def pdf_to_images(pdf_path: str, dpi: int = 220, max_pages: int = 6) -> List[bytes]: images = convert_from_path(pdf_path, dpi=dpi, fmt="png") out: List[bytes] = [] for i, im in enumerate(images): if i >= max_pages: break buf = io.BytesIO() im.save(buf, format="PNG") out.append(buf.getvalue()) return out def pdf_to_text(pdf_path: str, max_chars: int = 15000) -> str: chunks: List[str] = [] with pdfplumber.open(pdf_path) as pdf: for i, page in enumerate(pdf.pages): t = (page.extract_text() or "").strip() if t: chunks.append(f"[page {i+1}]\n{t}") if sum(len(c) for c in chunks) > max_chars: break return "\n\n".join(chunks)[:max_chars] # ---------- 単位推定 ---------- _UNIT_MAP = { "円": 1, "千円": 1_000, "万円": 10_000, "百万円": 1_000_000, "million yen": 1_000_000, "thousand yen": 1_000, "yen": 1, } _UNIT_PATTERNS = [ r"単位\s*[::]?\s*(百万円|千円|万円|円)", r"単位\s*[((]\s*(百万円|千円|万円|円)\s*[))]", r"(unit|units)\s*[::]?\s*(million yen|thousand yen|yen)", ] def detect_unit(text: str) -> Tuple[str, int, list[str]]: """ PDFテキストから単位を推定。最頻ヒットを採用。無ければデフォルト百万円。 戻り値: (label, scale, hits[]) """ hits: list[str] = [] for pat in _UNIT_PATTERNS: for m in re.finditer(pat, text, flags=re.I): g = m.group(1).lower() # 日本語はそのまま、英語は小文字のまま map if g in ["百万円","千円","万円","円"]: hits.append(g) elif g in ["million yen","thousand yen","yen"]: hits.append(g) if hits: # 最頻値 from collections import Counter label = Counter(hits).most_common(1)[0][0] # 表示は日本語優先 disp = {"million yen":"百万円","thousand yen":"千円","yen":"円"}.get(label, label) scale = _UNIT_MAP[label] return disp, scale, hits # 「千円未満切捨て」などの補助ヒント if re.search(r"千円.*切[捨下]", text): return "千円", 1_000, ["補助ヒント: 千円未満切捨て"] if re.search(r"百万円.*切[捨下]", text): return "百万円", 1_000_000, ["補助ヒント: 百万円切捨て"] # 何も見つからなければ百万円を既定 return "百万円", 1_000_000, [] # ---------- OpenAI で表読み取り ---------- SYSTEM_JSON = """あなたは有能な財務アナリストです。 与えられた決算書(画像またはテキスト)から、次の厳密な JSON 構造のみを日本語の単位なし・半角数値で返してください。分からない項目は null。 { "company": {"name": null}, "period": {"start_date": null, "end_date": null}, "balance_sheet": { "total_assets": null, "total_liabilities": null, "total_equity": null, "current_assets": null, "fixed_assets": null, "current_liabilities": null, "long_term_liabilities": null }, "income_statement": { "sales": null, "cost_of_sales": null, "gross_profit": null, "operating_expenses": null, "operating_income": null, "ordinary_income": null, "net_income": null }, "cash_flows": { "operating_cash_flow": null, "investing_cash_flow": null, "financing_cash_flow": null } } """ def _extract_with_vision(images: List[bytes], company_hint: str = "") -> Dict[str, Any]: client = _client() content = [{"type": "text", "text": SYSTEM_JSON}] if company_hint: content.append({"type": "text", "text": f"会社名の候補: {company_hint}"}) for im in images: content.append({"type": "input_image", "image_url": f"data:image/png;base64,{_b64(im)}"}) resp = client.chat.completions.create( model=OPENAI_MODEL_VISION, messages=[ {"role": "system", "content": "返答は必ず有効な JSON オブジェクトのみ。説明を含めない。"}, {"role": "user", "content": content}, ], response_format={"type": "json_object"}, temperature=0.1, ) return json.loads(resp.choices[0].message.content) def _extract_with_text(text: str, company_hint: str = "") -> Dict[str, Any]: client = _client() prompt = f"{SYSTEM_JSON}\n\n以下は決算書のテキストです。上記の JSON だけを返してください。\n\n{text or ''}" resp = client.chat.completions.create( model=OPENAI_MODEL_TEXT, messages=[ {"role": "system", "content": "返答は必ず有効な JSON オブジェクトのみ。"}, {"role": "user", "content": prompt}, ], response_format={"type": "json_object"}, temperature=0.1, ) return json.loads(resp.choices[0].message.content) # ---------- JSON<->DataFrame 変換とスケーリング ---------- def fin_to_df(fin: Dict[str, Any]) -> pd.DataFrame: rows = [] def add(cat, d): for k, v in (d or {}).items(): rows.append({"category": cat, "item": k, "value": v}) add("balance_sheet", fin.get("balance_sheet")) add("income_statement", fin.get("income_statement")) add("cash_flows", fin.get("cash_flows")) return pd.DataFrame(rows, columns=["category", "item", "value"]) def _scale_fin(fin: Dict[str, Any], scale: float) -> Dict[str, Any]: def sc_val(v): if v in (None, "", "null"): return None try: return float(v) * scale except Exception: return None out = json.loads(json.dumps(fin)) # shallow copy for sec in ("balance_sheet", "income_statement", "cash_flows"): if sec in out and isinstance(out[sec], dict): for k, v in out[sec].items(): out[sec][k] = sc_val(v) return out # ---------- 入口:PDF解析 ---------- def parse_pdf(files, company: str = "", use_vision: bool = True) -> Tuple[Dict[str,Any], "pd.DataFrame", Dict[str,Any], str]: """ 返り値: (fin_scaled, df_scaled, meta, log) meta: {"unit_label","unit_scale","unit_hits":[...],"warnings":[...]} """ logs = [] paths = _coerce_filepaths(files) if not paths: raise RuntimeError("PDF をアップロードしてください。") # 1) テキスト連結(単位推定の根拠に使用) all_text = "" for p in paths: t = pdf_to_text(p) all_text += ("\n\n" + t) if all_text else t unit_label, unit_scale, unit_hits = detect_unit(all_text) logs.append(f"[unit] 推定: {unit_label} (×{unit_scale:,}) / hits: {unit_hits[:5]}{'...' if len(unit_hits)>5 else ''}") # 2) 画像化 + Vision → ダメならテキストへ fin_raw: Dict[str, Any] if use_vision: try: all_images: List[bytes] = [] for p in paths: all_images += pdf_to_images(p, dpi=220, max_pages=6) fin_raw = _extract_with_vision(all_images, company) logs.append("[extract] Vision 解析に成功") except Exception as e: logs.append(f"[extract] Vision 失敗→textへ: {e}") fin_raw = _extract_with_text(all_text, company) else: fin_raw = _extract_with_text(all_text, company) # 3) 単位スケーリング fin_scaled = _scale_fin(fin_raw, unit_scale) df_scaled = fin_to_df(fin_scaled) # 4) メタ情報 meta = { "unit_label": unit_label, "unit_scale": unit_scale, "unit_hits": unit_hits, "warnings": [], } # 5) ログ log = "\n".join(logs) return fin_scaled, df_scaled, meta, log