import pandas as pd
from typing import List, Optional

from utils import Trie


class Algorithm:
    def __init__(self, df: Optional[pd.DataFrame] = None):
        self.df = df

    def reorder(self, df: pd.DataFrame) -> pd.DataFrame:
        raise NotImplementedError("Subclasses should implement this!")

    @staticmethod
    def evaluate_df_prefix_hit_cnt(df: pd.DataFrame) -> int:
        """
        Evaluate the prefix hit count of a DataFrame: for each row, the length
        of its longest prefix already present in a trie of all earlier rows.
        """
        trie = Trie()
        total_prefix_hit_count = 0
        # Rows must be processed in order: each row is scored against the rows
        # inserted before it, so this loop is inherently sequential. (A thread
        # pool here would race on the shared trie and make the count depend on
        # scheduling order.)
        for _, row in df.iterrows():
            row_string = "".join(row.astype(str).values)  # no spaces between columns
            total_prefix_hit_count += trie.longest_common_prefix(row_string)
            trie.insert(row_string)
        return total_prefix_hit_count
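
    # Example (illustrative sketch; assumes utils.Trie.longest_common_prefix
    # returns the matched length as an int, as the summation above implies):
    # rows ("x", "yz") and ("x", "yw") serialize to "xyz" and "xyw"; the
    # second shares the 2-char prefix "xy" with the first, so the total
    # prefix hit count is 2.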

    @staticmethod
    def evaluate_cell_hit_cnt(df: pd.DataFrame) -> int:
        """
        Evaluate the prefix hit count of a DataFrame based on exact cell matching.
        For a cell to be a hit, all previous cells in the row must also be hits.
        """
        total_prefix_hit_count = 0
        seen_prefixes = set()  # Cache of every row prefix processed so far

        def process_row(row):
            row_tuple = tuple(row)
            prefix_hit_count = 0
            for i in range(1, len(row_tuple) + 1):
                # A cell is a hit only if the entire prefix ending at it has
                # appeared in an earlier row.
                if row_tuple[:i] in seen_prefixes:
                    prefix_hit_count += 1
                else:
                    break  # Stop counting hits once any cell misses
            # Cache every prefix of this row, not just the full row, so later
            # rows can score partial (per-cell) matches.
            for i in range(1, len(row_tuple) + 1):
                seen_prefixes.add(row_tuple[:i])
            return prefix_hit_count

        # Process rows in order: each row is compared against the rows before it.
        for _, row in df.iterrows():
            total_prefix_hit_count += process_row(row)
        return total_prefix_hit_count
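
    # Example (sketch): with rows ("a", "b") then ("a", "c"), the second row's
    # first cell matches a cached prefix of the first row, scoring 1 hit; its
    # second cell breaks the run, so the total is 1.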

    @staticmethod
    def get_groups_values(df: pd.DataFrame):
        """
        Get the value counts over all cells of a DataFrame (columns stacked
        into a single Series).
        """
        if df.empty:
            return {}
        value_counts = df.stack().value_counts()
        if value_counts.empty:
            return {}
        return value_counts
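
    # Example: for pd.DataFrame({"a": ["x", "x"], "b": ["x", "y"]}), stacking
    # flattens all four cells, so this returns counts {"x": 3, "y": 1},
    # ordered most frequent first.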

    @staticmethod
    def calculate_length(value):
        # bool must be tested before int/float, since bool is a subclass of int.
        if isinstance(value, bool):
            val = 4  # approximate length of 'True'/'False' ('False' is 5)
        elif isinstance(value, (int, float)):
            val = len(str(value))
        elif isinstance(value, str):
            val = len(value)
        else:
            val = 0
        # Squared so that longer values weigh more heavily in column scoring.
        return val**2
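
    # Example: calculate_length("abc") == 9 and calculate_length(1234) == 16;
    # the squaring is a deliberate weighting choice, mirrored in
    # calculate_col_stats below.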

    @staticmethod
    def drop_col(df: pd.DataFrame, col):
        return df.drop(columns=[col])

    @staticmethod
    def drop_rows(df: pd.DataFrame, rows):
        return df.drop(index=rows)

    @staticmethod
    def merging_columns(df: pd.DataFrame, col_names: List[str], delimiter: str = "_", prepended: bool = False) -> pd.DataFrame:
        if not all(col in df.columns for col in col_names):
            raise ValueError("Column names not found in DataFrame")
        # Before merging, check that every column to be merged has the same
        # number of unique values.
        if len(set(df[col_names].nunique())) != 1:
            raise ValueError(
                f"Columns to be merged {col_names} do not have the same number "
                f"of unique values: {df[col_names].nunique().sort_values()}"
            )
        df = df.copy()  # avoid mutating the caller's DataFrame
        merged_names = delimiter.join(col_names)
        if prepended:
            # Values are assumed to carry a "<col>: " prefix; strip it from
            # each value and prepend the merged column name instead.
            df[merged_names] = df[col_names].apply(
                lambda x: merged_names + ": " + delimiter.join(val.split(": ", 1)[1] for val in x), axis=1
            )
        else:
            # Concatenate the values directly; the delimiter joins only names.
            df[merged_names] = df[col_names].apply(lambda x: "".join(str(val) for val in x), axis=1)
        df = df.drop(columns=col_names)
        return df
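
    # Example (sketch): merging_columns(df, ["a", "b"]) adds an "a_b" column
    # holding the row-wise concatenation of a and b (no delimiter between the
    # values themselves), then drops the original "a" and "b" columns.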

    @staticmethod
    def calculate_col_stats(df: pd.DataFrame, enable_index=False):
        num_rows = len(df)
        column_stats = []
        for col in df.columns:
            if col == "original_index":
                continue
            num_groups = df[col].nunique()
            if df[col].dtype == "object" or df[col].dtype == "string":
                avg_length = df[col].astype(str).str.len().mean()
            elif df[col].dtype == "bool":
                avg_length = 4  # assuming 'True'/'False' as average length
            elif df[col].dtype in ["int64", "float64"]:
                avg_length = df[col].astype(str).str.len().mean()
            else:
                avg_length = 0
            # Square the average length, mirroring calculate_length above.
            avg_length = avg_length**2
            if num_groups == 0:
                score = 0
            else:
                # Average group size: rows per distinct value in this column.
                avg_size_per_group = num_rows / num_groups
                # One copy per group is free; only the repeats share length.
                score = avg_length * (avg_size_per_group - 1)
                if num_rows == num_groups:  # all values distinct: no sharing
                    score = 0
            column_stats.append((col, num_groups, avg_length, score))
        # original_index is all-distinct values, so give it the lowest score.
        if enable_index and "original_index" in df.columns:
            column_stats.append(("original_index", len(df), 0, 0))
        # Sort the columns by score, highest first.
        column_stats.sort(key=lambda x: x[3], reverse=True)
        return num_rows, column_stats
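

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch only; evaluate_df_prefix_hit_cnt
    # is skipped here because it depends on the external utils.Trie).
    demo = pd.DataFrame(
        {"city": ["NY", "NY", "LA"], "zip": ["10001", "10001", "90001"]}
    )
    # Row 2 repeats row 1 cell-for-cell, so both of its cells are hits.
    print(Algorithm.evaluate_cell_hit_cnt(demo))  # expected: 2
    num_rows, stats = Algorithm.calculate_col_stats(demo)
    print(num_rows)  # 3
    print(stats)     # "zip" scores highest: longer values, same group count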