import logging
from datetime import datetime, timedelta

import pandas as pd

# Configure logging once at import time; every query helper shares this logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def get_evals_for_journey(
    journey_grn,
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    group_by_plots=True,
    runtime_only=False,
    group_by_thread=False
):
    """
    Query 2: Get all evals that have been run on a given Journey.

    Args:
        journey_grn (str): Journey GRN
        runs_df (pd.DataFrame): Runs data
        turns_df (pd.DataFrame): Turns data
        ai_script_evals_df (pd.DataFrame): AI script evaluations data
        journey_evals_df (pd.DataFrame): Journey evaluations data
        master_data (dict): Master configuration data (unused here; kept so all
            query helpers share a uniform signature)
        aggregation_type (str): How to aggregate scores: 'mean', 'median',
            'count', 'min' or 'max'
        filter_days (int, optional): Filter data to last N days
        group_by_plots (bool): Whether to group results by plots
        runtime_only (bool): Filter to runtime evaluations only
        group_by_thread (bool): Whether to group results by thread_id instead
            of plot_grn (only honored together with runtime_only)

    Returns:
        dict: Evaluation results, or {"message": ...} when no runs match.

    Raises:
        ValueError: If aggregation_type is not one of the supported values.
    """

    def aggregate_scores(scores):
        """Reduce a Series of scores according to aggregation_type."""
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        # Fix: an unrecognized aggregation_type used to fall through and
        # silently fill the result with None values.
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def nested_scores(evals_df):
        """Return {eval_name: {metric_name: aggregated score}} for evals_df."""
        return {
            eval_name: {
                metric_name: aggregate_scores(metric_group['score'])
                for metric_name, metric_group in eval_group.groupby('metric_name')
            }
            for eval_name, eval_group in evals_df.groupby('eval_name')
        }

    # Filter runs by journey GRN (copy so later column selection is safe).
    filtered_runs = runs_df[runs_df['journey_grn'] == journey_grn].copy()

    # Apply runtime filter if specified.
    if runtime_only:
        filtered_runs = filtered_runs[filtered_runs['is_runtime'] == True]  # noqa: E712

    # Apply date filter if specified.
    if filter_days is not None and filter_days > 0:
        # NOTE(review): naive local datetime compared against 'start_time' —
        # assumes start_time is naive local time as well; confirm upstream.
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_runs = filtered_runs[filtered_runs['start_time'] >= cutoff_date]

    if filtered_runs.empty:
        return {"message": f"No evaluations found for journey {journey_grn} with the specified filters"}

    run_ids = filtered_runs['run_id'].tolist()
    logger.info(f"Found {len(run_ids)} runs for journey {journey_grn}")

    # Evaluations and turns belonging to the matched runs.
    filtered_journey_evals = journey_evals_df[journey_evals_df['run_id'].isin(run_ids)]
    filtered_turns = turns_df[turns_df['run_id'].isin(run_ids)]

    # Fix: the thread branch used to select 'thread_id' unconditionally and
    # raised a KeyError when the column was absent; fall back to plot/flat
    # grouping instead (mirrors the guard in get_evals_for_ai_script).
    use_thread = (
        group_by_thread and runtime_only and 'thread_id' in filtered_runs.columns
    )
    key_col = 'thread_id' if use_thread else 'plot_grn'

    if use_thread:
        def display_name(key):
            # Handle null thread_ids.
            return "unknown_thread" if pd.isna(key) else key
    else:
        def display_name(key):
            # Plot GRNs look like "...:<short-name>"; report the short name.
            return "unknown_plot" if pd.isna(key) else key.split(':')[-1]

    # Propagate the grouping key from runs -> turns -> evaluations.
    turns_keyed = filtered_turns.merge(
        filtered_runs[['run_id', key_col]], on='run_id', how='left'
    )
    filtered_ai_script_evals = ai_script_evals_df[
        ai_script_evals_df['turn_id'].isin(turns_keyed['turn_id'])
    ]
    ai_evals_keyed = filtered_ai_script_evals.merge(
        turns_keyed[['turn_id', key_col]], on='turn_id', how='left'
    )
    journey_evals_keyed = filtered_journey_evals.merge(
        filtered_runs[['run_id', key_col]], on='run_id', how='left'
    )

    if use_thread or group_by_plots:
        # Grouped shape: {group_name: {"journeyEvals": ..., "aiScriptEvals": ...}}
        result = {}
        for source_df, bucket in (
            (journey_evals_keyed, "journeyEvals"),
            (ai_evals_keyed, "aiScriptEvals"),
        ):
            for key, key_group in source_df.groupby(key_col):
                entry = result.setdefault(
                    display_name(key), {"journeyEvals": {}, "aiScriptEvals": {}}
                )
                entry[bucket] = nested_scores(key_group)
    else:
        # No grouping: aggregate everything for the journey in one bucket.
        result = {
            "journeyEvals": nested_scores(filtered_journey_evals),
            "aiScriptEvals": nested_scores(filtered_ai_script_evals),
        }

    return result
