Spaces:
Running
Running
| """ | |
| core/processor.py | |
| ----------------- | |
| Handles ingestion and cleaning of all supported file formats: | |
| - CSV / XLSX β Pandas DataFrames (with smart metadata-header skipping) | |
| - PDF β plain text via pypdf | |
| - DOCX β plain text via python-docx | |
| Returns a unified string corpus ready for FAISS indexing. | |
| """ | |
| import io | |
| import os | |
| import re | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Union | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
# ──────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────
| def _is_metadata_row(row: pd.Series) -> bool: | |
| """Return True when a row looks like a title / metadata line rather than data.""" | |
| non_null = row.dropna() | |
| if len(non_null) == 0: | |
| return True | |
| # Typical metadata: only one non-null cell AND it contains letters | |
| if len(non_null) == 1 and re.search(r"[A-Za-z]", str(non_null.iloc[0])): | |
| return True | |
| return False | |
| def _find_real_header(df: pd.DataFrame, max_scan: int = 10) -> int: | |
| """ | |
| Scan the first `max_scan` rows to find the first row that looks like | |
| a proper tabular header (β₯ 2 non-null values with short string content). | |
| Returns the row index of the real header. | |
| """ | |
| for i, (_, row) in enumerate(df.head(max_scan).iterrows()): | |
| non_null = row.dropna() | |
| if len(non_null) >= 2: | |
| # Check values look like header labels (not numeric data rows) | |
| string_count = sum(1 for v in non_null if re.search(r"[A-Za-z]", str(v))) | |
| if string_count >= 2: | |
| return i | |
| return 0 | |
# Global safety limits applied during text generation, so very large
# uploads cannot exhaust memory downstream (chunking / embedding).
MAX_TEXT_CHARS = 80_000  # hard cap per file to prevent MemoryError
MAX_ROWS_PER_SHEET = 500  # only embed first N rows of very large sheets
def _df_to_text(df: pd.DataFrame, label: str = "", max_rows: int = MAX_ROWS_PER_SHEET) -> str:
    """Render a cleaned DataFrame as plain text for embedding.

    Oversized frames are clipped to *max_rows* rows (with an inline note in
    the output) so huge Excel sheets cannot produce enormous strings.
    """
    parts: list[str] = []
    if label:
        parts.append(f"=== {label} ===")
    row_count = len(df)
    if row_count > max_rows:
        logger.warning(
            "'%s' has %d rows β truncating to first %d for embedding.",
            label, row_count, max_rows,
        )
        parts.append(f"[Note: Showing first {max_rows} of {row_count} rows]")
        df = df.head(max_rows)
    parts.append(df.to_string(index=False, na_rep="N/A"))
    return "\n".join(parts)
def _truncate_text(text: str, max_chars: int = MAX_TEXT_CHARS) -> str:
    """Clip *text* to at most *max_chars* characters, appending a marker note.

    Acts as the last line of defence against MemoryError during chunking.
    """
    if len(text) > max_chars:
        logger.warning(
            "Text truncated from %d β %d chars to prevent MemoryError.", len(text), max_chars
        )
        return text[:max_chars] + "\n\n[... document truncated for memory safety ...]"
    return text
# ──────────────────────────────────────────────
# Format-specific loaders
# ──────────────────────────────────────────────
def load_csv(filepath: Union[str, Path]) -> tuple[pd.DataFrame, str]:
    """Read a CSV file, skipping any leading metadata/title rows.

    Returns the cleaned DataFrame together with its plain-text rendering.
    Any parsing failure is logged and then re-raised to the caller.
    """
    try:
        # Probe pass: read headerless so metadata rows can be located first.
        probe = pd.read_csv(filepath, header=None, encoding="utf-8", on_bad_lines="skip")
        skip = _find_real_header(probe)
        df = pd.read_csv(filepath, skiprows=skip, encoding="utf-8", on_bad_lines="skip")
        df.dropna(how="all", inplace=True)
        df.columns = [str(col).strip() for col in df.columns]
        return df, _df_to_text(df, label=Path(filepath).stem)
    except Exception as exc:
        logger.error("CSV load failed for %s: %s", filepath, exc)
        raise
