# finalyze / main.py
# (upload-page metadata, preserved as comments: FridayCodehhr — "Upload 10 files" — commit a9d5e1b, verified)
from __future__ import annotations
import argparse
import json
import os
import time
from typing import Any, Dict, List
from config import load_settings, DEFAULT_FREE_VISION_MODELS
from pdf_io import extract_texts_from_pdf, render_pages_to_png_bytes
from statement_candidates import build_candidate_lists, select_pages_for_llm
from openrouter_client import (
choose_free_vision_model,
choose_any_free_text_model,
chat_completion,
make_user_message_with_images,
robust_json_loads,
repair_to_json,
)
# Prompt sent to the vision model. The {heuristic_blocks} and {page_snippets}
# placeholders are filled via str.format in analyze_pdf; the literal JSON
# braces in the schema example are escaped as {{ and }} so .format leaves
# them intact. Do not edit this text casually — the downstream parser relies
# on the STRICT-JSON instruction and the exact schema key names.
PROMPT_TEMPLATE = """
You are an expert financial-report analyst.
Task:
Given (a) OCR/native text snippets for certain pages and (b) images of those pages,
identify page ranges that contain ONLY the three PRIMARY financial statements tables:
1) Balance Sheet / Statement of Financial Position
2) Profit & Loss / Income / Earnings / Operations
3) Cash Flow Statement
Important:
- Many annual reports include BOTH consolidated and standalone statements.
- You MUST return blocks for BOTH if present.
- If a statement spans multiple pages, include ALL pages in that block.
- A continuation page may not repeat the full title; use table structure + line-items.
Heuristic candidate blocks (for reference only; you must verify from images+snippets):
{heuristic_blocks}
Pages provided (OCR/native snippets):
{page_snippets}
Return STRICT JSON ONLY (no markdown, no commentary).
Schema (IMPORTANT: each statement is a LIST of blocks):
{{
"balance_sheet": [
{{
"scope": "consolidated|standalone|unknown",
"start_page": <1-indexed int>,
"end_page": <1-indexed int>,
"pages": [<1-indexed ints>],
"confidence": <0..1>,
"title": "<string or null>",
"evidence_pages": [<1-indexed ints>]
}}
],
"profit_and_loss": [ ... same block schema ... ],
"cash_flow": [ ... same block schema ... ],
"notes": [ "<optional strings>" ]
}}
Rules:
- "pages" must list ALL pages in the block (even if it's one page).
- start_page = min(pages), end_page = max(pages).
- If a statement is NOT present, return an empty list for it.
""".strip()
def _combined_for_snippet(p) -> str:
a = getattr(p, "extracted_text", "") or ""
b = getattr(p, "ocr_text", "") or ""
return (a + "\n" + b).strip()
def build_page_snippets(page_texts: List[Any], selected_pages_0: List[int], max_chars_per_page: int = 1400) -> str:
    """Build the "--- Page N ---" snippet text for the selected pages.

    Args:
        page_texts: Per-page objects exposing extracted_text / ocr_text.
        selected_pages_0: 0-indexed page numbers to include, in order.
        max_chars_per_page: Truncation limit applied to each page's text.

    Returns:
        One string with a header + truncated text section per page.
    """
    sections = []
    for idx0 in selected_pages_0:
        page = page_texts[idx0]
        # Same combination as _combined_for_snippet: native text, OCR, stripped.
        native = getattr(page, "extracted_text", "") or ""
        ocr = getattr(page, "ocr_text", "") or ""
        combined = (native + "\n" + ocr).strip()[:max_chars_per_page]
        sections.append(f"--- Page {idx0 + 1} ---\n{combined}\n")
    return "\n".join(sections).strip()
def format_heuristic_blocks(heuristic_blocks_0_based: dict, max_per_stmt: int = 6) -> str:
    """Render 0-indexed heuristic blocks as a 1-indexed, human-readable summary.

    Produces one "- <statement>: scope: start-end (title); ..." line per
    statement, keeping at most max_per_stmt blocks ordered by descending
    "score". Used to embed heuristics into the LLM prompt.

    Args:
        heuristic_blocks_0_based: Mapping of statement key -> list of block
            dicts with 0-indexed "start"/"end" and optional "scope", "title",
            "score" fields. Anything else yields "(none)".
        max_per_stmt: Max number of blocks listed per statement.

    Returns:
        Multi-line summary string, or "(none)" if the input is not a dict.
    """
    if not isinstance(heuristic_blocks_0_based, dict):
        return "(none)"
    lines = []
    for stmt in ["balance_sheet", "profit_and_loss", "cash_flow"]:
        bl = heuristic_blocks_0_based.get(stmt) or []
        if not isinstance(bl, list):
            bl = []
        # Ignore non-dict entries instead of crashing on .get below.
        bl = [b for b in bl if isinstance(b, dict)]
        if not bl:
            lines.append(f"- {stmt}: (none)")
            continue
        bl_sorted = sorted(bl, key=lambda b: float(b.get("score") or 0.0), reverse=True)[:max_per_stmt]
        parts = []
        for b in bl_sorted:
            # Guard the int() conversion: a missing/None "start" or "end"
            # previously raised TypeError and crashed the pipeline. Mirrors
            # the try/except used in merge_with_heuristics.
            try:
                s = int(b.get("start")) + 1
                e = int(b.get("end")) + 1
            except (TypeError, ValueError):
                continue
            scope = (b.get("scope") or "unknown")
            title = b.get("title")
            parts.append(f"{scope}: {s}-{e}" + (f" ({title})" if title else ""))
        if parts:
            lines.append(f"- {stmt}: " + "; ".join(parts))
        else:
            # Every block was malformed; report the same as an empty list.
            lines.append(f"- {stmt}: (none)")
    return "\n".join(lines)
