# WISE_Energy / src / core / processor.py
"""
core/processor.py
-----------------
Handles ingestion and cleaning of all supported file formats:
- CSV / XLSX β†’ Pandas DataFrames (with smart metadata-header skipping)
- PDF β†’ plain text via pypdf
- DOCX β†’ plain text via python-docx
Returns a unified string corpus ready for FAISS indexing.
"""
import io
import os
import re
import logging
from pathlib import Path
from typing import Optional, Union
import pandas as pd
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────
def _is_metadata_row(row: pd.Series) -> bool:
"""Return True when a row looks like a title / metadata line rather than data."""
non_null = row.dropna()
if len(non_null) == 0:
return True
# Typical metadata: only one non-null cell AND it contains letters
if len(non_null) == 1 and re.search(r"[A-Za-z]", str(non_null.iloc[0])):
return True
return False
def _find_real_header(df: pd.DataFrame, max_scan: int = 10) -> int:
"""
Scan the first `max_scan` rows to find the first row that looks like
a proper tabular header (β‰₯ 2 non-null values with short string content).
Returns the row index of the real header.
"""
for i, (_, row) in enumerate(df.head(max_scan).iterrows()):
non_null = row.dropna()
if len(non_null) >= 2:
# Check values look like header labels (not numeric data rows)
string_count = sum(1 for v in non_null if re.search(r"[A-Za-z]", str(v)))
if string_count >= 2:
return i
return 0
# Safety caps applied while rendering documents to text (see _df_to_text / _truncate_text).
MAX_TEXT_CHARS = 80_000 # hard cap per file to prevent MemoryError
MAX_ROWS_PER_SHEET = 500 # only embed first N rows of very large sheets
def _df_to_text(df: pd.DataFrame, label: str = "", max_rows: int = MAX_ROWS_PER_SHEET) -> str:
    """
    Render a cleaned DataFrame as plain text for embedding.

    Rows are capped at `max_rows` so very large sheets do not blow up into
    enormous strings; a note line records the truncation when it happens.
    """
    parts: list[str] = []
    if label:
        parts.append(f"=== {label} ===")
    total = len(df)
    if total > max_rows:
        logger.warning(
            "'%s' has %d rows β€” truncating to first %d for embedding.",
            label, total, max_rows,
        )
        parts.append(f"[Note: Showing first {max_rows} of {total} rows]")
        df = df.head(max_rows)
    parts.append(df.to_string(index=False, na_rep="N/A"))
    return "\n".join(parts)
def _truncate_text(text: str, max_chars: int = MAX_TEXT_CHARS) -> str:
    """Hard-cap total text length to prevent MemoryError during chunking."""
    if len(text) <= max_chars:
        return text
    # Log the truncation so ingestion pipelines can see data was dropped.
    logger.warning(
        "Text truncated from %d β†’ %d chars to prevent MemoryError.", len(text), max_chars
    )
    clipped = text[:max_chars]
    return clipped + "\n\n[... document truncated for memory safety ...]"
# ──────────────────────────────────────────────
# Format-specific loaders
# ──────────────────────────────────────────────
def load_csv(filepath: Union[str, Path]) -> tuple[pd.DataFrame, str]:
    """
    Load a CSV file, auto-detecting and skipping metadata header rows.

    Falls back from UTF-8 to latin-1 when the file is not valid UTF-8, so
    legacy exports no longer hard-fail ingestion.

    Returns:
        (cleaned DataFrame, plain-text rendering of it)
    Raises:
        Any pandas parsing error, after logging it.
    """
    try:
        encoding = "utf-8"
        try:
            # First pass: read without assumptions to locate the real header row.
            raw = pd.read_csv(filepath, header=None, encoding=encoding, on_bad_lines="skip")
        except UnicodeDecodeError:
            # Legacy exports are frequently latin-1 / cp1252 encoded; latin-1
            # maps every byte, so this second pass cannot raise a decode error.
            encoding = "latin-1"
            raw = pd.read_csv(filepath, header=None, encoding=encoding, on_bad_lines="skip")
        header_row = _find_real_header(raw)
        df = pd.read_csv(filepath, skiprows=header_row, encoding=encoding, on_bad_lines="skip")
        df.dropna(how="all", inplace=True)
        df.columns = [str(c).strip() for c in df.columns]
        text = _df_to_text(df, label=Path(filepath).stem)
        return df, text
    except Exception as exc:
        logger.error("CSV load failed for %s: %s", filepath, exc)
        raise
