Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import time | |
| from typing import Any, Dict, List | |
| from config import load_settings, DEFAULT_FREE_VISION_MODELS | |
| from pdf_io import extract_texts_from_pdf, render_pages_to_png_bytes | |
| from statement_candidates import build_candidate_lists, select_pages_for_llm | |
| from openrouter_client import ( | |
| choose_free_vision_model, | |
| choose_any_free_text_model, | |
| chat_completion, | |
| make_user_message_with_images, | |
| robust_json_loads, | |
| repair_to_json, | |
| ) | |
# Prompt sent to the vision model together with the page images.
# It is filled in with str.format(), which is why the literal braces of the
# JSON schema are doubled. The two placeholders are:
#   {heuristic_blocks} - output of format_heuristic_blocks()
#   {page_snippets}    - output of build_page_snippets()
# All page numbers shown to (and requested from) the model are 1-indexed.
PROMPT_TEMPLATE = """
You are an expert financial-report analyst.
Task:
Given (a) OCR/native text snippets for certain pages and (b) images of those pages,
identify page ranges that contain ONLY the three PRIMARY financial statements tables:
1) Balance Sheet / Statement of Financial Position
2) Profit & Loss / Income / Earnings / Operations
3) Cash Flow Statement
Important:
- Many annual reports include BOTH consolidated and standalone statements.
- You MUST return blocks for BOTH if present.
- If a statement spans multiple pages, include ALL pages in that block.
- A continuation page may not repeat the full title; use table structure + line-items.
Heuristic candidate blocks (for reference only; you must verify from images+snippets):
{heuristic_blocks}
Pages provided (OCR/native snippets):
{page_snippets}
Return STRICT JSON ONLY (no markdown, no commentary).
Schema (IMPORTANT: each statement is a LIST of blocks):
{{
"balance_sheet": [
{{
"scope": "consolidated|standalone|unknown",
"start_page": <1-indexed int>,
"end_page": <1-indexed int>,
"pages": [<1-indexed ints>],
"confidence": <0..1>,
"title": "<string or null>",
"evidence_pages": [<1-indexed ints>]
}}
],
"profit_and_loss": [ ... same block schema ... ],
"cash_flow": [ ... same block schema ... ],
"notes": [ "<optional strings>" ]
}}
Rules:
- "pages" must list ALL pages in the block (even if it's one page).
- start_page = min(pages), end_page = max(pages).
- If a statement is NOT present, return an empty list for it.
""".strip()
| def _combined_for_snippet(p) -> str: | |
| a = getattr(p, "extracted_text", "") or "" | |
| b = getattr(p, "ocr_text", "") or "" | |
| return (a + "\n" + b).strip() | |
def build_page_snippets(page_texts: List[Any], selected_pages_0: List[int], max_chars_per_page: int = 1400) -> str:
    """Render the selected pages as labelled text snippets for the prompt.

    Args:
        page_texts: Per-page text objects, indexed by 0-based page number.
        selected_pages_0: 0-based indices of the pages to include, in output order.
        max_chars_per_page: Maximum number of characters kept from each page.

    Returns:
        One section per selected page, each headed by ``--- Page N ---`` with a
        1-based N, joined by newlines and stripped of outer whitespace.
    """
    # Pair each page index with its (already truncated) combined text.
    clipped = (
        (idx, _combined_for_snippet(page_texts[idx])[:max_chars_per_page])
        for idx in selected_pages_0
    )
    return "\n".join(f"--- Page {idx + 1} ---\n{body}\n" for idx, body in clipped).strip()
