finalyze / statement_candidates.py
FridayCodehhr's picture
Upload 10 files
a9d5e1b verified
from __future__ import annotations
import re
import difflib
from typing import Any, Dict, List, Optional, Sequence, Tuple
# =========================
# Targets (ONLY these 3)
# =========================
TARGETS = ["balance_sheet", "profit_and_loss", "cash_flow"]
AUX = ["comprehensive_income", "equity", "notes"] # only for delimiting (when available)
# =========================
# Title variants
# =========================
TITLE_VARIANTS: Dict[str, List[str]] = {
"balance_sheet": [
"Consolidated Balance Sheets",
"Standalone Balance Sheets",
"Balance Sheets",
"Statement of Financial Position",
"Standalone Statement of Financial Position",
],
"profit_and_loss": [
"Consolidated Statements of Earnings",
"Standalone Statements of Earnings",
"Consolidated Statements of Operations",
"Standalone Statements of Operations",
"Consolidated Statements of Income",
"Standalone Statements of Income",
"Income Statement",
"Statement of Profit and Loss",
"Statement of Profit & Loss",
],
"cash_flow": [
"Consolidated Statements of Cash Flows",
"Standalone Statements of Cash Flows",
"Statement of Cash Flows",
"Cash Flow Statement",
],
# aux
"comprehensive_income": [
"Consolidated Statements of Comprehensive Income",
"Standalone Statements of Comprehensive Income",
"Statement of Comprehensive Income",
],
"equity": [
"Consolidated Statements of Equity",
"Standalone Statements of Equity",
"Statement of Stockholders' Equity",
"Statement of Shareholders' Equity",
],
"notes": [
"Notes to Consolidated Financial Statements",
"Notes to Standalone Financial Statements",
"Notes to Financial Statements",
],
}
INTEGRAL_FOOTER = "the accompanying notes are an integral part"
SIG_TERMS: Dict[str, List[str]] = {
"balance_sheet": [
"total assets",
"total liabilities",
"total equity",
"stockholders' equity",
"shareholders' equity",
"liabilities and equity",
"current assets",
"current liabilities",
"non-current assets",
"non-current liabilities",
],
"profit_and_loss": [
"net revenues",
"net sales",
"revenue",
"cost of sales",
"cost of products sold",
"gross profit",
"operating income",
"operating profit",
"profit before tax",
"net income",
"net earnings",
"earnings per share",
"basic",
"diluted",
],
"cash_flow": [
"cash flows from operating activities",
"cash flows from investing activities",
"cash flows from financing activities",
"net cash provided by operating activities",
"net cash used in investing activities",
"net cash used in financing activities",
"cash and cash equivalents, end of year",
"net change in cash",
],
}
NOTE_HEADING_RE = re.compile(r"^\s*note\s+\d+\b", re.IGNORECASE)
DOT_LEADER_RE = re.compile(r"\.{5,}")
ITEM8_RE = re.compile(
r"\bITEM\s+8\.\s+FINANCIAL\s+STATEMENTS\s+AND\s+SUPPLEMENTARY\s+DATA\b", re.IGNORECASE
)
CONTINUED_RE = re.compile(r"\bcontinued\b", re.IGNORECASE)
# =========================
# Utilities
# =========================
def _combined_text(page_obj: Any) -> str:
if page_obj is None:
return ""
if isinstance(page_obj, str):
return page_obj
if isinstance(page_obj, dict):
a = page_obj.get("extracted_text") or page_obj.get("text") or ""
b = page_obj.get("ocr_text") or ""
return (a + "\n" + b).strip()
a = getattr(page_obj, "extracted_text", None) or getattr(page_obj, "text", None) or ""
b = getattr(page_obj, "ocr_text", None) or ""
return (a + "\n" + b).strip()
def _norm(s: str) -> str:
return re.sub(r"\s+", " ", (s or "")).strip().lower()
def _fuzzy_line_contains_title(top_lines: List[str], title: str, threshold: float = 0.86) -> bool:
title_n = _norm(title)
for ln in top_lines:
ln_n = _norm(ln)
if not ln_n:
continue
if title_n in ln_n:
return True
r = difflib.SequenceMatcher(None, ln_n, title_n).ratio()
if r >= threshold:
return True
return False
def detect_title_match(text: str, stmt: str) -> Tuple[bool, Optional[str], str]:
"""
Returns (matched?, matched_variant, scope)
scope in {"consolidated","standalone","unknown"}
"""
lines = (text or "").splitlines()
top_lines = [ln.strip() for ln in lines[:16] if ln.strip()]
for variant in TITLE_VARIANTS.get(stmt, []):
if _fuzzy_line_contains_title(top_lines, variant):
vlow = variant.lower()
if "consolidated" in vlow:
scope = "consolidated"
elif "standalone" in vlow or "separate" in vlow:
scope = "standalone"
else:
scope = "unknown"
return True, variant, scope
joined = " ".join(top_lines).lower()
# fallback for OCR garble
if stmt == "balance_sheet" and ("balance sheet" in joined or "financial position" in joined):
if "consolidated" in joined:
return True, None, "consolidated"
if "standalone" in joined or "separate" in joined:
return True, None, "standalone"
return True, None, "unknown"
if stmt == "cash_flow" and ("cash flow" in joined or "cash flows" in joined):
if "consolidated" in joined:
return True, None, "consolidated"
if "standalone" in joined or "separate" in joined:
return True, None, "standalone"
return True, None, "unknown"
if stmt == "profit_and_loss" and (
"statement of profit" in joined
or "profit and loss" in joined
or "income statement" in joined
or "statements of income" in joined
or "statements of operations" in joined
or "statements of earnings" in joined
):
if "consolidated" in joined:
return True, None, "consolidated"
if "standalone" in joined or "separate" in joined:
return True, None, "standalone"
return True, None, "unknown"
return False, None, "unknown"
def detect_title(text: str, stmt: str) -> bool:
ok, _, _ = detect_title_match(text, stmt)
return ok
# =========================
# (Optional) 10-K TOC mapping helpers (kept, but now scope-safe)
# =========================
FOOTER_PIPE_RE = re.compile(r"\|\s*(\d{1,4})\s*$", re.MULTILINE)
FOOTER_FORM_RE = re.compile(r"form\s+10-?k\s*\|\s*(\d{1,4})\s*$", re.IGNORECASE | re.MULTILINE)
def extract_footer_internal_page(text: str) -> Optional[int]:
t = text or ""
m = FOOTER_PIPE_RE.findall(t)
if m:
return int(m[-1])
m = FOOTER_FORM_RE.findall(t)
if m:
return int(m[-1])
lines = [ln.strip() for ln in (t.splitlines() if t else []) if ln.strip()]
for ln in reversed(lines[-6:]):
if re.fullmatch(r"\d{1,4}", ln):
return int(ln)
return None
def find_item8_toc_page(all_texts: Sequence[str]) -> Optional[int]:
candidates = []
for i, txt in enumerate(all_texts):
if not ITEM8_RE.search(txt or ""):
continue
low = _norm(txt)
tocish = ("page" in low) and (DOT_LEADER_RE.search(txt or "") is not None)
if tocish:
candidates.append(i)
return candidates[0] if candidates else None
def parse_statement_index_numbers(toc_text: str) -> Dict[str, int]:
"""
Return internal page numbers from the index.
IMPORTANT: keeps consolidated + standalone separately:
key = f"{stmt}__{scope}"
"""
lines = [ln.strip() for ln in (toc_text or "").splitlines()]
out: Dict[str, int] = {}
pats = {
"profit_and_loss": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+(earnings|operations|income)", re.I),
"comprehensive_income": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+comprehensive\s+income", re.I),
"balance_sheet": re.compile(r"(consolidated|standalone)\s+balance\s+sheets?|statement\s+of\s+financial\s+position", re.I),
"equity": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+equity|stockholders[’']\s+equity|shareholders[’']\s+equity", re.I),
"cash_flow": re.compile(r"(consolidated|standalone)\s+statements?\s+of\s+cash\s+flows?", re.I),
"notes": re.compile(r"notes\s+to\s+(consolidated|standalone)\s+financial\s+statements", re.I),
}
for i, ln in enumerate(lines):
if not ln:
continue
for stmt, pat in pats.items():
mscope = pat.search(ln)
if not mscope:
continue
scope = (mscope.group(1) or "").strip().lower()
if scope not in {"consolidated", "standalone"}:
scope = "unknown"
out_key = f"{stmt}__{scope}"
# number at end of line
m = re.findall(r"(\d{1,4})\s*$", ln)
if m and ln.endswith(m[-1]):
out.setdefault(out_key, int(m[-1]))
continue
# number on next line
j = i + 1
while j < len(lines) and not lines[j]:
j += 1
if j < len(lines) and re.fullmatch(r"\d{1,4}", lines[j]):
out.setdefault(out_key, int(lines[j]))
return out
def build_internal_to_pdf_map(all_texts: Sequence[str]) -> Dict[int, int]:
mapping: Dict[int, int] = {}
for pdf_i, txt in enumerate(all_texts):
n = extract_footer_internal_page(txt or "")
if n is None:
continue
mapping.setdefault(n, pdf_i)
return mapping
def map_internal_to_pdf(internal: int, internal_to_pdf: Dict[int, int]) -> Optional[int]:
if internal in internal_to_pdf:
return internal_to_pdf[internal]
keys = sorted(internal_to_pdf.keys())
if not keys:
return None
best_k = min(keys, key=lambda k: abs(k - internal))
return internal_to_pdf[best_k] + (internal - best_k)
# =========================
# Scoring
# =========================
def _page_stats(text: str) -> Dict[str, float]:
t = text or ""
low = t.lower()
year_count = len(re.findall(r"\b20\d{2}\b", t))
currency_count = len(re.findall(r"[$€£]|usd|inr|eur|gbp", low))
paren_neg = len(re.findall(r"\(\s*\d", t))
integral = 1.0 if INTEGRAL_FOOTER in low else 0.0
tokens = re.findall(r"[A-Za-z]+|\d+(?:,\d{3})*(?:\.\d+)?", t)
if not tokens:
return dict(num_ratio=0.0, year_count=float(year_count), currency=float(currency_count), paren=float(paren_neg), integral=integral)
nums = sum(1 for tok in tokens if re.fullmatch(r"\d+(?:,\d{3})*(?:\.\d+)?", tok))
alphas = sum(1 for tok in tokens if re.fullmatch(r"[A-Za-z]+", tok))
num_ratio = nums / max(1.0, nums + alphas)
return dict(num_ratio=float(num_ratio), year_count=float(year_count), currency=float(currency_count), paren=float(paren_neg), integral=integral)
def score_statement_page(text: str, stmt: str) -> Tuple[float, Dict[str, Any]]:
low = (text or "").lower()
top = (text or "")[:1200]
st = _page_stats(text)
reasons: Dict[str, Any] = {"title": False, "scope": "unknown", "sig_hits": [], "integral": False, "penalties": [], "stats": st}
score = 0.0
ok, _, scope = detect_title_match(top, stmt)
if ok:
score += 60.0
reasons["title"] = True
reasons["scope"] = scope
else:
score -= 20.0
reasons["penalties"].append("no_title(-20)")
if st["integral"] > 0:
score += 12.0
reasons["integral"] = True
hits = 0
for term in SIG_TERMS.get(stmt, []):
if term in low:
hits += 1
reasons["sig_hits"].append(term)
score += min(hits, 10) * 5.0
score += st["num_ratio"] * 24.0
score += min(st["year_count"], 10.0) * 1.2
score += min(st["currency"], 10.0) * 1.8
score += min(st["paren"], 10.0) * 1.0
if NOTE_HEADING_RE.search((text or "")[:220]):
score -= 45.0
reasons["penalties"].append("note_heading(-45)")
if DOT_LEADER_RE.search(text or ""):
score -= 25.0
reasons["penalties"].append("toc_dotleaders(-25)")
if reasons["title"] and st["num_ratio"] < 0.08 and st["year_count"] < 1:
score -= 30.0
reasons["penalties"].append("title_without_table(-30)")
if hits < 2:
score -= 12.0
reasons["penalties"].append("low_sig_hits(<2)(-12)")
return score, reasons
def _statement_signal_no_title(text: str, stmt: str) -> float:
"""
Continuation-page score (no title required). Used to extend blocks forward.
"""
if not text:
return 0.0
if NOTE_HEADING_RE.search(text[:220]):
return 0.0
if DOT_LEADER_RE.search(text):
return 0.0
low = text.lower()
st = _page_stats(text)
hits = 0
for term in SIG_TERMS.get(stmt, []):
if term in low:
hits += 1
score = 0.0
score += min(hits, 10) * 4.5
score += st["num_ratio"] * 26.0
score += min(st["year_count"], 10.0) * 1.1
score += min(st["currency"], 10.0) * 1.5
score += min(st["paren"], 10.0) * 0.7
if CONTINUED_RE.search(text[:240]):
score += 8.0
# special: if a page has strong signature terms + years, it's often a continuation
if hits >= 2 and st["year_count"] >= 1:
score += 6.0
return score
def _any_other_statement_title(text: str, stmt: str) -> bool:
for other in TARGETS:
if other == stmt:
continue
if detect_title(text[:1200], other):
return True
return False
def _expand_block(all_texts: Sequence[str], stmt: str, start: int, max_forward: int = 6) -> int:
"""
Expand forward to include continuation pages.
Stops if another statement begins (unless this stmt title repeats).
"""
end = start
n = len(all_texts)
for j in range(start + 1, min(n, start + 1 + max_forward)):
txt = all_texts[j] or ""
if _any_other_statement_title(txt, stmt) and not detect_title(txt[:1200], stmt):
break
sig = _statement_signal_no_title(txt, stmt)
if sig >= 13.5:
end = j
continue
if CONTINUED_RE.search(txt[:240]) and sig >= 8.0:
end = j
continue
break
return end
def _blocks_overlap(a: Tuple[int, int], b: Tuple[int, int]) -> bool:
return not (a[1] < b[0] or b[1] < a[0])
def _dedup_blocks(blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Deduplicate overlapping blocks, keeping higher 'score'.
"""
blocks = sorted(blocks, key=lambda x: (int(x.get("start", 10**9)), -(float(x.get("score") or 0.0))))
kept: List[Dict[str, Any]] = []
for b in blocks:
r = (int(b.get("start")), int(b.get("end")))
merged = False
for k in kept:
kr = (int(k.get("start")), int(k.get("end")))
if _blocks_overlap(r, kr):
if float(b.get("score") or 0.0) > float(k.get("score") or 0.0):
k.update(b)
merged = True
break
if not merged:
kept.append(b)
return kept
def build_blocks_from_titles(all_texts: Sequence[str], continuation_max_forward: int = 6) -> Dict[str, List[Dict[str, Any]]]:
"""
Finds MULTIPLE blocks per statement (consolidated + standalone).
Strategy:
- find title pages for stmt
- cluster nearby title hits of same scope
- expand each start forward with continuation scoring
"""
out: Dict[str, List[Dict[str, Any]]] = {k: [] for k in TARGETS}
for stmt in TARGETS:
title_hits: List[Tuple[int, float, str, Optional[str]]] = []
for i, txt in enumerate(all_texts):
ok, variant, scope = detect_title_match((txt or "")[:1200], stmt)
if not ok:
continue
sc, _why = score_statement_page(txt or "", stmt)
if sc < 30.0:
continue
title_hits.append((i, float(sc), scope, variant))
if not title_hits:
continue
title_hits.sort(key=lambda x: x[0])
clusters: List[List[Tuple[int, float, str, Optional[str]]]] = []
for hit in title_hits:
if not clusters:
clusters.append([hit])
continue
last = clusters[-1][-1]
# group if same scope and close
if hit[2] == last[2] and hit[0] <= last[0] + 3:
clusters[-1].append(hit)
else:
clusters.append([hit])
blocks: List[Dict[str, Any]] = []
for cl in clusters:
start = min(h[0] for h in cl)
best = max(cl, key=lambda x: x[1])
best_score = best[1]
scope = best[2]
title = best[3]
end = _expand_block(all_texts, stmt, start, max_forward=continuation_max_forward)
blocks.append(
{
"start": int(start),
"end": int(end),
"scope": scope,
"title": title,
"score": float(best_score),
}
)
out[stmt] = _dedup_blocks(blocks)
return out
# =========================
# Main builder
# =========================
def build_candidate_lists(
pages: Sequence[Any],
page_count: int,
topk_per_statement: int = 3,
continuation_max_forward: int = 6,
debug: bool = True,
) -> Tuple[Dict[str, List[Tuple[int, float]]], Dict[str, Any]]:
"""
Returns:
candidates: {stmt: [(page_idx, score), ...]}
debug_info: includes heuristic_blocks_0_based per stmt (list of blocks)
"""
all_texts = [_combined_text(p) for p in pages]
debug_info: Dict[str, Any] = {
"item8_toc_page": None,
"toc_internal": {},
"internal_to_pdf_map_size": 0,
"heuristic_blocks_0_based": {k: [] for k in TARGETS},
"top_scoring": {k: [] for k in TARGETS},
}
# 1) Title-based multi-blocks (works for many non-10K PDFs too)
title_blocks = build_blocks_from_titles(all_texts, continuation_max_forward=continuation_max_forward)
# 2) Try 10-K Item8 TOC mapping (optional; mostly US 10-Ks)
toc_blocks: Dict[str, List[Dict[str, Any]]] = {k: [] for k in TARGETS}
toc_i = find_item8_toc_page(all_texts)
if toc_i is not None:
debug_info["item8_toc_page"] = toc_i
toc_text = all_texts[toc_i] or ""
toc_internal = parse_statement_index_numbers(toc_text)
debug_info["toc_internal"] = toc_internal
internal_to_pdf = build_internal_to_pdf_map(all_texts)
debug_info["internal_to_pdf_map_size"] = len(internal_to_pdf)
# convert internal -> pdf
for key_scoped, internal_page in toc_internal.items():
if "__" not in key_scoped:
continue
stmt, scope = key_scoped.split("__", 1)
if stmt not in TARGETS:
continue
start_pdf = map_internal_to_pdf(internal_page, internal_to_pdf)
if start_pdf is None:
continue
# expand a block from TOC-derived start
end_pdf = _expand_block(all_texts, stmt, start_pdf, max_forward=continuation_max_forward)
toc_blocks[stmt].append(
{
"start": int(start_pdf),
"end": int(end_pdf),
"scope": scope if scope in {"consolidated", "standalone"} else "unknown",
"title": None,
"score": 55.0, # heuristic
}
)
for stmt in TARGETS:
toc_blocks[stmt] = _dedup_blocks(toc_blocks[stmt])
# merge blocks
merged_blocks: Dict[str, List[Dict[str, Any]]] = {}
for stmt in TARGETS:
merged_blocks[stmt] = _dedup_blocks((title_blocks.get(stmt) or []) + (toc_blocks.get(stmt) or []))
# keep only top N blocks by score, but keep distinct scope if possible
bl = sorted(merged_blocks[stmt], key=lambda b: float(b.get("score") or 0.0), reverse=True)
chosen: List[Dict[str, Any]] = []
seen_scope = set()
for b in bl:
scope = (b.get("scope") or "unknown")
if scope in seen_scope and len(bl) > 1:
continue
chosen.append(b)
seen_scope.add(scope)
if len(chosen) >= 4: # internal cap, actual final cap comes from settings in main
break
merged_blocks[stmt] = sorted(chosen, key=lambda b: (int(b["start"]), int(b["end"])))
debug_info["heuristic_blocks_0_based"] = merged_blocks
# 3) Strong per-page scoring candidates (fallback / also helpful for LLM page picking)
candidates: Dict[str, List[Tuple[int, float]]] = {k: [] for k in TARGETS}
reasons_store: Dict[str, Dict[int, Any]] = {k: {} for k in TARGETS}
for i, txt in enumerate(all_texts):
for stmt in TARGETS:
sc, why = score_statement_page(txt or "", stmt)
if sc > 0:
candidates[stmt].append((i, float(sc)))
if debug and (why.get("title") or sc > 80):
reasons_store[stmt][i] = why
for stmt in TARGETS:
candidates[stmt].sort(key=lambda x: x[1], reverse=True)
debug_info["top_scoring"][stmt] = candidates[stmt][: min(len(candidates[stmt]), 10)]
candidates[stmt] = candidates[stmt][:topk_per_statement]
debug_info[f"reasons_{stmt}"] = reasons_store[stmt]
return candidates, debug_info
def select_pages_for_llm(
candidates: Dict[str, List[Tuple[int, float]]],
debug_info: Dict[str, Any],
page_count: int,
max_images: int,
max_blocks_per_statement: int = 2,
) -> List[int]:
"""
Prefer multi-block heuristic pages (include BOTH consolidated + standalone if found).
Else fallback to top candidates + neighbors.
"""
picked: List[int] = []
seen = set()
def add(p: int):
if 0 <= p < page_count and p not in seen and len(picked) < max_images:
seen.add(p)
picked.append(p)
blocks_by_stmt = debug_info.get("heuristic_blocks_0_based") or {}
if isinstance(blocks_by_stmt, dict) and any(blocks_by_stmt.get(k) for k in TARGETS):
for stmt in ["profit_and_loss", "balance_sheet", "cash_flow"]:
bl = blocks_by_stmt.get(stmt) or []
if not isinstance(bl, list) or not bl:
continue
# pick top blocks, prefer distinct scopes
bl_sorted = sorted(bl, key=lambda b: float(b.get("score") or 0.0), reverse=True)
chosen: List[Dict[str, Any]] = []
seen_scope = set()
for b in bl_sorted:
scope = (b.get("scope") or "unknown")
if scope in seen_scope and len(bl_sorted) > 1:
continue
chosen.append(b)
seen_scope.add(scope)
if len(chosen) >= max_blocks_per_statement:
break
for b in chosen:
s, e = int(b.get("start")), int(b.get("end"))
for p in range(s, e + 1):
add(p)
add(s - 1)
add(e + 1)
return sorted(picked)
# fallback: use top candidates
for stmt in ["profit_and_loss", "balance_sheet", "cash_flow"]:
for (p, _sc) in candidates.get(stmt, [])[:2]:
add(p)
add(p - 1)
add(p + 1)
return sorted(picked)