# snapshot_logic.py
"""Snapshot management for tabular data files.

Fingerprints CSV/Excel files (by raw bytes and by canonicalized content),
maintains an on-disk snapshot index, and bundles preprocessed copies plus
per-column statistics alongside API responses.
"""
import hashlib
import json
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path

import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

CHUNK_SIZE = 65536
ALLOWED_EXT = {".csv", ".xls", ".xlsx"}


# --- Fingerprinting and Data Loading ---

def sha256_of_file(path: Path) -> str:
    """Return the SHA-256 hex digest of the file's raw bytes.

    Reads in CHUNK_SIZE pieces so arbitrarily large files never have to
    fit in memory.
    """
    h = hashlib.sha256()
    with path.open("rb") as f:
        while chunk := f.read(CHUNK_SIZE):
            h.update(chunk)
    return h.hexdigest()


def load_first_sheet(path: Path) -> pd.DataFrame:
    """Load a CSV file, or the first sheet of an Excel workbook.

    Raises:
        ValueError: if the file extension is not .csv/.xls/.xlsx.
    """
    ext = path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(path)
    if ext in {".xls", ".xlsx"}:
        # BUG FIX: the original forced engine='openpyxl', which cannot read
        # legacy .xls workbooks. Let pandas pick the right engine per file.
        return pd.read_excel(path, sheet_name=0)
    raise ValueError(f"Unsupported file extension: {ext}")


