from __future__ import annotations import re import difflib from typing import Any, Dict, List, Optional, Sequence, Tuple # ========================= # Targets (ONLY these 3) # ========================= TARGETS = ["balance_sheet", "profit_and_loss", "cash_flow"] AUX = ["comprehensive_income", "equity", "notes"] # only for delimiting (when available) # ========================= # Title variants # ========================= TITLE_VARIANTS: Dict[str, List[str]] = { "balance_sheet": [ "Consolidated Balance Sheets", "Standalone Balance Sheets", "Balance Sheets", "Statement of Financial Position", "Standalone Statement of Financial Position", ], "profit_and_loss": [ "Consolidated Statements of Earnings", "Standalone Statements of Earnings", "Consolidated Statements of Operations", "Standalone Statements of Operations", "Consolidated Statements of Income", "Standalone Statements of Income", "Income Statement", "Statement of Profit and Loss", "Statement of Profit & Loss", ], "cash_flow": [ "Consolidated Statements of Cash Flows", "Standalone Statements of Cash Flows", "Statement of Cash Flows", "Cash Flow Statement", ], # aux "comprehensive_income": [ "Consolidated Statements of Comprehensive Income", "Standalone Statements of Comprehensive Income", "Statement of Comprehensive Income", ], "equity": [ "Consolidated Statements of Equity", "Standalone Statements of Equity", "Statement of Stockholders' Equity", "Statement of Shareholders' Equity", ], "notes": [ "Notes to Consolidated Financial Statements", "Notes to Standalone Financial Statements", "Notes to Financial Statements", ], } INTEGRAL_FOOTER = "the accompanying notes are an integral part" SIG_TERMS: Dict[str, List[str]] = { "balance_sheet": [ "total assets", "total liabilities", "total equity", "stockholders' equity", "shareholders' equity", "liabilities and equity", "current assets", "current liabilities", "non-current assets", "non-current liabilities", ], "profit_and_loss": [ "net revenues", "net sales", "revenue", "cost of sales", "cost of products sold", "gross profit", "operating income", "operating profit", "profit before tax", "net income", "net earnings", "earnings per share", "basic", "diluted", ], "cash_flow": [ "cash flows from operating activities", "cash flows from investing activities", "cash flows from financing activities", "net cash provided by operating activities", "net cash used in investing activities", "net cash used in financing activities", "cash and cash equivalents, end of year", "net change in cash", ], } NOTE_HEADING_RE = re.compile(r"^\s*note\s+\d+\b", re.IGNORECASE) DOT_LEADER_RE = re.compile(r"\.{5,}") ITEM8_RE = re.compile( r"\bITEM\s+8\.\s+FINANCIAL\s+STATEMENTS\s+AND\s+SUPPLEMENTARY\s+DATA\b", re.IGNORECASE ) CONTINUED_RE = re.compile(r"\bcontinued\b", re.IGNORECASE) # ========================= # Utilities # ========================= def _combined_text(page_obj: Any) -> str: if page_obj is None: return "" if isinstance(page_obj, str): return page_obj if isinstance(page_obj, dict): a = page_obj.get("extracted_text") or page_obj.get("text") or "" b = page_obj.get("ocr_text") or "" return (a + "\n" + b).strip() a = getattr(page_obj, "extracted_text", None) or getattr(page_obj, "text", None) or "" b = getattr(page_obj, "ocr_text", None) or "" return (a + "\n" + b).strip() def _norm(s: str) -> str: return re.sub(r"\s+", " ", (s or "")).strip().lower() def _fuzzy_line_contains_title(top_lines: List[str], title: str, threshold: float = 0.86) -> bool: title_n = _norm(title) for ln in top_lines: ln_n = _norm(ln) if not ln_n: continue if title_n in ln_n: return True r = difflib.SequenceMatcher(None, ln_n, title_n).ratio() if r >= threshold: return True return False def detect_title_match(text: str, stmt: str) -> Tuple[bool, Optional[str], str]: """ Returns (matched?, matched_variant, scope) scope in {"consolidated","standalone","unknown"} """ lines = (text or "").splitlines() top_lines = [ln.strip() for ln in lines[:16] if ln.strip()] for variant in TITLE_VARIANTS.get(stmt, []): if _fuzzy_line_contains_title(top_lines, variant): vlow = variant.lower() if "consolidated" in vlow: scope = "consolidated" elif "standalone" in vlow or "separate" in vlow: scope = "standalone" else: scope = "unknown" return True, variant, scope joined = " ".join(top_lines).lower() # fallback for OCR garble if stmt == "balance_sheet" and ("balance sheet" in joined or "financial position" in joined): if "consolidated" in joined: return True, None, "consolidated" if "standalone" in joined or "separate" in joined: return True, None, "standalone" return True, None, "unknown" if stmt == "cash_flow" and ("cash flow" in joined or "cash flows" in joined): if "consolidated" in joined: return True, None, "consolidated" if "standalone" in joined or "separate" in joined: return True, None, "standalone" return True, None, "unknown" if stmt == "profit_and_loss" and ( "statement of profit" in joined or "profit and loss" in joined or "income statement" in joined or "statements of income" in joined or "statements of operations" in joined or "statements of earnings" in joined ): if "consolidated" in joined: return True, None, "consolidated" if "standalone" in joined or "separate" in joined: return True, None, "standalone" return True, None, "unknown" return False, None, "unknown" def detect_title(text: str, stmt: str) -> bool: ok, _, _ = detect_title_match(text, stmt) return ok # ========================= # (Optional) 10-K TOC mapping helpers (kept, but now scope-safe) # ========================= FOOTER_PIPE_RE = re.compile(r"\|\s*(\d{1,4})\s*$", re.MULTILINE) FOOTER_FORM_RE = re.compile(r"form\s+10-?k\s*\|\s*(\d{1,4})\s*$", re.IGNORECASE | re.MULTILINE) def extract_footer_internal_page(text: str) -> Optional[int]: t = text or "" m = FOOTER_PIPE_RE.findall(t) if m: return int(m[-1]) m = FOOTER_FORM_RE.findall(t) if m: return int(m[-1]) lines = [ln.strip() for ln in (t.splitlines() if t else []) if ln.strip()] for ln in reversed(lines[-6:]): if re.fullmatch(r"\d{1,4}", ln): return int(ln) return None def find_item8_toc_page(all_texts: Sequence[str]) -> Optional[int]: candidates = [] for i, txt in enumerate(all_texts): if not ITEM8_RE.search(txt or ""): continue low = _norm(txt) tocish = ("page" in low) and (DOT_LEADER_RE.search(txt or "") is not None) if tocish: candidates.append(i) return candidates[0] if candidates else None def parse_statement_index_numbers(toc_text: str) -> Dict[str, int]: """ Return internal page numbers from the index. IMPORTANT: keeps consolidated + standalone separately: key = f"{stmt}__{scope}" """ lines = [ln.strip() for ln in (toc_text or "").splitlines()] out: Dict[str, int] = {} pats = { "profit_and_loss": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+(earnings|operations|income)", re.I), "comprehensive_income": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+comprehensive\s+income", re.I), "balance_sheet": re.compile(r"(consolidated|standalone)\s+balance\s+sheets?|statement\s+of\s+financial\s+position", re.I), "equity": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+equity|stockholders[’']\s+equity|shareholders[’']\s+equity", re.I), "cash_flow": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+cash\s+flows?", re.I), "notes": re.compile(r"notes\s+to\s+(consolidated|standalone)\s+financial\s+statements", re.I), } for i, ln in enumerate(lines): if not ln: continue for stmt, pat in pats.items(): mscope = pat.search(ln) if not mscope: continue scope = (mscope.group(1) or "").strip().lower() if scope not in {"consolidated", "standalone"}: scope = "unknown" out_key = f"{stmt}__{scope}" # number at end of line m = re.findall(r"(\d{1,4})\s*$", ln) if m and ln.endswith(m[-1]): out.setdefault(out_key, int(m[-1])) continue # number on next line j = i + 1 while j < len(lines) and not lines[j]: j += 1 if j < len(lines) and re.fullmatch(r"\d{1,4}", lines[j]): out.setdefault(out_key, int(lines[j])) return out def build_internal_to_pdf_map(all_texts: Sequence[str]) -> Dict[int, int]: mapping: Dict[int, int] = {} for pdf_i, txt in enumerate(all_texts): n = extract_footer_internal_page(txt or "") if n is None: continue mapping.setdefault(n, pdf_i) return mapping def map_internal_to_pdf(internal: int, internal_to_pdf: Dict[int, int]) -> Optional[int]: if internal in internal_to_pdf: return internal_to_pdf[internal] keys = sorted(internal_to_pdf.keys()) if not keys: return None best_k = min(keys, key=lambda k: abs(k - internal)) return internal_to_pdf[best_k] + (internal - best_k) # ========================= # Scoring # ========================= def _page_stats(text: str) -> Dict[str, float]: t = text or "" low = t.lower() year_count = len(re.findall(r"\b20\d{2}\b", t)) currency_count = len(re.findall(r"[$€£]|usd|inr|eur|gbp", low)) paren_neg = len(re.findall(r"\(\s*\d", t)) integral = 1.0 if INTEGRAL_FOOTER in low else 0.0 tokens = re.findall(r"[A-Za-z]+|\d+(?:,\d{3})*(?:\.\d+)?", t) if not tokens: return dict(num_ratio=0.0, year_count=float(year_count), currency=float(currency_count), paren=float(paren_neg), integral=integral) nums = sum(1 for tok in tokens if re.fullmatch(r"\d+(?:,\d{3})*(?:\.\d+)?", tok)) alphas = sum(1 for tok in tokens if re.fullmatch(r"[A-Za-z]+", tok)) num_ratio = nums / max(1.0, nums + alphas) return dict(num_ratio=float(num_ratio), year_count=float(year_count), currency=float(currency_count), paren=float(paren_neg), integral=integral) def score_statement_page(text: str, stmt: str) -> Tuple[float, Dict[str, Any]]: low = (text or "").lower() top = (text or "")[:1200] st = _page_stats(text) reasons: Dict[str, Any] = {"title": False, "scope": "unknown", "sig_hits": [], "integral": False, "penalties": [], "stats": st} score = 0.0 ok, _, scope = detect_title_match(top, stmt) if ok: score += 60.0 reasons["title"] = True reasons["scope"] = scope else: score -= 20.0 reasons["penalties"].append("no_title(-20)") if st["integral"] > 0: score += 12.0 reasons["integral"] = True hits = 0 for term in SIG_TERMS.get(stmt, []): if term in low: hits += 1 reasons["sig_hits"].append(term) score += min(hits, 10) * 5.0 score += st["num_ratio"] * 24.0 score += min(st["year_count"], 10.0) * 1.2 score += min(st["currency"], 10.0) * 1.8 score += min(st["paren"], 10.0) * 1.0 if NOTE_HEADING_RE.search((text or "")[:220]): score -= 45.0 reasons["penalties"].append("note_heading(-45)") if DOT_LEADER_RE.search(text or ""): score -= 25.0 reasons["penalties"].append("toc_dotleaders(-25)") if reasons["title"] and st["num_ratio"] < 0.08 and st["year_count"] < 1: score -= 30.0 reasons["penalties"].append("title_without_table(-30)") if hits < 2: score -= 12.0 reasons["penalties"].append("low_sig_hits(<2)(-12)") return score, reasons def _statement_signal_no_title(text: str, stmt: str) -> float: """ Continuation-page score (no title required). Used to extend blocks forward. """ if not text: return 0.0 if NOTE_HEADING_RE.search(text[:220]): return 0.0 if DOT_LEADER_RE.search(text): return 0.0 low = text.lower() st = _page_stats(text) hits = 0 for term in SIG_TERMS.get(stmt, []): if term in low: hits += 1 score = 0.0 score += min(hits, 10) * 4.5 score += st["num_ratio"] * 26.0 score += min(st["year_count"], 10.0) * 1.1 score += min(st["currency"], 10.0) * 1.5 score += min(st["paren"], 10.0) * 0.7 if CONTINUED_RE.search(text[:240]): score += 8.0 # special: if a page has strong signature terms + years, it's often a continuation if hits >= 2 and st["year_count"] >= 1: score += 6.0 return score def _any_other_statement_title(text: str, stmt: str) -> bool: for other in TARGETS: if other == stmt: continue if detect_title(text[:1200], other): return True return False def _expand_block(all_texts: Sequence[str], stmt: str, start: int, max_forward: int = 6) -> int: """ Expand forward to include continuation pages. Stops if another statement begins (unless this stmt title repeats). """ end = start n = len(all_texts) for j in range(start + 1, min(n, start + 1 + max_forward)): txt = all_texts[j] or "" if _any_other_statement_title(txt, stmt) and not detect_title(txt[:1200], stmt): break sig = _statement_signal_no_title(txt, stmt) if sig >= 13.5: end = j continue if CONTINUED_RE.search(txt[:240]) and sig >= 8.0: end = j continue break return end def _blocks_overlap(a: Tuple[int, int], b: Tuple[int, int]) -> bool: return not (a[1] < b[0] or b[1] < a[0]) def _dedup_blocks(blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Deduplicate overlapping blocks, keeping higher 'score'. """ blocks = sorted(blocks, key=lambda x: (int(x.get("start", 10**9)), -(float(x.get("score") or 0.0)))) kept: List[Dict[str, Any]] = [] for b in blocks: r = (int(b.get("start")), int(b.get("end"))) merged = False for k in kept: kr = (int(k.get("start")), int(k.get("end"))) if _blocks_overlap(r, kr): if float(b.get("score") or 0.0) > float(k.get("score") or 0.0): k.update(b) merged = True break if not merged: kept.append(b) return kept def build_blocks_from_titles(all_texts: Sequence[str], continuation_max_forward: int = 6) -> Dict[str, List[Dict[str, Any]]]: """ Finds MULTIPLE blocks per statement (consolidated + standalone). Strategy: - find title pages for stmt - cluster nearby title hits of same scope - expand each start forward with continuation scoring """ out: Dict[str, List[Dict[str, Any]]] = {k: [] for k in TARGETS} for stmt in TARGETS: title_hits: List[Tuple[int, float, str, Optional[str]]] = [] for i, txt in enumerate(all_texts): ok, variant, scope = detect_title_match((txt or "")[:1200], stmt) if not ok: continue sc, _why = score_statement_page(txt or "", stmt) if sc < 30.0: continue title_hits.append((i, float(sc), scope, variant)) if not title_hits: continue title_hits.sort(key=lambda x: x[0]) clusters: List[List[Tuple[int, float, str, Optional[str]]]] = [] for hit in title_hits: if not clusters: clusters.append([hit]) continue last = clusters[-1][-1] # group if same scope and close if hit[2] == last[2] and hit[0] <= last[0] + 3: clusters[-1].append(hit) else: clusters.append([hit]) blocks: List[Dict[str, Any]] = [] for cl in clusters: start = min(h[0] for h in cl) best = max(cl, key=lambda x: x[1]) best_score = best[1] scope = best[2] title = best[3] end = _expand_block(all_texts, stmt, start, max_forward=continuation_max_forward) blocks.append( { "start": int(start), "end": int(end), "scope": scope, "title": title, "score": float(best_score), } ) out[stmt] = _dedup_blocks(blocks) return out # ========================= # Main builder # ========================= def build_candidate_lists( pages: Sequence[Any], page_count: int, topk_per_statement: int = 3, continuation_max_forward: int = 6, debug: bool = True, ) -> Tuple[Dict[str, List[Tuple[int, float]]], Dict[str, Any]]: """ Returns: candidates: {stmt: [(page_idx, score), ...]} debug_info: includes heuristic_blocks_0_based per stmt (list of blocks) """ all_texts = [_combined_text(p) for p in pages] debug_info: Dict[str, Any] = { "item8_toc_page": None, "toc_internal": {}, "internal_to_pdf_map_size": 0, "heuristic_blocks_0_based": {k: [] for k in TARGETS}, "top_scoring": {k: [] for k in TARGETS}, } # 1) Title-based multi-blocks (works for many non-10K PDFs too) title_blocks = build_blocks_from_titles(all_texts, continuation_max_forward=continuation_max_forward) # 2) Try 10-K Item8 TOC mapping (optional; mostly US 10-Ks) toc_blocks: Dict[str, List[Dict[str, Any]]] = {k: [] for k in TARGETS} toc_i = find_item8_toc_page(all_texts) if toc_i is not None: debug_info["item8_toc_page"] = toc_i toc_text = all_texts[toc_i] or "" toc_internal = parse_statement_index_numbers(toc_text) debug_info["toc_internal"] = toc_internal internal_to_pdf = build_internal_to_pdf_map(all_texts) debug_info["internal_to_pdf_map_size"] = len(internal_to_pdf) # convert internal -> pdf for key_scoped, internal_page in toc_internal.items(): if "__" not in key_scoped: continue stmt, scope = key_scoped.split("__", 1) if stmt not in TARGETS: continue start_pdf = map_internal_to_pdf(internal_page, internal_to_pdf) if start_pdf is None: continue # expand a block from TOC-derived start end_pdf = _expand_block(all_texts, stmt, start_pdf, max_forward=continuation_max_forward) toc_blocks[stmt].append( { "start": int(start_pdf), "end": int(end_pdf), "scope": scope if scope in {"consolidated", "standalone"} else "unknown", "title": None, "score": 55.0, # heuristic } ) for stmt in TARGETS: toc_blocks[stmt] = _dedup_blocks(toc_blocks[stmt]) # merge blocks merged_blocks: Dict[str, List[Dict[str, Any]]] = {} for stmt in TARGETS: merged_blocks[stmt] = _dedup_blocks((title_blocks.get(stmt) or []) + (toc_blocks.get(stmt) or [])) # keep only top N blocks by score, but keep distinct scope if possible bl = sorted(merged_blocks[stmt], key=lambda b: float(b.get("score") or 0.0), reverse=True) chosen: List[Dict[str, Any]] = [] seen_scope = set() for b in bl: scope = (b.get("scope") or "unknown") if scope in seen_scope and len(bl) > 1: continue chosen.append(b) seen_scope.add(scope) if len(chosen) >= 4: # internal cap, actual final cap comes from settings in main break merged_blocks[stmt] = sorted(chosen, key=lambda b: (int(b["start"]), int(b["end"]))) debug_info["heuristic_blocks_0_based"] = merged_blocks # 3) Strong per-page scoring candidates (fallback / also helpful for LLM page picking) candidates: Dict[str, List[Tuple[int, float]]] = {k: [] for k in TARGETS} reasons_store: Dict[str, Dict[int, Any]] = {k: {} for k in TARGETS} for i, txt in enumerate(all_texts): for stmt in TARGETS: sc, why = score_statement_page(txt or "", stmt) if sc > 0: candidates[stmt].append((i, float(sc))) if debug and (why.get("title") or sc > 80): reasons_store[stmt][i] = why for stmt in TARGETS: candidates[stmt].sort(key=lambda x: x[1], reverse=True) debug_info["top_scoring"][stmt] = candidates[stmt][: min(len(candidates[stmt]), 10)] candidates[stmt] = candidates[stmt][:topk_per_statement] debug_info[f"reasons_{stmt}"] = reasons_store[stmt] return candidates, debug_info def select_pages_for_llm( candidates: Dict[str, List[Tuple[int, float]]], debug_info: Dict[str, Any], page_count: int, max_images: int, max_blocks_per_statement: int = 2, ) -> List[int]: """ Prefer multi-block heuristic pages (include BOTH consolidated + standalone if found). Else fallback to top candidates + neighbors. """ picked: List[int] = [] seen = set() def add(p: int): if 0 <= p < page_count and p not in seen and len(picked) < max_images: seen.add(p) picked.append(p) blocks_by_stmt = debug_info.get("heuristic_blocks_0_based") or {} if isinstance(blocks_by_stmt, dict) and any(blocks_by_stmt.get(k) for k in TARGETS): for stmt in ["profit_and_loss", "balance_sheet", "cash_flow"]: bl = blocks_by_stmt.get(stmt) or [] if not isinstance(bl, list) or not bl: continue # pick top blocks, prefer distinct scopes bl_sorted = sorted(bl, key=lambda b: float(b.get("score") or 0.0), reverse=True) chosen: List[Dict[str, Any]] = [] seen_scope = set() for b in bl_sorted: scope = (b.get("scope") or "unknown") if scope in seen_scope and len(bl_sorted) > 1: continue chosen.append(b) seen_scope.add(scope) if len(chosen) >= max_blocks_per_statement: break for b in chosen: s, e = int(b.get("start")), int(b.get("end")) for p in range(s, e + 1): add(p) add(s - 1) add(e + 1) return sorted(picked) # fallback: use top candidates for stmt in ["profit_and_loss", "balance_sheet", "cash_flow"]: for (p, _sc) in candidates.get(stmt, [])[:2]: add(p) add(p - 1) add(p + 1) return sorted(picked)