import streamlit as st
import pandas as pd
from datetime import datetime, timedelta
import logging

from .athena import execute_athena_query
from config import (
    RUNS_TABLE,
    TURNS_TABLE,
    AI_SCRIPT_EVALS_TABLE,
    JOURNEY_EVALS_TABLE,
    STREAMLIT_CACHE_TTL,
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)


@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_runs_data(filter_days=None, run_tags=None):
    """
    Load runs data from Athena

    Args:
        filter_days (int, optional): Filter data to last N days
        run_tags (list, optional): Filter data to specific run tags

    Returns:
        pandas.DataFrame: Runs data
    """
    query = f"SELECT * FROM {RUNS_TABLE}"
    conditions = []

    # Add date filter if specified
    if filter_days is not None and filter_days > 0:
        cutoff_date = (datetime.now() - timedelta(days=filter_days)).strftime('%Y-%m-%d')
        conditions.append(f"start_time >= '{cutoff_date}'")

    # Add run tags filter if specified
    if run_tags:
        tags_list = "', '".join(run_tags)
        conditions.append(f"run_tag IN ('{tags_list}')")

    # Add WHERE clause if there are conditions
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing runs query: {query}")
    df = execute_athena_query(query)

    # Convert timestamp strings to datetime objects if needed
    if not df.empty and 'start_time' in df.columns:
        df['start_time'] = pd.to_datetime(df['start_time'], errors='coerce')
    if not df.empty and 'end_time' in df.columns:
        df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')

    return df


@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_turns_data(run_ids=None, filter_days=None):
    """
    Load turns data from Athena

    Args:
        run_ids (list, optional): Filter data to specific run IDs
        filter_days (int, optional): Filter data to last N days

    Returns:
        pandas.DataFrame: Turns data
    """
    query = f"SELECT * FROM {TURNS_TABLE}"
    conditions = []

    # Add run_ids filter if specified
    if run_ids:
        # Convert list to SQL tuple format
        run_ids_str = "', '".join(run_ids)
        conditions.append(f"run_id IN ('{run_ids_str}')")

    # Add date filter if specified
    if filter_days is not None and filter_days > 0:
        cutoff_date = (datetime.now() - timedelta(days=filter_days)).strftime('%Y-%m-%d')
        conditions.append(f"timestamp >= '{cutoff_date}'")

    # Add WHERE clause if there are conditions
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing turns query: {query}")
    df = execute_athena_query(query)

    # Convert timestamp strings to datetime objects if needed
    if not df.empty and 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')

    return df


@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_ai_script_evals_data(turn_ids=None, run_ids=None):
    """
    Load AI script evals data from Athena

    Args:
        turn_ids (list, optional): Filter data to specific turn IDs
        run_ids (list, optional): Filter data to specific run IDs

    Returns:
        pandas.DataFrame: AI script evaluations data
    """
    query = f"SELECT * FROM {AI_SCRIPT_EVALS_TABLE}"
    conditions = []

    # Add turn_ids filter if specified
    if turn_ids:
        turn_ids_str = "', '".join(turn_ids)
        conditions.append(f"turn_id IN ('{turn_ids_str}')")

    # Add run_ids filter if specified
    if run_ids:
        run_ids_str = "', '".join(run_ids)
        conditions.append(f"run_id IN ('{run_ids_str}')")

    # Add WHERE clause if there are conditions
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing AI script evals query: {query}")
    return execute_athena_query(query)
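
# The loaders above splice IDs, tags, and dates into SQL via f-strings. That
# is acceptable for trusted dashboard inputs, but any value containing a
# single quote would break the query. A minimal hardening sketch follows; the
# `_sql_in_list` helper is hypothetical (not an existing part of this module)
# and assumes all values are strings or stringifiable.
def _sql_in_list(values):
    """Render values as the body of a quoted SQL IN-list, escaping quotes."""
    # Standard SQL (and Athena/Presto) escapes a single quote by doubling it.
    escaped = [str(v).replace("'", "''") for v in values]
    return "', '".join(escaped)
# Example usage inside a loader:
#     conditions.append(f"run_id IN ('{_sql_in_list(run_ids)}')")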
query: {query}") return execute_athena_query(query) @st.cache_data(ttl=STREAMLIT_CACHE_TTL) def load_journey_evals_data(run_ids=None): """ Load journey evals data from Athena Args: run_ids (list, optional): Filter data to specific run IDs Returns: pandas.DataFrame: Journey evaluations data """ query = f"SELECT * FROM {JOURNEY_EVALS_TABLE}" # Add run_ids filter if specified if run_ids and len(run_ids) > 0: run_ids_str = "', '".join(run_ids) query += f" WHERE run_id IN ('{run_ids_str}')" logger.info(f"Executing journey evals query: {query}") return execute_athena_query(query) # @st.cache_data(ttl=STREAMLIT_CACHE_TTL) # def load_master_data(s3_bucket, s3_key): # """ # Load master configuration data from S3 # Args: # s3_bucket (str): S3 bucket name # s3_key (str): S3 object key # Returns: # dict: Master configuration data # """ # import boto3 # import json # try: # s3 = boto3.client('s3') # response = s3.get_object(Bucket=s3_bucket, Key=s3_key) # content = response['Body'].read().decode('utf-8') # data = json.loads(content) # # Convert to DataFrames # metrics_df = pd.DataFrame(data.get('metrics', [])) # ai_script_evals_df = pd.DataFrame(data.get('ai_script_evals', [])) # journey_evals_df = pd.DataFrame(data.get('journey_evals', [])) # journey_plots_df = pd.DataFrame(data.get('journey_plots', [])) # journeys_df = pd.DataFrame(data.get('journeys', [])) # ai_scripts_df = pd.DataFrame(data.get('ai_scripts', [])) # return { # 'metrics': metrics_df, # 'ai_script_evals': ai_script_evals_df, # 'journey_evals': journey_evals_df, # 'journey_plots': journey_plots_df, # 'journeys': journeys_df, # 'ai_scripts': ai_scripts_df # } # except Exception as e: # logger.error(f"Failed to load master data from S3: {e}") # raise @st.cache_data(ttl=STREAMLIT_CACHE_TTL) def load_all_data(filter_days=None, run_tags=None): """ Load all data needed for the dashboard Args: filter_days (int, optional): Filter data to last N days run_tags (list, optional): Filter data to specific run tags Returns: tuple: (runs_df, turns_df, ai_script_evals_df, journey_evals_df) """ # Load runs data with filters runs_df = load_runs_data(filter_days=filter_days, run_tags=run_tags) if runs_df.empty: return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame() # Get run IDs from filtered runs run_ids = runs_df['run_id'].tolist() # Load turns data for these runs turns_df = load_turns_data(run_ids=run_ids) # Get turn IDs turn_ids = [] if turns_df.empty else turns_df['turn_id'].tolist() # Load AI script evaluations for these turns ai_script_evals_df = load_ai_script_evals_data(turn_ids=turn_ids) # Load journey evaluations for these runs journey_evals_df = load_journey_evals_data(run_ids=run_ids) return runs_df, turns_df, ai_script_evals_df, journey_evals_df