def load_xlsx(filepath: Union[str, Path]) -> tuple[dict[str, pd.DataFrame], str]:
    """Read every sheet of an XLSX workbook with per-sheet header detection.

    Metadata/title rows embedded above the real header (common in
    SPJIMR-style waste summary exports) are skipped per sheet. Returns a
    mapping of sheet name to cleaned DataFrame plus the combined
    plain-text corpus. Failures are logged and re-raised.
    """
    try:
        workbook = pd.ExcelFile(filepath, engine="openpyxl")
        frames: dict[str, pd.DataFrame] = {}
        rendered: list[str] = []
        stem = Path(filepath).stem
        for name in workbook.sheet_names:
            # Headerless probe to locate the real header row for this sheet.
            probe = workbook.parse(name, header=None)
            if probe.empty:
                continue
            df = workbook.parse(name, skiprows=_find_real_header(probe))
            df.dropna(how="all", inplace=True)
            df.columns = [str(col).strip() for col in df.columns]
            frames[name] = df
            rendered.append(_df_to_text(df, label=f"{stem} β {name}"))
        return frames, "\n\n".join(rendered)
    except Exception as exc:
        logger.error("XLSX load failed for %s: %s", filepath, exc)
        raise
def load_pdf(filepath: Union[str, Path]) -> str:
    """Extract plain text from a PDF via pypdf, joining pages with blank lines.

    Pages yielding no extractable text are skipped. Failures are logged
    and re-raised.
    """
    try:
        from pypdf import PdfReader

        extracted = []
        for page in PdfReader(str(filepath)).pages:
            content = page.extract_text()
            if content:
                extracted.append(content.strip())
        return "\n\n".join(extracted)
    except Exception as exc:
        logger.error("PDF load failed for %s: %s", filepath, exc)
        raise
def load_docx(filepath: Union[str, Path]) -> str:
    """Extract plain text from a DOCX file, including table-cell content.

    Table rows are flattened to pipe-separated lines because tables are
    not part of doc.paragraphs. Failures are logged and re-raised.
    """
    try:
        from docx import Document

        doc = Document(str(filepath))
        chunks = [para.text.strip() for para in doc.paragraphs if para.text.strip()]
        for table in doc.tables:
            for row in table.rows:
                cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if cells:
                    chunks.append(" | ".join(cells))
        return "\n".join(chunks)
    except Exception as exc:
        logger.error("DOCX load failed for %s: %s", filepath, exc)
        raise
# ──────────────────────────────────────────────
# Unified entry point
# ──────────────────────────────────────────────
class DocumentProcessor:
    """
    Unified document processor that routes files to the correct loader
    and returns a result dict with metadata and a plain-text corpus.
    """

    SUPPORTED_EXTENSIONS = {".csv", ".xlsx", ".xls", ".pdf", ".docx"}

    def __init__(self, upload_dir: str = "data/uploads"):
        # Directory where uploaded files are persisted; created eagerly.
        self.upload_dir = Path(upload_dir)
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    def save_uploaded_file(self, uploaded_file) -> Path:
        """Persist a Streamlit UploadedFile to disk and return its path.

        The client-supplied filename is reduced to its basename so a
        crafted name like '../../etc/passwd' cannot escape upload_dir
        (path-traversal hardening).
        """
        safe_name = Path(uploaded_file.name).name  # strip any directory components
        dest = self.upload_dir / safe_name
        with open(dest, "wb") as f:
            f.write(uploaded_file.getbuffer())
        logger.info("Saved uploaded file β %s", dest)
        return dest

    def process(self, filepath: Union[str, Path], manual_context: str = "") -> dict:
        """
        Main processing pipeline.

        Args:
            filepath: Path to a previously saved file.
            manual_context: Optional analyst notes embedded with the text.

        Returns:
            {
                "filepath": str,
                "extension": str,
                "dataframes": dict | None,  # for tabular files
                "text": str,                # plain-text corpus
                "manual_context": str,
            }

        Raises:
            ValueError: if the file extension is not supported.
        """
        filepath = Path(filepath)
        ext = filepath.suffix.lower()
        if ext not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(f"Unsupported file type: {ext}. Supported: {self.SUPPORTED_EXTENSIONS}")
        result = {
            "filepath": str(filepath),
            "extension": ext,
            "dataframes": None,
            "text": "",
            "manual_context": manual_context.strip(),
        }
        if ext == ".csv":
            df, text = load_csv(filepath)
            result["dataframes"] = {"Sheet1": df}
            result["text"] = text
        elif ext in (".xlsx", ".xls"):
            sheets, text = load_xlsx(filepath)
            result["dataframes"] = sheets
            result["text"] = text
        elif ext == ".pdf":
            result["text"] = load_pdf(filepath)
        elif ext == ".docx":
            result["text"] = load_docx(filepath)
        # Append manual context so it's embedded alongside the document
        if result["manual_context"]:
            result["text"] += f"\n\n[ANALYST CONTEXT]\n{result['manual_context']}"
        # Hard cap to prevent MemoryError on large files
        result["text"] = _truncate_text(result["text"])
        return result
