# ba_dashboard/src/data/queries.py
import pandas as pd
from datetime import datetime, timedelta
import logging
# Configure logging for this module.
# NOTE(review): basicConfig at import time configures the *root* logger for the
# whole process. That is fine when this module is only used inside the
# dashboard app, but confirm it is not imported as a library where callers
# expect to control logging configuration themselves.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_evals_for_journey(
    journey_grn,
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    group_by_plots=True,
    runtime_only=False,
    group_by_thread=False
):
    """
    Query 2: Get all evals that have been run on a given Journey.

    Args:
        journey_grn (str): Journey GRN used to select runs.
        runs_df (pd.DataFrame): Runs data. Needs 'journey_grn' and 'run_id';
            also 'is_runtime', 'start_time', 'plot_grn' and/or 'thread_id'
            depending on the options used.
        turns_df (pd.DataFrame): Turns data ('run_id', 'turn_id').
        ai_script_evals_df (pd.DataFrame): AI script evaluations
            ('turn_id', 'eval_name', 'metric_name', 'score').
        journey_evals_df (pd.DataFrame): Journey evaluations
            ('run_id', 'eval_name', 'metric_name', 'score').
        master_data (dict): Master configuration data (unused here; kept so
            all query functions share the same signature).
        aggregation_type (str): One of 'mean', 'median', 'count', 'min', 'max'.
        filter_days (int, optional): Keep only runs started in the last N days.
        group_by_plots (bool): Whether to group results by plot_grn.
        runtime_only (bool): Keep only runs flagged is_runtime.
        group_by_thread (bool): With runtime_only, group by thread_id instead
            of plot_grn. Ignored when runs_df has no 'thread_id' column
            (previously this raised KeyError).

    Returns:
        dict: Grouped {group -> {"journeyEvals": ..., "aiScriptEvals": ...}}
        (or a flat {"journeyEvals": ..., "aiScriptEvals": ...} when not
        grouping), where each leaf is {eval_name: {metric_name: value}}.
        Returns a {"message": ...} dict when no runs match the filters.

    Raises:
        ValueError: If aggregation_type is not a supported value.
    """

    def _aggregate(scores):
        # Central score reducer. Raising on an unknown type (instead of
        # silently returning None, as the old code did) surfaces typos early.
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def _nested(evals_df):
        # Build {eval_name -> {metric_name -> aggregated score}}.
        return {
            eval_name: {
                metric_name: _aggregate(metric_group['score'])
                for metric_name, metric_group in eval_group.groupby('metric_name')
            }
            for eval_name, eval_group in evals_df.groupby('eval_name')
        }

    # Select this journey's runs, then apply the optional runtime/date filters.
    filtered_runs = runs_df[runs_df['journey_grn'] == journey_grn].copy()
    if runtime_only:
        filtered_runs = filtered_runs[filtered_runs['is_runtime'].eq(True)]
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_runs = filtered_runs[filtered_runs['start_time'] >= cutoff_date]
    if filtered_runs.empty:
        return {"message": f"No evaluations found for journey {journey_grn} with the specified filters"}

    run_ids = filtered_runs['run_id'].tolist()
    # Lazy %-args logging; module logger resolved at call time.
    logging.getLogger(__name__).info(
        "Found %d runs for journey %s", len(run_ids), journey_grn)

    filtered_journey_evals = journey_evals_df[journey_evals_df['run_id'].isin(run_ids)]
    filtered_turns = turns_df[turns_df['run_id'].isin(run_ids)]

    def _grouped_by(key_col, fallback_name, pretty):
        # Attach key_col to both eval tables (journey evals via runs, script
        # evals via turns), then build the per-group nested result.
        turns_with_key = filtered_turns.merge(
            filtered_runs[['run_id', key_col]], on='run_id', how='left')
        script_evals = ai_script_evals_df[
            ai_script_evals_df['turn_id'].isin(turns_with_key['turn_id'])]
        script_with_key = script_evals.merge(
            turns_with_key[['turn_id', key_col]], on='turn_id', how='left')
        journey_with_key = filtered_journey_evals.merge(
            filtered_runs[['run_id', key_col]], on='run_id', how='left')

        result = {}
        for section, frame in (("journeyEvals", journey_with_key),
                               ("aiScriptEvals", script_with_key)):
            for key_value, group in frame.groupby(key_col):
                name = fallback_name if pd.isna(key_value) else pretty(key_value)
                entry = result.setdefault(
                    name, {"journeyEvals": {}, "aiScriptEvals": {}})
                # .update merges per eval_name, matching the old per-eval
                # assignment when two raw keys map to the same display name.
                entry[section].update(_nested(group))
        return result

    # Guard on 'thread_id' mirrors get_evals_for_ai_script; the old code
    # raised KeyError at the merge when the column was absent.
    if group_by_thread and runtime_only and 'thread_id' in filtered_runs.columns:
        return _grouped_by('thread_id', "unknown_thread", lambda v: v)
    if group_by_plots:
        # Display name is the last GRN segment, e.g. 'grn:plot:A' -> 'A'.
        return _grouped_by('plot_grn', "unknown_plot", lambda v: v.split(':')[-1])

    # No grouping: aggregate all matching evaluations together. (The old code
    # still performed the plot merge here; skip it — it was unused.)
    filtered_ai_script_evals = ai_script_evals_df[
        ai_script_evals_df['turn_id'].isin(filtered_turns['turn_id'])]
    return {
        "journeyEvals": _nested(filtered_journey_evals),
        "aiScriptEvals": _nested(filtered_ai_script_evals),
    }
