""" core/processor.py ----------------- Handles ingestion and cleaning of all supported file formats: - CSV / XLSX → Pandas DataFrames (with smart metadata-header skipping) - PDF → plain text via pypdf - DOCX → plain text via python-docx Returns a unified string corpus ready for FAISS indexing. """ import io import os import re import logging from pathlib import Path from typing import Optional, Union import pandas as pd logger = logging.getLogger(__name__) # ────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────── def _is_metadata_row(row: pd.Series) -> bool: """Return True when a row looks like a title / metadata line rather than data.""" non_null = row.dropna() if len(non_null) == 0: return True # Typical metadata: only one non-null cell AND it contains letters if len(non_null) == 1 and re.search(r"[A-Za-z]", str(non_null.iloc[0])): return True return False def _find_real_header(df: pd.DataFrame, max_scan: int = 10) -> int: """ Scan the first `max_scan` rows to find the first row that looks like a proper tabular header (≥ 2 non-null values with short string content). Returns the row index of the real header. """ for i, (_, row) in enumerate(df.head(max_scan).iterrows()): non_null = row.dropna() if len(non_null) >= 2: # Check values look like header labels (not numeric data rows) string_count = sum(1 for v in non_null if re.search(r"[A-Za-z]", str(v))) if string_count >= 2: return i return 0 MAX_TEXT_CHARS = 80_000 # hard cap per file to prevent MemoryError MAX_ROWS_PER_SHEET = 500 # only embed first N rows of very large sheets def _df_to_text(df: pd.DataFrame, label: str = "", max_rows: int = MAX_ROWS_PER_SHEET) -> str: """ Convert a cleaned DataFrame to a plain-text representation. Caps rows to avoid generating enormous strings from large Excel files. """ lines = [] if label: lines.append(f"=== {label} ===") if len(df) > max_rows: logger.warning( "'%s' has %d rows — truncating to first %d for embedding.", label, len(df), max_rows, ) lines.append(f"[Note: Showing first {max_rows} of {len(df)} rows]") df = df.head(max_rows) lines.append(df.to_string(index=False, na_rep="N/A")) return "\n".join(lines) def _truncate_text(text: str, max_chars: int = MAX_TEXT_CHARS) -> str: """Hard-cap total text length to prevent MemoryError during chunking.""" if len(text) <= max_chars: return text logger.warning( "Text truncated from %d → %d chars to prevent MemoryError.", len(text), max_chars ) return text[:max_chars] + "\n\n[... document truncated for memory safety ...]" # ────────────────────────────────────────────── # Format-specific loaders # ────────────────────────────────────────────── def load_csv(filepath: Union[str, Path]) -> tuple[pd.DataFrame, str]: """Load a CSV file, auto-detecting and skipping metadata header rows.""" try: # First pass: read without assumptions raw = pd.read_csv(filepath, header=None, encoding="utf-8", on_bad_lines="skip") header_row = _find_real_header(raw) df = pd.read_csv(filepath, skiprows=header_row, encoding="utf-8", on_bad_lines="skip") df.dropna(how="all", inplace=True) df.columns = [str(c).strip() for c in df.columns] text = _df_to_text(df, label=Path(filepath).stem) return df, text except Exception as exc: logger.error("CSV load failed for %s: %s", filepath, exc) raise def load_xlsx(filepath: Union[str, Path]) -> tuple[dict[str, pd.DataFrame], str]: """ Load all sheets from an XLSX workbook. Applies smart header-detection per sheet to handle embedded metadata rows (common in SPJIMR-style waste summary exports). """ try: xl = pd.ExcelFile(filepath, engine="openpyxl") sheets: dict[str, pd.DataFrame] = {} text_parts: list[str] = [] for sheet_name in xl.sheet_names: raw = xl.parse(sheet_name, header=None) if raw.empty: continue header_row = _find_real_header(raw) df = xl.parse(sheet_name, skiprows=header_row) df.dropna(how="all", inplace=True) df.columns = [str(c).strip() for c in df.columns] sheets[sheet_name] = df text_parts.append(_df_to_text(df, label=f"{Path(filepath).stem} → {sheet_name}")) combined_text = "\n\n".join(text_parts) return sheets, combined_text except Exception as exc: logger.error("XLSX load failed for %s: %s", filepath, exc) raise def load_pdf(filepath: Union[str, Path]) -> str: """Extract plain text from a PDF using pypdf.""" try: from pypdf import PdfReader reader = PdfReader(str(filepath)) pages = [] for page in reader.pages: txt = page.extract_text() if txt: pages.append(txt.strip()) return "\n\n".join(pages) except Exception as exc: logger.error("PDF load failed for %s: %s", filepath, exc) raise def load_docx(filepath: Union[str, Path]) -> str: """Extract plain text from a DOCX file using python-docx.""" try: from docx import Document doc = Document(str(filepath)) paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()] # Also capture table cells for table in doc.tables: for row in table.rows: row_text = " | ".join(cell.text.strip() for cell in row.cells if cell.text.strip()) if row_text: paragraphs.append(row_text) return "\n".join(paragraphs) except Exception as exc: logger.error("DOCX load failed for %s: %s", filepath, exc) raise # ────────────────────────────────────────────── # Unified entry point # ────────────────────────────────────────────── class DocumentProcessor: """ Unified document processor that routes files to the correct loader and returns (metadata_dict, plain_text_corpus). """ SUPPORTED_EXTENSIONS = {".csv", ".xlsx", ".xls", ".pdf", ".docx"} def __init__(self, upload_dir: str = "data/uploads"): self.upload_dir = Path(upload_dir) self.upload_dir.mkdir(parents=True, exist_ok=True) def save_uploaded_file(self, uploaded_file) -> Path: """Persist a Streamlit UploadedFile to disk and return its path.""" dest = self.upload_dir / uploaded_file.name with open(dest, "wb") as f: f.write(uploaded_file.getbuffer()) logger.info("Saved uploaded file → %s", dest) return dest def process(self, filepath: Union[str, Path], manual_context: str = "") -> dict: """ Main processing pipeline. Returns: { "filepath": str, "extension": str, "dataframes": dict | None, # for tabular files "text": str, # plain-text corpus "manual_context": str, } """ filepath = Path(filepath) ext = filepath.suffix.lower() if ext not in self.SUPPORTED_EXTENSIONS: raise ValueError(f"Unsupported file type: {ext}. Supported: {self.SUPPORTED_EXTENSIONS}") result = { "filepath": str(filepath), "extension": ext, "dataframes": None, "text": "", "manual_context": manual_context.strip(), } if ext == ".csv": df, text = load_csv(filepath) result["dataframes"] = {"Sheet1": df} result["text"] = text elif ext in (".xlsx", ".xls"): sheets, text = load_xlsx(filepath) result["dataframes"] = sheets result["text"] = text elif ext == ".pdf": result["text"] = load_pdf(filepath) elif ext == ".docx": result["text"] = load_docx(filepath) # Append manual context so it's embedded alongside the document if result["manual_context"]: result["text"] += f"\n\n[ANALYST CONTEXT]\n{result['manual_context']}" # Hard cap to prevent MemoryError on large files result["text"] = _truncate_text(result["text"]) return result # ────────────────────────────────────────────── # SPJIMR Multi-section format parser # ────────────────────────────────────────────── def extract_spjimr_metrics(sheets: dict) -> dict: """ Parse the SPJIMR Environmental Metrics format — a stacked multi-section workbook where Energy, Water, and Waste data share the same sheet with repeating month headers. Structure: Col 0: section number (1, 2, 3 …) Col 1: section title Col 2: row label ("Particular" header or data category name) Cols 3–14: monthly values (Apr'25 … Mar'26) Col 15: Total Col 16: % Returns dict with keys: "months" — list of period strings "energy" — DataFrame [period, solar_kwh, adani_kwh, nonrenewable_kwh, total_kwh, renewable_pct] "water" — DataFrame [period, municipal_kl, tanker_kl, rainwater_kl, total_kl] "waste" — DataFrame [period, recovered_kg, disposed_kg, total_kg, recovered_pct] "energy_series" — tidy DataFrame matching extract_energy_series() output format "waste_series" — tidy DataFrame matching extract_waste_series() output format """ result = {} for sheet_name, raw_df in sheets.items(): # Load completely raw — no header guessing try: raw = raw_df # already loaded by load_xlsx with header detection except Exception: continue # Re-read the sheet raw (header=None) to get the real structure continue # handled below via the filepath approach return result def extract_spjimr_metrics_raw(filepath) -> dict: """ Parse the SPJIMR multi-section Excel file directly from filepath (raw, no header). This is the primary entry point — call this after saving the uploaded file. """ import numpy as np try: xl = pd.ExcelFile(str(filepath), engine="openpyxl") except Exception as exc: logger.error("Cannot open %s: %s", filepath, exc) return {} all_results = {} for sheet_name in xl.sheet_names: raw = xl.parse(sheet_name, header=None) if raw.shape[0] < 3 or raw.shape[1] < 5: continue # ── Find month header rows (rows where col 2 == "Particular") ────────── # These mark the start of each section particular_rows = [ i for i, row in raw.iterrows() if str(row.iloc[2]).strip().lower() == "particular" ] if not particular_rows: logger.info("No SPJIMR-format header rows found in sheet '%s'", sheet_name) continue logger.info("SPJIMR format detected in '%s' — %d sections", sheet_name, len(particular_rows)) # Extract month labels from first header row header_row = raw.iloc[particular_rows[0]] # Months are in columns 3 onward until Total/% columns month_cols = [] months = [] for col_idx in range(3, len(header_row)): val = str(header_row.iloc[col_idx]).strip() if val in ("nan", "", "Total", "%", "NaN"): if months: # stop at first non-month after months started break continue months.append(val) month_cols.append(col_idx) if not months: continue def _row_values(row_idx: int) -> pd.Series: """Extract monthly numeric values from a data row.""" row = raw.iloc[row_idx] vals = [] for ci in month_cols: v = row.iloc[ci] try: vals.append(float(v)) except (ValueError, TypeError): vals.append(np.nan) return pd.Series(vals, index=months) def _find_row(keyword: str, col: int = 2) -> Optional[int]: """Find the first row where column `col` contains `keyword` (case-insensitive).""" kw = keyword.lower() for i, row in raw.iterrows(): cell = str(row.iloc[col]).strip().lower() if kw in cell: return i return None # ── Section 1: Energy ────────────────────────────────────────────────── r_solar = _find_row("solar") r_adani = _find_row("adani") r_nonrenew = _find_row("non-renewable") r_total_energy = _find_row("total energy consumed") r_energy_mix = _find_row("energy mix") if r_total_energy is not None: energy_data = {"period": months} if r_solar: energy_data["solar_kwh"] = _row_values(r_solar).values if r_adani: energy_data["adani_kwh"] = _row_values(r_adani).values if r_nonrenew: energy_data["nonrenewable_kwh"] = _row_values(r_nonrenew).values energy_data["total_kwh"] = _row_values(r_total_energy).values edf = pd.DataFrame(energy_data) # Compute renewable % from mix row or calculate if r_energy_mix is not None: mix_vals = _row_values(r_energy_mix) # Values may be decimals (0.98) or % (98) — normalise mix_numeric = pd.to_numeric(mix_vals, errors="coerce") if mix_numeric.dropna().max() <= 1.0: mix_numeric = mix_numeric * 100 edf["renewable_pct"] = mix_numeric.values else: # Calculate from solar + adani vs total renew_cols = [c for c in ["solar_kwh", "adani_kwh"] if c in edf.columns] if renew_cols and "total_kwh" in edf.columns: edf["renewable_pct"] = ( edf[renew_cols].sum(axis=1) / edf["total_kwh"].replace(0, np.nan) * 100 ).round(1) edf = edf[edf["total_kwh"].notna() & (edf["total_kwh"] > 0)].copy() all_results["energy"] = edf # Also expose as standard energy_series format all_results["energy_series"] = edf[["period", "renewable_pct"]].dropna().copy() logger.info("Extracted energy series: %d months", len(edf)) # ── Section 2: Water ────────────────────────────────────────────────── r_municipal = _find_row("municipal") r_tanker = _find_row("tanker") r_rainwater = _find_row("rainwater") r_total_water = _find_row("total water consumed") if r_total_water is not None: water_data = {"period": months} if r_municipal: water_data["municipal_kl"] = _row_values(r_municipal).values if r_tanker: water_data["tanker_kl"] = _row_values(r_tanker).values if r_rainwater: water_data["rainwater_kl"] = _row_values(r_rainwater).values water_data["total_kl"] = _row_values(r_total_water).values wdf = pd.DataFrame(water_data) wdf = wdf[wdf["total_kl"].notna() & (wdf["total_kl"] > 0)].copy() all_results["water"] = wdf logger.info("Extracted water series: %d months", len(wdf)) # ── Section 3: Waste ────────────────────────────────────────────────── r_recovered = _find_row("waste recovered") r_disposed = _find_row("waste disposed") r_total_waste = _find_row("total waste") r_pct_recov = _find_row("% waste recovered") if r_total_waste is not None: waste_data = {"period": months} if r_recovered: waste_data["recovered_kg"] = _row_values(r_recovered).values if r_disposed: waste_data["disposed_kg"] = _row_values(r_disposed).values waste_data["total_kg"] = _row_values(r_total_waste).values if r_pct_recov: pct_vals = pd.to_numeric(pd.Series(_row_values(r_pct_recov).values), errors="coerce") if pct_vals.dropna().max() <= 1.0: pct_vals = pct_vals * 100 waste_data["recovered_pct"] = pct_vals.values wstdf = pd.DataFrame(waste_data) wstdf = wstdf[wstdf["total_kg"].notna() & (wstdf["total_kg"] > 0)].copy() all_results["waste"] = wstdf # Expose as standard waste_series (recovered = "wet", disposed = "dry" for chart compat) std_waste = wstdf[["period"]].copy() if "recovered_kg" in wstdf.columns: std_waste["wet_kg"] = wstdf["recovered_kg"] if "disposed_kg" in wstdf.columns: std_waste["dry_kg"] = wstdf["disposed_kg"] all_results["waste_series"] = std_waste logger.info("Extracted waste series: %d months", len(wstdf)) all_results["months"] = months return all_results # ────────────────────────────────────────────── # Original series extractors (kept for non-SPJIMR files) # ────────────────────────────────────────────── # ────────────────────────────────────────────── # Chart-ready data extractor (generic fallback) # ────────────────────────────────────────────── def extract_waste_series(sheets: dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]: """ Try to extract Wet / Dry waste time-series from any loaded sheet. Looks for columns whose names contain 'wet', 'dry', 'date', 'month', 'week', 'year'. Returns a tidy DataFrame with columns: [period, wet_kg, dry_kg] or None. """ for sheet_name, df in sheets.items(): lower_cols = {c.lower(): c for c in df.columns} date_col = next((lower_cols[k] for k in lower_cols if any(t in k for t in ["date", "month", "week", "year", "period"])), None) wet_col = next((lower_cols[k] for k in lower_cols if "wet" in k), None) dry_col = next((lower_cols[k] for k in lower_cols if "dry" in k), None) if date_col and (wet_col or dry_col): tidy = pd.DataFrame() tidy["period"] = df[date_col].astype(str) if wet_col: tidy["wet_kg"] = pd.to_numeric(df[wet_col], errors="coerce") if dry_col: tidy["dry_kg"] = pd.to_numeric(df[dry_col], errors="coerce") tidy.dropna(subset=["period"], inplace=True) logger.info("Extracted waste series from sheet '%s'", sheet_name) return tidy return None def extract_energy_series(sheets: dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]: """ Extracts renewable vs total energy figures from sheets. Looks for columns containing 'renewable', 'solar', 'total', 'energy', 'kwh'. Returns tidy DataFrame with [period, renewable_pct] or None. """ for _, df in sheets.items(): lower_cols = {c.lower(): c for c in df.columns} date_col = next((lower_cols[k] for k in lower_cols if any(t in k for t in ["date", "month", "year", "period"])), None) renewable_col = next((lower_cols[k] for k in lower_cols if any(t in k for t in ["renewable", "solar", "green"])), None) total_col = next((lower_cols[k] for k in lower_cols if any(t in k for t in ["total", "kwh", "energy"])), None) if date_col and renewable_col and total_col: tidy = pd.DataFrame() tidy["period"] = df[date_col].astype(str) tidy["renewable"] = pd.to_numeric(df[renewable_col], errors="coerce") tidy["total"] = pd.to_numeric(df[total_col], errors="coerce") tidy["renewable_pct"] = (tidy["renewable"] / tidy["total"] * 100).round(1) tidy.dropna(inplace=True) return tidy return None