"""Locate the primary financial-statement pages of a PDF report.

Pipeline (see ``analyze_pdf``): extract per-page text, build heuristic
candidates, send selected pages (text snippets + images) to an OpenRouter
model, then validate the model's JSON answer and merge it with the
heuristic blocks.
"""
from __future__ import annotations

import argparse
import json
import math
import os
import time
from typing import Any, Dict, List

from config import load_settings, DEFAULT_FREE_VISION_MODELS
from pdf_io import extract_texts_from_pdf, render_pages_to_png_bytes
from statement_candidates import build_candidate_lists, select_pages_for_llm
from openrouter_client import (
    choose_free_vision_model,
    choose_any_free_text_model,
    chat_completion,
    make_user_message_with_images,
    robust_json_loads,
    repair_to_json,
)

# Statement keys shared by the prompt schema, validation and merging.
_STATEMENT_KEYS = ("balance_sheet", "profit_and_loss", "cash_flow")
# Scope labels allowed by the prompt schema.
_VALID_SCOPES = frozenset({"consolidated", "standalone", "unknown"})

# NOTE: the prompt text is one continuous line. Every trailing backslash below
# is a source-level line continuation (it adds no newline to the string), and
# the single space in front of it is part of the text. Do not strip trailing
# whitespace inside this literal.
PROMPT_TEMPLATE = """\
You are an expert financial-report analyst. \
Task: Given (a) OCR/native text snippets for certain pages and (b) images of those pages, \
identify page ranges that contain ONLY the three PRIMARY financial statements tables: \
1) Balance Sheet / Statement of Financial Position \
2) Profit & Loss / Income / Earnings / Operations \
3) Cash Flow Statement \
Important: \
- Many annual reports include BOTH consolidated and standalone statements. \
- You MUST return blocks for BOTH if present. \
- If a statement spans multiple pages, include ALL pages in that block. \
- A continuation page may not repeat the full title; use table structure + line-items. \
Heuristic candidate blocks (for reference only; you must verify from images+snippets): \
{heuristic_blocks} \
Pages provided (OCR/native snippets): \
{page_snippets} \
Return STRICT JSON ONLY (no markdown, no commentary). \
Schema (IMPORTANT: each statement is a LIST of blocks): \
{{ \
"balance_sheet": [ \
{{ \
"scope": "consolidated|standalone|unknown", \
"start_page": <1-indexed int>, \
"end_page": <1-indexed int>, \
"pages": [<1-indexed ints>], \
"confidence": <0..1>, \
"title": "", \
"evidence_pages": [<1-indexed ints>] \
}} \
], \
"profit_and_loss": [ ... same block schema ... ], \
"cash_flow": [ ... same block schema ... ], \
"notes": [ "" ] \
}} \
Rules: \
- "pages" must list ALL pages in the block (even if it's one page). \
- start_page = min(pages), end_page = max(pages). \
- If a statement is NOT present, return an empty list for it.
""".strip()


def _is_valid_page(value: Any, page_count: int) -> bool:
    """Return True if *value* is a real int (not a bool) within 1..page_count."""
    # bool is a subclass of int, so True would otherwise be accepted as page 1.
    return (
        isinstance(value, int)
        and not isinstance(value, bool)
        and 1 <= value <= page_count
    )


def _normalize_scope(scope: Any) -> str:
    """Map *scope* onto one of the schema's scope labels, else "unknown"."""
    if not isinstance(scope, str):
        return "unknown"
    cleaned = scope.lower().strip()
    return cleaned if cleaned in _VALID_SCOPES else "unknown"


def _block_bounds_1_based(block: Any) -> tuple[int, int] | None:
    """Return (start, end) of a 0-based heuristic block as 1-based ints.

    Returns None when *block* is not a mapping or its "start"/"end" values
    cannot be converted to int.
    """
    try:
        return int(block.get("start")) + 1, int(block.get("end")) + 1
    except (AttributeError, TypeError, ValueError):
        return None


def _block_score(block: dict) -> float:
    """Return the block's "score" as a float, or 0.0 if missing/non-numeric."""
    try:
        return float(block.get("score") or 0.0)
    except (TypeError, ValueError):
        return 0.0


def _combined_for_snippet(p) -> str:
    """Join a page object's ``extracted_text`` and ``ocr_text`` attributes.

    Missing or falsy attributes are treated as empty strings; the two parts
    are separated by a newline and the result is stripped.
    """
    native = getattr(p, "extracted_text", "") or ""
    ocr = getattr(p, "ocr_text", "") or ""
    return (native + "\n" + ocr).strip()


