# Source: phase2withfrontend / snapshot_logic.py (uploaded by triflix, commit ef75f57, verified)
# snapshot_logic.py
import hashlib
import json
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path
import pandas as pd
import requests
# Configure module-wide logging (INFO level) and a named logger for this file.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
# Number of bytes read per iteration when hashing files (64 KiB).
CHUNK_SIZE = 65536
# Tabular file extensions this module supports (mirrors load_first_sheet's checks).
ALLOWED_EXT = {".csv", ".xls", ".xlsx"}
# --- Fingerprinting and Data Loading ---
def sha256_of_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in chunks.

    Streams the file CHUNK_SIZE bytes at a time so arbitrarily large files
    can be hashed without loading them fully into memory.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(CHUNK_SIZE):
            digest.update(block)
    return digest.hexdigest()
def load_first_sheet(path: Path) -> pd.DataFrame:
    """Load a CSV file, or the first sheet of an Excel workbook, as a DataFrame.

    Args:
        path: Path to a .csv, .xls, or .xlsx file.

    Returns:
        The parsed table as a pandas DataFrame.

    Raises:
        ValueError: If the file extension is not one of the supported types.
    """
    ext = path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(path)
    if ext in {".xls", ".xlsx"}:
        # BUG FIX: the engine was hard-coded to 'openpyxl', which can only
        # read .xlsx/.xlsm — every legacy .xls upload failed. Pin openpyxl
        # for .xlsx and let pandas auto-select the proper engine for .xls.
        engine = "openpyxl" if ext == ".xlsx" else None
        return pd.read_excel(path, sheet_name=0, engine=engine)
    raise ValueError(f"Unsupported file extension: {ext}")
def canonicalize_df_for_fingerprint(df: pd.DataFrame) -> bytes:
    """Produce a canonical CSV byte representation of *df* for hashing.

    Columns are sorted by name and rows are sorted by value so that two
    files containing the same data (possibly reordered) hash identically.

    Args:
        df: Arbitrary DataFrame loaded from a user file.

    Returns:
        UTF-8 encoded CSV bytes of the canonicalized frame.
    """
    df2 = df.copy()
    df2.columns = [str(c) for c in df2.columns]
    # BUG FIX: fillna must run BEFORE astype(str). The original called
    # fillna("") after astype(str), by which point NaN had already been
    # stringified to "nan", making the fillna a silent no-op.
    df2 = df2.fillna("").astype(str)
    df2 = df2.reindex(sorted(df2.columns), axis=1)
    try:
        # mergesort is stable, keeping the row order (and thus the hash)
        # deterministic across runs.
        df2 = df2.sort_values(by=list(df2.columns), kind="mergesort").reset_index(drop=True)
    except Exception:
        # Fall back to the unsorted rows if sorting fails for any reason.
        df2 = df2.reset_index(drop=True)
    return df2.to_csv(index=False).encode("utf-8")
def fingerprint_from_file(path: Path) -> dict:
    """Compute byte-level and content-level fingerprints for a tabular file.

    Returns a dict with the raw file hash ("file_hash"), a hash of the
    canonicalized table contents ("data_hash"), and basic shape metadata.
    """
    df = load_first_sheet(path)
    n_rows, n_cols = df.shape
    canonical_bytes = canonicalize_df_for_fingerprint(df)
    return {
        "file_hash": sha256_of_file(path),
        "data_hash": hashlib.sha256(canonical_bytes).hexdigest(),
        "rows": int(n_rows),
        "cols": int(n_cols),
        "colnames": [str(name) for name in df.columns],
    }
# --- Snapshot Management ---
def ensure_outdir(outdir: Path):
    """Create *outdir* and its 'snapshots' subdirectory if they don't exist."""
    snapshots_dir = outdir / "snapshots"
    snapshots_dir.mkdir(parents=True, exist_ok=True)
def load_index(outdir: Path) -> dict:
    """Read index.json from *outdir*; return {} if missing or unreadable."""
    idx_path = outdir / "index.json"
    if not idx_path.exists():
        return {}
    try:
        return json.loads(idx_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, IOError):
        # Treat a corrupt/unreadable index the same as an absent one.
        return {}
def save_index(outdir: Path, index: dict):
    """Serialize *index* as pretty-printed JSON to index.json in *outdir*."""
    serialized = json.dumps(index, indent=2)
    (outdir / "index.json").write_text(serialized, encoding="utf-8")