def load_xlsx(filepath: Union[str, Path]) -> tuple[dict[str, pd.DataFrame], str]:
    """
    Load every sheet of an XLSX workbook into DataFrames.

    Smart header detection is applied per sheet to skip embedded metadata rows
    (common in SPJIMR-style waste summary exports). Empty sheets are ignored.

    Returns:
        ({sheet_name: DataFrame}, combined plain-text corpus)
    """
    try:
        workbook = pd.ExcelFile(filepath, engine="openpyxl")
        stem = Path(filepath).stem
        sheets: dict[str, pd.DataFrame] = {}
        corpus: list[str] = []
        for name in workbook.sheet_names:
            raw = workbook.parse(name, header=None)
            if raw.empty:
                continue
            df = workbook.parse(name, skiprows=_find_real_header(raw))
            df.dropna(how="all", inplace=True)
            df.columns = [str(col).strip() for col in df.columns]
            sheets[name] = df
            corpus.append(_df_to_text(df, label=f"{stem} β†’ {name}"))
        return sheets, "\n\n".join(corpus)
    except Exception as exc:
        logger.error("XLSX load failed for %s: %s", filepath, exc)
        raise
def load_pdf(filepath: Union[str, Path]) -> str:
    """Extract plain text from a PDF via pypdf; pages are joined by blank lines."""
    try:
        from pypdf import PdfReader
        # Pages with no extractable text (e.g. scanned images) are skipped.
        extracted = [
            text.strip()
            for page in PdfReader(str(filepath)).pages
            if (text := page.extract_text())
        ]
        return "\n\n".join(extracted)
    except Exception as exc:
        logger.error("PDF load failed for %s: %s", filepath, exc)
        raise
def load_docx(filepath: Union[str, Path]) -> str:
    """
    Extract plain text from a DOCX file using python-docx.

    Captures body paragraphs plus table content; each table row becomes one
    line with its non-empty cells joined by " | ".
    """
    try:
        from docx import Document
        doc = Document(str(filepath))
        chunks = [para.text.strip() for para in doc.paragraphs if para.text.strip()]
        for table in doc.tables:
            for row in table.rows:
                cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if cells:
                    chunks.append(" | ".join(cells))
        return "\n".join(chunks)
    except Exception as exc:
        logger.error("DOCX load failed for %s: %s", filepath, exc)
        raise
# ──────────────────────────────────────────────
# Unified entry point
# ──────────────────────────────────────────────
class DocumentProcessor:
    """
    Unified document processor that routes files to the correct loader
    and returns a result dict with dataframes (tabular formats), a
    plain-text corpus, and the analyst-supplied context.
    """
    # File extensions this processor can route to a loader.
    SUPPORTED_EXTENSIONS = {".csv", ".xlsx", ".xls", ".pdf", ".docx"}

    def __init__(self, upload_dir: str = "data/uploads"):
        # Create the upload directory eagerly so later saves never fail
        # on a missing path.
        self.upload_dir = Path(upload_dir)
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    def save_uploaded_file(self, uploaded_file) -> Path:
        """
        Persist a Streamlit UploadedFile to disk and return its path.

        Only the basename of the uploaded filename is used, so a crafted
        name such as "../../etc/passwd" cannot escape the upload directory
        (path-traversal hardening; upload filenames are untrusted input).
        """
        dest = self.upload_dir / Path(uploaded_file.name).name
        with open(dest, "wb") as f:
            f.write(uploaded_file.getbuffer())
        logger.info("Saved uploaded file β†’ %s", dest)
        return dest

    def process(self, filepath: Union[str, Path], manual_context: str = "") -> dict:
        """
        Main processing pipeline.
        Returns:
            {
                "filepath": str,
                "extension": str,
                "dataframes": dict | None, # for tabular files
                "text": str,               # plain-text corpus
                "manual_context": str,
            }
        Raises:
            ValueError: when the file extension is not in SUPPORTED_EXTENSIONS.
        """
        filepath = Path(filepath)
        ext = filepath.suffix.lower()
        if ext not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(f"Unsupported file type: {ext}. Supported: {self.SUPPORTED_EXTENSIONS}")
        result = {
            "filepath": str(filepath),
            "extension": ext,
            "dataframes": None,
            "text": "",
            "manual_context": manual_context.strip(),
        }
        if ext == ".csv":
            df, text = load_csv(filepath)
            result["dataframes"] = {"Sheet1": df}
            result["text"] = text
        elif ext in (".xlsx", ".xls"):
            sheets, text = load_xlsx(filepath)
            result["dataframes"] = sheets
            result["text"] = text
        elif ext == ".pdf":
            result["text"] = load_pdf(filepath)
        elif ext == ".docx":
            result["text"] = load_docx(filepath)
        # Append manual context so it's embedded alongside the document
        if result["manual_context"]:
            result["text"] += f"\n\n[ANALYST CONTEXT]\n{result['manual_context']}"
        # Hard cap to prevent MemoryError on large files
        result["text"] = _truncate_text(result["text"])
        return result
