# NOTE(review): scrape artifact from a Hugging Face Spaces status header
# ("Spaces: Sleeping") — not part of the module; kept as a comment so the
# file remains valid Python.
| import streamlit as st | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| import logging | |
| from .athena import execute_athena_query | |
| from config import ( | |
| RUNS_TABLE, TURNS_TABLE, AI_SCRIPT_EVALS_TABLE, JOURNEY_EVALS_TABLE, | |
| STREAMLIT_CACHE_TTL | |
| ) | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
def load_runs_data(filter_days=None, run_tags=None):
    """
    Load runs data from Athena.

    Args:
        filter_days (int, optional): Only include runs whose start_time is
            within the last N days (ignored when None or <= 0).
        run_tags (list, optional): Only include runs whose run_tag is in
            this list.

    Returns:
        pandas.DataFrame: Runs data, with 'start_time' and 'end_time'
        parsed to datetime where present (unparseable values become NaT).
    """
    query = f"SELECT * FROM {RUNS_TABLE}"
    conditions = []

    # Date filter: compare against a YYYY-MM-DD cutoff string.
    if filter_days is not None and filter_days > 0:
        cutoff_date = (datetime.now() - timedelta(days=filter_days)).strftime('%Y-%m-%d')
        conditions.append(f"start_time >= '{cutoff_date}'")

    # Tag filter. Escape embedded single quotes by doubling them so a tag
    # value cannot break out of the SQL string literal (Athena/Presto
    # escaping convention) — the original interpolated tags unescaped.
    if run_tags:
        tags_list = "', '".join(tag.replace("'", "''") for tag in run_tags)
        conditions.append(f"run_tag IN ('{tags_list}')")

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing runs query: {query}")
    df = execute_athena_query(query)

    # Athena returns timestamps as strings; normalise both columns to
    # datetime, coercing bad values to NaT rather than raising.
    for col in ('start_time', 'end_time'):
        if not df.empty and col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    return df
def load_turns_data(run_ids=None, filter_days=None):
    """
    Load turns data from Athena.

    Args:
        run_ids (list, optional): Only include turns belonging to these
            run IDs.
        filter_days (int, optional): Only include turns whose timestamp is
            within the last N days (ignored when None or <= 0).

    Returns:
        pandas.DataFrame: Turns data, with 'timestamp' parsed to datetime
        where present (unparseable values become NaT).
    """
    query = f"SELECT * FROM {TURNS_TABLE}"
    conditions = []

    # Run-ID filter. Escape embedded single quotes by doubling them so an
    # ID cannot break out of the SQL string literal (Athena/Presto
    # escaping convention) — the original interpolated IDs unescaped.
    if run_ids:
        run_ids_str = "', '".join(rid.replace("'", "''") for rid in run_ids)
        conditions.append(f"run_id IN ('{run_ids_str}')")

    # Date filter: compare against a YYYY-MM-DD cutoff string.
    if filter_days is not None and filter_days > 0:
        cutoff_date = (datetime.now() - timedelta(days=filter_days)).strftime('%Y-%m-%d')
        conditions.append(f"timestamp >= '{cutoff_date}'")

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing turns query: {query}")
    df = execute_athena_query(query)

    # Athena returns timestamps as strings; normalise to datetime.
    if not df.empty and 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
    return df
def load_ai_script_evals_data(turn_ids=None, run_ids=None):
    """
    Load AI script evaluations data from Athena.

    Args:
        turn_ids (list, optional): Only include evaluations for these
            turn IDs.
        run_ids (list, optional): Only include evaluations for these
            run IDs.

    Returns:
        pandas.DataFrame: AI script evaluations data.
    """
    query = f"SELECT * FROM {AI_SCRIPT_EVALS_TABLE}"
    conditions = []

    # Both filters escape embedded single quotes by doubling them so a
    # value cannot break out of the SQL string literal (Athena/Presto
    # escaping convention) — the original interpolated IDs unescaped.
    if turn_ids:
        turn_ids_str = "', '".join(tid.replace("'", "''") for tid in turn_ids)
        conditions.append(f"turn_id IN ('{turn_ids_str}')")

    if run_ids:
        run_ids_str = "', '".join(rid.replace("'", "''") for rid in run_ids)
        conditions.append(f"run_id IN ('{run_ids_str}')")

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing AI script evals query: {query}")
    return execute_athena_query(query)