def find_matching_snapshot(index: dict, file_hash: str, data_hash: str):
    """Return the id of an existing snapshot matching the file, or None.

    An exact byte-level match (file_hash) takes priority over a canonical
    content match (data_hash); the latter catches renamed/re-saved copies
    of the same data.
    """
    byte_match = next(
        (sid for sid, meta in index.items() if meta.get("file_hash") == file_hash),
        None,
    )
    if byte_match is not None:
        return byte_match
    return next(
        (sid for sid, meta in index.items() if meta.get("data_hash") == data_hash),
        None,
    )
def save_snapshot_bundle(outdir: Path, snapshot_id: str, api_response: dict, src_path: Path, metadata: dict):
    """Persist everything belonging to one snapshot under snapshots/<id>/.

    Writes the original file, the API response, metadata, a locally
    preprocessed CSV, and per-column stats. Returns the bundle directory.
    """
    snapshot_dir = outdir / "snapshots" / snapshot_id
    snapshot_dir.mkdir(parents=True, exist_ok=True)

    # Keep an untouched copy of the source file alongside the artifacts.
    shutil.copy2(src_path, snapshot_dir / src_path.name)

    def _dump_json(filename: str, payload: dict) -> None:
        # Helper: pretty-printed JSON artifact inside the bundle directory.
        (snapshot_dir / filename).write_text(json.dumps(payload, indent=2), encoding="utf-8")

    _dump_json("api_response.json", api_response)
    _dump_json("metadata.json", metadata)

    # Derive and store the cleaned table plus its column summary.
    cleaned = preprocess_dataframe_locally(src_path)
    cleaned.to_csv(snapshot_dir / "preprocessed.csv", index=False)
    _dump_json("column_stats.json", generate_column_stats(cleaned))

    log.info(f"Saved new snapshot bundle to {snapshot_dir}")
    return snapshot_dir
# --- Data Processing & API Calls ---
def preprocess_dataframe_locally(file_path: Path) -> pd.DataFrame:
    """A simplified local preprocessing pipeline, mirroring the logic in the original script.

    Steps:
      1. Normalize column names (strip, lowercase, spaces -> underscores).
      2. Drop columns that are >= 50% null.
      3. Impute missing values (median / epoch timestamp / "Unknown").
      4. Convert fully-numeric object columns to numbers.
      5. De-duplicate rows.

    Args:
        file_path: Path to a supported tabular file (csv/xls/xlsx).

    Returns:
        The cleaned DataFrame with a fresh integer index.
    """
    df = load_first_sheet(file_path)
    df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
    # Drop columns that are mostly missing (50% or more nulls).
    df = df.loc[:, df.isnull().mean() < 0.5]
    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        elif pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].fillna(pd.Timestamp("1970-01-01"))
        else:
            df[col] = df[col].fillna("Unknown")
        if df[col].dtype == "object":
            # BUG FIX: pd.to_numeric(..., errors='ignore') is deprecated
            # (FutureWarning since pandas 2.2, removed in pandas 3.x).
            # Emulate its semantics explicitly: convert if fully numeric,
            # otherwise leave the column unchanged.
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass
    df = df.drop_duplicates().reset_index(drop=True)
    return df
def generate_column_stats(df: pd.DataFrame) -> dict:
    """Summarize each column: dtype, cardinality, missing count, sample values.

    Any column whose summary raises is recorded with placeholder values so
    one odd column can't abort the whole report.
    """
    stats: dict = {}
    for name in df.columns:
        column = df[name]
        try:
            stats[name] = {
                "dtype": str(column.dtype),
                "n_unique": int(column.nunique(dropna=True)),
                "n_missing": int(column.isna().sum()),
                "sample_values": [str(v) for v in column.dropna().head(5).unique()],
            }
        except Exception:
            # Best-effort: record a placeholder rather than failing.
            stats[name] = {"dtype": "unknown", "n_unique": None, "n_missing": None, "sample_values": []}
    return stats
def post_file_to_endpoint(endpoint: str, file_path: Path, timeout: int = 300) -> dict:
    """Upload *file_path* to *endpoint* as multipart form data and return the JSON body.

    Raises requests.HTTPError for non-2xx responses (via raise_for_status)
    and requests timeout/connection errors on network failure.
    """
    log.info(f"Posting file {file_path.name} to {endpoint}")
    with file_path.open("rb") as handle:
        upload = {"file": (file_path.name, handle)}
        response = requests.post(endpoint, files=upload, timeout=timeout)
        response.raise_for_status()
        log.info("API request successful.")
        return response.json()