# ──────────────────────────────────────────────
# SPJIMR Multi-section format parser
# ──────────────────────────────────────────────
def extract_spjimr_metrics(sheets: dict) -> dict:
    """
    Legacy entry point for the SPJIMR multi-section format.

    The stacked SPJIMR layout (repeating "Particular" month-header rows with
    Energy, Water and Waste sections on one sheet) cannot be recovered from
    `sheets`, because load_xlsx() has already applied header detection and
    destroyed the raw row structure this parser needs. Use
    extract_spjimr_metrics_raw(filepath) instead β€” it re-reads the workbook
    with header=None.

    Kept for backward compatibility. The original implementation contained
    only unreachable loop code and always returned {}; this stub preserves
    that contract explicitly.

    Returns:
        {} always.
    """
    return {}
def extract_spjimr_metrics_raw(filepath) -> dict:
    """
    Parse the SPJIMR multi-section Excel file directly from filepath (raw, no header).

    The workbook stacks Energy, Water and Waste sections on one sheet; each
    section starts with a header row whose column 2 reads "Particular" and
    whose columns 3+ carry the month labels (up to the Total / % columns).

    Returns a dict that may contain:
        "months"        - list of month labels from the first header row
        "energy"        - DataFrame [period, solar_kwh?, adani_kwh?, nonrenewable_kwh?, total_kwh, renewable_pct?]
        "energy_series" - tidy [period, renewable_pct] (only when renewable_pct exists)
        "water"         - DataFrame [period, municipal_kl?, tanker_kl?, rainwater_kl?, total_kl]
        "waste"         - DataFrame [period, recovered_kg?, disposed_kg?, total_kg, recovered_pct?]
        "waste_series"  - tidy [period, wet_kg?, dry_kg?] (recovered->wet, disposed->dry for chart compat)
    Returns {} when the file cannot be opened or no SPJIMR headers are found.
    """
    import numpy as np
    try:
        xl = pd.ExcelFile(str(filepath), engine="openpyxl")
    except Exception as exc:
        logger.error("Cannot open %s: %s", filepath, exc)
        return {}
    all_results = {}
    for sheet_name in xl.sheet_names:
        raw = xl.parse(sheet_name, header=None)
        if raw.shape[0] < 3 or raw.shape[1] < 5:
            continue
        # Section header rows: rows where col 2 == "Particular".
        particular_rows = [
            i for i, row in raw.iterrows()
            if str(row.iloc[2]).strip().lower() == "particular"
        ]
        if not particular_rows:
            logger.info("No SPJIMR-format header rows found in sheet '%s'", sheet_name)
            continue
        logger.info("SPJIMR format detected in '%s' β€” %d sections", sheet_name, len(particular_rows))
        # Extract month labels from the first header row; months sit in
        # columns 3 onward until the Total/% columns.
        header_row = raw.iloc[particular_rows[0]]
        month_cols = []
        months = []
        for col_idx in range(3, len(header_row)):
            val = str(header_row.iloc[col_idx]).strip()
            if val in ("nan", "", "Total", "%", "NaN"):
                if months:  # stop at first non-month after months started
                    break
                continue
            months.append(val)
            month_cols.append(col_idx)
        if not months:
            continue
        def _row_values(row_idx: int) -> pd.Series:
            """Extract monthly numeric values from a data row (NaN when non-numeric)."""
            row = raw.iloc[row_idx]
            vals = []
            for ci in month_cols:
                v = row.iloc[ci]
                try:
                    vals.append(float(v))
                except (ValueError, TypeError):
                    vals.append(np.nan)
            return pd.Series(vals, index=months)
        def _find_row(keyword: str, col: int = 2) -> Optional[int]:
            """Return index of first row whose column `col` contains `keyword` (case-insensitive), else None."""
            kw = keyword.lower()
            for i, row in raw.iterrows():
                cell = str(row.iloc[col]).strip().lower()
                if kw in cell:
                    return i
            return None
        # NOTE: all row-index checks below must use `is not None` β€” a hit at
        # row index 0 is falsy and was previously dropped silently.
        # -- Section 1: Energy ------------------------------------------------
        r_solar = _find_row("solar")
        r_adani = _find_row("adani")
        r_nonrenew = _find_row("non-renewable")
        r_total_energy = _find_row("total energy consumed")
        r_energy_mix = _find_row("energy mix")
        if r_total_energy is not None:
            energy_data = {"period": months}
            if r_solar is not None:
                energy_data["solar_kwh"] = _row_values(r_solar).values
            if r_adani is not None:
                energy_data["adani_kwh"] = _row_values(r_adani).values
            if r_nonrenew is not None:
                energy_data["nonrenewable_kwh"] = _row_values(r_nonrenew).values
            energy_data["total_kwh"] = _row_values(r_total_energy).values
            edf = pd.DataFrame(energy_data)
            # Renewable %: prefer the explicit mix row, else compute it.
            if r_energy_mix is not None:
                mix_vals = _row_values(r_energy_mix)
                # Values may be decimals (0.98) or percentages (98) β€” normalise.
                mix_numeric = pd.to_numeric(mix_vals, errors="coerce")
                if mix_numeric.dropna().max() <= 1.0:
                    mix_numeric = mix_numeric * 100
                edf["renewable_pct"] = mix_numeric.values
            else:
                # Calculate from solar + adani vs total.
                renew_cols = [c for c in ["solar_kwh", "adani_kwh"] if c in edf.columns]
                if renew_cols and "total_kwh" in edf.columns:
                    edf["renewable_pct"] = (
                        edf[renew_cols].sum(axis=1) / edf["total_kwh"].replace(0, np.nan) * 100
                    ).round(1)
            edf = edf[edf["total_kwh"].notna() & (edf["total_kwh"] > 0)].copy()
            all_results["energy"] = edf
            # Also expose as standard energy_series format β€” only when the
            # percentage could actually be derived (previously KeyError-ed).
            if "renewable_pct" in edf.columns:
                all_results["energy_series"] = edf[["period", "renewable_pct"]].dropna().copy()
            logger.info("Extracted energy series: %d months", len(edf))
        # -- Section 2: Water -------------------------------------------------
        r_municipal = _find_row("municipal")
        r_tanker = _find_row("tanker")
        r_rainwater = _find_row("rainwater")
        r_total_water = _find_row("total water consumed")
        if r_total_water is not None:
            water_data = {"period": months}
            if r_municipal is not None:
                water_data["municipal_kl"] = _row_values(r_municipal).values
            if r_tanker is not None:
                water_data["tanker_kl"] = _row_values(r_tanker).values
            if r_rainwater is not None:
                water_data["rainwater_kl"] = _row_values(r_rainwater).values
            water_data["total_kl"] = _row_values(r_total_water).values
            wdf = pd.DataFrame(water_data)
            wdf = wdf[wdf["total_kl"].notna() & (wdf["total_kl"] > 0)].copy()
            all_results["water"] = wdf
            logger.info("Extracted water series: %d months", len(wdf))
        # -- Section 3: Waste -------------------------------------------------
        r_recovered = _find_row("waste recovered")
        r_disposed = _find_row("waste disposed")
        r_total_waste = _find_row("total waste")
        r_pct_recov = _find_row("% waste recovered")
        if r_total_waste is not None:
            waste_data = {"period": months}
            if r_recovered is not None:
                waste_data["recovered_kg"] = _row_values(r_recovered).values
            if r_disposed is not None:
                waste_data["disposed_kg"] = _row_values(r_disposed).values
            waste_data["total_kg"] = _row_values(r_total_waste).values
            if r_pct_recov is not None:
                pct_vals = pd.to_numeric(pd.Series(_row_values(r_pct_recov).values), errors="coerce")
                # Normalise decimal fractions (0.42) to percentages (42).
                if pct_vals.dropna().max() <= 1.0:
                    pct_vals = pct_vals * 100
                waste_data["recovered_pct"] = pct_vals.values
            wstdf = pd.DataFrame(waste_data)
            wstdf = wstdf[wstdf["total_kg"].notna() & (wstdf["total_kg"] > 0)].copy()
            all_results["waste"] = wstdf
            # Expose as standard waste_series (recovered = "wet", disposed = "dry" for chart compat)
            std_waste = wstdf[["period"]].copy()
            if "recovered_kg" in wstdf.columns:
                std_waste["wet_kg"] = wstdf["recovered_kg"]
            if "disposed_kg" in wstdf.columns:
                std_waste["dry_kg"] = wstdf["disposed_kg"]
            all_results["waste_series"] = std_waste
            logger.info("Extracted waste series: %d months", len(wstdf))
        all_results["months"] = months
    return all_results