def get_evals_for_ai_script(
    script_grn,
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    group_by_plots=True,
    runtime_only=False,
    group_by_thread=False
):
    """
    Query 3: Get all evals that have been run on an AIScript.

    Args:
        script_grn (str): AI Script GRN used to select turns.
        runs_df (pd.DataFrame): Runs data ('run_id'; plus 'is_runtime',
            'plot_grn' and/or 'thread_id' depending on the options used).
        turns_df (pd.DataFrame): Turns data ('turn_id', 'run_id',
            'ai_script_grn', 'timestamp').
        ai_script_evals_df (pd.DataFrame): AI script evaluations
            ('turn_id', 'eval_name', 'metric_name', 'score').
        journey_evals_df (pd.DataFrame): Journey evaluations (unused here;
            kept so all query functions share the same signature).
        master_data (dict): Master configuration data (unused here).
        aggregation_type (str): One of 'mean', 'median', 'count', 'min', 'max'.
        filter_days (int, optional): Keep only turns from the last N days.
        group_by_plots (bool): Whether to group results by plot_grn.
        runtime_only (bool): Keep only turns from runs flagged is_runtime.
        group_by_thread (bool): With runtime_only, group by thread_id instead
            of plot_grn; falls back to ungrouped results when runs_df has no
            'thread_id' column.

    Returns:
        dict: {group -> {eval_name: {metric_name: value}}} when grouped, or
        {eval_name: {metric_name: value}} when not. Returns a
        {"message": ...} dict when no turns match the filters.

    Raises:
        ValueError: If aggregation_type is not a supported value.
    """

    def _aggregate(scores):
        # Central score reducer. Raising on an unknown type (instead of
        # silently returning None, as the old code did) surfaces typos early.
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def _nested(evals_df):
        # Build {eval_name -> {metric_name -> aggregated score}}.
        return {
            eval_name: {
                metric_name: _aggregate(metric_group['score'])
                for metric_name, metric_group in eval_group.groupby('metric_name')
            }
            for eval_name, eval_group in evals_df.groupby('eval_name')
        }

    # Select this script's turns, then pull the runs they belong to.
    filtered_turns = turns_df[turns_df['ai_script_grn'] == script_grn].copy()
    run_ids = filtered_turns['run_id'].unique().tolist()
    runs_for_turns = runs_df[runs_df['run_id'].isin(run_ids)].copy()
    # BUGFIX: the old code logged len() of the raw, duplicate-bearing
    # turn->run list, over-counting runs; count distinct run ids instead.
    logging.getLogger(__name__).info(
        "Found %d runs for AI script %s", len(run_ids), script_grn)

    if runtime_only:
        runs_for_turns = runs_for_turns[runs_for_turns['is_runtime'].eq(True)]
        # Re-restrict turns to the surviving runtime runs.
        filtered_turns = filtered_turns[
            filtered_turns['run_id'].isin(runs_for_turns['run_id'])]
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_turns = filtered_turns[filtered_turns['timestamp'] >= cutoff_date]
    if filtered_turns.empty:
        return {"message": f"No evaluations found for AI script {script_grn} with the specified filters"}

    filtered_ai_script_evals = ai_script_evals_df[
        ai_script_evals_df['turn_id'].isin(filtered_turns['turn_id'])]

    def _grouped_by(key_col, fallback_name, pretty):
        # Attach key_col to the evals via the runs/turns join, then build the
        # per-group nested result.
        turns_with_key = filtered_turns.merge(
            runs_for_turns[['run_id', key_col]], on='run_id', how='left')
        evals_with_key = filtered_ai_script_evals.merge(
            turns_with_key[['turn_id', key_col]], on='turn_id', how='left')
        result = {}
        for key_value, group in evals_with_key.groupby(key_col):
            name = fallback_name if pd.isna(key_value) else pretty(key_value)
            # .update merges per eval_name, matching the old per-eval
            # assignment when two raw keys map to the same display name.
            result.setdefault(name, {}).update(_nested(group))
        return result

    if group_by_thread and runtime_only:
        if 'thread_id' in runs_for_turns.columns:
            return _grouped_by('thread_id', "unknown_thread", lambda v: v)
        # Preserve the original fallback: ungrouped results when thread
        # information is unavailable.
        return _nested(filtered_ai_script_evals)
    if group_by_plots and 'plot_grn' in runs_for_turns.columns:
        # Display name is the last GRN segment, e.g. 'grn:plot:A' -> 'A'.
        return _grouped_by('plot_grn', 'unknown_plot', lambda v: v.split(':')[-1])
    return _nested(filtered_ai_script_evals)
