|
|
|
|
|
from __future__ import annotations |
|
|
import os, io, re, json, base64, shutil |
|
|
from typing import List, Dict, Any, Tuple |
|
|
|
|
|
import pandas as pd |
|
|
from pdf2image import convert_from_path |
|
|
import pdfplumber |
|
|
|
|
|
|
|
|
from openai import OpenAI |
|
|
|
|
|
|
|
|
# Model name used for image-based extraction; overridable through the
# OPENAI_VISION_MODEL environment variable.
OPENAI_MODEL_VISION = os.environ.get("OPENAI_VISION_MODEL", "gpt-4o-mini")
# Model name used for text-based extraction; overridable through the
# OPENAI_TEXT_MODEL environment variable.
OPENAI_MODEL_TEXT = os.environ.get("OPENAI_TEXT_MODEL", "gpt-4o-mini")
|
|
|
|
|
|
|
|
|
|
|
def _b64(img: bytes) -> str:
    """Return the Base64 encoding of ``img`` as a text string."""
    encoded = base64.b64encode(img)
    return str(encoded, "utf-8")
|
|
|
|
|
def _client() -> OpenAI:
    """Create an OpenAI client after validating the runtime environment.

    Returns:
        An ``OpenAI`` client built from the ``OPENAI_API_KEY`` environment
        variable, with a 60-second request timeout.

    Raises:
        ImportError: If ``httpx`` cannot be imported.
        RuntimeError: If the installed ``httpx`` is not a 0.27.x release, or
            if ``OPENAI_API_KEY`` is unset or empty.
    """
    # NOTE(review): the 0.27.x pin presumably works around an incompatibility
    # between newer httpx releases and the installed openai package — confirm.
    # Any failure here propagates unchanged; the previous
    # ``except Exception as e: raise e`` wrapper added nothing and was removed.
    import httpx

    if not httpx.__version__.startswith("0.27."):
        raise RuntimeError(
            f"httpx==0.27.x を利用してください(現在: {httpx.__version__})。"
            " requirements.txt に `httpx==0.27.2` を明記。"
        )

    key = os.environ.get("OPENAI_API_KEY")
    if not key:
        raise RuntimeError("OPENAI_API_KEY が未設定です。Spaces の Secrets に追加してください。")
    return OpenAI(api_key=key, timeout=60)
|
|
|
|
|
def _coerce_filepaths(files) -> List[str]:
    """Normalize the ``files`` value received from Gradio into PDF paths.

    Accepted shapes, either alone or as items of an iterable:
      * a path string or ``os.PathLike`` object,
      * a dict carrying the path under the ``"name"`` key,
      * any object exposing the path as a ``name`` attribute,
      * a tuple whose first element is the path.

    Args:
        files: A single upload or an iterable of uploads; may be ``None``
            or empty.

    Returns:
        Paths that end with ``.pdf`` (case-insensitive) and exist on disk,
        in input order. Unrecognized or non-matching entries are skipped.
    """
    if not files:
        return []

    # A single upload may arrive unwrapped. Without this, a dict would be
    # iterated key by key and a lone file object would not be iterable.
    if isinstance(files, (str, dict, os.PathLike)) or hasattr(files, "name"):
        files = [files]

    paths: List[str] = []
    for f in files:
        # PathLike is tested before the ``name`` attribute because
        # pathlib.Path also has ``name`` (only the final path component).
        if isinstance(f, (str, os.PathLike)):
            p = f
        elif isinstance(f, dict):
            p = f.get("name")
        elif hasattr(f, "name"):
            p = f.name
        elif isinstance(f, tuple) and f:
            p = f[0]
        else:
            p = None

        if isinstance(p, os.PathLike):
            p = os.fspath(p)
        # Reject non-string candidates (e.g. an integer ``name``) instead of
        # crashing on ``.lower()``.
        if isinstance(p, str) and p.lower().endswith(".pdf") and os.path.exists(p):
            paths.append(p)
    return paths