# ──────────────────────────────────────────────
# Original series extractors (kept for non-SPJIMR files)
# ──────────────────────────────────────────────
# ──────────────────────────────────────────────
# Chart-ready data extractor (generic fallback)
# ──────────────────────────────────────────────
def extract_waste_series(sheets: dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]:
    """
    Pull a Wet/Dry waste time-series out of the first sheet that has one.

    A sheet qualifies when it has a period-like column (name containing
    'date', 'month', 'week', 'year' or 'period') plus at least one of a
    'wet' or 'dry' column. Returns a tidy DataFrame with columns
    [period, wet_kg?, dry_kg?] or None when no sheet matches.
    """
    period_tokens = ("date", "month", "week", "year", "period")
    for sheet_name, df in sheets.items():
        by_lower = {c.lower(): c for c in df.columns}

        def _pick(pred):
            # First original column whose lowercase name satisfies `pred`.
            return next((orig for low, orig in by_lower.items() if pred(low)), None)

        period_col = _pick(lambda k: any(tok in k for tok in period_tokens))
        wet_col = _pick(lambda k: "wet" in k)
        dry_col = _pick(lambda k: "dry" in k)
        if not period_col or not (wet_col or dry_col):
            continue
        tidy = pd.DataFrame()
        tidy["period"] = df[period_col].astype(str)
        if wet_col:
            tidy["wet_kg"] = pd.to_numeric(df[wet_col], errors="coerce")
        if dry_col:
            tidy["dry_kg"] = pd.to_numeric(df[dry_col], errors="coerce")
        tidy.dropna(subset=["period"], inplace=True)
        logger.info("Extracted waste series from sheet '%s'", sheet_name)
        return tidy
    return None
