""" core/bench_processor.py ─────────────────────────────────────────────────────────────────────────────── Document processor for the Peer Institution Benchmarking module. Responsibilities ──────────────── • Accept Streamlit UploadedFile objects and return text chunks suitable for LLM-based sustainability analysis. • Support all common sustainability report formats: PDF → text via pypdf DOCX → text via python-docx (paragraphs + tables) TXT → decoded directly (no external library needed) CSV → tabular text via pandas XLSX → multi-sheet tabular text via pandas • Apply benchmarking-appropriate chunking (sentence-boundary split, 600-char chunks with 80-char overlap — larger than the RAG default to preserve more context per LLM call). Public API ────────── parse_peer_report(uploaded_file) → list[str] Streamlit UploadedFile → chunked text list. Returns [] on parse failure; surfaces errors via st.error(). extract_report_text(filepath) → str Filepath string/Path → raw plain text (un-chunked). Useful for ad-hoc extraction outside the Streamlit context. chunk_report(text, chunk_size, overlap) → list[str] Split raw text into overlapping sentence-boundary chunks. Design notes ──────────── This module intentionally does NOT import from core.processor to avoid coupling — it only needs the low-level loaders, which it re-implements as thin wrappers. core.processor remains the authoritative source for SPJIMR's own operational data ingestion (extract_spjimr_metrics_raw, extract_waste_series, etc.). """ from __future__ import annotations import logging import os import re import tempfile from pathlib import Path from typing import Union logger = logging.getLogger(__name__) # ── Chunking defaults for benchmarking (larger than RAG default) ────────────── BENCH_CHUNK_SIZE = 600 # chars per chunk BENCH_CHUNK_OVERLAP = 80 # overlap between adjacent chunks BENCH_MAX_CHARS = 120_000 # hard cap per document to prevent MemoryError # ── Accepted file extensions ────────────────────────────────────────────────── SUPPORTED_FORMATS = {".pdf", ".docx", ".txt", ".csv", ".xlsx", ".xls"} # ══════════════════════════════════════════════════════════════════════════════ # Text extraction — one function per format # ══════════════════════════════════════════════════════════════════════════════ def _extract_pdf(filepath: Union[str, Path]) -> str: """Extract text from a PDF using pypdf (page-by-page).""" from pypdf import PdfReader reader = PdfReader(str(filepath)) pages: list[str] = [] for i, page in enumerate(reader.pages): try: txt = page.extract_text() if txt and txt.strip(): pages.append(txt.strip()) except Exception as exc: logger.warning("PDF page %d extraction failed: %s", i, exc) return "\n\n".join(pages) def _extract_docx(filepath: Union[str, Path]) -> str: """Extract text from a DOCX file — paragraphs + table cells.""" from docx import Document doc = Document(str(filepath)) parts: list[str] = [] # Paragraphs for para in doc.paragraphs: t = para.text.strip() if t: parts.append(t) # Tables (each row joined with pipe separator) for table in doc.tables: for row in table.rows: row_text = " | ".join( cell.text.strip() for cell in row.cells if cell.text.strip() ) if row_text: parts.append(row_text) return "\n".join(parts) def _extract_txt(filepath: Union[str, Path]) -> str: """Read a plain-text file, trying UTF-8 then latin-1 fallback.""" path = Path(filepath) try: return path.read_text(encoding="utf-8") except UnicodeDecodeError: return path.read_text(encoding="latin-1", errors="replace") def _extract_csv(filepath: Union[str, Path]) -> str: """Convert a CSV to readable plain text (first 500 rows).""" import pandas as pd try: df = pd.read_csv(filepath, encoding="utf-8", on_bad_lines="skip") except UnicodeDecodeError: df = pd.read_csv(filepath, encoding="latin-1", on_bad_lines="skip") df.dropna(how="all", inplace=True) df = df.head(500) return f"=== {Path(filepath).stem} ===\n{df.to_string(index=False, na_rep='N/A')}" def _extract_xlsx(filepath: Union[str, Path]) -> str: """Convert all sheets of an XLSX to readable plain text (first 500 rows each).""" import pandas as pd xl = pd.ExcelFile(str(filepath), engine="openpyxl") parts: list[str] = [] for sheet in xl.sheet_names: df = xl.parse(sheet).dropna(how="all").head(500) if df.empty: continue df.columns = [str(c).strip() for c in df.columns] parts.append( f"=== {Path(filepath).stem} → {sheet} ===\n" + df.to_string(index=False, na_rep="N/A") ) return "\n\n".join(parts) # ══════════════════════════════════════════════════════════════════════════════ # Chunking # ══════════════════════════════════════════════════════════════════════════════ def chunk_report( text: str, chunk_size: int = BENCH_CHUNK_SIZE, overlap: int = BENCH_CHUNK_OVERLAP, ) -> list[str]: """ Split text into overlapping chunks on sentence boundaries. Algorithm: 1. Split on sentence-ending punctuation (. ! ?) followed by whitespace. 2. Accumulate sentences until the chunk would exceed `chunk_size`. 3. Slide forward by one sentence at a time to create overlap. """ if not text or not text.strip(): return [] # Sentence split — keep the delimiter attached to the preceding sentence sentences = re.split(r"(?<=[.!?])\s+", text.strip()) sentences = [s.strip() for s in sentences if s.strip()] chunks: list[str] = [] start_idx: int = 0 while start_idx < len(sentences): chunk_sents: list[str] = [] char_count = 0 for i in range(start_idx, len(sentences)): s = sentences[i] if char_count + len(s) > chunk_size and chunk_sents: break chunk_sents.append(s) char_count += len(s) + 1 # +1 for space if not chunk_sents: # Single sentence exceeds chunk_size — hard-split it long = sentences[start_idx] for j in range(0, len(long), chunk_size): chunks.append(long[j : j + chunk_size]) start_idx += 1 continue chunks.append(" ".join(chunk_sents)) # Find next start with overlap overlap_chars = 0 next_start = len(chunk_sents) # default: no overlap for back in range(len(chunk_sents) - 1, -1, -1): overlap_chars += len(chunk_sents[back]) if overlap_chars >= overlap: next_start = back break start_idx += max(1, next_start) return chunks # ══════════════════════════════════════════════════════════════════════════════ # Public API # ══════════════════════════════════════════════════════════════════════════════ def extract_report_text(filepath: Union[str, Path]) -> str: """ Extract plain text from a sustainability report file. Supports: PDF, DOCX, TXT, CSV, XLSX/XLS. Applies BENCH_MAX_CHARS hard cap. Raises ValueError for unsupported extensions. Raises exceptions from underlying libraries on parse failure. """ filepath = Path(filepath) ext = filepath.suffix.lower() if ext not in SUPPORTED_FORMATS: raise ValueError( f"Unsupported format '{ext}'. " f"Accepted: {', '.join(sorted(SUPPORTED_FORMATS))}" ) if ext == ".pdf": text = _extract_pdf(filepath) elif ext == ".docx": text = _extract_docx(filepath) elif ext == ".txt": text = _extract_txt(filepath) elif ext == ".csv": text = _extract_csv(filepath) elif ext in (".xlsx", ".xls"):text = _extract_xlsx(filepath) else: text = "" # unreachable, but satisfies type checker # Hard cap if len(text) > BENCH_MAX_CHARS: logger.warning( "Document %s truncated from %d → %d chars.", filepath.name, len(text), BENCH_MAX_CHARS, ) text = text[:BENCH_MAX_CHARS] + "\n\n[... document truncated ...]" return text def parse_peer_report(uploaded_file, institution_name: str = "") -> list[str]: """ Parse a Streamlit UploadedFile containing a peer institution's sustainability report into a list of text chunks ready for LLM analysis. Parameters ---------- uploaded_file : Streamlit UploadedFile institution_name: str — used only in log messages Returns ------- list[str] — chunks (may be empty if extraction yields no text) Side-effects ------------ Calls st.error() when the file cannot be parsed so the UI shows a friendly message. Does NOT raise — always returns a list. """ import streamlit as st label = institution_name or uploaded_file.name suffix = Path(uploaded_file.name).suffix.lower() if suffix not in SUPPORTED_FORMATS: st.error( f"❌ **{label}** — unsupported format '{suffix}'. " f"Please upload one of: {', '.join(sorted(SUPPORTED_FORMATS))}" ) return [] # Write to a temp file so all extractors can use filepath-based APIs try: with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp.write(uploaded_file.read()) tmp_path = tmp.name except Exception as exc: st.error(f"❌ **{label}** — could not write temp file: {exc}") return [] try: text = extract_report_text(tmp_path) except Exception as exc: logger.error("parse_peer_report failed for %s: %s", label, exc) st.error(f"❌ **{label}** — failed to extract text: {exc}") return [] finally: try: os.unlink(tmp_path) except OSError: pass if not text.strip(): st.warning( f"⚠️ **{label}** — no text could be extracted. " "The file may be scanned/image-only or empty." ) return [] chunks = chunk_report(text) logger.info( "parse_peer_report: '%s' → %d chars → %d chunks", label, len(text), len(chunks) ) return chunks