# ──────────────────────────────────────────────
# SPJIMR Multi-section format parser
# ──────────────────────────────────────────────
def extract_spjimr_metrics(sheets: dict) -> dict:
    """
    Placeholder parser for the SPJIMR multi-section workbook format.

    The SPJIMR layout (stacked Energy / Water / Waste sections sharing one
    sheet with repeating month headers) cannot be recovered from *sheets*:
    load_xlsx() has already applied header detection and discarded the raw
    row structure this format relies on. Use extract_spjimr_metrics_raw(),
    which re-reads the file with header=None, as the primary entry point.

    Returns:
        An empty dict, always (kept for backward compatibility with callers).
    """
    # NOTE(review): the previous body looped over the sheets but every
    # iteration ended in an unconditional `continue`, so the function always
    # returned {} β the dead scaffolding is removed and the contract
    # documented instead.
    return {}
def extract_spjimr_metrics_raw(filepath) -> dict:
    """
    Parse the SPJIMR multi-section Excel file directly from filepath (raw, no header).
    This is the primary entry point β call this after saving the uploaded file.

    The workbook stacks Energy, Water and Waste sections on the same sheet;
    each section header row has "Particular" in column 2 and month labels
    from column 3 onward, terminated by the Total / % summary columns.

    Returns a dict that may contain (depending on which sections are found):
        "months"        : list of period strings
        "energy"        : DataFrame [period, solar_kwh, adani_kwh, nonrenewable_kwh, total_kwh, renewable_pct]
        "water"         : DataFrame [period, municipal_kl, tanker_kl, rainwater_kl, total_kl]
        "waste"         : DataFrame [period, recovered_kg, disposed_kg, total_kg, recovered_pct]
        "energy_series" : tidy DataFrame [period, renewable_pct]
        "waste_series"  : tidy DataFrame [period, wet_kg, dry_kg]
    Returns {} when the file cannot be opened or no SPJIMR section is found.
    """
    import numpy as np
    try:
        xl = pd.ExcelFile(str(filepath), engine="openpyxl")
    except Exception as exc:
        logger.error("Cannot open %s: %s", filepath, exc)
        return {}
    all_results = {}
    for sheet_name in xl.sheet_names:
        raw = xl.parse(sheet_name, header=None)
        if raw.shape[0] < 3 or raw.shape[1] < 5:
            continue
        # Find month header rows (rows where col 2 == "Particular");
        # these mark the start of each section.
        particular_rows = [
            i for i, row in raw.iterrows()
            if str(row.iloc[2]).strip().lower() == "particular"
        ]
        if not particular_rows:
            logger.info("No SPJIMR-format header rows found in sheet '%s'", sheet_name)
            continue
        logger.info("SPJIMR format detected in '%s' β %d sections", sheet_name, len(particular_rows))
        # Extract month labels from the first header row: months occupy
        # columns 3 onward until the Total/% columns.
        header_row = raw.iloc[particular_rows[0]]
        month_cols = []
        months = []
        for col_idx in range(3, len(header_row)):
            val = str(header_row.iloc[col_idx]).strip()
            if val in ("nan", "", "Total", "%", "NaN"):
                if months:  # stop at first non-month after months started
                    break
                continue
            months.append(val)
            month_cols.append(col_idx)
        if not months:
            continue

        def _row_values(row_idx: int) -> pd.Series:
            """Extract monthly numeric values from a data row."""
            row = raw.iloc[row_idx]
            vals = []
            for ci in month_cols:
                v = row.iloc[ci]
                try:
                    vals.append(float(v))
                except (ValueError, TypeError):
                    vals.append(np.nan)
            return pd.Series(vals, index=months)

        def _find_row(keyword: str, col: int = 2) -> Optional[int]:
            """Find the first row where column `col` contains `keyword` (case-insensitive)."""
            kw = keyword.lower()
            for i, row in raw.iterrows():
                cell = str(row.iloc[col]).strip().lower()
                if kw in cell:
                    return i
            return None

        # BUG FIX: _find_row returns a row INDEX, which can legitimately be 0.
        # The previous `if r_solar:`-style truthiness checks treated row 0 as
        # "not found" and silently dropped first-row matches; every lookup
        # below now compares against None explicitly.

        # Section 1: Energy
        r_solar = _find_row("solar")
        r_adani = _find_row("adani")
        r_nonrenew = _find_row("non-renewable")
        r_total_energy = _find_row("total energy consumed")
        r_energy_mix = _find_row("energy mix")
        if r_total_energy is not None:
            energy_data = {"period": months}
            if r_solar is not None:
                energy_data["solar_kwh"] = _row_values(r_solar).values
            if r_adani is not None:
                energy_data["adani_kwh"] = _row_values(r_adani).values
            if r_nonrenew is not None:
                energy_data["nonrenewable_kwh"] = _row_values(r_nonrenew).values
            energy_data["total_kwh"] = _row_values(r_total_energy).values
            edf = pd.DataFrame(energy_data)
            # Compute renewable % from the mix row, or derive it from components.
            if r_energy_mix is not None:
                mix_vals = _row_values(r_energy_mix)
                # Values may be decimals (0.98) or % (98) β normalise
                mix_numeric = pd.to_numeric(mix_vals, errors="coerce")
                if mix_numeric.dropna().max() <= 1.0:
                    mix_numeric = mix_numeric * 100
                edf["renewable_pct"] = mix_numeric.values
            else:
                # Calculate from solar + adani vs total
                renew_cols = [c for c in ["solar_kwh", "adani_kwh"] if c in edf.columns]
                if renew_cols and "total_kwh" in edf.columns:
                    edf["renewable_pct"] = (
                        edf[renew_cols].sum(axis=1) / edf["total_kwh"].replace(0, np.nan) * 100
                    ).round(1)
            edf = edf[edf["total_kwh"].notna() & (edf["total_kwh"] > 0)].copy()
            all_results["energy"] = edf
            # Expose as standard energy_series format; guarded because the
            # pct column is absent when neither a mix row nor component rows
            # exist (previously a KeyError).
            if "renewable_pct" in edf.columns:
                all_results["energy_series"] = edf[["period", "renewable_pct"]].dropna().copy()
            logger.info("Extracted energy series: %d months", len(edf))

        # Section 2: Water
        r_municipal = _find_row("municipal")
        r_tanker = _find_row("tanker")
        r_rainwater = _find_row("rainwater")
        r_total_water = _find_row("total water consumed")
        if r_total_water is not None:
            water_data = {"period": months}
            if r_municipal is not None:
                water_data["municipal_kl"] = _row_values(r_municipal).values
            if r_tanker is not None:
                water_data["tanker_kl"] = _row_values(r_tanker).values
            if r_rainwater is not None:
                water_data["rainwater_kl"] = _row_values(r_rainwater).values
            water_data["total_kl"] = _row_values(r_total_water).values
            wdf = pd.DataFrame(water_data)
            wdf = wdf[wdf["total_kl"].notna() & (wdf["total_kl"] > 0)].copy()
            all_results["water"] = wdf
            logger.info("Extracted water series: %d months", len(wdf))

        # Section 3: Waste
        r_recovered = _find_row("waste recovered")
        r_disposed = _find_row("waste disposed")
        r_total_waste = _find_row("total waste")
        r_pct_recov = _find_row("% waste recovered")
        if r_total_waste is not None:
            waste_data = {"period": months}
            if r_recovered is not None:
                waste_data["recovered_kg"] = _row_values(r_recovered).values
            if r_disposed is not None:
                waste_data["disposed_kg"] = _row_values(r_disposed).values
            waste_data["total_kg"] = _row_values(r_total_waste).values
            if r_pct_recov is not None:
                pct_vals = pd.to_numeric(pd.Series(_row_values(r_pct_recov).values), errors="coerce")
                if pct_vals.dropna().max() <= 1.0:
                    pct_vals = pct_vals * 100
                waste_data["recovered_pct"] = pct_vals.values
            wstdf = pd.DataFrame(waste_data)
            wstdf = wstdf[wstdf["total_kg"].notna() & (wstdf["total_kg"] > 0)].copy()
            all_results["waste"] = wstdf
            # Expose as standard waste_series (recovered = "wet", disposed = "dry" for chart compat)
            std_waste = wstdf[["period"]].copy()
            if "recovered_kg" in wstdf.columns:
                std_waste["wet_kg"] = wstdf["recovered_kg"]
            if "disposed_kg" in wstdf.columns:
                std_waste["dry_kg"] = wstdf["disposed_kg"]
            all_results["waste_series"] = std_waste
            logger.info("Extracted waste series: %d months", len(wstdf))

        all_results["months"] = months
    return all_results