def validate_ranges(result: dict, page_count: int) -> dict:
    """
    Normalize model output into the list-of-blocks schema.

    Each statement key ("balance_sheet", "profit_and_loss", "cash_flow") is
    coerced to a list of block dicts whose page numbers are ints within
    [1, page_count], whose scope is one of {consolidated, standalone,
    unknown}, and whose confidence is clamped to [0, 1]. Ensures every block
    has a "pages" list and that start/end stay consistent with it. "notes"
    is coerced to a list of strings. The input dict is mutated in place and
    also returned.
    """
    def clamp_int(v):
        # Accept only ints within [1, page_count]; anything else -> None.
        if v is None or not isinstance(v, int):
            return None
        return v if 1 <= v <= page_count else None
    def normalize_pages(pages_val):
        # Keep only in-range ints, dedupe, sort ascending; non-list -> [].
        if not isinstance(pages_val, list):
            return []
        out = [x for x in pages_val if isinstance(x, int) and 1 <= x <= page_count]
        return sorted(set(out))
    def norm_block(obj) -> dict:
        # Coerce a single block to the canonical shape, tolerating any input.
        if not isinstance(obj, dict):
            obj = {}
        sp = clamp_int(obj.get("start_page"))
        ep = clamp_int(obj.get("end_page"))
        pages = normalize_pages(obj.get("pages"))
        # Derive missing start/end from the pages list.
        if pages and (sp is None or ep is None):
            sp = min(pages)
            ep = max(pages)
        # An inverted range is unusable: discard the whole range.
        if sp is not None and ep is not None and ep < sp:
            sp, ep, pages = None, None, []
        # Conversely, derive a contiguous pages list from valid start/end.
        if not pages and sp is not None and ep is not None:
            pages = list(range(sp, ep + 1))
        scope = obj.get("scope")
        if not isinstance(scope, str):
            scope = "unknown"
        scope = scope.lower().strip()
        if scope not in {"consolidated", "standalone", "unknown"}:
            scope = "unknown"
        conf = obj.get("confidence")
        # Non-numeric confidence -> 0.0; numeric values clamped to [0, 1].
        conf = float(conf) if isinstance(conf, (int, float)) else 0.0
        conf = max(0.0, min(1.0, conf))
        evidence = obj.get("evidence_pages")
        if not isinstance(evidence, list):
            evidence = []
        evidence = [x for x in evidence if isinstance(x, int) and 1 <= x <= page_count]
        title = obj.get("title")
        if title is not None and not isinstance(title, str):
            title = None
        # ALWAYS keep pages list even if single page
        if sp is None or ep is None:
            # No usable range at all: empty block, but keep scope/conf/title.
            return {
                "start_page": None,
                "end_page": None,
                "pages": [],
                "scope": scope,
                "confidence": conf,
                "title": title,
                "evidence_pages": evidence,
            }
        return {
            "start_page": sp,
            "end_page": ep,
            "pages": pages,
            "scope": scope,
            "confidence": conf,
            "title": title,
            # Fall back to the start page as evidence when none was given.
            "evidence_pages": evidence if evidence else ([sp] if sp else []),
        }
    for k in ["balance_sheet", "profit_and_loss", "cash_flow"]:
        val = result.get(k)
        # A bare dict (single block) is wrapped into a one-element list.
        if isinstance(val, dict):
            val = [val]
        if not isinstance(val, list):
            val = []
        result[k] = [norm_block(x) for x in val]
    if "notes" not in result or not isinstance(result["notes"], list):
        result["notes"] = []
    else:
        result["notes"] = [x for x in result["notes"] if isinstance(x, str)]
    return result
