| import pandas as pd |
| from typing import List, Tuple |
| from concurrent.futures import ThreadPoolExecutor |
| from utils import Trie |
| import time |
|
|
|
|
class Algorithm:
    """Base class for DataFrame-reordering algorithms.

    Bundles shared static helpers: prefix-hit evaluation metrics,
    column merging, row/column dropping, and per-column statistics.
    Subclasses implement :meth:`reorder`.
    """

    def __init__(self, df: pd.DataFrame = None):
        # Optional DataFrame this instance operates on; helpers below are
        # static and take a DataFrame explicitly, so df may stay None.
        self.df = df
|
|
    def reorder(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a reordered version of ``df``.

        Abstract in this base class; concrete algorithms override it.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Subclasses should implement this!")
|
|
| @staticmethod |
| def evaluate_df_prefix_hit_cnt(self, df: pd.DataFrame) -> int: |
| """ |
| Function to evaluate the prefix hit count of a DataFrame |
| """ |
|
|
| def max_overlap(trie, row_string): |
| return trie.longest_common_prefix(row_string) |
|
|
| trie = Trie() |
| total_prefix_hit_count = 0 |
|
|
| def process_row(index, row): |
| row_string = "".join(row.astype(str).values) |
| row_prefix_hit_count = max_overlap(trie, row_string) |
| trie.insert(row_string) |
| return row_prefix_hit_count |
|
|
| with ThreadPoolExecutor() as executor: |
| results = executor.map(process_row, df.index, [row for _, row in df.iterrows()]) |
|
|
| total_prefix_hit_count = sum(results) |
| return total_prefix_hit_count |
|
|
| @staticmethod |
| def evaluate_cell_hit_cnt(df: pd.DataFrame) -> int: |
| """ |
| Function to evaluate the prefix hit count of a DataFrame based on exact cell matching. |
| For a cell to be a hit, all previous cells in the row must also be hits. |
| """ |
|
|
| total_prefix_hit_count = 0 |
| seen_rows = set() |
|
|
| def process_row(index, row): |
| nonlocal seen_rows |
| prefix_hit_count = 0 |
| current_row_cache = [] |
|
|
| for col_value in row: |
| |
| current_row_cache.append(col_value) |
| if tuple(current_row_cache) in seen_rows: |
| prefix_hit_count += 1 |
| else: |
| break |
|
|
| seen_rows.add(tuple(row)) |
| return prefix_hit_count |
|
|
| |
| for _, row in df.iterrows(): |
| total_prefix_hit_count += process_row(_, row) |
|
|
| return total_prefix_hit_count |
|
|
| @staticmethod |
| def get_groups_values(df: pd.DataFrame): |
| """ |
| Function to get the value counts of a DataFrame |
| """ |
| if df.empty: |
| return {} |
| value_counts = df.stack().value_counts() |
| if value_counts.empty: |
| return {} |
| return value_counts |
|
|
| @staticmethod |
| def calculate_length(value): |
| val = 0 |
| if isinstance(value, bool): |
| val = 4 |
| elif isinstance(value, (int, float)): |
| val = len(str(value)) |
| elif isinstance(value, str): |
| val = len(value) |
| else: |
| val = 0 |
| return val**2 |
|
|
| @staticmethod |
| def drop_col(df: pd.DataFrame, col): |
| return df.drop(columns=[col]) |
|
|
| @staticmethod |
| def drop_rows(df: pd.DataFrame, rows): |
| return df.drop(index=rows) |
|
|
| @staticmethod |
| def merging_columns(df: pd.DataFrame, col_names: List[str], delimiter: str = "_", prepended: bool = False) -> pd.DataFrame: |
| if not all(col in df.columns for col in col_names): |
| raise ValueError("Column names not found in DataFrame") |
|
|
| |
| if len(set(df[col_names].nunique())) != 1: |
| raise ValueError(f"Columns to be merged {col_names}, do not have the same number of unique values: {df.nunique().sort_values()}") |
|
|
| merged_names = delimiter.join(col_names) |
| if prepended: |
| df[merged_names] = df[col_names].apply( |
| lambda x: merged_names + ": " + delimiter.join([val.split(": ", 1)[1] for col, val in zip(col_names, x)]), axis=1 |
| ) |
| else: |
| df[merged_names] = df[col_names].apply(lambda x: "".join([f"{val}" for val in x]), axis=1) |
| df = df.drop(columns=col_names) |
| return df |
|
|
| @staticmethod |
| def calculate_col_stats(df: pd.DataFrame, enable_index=False): |
| num_rows = len(df) |
| column_stats = [] |
| for col in df.columns: |
| if col == "original_index": |
| continue |
|
|
| num_groups = df[col].nunique() |
| if df[col].dtype == "object" or df[col].dtype == "string": |
| avg_length = df[col].astype(str).str.len().mean() |
| elif df[col].dtype == "bool": |
| avg_length = 4 |
| elif df[col].dtype in ["int64", "float64"]: |
| avg_length = df[col].astype(str).str.len().mean() |
| else: |
| avg_length = 0 |
|
|
| avg_length = avg_length**2 |
|
|
| if num_groups == 0: |
| score = 0 |
| else: |
| |
| avg_size_per_group = num_rows / num_groups |
| |
| score = avg_length * (avg_size_per_group - 1) |
|
|
| if num_rows == num_groups: |
| score = 0 |
| column_stats.append((col, num_groups, avg_length, score)) |
|
|
| |
| if enable_index and "original_index" in df.columns: |
| column_stats.append(("original_index", len(df), 0, 0)) |
|
|
| |
| column_stats.sort(key=lambda x: x[3], reverse=True) |
| return num_rows, column_stats |
|
|