def load_journey_evals_data(run_ids=None):
    """
    Load journey evaluations data from Athena.

    Args:
        run_ids (list, optional): Only include evaluations for these
            run IDs.

    Returns:
        pandas.DataFrame: Journey evaluations data.
    """
    query = f"SELECT * FROM {JOURNEY_EVALS_TABLE}"

    # Escape embedded single quotes by doubling them so an ID cannot break
    # out of the SQL string literal (Athena/Presto escaping convention) —
    # the original interpolated IDs unescaped.
    if run_ids:
        run_ids_str = "', '".join(rid.replace("'", "''") for rid in run_ids)
        query += f" WHERE run_id IN ('{run_ids_str}')"

    logger.info(f"Executing journey evals query: {query}")
    return execute_athena_query(query)
# NOTE(review): load_master_data below is commented-out dead code — it loads
# the same datasets from an S3 JSON blob instead of Athena. Presumably
# superseded by the Athena loaders above; confirm before deleting.
# @st.cache_data(ttl=STREAMLIT_CACHE_TTL)
| # def load_master_data(s3_bucket, s3_key): | |
| # """ | |
| # Load master configuration data from S3 | |
| # Args: | |
| # s3_bucket (str): S3 bucket name | |
| # s3_key (str): S3 object key | |
| # Returns: | |
| # dict: Master configuration data | |
| # """ | |
| # import boto3 | |
| # import json | |
| # try: | |
| # s3 = boto3.client('s3') | |
| # response = s3.get_object(Bucket=s3_bucket, Key=s3_key) | |
| # content = response['Body'].read().decode('utf-8') | |
| # data = json.loads(content) | |
| # # Convert to DataFrames | |
| # metrics_df = pd.DataFrame(data.get('metrics', [])) | |
| # ai_script_evals_df = pd.DataFrame(data.get('ai_script_evals', [])) | |
| # journey_evals_df = pd.DataFrame(data.get('journey_evals', [])) | |
| # journey_plots_df = pd.DataFrame(data.get('journey_plots', [])) | |
| # journeys_df = pd.DataFrame(data.get('journeys', [])) | |
| # ai_scripts_df = pd.DataFrame(data.get('ai_scripts', [])) | |
| # return { | |
| # 'metrics': metrics_df, | |
| # 'ai_script_evals': ai_script_evals_df, | |
| # 'journey_evals': journey_evals_df, | |
| # 'journey_plots': journey_plots_df, | |
| # 'journeys': journeys_df, | |
| # 'ai_scripts': ai_scripts_df | |
| # } | |
| # except Exception as e: | |
| # logger.error(f"Failed to load master data from S3: {e}") | |
| # raise | |
def load_all_data(filter_days=None, run_tags=None):
    """
    Load all data needed for the dashboard.

    Args:
        filter_days (int, optional): Filter data to the last N days.
        run_tags (list, optional): Filter data to specific run tags.

    Returns:
        tuple: (runs_df, turns_df, ai_script_evals_df, journey_evals_df).
        All four are empty DataFrames when no runs match the filters.
    """
    # Runs drive everything else: all other tables are filtered down to
    # the run IDs selected here.
    runs_df = load_runs_data(filter_days=filter_days, run_tags=run_tags)
    if runs_df.empty:
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    run_ids = runs_df['run_id'].tolist()

    turns_df = load_turns_data(run_ids=run_ids)
    turn_ids = [] if turns_df.empty else turns_df['turn_id'].tolist()

    # Pass run_ids as well as turn_ids: with turn_ids empty alone, the
    # callee would apply no filter and scan the ENTIRE evals table (the
    # original bug). run_ids keeps the query bounded in every case.
    ai_script_evals_df = load_ai_script_evals_data(turn_ids=turn_ids, run_ids=run_ids)

    journey_evals_df = load_journey_evals_data(run_ids=run_ids)

    return runs_df, turns_df, ai_script_evals_df, journey_evals_df