def get_evals_for_ai_script(
    script_grn,
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    group_by_plots=True,
    runtime_only=False,
    group_by_thread=False
):
    """
    Query 3: Get all evals that have been run on an AIScript.

    Args:
        script_grn (str): AI Script GRN
        runs_df (pd.DataFrame): Runs data
        turns_df (pd.DataFrame): Turns data
        ai_script_evals_df (pd.DataFrame): AI script evaluations data
        journey_evals_df (pd.DataFrame): Journey evaluations data (unused here;
            kept so all query helpers share a uniform signature)
        master_data (dict): Master configuration data (unused here)
        aggregation_type (str): How to aggregate scores: 'mean', 'median',
            'count', 'min' or 'max'
        filter_days (int, optional): Filter data to last N days
        group_by_plots (bool): Whether to group results by plots
        runtime_only (bool): Filter to runtime evaluations only
        group_by_thread (bool): Whether to group results by thread_id instead
            of plot_grn (only honored together with runtime_only)

    Returns:
        dict: Evaluation results, or {"message": ...} when no turns match.

    Raises:
        ValueError: If aggregation_type is not one of the supported values.
    """
    # Same instance as the module-level logger (identical name).
    log = logging.getLogger(__name__)

    def aggregate_scores(scores):
        """Reduce a Series of scores according to aggregation_type."""
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        # Fix: an unrecognized aggregation_type used to fall through and
        # silently fill the result with None values.
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def nested_scores(evals_df):
        """Return {eval_name: {metric_name: aggregated score}} for evals_df."""
        return {
            eval_name: {
                metric_name: aggregate_scores(metric_group['score'])
                for metric_name, metric_group in eval_group.groupby('metric_name')
            }
            for eval_name, eval_group in evals_df.groupby('eval_name')
        }

    # Turns produced by this AI script.
    filtered_turns = turns_df[turns_df['ai_script_grn'] == script_grn].copy()

    # The runs those turns belong to.
    runs_for_turns = runs_df[runs_df['run_id'].isin(filtered_turns['run_id'])].copy()

    # Fix: the old message counted turn rows (including duplicate run_ids) as
    # "runs"; report the number of distinct matching runs instead.
    log.info(f"Found {runs_for_turns['run_id'].nunique()} runs for AI script {script_grn}")

    # Apply runtime filter if specified, then re-restrict the turns.
    if runtime_only:
        runs_for_turns = runs_for_turns[runs_for_turns['is_runtime'] == True]  # noqa: E712
        filtered_turns = filtered_turns[
            filtered_turns['run_id'].isin(runs_for_turns['run_id'])
        ]

    # Apply date filter if specified (turn-level timestamp here, unlike the
    # run-level start_time used by the other queries).
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_turns = filtered_turns[filtered_turns['timestamp'] >= cutoff_date]

    if filtered_turns.empty:
        return {"message": f"No evaluations found for AI script {script_grn} with the specified filters"}

    filtered_ai_script_evals = ai_script_evals_df[
        ai_script_evals_df['turn_id'].isin(filtered_turns['turn_id'])
    ]

    if group_by_thread and runtime_only:
        if 'thread_id' in runs_for_turns.columns:
            # Propagate thread_id from runs -> turns -> evaluations.
            turns_with_thread = filtered_turns.merge(
                runs_for_turns[['run_id', 'thread_id']], on='run_id', how='left'
            )
            evals_with_thread = filtered_ai_script_evals.merge(
                turns_with_thread[['turn_id', 'thread_id']], on='turn_id', how='left'
            )
            result = {}
            for thread_id, thread_group in evals_with_thread.groupby('thread_id'):
                # Handle null thread_ids.
                thread_name = "unknown_thread" if pd.isna(thread_id) else thread_id
                result[thread_name] = nested_scores(thread_group)
        else:
            # No thread information available: fall back to ungrouped results.
            result = nested_scores(filtered_ai_script_evals)
    elif group_by_plots and 'plot_grn' in runs_for_turns.columns:
        # Propagate plot_grn from runs -> turns -> evaluations.
        turns_with_plot = filtered_turns.merge(
            runs_for_turns[['run_id', 'plot_grn']], on='run_id', how='left'
        )
        evals_with_plot = filtered_ai_script_evals.merge(
            turns_with_plot[['turn_id', 'plot_grn']], on='turn_id', how='left'
        )
        result = {}
        for plot_grn, plot_group in evals_with_plot.groupby('plot_grn'):
            # Handle null plot_grns; report the short name after the last ':'.
            plot_name = 'unknown_plot' if pd.isna(plot_grn) else plot_grn.split(':')[-1]
            result[plot_name] = nested_scores(plot_group)
    else:
        # Don't group by plots or threads.
        result = nested_scores(filtered_ai_script_evals)

    return result