|
|
|
|
|
|
|
|
|
|
|
def pdf_to_images(pdf_path: str, dpi: int = 220, max_pages: int = 6) -> List[bytes]:
    """Render the leading pages of a PDF as PNG byte strings.

    Args:
        pdf_path: Path of the PDF file to render.
        dpi: Rendering resolution passed to ``convert_from_path``.
        max_pages: Maximum number of pages to render, counted from the first.

    Returns:
        One PNG-encoded ``bytes`` object per rendered page, in page order.
        Empty when ``max_pages`` is not positive.
    """
    if max_pages <= 0:
        return []

    # Ask the converter to stop at ``max_pages`` so long documents are not
    # fully rasterized only to have most pages thrown away.
    images = convert_from_path(pdf_path, dpi=dpi, fmt="png", last_page=max_pages)

    out: List[bytes] = []
    # The slice is a safety net in case the converter returns extra pages.
    for im in images[:max_pages]:
        with io.BytesIO() as buf:
            im.save(buf, format="PNG")
            out.append(buf.getvalue())
    return out
|
|
|
|
|
def pdf_to_text(pdf_path: str, max_chars: int = 15000) -> str:
    """Extract page text from a PDF, capped at ``max_chars`` characters.

    Each non-empty page contributes a ``[page N]`` header followed by its
    stripped text. Reading stops once the collected pieces exceed
    ``max_chars``; the joined result is then truncated to that length.
    """
    pieces: List[str] = []
    collected = 0
    with pdfplumber.open(pdf_path) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            body = (page.extract_text() or "").strip()
            if body:
                piece = f"[page {page_no}]\n{body}"
                pieces.append(piece)
                collected += len(piece)
            if collected > max_chars:
                break
    joined = "\n\n".join(pieces)
    return joined[:max_chars]
|
|
|
|
|
|
|
|
|
|
|
# Multiplier that converts an amount stated in each unit into plain yen.
_UNIT_MAP = {
    "円": 1,
    "千円": 1_000,
    "万円": 10_000,
    "百万円": 1_000_000,
    "million yen": 1_000_000,
    "thousand yen": 1_000,
    "yen": 1,
}

# Regexes that recognize a unit declaration. Every pattern exposes the unit
# name as its only capture group. Both ASCII and full-width colons and
# parentheses are accepted, since Japanese statements commonly use the
# full-width forms.
_UNIT_PATTERNS = [
    r"単位\s*[::]?\s*(百万円|千円|万円|円)",
    r"単位\s*[((]\s*(百万円|千円|万円|円)\s*[))]",
    r"\bunits?\s*[::]?\s*(million yen|thousand yen|yen)",
]


def detect_unit(text: str) -> Tuple[str, int, list[str]]:
    """Estimate the monetary unit declared in the PDF text.

    The most frequent unit declaration wins. When no declaration is found,
    a rounding note such as "千円未満切捨て" is used as a hint; failing that,
    the unit defaults to 百万円.

    Args:
        text: Text extracted from the PDF; ``None`` is treated as empty.

    Returns:
        ``(label, scale, hits)`` where ``label`` is the Japanese unit label,
        ``scale`` is the multiplier to plain yen, and ``hits`` lists every
        matched unit (or a single hint note, or nothing for the default).
    """
    text = text or ""
    hits: list[str] = []
    for pat in _UNIT_PATTERNS:
        for m in re.finditer(pat, text, flags=re.I):
            # Read the last capture group, not group 1: a pattern that also
            # captures its "unit" keyword would otherwise yield the keyword
            # and never produce a hit.
            unit = m.group(m.lastindex or 0).lower()
            if unit in _UNIT_MAP:
                hits.append(unit)

    if hits:
        from collections import Counter

        label = Counter(hits).most_common(1)[0][0]
        # English labels are reported with their Japanese equivalents.
        disp = {"million yen": "百万円", "thousand yen": "千円", "yen": "円"}.get(label, label)
        scale = _UNIT_MAP[label]
        return disp, scale, hits

    # No explicit declaration: fall back to rounding notes.
    if re.search(r"千円.*切[捨下]", text):
        return "千円", 1_000, ["補助ヒント: 千円未満切捨て"]
    if re.search(r"百万円.*切[捨下]", text):
        return "百万円", 1_000_000, ["補助ヒント: 百万円切捨て"]

    return "百万円", 1_000_000, []
