# snapshot_logic.py
import hashlib
import json
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

CHUNK_SIZE = 65536
ALLOWED_EXT = {".csv", ".xls", ".xlsx"}
# --- Fingerprinting and Data Loading ---

def sha256_of_file(path: Path) -> str:
    """Compute the SHA-256 hash of a file's raw bytes, reading in chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()
def load_first_sheet(path: Path) -> pd.DataFrame:
    """Load a CSV, or the first sheet of an Excel workbook, into a DataFrame."""
    ext = path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(path)
    elif ext in {".xls", ".xlsx"}:
        # Let pandas pick the engine: openpyxl handles .xlsx but cannot read .xls.
        return pd.read_excel(path, sheet_name=0)
    else:
        raise ValueError(f"Unsupported file extension: {ext}")
def canonicalize_df_for_fingerprint(df: pd.DataFrame) -> bytes:
    """Produce canonical CSV bytes that are stable across column and row order,
    so logically identical data yields the same fingerprint."""
    df2 = df.copy()
    df2.columns = [str(c) for c in df2.columns]
    # Fill missing values before casting to str; astype(str) would turn NaN into "nan".
    df2 = df2.fillna("").astype(str)
    df2 = df2.reindex(sorted(df2.columns), axis=1)
    try:
        # Stable sort by all columns so row order does not affect the hash.
        df2 = df2.sort_values(by=list(df2.columns), kind="mergesort").reset_index(drop=True)
    except Exception:
        df2 = df2.reset_index(drop=True)
    return df2.to_csv(index=False).encode("utf-8")
def fingerprint_from_file(path: Path) -> dict:
    """Return a raw file hash, a canonical data hash, and basic shape metadata."""
    file_hash = sha256_of_file(path)
    df = load_first_sheet(path)
    csv_bytes = canonicalize_df_for_fingerprint(df)
    df_hash = hashlib.sha256(csv_bytes).hexdigest()
    rows, cols = df.shape
    colnames = [str(c) for c in df.columns]
    return {
        "file_hash": file_hash,
        "data_hash": df_hash,
        "rows": int(rows),
        "cols": int(cols),
        "colnames": colnames,
    }
# --- Snapshot Management ---

def ensure_outdir(outdir: Path):
    (outdir / "snapshots").mkdir(parents=True, exist_ok=True)


def load_index(outdir: Path) -> dict:
    idx_path = outdir / "index.json"
    if idx_path.exists():
        try:
            return json.loads(idx_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            return {}
    return {}


def save_index(outdir: Path, index: dict):
    idx_path = outdir / "index.json"
    idx_path.write_text(json.dumps(index, indent=2), encoding="utf-8")
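# Illustrative shape of index.json, inferred from the lookup and save logic below
# (field values are assumptions for illustration, not taken from the original source):
# {
#   "<snapshot_id>": {
#     "file_hash": "<sha256 of the raw uploaded file>",
#     "data_hash": "<sha256 of the canonicalised CSV bytes>",
#     "rows": 123,
#     "cols": 7,
#     "colnames": ["order_id", "amount", "..."]
#   }
# }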
def find_matching_snapshot(index: dict, file_hash: str, data_hash: str):
    # Priority 1: Exact file byte match
    for sid, meta in index.items():
        if meta.get("file_hash") == file_hash:
            return sid
    # Priority 2: Canonical data content match (handles renamed files)
    for sid, meta in index.items():
        if meta.get("data_hash") == data_hash:
            return sid
    return None
def save_snapshot_bundle(outdir: Path, snapshot_id: str, api_response: dict, src_path: Path, metadata: dict):
    snapshot_dir = outdir / "snapshots" / snapshot_id
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    # Copy original file
    shutil.copy2(src_path, snapshot_dir / src_path.name)
    # Save metadata and responses
    (snapshot_dir / "api_response.json").write_text(json.dumps(api_response, indent=2), encoding="utf-8")
    (snapshot_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    # Generate and save preprocessed data and column stats
    preprocessed_df = preprocess_dataframe_locally(src_path)
    preprocessed_df.to_csv(snapshot_dir / "preprocessed.csv", index=False)
    col_stats = generate_column_stats(preprocessed_df)
    (snapshot_dir / "column_stats.json").write_text(json.dumps(col_stats, indent=2), encoding="utf-8")
    log.info(f"Saved new snapshot bundle to {snapshot_dir}")
    return snapshot_dir
# --- Data Processing & API Calls ---

def preprocess_dataframe_locally(file_path: Path) -> pd.DataFrame:
    """A simplified local preprocessing pipeline, mirroring the logic in the original script."""
    df = load_first_sheet(file_path)
    # Normalise column names and drop columns that are mostly missing.
    df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
    df = df.loc[:, df.isnull().mean() < 0.5]
    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        elif pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].fillna(pd.Timestamp("1970-01-01"))
        else:
            df[col] = df[col].fillna("Unknown")
        if df[col].dtype == "object":
            # Convert object columns to numeric where possible; otherwise leave them
            # unchanged (errors="ignore" is deprecated in recent pandas releases).
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass
    df = df.drop_duplicates().reset_index(drop=True)
    return df
def generate_column_stats(df: pd.DataFrame) -> dict:
    col_stats = {}
    for col in df.columns:
        series = df[col]
        try:
            col_stats[col] = {
                "dtype": str(series.dtype),
                "n_unique": int(series.nunique(dropna=True)),
                "n_missing": int(series.isna().sum()),
                "sample_values": [str(v) for v in series.dropna().head(5).unique()],
            }
        except Exception:
            col_stats[col] = {"dtype": "unknown", "n_unique": None, "n_missing": None, "sample_values": []}
    return col_stats
def post_file_to_endpoint(endpoint: str, file_path: Path, timeout: int = 300) -> dict:
    log.info(f"Posting file {file_path.name} to {endpoint}")
    with file_path.open("rb") as f:
        files = {"file": (file_path.name, f)}
        resp = requests.post(endpoint, files=files, timeout=timeout)
    resp.raise_for_status()
    log.info("API request successful.")
    return resp.json()