# ba_dashboard/src/data/loaders.py
# (Hugging Face upload-page residue preserved as a comment:
#  "paulokewunmi's picture / Upload 10 files / 0847744 verified")
import streamlit as st
import pandas as pd
from datetime import datetime, timedelta
import logging
from .athena import execute_athena_query
from config import (
RUNS_TABLE, TURNS_TABLE, AI_SCRIPT_EVALS_TABLE, JOURNEY_EVALS_TABLE,
STREAMLIT_CACHE_TTL
)
# Configure module-level logging: INFO level, timestamped records tagged
# with the emitting logger's name. NOTE(review): basicConfig at import time
# affects the root logger for the whole process — confirm this is intended
# for a module imported by a Streamlit app.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_runs_data(filter_days=None, run_tags=None):
    """
    Load runs data from Athena.

    Args:
        filter_days (int, optional): Only include runs whose ``start_time``
            falls within the last N days.
        run_tags (list, optional): Only include runs with one of these tags.

    Returns:
        pandas.DataFrame: Runs data, with ``start_time`` / ``end_time``
        parsed to datetimes when those columns are present.
    """
    query = f"SELECT * FROM {RUNS_TABLE}"
    conditions = []

    # Date filter: compare against a 'YYYY-MM-DD' literal.
    # NOTE(review): this assumes start_time is stored in a lexically
    # sortable ISO-like format in Athena — confirm against the table schema.
    if filter_days is not None and filter_days > 0:
        cutoff_date = (datetime.now() - timedelta(days=filter_days)).strftime('%Y-%m-%d')
        conditions.append(f"start_time >= '{cutoff_date}'")

    # Tag filter. Escape embedded single quotes (SQL doubling) so a tag
    # like "o'brien" cannot break the query or inject SQL.
    if run_tags:
        tags_list = "', '".join(tag.replace("'", "''") for tag in run_tags)
        conditions.append(f"run_tag IN ('{tags_list}')")

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing runs query: {query}")
    df = execute_athena_query(query)

    # Athena results arrive as strings; coerce unparseable timestamps to NaT.
    if not df.empty:
        for col in ('start_time', 'end_time'):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
    return df
@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_turns_data(run_ids=None, filter_days=None):
    """
    Load turns data from Athena.

    Args:
        run_ids (list, optional): Only include turns belonging to these run IDs.
        filter_days (int, optional): Only include turns whose ``timestamp``
            falls within the last N days.

    Returns:
        pandas.DataFrame: Turns data, with ``timestamp`` parsed to datetimes
        when that column is present.
    """
    query = f"SELECT * FROM {TURNS_TABLE}"
    conditions = []

    # Run-id filter. Escape embedded single quotes (SQL doubling) so an id
    # containing "'" cannot break the query or inject SQL.
    if run_ids:
        run_ids_str = "', '".join(rid.replace("'", "''") for rid in run_ids)
        conditions.append(f"run_id IN ('{run_ids_str}')")

    # Date filter: compare against a 'YYYY-MM-DD' literal.
    # NOTE(review): assumes timestamp is stored in a lexically sortable
    # ISO-like format — confirm against the table schema.
    if filter_days is not None and filter_days > 0:
        cutoff_date = (datetime.now() - timedelta(days=filter_days)).strftime('%Y-%m-%d')
        conditions.append(f"timestamp >= '{cutoff_date}'")

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing turns query: {query}")
    df = execute_athena_query(query)

    # Athena results arrive as strings; coerce unparseable timestamps to NaT.
    if not df.empty and 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
    return df