def merge_with_heuristics(result: dict, heuristic_blocks_0_based: dict, page_count: int) -> dict:
    """
    Add missing consolidated/standalone blocks if LLM returned only one.
    Also expands single-page LLM blocks if heuristics show a longer block with same start+scope.

    Args:
        result: Output of validate_ranges (1-indexed blocks); mutated in place.
        heuristic_blocks_0_based: Statement key -> list of heuristic blocks
            with 0-indexed "start"/"end" fields.
        page_count: Total pages; heuristic blocks outside [1, page_count]
            are dropped.

    Returns:
        The same dict, mutated in place and returned for convenience.
    """
    if not isinstance(heuristic_blocks_0_based, dict):
        return result
    def overlap(a, b):
        # True when 1-indexed inclusive ranges a=(s, e) and b=(s, e) intersect.
        return not (a[1] < b[0] or b[1] < a[0])
    for stmt in ["balance_sheet", "profit_and_loss", "cash_flow"]:
        llm_blocks = result.get(stmt) or []
        if not isinstance(llm_blocks, list):
            llm_blocks = []
        hb = heuristic_blocks_0_based.get(stmt) or []
        heur_blocks = []
        if isinstance(hb, list):
            for b in hb:
                try:
                    # Convert 0-indexed heuristic pages to the 1-indexed schema;
                    # skip blocks with missing/non-numeric bounds.
                    s = int(b.get("start")) + 1
                    e = int(b.get("end")) + 1
                except Exception:
                    continue
                if not (1 <= s <= page_count and 1 <= e <= page_count and e >= s):
                    continue
                heur_blocks.append(
                    {
                        "start_page": s,
                        "end_page": e,
                        "pages": list(range(s, e + 1)),
                        "scope": (b.get("scope") or "unknown"),
                        # Fixed low confidence marks heuristic-only blocks.
                        "confidence": 0.35,
                        "title": b.get("title"),
                        "evidence_pages": [s],
                    }
                )
        # expand single-page blocks using heuristics
        for lb in llm_blocks:
            if not isinstance(lb, dict):
                continue
            sp = lb.get("start_page")
            ep = lb.get("end_page")
            scope = (lb.get("scope") or "unknown")
            if sp is None or ep is None:
                continue
            if sp == ep:
                # First heuristic block with the same scope and start page
                # that extends further wins; lb is mutated in place.
                for hb2 in heur_blocks:
                    if hb2["scope"] == scope and hb2["start_page"] == sp and hb2["end_page"] > ep:
                        lb["end_page"] = hb2["end_page"]
                        lb["pages"] = hb2["pages"]
                        break
        present_ranges = [
            (b.get("start_page"), b.get("end_page"))
            for b in llm_blocks
            if isinstance(b, dict) and b.get("start_page") and b.get("end_page")
        ]
        present_scopes = {(b.get("scope") or "unknown") for b in llm_blocks if isinstance(b, dict)}
        # add missing scope blocks (common: consolidated + standalone)
        for hb2 in heur_blocks:
            # NOTE(review): a duplicate scope is skipped only when MORE than
            # one heuristic block exists; with a single heuristic block the
            # overlap check below is the sole gate. Presumably intentional
            # (allow a second non-overlapping block of the same scope) —
            # confirm with the author.
            if hb2["scope"] in present_scopes and len(heur_blocks) > 1:
                continue
            r = (hb2["start_page"], hb2["end_page"])
            if any(overlap(r, (ps, pe)) for (ps, pe) in present_ranges if ps and pe):
                continue
            llm_blocks.append(hb2)
            present_scopes.add(hb2["scope"])
            present_ranges.append(r)
        llm_blocks = [b for b in llm_blocks if isinstance(b, dict)]
        # Sort by range; blocks without pages sink to the end (10**9 sentinel).
        llm_blocks.sort(key=lambda b: (b.get("start_page") or 10**9, b.get("end_page") or 10**9))
        result[stmt] = llm_blocks
    return result
