Update aggregations and debug script
Files changed:
- multistats.py +34 -23
- opendashboards/utils/aggregate.py +4 -5
multistats.py
CHANGED
```diff
@@ -3,7 +3,7 @@ import re
 import argparse
 import tqdm
 import wandb
-
+import traceback
 import plotly.express as px
 import pandas as pd
 from concurrent.futures import ProcessPoolExecutor
@@ -30,14 +30,14 @@ def pull_wandb_runs(project='openvalidators', filters=None, min_steps=50, max_st
         summary = run.summary
         if summary_filters is not None and not summary_filters(summary):
             continue
-        if netuid is not None and …
+        if netuid is not None and run.config.get('netuid') != netuid:
             continue
         step = summary.get('_step',0)
         if step < min_steps or step > max_steps:
             # warnings.warn(f'Skipped run `{run.name}` because it contains {step} events (<{min_steps})')
             continue

-        prog_msg = f'Loading data {…
+        prog_msg = f'Loading data {successful/ntop*100:.0f}% ({successful}/{ntop} runs, {n_events} events)'
         pbar.set_description(f'{prog_msg}... **fetching** `{run.name}`')

         duration = summary.get('_runtime')
@@ -108,13 +108,14 @@ def load_data(run_id, run_path=None, load=True, save=False, explode=True):
         df = df.loc[df.step_length.notna()]

         # detect list columns which as stored as strings
-        …
+        ignore_cols = ('moving_averaged_scores')
+        list_cols = [c for c in df.columns if c not in ignore_cols and df[c].dtype == "object" and df[c].str.startswith("[").all()]
         # convert string representation of list to list
         # df[list_cols] = df[list_cols].apply(lambda x: eval(x, {'__builtins__': None}) if pd.notna(x) else x)
         try:
-            df[list_cols] = df[list_cols].applymap(eval, na_action='ignore')
+            df[list_cols] = df[list_cols].fillna('').applymap(eval, na_action='ignore')
         except ValueError as e:
-            print(f'Error loading {file_path!r} when converting columns {list_cols} to list: {e}')
+            print(f'Error loading {file_path!r} when converting columns {list_cols} to list: {e}', flush=True)

     else:
         # Download the history from wandb and add metadata
```
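For context on the list-column hunk above: CSV round trips store list-valued cells as strings, the committed heuristic flags object columns whose every value starts with `[`, and `eval` parses them back. A minimal standalone sketch of that round trip, with made-up column names and `ast.literal_eval` shown as a stricter stand-in for `eval`:

```python
import ast
import pandas as pd

# Toy frame: a CSV round trip stores list cells as strings like '[1, 2]'.
df = pd.DataFrame({
    'rewards': ['[0.0, 0.5]', '[1.0, 0.25]'],  # stringified lists
    'run_name': ['run-a', 'run-b'],            # plain strings, must be skipped
})

# Detect object columns whose every value starts with '[' (the diff's heuristic).
list_cols = [c for c in df.columns
             if df[c].dtype == 'object' and df[c].str.startswith('[').all()]

# literal_eval parses Python literals only, so it cannot run arbitrary code.
df[list_cols] = df[list_cols].applymap(ast.literal_eval)
print(list_cols, df['rewards'].iloc[0])  # ['rewards'] [0.0, 0.5]
```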
```diff
@@ -152,37 +153,43 @@ def calculate_stats(df_long, freq='H', save_path=None, ntop=3 ):
     ])
     df_long = df_schema.reset_index()

-
-    print(f'Calculating stats for dataframe with shape {df_long.shape}')
+    run_id = df_long['run_id'].iloc[0]
+    # print(f'Calculating stats for run {run_id!r} dataframe with shape {df_long.shape}')

     # Approximate number of tokens in each completion
-    df_long['completion_num_tokens'] = (df_long['completions'].str.split().str.len() / 0.75).round()
-
-
-    g = df_long.groupby([pd.Grouper(key='_timestamp', axis=0, freq=freq), 'run_id'])
+    df_long['completion_num_tokens'] = (df_long['completions'].astype(str).str.split().str.len() / 0.75).round()

     # TODO: use named aggregations
     reward_aggs = ['sum','mean','std','median','max',aggregate.nonzero_rate, aggregate.nonzero_mean, aggregate.nonzero_std, aggregate.nonzero_median]
     aggs = {
         'completions': ['nunique','count', aggregate.diversity, aggregate.successful_diversity, aggregate.success_rate],
         'completion_num_tokens': ['mean', 'std', 'median', 'max'],
-        **{k: reward_aggs for k in df_long.filter(regex='reward')}
+        **{k: reward_aggs for k in df_long.filter(regex='reward') if df_long[k].nunique() > 1}
     }

     # Calculate tokens per second
     if 'completion_times' in df_long.columns:
-        df_long['tokens_per_sec'] = df_long['completion_num_tokens']/df_long['completion_times']
+        df_long['tokens_per_sec'] = df_long['completion_num_tokens']/(df_long['completion_times']+1e-6)
         aggs.update({
             'completion_times': ['mean','std','median','min','max'],
             'tokens_per_sec': ['mean','std','median','max'],
         })

-    stats = g.agg(aggs)
-
+    grouper = df_long.groupby(pd.Grouper(key='_timestamp', axis=0, freq=freq))
+    # carry out main aggregations
+    stats = grouper.agg(aggs)
+    # carry out multi-column aggregations using apply
+    diversity = grouper.apply(aggregate.successful_nonzero_diversity)
+    # carry out top completions aggregations using apply
+    top_completions = grouper.apply(aggregate.completion_top_stats, exclude='', ntop=ntop).unstack()
+
+    # combine all aggregations, which have the same index
+    stats = pd.concat([stats, diversity, top_completions], axis=1)
+
     # flatten multiindex columns
-    stats.columns = ['_'.join(c) for c in stats.columns]
-    stats = stats.reset_index()
-
+    stats.columns = ['_'.join([str(cc) for cc in c]) if isinstance(c, tuple) else str(c) for c in stats.columns]
+    stats = stats.reset_index().assign(run_id=run_id)
+
     if save_path:
         stats.to_csv(save_path, index=False)
```
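The reworked aggregation above follows one pattern: a single time-bucket grouper feeds `.agg` for per-column stats and `.apply` for stats that need several columns at once, and the pieces are then concatenated on their shared index. A toy, self-contained sketch of that pattern (invented column names and an invented lambda in place of the `aggregate` helpers; the flatten step here runs before the concat for simplicity, whereas the commit flattens afterwards):

```python
import pandas as pd

# Toy event log: one row per completion.
df = pd.DataFrame({
    '_timestamp': pd.date_range('2023-07-01', periods=6, freq='30min'),
    'reward': [0.0, 0.4, 0.9, 0.0, 0.7, 0.2],
    'completions': ['a', 'a', 'b', '', 'c', 'c'],
})

grouper = df.groupby(pd.Grouper(key='_timestamp', freq='h'))

# Per-column aggregations: dict of column -> list of aggs, giving MultiIndex columns.
stats = grouper.agg({'reward': ['mean', 'max'], 'completions': ['nunique']})

# Flatten the MultiIndex columns the same way the diff does.
stats.columns = ['_'.join(str(cc) for cc in c) if isinstance(c, tuple) else str(c)
                 for c in stats.columns]

# Multi-column aggregation via apply: mean reward over non-empty completions.
extra = grouper.apply(lambda g: g.loc[g['completions'] != '', 'reward'].mean())
extra = extra.rename('reward_nonempty_mean')

# Both sides share the time-bucket index, so they concatenate column-wise.
stats = pd.concat([stats, extra], axis=1)
print(stats.columns.tolist())
# ['reward_mean', 'reward_max', 'completions_nunique', 'reward_nonempty_mean']
```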
```diff
@@ -212,7 +219,8 @@ def process(run, load=True, save=False, load_stats=True, freq='H', ntop=3):
         return calculate_stats(df_long, freq=freq, save_path=stats_path, ntop=ntop)

     except Exception as e:
-        print(f'Error processing run {run["run_id"]}: {e}')
+        print(f'Error processing run {run["run_id"]!r}:\t{e.__class__.__name__}: {e}',flush=True)
+        print(traceback.format_exc())

 def line_chart(df, col, title=None):
     title = title or col.replace('_',' ').title()
@@ -299,11 +307,12 @@ if __name__ == '__main__':
                 result = future.result()
                 results.append(result)
             except Exception as e:
-                print(f'generated an exception: {e}')
+                print(f'-----------------------------\nWorker generated an exception in "process" function:\n{e.__class__.__name__}: {e}\n-----------------------------\n',flush=True)
             pbar.update(1)

     if not results:
         raise ValueError('No runs were successfully processed.')
+    print(f'Processed {len(results)} runs.',flush=True)

     # Concatenate the results into a single dataframe
     df = pd.concat(results, ignore_index=True).sort_values(['_timestamp','run_id'], ignore_index=True)
@@ -312,6 +321,8 @@ if __name__ == '__main__':
     print(f'Saved {df.shape[0]} rows to data/processed.csv')

     display(df)
+    print(f'Unique values in columns:')
+    display(df.nunique().sort_values())
     if not args.no_plot:

         plots = []
@@ -328,10 +339,10 @@ if __name__ == '__main__':
                 result = future.result()
                 plots.append(result)
             except Exception as e:
-                print(f'generated an exception: {e}')
+                print(f'-----------------------------\nWorker generated an exception in "line_chart" function:\n{e.__class__.__name__}: {e}\n-----------------------------\n',flush=True)
+                # traceback.print_exc()
             pbar.update(1)

     print(f'Saved {len(plots)} plots to data/figures/')


-
```
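The logging changes above reflect how ProcessPoolExecutor surfaces worker failures: an exception raised in a worker is re-raised in the parent only when `future.result()` is called, so that is where the loop catches it, prints a flushed message, and dumps the traceback. A minimal sketch of the pattern, with an invented `work` function that fails on one input:

```python
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed

def work(n):
    # Hypothetical worker that fails for one input to exercise the error path.
    if n == 2:
        raise ValueError('bad input')
    return n * n

if __name__ == '__main__':
    results = []
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(work, n) for n in range(4)]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                # result() re-raises the worker's exception in the parent,
                # so the remote traceback is available here for printing.
                print(f'{e.__class__.__name__}: {e}', flush=True)
                print(traceback.format_exc())
    print(f'Processed {len(results)} of 4 inputs.')
```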
opendashboards/utils/aggregate.py
CHANGED
```diff
@@ -1,16 +1,16 @@
 import pandas as pd

 def diversity(x):
-    return x.nunique()/len(x)
+    return x.nunique()/len(x) if len(x)>0 else 0

 def _nonempty(x):
-    return x[x.str.len()>0]
+    return x[x.astype(str).str.len()>0]

 def successful_diversity(x):
     return diversity(_nonempty(x))

 def success_rate(x):
-    return len(_nonempty(x))/len(x)
+    return len(_nonempty(x))/len(x) if len(x)>0 else 0

 def threshold_rate(x, threshold):
     return (x>threshold).sum()/len(x)
@@ -25,9 +25,8 @@ def completion_top_stats(x, exclude=None, ntop=1):
     if exclude is not None:
         vc.drop(exclude, inplace=True, errors='ignore')

-    rewards = x.loc[x['completions'].isin(vc.index[:ntop])].groupby('completions').rewards.agg(['mean','std'])
+    rewards = x.loc[x['completions'].isin(vc.index[:ntop])].groupby('completions').rewards.agg(['mean','std','max'])
     return pd.DataFrame({
-        'completions_rank':range(ntop),
         'completions_top':rewards.index.tolist(),
         'completions_freq':vc.values[:ntop],
         'completions_reward_mean':rewards['mean'].values,
```
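As a worked example of what the hardened helpers compute (toy data; empty strings stand for failed completions, which is what `_nonempty` filters out):

```python
import pandas as pd

x = pd.Series(['hello world', '', 'hello world', 'goodbye'])

# diversity: unique values as a fraction of all values (0 guards empty input).
print(x.nunique() / len(x) if len(x) > 0 else 0)    # 3/4 = 0.75

# success_rate: fraction of non-empty completions.
nonempty = x[x.astype(str).str.len() > 0]
print(len(nonempty) / len(x) if len(x) > 0 else 0)  # 3/4 = 0.75

# successful_diversity: diversity among non-empty completions only.
print(nonempty.nunique() / len(nonempty))           # 2/3 = 0.67
```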