import logging
import os
import random
import sys
from typing import Optional

import numpy as np
import pandas as pd
import torch

__all__ = ["compare_dataframes"]
TEMP_TARGET = "_temp_target"


def setup_logging(loglevel):
    """Set up basic logging.

    Args:
        loglevel (int): minimum loglevel for emitting messages
    """
    logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s"
    logging.basicConfig(
        level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S"
    )
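
# A minimal usage sketch (kept as a comment so importing this module has no
# side effects); logging.INFO is just an example level:
#
#     setup_logging(logging.INFO)
#     logging.getLogger(__name__).info("logging configured")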


def make_two_digit(num_as_str: str) -> str:
    """Left-pad a single-character numeric string with a zero."""
    if len(num_as_str) < 2:
        return "0" + num_as_str
    return num_as_str
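
# For example, make_two_digit("7") returns "07" while make_two_digit("12") is
# returned unchanged; "7".zfill(2) would behave the same.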


def get_year_mnth_dt_from_date(df: pd.DataFrame, date_col="Date") -> pd.DataFrame:
    """
    Extracts year, month, and day from a date column in a pandas DataFrame.

    Note: the input DataFrame is modified in place and also returned.

    Args:
        df (pd.DataFrame): Input DataFrame.
        date_col (str): Name of the date column.

    Returns:
        pd.DataFrame: DataFrame with year, month, and day columns added.
    """
    df[date_col] = pd.to_datetime(df[date_col])
    df["year"] = df[date_col].dt.year
    df["month"] = df[date_col].dt.month
    df["day"] = df[date_col].dt.day
    return df


def collect_dates(df: pd.DataFrame) -> pd.DataFrame:
    """Rebuild a zero-padded "YYYY-MM-DD" Date column from the year/month/day
    columns added by get_year_mnth_dt_from_date, dropping them afterwards."""
    df["Date"] = (
        df["year"].astype(str)
        + "-"
        + df["month"].astype(str).apply(make_two_digit)
        + "-"
        + df["day"].astype(str).apply(make_two_digit)
    )
    df.drop(["year", "month", "day"], axis=1, inplace=True)
    return df
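
# A round-trip sketch with hypothetical data: splitting a date column apart and
# reassembling it reproduces the original zero-padded strings.
#
#     df = pd.DataFrame({"Date": ["2021-03-07", "2021-11-23"]})
#     df = get_year_mnth_dt_from_date(df, date_col="Date")  # adds year/month/day
#     df = collect_dates(df)                                # rebuilds "Date"
#     assert list(df["Date"]) == ["2021-03-07", "2021-11-23"]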


def seed_everything(seed=1234):
    """Seed Python, NumPy, and PyTorch RNGs for reproducible runs."""
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
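
# Typical usage is a single call before any data loading or model setup:
#
#     seed_everything(42)
#
# Note that seeding alone does not guarantee full determinism; for multi-GPU
# runs, torch.cuda.manual_seed_all(seed) and disabling
# torch.backends.cudnn.benchmark are commonly added as well.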


def _sampler(creator, in_train, in_target, in_test) -> tuple:
    """Run the creator's generation pipeline and return (train, test)."""
    _logger = logging.getLogger(__name__)
    _logger.info("Starting data generation")
    train, test = creator.generate_data_pipe(in_train, in_target, in_test)
    _logger.info(f"Train Data: {train}\nTest Data: {test}")
    _logger.info("Finished generation\n")
    return train, test
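
# A usage sketch; the variable names are hypothetical, and creator can be any
# object exposing a generate_data_pipe(train, target, test) method:
#
#     train_df, test_df = _sampler(creator, X_train, y_train, X_test)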


def _drop_col_if_exist(df, col_to_drop) -> pd.DataFrame:
    """Drop col_to_drop from the input DataFrame df if such a column exists."""
    if col_to_drop in df.columns:
        return df.drop(col_to_drop, axis=1)
    return df


def get_columns_if_exists(df, col) -> Optional[pd.Series]:
    """Return column col of df as a Series, or None if it is absent."""
    if col in df.columns:
        return df[col]
    return None


def calculate_psi(expected, actual, buckettype="bins", buckets=10, axis=0):
    """Calculate the PSI (population stability index) across all variables.

    Args:
        expected: numpy array of original values
        actual: numpy array of new values
        buckettype: bucketing strategy; "bins" splits the range of the expected
            values into equal-width buckets, "quantiles" splits it into
            quantile buckets
        buckets: number of buckets to use
        axis: axis along which variables are defined, 0 for vertical, 1 for
            horizontal

    Returns:
        psi_values: ndarray of PSI values, one per variable

    Author:
        Matthew Burke
        github.com/mwburke
        mwburke.github.io
    """

    def psi(expected_array, actual_array, buckets):
        """Calculate the PSI for a single variable.

        Args:
            expected_array: numpy array of original values
            actual_array: numpy array of new values, same size as expected
            buckets: number of percentile ranges to bucket the values into

        Returns:
            psi_value: calculated PSI value
        """

        def scale_range(input_val, min_val, max_val):
            # Linearly rescale input_val so its minimum maps to min_val and
            # its maximum maps to max_val (float arrays are modified in place).
            input_val += -(np.min(input_val))
            input_val /= np.max(input_val) / (max_val - min_val)
            input_val += min_val
            return input_val

        # Percent ticks 0, 10, ..., 100 for the default ten buckets.
        breakpoints = np.arange(0, buckets + 1) / buckets * 100

        if buckettype == "bins":
            # Equal-width bucket edges spanning the range of the expected values.
            breakpoints = scale_range(breakpoints, np.min(expected_array), np.max(expected_array))
        elif buckettype == "quantiles":
            # Bucket edges at the percentiles of the expected values.
            breakpoints = np.stack([np.percentile(expected_array, b) for b in breakpoints])

        # Fraction of observations falling into each bucket.
        expected_fractions = np.histogram(expected_array, breakpoints)[0] / len(expected_array)
        actual_fractions = np.histogram(actual_array, breakpoints)[0] / len(actual_array)

        def sub_psi(e_perc, a_perc):
            """Calculate a single PSI term, (e_perc - a_perc) * ln(e_perc / a_perc).

            Zero fractions are replaced with a small value so the log is defined.
            """
            if a_perc == 0:
                a_perc = 0.0001
            if e_perc == 0:
                e_perc = 0.0001

            return (e_perc - a_perc) * np.log(e_perc / a_perc)
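
        # Worked term with hypothetical fractions: for e_perc = 0.5 and
        # a_perc = 0.25, the contribution is
        # (0.5 - 0.25) * ln(0.5 / 0.25) = 0.25 * ln 2 ≈ 0.173.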

        # Sum the per-bucket contributions.
        psi_value = sum(
            sub_psi(expected_fractions[i], actual_fractions[i])
            for i in range(len(expected_fractions))
        )

        return psi_value

    # One PSI value per variable: a single value for 1-D input, otherwise one
    # per column (axis=0) or per row (axis=1).
    if len(expected.shape) == 1:
        psi_values = np.empty(1)
    else:
        psi_values = np.empty(expected.shape[1 - axis])

    if len(psi_values) == 1:
        try:
            psi_values = psi(expected, actual, buckets)
        except (TypeError, ValueError, ZeroDivisionError):
            # Fall back to a high-drift placeholder when PSI cannot be
            # computed, e.g. for non-numeric input or a degenerate range.
            psi_values = 0.9
    else:
        for i in range(len(psi_values)):
            if axis == 0:
                psi_values[i] = psi(expected[:, i], actual[:, i], buckets)
            else:
                psi_values[i] = psi(expected[i, :], actual[i, :], buckets)

    return psi_values
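
# A quick sanity check with hypothetical data: two samples from the same
# distribution should give a PSI near 0, while a shifted sample drifts higher.
#
#     rng = np.random.default_rng(0)
#     base = rng.normal(0, 1, 10_000)
#     same = rng.normal(0, 1, 10_000)
#     shifted = rng.normal(0.5, 1, 10_000)
#     calculate_psi(base, same, buckets=10)     # close to 0
#     calculate_psi(base, shifted, buckets=10)  # noticeably larger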


def compare_dataframes(df_original, df_generated):
    """
    Compares two DataFrames for similarity in terms of uniqueness, data quality, and PSI.

    Args:
        df_original: The original DataFrame.
        df_generated: The DataFrame with generated numbers.

    Returns:
        float: A score between 0 (no similarity) and 1 (high similarity)
            representing the similarity of the two DataFrames.

    Example usage:
        df1 = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": ["a", "b", "a", "c"]})
        df2 = pd.DataFrame({"col1": [1, 2, 5, 6], "col2": ["a", "b", "x", "y"]})

        similarity_score = compare_dataframes(df1.copy(), df2.copy())
        print(similarity_score)
    """
    # DataFrames with different column counts are treated as dissimilar outright.
    if len(df_original.columns) != len(df_generated.columns):
        return 0.0

    n_original = len(df_original)
    n_generated = len(df_generated)
    min_rows = min(n_original, n_generated)

    # Uniqueness: average number of distinct values per column in the generated
    # frame relative to the original, scaled down when the generated frame has
    # more rows than the original.
    uniq_original = df_original.nunique().sum() / (len(df_original.columns) + 1e-6)
    uniq_generated = df_generated.nunique().sum() / (len(df_generated.columns) + 1e-6)
    uniqueness_score = (uniq_generated / uniq_original) * (min_rows / n_generated)

    # Data quality: for each shared numeric column, score the generated column
    # by the normalised sum of absolute differences between its consecutive
    # value counts, floored at a small epsilon.
    data_quality_scores = []
    for col in df_original.columns:
        if col in df_generated.columns:
            if pd.api.types.is_numeric_dtype(df_original[col]) and pd.api.types.is_numeric_dtype(df_generated[col]):
                quality = (
                    df_generated[col].value_counts().sort_index(ascending=False)
                    .diff().dropna().abs().sum() / (n_generated + 1e-6)
                )
                data_quality_scores.append(max(quality, 1e-6))
    data_quality_score = sum(data_quality_scores) / len(data_quality_scores) if data_quality_scores else 1

    # Distribution drift: mean PSI across shared columns (columns where PSI
    # cannot be computed fall back to calculate_psi's 0.9 placeholder).
    psi_scores = []
    for col_orig in df_original.columns:
        if col_orig in df_generated.columns:
            psi_scores.append(calculate_psi(df_original[col_orig], df_generated[col_orig], buckets=10))
    psi_similarity = sum(psi_scores) / len(psi_scores) if psi_scores else 1

    # Blend the components; PSI enters inversely, since lower drift means more
    # similar (with a small epsilon to avoid division by zero when the
    # distributions are identical).
    similarity_score = (
        0.1 * uniqueness_score
        + 0.45 * data_quality_score
        + 0.45 * (1 / max(psi_similarity, 1e-6))
    )
    logging.debug(
        f"Similarity components: uniqueness={uniqueness_score}, "
        f"quality={data_quality_score}, psi={psi_similarity}"
    )

    # Clamp to [0, 1].
    similarity_score = min(max(similarity_score, 0), 1)

    return similarity_score
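

if __name__ == "__main__":
    # A minimal smoke test mirroring the docstring example; the exact score
    # depends on the heuristics above, so only the [0, 1] range is asserted.
    df1 = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": ["a", "b", "a", "c"]})
    df2 = pd.DataFrame({"col1": [1, 2, 5, 6], "col2": ["a", "b", "x", "y"]})
    score = compare_dataframes(df1.copy(), df2.copy())
    assert 0 <= score <= 1
    print(f"similarity score: {score}")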