def analyze_pdf(
    pdf_path: str,
    output_path: str = "",
    debug_dir: str = "",
    openrouter_api_key: str | None = None,
) -> Dict[str, Any]:
    """
    Run the full pipeline: extract page text/OCR, pick candidate pages,
    query an OpenRouter vision model with page images + text snippets,
    and normalize the parsed JSON into the list-of-blocks schema.

    Args:
        pdf_path: Path to the input PDF.
        output_path: If non-empty, the result dict is also written here as JSON.
        debug_dir: Accepted for CLI compatibility; not referenced in this body.
        openrouter_api_key: Overrides the OPENROUTER_API_KEY env var when set.

    Returns:
        Dict with normalized statement blocks, "notes", and a "debug" section
        (selected pages, top candidates, heuristic blocks, Item-8 TOC page).
    """
    settings = load_settings(openrouter_api_key=openrouter_api_key or os.getenv("OPENROUTER_API_KEY", "").strip())
    t0 = time.time()
    print(f"[1/6] Extracting text/OCR from PDF: {pdf_path}", flush=True)
    page_texts, page_count = extract_texts_from_pdf(
        pdf_path=pdf_path,
        dpi=settings.dpi,
        ocr_lang=settings.ocr_lang,
        min_text_chars_for_digital=settings.min_text_chars_for_digital,
    )
    print(f" -> pages: {page_count} (t={time.time()-t0:.1f}s)", flush=True)
    print(f"[2/6] Building statement candidates + heuristic blocks...", flush=True)
    candidates, debug_info = build_candidate_lists(
        pages=page_texts,
        page_count=page_count,
        topk_per_statement=settings.topk_per_statement,
        continuation_max_forward=settings.continuation_max_forward,
        debug=True,
    )
    print("[3/6] Selecting pages to send to LLM (images)...", flush=True)
    # 0-indexed page numbers; capped by settings.max_images.
    selected_pages_0 = select_pages_for_llm(
        candidates=candidates,
        debug_info=debug_info,
        page_count=page_count,
        max_images=settings.max_images,
        max_blocks_per_statement=settings.max_blocks_per_statement,
    )
    print(f" -> selected {len(selected_pages_0)} pages: {[p+1 for p in selected_pages_0]}", flush=True)
    print("[4/6] Rendering selected pages to PNG bytes...", flush=True)
    images = render_pages_to_png_bytes(pdf_path, selected_pages_0, dpi=settings.dpi)
    heuristic_blocks_str = format_heuristic_blocks(debug_info.get("heuristic_blocks_0_based") or {})
    snippets = build_page_snippets(page_texts, selected_pages_0)
    # Fill the two placeholders of PROMPT_TEMPLATE (schema braces are escaped).
    prompt = PROMPT_TEMPLATE.format(
        heuristic_blocks=heuristic_blocks_str,
        page_snippets=snippets,
    )
    # Choose model
    model = settings.openrouter_model
    if not model:
        print("[5/6] Selecting a free vision model from OpenRouter...", flush=True)
        model = choose_free_vision_model(settings.openrouter_api_key, DEFAULT_FREE_VISION_MODELS)
    print(f"[5/6] Calling OpenRouter model: {model}", flush=True)
    messages = [
        # {"role": "system", "content": "Return STRICT JSON only."},
        make_user_message_with_images(prompt, images),
    ]
    # temperature=0.0 for deterministic extraction; max_tokens bounds the reply.
    raw = chat_completion(settings.openrouter_api_key, model=model, messages=messages, temperature=0.0, max_tokens=1400)
    raw_text = (raw.content or "").strip()
    print("[6/6] Parsing model output...", flush=True)
    try:
        parsed = robust_json_loads(raw_text)
    except Exception as e:
        # Fall back to an LLM-based JSON repair pass, then re-parse.
        print(" -> JSON parse failed, attempting repair:", str(e), flush=True)
        text_model = choose_any_free_text_model(settings.openrouter_api_key)
        fixed = repair_to_json(settings.openrouter_api_key, raw_text, model=text_model)
        parsed = robust_json_loads(fixed)
    if not isinstance(parsed, dict):
        # Last resort: empty-but-valid schema so downstream steps still work.
        parsed = {"balance_sheet": [], "profit_and_loss": [], "cash_flow": [], "notes": []}
    parsed = validate_ranges(parsed, page_count=page_count)
    parsed = merge_with_heuristics(parsed, debug_info.get("heuristic_blocks_0_based") or {}, page_count=page_count)
    result: Dict[str, Any] = dict(parsed)
    result["debug"] = {
        "selected_pages_1_based": [p + 1 for p in selected_pages_0],
        "candidates_top": debug_info.get("top_scoring", {}),
        "heuristic_blocks_0_based": debug_info.get("heuristic_blocks_0_based", {}),
        # Convert the optional 0-indexed TOC page to 1-indexed (None if absent).
        "item8_toc_page_1_based": (debug_info.get("item8_toc_page") + 1) if debug_info.get("item8_toc_page") is not None else None,
    }
    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        print(f"Saved output -> {output_path}", flush=True)
    return result
def main():
    """CLI entry point: parse arguments, run the pipeline, print the result."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--pdf", required=True, help="Path to input PDF")
    parser.add_argument("--out", required=False, default="", help="Path to output JSON file")
    parser.add_argument("--debug_dir", required=False, default="", help="Directory to store debug artifacts (optional)")
    ns = parser.parse_args()
    analysis = analyze_pdf(pdf_path=ns.pdf, output_path=ns.out, debug_dir=ns.debug_dir)
    print(json.dumps(analysis, indent=2), flush=True)


if __name__ == "__main__":
    main()