# snapshot_logic.py
import hashlib
import json
import logging
import shutil
import uuid
from datetime import datetime
from pathlib import Path

import pandas as pd
import requests

# Configure logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

CHUNK_SIZE = 65536
ALLOWED_EXT = {".csv", ".xls", ".xlsx"}

# --- Fingerprinting and Data Loading ---

def sha256_of_file(path: Path) -> str:
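    """Stream the file in CHUNK_SIZE blocks so large files never sit fully in memory."""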
    h = hashlib.sha256()
    with path.open("rb") as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()

def load_first_sheet(path: Path) -> pd.DataFrame:
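    """Load a CSV, or the first sheet of an Excel workbook, into a DataFrame."""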
    ext = path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(path)
    elif ext == ".xlsx":
        return pd.read_excel(path, sheet_name=0, engine="openpyxl")
    elif ext == ".xls":
        # openpyxl cannot parse legacy .xls files; those need the xlrd engine.
        return pd.read_excel(path, sheet_name=0, engine="xlrd")
    else:
        raise ValueError(f"Unsupported file extension: {ext}")

def canonicalize_df_for_fingerprint(df: pd.DataFrame) -> bytes:
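    """Render the frame as byte-stable CSV: stringify values, sort columns by name,
    and stably sort rows so identical data hashes the same regardless of ordering."""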
    df2 = df.copy()
    df2.columns = [str(c) for c in df2.columns]
    # Fill missing values before casting; astype(str) would turn NaN into the literal "nan".
    df2 = df2.fillna("").astype(str)
    df2 = df2.reindex(sorted(df2.columns), axis=1)
    try:
        df2 = df2.sort_values(by=list(df2.columns), kind="mergesort").reset_index(drop=True)
    except Exception:
        df2 = df2.reset_index(drop=True)
    return df2.to_csv(index=False).encode("utf-8")

def fingerprint_from_file(path: Path) -> dict:
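    """Compute a raw byte hash plus an order-insensitive content hash and shape info."""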
    file_hash = sha256_of_file(path)
    df = load_first_sheet(path)
    csv_bytes = canonicalize_df_for_fingerprint(df)
    df_hash = hashlib.sha256(csv_bytes).hexdigest()
    rows, cols = df.shape
    colnames = [str(c) for c in df.columns]
    return {
        "file_hash": file_hash,
        "data_hash": df_hash,
        "rows": int(rows),
        "cols": int(cols),
        "colnames": colnames,
    }

# --- Snapshot Management ---

def ensure_outdir(outdir: Path):
    (outdir / "snapshots").mkdir(parents=True, exist_ok=True)

def load_index(outdir: Path) -> dict:
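    """Read index.json, treating a missing or corrupt file as an empty index."""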
    idx_path = outdir / "index.json"
    if idx_path.exists():
        try:
            return json.loads(idx_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            return {}
    return {}

def save_index(outdir: Path, index: dict):
    idx_path = outdir / "index.json"
    idx_path.write_text(json.dumps(index, indent=2), encoding="utf-8")

def find_matching_snapshot(index: dict, file_hash: str, data_hash: str):
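    """Return the id of a snapshot matching by file bytes first, then by canonical content, else None."""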
    # Priority 1: Exact file byte match
    for sid, meta in index.items():
        if meta.get("file_hash") == file_hash:
            return sid
    # Priority 2: Canonical data content match (handles renamed files)
    for sid, meta in index.items():
        if meta.get("data_hash") == data_hash:
            return sid
    return None

def save_snapshot_bundle(outdir: Path, snapshot_id: str, api_response: dict, src_path: Path, metadata: dict):
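    """Write the original file, API response, metadata, preprocessed CSV, and column stats into one snapshot directory."""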
    snapshot_dir = outdir / "snapshots" / snapshot_id
    snapshot_dir.mkdir(parents=True, exist_ok=True)

    # Copy original file
    shutil.copy2(src_path, snapshot_dir / src_path.name)

    # Save metadata and responses
    (snapshot_dir / "api_response.json").write_text(json.dumps(api_response, indent=2), encoding="utf-8")
    (snapshot_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")

    # Generate and save preprocessed data and column stats
    preprocessed_df = preprocess_dataframe_locally(src_path)
    preprocessed_df.to_csv(snapshot_dir / "preprocessed.csv", index=False)
    
    col_stats = generate_column_stats(preprocessed_df)
    (snapshot_dir / "column_stats.json").write_text(json.dumps(col_stats, indent=2), encoding="utf-8")
    
    log.info(f"Saved new snapshot bundle to {snapshot_dir}")
    return snapshot_dir

# --- Data Processing & API Calls ---

def preprocess_dataframe_locally(file_path: Path) -> pd.DataFrame:
    """A simplified local preprocessing pipeline, mirroring the logic in the original script."""
    df = load_first_sheet(file_path)
    df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
    df = df.loc[:, df.isnull().mean() < 0.5]
    for col in df.columns:
        if pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(df[col].median())
        elif pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].fillna(pd.Timestamp("1970-01-01"))
        else:
            df[col] = df[col].fillna("Unknown")
        if df[col].dtype == "object":
            # pd.to_numeric(errors="ignore") is deprecated; emulate it explicitly.
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass
    df = df.drop_duplicates().reset_index(drop=True)
    return df

def generate_column_stats(df: pd.DataFrame) -> dict:
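    """Summarize each column: dtype, cardinality, missing count, and a few sample values."""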
    col_stats = {}
    for col in df.columns:
        series = df[col]
        try:
            col_stats[col] = {
                "dtype": str(series.dtype),
                "n_unique": int(series.nunique(dropna=True)),
                "n_missing": int(series.isna().sum()),
                "sample_values": [str(v) for v in series.dropna().head(5).unique()]
            }
        except Exception:
            col_stats[col] = {"dtype": "unknown", "n_unique": None, "n_missing": None, "sample_values": []}
    return col_stats

def post_file_to_endpoint(endpoint: str, file_path: Path, timeout: int = 300) -> dict:
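    """Upload the file as multipart/form-data and return the decoded JSON response."""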
    log.info(f"Posting file {file_path.name} to {endpoint}")
    with file_path.open("rb") as f:
        files = {"file": (file_path.name, f)}
        resp = requests.post(endpoint, files=files, timeout=timeout)
    resp.raise_for_status()
    log.info("API request successful.")
    return resp.json()
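
# --- Example wiring (hypothetical) ---
# A minimal sketch of how these helpers might be combined into a dedupe-aware
# upload flow: fingerprint the file, reuse a matching snapshot if one exists,
# otherwise call the API and persist a new bundle. The endpoint URL and the
# command-line shape below are assumptions for illustration, not part of the
# original module.

if __name__ == "__main__":
    import sys

    src = Path(sys.argv[1])
    outdir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("output")
    endpoint = "http://localhost:8000/analyze"  # assumed endpoint, for illustration only

    if src.suffix.lower() not in ALLOWED_EXT:
        raise SystemExit(f"Unsupported file type: {src.suffix}")

    ensure_outdir(outdir)
    index = load_index(outdir)

    fp = fingerprint_from_file(src)
    existing = find_matching_snapshot(index, fp["file_hash"], fp["data_hash"])
    if existing:
        log.info(f"Reusing existing snapshot {existing}; skipping API call.")
    else:
        snapshot_id = uuid.uuid4().hex[:12]
        api_response = post_file_to_endpoint(endpoint, src)
        metadata = {**fp, "created_at": datetime.now().isoformat(), "source_file": src.name}
        save_snapshot_bundle(outdir, snapshot_id, api_response, src, metadata)
        index[snapshot_id] = metadata
        save_index(outdir, index)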