# WISE_Energy / src/core/bench_processor.py
# (Hugging Face upload header: uploaded by ahanbose, commit 22d8a93, verified)
"""
core/bench_processor.py
───────────────────────────────────────────────────────────────────────────────
Document processor for the Peer Institution Benchmarking module.
Responsibilities
────────────────
β€’ Accept Streamlit UploadedFile objects and return text chunks suitable for
LLM-based sustainability analysis.
β€’ Support all common sustainability report formats:
PDF β†’ text via pypdf
DOCX β†’ text via python-docx (paragraphs + tables)
TXT β†’ decoded directly (no external library needed)
CSV β†’ tabular text via pandas
XLSX β†’ multi-sheet tabular text via pandas
β€’ Apply benchmarking-appropriate chunking (sentence-boundary split, 600-char
chunks with 80-char overlap β€” larger than the RAG default to preserve more
context per LLM call).
Public API
──────────
parse_peer_report(uploaded_file) β†’ list[str]
Streamlit UploadedFile β†’ chunked text list.
Returns [] on parse failure; surfaces errors via st.error().
extract_report_text(filepath) β†’ str
Filepath string/Path β†’ raw plain text (un-chunked).
Useful for ad-hoc extraction outside the Streamlit context.
chunk_report(text, chunk_size, overlap) β†’ list[str]
Split raw text into overlapping sentence-boundary chunks.
Design notes
────────────
This module intentionally does NOT import from core.processor to avoid
coupling β€” it only needs the low-level loaders, which it re-implements
as thin wrappers. core.processor remains the authoritative source for
SPJIMR's own operational data ingestion (extract_spjimr_metrics_raw,
extract_waste_series, etc.).
"""
from __future__ import annotations
import logging
import os
import re
import tempfile
from pathlib import Path
from typing import Union
logger = logging.getLogger(__name__)
# ── Chunking defaults for benchmarking (larger than RAG default) ──────────────
BENCH_CHUNK_SIZE = 600 # chars per chunk
BENCH_CHUNK_OVERLAP = 80 # overlap between adjacent chunks
BENCH_MAX_CHARS = 120_000 # hard cap per document to prevent MemoryError
# ── Accepted file extensions ──────────────────────────────────────────────────
SUPPORTED_FORMATS = {".pdf", ".docx", ".txt", ".csv", ".xlsx", ".xls"}
# ══════════════════════════════════════════════════════════════════════════════
# Text extraction β€” one function per format
# ══════════════════════════════════════════════════════════════════════════════
def _extract_pdf(filepath: Union[str, Path]) -> str:
    """Pull plain text out of a PDF, one page at a time, with pypdf.

    Pages whose extraction raises are logged and skipped; the rest are
    joined with blank lines.
    """
    from pypdf import PdfReader

    extracted: list[str] = []
    for page_no, page in enumerate(PdfReader(str(filepath)).pages):
        try:
            content = page.extract_text()
        except Exception as exc:
            # One bad page should not sink the whole document.
            logger.warning("PDF page %d extraction failed: %s", page_no, exc)
            continue
        if content and content.strip():
            extracted.append(content.strip())
    return "\n\n".join(extracted)
def _extract_docx(filepath: Union[str, Path]) -> str:
    """Pull text out of a DOCX: every non-empty paragraph, then every
    table row (cells joined with ' | '), one line each."""
    from docx import Document

    document = Document(str(filepath))

    # Non-empty paragraphs first.
    pieces: list[str] = [
        para.text.strip() for para in document.paragraphs if para.text.strip()
    ]

    # Then table rows, pipe-separated so the tabular structure survives.
    for tbl in document.tables:
        for r in tbl.rows:
            cells = [c.text.strip() for c in r.cells if c.text.strip()]
            if cells:
                pieces.append(" | ".join(cells))

    return "\n".join(pieces)
def _extract_txt(filepath: Union[str, Path]) -> str:
    """Load a plain-text file: UTF-8 first, then latin-1 with replacement."""
    source = Path(filepath)
    try:
        return source.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        pass
    # Legacy/unknown encoding — latin-1 never raises, lossy chars replaced.
    return source.read_text(encoding="latin-1", errors="replace")