def format_heuristic_blocks(heuristic_blocks_0_based: dict, max_per_stmt: int = 6) -> str:
    """Summarise heuristic candidate blocks as bullet lines for the prompt.

    Args:
        heuristic_blocks_0_based: Mapping of statement key (``balance_sheet``,
            ``profit_and_loss``, ``cash_flow``) to a list of block dicts with
            0-based ``start``/``end`` and optional ``scope``, ``title``, ``score``.
        max_per_stmt: Maximum number of blocks listed per statement; the
            highest-scoring ones are kept.

    Returns:
        ``"(none)"`` when the argument is not a dict; otherwise one line per
        statement of the form ``- <stmt>: <scope>: <start>-<end> (<title>); ...``
        with 1-based page numbers, or ``- <stmt>: (none)`` when the statement has
        no usable block.

    Malformed entries (non-dict, or ``start``/``end`` not convertible to int)
    are skipped instead of raising, matching how ``merge_with_heuristics``
    treats the same data.
    """
    if not isinstance(heuristic_blocks_0_based, dict):
        return "(none)"

    def score_of(block: dict) -> float:
        # A missing or non-numeric score ranks as 0.0 rather than aborting the sort.
        try:
            return float(block.get("score") or 0.0)
        except (TypeError, ValueError):
            return 0.0

    def page_range(block):
        # 1-based (start, end), or None when the entry cannot be interpreted.
        if not isinstance(block, dict):
            return None
        try:
            return int(block.get("start")) + 1, int(block.get("end")) + 1
        except (TypeError, ValueError, OverflowError):
            return None

    lines = []
    for stmt in ("balance_sheet", "profit_and_loss", "cash_flow"):
        raw = heuristic_blocks_0_based.get(stmt) or []
        usable = []
        if isinstance(raw, list):
            for block in raw:
                rng = page_range(block)
                if rng is not None:
                    usable.append((block, rng))
        if not usable:
            lines.append(f"- {stmt}: (none)")
            continue
        # sorted() is stable, so equally scored blocks keep their input order.
        ranked = sorted(usable, key=lambda item: score_of(item[0]), reverse=True)[:max_per_stmt]
        parts = []
        for block, (s, e) in ranked:
            scope = block.get("scope") or "unknown"
            title = block.get("title")
            parts.append(f"{scope}: {s}-{e}" + (f" ({title})" if title else ""))
        lines.append(f"- {stmt}: " + "; ".join(parts))
    return "\n".join(lines)