# ──────────────────────────────────────────────
# Original series extractors (kept for non-SPJIMR files)
# ──────────────────────────────────────────────
# ──────────────────────────────────────────────
# Chart-ready data extractor (generic fallback)
# ──────────────────────────────────────────────
def extract_waste_series(sheets: dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]:
    """
    Scan loaded sheets for a Wet / Dry waste time-series.

    A sheet qualifies when it has a period-like column (name containing
    'date', 'month', 'week', 'year' or 'period') plus at least one 'wet'
    or 'dry' column. Returns a tidy DataFrame [period, wet_kg, dry_kg]
    built from the first matching sheet, or None when nothing matches.
    """
    period_tokens = ("date", "month", "week", "year", "period")
    for sheet_name, df in sheets.items():
        by_lower = {col.lower(): col for col in df.columns}

        def _first(predicate):
            # First original-cased column whose lowercased name matches.
            return next((by_lower[k] for k in by_lower if predicate(k)), None)

        date_col = _first(lambda k: any(tok in k for tok in period_tokens))
        wet_col = _first(lambda k: "wet" in k)
        dry_col = _first(lambda k: "dry" in k)
        if not date_col or not (wet_col or dry_col):
            continue
        tidy = pd.DataFrame()
        tidy["period"] = df[date_col].astype(str)
        if wet_col:
            tidy["wet_kg"] = pd.to_numeric(df[wet_col], errors="coerce")
        if dry_col:
            tidy["dry_kg"] = pd.to_numeric(df[dry_col], errors="coerce")
        tidy.dropna(subset=["period"], inplace=True)
        logger.info("Extracted waste series from sheet '%s'", sheet_name)
        return tidy
    return None