def get_evals_for_plot(
    plot_grn,
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    runtime_only=False,
    group_by_plots=True
):
    """
    Query 1: Get all available evals that have been run on a given plot.

    Args:
        plot_grn (str): Plot GRN
        runs_df (pd.DataFrame): Runs data
        turns_df (pd.DataFrame): Turns data
        ai_script_evals_df (pd.DataFrame): AI script evaluations data
        journey_evals_df (pd.DataFrame): Journey evaluations data
        master_data (dict): Master configuration data (unused here; kept so all
            query helpers share a uniform signature)
        aggregation_type (str): How to aggregate scores: 'mean', 'median',
            'count', 'min' or 'max'
        filter_days (int, optional): Filter data to last N days
        runtime_only (bool): Filter to runtime evaluations only
        group_by_plots (bool): Unused; accepted only for signature
            compatibility with the other query helpers

    Returns:
        dict: {"journeyEvals": ..., "aiScriptEvals": ...} keyed by eval name
        then metric name, or {"message": ...} when no runs match.

    Raises:
        ValueError: If aggregation_type is not one of the supported values.
    """

    def aggregate_scores(scores):
        """Reduce a Series of scores according to aggregation_type."""
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        # Fix: an unrecognized aggregation_type used to fall through and
        # silently fill the result with None values.
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def nested_scores(evals_df):
        """Return {eval_name: {metric_name: aggregated score}} for evals_df."""
        return {
            eval_name: {
                metric_name: aggregate_scores(metric_group['score'])
                for metric_name, metric_group in eval_group.groupby('metric_name')
            }
            for eval_name, eval_group in evals_df.groupby('eval_name')
        }

    # Filter runs by plot GRN.
    filtered_runs = runs_df[runs_df['plot_grn'] == plot_grn].copy()

    # Apply runtime filter if specified.
    if runtime_only:
        filtered_runs = filtered_runs[filtered_runs['is_runtime'] == True]  # noqa: E712

    # Apply date filter if specified.
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_runs = filtered_runs[filtered_runs['start_time'] >= cutoff_date]

    if filtered_runs.empty:
        return {"message": f"No evaluations found for plot {plot_grn} with the specified filters"}

    run_ids = filtered_runs['run_id'].tolist()

    # Journey evals attach at the run level; AI script evals at the turn level.
    filtered_journey_evals = journey_evals_df[journey_evals_df['run_id'].isin(run_ids)]
    filtered_turns = turns_df[turns_df['run_id'].isin(run_ids)]
    filtered_ai_script_evals = ai_script_evals_df[
        ai_script_evals_df['turn_id'].isin(filtered_turns['turn_id'])
    ]

    return {
        "journeyEvals": nested_scores(filtered_journey_evals),
        "aiScriptEvals": nested_scores(filtered_ai_script_evals),
    }
