Spaces:
Running
Running
| """ | |
| core/bench_processor.py | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| Document processor for the Peer Institution Benchmarking module. | |
| Responsibilities | |
| ββββββββββββββββ | |
| β’ Accept Streamlit UploadedFile objects and return text chunks suitable for | |
| LLM-based sustainability analysis. | |
| β’ Support all common sustainability report formats: | |
| PDF β text via pypdf | |
| DOCX β text via python-docx (paragraphs + tables) | |
| TXT β decoded directly (no external library needed) | |
| CSV β tabular text via pandas | |
| XLSX β multi-sheet tabular text via pandas | |
| β’ Apply benchmarking-appropriate chunking (sentence-boundary split, 600-char | |
| chunks with 80-char overlap β larger than the RAG default to preserve more | |
| context per LLM call). | |
| Public API | |
| ββββββββββ | |
| parse_peer_report(uploaded_file) β list[str] | |
| Streamlit UploadedFile β chunked text list. | |
| Returns [] on parse failure; surfaces errors via st.error(). | |
| extract_report_text(filepath) β str | |
| Filepath string/Path β raw plain text (un-chunked). | |
| Useful for ad-hoc extraction outside the Streamlit context. | |
| chunk_report(text, chunk_size, overlap) β list[str] | |
| Split raw text into overlapping sentence-boundary chunks. | |
| Design notes | |
| ββββββββββββ | |
| This module intentionally does NOT import from core.processor to avoid | |
| coupling β it only needs the low-level loaders, which it re-implements | |
| as thin wrappers. core.processor remains the authoritative source for | |
| SPJIMR's own operational data ingestion (extract_spjimr_metrics_raw, | |
| extract_waste_series, etc.). | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import re | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Union | |
| logger = logging.getLogger(__name__) | |
# ── Chunking defaults for benchmarking (larger than RAG default) ──────────────
BENCH_CHUNK_SIZE: int = 600        # chars per chunk
BENCH_CHUNK_OVERLAP: int = 80      # overlap between adjacent chunks
BENCH_MAX_CHARS: int = 120_000     # hard cap per document to prevent MemoryError

# ── Accepted file extensions ──────────────────────────────────────────────────
SUPPORTED_FORMATS: set[str] = {".pdf", ".docx", ".txt", ".csv", ".xlsx", ".xls"}
# ──────────────────────────────────────────────────────────────────────────────
# Text extraction — one function per format
# ──────────────────────────────────────────────────────────────────────────────
def _extract_pdf(filepath: Union[str, Path]) -> str:
    """Pull text out of a PDF, one page at a time, via pypdf.

    Pages that raise during extraction are logged and skipped; pages with
    no extractable text (e.g. scanned images) are dropped silently.
    """
    from pypdf import PdfReader

    extracted: list[str] = []
    for page_no, page in enumerate(PdfReader(str(filepath)).pages):
        try:
            content = page.extract_text()
        except Exception as exc:
            logger.warning("PDF page %d extraction failed: %s", page_no, exc)
            continue
        if content and content.strip():
            extracted.append(content.strip())
    return "\n\n".join(extracted)
def _extract_docx(filepath: Union[str, Path]) -> str:
    """Extract text from a DOCX file — body paragraphs plus table cells.

    Table rows are flattened to one line each, cells joined with " | ".
    Empty paragraphs/cells are omitted.
    """
    from docx import Document

    document = Document(str(filepath))
    pieces: list[str] = []
    # Body paragraphs first
    pieces.extend(p.text.strip() for p in document.paragraphs if p.text.strip())
    # Then every table, one line per row
    for tbl in document.tables:
        for row in tbl.rows:
            cells = [c.text.strip() for c in row.cells if c.text.strip()]
            if cells:
                pieces.append(" | ".join(cells))
    return "\n".join(pieces)
| def _extract_txt(filepath: Union[str, Path]) -> str: | |
| """Read a plain-text file, trying UTF-8 then latin-1 fallback.""" | |
| path = Path(filepath) | |
| try: | |
| return path.read_text(encoding="utf-8") | |
| except UnicodeDecodeError: | |
| return path.read_text(encoding="latin-1", errors="replace") | |
| def _extract_csv(filepath: Union[str, Path]) -> str: | |
| """Convert a CSV to readable plain text (first 500 rows).""" | |
| import pandas as pd | |
| try: | |
| df = pd.read_csv(filepath, encoding="utf-8", on_bad_lines="skip") | |
| except UnicodeDecodeError: | |
| df = pd.read_csv(filepath, encoding="latin-1", on_bad_lines="skip") | |
| df.dropna(how="all", inplace=True) | |
| df = df.head(500) | |
| return f"=== {Path(filepath).stem} ===\n{df.to_string(index=False, na_rep='N/A')}" | |
def _extract_xlsx(filepath: Union[str, Path]) -> str:
    """Convert every sheet of an Excel workbook to readable plain text.

    Each non-empty sheet contributes a '=== <file> — <sheet> ===' header plus
    its first 500 rows rendered with pandas; all-NaN/empty sheets are skipped.

    Raises whatever pandas/openpyxl raise on unreadable files.
    """
    import pandas as pd

    path = Path(filepath)
    # BUG FIX: openpyxl only understands .xlsx — forcing it on a legacy .xls
    # file always failed even though .xls is in SUPPORTED_FORMATS. Let pandas
    # auto-select the engine (xlrd) for anything that is not .xlsx.
    engine = "openpyxl" if path.suffix.lower() == ".xlsx" else None
    workbook = pd.ExcelFile(str(path), engine=engine)

    parts: list[str] = []
    for sheet in workbook.sheet_names:
        df = workbook.parse(sheet).dropna(how="all").head(500)  # cap rows/sheet
        if df.empty:
            continue
        df.columns = [str(c).strip() for c in df.columns]  # normalize headers
        parts.append(
            f"=== {path.stem} — {sheet} ===\n"
            + df.to_string(index=False, na_rep="N/A")
        )
    return "\n\n".join(parts)
# ──────────────────────────────────────────────────────────────────────────────
# Chunking
# ──────────────────────────────────────────────────────────────────────────────
def chunk_report(
    text: str,
    chunk_size: int = BENCH_CHUNK_SIZE,
    overlap: int = BENCH_CHUNK_OVERLAP,
) -> list[str]:
    """
    Split text into overlapping chunks on sentence boundaries.

    Parameters
    ----------
    text : str
        Raw report text; empty / whitespace-only input yields [].
    chunk_size : int
        Soft cap on characters per chunk. A single sentence longer than
        this is hard-split into chunk_size-sized slices.
    overlap : int
        Approximate number of trailing characters repeated at the start of
        the following chunk; values <= 0 disable overlap entirely.

    Returns
    -------
    list[str]
        Chunks in document order.

    Algorithm
    ---------
    1. Split on sentence-ending punctuation (. ! ?) followed by whitespace.
    2. Accumulate whole sentences until the chunk would exceed `chunk_size`.
    3. Start the next chunk a few sentences back so roughly `overlap`
       characters are shared between adjacent chunks.
    """
    if not text or not text.strip():
        return []

    # Sentence split — the delimiter stays attached to the preceding sentence.
    sentences = [
        s.strip() for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s.strip()
    ]

    chunks: list[str] = []
    start_idx = 0
    while start_idx < len(sentences):
        # A sentence that alone exceeds chunk_size can never be grouped —
        # hard-split it here. (Checked up front: the accumulation loop below
        # always accepts its first sentence, so it cannot enforce the cap
        # on its own — previously this made oversized chunks possible.)
        if len(sentences[start_idx]) > chunk_size:
            long_sent = sentences[start_idx]
            for j in range(0, len(long_sent), chunk_size):
                chunks.append(long_sent[j : j + chunk_size])
            start_idx += 1
            continue

        # Greedily accumulate whole sentences up to chunk_size.
        chunk_sents: list[str] = []
        char_count = 0
        for i in range(start_idx, len(sentences)):
            s = sentences[i]
            if char_count + len(s) > chunk_size and chunk_sents:
                break
            chunk_sents.append(s)
            char_count += len(s) + 1  # +1 for the joining space
        chunks.append(" ".join(chunk_sents))

        # If this chunk reached the final sentence, stop: re-entering the
        # loop would only emit a chunk that is a pure suffix of this one.
        if start_idx + len(chunk_sents) >= len(sentences):
            break

        # Walk backwards until enough characters are collected for overlap.
        next_start = len(chunk_sents)  # default: no overlap
        if overlap > 0:
            overlap_chars = 0
            for back in range(len(chunk_sents) - 1, -1, -1):
                overlap_chars += len(chunk_sents[back])
                if overlap_chars >= overlap:
                    next_start = back
                    break
        start_idx += max(1, next_start)  # always advance >= 1 → terminates

    return chunks
# ──────────────────────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────────────────────
def extract_report_text(filepath: Union[str, Path]) -> str:
    """
    Extract plain text from a sustainability report file.

    Supports PDF, DOCX, TXT, CSV and XLSX/XLS; output is capped at
    BENCH_MAX_CHARS characters (with a truncation marker appended).

    Raises ValueError for unsupported extensions; exceptions from the
    underlying parser libraries propagate unchanged.
    """
    path = Path(filepath)
    ext = path.suffix.lower()

    # Dispatch table — keys mirror SUPPORTED_FORMATS exactly.
    extractors = {
        ".pdf": _extract_pdf,
        ".docx": _extract_docx,
        ".txt": _extract_txt,
        ".csv": _extract_csv,
        ".xlsx": _extract_xlsx,
        ".xls": _extract_xlsx,
    }
    try:
        extractor = extractors[ext]
    except KeyError:
        raise ValueError(
            f"Unsupported format '{ext}'. "
            f"Accepted: {', '.join(sorted(SUPPORTED_FORMATS))}"
        ) from None

    text = extractor(path)

    # Hard cap to keep downstream LLM calls / memory bounded
    if len(text) > BENCH_MAX_CHARS:
        logger.warning(
            "Document %s truncated from %d → %d chars.",
            path.name, len(text), BENCH_MAX_CHARS,
        )
        text = text[:BENCH_MAX_CHARS] + "\n\n[... document truncated ...]"
    return text
def parse_peer_report(uploaded_file, institution_name: str = "") -> list[str]:
    """
    Parse a Streamlit UploadedFile holding a peer institution's sustainability
    report into a list of text chunks ready for LLM analysis.

    Parameters
    ----------
    uploaded_file : Streamlit UploadedFile
    institution_name : str
        Used only for labelling log / UI messages.

    Returns
    -------
    list[str]
        Chunks; may be empty if extraction yields no text.

    Side-effects
    ------------
    Calls st.error() / st.warning() when the file cannot be parsed so the UI
    shows a friendly message. Never raises — always returns a list.
    """
    import streamlit as st

    label = institution_name or uploaded_file.name
    suffix = Path(uploaded_file.name).suffix.lower()

    def _fail(message: str) -> list[str]:
        # Surface the problem in the UI and bail out with an empty result.
        st.error(message)
        return []

    if suffix not in SUPPORTED_FORMATS:
        return _fail(
            f"❌ **{label}** — unsupported format '{suffix}'. "
            f"Please upload one of: {', '.join(sorted(SUPPORTED_FORMATS))}"
        )

    # Spill the upload to disk so every extractor can use a filepath API.
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
            handle.write(uploaded_file.read())
            tmp_path = handle.name
    except Exception as exc:
        return _fail(f"❌ **{label}** — could not write temp file: {exc}")

    try:
        text = extract_report_text(tmp_path)
    except Exception as exc:
        logger.error("parse_peer_report failed for %s: %s", label, exc)
        return _fail(f"❌ **{label}** — failed to extract text: {exc}")
    finally:
        # Best-effort cleanup of the temp file in every exit path.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass

    if not text.strip():
        st.warning(
            f"⚠️ **{label}** — no text could be extracted. "
            "The file may be scanned/image-only or empty."
        )
        return []

    chunks = chunk_report(text)
    logger.info(
        "parse_peer_report: '%s' → %d chars → %d chunks", label, len(text), len(chunks)
    )
    return chunks