def _extract_csv(filepath: Union[str, Path]) -> str:
    """Render a CSV as readable plain text, capped at the first 500 rows."""
    import pandas as pd

    # UTF-8 first; fall back to latin-1 for legacy exports. Malformed rows
    # are skipped rather than aborting the whole file.
    try:
        frame = pd.read_csv(filepath, encoding="utf-8", on_bad_lines="skip")
    except UnicodeDecodeError:
        frame = pd.read_csv(filepath, encoding="latin-1", on_bad_lines="skip")

    frame = frame.dropna(how="all").head(500)
    table = frame.to_string(index=False, na_rep="N/A")
    return f"=== {Path(filepath).stem} ===\n{table}"
def _extract_xlsx(filepath: Union[str, Path]) -> str:
    """Render every non-empty sheet of an Excel workbook as plain text,
    capped at the first 500 rows per sheet."""
    import pandas as pd

    workbook = pd.ExcelFile(str(filepath), engine="openpyxl")
    stem = Path(filepath).stem
    rendered: list[str] = []

    for name in workbook.sheet_names:
        sheet_df = workbook.parse(name).dropna(how="all").head(500)
        if sheet_df.empty:
            continue
        # Normalize header whitespace so the text dump is tidy.
        sheet_df.columns = [str(col).strip() for col in sheet_df.columns]
        body = sheet_df.to_string(index=False, na_rep="N/A")
        rendered.append(f"=== {stem} β†’ {name} ===\n" + body)

    return "\n\n".join(rendered)
# ══════════════════════════════════════════════════════════════════════════════
# Chunking
# ══════════════════════════════════════════════════════════════════════════════
def chunk_report(
    text: str,
    chunk_size: int = BENCH_CHUNK_SIZE,
    overlap: int = BENCH_CHUNK_OVERLAP,
) -> list[str]:
    """
    Split text into overlapping chunks on sentence boundaries.

    Algorithm:
        1. Split on sentence-ending punctuation (. ! ?) followed by whitespace.
        2. Accumulate sentences until the chunk would exceed `chunk_size`.
        3. Slide forward by one sentence at a time to create overlap.
        4. A lone sentence longer than `chunk_size` (e.g. a table dumped as
           one unpunctuated line) is hard-split into fixed-size pieces.

    Parameters
    ----------
    text : str — raw document text; empty/whitespace-only input yields [].
    chunk_size : int — soft maximum characters per chunk.
    overlap : int — minimum characters shared by adjacent chunks.

    Returns
    -------
    list[str] — ordered, possibly-overlapping chunks.
    """
    if not text or not text.strip():
        return []
    # Sentence split — keep the delimiter attached to the preceding sentence.
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    sentences = [s.strip() for s in sentences if s.strip()]
    chunks: list[str] = []
    start_idx: int = 0
    while start_idx < len(sentences):
        chunk_sents: list[str] = []
        char_count = 0
        for i in range(start_idx, len(sentences)):
            s = sentences[i]
            # BUGFIX: the original condition also required `chunk_sents` to be
            # non-empty, so the first sentence was ALWAYS accepted and the
            # hard-split branch below was unreachable — one giant unpunctuated
            # "sentence" produced a chunk far larger than `chunk_size`.
            if char_count + len(s) > chunk_size:
                break
            chunk_sents.append(s)
            char_count += len(s) + 1  # +1 for the joining space
        if not chunk_sents:
            # Single sentence exceeds chunk_size — hard-split it into
            # fixed-size pieces (no overlap between the pieces).
            long_sent = sentences[start_idx]
            step = max(1, chunk_size)  # guard degenerate chunk_size <= 0
            for j in range(0, len(long_sent), step):
                chunks.append(long_sent[j : j + step])
            start_idx += 1
            continue
        chunks.append(" ".join(chunk_sents))
        # Walk backwards until at least `overlap` chars are covered; the next
        # chunk restarts at that sentence so adjacent chunks share context.
        overlap_chars = 0
        next_start = len(chunk_sents)  # default: no overlap
        for back in range(len(chunk_sents) - 1, -1, -1):
            overlap_chars += len(chunk_sents[back])
            if overlap_chars >= overlap:
                next_start = back
                break
        start_idx += max(1, next_start)  # max(1, ...) guarantees progress
    return chunks