def validate_ranges(result: dict, page_count: int) -> dict:
    """Normalize model output into the list-of-blocks schema.

    For each of ``balance_sheet``, ``profit_and_loss`` and ``cash_flow`` the
    value is coerced to a list of normalized block dicts (a single dict is
    wrapped in a list, anything else becomes an empty list). ``notes`` is
    coerced to a list of strings.

    Per block:
      * page numbers are kept only if they are real ints in ``1..page_count``
        (booleans are rejected even though ``bool`` subclasses ``int``);
      * ``start_page``/``end_page`` are filled from ``pages`` when either is
        missing, and ``pages`` is filled from the range when it is empty;
      * an inverted range (end < start) invalidates the block;
      * ``scope`` is lower-cased and restricted to consolidated/standalone/unknown;
      * ``confidence`` is clamped to ``[0, 1]``; non-numeric or NaN becomes 0.0;
      * a block without a usable range is kept as a placeholder with
        ``start_page``/``end_page`` set to None and an empty ``pages`` list.

    Args:
        result: Parsed model output; modified in place.
        page_count: Number of pages in the document (upper bound for page numbers).

    Returns:
        The same ``result`` dict, normalized.
    """

    def is_page(v) -> bool:
        # bool is a subclass of int; a JSON `true` must not be read as page 1.
        return isinstance(v, int) and not isinstance(v, bool) and 1 <= v <= page_count

    def clamp_int(v):
        return v if is_page(v) else None

    def normalize_pages(pages_val):
        if not isinstance(pages_val, list):
            return []
        return sorted({x for x in pages_val if is_page(x)})

    def norm_block(obj) -> dict:
        if not isinstance(obj, dict):
            obj = {}
        sp = clamp_int(obj.get("start_page"))
        ep = clamp_int(obj.get("end_page"))
        pages = normalize_pages(obj.get("pages"))
        if pages and (sp is None or ep is None):
            sp = min(pages)
            ep = max(pages)
        if sp is not None and ep is not None and ep < sp:
            sp, ep, pages = None, None, []
        if not pages and sp is not None and ep is not None:
            pages = list(range(sp, ep + 1))

        scope = obj.get("scope")
        if not isinstance(scope, str):
            scope = "unknown"
        scope = scope.lower().strip()
        if scope not in {"consolidated", "standalone", "unknown"}:
            scope = "unknown"

        raw_conf = obj.get("confidence")
        # NaN compares false against everything, so max/min clamping alone would
        # turn it into 1.0; treat it like a missing value instead.
        if not isinstance(raw_conf, (int, float)) or raw_conf != raw_conf:
            conf = 0.0
        else:
            # Clamp before converting so an oversized int cannot overflow float().
            conf = float(max(0.0, min(1.0, raw_conf)))

        evidence = obj.get("evidence_pages")
        if not isinstance(evidence, list):
            evidence = []
        evidence = [x for x in evidence if is_page(x)]

        title = obj.get("title")
        if title is not None and not isinstance(title, str):
            title = None

        if sp is None or ep is None:
            # No usable range: keep a placeholder block with an empty pages list.
            sp, ep, pages = None, None, []
        elif not evidence:
            # Default the evidence to the first page of the block.
            evidence = [sp]

        # The pages list is always present, even for single-page blocks.
        return {
            "start_page": sp,
            "end_page": ep,
            "pages": pages,
            "scope": scope,
            "confidence": conf,
            "title": title,
            "evidence_pages": evidence,
        }

    for k in ("balance_sheet", "profit_and_loss", "cash_flow"):
        val = result.get(k)
        if isinstance(val, dict):
            val = [val]
        if not isinstance(val, list):
            val = []
        result[k] = [norm_block(x) for x in val]

    notes = result.get("notes")
    result["notes"] = [x for x in notes if isinstance(x, str)] if isinstance(notes, list) else []
    return result