def build_page_snippets(
    page_texts: List[Any],
    selected_pages_0: List[int],
    max_chars_per_page: int = 1400,
) -> str:
    """Build the text-snippet section of the prompt.

    Args:
        page_texts: Per-page text objects, indexed by 0-based page number.
        selected_pages_0: 0-based indices of the pages to include.
        max_chars_per_page: Each page's combined text is cut to this length.

    Returns:
        One "--- Page N ---" section (1-based N) per selected page, joined
        by blank lines and stripped.
    """
    sections = [
        f"--- Page {p0 + 1} ---\n"
        f"{_combined_for_snippet(page_texts[p0])[:max_chars_per_page]}\n"
        for p0 in selected_pages_0
    ]
    return "\n".join(sections).strip()


def format_heuristic_blocks(heuristic_blocks_0_based: dict, max_per_stmt: int = 6) -> str:
    """Render heuristic blocks as a bullet list for the prompt.

    Args:
        heuristic_blocks_0_based: Mapping of statement key to a list of block
            dicts with 0-based "start"/"end" and optional "score", "scope",
            "title" entries.
        max_per_stmt: Maximum number of blocks listed per statement, taken in
            descending "score" order.

    Returns:
        One "- <statement>: ..." line per statement with 1-based page ranges,
        or "(none)" when the argument is not a dict. Blocks that are not
        dicts or lack integer bounds are skipped instead of raising, matching
        how ``merge_with_heuristics`` treats them.
    """
    if not isinstance(heuristic_blocks_0_based, dict):
        return "(none)"

    lines = []
    for stmt in _STATEMENT_KEYS:
        raw_blocks = heuristic_blocks_0_based.get(stmt) or []
        usable = []
        if isinstance(raw_blocks, list):
            for block in raw_blocks:
                bounds = _block_bounds_1_based(block)
                if bounds is not None:
                    usable.append((block, bounds))
        if not usable:
            lines.append(f"- {stmt}: (none)")
            continue

        # sorted() is stable, so equally scored blocks keep their input order.
        usable.sort(key=lambda item: _block_score(item[0]), reverse=True)
        parts = []
        for block, (start, end) in usable[:max_per_stmt]:
            scope = block.get("scope") or "unknown"
            title = block.get("title")
            suffix = f" ({title})" if title else ""
            parts.append(f"{scope}: {start}-{end}{suffix}")
        lines.append(f"- {stmt}: " + "; ".join(parts))
    return "\n".join(lines)


def validate_ranges(result: dict, page_count: int) -> dict:
    """Normalize model output into the list-of-blocks schema.

    *result* is modified in place and also returned. For each statement key
    the value becomes a list of normalized blocks (a single dict is wrapped
    in a list; anything else becomes an empty list). "notes" becomes a list
    of the string entries only.

    Per block:
      * page numbers must be ints (bools excluded) within 1..page_count;
      * "pages" is de-duplicated and sorted;
      * when "pages" is non-empty, start_page/end_page are always
        min(pages)/max(pages); if a valid explicit start/end range reaches
        beyond the listed pages, the pages of that range are added first so
        no page named by the model is lost;
      * when "pages" is empty, a valid start/end range is expanded into
        "pages"; a reversed or incomplete range yields an empty block
        (start_page/end_page None, pages []);
      * scope is lower-cased and limited to the schema labels;
      * confidence is clamped to [0, 1] (non-numeric or NaN -> 0.0);
      * evidence_pages defaults to [start_page] for a non-empty block.
    """

    def norm_block(obj) -> dict:
        if not isinstance(obj, dict):
            obj = {}

        start = obj.get("start_page")
        end = obj.get("end_page")
        start = start if _is_valid_page(start, page_count) else None
        end = end if _is_valid_page(end, page_count) else None
        if start is not None and end is not None and end < start:
            # A reversed explicit range carries no usable information.
            start = end = None

        raw_pages = obj.get("pages")
        if isinstance(raw_pages, list):
            pages = sorted({x for x in raw_pages if _is_valid_page(x, page_count)})
        else:
            pages = []

        if pages:
            explicit_is_wider = (
                start is not None
                and end is not None
                and (start < pages[0] or end > pages[-1])
            )
            if explicit_is_wider:
                pages = sorted(set(pages).union(range(start, end + 1)))
            # Enforce the schema rule start_page=min(pages), end_page=max(pages).
            start, end = pages[0], pages[-1]
        elif start is not None and end is not None:
            pages = list(range(start, end + 1))
        else:
            start = end = None

        conf = obj.get("confidence")
        if isinstance(conf, (int, float)) and not isinstance(conf, bool):
            conf = float(conf)
            # NaN compares false with everything and would clamp to 1.0.
            conf = 0.0 if math.isnan(conf) else max(0.0, min(1.0, conf))
        else:
            conf = 0.0

        evidence = obj.get("evidence_pages")
        if isinstance(evidence, list):
            evidence = [x for x in evidence if _is_valid_page(x, page_count)]
        else:
            evidence = []

        title = obj.get("title")
        if title is not None and not isinstance(title, str):
            title = None

        if start is not None and not evidence:
            evidence = [start]

        # The pages list is always present, even for a single-page block.
        return {
            "start_page": start,
            "end_page": end,
            "pages": pages,
            "scope": _normalize_scope(obj.get("scope")),
            "confidence": conf,
            "title": title,
            "evidence_pages": evidence,
        }

    for key in _STATEMENT_KEYS:
        val = result.get(key)
        if isinstance(val, dict):
            val = [val]
        if not isinstance(val, list):
            val = []
        result[key] = [norm_block(x) for x in val]

    notes = result.get("notes")
    result["notes"] = [x for x in notes if isinstance(x, str)] if isinstance(notes, list) else []
    return result