@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_ai_script_evals_data(turn_ids=None, run_ids=None):
    """
    Load AI script evaluations data from Athena.

    Args:
        turn_ids (list, optional): Only include evals for these turn IDs.
        run_ids (list, optional): Only include evals for these run IDs.

    Returns:
        pandas.DataFrame: AI script evaluations data.
    """
    query = f"SELECT * FROM {AI_SCRIPT_EVALS_TABLE}"
    conditions = []

    # Escape embedded single quotes (SQL doubling) in every interpolated id
    # so a value containing "'" cannot break the query or inject SQL.
    if turn_ids:
        turn_ids_str = "', '".join(tid.replace("'", "''") for tid in turn_ids)
        conditions.append(f"turn_id IN ('{turn_ids_str}')")

    if run_ids:
        run_ids_str = "', '".join(rid.replace("'", "''") for rid in run_ids)
        conditions.append(f"run_id IN ('{run_ids_str}')")

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    logger.info(f"Executing AI script evals query: {query}")
    return execute_athena_query(query)
@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_journey_evals_data(run_ids=None):
    """
    Load journey evaluations data from Athena.

    Args:
        run_ids (list, optional): Only include evals for these run IDs.

    Returns:
        pandas.DataFrame: Journey evaluations data.
    """
    query = f"SELECT * FROM {JOURNEY_EVALS_TABLE}"

    # Escape embedded single quotes (SQL doubling) so an id containing "'"
    # cannot break the query or inject SQL.
    if run_ids:
        run_ids_str = "', '".join(rid.replace("'", "''") for rid in run_ids)
        query += f" WHERE run_id IN ('{run_ids_str}')"

    logger.info(f"Executing journey evals query: {query}")
    return execute_athena_query(query)
# @st.cache_data(ttl=STREAMLIT_CACHE_TTL)
# def load_master_data(s3_bucket, s3_key):
# """
# Load master configuration data from S3
# Args:
# s3_bucket (str): S3 bucket name
# s3_key (str): S3 object key
# Returns:
# dict: Master configuration data
# """
# import boto3
# import json
# try:
# s3 = boto3.client('s3')
# response = s3.get_object(Bucket=s3_bucket, Key=s3_key)
# content = response['Body'].read().decode('utf-8')
# data = json.loads(content)
# # Convert to DataFrames
# metrics_df = pd.DataFrame(data.get('metrics', []))
# ai_script_evals_df = pd.DataFrame(data.get('ai_script_evals', []))
# journey_evals_df = pd.DataFrame(data.get('journey_evals', []))
# journey_plots_df = pd.DataFrame(data.get('journey_plots', []))
# journeys_df = pd.DataFrame(data.get('journeys', []))
# ai_scripts_df = pd.DataFrame(data.get('ai_scripts', []))
# return {
# 'metrics': metrics_df,
# 'ai_script_evals': ai_script_evals_df,
# 'journey_evals': journey_evals_df,
# 'journey_plots': journey_plots_df,
# 'journeys': journeys_df,
# 'ai_scripts': ai_scripts_df
# }
# except Exception as e:
# logger.error(f"Failed to load master data from S3: {e}")
# raise
@st.cache_data(ttl=STREAMLIT_CACHE_TTL)
def load_all_data(filter_days=None, run_tags=None):
    """
    Load all data needed for the dashboard.

    Args:
        filter_days (int, optional): Filter data to last N days.
        run_tags (list, optional): Filter data to specific run tags.

    Returns:
        tuple: (runs_df, turns_df, ai_script_evals_df, journey_evals_df)
    """
    # Load runs first; everything else is scoped to the runs that survive
    # the filters.
    runs_df = load_runs_data(filter_days=filter_days, run_tags=run_tags)
    if runs_df.empty:
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    run_ids = runs_df['run_id'].tolist()
    turns_df = load_turns_data(run_ids=run_ids)

    # BUG FIX: previously an empty turns_df produced turn_ids=[], and
    # load_ai_script_evals_data([]) emits no WHERE clause — loading the
    # ENTIRE evals table instead of nothing. Short-circuit to an empty
    # frame when there are no turns.
    if turns_df.empty:
        ai_script_evals_df = pd.DataFrame()
    else:
        ai_script_evals_df = load_ai_script_evals_data(turn_ids=turns_df['turn_id'].tolist())

    journey_evals_df = load_journey_evals_data(run_ids=run_ids)

    return runs_df, turns_df, ai_script_evals_df, journey_evals_df