def merge_with_heuristics(result: dict, heuristic_blocks_0_based: dict, page_count: int) -> dict:
    """Complement the LLM blocks with heuristic blocks.

    For each statement (``balance_sheet``, ``profit_and_loss``, ``cash_flow``):
      1. Heuristic blocks (0-based ``start``/``end``) are converted to the
         1-based block schema with a fixed confidence of 0.35; entries that are
         malformed or fall outside ``1..page_count`` are ignored.
      2. A single-page LLM block is expanded to a heuristic block that has the
         same scope and start page but a later end page.
      3. Heuristic blocks are appended when they do not overlap any located LLM
         block and either their scope is not yet covered or there is only one
         heuristic block for the statement.
      4. Non-dict entries are dropped and blocks are sorted by start/end page,
         with range-less placeholder blocks last.

    Only LLM blocks that actually carry a page range count as "present" for
    step 3 - both for overlap and for scope. A placeholder block without pages
    therefore no longer hides the heuristic block of the same scope.

    Args:
        result: Normalized LLM result; modified in place.
        heuristic_blocks_0_based: Mapping of statement key to heuristic block list.
        page_count: Number of pages in the document.

    Returns:
        The same ``result`` dict. It is returned untouched when
        ``heuristic_blocks_0_based`` is not a dict.
    """
    if not isinstance(heuristic_blocks_0_based, dict):
        return result

    def overlap(a, b):
        return not (a[1] < b[0] or b[1] < a[0])

    def has_range(block) -> bool:
        return isinstance(block, dict) and bool(block.get("start_page")) and bool(block.get("end_page"))

    def to_block(raw):
        # Convert one heuristic entry to the 1-based schema; None if unusable.
        if not isinstance(raw, dict):
            return None
        try:
            s = int(raw.get("start")) + 1
            e = int(raw.get("end")) + 1
        except (TypeError, ValueError, OverflowError):
            return None
        if not (1 <= s <= page_count and 1 <= e <= page_count and e >= s):
            return None
        return {
            "start_page": s,
            "end_page": e,
            "pages": list(range(s, e + 1)),
            "scope": (raw.get("scope") or "unknown"),
            "confidence": 0.35,
            "title": raw.get("title"),
            "evidence_pages": [s],
        }

    for stmt in ("balance_sheet", "profit_and_loss", "cash_flow"):
        llm_blocks = result.get(stmt) or []
        if not isinstance(llm_blocks, list):
            llm_blocks = []

        raw_heur = heuristic_blocks_0_based.get(stmt) or []
        heur_blocks = []
        if isinstance(raw_heur, list):
            heur_blocks = [hb for hb in map(to_block, raw_heur) if hb is not None]

        # Expand single-page LLM blocks using a matching, longer heuristic block.
        for lb in llm_blocks:
            if not isinstance(lb, dict):
                continue
            sp = lb.get("start_page")
            ep = lb.get("end_page")
            if sp is None or ep is None or sp != ep:
                continue
            scope = lb.get("scope") or "unknown"
            for hb in heur_blocks:
                if hb["scope"] == scope and hb["start_page"] == sp and hb["end_page"] > ep:
                    lb["end_page"] = hb["end_page"]
                    # Copy so the LLM block does not share a list with the heuristic block.
                    lb["pages"] = list(hb["pages"])
                    break

        # Computed after expansion so the widened ranges take part in overlap checks.
        located = [b for b in llm_blocks if has_range(b)]
        present_ranges = [(b.get("start_page"), b.get("end_page")) for b in located]
        present_scopes = {(b.get("scope") or "unknown") for b in located}

        # Add missing scope blocks (common case: consolidated + standalone).
        for hb in heur_blocks:
            if hb["scope"] in present_scopes and len(heur_blocks) > 1:
                continue
            candidate = (hb["start_page"], hb["end_page"])
            if any(overlap(candidate, existing) for existing in present_ranges):
                continue
            llm_blocks.append(hb)
            present_scopes.add(hb["scope"])
            present_ranges.append(candidate)

        llm_blocks = [b for b in llm_blocks if isinstance(b, dict)]
        # Blocks without a range sort last.
        llm_blocks.sort(key=lambda b: (b.get("start_page") or 10**9, b.get("end_page") or 10**9))
        result[stmt] = llm_blocks
    return result