def merge_with_heuristics(result: dict, heuristic_blocks_0_based: dict, page_count: int) -> dict:
    """Combine validated LLM blocks with heuristic blocks.

    *result* is modified in place and also returned (unchanged when
    *heuristic_blocks_0_based* is not a dict). For each statement:

      * heuristic blocks are converted to 1-based blocks with confidence
        0.35; blocks with non-integer or out-of-range bounds are ignored and
        scopes are normalized to the schema labels so they compare equal to
        the scopes produced by ``validate_ranges``;
      * a single-page LLM block is extended to a heuristic block that has
        the same scope and start page but a later end page;
      * a heuristic block is appended when it overlaps no block already
        present and either its scope is not yet covered by a block with a
        page range, or it is the only heuristic block for the statement;
      * non-dict entries are dropped and blocks are sorted by
        (start_page, end_page), blocks without a range last.
    """
    if not isinstance(heuristic_blocks_0_based, dict):
        return result

    def overlap(a, b):
        return not (a[1] < b[0] or b[1] < a[0])

    for stmt in _STATEMENT_KEYS:
        existing = result.get(stmt) or []
        if not isinstance(existing, list):
            existing = []
        llm_blocks = [b for b in existing if isinstance(b, dict)]

        raw_heur = heuristic_blocks_0_based.get(stmt) or []
        heur_blocks = []
        if isinstance(raw_heur, list):
            for raw in raw_heur:
                bounds = _block_bounds_1_based(raw)
                if bounds is None:
                    continue
                start, end = bounds
                if not (1 <= start <= end <= page_count):
                    continue
                heur_blocks.append(
                    {
                        "start_page": start,
                        "end_page": end,
                        "pages": list(range(start, end + 1)),
                        "scope": _normalize_scope(raw.get("scope")),
                        "confidence": 0.35,
                        "title": raw.get("title"),
                        "evidence_pages": [start],
                    }
                )

        # Expand single-page LLM blocks using heuristics.
        for lb in llm_blocks:
            sp = lb.get("start_page")
            ep = lb.get("end_page")
            if sp is None or ep is None or sp != ep:
                continue
            scope = _normalize_scope(lb.get("scope"))
            for hb in heur_blocks:
                if hb["scope"] == scope and hb["start_page"] == sp and hb["end_page"] > ep:
                    lb["end_page"] = hb["end_page"]
                    # Copy so the two blocks never share one list object.
                    lb["pages"] = list(hb["pages"])
                    break

        # Only blocks that actually cover pages count as "present"; an empty
        # block must not stop a heuristic block of the same scope from
        # filling the gap.
        ranged = [b for b in llm_blocks if b.get("start_page") and b.get("end_page")]
        present_ranges = [(b["start_page"], b["end_page"]) for b in ranged]
        present_scopes = {_normalize_scope(b.get("scope")) for b in ranged}

        # Add missing scope blocks (common: consolidated + standalone).
        for hb in heur_blocks:
            if hb["scope"] in present_scopes and len(heur_blocks) > 1:
                continue
            candidate = (hb["start_page"], hb["end_page"])
            if any(overlap(candidate, taken) for taken in present_ranges):
                continue
            llm_blocks.append(hb)
            present_scopes.add(hb["scope"])
            present_ranges.append(candidate)

        llm_blocks.sort(
            key=lambda b: (b.get("start_page") or 10**9, b.get("end_page") or 10**9)
        )
        result[stmt] = llm_blocks
    return result