# --- Plot-level and shared-eval queries ---
def get_evals_for_plot(
    plot_grn,
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    runtime_only=False,
    group_by_plots=True
):
    """
    Query 1: Get all available evals that have been run on a given plot.

    Args:
        plot_grn (str): Plot GRN used to select runs.
        runs_df (pd.DataFrame): Runs data ('plot_grn', 'run_id'; plus
            'is_runtime' / 'start_time' when those filters are used).
        turns_df (pd.DataFrame): Turns data ('run_id', 'turn_id').
        ai_script_evals_df (pd.DataFrame): AI script evaluations
            ('turn_id', 'eval_name', 'metric_name', 'score').
        journey_evals_df (pd.DataFrame): Journey evaluations
            ('run_id', 'eval_name', 'metric_name', 'score').
        master_data (dict): Master configuration data (unused here; kept so
            all query functions share the same signature).
        aggregation_type (str): One of 'mean', 'median', 'count', 'min', 'max'.
        filter_days (int, optional): Keep only runs started in the last N days.
        runtime_only (bool): Keep only runs flagged is_runtime.
        group_by_plots (bool): Accepted for signature compatibility; has no
            effect here since all data already belongs to a single plot.

    Returns:
        dict: {"journeyEvals": {...}, "aiScriptEvals": {...}} with each leaf
        {eval_name: {metric_name: value}}, or a {"message": ...} dict when no
        runs match the filters.

    Raises:
        ValueError: If aggregation_type is not a supported value.
    """

    def _aggregate(scores):
        # Central score reducer. Raising on an unknown type (instead of
        # silently returning None, as the old code did) surfaces typos early.
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def _nested(evals_df):
        # Build {eval_name -> {metric_name -> aggregated score}}.
        return {
            eval_name: {
                metric_name: _aggregate(metric_group['score'])
                for metric_name, metric_group in eval_group.groupby('metric_name')
            }
            for eval_name, eval_group in evals_df.groupby('eval_name')
        }

    # Select this plot's runs, then apply the optional runtime/date filters.
    filtered_runs = runs_df[runs_df['plot_grn'] == plot_grn].copy()
    if runtime_only:
        filtered_runs = filtered_runs[filtered_runs['is_runtime'].eq(True)]
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_runs = filtered_runs[filtered_runs['start_time'] >= cutoff_date]
    if filtered_runs.empty:
        return {"message": f"No evaluations found for plot {plot_grn} with the specified filters"}

    run_ids = filtered_runs['run_id']
    # Journey evals attach at run level; script evals attach at turn level.
    filtered_journey_evals = journey_evals_df[journey_evals_df['run_id'].isin(run_ids)]
    filtered_turns = turns_df[turns_df['run_id'].isin(run_ids)]
    filtered_ai_script_evals = ai_script_evals_df[
        ai_script_evals_df['turn_id'].isin(filtered_turns['turn_id'])]

    return {
        "journeyEvals": _nested(filtered_journey_evals),
        "aiScriptEvals": _nested(filtered_ai_script_evals)
    }