|
|
|
|
|
|
|
|
|
|
|
# Extraction prompt used by both the vision and the text extractor. It tells
# the model to answer with exactly this JSON structure, using null for any
# item it cannot determine.
SYSTEM_JSON = """あなたは有能な財務アナリストです。
与えられた決算書(画像またはテキスト)から、次の厳密な JSON 構造のみを日本語の単位なし・半角数値で返してください。分からない項目は null。
{
  "company": {"name": null},
  "period": {"start_date": null, "end_date": null},
  "balance_sheet": {
    "total_assets": null, "total_liabilities": null, "total_equity": null,
    "current_assets": null, "fixed_assets": null,
    "current_liabilities": null, "long_term_liabilities": null
  },
  "income_statement": {
    "sales": null, "cost_of_sales": null, "gross_profit": null,
    "operating_expenses": null, "operating_income": null,
    "ordinary_income": null, "net_income": null
  },
  "cash_flows": {
    "operating_cash_flow": null, "investing_cash_flow": null, "financing_cash_flow": null
  }
}
"""
|
|
|
|
|
def _extract_with_vision(images: List[bytes], company_hint: str = "") -> Dict[str, Any]:
    """Extract financial figures from page images with the vision model.

    Args:
        images: PNG-encoded page images.
        company_hint: Optional company name passed to the model as a hint.

    Returns:
        The JSON object returned by the model, parsed into a dict.

    Raises:
        RuntimeError: If ``images`` is empty or the model returns no content.
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    if not images:
        # With no pages the model could only return an all-null structure;
        # fail loudly so the caller can fall back to text extraction.
        raise RuntimeError("解析対象の画像がありません。")

    client = _client()
    content: List[Dict[str, Any]] = [{"type": "text", "text": SYSTEM_JSON}]
    if company_hint:
        content.append({"type": "text", "text": f"会社名の候補: {company_hint}"})
    # Chat Completions expects image parts as
    # {"type": "image_url", "image_url": {"url": ...}}. The "input_image"
    # shape belongs to the Responses API and is rejected by this endpoint.
    content.extend(
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{_b64(im)}"},
        }
        for im in images
    )

    resp = client.chat.completions.create(
        model=OPENAI_MODEL_VISION,
        messages=[
            {"role": "system", "content": "返答は必ず有効な JSON オブジェクトのみ。説明を含めない。"},
            {"role": "user", "content": content},
        ],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    raw = resp.choices[0].message.content
    if not raw:
        raise RuntimeError("Vision モデルから空の応答が返されました。")
    return json.loads(raw)
|
|
|
|
|
def _extract_with_text(text: str, company_hint: str = "") -> Dict[str, Any]:
    """Extract financial figures from plain statement text with the text model.

    Args:
        text: Text extracted from the PDF; ``None`` is treated as empty.
        company_hint: Optional company name passed to the model as a hint.

    Returns:
        The JSON object returned by the model, parsed into a dict.

    Raises:
        RuntimeError: If the model returns no content.
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    client = _client()

    sections = [SYSTEM_JSON]
    if company_hint:
        # The hint was previously accepted but never sent; include it the
        # same way the vision extractor does.
        sections.append(f"会社名の候補: {company_hint}")
    sections.append("以下は決算書のテキストです。上記の JSON だけを返してください。")
    sections.append(text or "")
    prompt = "\n\n".join(sections)

    resp = client.chat.completions.create(
        model=OPENAI_MODEL_TEXT,
        messages=[
            {"role": "system", "content": "返答は必ず有効な JSON オブジェクトのみ。"},
            {"role": "user", "content": prompt},
        ],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    raw = resp.choices[0].message.content
    if not raw:
        raise RuntimeError("モデルから空の応答が返されました。")
    return json.loads(raw)
|
|
|
|
|
|
|
|
|
|
|
def fin_to_df(fin: Dict[str, Any]) -> pd.DataFrame:
    """Flatten the three financial sections of ``fin`` into a long table.

    Produces one row per item with the columns ``category``, ``item`` and
    ``value``. Sections are emitted in the order balance sheet, income
    statement, cash flows; a missing or empty section contributes no rows.
    """
    section_names = ("balance_sheet", "income_statement", "cash_flows")
    records = [
        {"category": section, "item": item, "value": value}
        for section in section_names
        for item, value in (fin.get(section) or {}).items()
    ]
    return pd.DataFrame(records, columns=["category", "item", "value"])
|
|
|
|
|
def _scale_fin(fin: Dict[str, Any], scale: float) -> Dict[str, Any]:
    """Return a copy of ``fin`` with every financial amount multiplied by ``scale``.

    Only the ``balance_sheet``, ``income_statement`` and ``cash_flows``
    sections are touched, and only when they are dicts; everything else is
    copied unchanged. The input is not mutated.

    Args:
        fin: Extracted financial data (JSON-compatible).
        scale: Multiplier that converts the statement's unit into yen.

    Returns:
        A new dict whose amounts are floats, or ``None`` where the original
        value was missing or could not be read as a finite number.
    """
    import math

    def sc_val(v):
        # Booleans are ints in Python; they are never a valid amount.
        if v is None or isinstance(v, bool):
            return None
        if isinstance(v, str):
            # Accept thousands separators and the △/▲ negative markers that
            # models sometimes copy verbatim from Japanese statements.
            s = v.strip().replace(",", "").replace(",", "")
            if s in ("", "null"):
                return None
            if s[0] in "△▲":
                s = "-" + s[1:].lstrip()
            v = s
        try:
            scaled = float(v) * scale
        except (TypeError, ValueError, OverflowError):
            return None
        return scaled if math.isfinite(scaled) else None

    # The JSON round trip gives a deep copy of the JSON-compatible input.
    out = json.loads(json.dumps(fin))
    for sec in ("balance_sheet", "income_statement", "cash_flows"):
        section = out.get(sec) if isinstance(out, dict) else None
        if isinstance(section, dict):
            out[sec] = {k: sc_val(v) for k, v in section.items()}
    return out
|
|
|
|
|
|
|
|
|
|
|
def parse_pdf(files, company: str = "", use_vision: bool = True) -> Tuple[Dict[str,Any], "pd.DataFrame", Dict[str,Any], str]:
    """Extract financial figures from uploaded PDFs and scale them to yen.

    Steps: normalize the upload into PDF paths, extract text to detect the
    monetary unit, extract the figures (vision first when enabled, falling
    back to text on failure), then scale every amount by the detected unit.

    Args:
        files: Upload value to normalize into PDF paths.
        company: Optional company-name hint forwarded to the model.
        use_vision: When true, try image-based extraction first.

    Returns:
        ``(fin_scaled, df_scaled, meta, log)`` where ``meta`` has the keys
        ``unit_label``, ``unit_scale``, ``unit_hits`` and ``warnings``, and
        ``log`` is a newline-joined processing log.

    Raises:
        RuntimeError: If no usable PDF was supplied.
    """
    logs: List[str] = []
    warnings: List[str] = []

    paths = _coerce_filepaths(files)
    if not paths:
        raise RuntimeError("PDF をアップロードしてください。")

    # Unit detection works on the text of all PDFs together.
    texts = [pdf_to_text(p) for p in paths]
    all_text = "\n\n".join(t for t in texts if t)
    unit_label, unit_scale, unit_hits = detect_unit(all_text)
    logs.append(f"[unit] 推定: {unit_label} (×{unit_scale:,}) / hits: {unit_hits[:5]}{'...' if len(unit_hits)>5 else ''}")
    if not unit_hits:
        # The scale silently changes every amount, so surface the guess.
        warnings.append(f"単位を検出できなかったため、既定の{unit_label}を適用しました。")

    fin_raw = None
    if use_vision:
        try:
            all_images: List[bytes] = []
            for p in paths:
                all_images.extend(pdf_to_images(p, dpi=220, max_pages=6))
            fin_raw = _extract_with_vision(all_images, company)
            logs.append("[extract] Vision 解析に成功")
        except Exception as e:
            # Deliberate best-effort boundary: any vision failure falls back
            # to text extraction below.
            logs.append(f"[extract] Vision 失敗→textへ: {e}")
            warnings.append("Vision 解析に失敗したため、テキスト抽出に切り替えました。")

    if fin_raw is None:
        # Reached when vision is disabled, failed, or returned JSON null.
        if not all_text.strip():
            warnings.append("PDF からテキストを抽出できませんでした。抽出結果が空になる可能性があります。")
        fin_raw = _extract_with_text(all_text, company)

    fin_scaled = _scale_fin(fin_raw, unit_scale)
    df_scaled = fin_to_df(fin_scaled)

    meta = {
        "unit_label": unit_label,
        "unit_scale": unit_scale,
        "unit_hits": unit_hits,
        "warnings": warnings,
    }

    log = "\n".join(logs)
    return fin_scaled, df_scaled, meta, log
|
|
|