def get_shared_evals(
    runs_df,
    turns_df,
    ai_script_evals_df,
    journey_evals_df,
    master_data,
    aggregation_type='mean',
    filter_days=None,
    runtime_only=False
):
    """
    Query 4: Get all shared evals across all runs.

    Args:
        runs_df (pd.DataFrame): Runs data
        turns_df (pd.DataFrame): Turns data
        ai_script_evals_df (pd.DataFrame): AI script evaluations data
        journey_evals_df (pd.DataFrame): Journey evaluations data
        master_data (dict): Master configuration data; must contain
            'ai_script_evals' and 'journey_evals' DataFrames with 'name' and
            'is_shared_eval' columns
        aggregation_type (str): How to aggregate scores: 'mean', 'median',
            'count', 'min' or 'max'
        filter_days (int, optional): Filter data to last N days
        runtime_only (bool): Filter to runtime evaluations only

    Returns:
        dict: {"aiScriptEvals": ..., "journeyEvals": ...} keyed by eval name
        then metric name (only shared evals with data appear), or
        {"message": ...} when no runs match.

    Raises:
        ValueError: If aggregation_type is not one of the supported values.
    """

    def aggregate_scores(scores):
        """Reduce a Series of scores according to aggregation_type."""
        if aggregation_type == 'mean':
            return scores.mean()
        if aggregation_type == 'median':
            return scores.median()
        if aggregation_type == 'count':
            return len(scores)
        if aggregation_type == 'min':
            return scores.min()
        if aggregation_type == 'max':
            return scores.max()
        # Fix: an unrecognized aggregation_type used to fall through and
        # silently fill the result with None values.
        raise ValueError(f"Unsupported aggregation_type: {aggregation_type!r}")

    def metric_scores(eval_group):
        """Return {metric_name: aggregated score} for one eval's rows."""
        return {
            metric_name: aggregate_scores(metric_group['score'])
            for metric_name, metric_group in eval_group.groupby('metric_name')
        }

    # Names of evals flagged as shared in the master configuration.
    shared_ai_script_evals = master_data['ai_script_evals'][
        master_data['ai_script_evals']['is_shared_eval'] == True  # noqa: E712
    ]['name'].tolist()
    shared_journey_evals = master_data['journey_evals'][
        master_data['journey_evals']['is_shared_eval'] == True  # noqa: E712
    ]['name'].tolist()

    # Apply runtime filter and date filter if specified.
    filtered_runs = runs_df.copy()
    if runtime_only:
        filtered_runs = filtered_runs[filtered_runs['is_runtime'] == True]  # noqa: E712
    if filter_days is not None and filter_days > 0:
        cutoff_date = datetime.now() - timedelta(days=filter_days)
        filtered_runs = filtered_runs[filtered_runs['start_time'] >= cutoff_date]

    if filtered_runs.empty:
        return {"message": "No evaluations found with the specified filters"}

    run_ids = filtered_runs['run_id'].tolist()
    filtered_turns = turns_df[turns_df['run_id'].isin(run_ids)]
    turn_ids = filtered_turns['turn_id'].tolist()

    # Keep only shared evals attached to the surviving turns/runs.
    filtered_ai_script_evals = ai_script_evals_df[
        (ai_script_evals_df['turn_id'].isin(turn_ids))
        & (ai_script_evals_df['eval_name'].isin(shared_ai_script_evals))
    ]
    filtered_journey_evals = journey_evals_df[
        (journey_evals_df['run_id'].isin(run_ids))
        & (journey_evals_df['eval_name'].isin(shared_journey_evals))
    ]

    result = {
        "aiScriptEvals": {},
        "journeyEvals": {}
    }

    # Iterate in shared-list order so the output ordering stays stable;
    # shared evals with no matching rows are skipped.
    for eval_name in shared_ai_script_evals:
        eval_group = filtered_ai_script_evals[
            filtered_ai_script_evals['eval_name'] == eval_name
        ]
        if not eval_group.empty:
            result["aiScriptEvals"][eval_name] = metric_scores(eval_group)

    for eval_name in shared_journey_evals:
        eval_group = filtered_journey_evals[
            filtered_journey_evals['eval_name'] == eval_name
        ]
        if not eval_group.empty:
            result["journeyEvals"][eval_name] = metric_scores(eval_group)

    return result