def analyze_pdf(
    pdf_path: str,
    output_path: str = "",
    debug_dir: str = "",
    openrouter_api_key: str | None = None,
) -> Dict[str, Any]:
    """Locate the primary financial statements in a PDF.

    Pipeline: extract text/OCR, build heuristic candidates, pick the pages to
    show the model, render them to PNG, query an OpenRouter model with the
    prompt and images, parse the reply (with one repair attempt through a text
    model), then normalize it and merge it with the heuristic blocks.

    Args:
        pdf_path: Path of the PDF to analyze.
        output_path: When non-empty, the result is also written there as JSON.
        debug_dir: Accepted for interface compatibility; not used by this function.
        openrouter_api_key: API key; falls back to the ``OPENROUTER_API_KEY``
            environment variable when not given.

    Returns:
        The normalized statement blocks plus a ``debug`` entry describing the
        selected pages and heuristic candidates.

    Raises:
        Whatever the extraction, rendering, or OpenRouter helpers raise. A
        parse failure of the repaired JSON is not caught either.
    """
    api_key = openrouter_api_key or os.getenv("OPENROUTER_API_KEY", "").strip()
    settings = load_settings(openrouter_api_key=api_key)
    started = time.time()

    print(f"[1/6] Extracting text/OCR from PDF: {pdf_path}", flush=True)
    page_texts, page_count = extract_texts_from_pdf(
        pdf_path=pdf_path,
        dpi=settings.dpi,
        ocr_lang=settings.ocr_lang,
        min_text_chars_for_digital=settings.min_text_chars_for_digital,
    )
    print(f" -> pages: {page_count} (t={time.time()-started:.1f}s)", flush=True)

    print("[2/6] Building statement candidates + heuristic blocks...", flush=True)
    candidates, debug_info = build_candidate_lists(
        pages=page_texts,
        page_count=page_count,
        topk_per_statement=settings.topk_per_statement,
        continuation_max_forward=settings.continuation_max_forward,
        debug=True,
    )

    print("[3/6] Selecting pages to send to LLM (images)...", flush=True)
    selected_pages_0 = select_pages_for_llm(
        candidates=candidates,
        debug_info=debug_info,
        page_count=page_count,
        max_images=settings.max_images,
        max_blocks_per_statement=settings.max_blocks_per_statement,
    )
    selected_pages_1 = [idx + 1 for idx in selected_pages_0]
    print(f" -> selected {len(selected_pages_0)} pages: {selected_pages_1}", flush=True)

    print("[4/6] Rendering selected pages to PNG bytes...", flush=True)
    images = render_pages_to_png_bytes(pdf_path, selected_pages_0, dpi=settings.dpi)

    heuristics = debug_info.get("heuristic_blocks_0_based") or {}
    prompt = PROMPT_TEMPLATE.format(
        heuristic_blocks=format_heuristic_blocks(heuristics),
        page_snippets=build_page_snippets(page_texts, selected_pages_0),
    )

    # Use the configured model, or pick a free vision model when none is set.
    model = settings.openrouter_model
    if not model:
        print("[5/6] Selecting a free vision model from OpenRouter...", flush=True)
        model = choose_free_vision_model(settings.openrouter_api_key, DEFAULT_FREE_VISION_MODELS)

    print(f"[5/6] Calling OpenRouter model: {model}", flush=True)
    # No system message is sent; the JSON-only instruction lives in the prompt.
    messages = [make_user_message_with_images(prompt, images)]
    raw = chat_completion(
        settings.openrouter_api_key,
        model=model,
        messages=messages,
        temperature=0.0,
        max_tokens=1400,
    )
    raw_text = (raw.content or "").strip()

    print("[6/6] Parsing model output...", flush=True)
    try:
        parsed = robust_json_loads(raw_text)
    except Exception as e:
        # One repair attempt: ask a text model to turn the reply into valid JSON.
        print(" -> JSON parse failed, attempting repair:", str(e), flush=True)
        text_model = choose_any_free_text_model(settings.openrouter_api_key)
        fixed = repair_to_json(settings.openrouter_api_key, raw_text, model=text_model)
        parsed = robust_json_loads(fixed)

    if not isinstance(parsed, dict):
        parsed = {"balance_sheet": [], "profit_and_loss": [], "cash_flow": [], "notes": []}
    parsed = validate_ranges(parsed, page_count=page_count)
    parsed = merge_with_heuristics(parsed, heuristics, page_count=page_count)

    toc_page_0 = debug_info.get("item8_toc_page")
    result: Dict[str, Any] = dict(parsed)
    result["debug"] = {
        "selected_pages_1_based": selected_pages_1,
        "candidates_top": debug_info.get("top_scoring", {}),
        "heuristic_blocks_0_based": debug_info.get("heuristic_blocks_0_based", {}),
        "item8_toc_page_1_based": None if toc_page_0 is None else toc_page_0 + 1,
    }

    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)
        print(f"Saved output -> {output_path}", flush=True)
    return result
def main():
    """Command-line entry point: analyze one PDF and print the result as JSON."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--pdf", required=True, help="Path to input PDF")
    # The remaining options are optional strings that default to "".
    optional_flags = (
        ("--out", "Path to output JSON file"),
        ("--debug_dir", "Directory to store debug artifacts (optional)"),
    )
    for flag, description in optional_flags:
        parser.add_argument(flag, required=False, default="", help=description)
    args = parser.parse_args()

    result = analyze_pdf(pdf_path=args.pdf, output_path=args.out, debug_dir=args.debug_dir)
    print(json.dumps(result, indent=2), flush=True)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()