def canonicalize_df_for_fingerprint(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame into canonical UTF-8 CSV bytes.

    Column order and row order are normalized so that logically identical
    tables (e.g. the same data with shuffled rows or columns) produce the
    same byte string, and hence the same fingerprint.
    """
    df2 = df.copy()
    df2.columns = [str(c) for c in df2.columns]
    # BUG FIX: fill missing values BEFORE the string cast. The original did
    # astype(str).fillna(""), but astype(str) turns NaN into the literal
    # string "nan", so the subsequent fillna("") was a no-op.
    df2 = df2.fillna("").astype(str)
    df2 = df2.reindex(sorted(df2.columns), axis=1)
    try:
        # Stable (mergesort) row sort for a deterministic serialization.
        df2 = df2.sort_values(by=list(df2.columns), kind="mergesort").reset_index(drop=True)
    except Exception:
        # Best-effort: an unsortable frame still gets a clean index.
        df2 = df2.reset_index(drop=True)
    return df2.to_csv(index=False).encode("utf-8")


def fingerprint_from_file(path: Path) -> dict:
    """Compute both fingerprints for a data file.

    Returns a dict with:
        file_hash: SHA-256 of the raw bytes (exact-file identity).
        data_hash: SHA-256 of the canonicalized content (survives renames
                   and row/column reordering).
        rows, cols, colnames: basic shape metadata.
    """
    file_hash = sha256_of_file(path)
    df = load_first_sheet(path)
    csv_bytes = canonicalize_df_for_fingerprint(df)
    df_hash = hashlib.sha256(csv_bytes).hexdigest()
    rows, cols = df.shape
    colnames = [str(c) for c in df.columns]
    return {
        "file_hash": file_hash,
        "data_hash": df_hash,
        "rows": int(rows),
        "cols": int(cols),
        "colnames": colnames,
    }


# --- Snapshot Management ---

def ensure_outdir(outdir: Path):
    """Create the output directory tree (including snapshots/) if missing."""
    (outdir / "snapshots").mkdir(parents=True, exist_ok=True)


def load_index(outdir: Path) -> dict:
    """Read index.json from *outdir*; return {} if absent or unreadable."""
    idx_path = outdir / "index.json"
    if idx_path.exists():
        try:
            return json.loads(idx_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            # A corrupt or unreadable index is treated as empty rather than
            # fatal; it will be rewritten on the next save_index call.
            return {}
    return {}


def save_index(outdir: Path, index: dict):
    """Write *index* to outdir/index.json as pretty-printed JSON."""
    idx_path = outdir / "index.json"
    idx_path.write_text(json.dumps(index, indent=2), encoding="utf-8")


def find_matching_snapshot(index: dict, file_hash: str, data_hash: str):
    """Return the id of an existing snapshot matching either hash, else None.

    Exact byte-level matches take priority over canonical-content matches
    so that a true duplicate file always wins over a renamed/reordered one.
    """
    # Priority 1: Exact file byte match
    for sid, meta in index.items():
        if meta.get("file_hash") == file_hash:
            return sid
    # Priority 2: Canonical data content match (handles renamed files)
    for sid, meta in index.items():
        if meta.get("data_hash") == data_hash:
            return sid
    return None


def save_snapshot_bundle(outdir: Path, snapshot_id: str, api_response: dict,
                         src_path: Path, metadata: dict):
    """Persist a complete snapshot bundle and return its directory.

    The bundle contains: a copy of the original file, the API response,
    the caller-supplied metadata, a locally preprocessed CSV, and
    per-column statistics.
    """
    snapshot_dir = outdir / "snapshots" / snapshot_id
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    # Copy original file (copy2 preserves timestamps/metadata)
    shutil.copy2(src_path, snapshot_dir / src_path.name)
    # Save metadata and responses
    (snapshot_dir / "api_response.json").write_text(
        json.dumps(api_response, indent=2), encoding="utf-8")
    (snapshot_dir / "metadata.json").write_text(
        json.dumps(metadata, indent=2), encoding="utf-8")
    # Generate and save preprocessed data and column stats
    preprocessed_df = preprocess_dataframe_locally(src_path)
    preprocessed_df.to_csv(snapshot_dir / "preprocessed.csv", index=False)
    col_stats = generate_column_stats(preprocessed_df)
    (snapshot_dir / "column_stats.json").write_text(
        json.dumps(col_stats, indent=2), encoding="utf-8")
    log.info("Saved new snapshot bundle to %s", snapshot_dir)
    return snapshot_dir


# --- Data Processing & API Calls ---

def preprocess_dataframe_locally(file_path: Path) -> pd.DataFrame:
    """A simplified local preprocessing pipeline, mirroring the logic in
    the original script.

    Steps: normalize column names, drop columns that are >=50% null,
    impute remaining nulls (median / epoch / "Unknown" by dtype), attempt
    numeric coercion of object columns, and drop duplicate rows.
    """
    df = load_first_sheet(file_path)
    df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
    # Drop columns where half or more of the values are missing.
    df = df.loc[:, df.isnull().mean() < 0.5]
    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        elif pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].fillna(pd.Timestamp("1970-01-01"))
        else:
            df[col] = df[col].fillna("Unknown")
            if df[col].dtype == "object":
                # BUG FIX: errors='ignore' is deprecated and removed in
                # pandas 3.0; replicate its semantics explicitly.
                try:
                    df[col] = pd.to_numeric(df[col])
                except (ValueError, TypeError):
                    pass  # genuinely non-numeric text column; keep as-is
    df = df.drop_duplicates().reset_index(drop=True)
    return df


def generate_column_stats(df: pd.DataFrame) -> dict:
    """Return per-column summary stats: dtype, cardinality, missing count,
    and up to five distinct sample values (as strings)."""
    col_stats = {}
    for col in df.columns:
        series = df[col]
        try:
            col_stats[col] = {
                "dtype": str(series.dtype),
                "n_unique": int(series.nunique(dropna=True)),
                "n_missing": int(series.isna().sum()),
                "sample_values": [str(v) for v in series.dropna().head(5).unique()],
            }
        except Exception:
            # Exotic dtypes can break nunique/isna; record a placeholder
            # instead of failing the whole stats pass.
            col_stats[col] = {"dtype": "unknown", "n_unique": None,
                              "n_missing": None, "sample_values": []}
    return col_stats


def post_file_to_endpoint(endpoint: str, file_path: Path, timeout: int = 300) -> dict:
    """POST the file as multipart/form-data and return the parsed JSON body.

    Raises:
        requests.HTTPError: on non-2xx responses (via raise_for_status).
    """
    # Imported lazily so the rest of the module (hashing, snapshots,
    # preprocessing) remains usable where requests is not installed.
    import requests

    log.info("Posting file %s to %s", file_path.name, endpoint)
    with file_path.open("rb") as f:
        files = {"file": (file_path.name, f)}
        resp = requests.post(endpoint, files=files, timeout=timeout)
    resp.raise_for_status()
    log.info("API request successful.")
    return resp.json()