# snapshot_logic.py
"""Snapshot and fingerprinting helpers for tabular data files.

Computes byte-level and content-level fingerprints of CSV/Excel files, maintains
a local snapshot index for deduplication, and saves snapshot bundles containing
the original file, the API response, preprocessed data, and column statistics.
"""
import hashlib
import json
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

CHUNK_SIZE = 65536
ALLOWED_EXT = {".csv", ".xls", ".xlsx"}

# --- Fingerprinting and Data Loading ---

def sha256_of_file(path: Path) -> str:
    """Return the SHA-256 hex digest of the file's raw bytes, read in chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()

def load_first_sheet(path: Path) -> pd.DataFrame:
    """Load a CSV, or the first sheet of an Excel workbook, into a DataFrame."""
    ext = path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(path)
    elif ext in {".xls", ".xlsx"}:
        # Let pandas pick the engine: openpyxl only reads .xlsx, not legacy .xls.
        return pd.read_excel(path, sheet_name=0)
    else:
        raise ValueError(f"Unsupported file extension: {ext}")

def canonicalize_df_for_fingerprint(df: pd.DataFrame) -> bytes:
    """Build a canonical CSV byte representation that is stable across column
    order, row order, and file naming, for content-level hashing."""
    df2 = df.copy()
    df2.columns = [str(c) for c in df2.columns]
    # Fill missing values before the string cast, otherwise NaN becomes the string "nan".
    df2 = df2.fillna("").astype(str)
    df2 = df2.reindex(sorted(df2.columns), axis=1)
    try:
        df2 = df2.sort_values(by=list(df2.columns), kind="mergesort").reset_index(drop=True)
    except Exception:
        df2 = df2.reset_index(drop=True)
    return df2.to_csv(index=False).encode("utf-8")

def fingerprint_from_file(path: Path) -> dict:
    """Compute both a byte-level and a content-level fingerprint for a file."""
    file_hash = sha256_of_file(path)
    df = load_first_sheet(path)
    csv_bytes = canonicalize_df_for_fingerprint(df)
    df_hash = hashlib.sha256(csv_bytes).hexdigest()
    rows, cols = df.shape
    colnames = [str(c) for c in df.columns]
    return {
        "file_hash": file_hash,
        "data_hash": df_hash,
        "rows": int(rows),
        "cols": int(cols),
        "colnames": colnames,
    }

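
# Hedged illustration (not part of the original module): the canonical fingerprint
# is intended to be insensitive to row and column order, so two reordered views of
# the same data should share one data_hash. A small self-check sketch:
def _demo_canonical_fingerprint_equivalence() -> bool:
    """Return True if two reordered views of the same data hash identically."""
    a = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "c"]})
    b = a.iloc[::-1][["y", "x"]]  # reversed rows, swapped column order
    h1 = hashlib.sha256(canonicalize_df_for_fingerprint(a)).hexdigest()
    h2 = hashlib.sha256(canonicalize_df_for_fingerprint(b)).hexdigest()
    return h1 == h2
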

# --- Snapshot Management ---

def ensure_outdir(outdir: Path):
    (outdir / "snapshots").mkdir(parents=True, exist_ok=True)

def load_index(outdir: Path) -> dict:
    """Load the snapshot index, returning an empty dict if it is missing or unreadable."""
    idx_path = outdir / "index.json"
    if idx_path.exists():
        try:
            return json.loads(idx_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            return {}
    return {}

def save_index(outdir: Path, index: dict):
    idx_path = outdir / "index.json"
    idx_path.write_text(json.dumps(index, indent=2), encoding="utf-8")

def find_matching_snapshot(index: dict, file_hash: str, data_hash: str):
    # Priority 1: Exact file byte match
    for sid, meta in index.items():
        if meta.get("file_hash") == file_hash:
            return sid
    # Priority 2: Canonical data content match (handles renamed files)
    for sid, meta in index.items():
        if meta.get("data_hash") == data_hash:
            return sid
    return None

def save_snapshot_bundle(outdir: Path, snapshot_id: str, api_response: dict, src_path: Path, metadata: dict):
    snapshot_dir = outdir / "snapshots" / snapshot_id
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    # Copy original file
    shutil.copy2(src_path, snapshot_dir / src_path.name)
    # Save metadata and responses
    (snapshot_dir / "api_response.json").write_text(json.dumps(api_response, indent=2), encoding="utf-8")
    (snapshot_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    # Generate and save preprocessed data and column stats
    preprocessed_df = preprocess_dataframe_locally(src_path)
    preprocessed_df.to_csv(snapshot_dir / "preprocessed.csv", index=False)
    col_stats = generate_column_stats(preprocessed_df)
    (snapshot_dir / "column_stats.json").write_text(json.dumps(col_stats, indent=2), encoding="utf-8")
    log.info(f"Saved new snapshot bundle to {snapshot_dir}")
    return snapshot_dir


# --- Data Processing & API Calls ---

def preprocess_dataframe_locally(file_path: Path) -> pd.DataFrame:
    """A simplified local preprocessing pipeline, mirroring the logic in the original script."""
    df = load_first_sheet(file_path)
    df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
    # Keep only columns with less than 50% missing values.
    df = df.loc[:, df.isnull().mean() < 0.5]
    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        elif pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].fillna(pd.Timestamp("1970-01-01"))
        else:
            df[col] = df[col].fillna("Unknown")
        if df[col].dtype == "object":
            # Try to coerce object columns to numeric; keep them unchanged if that
            # fails (errors="ignore" is deprecated in recent pandas versions).
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass
    df = df.drop_duplicates().reset_index(drop=True)
    return df

def generate_column_stats(df: pd.DataFrame) -> dict:
    """Collect per-column dtype, uniqueness, missingness, and sample values."""
    col_stats = {}
    for col in df.columns:
        series = df[col]
        try:
            col_stats[col] = {
                "dtype": str(series.dtype),
                "n_unique": int(series.nunique(dropna=True)),
                "n_missing": int(series.isna().sum()),
                "sample_values": [str(v) for v in series.dropna().head(5).unique()]
            }
        except Exception:
            col_stats[col] = {"dtype": "unknown", "n_unique": None, "n_missing": None, "sample_values": []}
    return col_stats

def post_file_to_endpoint(endpoint: str, file_path: Path, timeout: int = 300) -> dict:
    """Upload the file as multipart/form-data and return the parsed JSON response."""
    log.info(f"Posting file {file_path.name} to {endpoint}")
    with file_path.open("rb") as f:
        files = {"file": (file_path.name, f)}
        resp = requests.post(endpoint, files=files, timeout=timeout)
        resp.raise_for_status()
    log.info("API request successful.")
    return resp.json()
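
# The block below is a minimal usage sketch of how these helpers are likely meant
# to be combined: fingerprint an input file, reuse an existing snapshot when the
# index already contains a matching file_hash or data_hash, and otherwise post the
# file to the analysis endpoint and persist a new snapshot bundle. The endpoint
# URL, output directory, input path, and the timestamp/uuid snapshot-ID scheme are
# illustrative assumptions, not part of the original module.
if __name__ == "__main__":
    outdir = Path("./snapshot_store")           # hypothetical output directory
    src = Path("./data/example.csv")            # hypothetical input file
    endpoint = "http://localhost:8000/analyze"  # hypothetical API endpoint

    ensure_outdir(outdir)
    index = load_index(outdir)

    fp = fingerprint_from_file(src)
    existing = find_matching_snapshot(index, fp["file_hash"], fp["data_hash"])

    if existing is not None:
        log.info(f"Reusing existing snapshot {existing}; skipping API call.")
    else:
        # New data: call the API, then persist the bundle and update the index.
        api_response = post_file_to_endpoint(endpoint, src)
        snapshot_id = f"{datetime.now():%Y%m%dT%H%M%S}-{uuid.uuid4().hex[:8]}"
        metadata = {**fp, "source_file": src.name, "created_at": datetime.now().isoformat()}
        save_snapshot_bundle(outdir, snapshot_id, api_response, src, metadata)
        index[snapshot_id] = metadata
        save_index(outdir, index)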