def analyze_pdf(
    pdf_path: str,
    output_path: str = "",
    debug_dir: str = "",
    openrouter_api_key: str | None = None,
) -> Dict[str, Any]:
    """Run the full pipeline on one PDF and return the result dict.

    Args:
        pdf_path: Path of the input PDF.
        output_path: If non-empty, the result is also written there as
            indented JSON (UTF-8).
        debug_dir: Accepted for interface compatibility; this function does
            not currently read it.
        openrouter_api_key: API key; when falsy, the OPENROUTER_API_KEY
            environment variable (stripped) is used instead.

    Returns:
        The validated and heuristic-merged statement blocks plus a "debug"
        entry holding the selected pages (1-based), the candidates'
        "top_scoring" data, the heuristic blocks and the 1-based
        "item8_toc_page" (None when absent).

    Progress is reported on stdout. Exceptions raised by the extraction,
    rendering, API or JSON-repair helpers are not caught here.
    """
    settings = load_settings(
        openrouter_api_key=openrouter_api_key or os.getenv("OPENROUTER_API_KEY", "").strip()
    )
    t0 = time.time()

    print(f"[1/6] Extracting text/OCR from PDF: {pdf_path}", flush=True)
    page_texts, page_count = extract_texts_from_pdf(
        pdf_path=pdf_path,
        dpi=settings.dpi,
        ocr_lang=settings.ocr_lang,
        min_text_chars_for_digital=settings.min_text_chars_for_digital,
    )
    print(f" -> pages: {page_count} (t={time.time()-t0:.1f}s)", flush=True)

    print("[2/6] Building statement candidates + heuristic blocks...", flush=True)
    candidates, debug_info = build_candidate_lists(
        pages=page_texts,
        page_count=page_count,
        topk_per_statement=settings.topk_per_statement,
        continuation_max_forward=settings.continuation_max_forward,
        debug=True,
    )
    heuristic_blocks = debug_info.get("heuristic_blocks_0_based") or {}

    print("[3/6] Selecting pages to send to LLM (images)...", flush=True)
    selected_pages_0 = select_pages_for_llm(
        candidates=candidates,
        debug_info=debug_info,
        page_count=page_count,
        max_images=settings.max_images,
        max_blocks_per_statement=settings.max_blocks_per_statement,
    )
    selected_pages_1 = [p + 1 for p in selected_pages_0]
    print(f" -> selected {len(selected_pages_0)} pages: {selected_pages_1}", flush=True)

    print("[4/6] Rendering selected pages to PNG bytes...", flush=True)
    images = render_pages_to_png_bytes(pdf_path, selected_pages_0, dpi=settings.dpi)

    prompt = PROMPT_TEMPLATE.format(
        heuristic_blocks=format_heuristic_blocks(heuristic_blocks),
        page_snippets=build_page_snippets(page_texts, selected_pages_0),
    )

    # Choose model: an explicitly configured one wins, otherwise pick one.
    model = settings.openrouter_model
    if not model:
        print("[5/6] Selecting a free vision model from OpenRouter...", flush=True)
        model = choose_free_vision_model(settings.openrouter_api_key, DEFAULT_FREE_VISION_MODELS)

    print(f"[5/6] Calling OpenRouter model: {model}", flush=True)
    messages = [
        # {"role": "system", "content": "Return STRICT JSON only."},
        make_user_message_with_images(prompt, images),
    ]
    raw = chat_completion(
        settings.openrouter_api_key,
        model=model,
        messages=messages,
        temperature=0.0,
        max_tokens=1400,
    )
    raw_text = (raw.content or "").strip()

    print("[6/6] Parsing model output...", flush=True)
    try:
        parsed = robust_json_loads(raw_text)
    except Exception as e:
        # Second chance: ask a text model to repair the answer into JSON.
        print(" -> JSON parse failed, attempting repair:", str(e), flush=True)
        text_model = choose_any_free_text_model(settings.openrouter_api_key)
        fixed = repair_to_json(settings.openrouter_api_key, raw_text, model=text_model)
        parsed = robust_json_loads(fixed)

    if not isinstance(parsed, dict):
        parsed = {"balance_sheet": [], "profit_and_loss": [], "cash_flow": [], "notes": []}

    parsed = validate_ranges(parsed, page_count=page_count)
    parsed = merge_with_heuristics(parsed, heuristic_blocks, page_count=page_count)

    toc_page_0 = debug_info.get("item8_toc_page")
    result: Dict[str, Any] = dict(parsed)
    result["debug"] = {
        "selected_pages_1_based": selected_pages_1,
        "candidates_top": debug_info.get("top_scoring", {}),
        "heuristic_blocks_0_based": debug_info.get("heuristic_blocks_0_based", {}),
        "item8_toc_page_1_based": (toc_page_0 + 1) if toc_page_0 is not None else None,
    }

    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        print(f"Saved output -> {output_path}", flush=True)

    return result


def main():
    """Command-line entry point: analyze one PDF and print the result as JSON."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--pdf", required=True, help="Path to input PDF")
    ap.add_argument("--out", required=False, default="", help="Path to output JSON file")
    ap.add_argument(
        "--debug_dir",
        required=False,
        default="",
        help="Directory to store debug artifacts (optional)",
    )
    args = ap.parse_args()

    result = analyze_pdf(pdf_path=args.pdf, output_path=args.out, debug_dir=args.debug_dir)
    print(json.dumps(result, indent=2), flush=True)


if __name__ == "__main__":
    main()