def get_shared_evals(
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    runtime_only=False
):
    """
    Query 4: Get all shared evals across all runs.

    Args:
        runs_df (pd.DataFrame): Runs data ('run_id'; plus 'is_runtime' /
            'start_time' when those filters are used).
        turns_df (pd.DataFrame): Turns data ('run_id', 'turn_id').
        ai_script_evals_df (pd.DataFrame): AI script evaluations
            ('turn_id', 'eval_name', 'metric_name', 'score').
        journey_evals_df (pd.DataFrame): Journey evaluations
            ('run_id', 'eval_name', 'metric_name', 'score').
        master_data (dict): Must contain 'ai_script_evals' and 'journey_evals'
            DataFrames with 'name' and 'is_shared_eval' columns.
        aggregation_type (str): One of 'mean', 'median', 'count', 'min', 'max'.
        filter_days (int, optional): Keep only runs started in the last N days.
        runtime_only (bool): Keep only runs flagged is_runtime.

    Returns:
        dict: {"aiScriptEvals": {...}, "journeyEvals": {...}} with each leaf
        {eval_name: {metric_name: value}}; shared evals with no matching data
        are omitted. Returns a {"message": ...} dict when no runs match.

    Raises:
        ValueError: If aggregation_type is not a supported value.
    """

    def _aggregate(scores):
        # Central score reducer. Raising on an unknown type (instead of
        # silently returning None, as the old code did) surfaces typos early.
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def _collect(evals_df, shared_names):
        # Aggregate per shared eval, preserving master-data name order and
        # skipping evals with no matching rows (same shape as the old loop).
        out = {}
        for name in shared_names:
            group = evals_df[evals_df['eval_name'] == name]
            if group.empty:
                continue
            out[name] = {
                metric_name: _aggregate(metric_group['score'])
                for metric_name, metric_group in group.groupby('metric_name')
            }
        return out

    # Shared eval names declared in master data.
    shared_ai_script_evals = master_data['ai_script_evals'][
        master_data['ai_script_evals']['is_shared_eval'].eq(True)
    ]['name'].tolist()
    shared_journey_evals = master_data['journey_evals'][
        master_data['journey_evals']['is_shared_eval'].eq(True)
    ]['name'].tolist()

    # Apply runtime/date filters. No .copy() needed: runs_df is never
    # mutated, and boolean indexing already yields new frames.
    filtered_runs = runs_df
    if runtime_only:
        filtered_runs = filtered_runs[filtered_runs['is_runtime'].eq(True)]
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_runs = filtered_runs[filtered_runs['start_time'] >= cutoff_date]
    if filtered_runs.empty:
        return {"message": "No evaluations found with the specified filters"}

    run_ids = filtered_runs['run_id'].tolist()
    filtered_turns = turns_df[turns_df['run_id'].isin(run_ids)]
    turn_ids = filtered_turns['turn_id'].tolist()

    # Restrict each eval table to matching runs/turns AND shared eval names.
    filtered_ai_script_evals = ai_script_evals_df[
        (ai_script_evals_df['turn_id'].isin(turn_ids)) &
        (ai_script_evals_df['eval_name'].isin(shared_ai_script_evals))
    ]
    filtered_journey_evals = journey_evals_df[
        (journey_evals_df['run_id'].isin(run_ids)) &
        (journey_evals_df['eval_name'].isin(shared_journey_evals))
    ]

    return {
        "aiScriptEvals": _collect(filtered_ai_script_evals, shared_ai_script_evals),
        "journeyEvals": _collect(filtered_journey_evals, shared_journey_evals)
    }