def extract_energy_series(sheets: dict[str, pd.DataFrame]) -> Optional[pd.DataFrame]:
    """
    Extract renewable-vs-total energy figures from loaded sheets.

    Looks for a period column ('date', 'month', 'year', 'period'), a
    renewable column ('renewable', 'solar', 'green') and a distinct total
    column ('total', 'kwh', 'energy').

    Returns:
        Tidy DataFrame [period, renewable, total, renewable_pct] built from
        the first matching sheet, or None when no sheet matches.
    """
    for _, df in sheets.items():
        lower_cols = {c.lower(): c for c in df.columns}
        date_col = next((lower_cols[k] for k in lower_cols if any(t in k for t in ["date", "month", "year", "period"])), None)
        renewable_col = next((lower_cols[k] for k in lower_cols if any(t in k for t in ["renewable", "solar", "green"])), None)
        # BUG FIX: the total column must not be the renewable column itself.
        # A name like "Solar kWh" matches both "solar" and "kwh", which
        # previously made renewable_pct a constant 100%.
        total_col = next(
            (
                lower_cols[k] for k in lower_cols
                if lower_cols[k] != renewable_col
                and any(t in k for t in ["total", "kwh", "energy"])
            ),
            None,
        )
        if date_col and renewable_col and total_col:
            tidy = pd.DataFrame()
            tidy["period"] = df[date_col].astype(str)
            tidy["renewable"] = pd.to_numeric(df[renewable_col], errors="coerce")
            tidy["total"] = pd.to_numeric(df[total_col], errors="coerce")
            tidy["renewable_pct"] = (tidy["renewable"] / tidy["total"] * 100).round(1)
            tidy.dropna(inplace=True)
            return tidy
    return None