sky2 / benchmarks /ADRS /llm_sql /initial_program.py
JustinTX's picture
Add files using upload-large-folder tool
b0e88cf verified
# EVOLVE-BLOCK-START
import pandas as pd
from solver import Algorithm
from typing import Tuple, List, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from collections import Counter
import networkx as nx
class Evolved(Algorithm):
    """
    GGR algorithm

    Greedy group-and-recurse reordering of a DataFrame.  At every step the
    value with the highest weighted count (squared text length times the
    number of repeats) is selected, the rows containing it are grouped and
    that value is moved to the front of each of those rows.  The algorithm
    then recurses on the remaining fields of the group (column recursion) and
    on the rows that do not contain the value (row recursion).

    ``calculate_col_stats`` and ``merging_columns`` are not defined here; they
    are expected to be provided by the ``Algorithm`` base class.
    """

    def __init__(self, df: pd.DataFrame = None):
        self.df = df
        self.dep_graph = None  # NOTE: not used, for one way dependency
        self.num_rows = 0
        self.num_cols = 0
        # Set by ``reorder``: {column: (num_groups, avg_len, score)}.
        self.column_stats = None
        # Set by ``reorder``: {value: weight returned by ``calculate_length``}.
        self.val_len = None
        # Maximum row / column recursion depth; set by ``reorder``.
        self.row_stop = None
        self.col_stop = None
        # Frames with at most this many rows are reordered directly by
        # ``recursive_split_and_reorder``; larger ones are halved first.
        self.base = 2000

    def find_max_group_value(self, df: pd.DataFrame, value_counts: Dict, early_stop: int = 0) -> str:
        """
        Return the value of ``df`` with the largest weighted count.

        The weight of a value is ``self.val_len[value] * (count - 1)``.

        Args:
            df: Frame whose cells are scanned.
            value_counts: Ignored; counts are recomputed from ``df``.
            early_stop: Minimum weighted count required for a value to be
                selected.

        Returns:
            The selected value, or ``None`` when ``df`` has no values or the
            best weighted count is below ``early_stop``.
        """
        # NOTE: recalculate value counts and length for each value
        value_counts = Counter(df.stack())
        weighted_counts = {val: self.val_len[val] * (count - 1) for val, count in value_counts.items()}  # if count > 1
        if not weighted_counts:
            return None
        max_group_val, max_weighted_count = max(weighted_counts.items(), key=lambda x: x[1])
        if max_weighted_count < early_stop:
            return None
        return max_group_val

    def reorder_columns_for_value(self, row, value, column_names, grouped_rows_len: int = 1):
        """
        Reorder the fields of one row so the ones equal to ``value`` come first.

        Args:
            row: A row as produced by ``DataFrame.itertuples(index=False)``;
                fields are read with attribute access.
            value: The value to move to the front.
            column_names: Column names of the frame ``row`` came from.
            grouped_rows_len: Number of rows in the group ``row`` belongs to;
                the dependency-graph branch is only used when this is > 1.

        Returns:
            ``(reordered_values, cols_with_value)`` where ``reordered_values``
            is the list of the row's values in the new order and
            ``cols_with_value`` lists the fields that held ``value``.

        Raises:
            AssertionError: If the reordered field list does not have the same
                length as ``column_names``.
        """
        # cols_with_value will now use attribute access instead of indexing with row[]
        cols_with_value = []
        for idx, col in enumerate(column_names):
            if hasattr(row, col) and getattr(row, col) == value:
                cols_with_value.append(col)
            elif hasattr(row, col.replace(" ", "_")) and getattr(row, col.replace(" ", "_")) == value:
                # NOTE(review): this records ``col`` but matched on the
                # underscore variant; the later getattr(row, col) relies on
                # ``col`` itself being a valid attribute -- confirm.
                cols_with_value.append(col)
            else:
                # Fall back to the positional field name ``_<idx>``.
                attr_name = f"_{idx}"
                if hasattr(row, attr_name) and getattr(row, attr_name) == value:
                    cols_with_value.append(attr_name)

        if self.dep_graph is not None and grouped_rows_len > 1:
            # NOTE: experimental
            reordered_cols = []
            for col in cols_with_value:
                dependent_cols = self.get_dependent_columns(col)
                # check if dependent columns are in row, and if column exists in row attributes
                valid_dependent_cols = []
                for idx, dep_col in enumerate(dependent_cols):
                    if hasattr(row, dep_col):
                        valid_dependent_cols.append(dep_col)
                    elif hasattr(row, dep_col.replace(" ", "_")):
                        valid_dependent_cols.append(dep_col)
                    else:
                        # NOTE(review): ``idx`` is the position within
                        # ``dependent_cols``, not within the row -- confirm
                        # this fallback is intended.
                        attr_name = f"_{idx}"
                        if hasattr(row, attr_name):
                            valid_dependent_cols.append(dep_col)
                reordered_cols.extend([col] + valid_dependent_cols)
            cols_without_value = [col for col in column_names if col not in reordered_cols]
            reordered_cols.extend(cols_without_value)
            assert len(reordered_cols) == len(
                column_names
            ), f"Reordered cols len: {len(reordered_cols)} Original cols len: {len(column_names)}"
            return [getattr(row, col) for col in reordered_cols], cols_with_value
        else:
            cols_without_value = []
            for idx, col in enumerate(column_names):
                if hasattr(row, col) and getattr(row, col) != value:
                    cols_without_value.append(col)
                elif hasattr(row, col.replace(" ", "_")) and getattr(row, col.replace(" ", "_")) != value:
                    cols_without_value.append(col)
                else:
                    # Handle some edge cases
                    attr_name = f"_{idx}"
                    if hasattr(row, attr_name) and getattr(row, attr_name) != value:
                        cols_without_value.append(attr_name)
            reordered_cols = cols_with_value + cols_without_value
            assert len(reordered_cols) == len(
                column_names
            ), f"Reordered cols len: {len(reordered_cols)} Original cols len: {len(column_names)}"
            return [getattr(row, col) for col in reordered_cols], cols_with_value

    def get_dependent_columns(self, col: str) -> List[str]:
        """
        Return every column reachable from ``col`` in ``self.dep_graph``.

        Returns an empty list when there is no dependency graph or ``col`` is
        not one of its nodes.
        """
        if self.dep_graph is None or not self.dep_graph.has_node(col):
            return []
        return list(nx.descendants(self.dep_graph, col))

    def get_cached_dependent_columns(self, col: str) -> List[str]:
        """
        Memoised variant of ``get_dependent_columns``.

        The cache lives on the instance and is tied to the identity of the
        current ``self.dep_graph``, so it is discarded automatically when the
        graph is replaced.  (A ``functools.lru_cache`` on the method would key
        on ``self``, keep the instance alive for the life of the process and
        keep serving results computed from an older graph.)
        """
        graph = self.dep_graph
        if graph is None:
            # Nothing to look up and nothing worth caching.
            return []
        cached = getattr(self, "_dependent_cols_cache", None)
        if cached is None or cached[0] is not graph:
            cached = (graph, {})
            self._dependent_cols_cache = cached
        lookup = cached[1]
        if col not in lookup:
            lookup[col] = self.get_dependent_columns(col)
        return lookup[col]

    def fixed_reorder(self, df: pd.DataFrame, row_sort: bool = True) -> Tuple[pd.DataFrame, List[List[str]]]:
        """
        Apply one column order to the whole frame (the non-recursive fallback).

        The column order is the order in which ``calculate_col_stats`` returns
        its per-column entries.

        Args:
            df: Frame to reorder.
            row_sort: When true, rows are additionally sorted by the reordered
                columns.

        Returns:
            ``(reordered_df, column_orderings)`` where ``column_orderings``
            repeats the chosen column order once per row.
        """
        num_rows, column_stats = self.calculate_col_stats(df, enable_index=True)
        reordered_columns = [col for col, _, _, _ in column_stats]
        reordered_df = df[reordered_columns]
        assert reordered_df.shape == df.shape
        column_orderings = [reordered_columns] * num_rows
        if row_sort:
            reordered_df = reordered_df.sort_values(by=reordered_columns, axis=0)
        return reordered_df, column_orderings

    def column_recursion(self, result_df, max_value, grouped_rows, row_stop, col_stop, early_stop):
        """
        Move ``max_value`` to the front of every grouped row, then recurse on
        the remaining fields of the group that starts with ``max_value``.

        Args:
            result_df: Empty frame (with the target columns) that receives the
                reordered rows.
            max_value: The value selected by ``find_max_group_value``.
            grouped_rows: Rows that contain ``max_value``.
            row_stop: Current row recursion depth (passed through).
            col_stop: Current column recursion depth; the nested call uses
                ``col_stop + 1``.
            early_stop: Passed through to the nested ``recursive_reorder``.

        Returns:
            ``(result_df, grouped_value_counts)`` where ``grouped_value_counts``
            is a ``Counter`` over all values of ``grouped_rows`` (empty when
            ``result_df`` ended up empty).
        """
        cols_settled = []
        column_names = grouped_rows.columns.tolist()
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.reorder_columns_for_value, row, max_value, column_names, len(grouped_rows))
                for row in grouped_rows.itertuples(index=False)
            ]
            # Consume the futures in submission order so the row order of
            # result_df (and the final value of cols_settled) does not depend
            # on thread scheduling.
            for i, future in enumerate(futures):
                reordered_row, cols_settled = future.result()
                result_df.loc[i] = reordered_row

        grouped_value_counts = Counter()
        if not result_df.empty:
            # Group by the first column
            grouped_result_df = result_df.groupby(result_df.columns[0])
            grouped_value_counts = Counter(grouped_rows.stack())  # this is still faster than updating from cached value counts
            for _, group in grouped_result_df:
                # Only the group whose leading value is max_value is refined.
                if group[group.columns[0]].iloc[0] != max_value:
                    continue
                dependent_cols = self.get_cached_dependent_columns(group.columns[0])
                length_of_settle_cols = len(cols_settled)
                if dependent_cols:
                    assert length_of_settle_cols >= 1, f"Dependent columns should be no less than 1, but got {length_of_settle_cols}"
                    # test the first length_of_settle_cols columns, each column has nunique == 1
                    for col in group.columns[:length_of_settle_cols]:
                        assert group[col].nunique() == 1, f"Column {col} should have nunique == 1, but got {group[col].nunique()}"
                    # drop all the settled columns and reorder the rest
                    group_remainder = group.iloc[:, length_of_settle_cols:]
                else:
                    group_remainder = group.iloc[:, 1:]
                grouped_remainder_value_counts = Counter(group_remainder.stack())
                reordered_group_remainder, _ = self.recursive_reorder(
                    group_remainder, grouped_remainder_value_counts, early_stop=early_stop, row_stop=row_stop, col_stop=col_stop + 1
                )
                # Update the group with the reordered columns
                if dependent_cols:
                    group.iloc[:, length_of_settle_cols:] = reordered_group_remainder.values
                else:
                    group.iloc[:, 1:] = reordered_group_remainder.values
                result_df.update(group)
                break
        return result_df, grouped_value_counts

    def recursive_reorder(
        self,
        df: pd.DataFrame,
        value_counts: Dict,
        early_stop: int = 0,
        original_columns: List[str] = None,
        row_stop: int = 0,
        col_stop: int = 0,
    ) -> Tuple[pd.DataFrame, List[List[str]]]:
        """
        Recursively reorder ``df`` by repeatedly grouping on the best value.

        Falls back to ``fixed_reorder`` when a recursion depth limit
        (``self.row_stop`` / ``self.col_stop``) is reached, when no value
        qualifies, or when no row contains the selected value.

        Args:
            df: Frame to reorder.
            value_counts: ``Counter`` of the values in ``df``; used to derive
                the counts handed to the row recursion.
            early_stop: Minimum weighted count for a value to be grouped on.
            original_columns: Defaults to ``df``'s columns when ``None``.
            row_stop: Current row recursion depth.
            col_stop: Current column recursion depth.

        Returns:
            ``(reordered_df, [])``.  For an empty input the input itself is
            returned.  Only at depth ``row_stop == 0 and col_stop == 0`` are
            the column labels restored, with the last one set to
            ``"original_index"``.
        """
        if df.empty or len(df.columns) == 0 or len(df) == 0:
            return df, []
        if self.row_stop is not None and row_stop >= self.row_stop:
            return self.fixed_reorder(df)
        if self.col_stop is not None and col_stop >= self.col_stop:
            return self.fixed_reorder(df)
        if original_columns is None:
            original_columns = df.columns.tolist()

        # Find the max group value using updated counts
        max_value = self.find_max_group_value(df, value_counts, early_stop=early_stop)
        if max_value is None:
            # If there is no max value, then fall back to fixed reorder
            return self.fixed_reorder(df)

        grouped_rows = df[df.isin([max_value]).any(axis=1)]
        remaining_rows = df[~df.isin([max_value]).any(axis=1)]
        # If there is no grouped rows, return the original DataFrame
        if grouped_rows.empty:
            return self.fixed_reorder(df)

        result_df = pd.DataFrame(columns=df.columns)
        reordered_remaining_rows = pd.DataFrame(columns=df.columns)  # Initialize empty dataframe first

        # Column Recursion
        result_df, grouped_value_counts = self.column_recursion(result_df, max_value, grouped_rows, row_stop, col_stop, early_stop)
        remaining_value_counts = value_counts - grouped_value_counts  # Approach 1 - update remaining value counts with subtraction

        # Row Recursion
        reordered_remaining_rows, _ = self.recursive_reorder(
            remaining_rows, remaining_value_counts, early_stop=early_stop, row_stop=row_stop + 1, col_stop=col_stop
        )

        # Stack the grouped rows on top of the remaining rows by position;
        # the per-row field order differs, so labels are dropped here.
        old_column_names = result_df.columns.tolist()
        result_cols_reset = result_df.reset_index(drop=True)
        result_rows_reset = reordered_remaining_rows.reset_index(drop=True)
        final_result_df = pd.DataFrame(result_cols_reset.values.tolist() + result_rows_reset.values.tolist())
        if row_stop == 0 and col_stop == 0:
            final_result_df.columns = old_column_names
            final_result_df.columns = final_result_df.columns.tolist()[:-1] + ["original_index"]
        return final_result_df, []

    def recursive_split_and_reorder(self, df: pd.DataFrame, original_columns: List[str] = None, early_stop: int = 0):
        """
        Recursively split the DataFrame into halves until the size is
        <= ``self.base`` rows, then apply the recursive reorder function.

        The two halves are processed in separate threads and concatenated
        (top half first) with a fresh index.

        Returns:
            The reordered DataFrame, with the same shape as ``df``.
        """
        if len(df) <= self.base:
            initial_value_counts = Counter(df.stack())
            return self.recursive_reorder(df, initial_value_counts, early_stop, original_columns, row_stop=0, col_stop=0)[0]

        mid_index = len(df) // 2
        df_top_half = df.iloc[:mid_index]
        df_bottom_half = df.iloc[mid_index:]
        with ThreadPoolExecutor() as executor:
            future_top = executor.submit(self.recursive_split_and_reorder, df_top_half, original_columns, early_stop)
            future_bottom = executor.submit(self.recursive_split_and_reorder, df_bottom_half, original_columns, early_stop)
            reordered_top_half = future_top.result()
            reordered_bottom_half = future_bottom.result()
        assert reordered_bottom_half.shape == df_bottom_half.shape
        reordered_df = pd.concat([reordered_top_half, reordered_bottom_half], axis=0, ignore_index=True)
        assert reordered_df.shape == df.shape
        return reordered_df

    def calculate_length(self, value):
        """
        Return the weight of ``value``: the square of its text length.

        Booleans weigh ``4**2``; ints and floats use ``len(str(value))``;
        strings use ``len(value)``; anything else weighs 0.

        Deliberately not wrapped in ``functools.lru_cache``: ``reorder`` calls
        it once per distinct value, and a cache on the method would keep the
        instance and every value alive and would conflate ``1``, ``1.0`` and
        ``True`` because they hash and compare equal.
        """
        # bool must be tested first because bool is a subclass of int.
        if isinstance(value, bool):
            return 4**2
        if isinstance(value, (int, float)):
            return len(str(value)) ** 2
        if isinstance(value, str):
            return len(value) ** 2
        return 0

    def reorder(
        self,
        df: pd.DataFrame,
        early_stop: int = 0,
        row_stop: int = None,
        col_stop: int = None,
        col_merge: List[List[str]] = None,
        one_way_dep: List[Tuple[str, str]] = None,
        distinct_value_threshold: float = 0.8,
        parallel: bool = True,
    ) -> Tuple[pd.DataFrame, List[List[str]]]:
        """
        Reorder ``df`` with the GGR algorithm.

        The caller's DataFrame is not modified; all work happens on a copy.

        Args:
            df: Input frame.
            early_stop: Minimum weighted count for a value to be grouped on.
            row_stop: Maximum row recursion depth.  A falsy value (``None`` or
                ``0``) means the number of rows being recursed on.
            col_stop: Maximum column recursion depth.  A falsy value means the
                number of columns being recursed on.
            col_merge: Groups of columns to merge via ``merging_columns``
                before reordering; ``None`` or empty disables merging.
            one_way_dep: ``(source, target)`` pairs of column-name substrings
                used to build ``self.dep_graph``; each substring must match
                exactly one column.  ``None`` or empty leaves the graph as is.
            distinct_value_threshold: Columns whose number of distinct values
                exceeds ``len(df) * distinct_value_threshold`` are excluded
                from the recursion and re-attached afterwards.
            parallel: Use ``recursive_split_and_reorder`` when true, otherwise
                a single ``recursive_reorder`` call.

        Returns:
            ``(final_df, [])`` where ``final_df`` is sorted by all of its
            columns.

        Raises:
            AssertionError: If a dependency substring does not match exactly
                one column, or if an internal shape / uniqueness check fails.
        """
        # Prepare
        initial_shape = df.shape
        # Work on a private copy: the "original_index" helper column added
        # below must not leak into the caller's frame.
        df = df.copy()
        if col_merge:
            self.num_rows, self.column_stats = self.calculate_col_stats(df, enable_index=True)
            reordered_columns = [col for col, _, _, _ in self.column_stats]
            for col_to_merge in col_merge:
                final_col_order = [col for col in reordered_columns if col in col_to_merge]
                df = self.merging_columns(df, final_col_order, prepended=False)
        self.num_rows, self.column_stats = self.calculate_col_stats(df, enable_index=True)
        self.column_stats = {col: (num_groups, avg_len, score) for col, num_groups, avg_len, score in self.column_stats}

        # One way dependency statistics [not used]
        if one_way_dep is not None and len(one_way_dep) > 0:
            self.dep_graph = nx.DiGraph()
            for dep in one_way_dep:
                col1 = [col for col in df.columns if dep[0] in col]
                col2 = [col for col in df.columns if dep[1] in col]
                assert len(col1) == 1, f"Expected one column to match {dep[0]}, but got {len(col1)}"
                assert len(col2) == 1, f"Expected one column to match {dep[1]}, but got {len(col2)}"
                col1 = col1[0]
                col2 = col2[0]
                self.dep_graph.add_edge(col1, col2)

        # Discard too distinct columns by threshold [optional]
        nunique_threshold = len(df) * distinct_value_threshold
        columns_to_discard = [col for col in df.columns if df[col].nunique() > nunique_threshold]
        columns_to_discard = sorted(columns_to_discard, key=lambda x: self.column_stats[x][2], reverse=True)
        columns_to_recurse = [col for col in df.columns if col not in columns_to_discard]

        # Tag each row so the discarded columns can be re-attached later.
        df["original_index"] = range(len(df))
        discarded_columns_df = df[columns_to_discard + ["original_index"]]
        df_to_recurse = df[columns_to_recurse + ["original_index"]]
        recurse_df = df_to_recurse
        self.column_stats = {col: stats for col, stats in self.column_stats.items() if col not in columns_to_discard}
        initial_value_counts = Counter(recurse_df.stack())
        self.val_len = {val: self.calculate_length(val) for val in initial_value_counts.keys()}
        self.row_stop = row_stop if row_stop else len(recurse_df)
        self.col_stop = col_stop if col_stop else len(recurse_df.columns.tolist())

        print("*" * 80)
        print(f"DF columns = {df.columns}")
        # print(f"Early stop = {early_stop}")
        # print(f"Row recursion stop depth = {self.row_stop}, Column recursion stop depth = {self.col_stop}")
        print("*" * 80)

        # Early stop and fall back
        recurse_df, _ = self.fixed_reorder(recurse_df)

        # Recursive reordering
        self.num_cols = len(recurse_df.columns)
        if parallel:
            reordered_df = self.recursive_split_and_reorder(recurse_df, original_columns=columns_to_recurse, early_stop=early_stop)
        else:
            reordered_df, _ = self.recursive_reorder(
                recurse_df,
                initial_value_counts,
                early_stop=early_stop,
            )
        assert (
            reordered_df.shape == recurse_df.shape
        ), f"Reordered DataFrame shape {reordered_df.shape} does not match original DataFrame shape {recurse_df.shape}"
        assert recurse_df["original_index"].is_unique, "Passed in recurse index contains duplicates!"
        assert reordered_df["original_index"].is_unique, "Reordered index contains duplicates!"

        if len(columns_to_discard) > 0:
            final_df = pd.merge(reordered_df, discarded_columns_df, on="original_index", how="left")
        else:
            final_df = reordered_df
        final_df = final_df.drop(columns=["original_index"])

        if not col_merge:
            assert (
                final_df.shape == initial_shape
            ), f"Final DataFrame shape {final_df.shape} does not match original DataFrame shape {initial_shape}"
        else:
            assert (
                final_df.shape[0] == initial_shape[0]
            ), f"Final DataFrame shape {final_df.shape} does not match original DataFrame shape {initial_shape}"
            assert (
                final_df.shape[1] == recurse_df.shape[1] + len(columns_to_discard) - 1
            ), f"Final DataFrame shape {final_df.shape} does not match original DataFrame shape {recurse_df.shape}"

        # sort by the first column to get the final order
        final_df = final_df.sort_values(by=final_df.columns.to_list(), axis=0)
        return final_df, []
# EVOLVE-BLOCK-END