def extract_energy_series(sheets: dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]:
    """
    Extract renewable vs total energy figures from the first qualifying sheet.

    Looks for a period column ('date'/'month'/'year'/'period'), a renewable
    column ('renewable'/'solar'/'green') and a total column
    ('total'/'kwh'/'energy'). The total column is never allowed to be the
    renewable column itself β€” previously a name like "Solar kWh" matched both
    searches, producing a constant 100% renewable_pct.

    Returns:
        Tidy DataFrame [period, renewable, total, renewable_pct] or None.
    """
    for _, df in sheets.items():
        lower_cols = {c.lower(): c for c in df.columns}
        date_col = next(
            (lower_cols[k] for k in lower_cols
             if any(t in k for t in ["date", "month", "year", "period"])),
            None,
        )
        renewable_col = next(
            (lower_cols[k] for k in lower_cols
             if any(t in k for t in ["renewable", "solar", "green"])),
            None,
        )
        # Exclude the renewable column from total candidates (see docstring).
        total_col = next(
            (lower_cols[k] for k in lower_cols
             if lower_cols[k] != renewable_col
             and any(t in k for t in ["total", "kwh", "energy"])),
            None,
        )
        if date_col and renewable_col and total_col:
            tidy = pd.DataFrame()
            tidy["period"] = df[date_col].astype(str)
            tidy["renewable"] = pd.to_numeric(df[renewable_col], errors="coerce")
            tidy["total"] = pd.to_numeric(df[total_col], errors="coerce")
            tidy["renewable_pct"] = (tidy["renewable"] / tidy["total"] * 100).round(1)
            tidy.dropna(inplace=True)
            return tidy
    return None