# ══════════════════════════════════════════════════════════════════════════════
# Public API
# ══════════════════════════════════════════════════════════════════════════════
def extract_report_text(filepath: Union[str, Path]) -> str:
    """
    Extract plain text from a sustainability report file.

    Supports: PDF, DOCX, TXT, CSV, XLSX/XLS.
    Applies the BENCH_MAX_CHARS hard cap.

    Raises
    ------
    ValueError — unsupported file extension.
    Any exception from the underlying parser libraries on parse failure.
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    if suffix not in SUPPORTED_FORMATS:
        raise ValueError(
            f"Unsupported format '{suffix}'. "
            f"Accepted: {', '.join(sorted(SUPPORTED_FORMATS))}"
        )

    # Dispatch table instead of an if/elif chain; keys mirror SUPPORTED_FORMATS.
    extractors = {
        ".pdf": _extract_pdf,
        ".docx": _extract_docx,
        ".txt": _extract_txt,
        ".csv": _extract_csv,
        ".xlsx": _extract_xlsx,
        ".xls": _extract_xlsx,
    }
    text = extractors[suffix](path)

    # Hard cap so one oversized document cannot exhaust memory downstream.
    if len(text) > BENCH_MAX_CHARS:
        logger.warning(
            "Document %s truncated from %d β†’ %d chars.",
            path.name, len(text), BENCH_MAX_CHARS,
        )
        text = text[:BENCH_MAX_CHARS] + "\n\n[... document truncated ...]"
    return text
def parse_peer_report(uploaded_file, institution_name: str = "") -> list[str]:
    """
    Parse a Streamlit UploadedFile containing a peer institution's
    sustainability report into text chunks ready for LLM analysis.

    Parameters
    ----------
    uploaded_file : Streamlit UploadedFile
    institution_name : str — used only in log/UI messages.

    Returns
    -------
    list[str] — chunks (may be empty if extraction yields no text).

    Side-effects
    ------------
    Calls st.error()/st.warning() when the file cannot be parsed so the UI
    shows a friendly message. Never raises — always returns a list.
    """
    import streamlit as st

    report_label = institution_name or uploaded_file.name
    ext = Path(uploaded_file.name).suffix.lower()

    # Guard clause: reject unknown extensions before touching the disk.
    if ext not in SUPPORTED_FORMATS:
        st.error(
            f"❌ **{report_label}** β€” unsupported format '{ext}'. "
            f"Please upload one of: {', '.join(sorted(SUPPORTED_FORMATS))}"
        )
        return []

    # Spool the upload to a temp file so every extractor can use a
    # filepath-based API.
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as spool:
            spool.write(uploaded_file.read())
            spool_path = spool.name
    except Exception as exc:
        st.error(f"❌ **{report_label}** β€” could not write temp file: {exc}")
        return []

    try:
        text = extract_report_text(spool_path)
    except Exception as exc:
        logger.error("parse_peer_report failed for %s: %s", report_label, exc)
        st.error(f"❌ **{report_label}** β€” failed to extract text: {exc}")
        return []
    finally:
        # Best-effort cleanup of the spooled copy; runs on every exit path.
        try:
            os.unlink(spool_path)
        except OSError:
            pass

    if not text.strip():
        st.warning(
            f"⚠️ **{report_label}** β€” no text could be extracted. "
            "The file may be scanned/image-only or empty."
        )
        return []

    report_chunks = chunk_report(text)
    logger.info(
        "parse_peer_report: '%s' β†’ %d chars β†’ %d chunks",
        report_label, len(text), len(report_chunks),
    )
    return report_chunks