"""Streamlit dashboard for Red Alert investigations against Snowflake.

Two tabs are rendered:
  * Auto-Analysis - delegates to the canned house-ad spike / delivery drop
    analyses implemented in house_ad_main / delivery_main.
  * Deep Dive - lets the user pick dimensions, fetch their distinct values,
    filter them, and chart minute-level counts with Altair.
"""

import streamlit as st
import os
import pandas as pd
import pytz
import base64
import altair as alt
from datetime import datetime, date, time, timedelta
from zoneinfo import ZoneInfo
import snowflake.connector
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

# --- Secrets and Key Handling ---
# The PEM key arrives via an env var with literal "\n" sequences; restore
# real newlines before parsing. Fail fast with a clear message if unset
# (previously this raised an opaque AttributeError on None).
_raw_pem = os.getenv("SNOWFLAKE_PRIVATE_KEY")
if not _raw_pem:
    st.error("SNOWFLAKE_PRIVATE_KEY environment variable is not set.")
    st.stop()
private_key_pem = _raw_pem.replace('\\n', "\n").encode()
private_key_obj = serialization.load_pem_private_key(
    private_key_pem, password=None, backend=default_backend()
)
private_key_der = private_key_obj.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
# Base64-encoded DER is the form the downstream query helpers expect.
private_key_b64 = base64.b64encode(private_key_der).decode('utf-8')

# Connection params (all sourced from the environment)
account_identifier = os.getenv("SNOWFLAKE_ACCOUNT_IDENTIFIER")
user = os.getenv("SNOWFLAKE_USER")
warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
database = os.getenv("SNOWFLAKE_DATABASE")
schema = os.getenv("SNOWFLAKE_SCHEMA")
role = os.getenv("SNOWFLAKE_ROLE")
table = os.getenv("SNOWFLAKE_TABLE")
message_filter = os.getenv("SNOWFLAKE_MESSAGE_FILTER")
campaign_id = os.getenv("SNOWFLAKE_CAMPAIGN_ID")

# Import query builders
from house_ad_main import run_house_ad_spike_query
from delivery_main import run_drop_query
from delivery_queries import (
    get_main_query as get_main_delivery_query,
    get_main_int_sov_query,
    get_bidder_query as get_bidder_delivery_query,
    get_flex_bucket_query,
    get_device_query as get_device_delivery_query,
    get_ad_unit_query as get_ad_unit_delivery_query,
    get_refresh_query,
)
from house_ad_queries import (
    get_main_query as get_main_house_query,
    get_flex_query as get_flex_house_query,
    get_bidder_query as get_bidder_house_query,
    get_deal_query,
    get_ad_unit_query as get_ad_unit_house_query,
    get_browser_query,
    get_device_query as get_device_house_query,
    get_random_integer_query,
    get_hb_pb_query,
    get_hb_size_query,
)

# OpenAI (if required)
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Session defaults
if "deep_values" not in st.session_state:
    st.session_state["deep_values"] = {}

# Sidebar filters
st.sidebar.title("Red Alert Investigations Filters")
analysis_type = st.sidebar.radio(
    "Analysis Type",
    ["House_Ads", "Display_Prebid", "Display_OB", "Display_AdX",
     "Display_HBT_OB", "Display_TAM", "Video_Prebid", "Video_OB",
     "Video_AdX", "Video_TAM"],
)
if analysis_type == "House_Ads":
    ad_format_filter = integration_filter = None
else:
    # e.g. "Display_HBT_OB" -> ("Display", "HBT_OB")
    ad_format_filter, integration_filter = analysis_type.split("_", 1)

# Time defaults: last three hours in New York local time.
now_edt = datetime.now(ZoneInfo("America/New_York"))
default_start = now_edt - timedelta(hours=3)
default_end = now_edt
start_date = st.sidebar.date_input("Start Date", default_start.date())
start_hour = st.sidebar.selectbox("Start Hour", list(range(24)), index=default_start.hour)
end_date = st.sidebar.date_input("End Date", default_end.date())
end_hour = st.sidebar.selectbox("End Hour", list(range(24)), index=default_end.hour)
start_dt = datetime.combine(start_date, time(start_hour))
# End of the selected hour (hh:59:59) so the window is inclusive.
end_dt = datetime.combine(end_date, time(end_hour, 59, 59))
start_str = start_dt.strftime('%Y-%m-%d %H:%M:%S')
end_str = end_dt.strftime('%Y-%m-%d %H:%M:%S')
st.session_state["start_date"] = start_date
st.session_state["end_date"] = end_date
st.session_state["eastern"] = pytz.timezone("America/New_York")


# Data fetch helper
def fetch_df(sql: str) -> pd.DataFrame:
    """Run *sql* against Snowflake and return the result as a DataFrame.

    A connection is opened per call and always closed, even when the
    query raises (the original leaked one connection per query).
    """
    conn = snowflake.connector.connect(
        account=account_identifier,
        user=user,
        # NOTE(review): the connector documents `private_key` as DER bytes;
        # confirm the installed connector version accepts the b64 string
        # (the same b64 form is passed to the query helpers below).
        private_key=private_key_b64,
        warehouse=warehouse,
        database=database,
        schema=schema,
        role=role,
    )
    try:
        return pd.read_sql(sql, conn)
    finally:
        conn.close()


def _sql_quote(value) -> str:
    """Render *value* as a SQL string literal, escaping embedded quotes.

    Values come back from Snowflake data, so they may legitimately contain
    single quotes; doubling them keeps the generated IN (...) list valid.
    """
    return "'" + str(value).replace("'", "''") + "'"


# Tabs layout
tab_auto, tab_deep = st.tabs(["Auto-Analysis", "Deep Dive"])

# Auto-Analysis Tab
with tab_auto:
    st.title("Red Alert Investigations")
    if analysis_type == "House_Ads":
        st.header("House Ad Analysis")
        if st.button("Run Analysis"):
            st.session_state["query_run"] = False
            run_house_ad_spike_query(
                table, start_str, end_str, message_filter, campaign_id,
                private_key_b64, user, account_identifier, warehouse,
                database, schema, role, client,
            )
    else:
        st.header(f"{ad_format_filter} {integration_filter} Analysis")
        if st.button("Run Analysis"):
            st.session_state["query_run"] = False
            run_drop_query(
                table, start_str, end_str, message_filter, campaign_id,
                private_key_b64, user, account_identifier, warehouse,
                database, schema, role, client,
                integration_filter, ad_format_filter,
            )

with tab_deep:
    st.header("Deep Dive")

    # 1) Select dimensions
    if analysis_type == "House_Ads":
        all_dims = [
            "Flex Bucket", "Bidder", "Deal", "Ad Unit", "Browser",
            "Device", "Random Integer", "HB Price Buckets", "HB Size",
        ]
    else:
        all_dims = [
            "Integration SOV", "Bidder", "Flex Bucket",
            "Device", "Ad Unit Group", "Refresh",
        ]
    to_plot = st.multiselect("1. Select dimensions", all_dims, key="dims")

    # 2) Fetch unique values per dimension
    if st.button("2. Fetch Values") and to_plot:
        vals = {}
        for dim in to_plot:
            if dim == "Integration SOV" and analysis_type != "House_Ads":
                dfv = fetch_df(get_main_int_sov_query(
                    table, start_str, end_str, message_filter,
                    campaign_id, ad_format_filter,
                ))
                # NOTE(review): Snowflake usually upper-cases unquoted column
                # names; confirm "Integration" matches the query's output.
                col = "Integration"
            elif analysis_type == "House_Ads":
                fn_map = {
                    "Flex Bucket": get_flex_house_query,
                    "Bidder": get_bidder_house_query,
                    "Deal": get_deal_query,
                    "Ad Unit": get_ad_unit_house_query,
                    "Browser": get_browser_query,
                    "Device": get_device_house_query,
                    "Random Integer": get_random_integer_query,
                    "HB Price Buckets": get_hb_pb_query,
                    "HB Size": get_hb_size_query,
                }
                dfv = fetch_df(fn_map[dim](
                    table, start_str, end_str, message_filter, campaign_id,
                ))
                # The dimension column is whichever one isn't a time/count column.
                col = [c for c in dfv.columns
                       if c not in ("EST_DATE", "EST_HOUR", "EST_MINUTE", "CNT")][0]
            else:
                fn_map = {
                    "Bidder": get_bidder_delivery_query,
                    "Flex Bucket": get_flex_bucket_query,
                    "Device": get_device_delivery_query,
                    "Ad Unit Group": get_ad_unit_delivery_query,
                    "Refresh": get_refresh_query,
                }
                dfv = fetch_df(fn_map[dim](
                    table, start_str, end_str, message_filter, campaign_id,
                    integration_filter, ad_format_filter,
                ))
                col = [c for c in dfv.columns
                       if c not in ("EST_DATE", "EST_HOUR", "EST_MINUTE", "CNT")][0]
            vals[dim] = sorted(dfv[col].dropna().unique())
        st.session_state["deep_values"] = vals

    # 3) Select filters & run the combined query
    if st.session_state.get("deep_values"):
        filters = {}
        for dim, options in st.session_state["deep_values"].items():
            filters[dim] = st.multiselect(
                f"Filter {dim}", options, default=options, key=f"fv_{dim}"
            )

        if st.button("3. Run Deep Dive"):
            # 3a) Build the base CTE and the per-dimension SELECT snippets.
            if analysis_type == "House_Ads":
                base = get_main_house_query(
                    table, start_str, end_str, message_filter, campaign_id
                )
                snippet_map = {
                    "Flex Bucket": "bucket",
                    "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS BIDDER",
                    "Deal": "body[0]:slotTargeting:hb_deal[0]::varchar AS HB_DEAL",
                    "Ad Unit": "split(body[0]['adUnitPath'],'/')[2]::varchar AS AD_UNIT",
                    "Browser": "CASE WHEN lower(useragent) LIKE '%edg%' THEN 'Edge' WHEN lower(useragent) LIKE '%chrome%' THEN 'Chrome' WHEN lower(useragent) LIKE '%firefox%' THEN 'Firefox' WHEN lower(useragent) LIKE '%safari%' THEN 'Safari' ELSE 'Other' END AS BROWSER",
                    "Device": "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE",
                    "Random Integer": "body[0]:siteTargeting:ri[0]::varchar AS RANDOM_INTEGER",
                    "HB Price Buckets": "body[0]:slotTargeting:hb_pb[0]::varchar AS HB_PB",
                    "HB Size": "body[0]:slotTargeting:hb_size[0]::varchar AS HB_SIZE",
                }
            else:
                base = get_main_delivery_query(
                    table, start_str, end_str, message_filter, campaign_id,
                    integration_filter, ad_format_filter,
                )
                snippet_map = {
                    "Integration SOV": "INTEGRATION",
                    "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS HB_BIDDER",
                    "Flex Bucket": "bucket",
                    "Device": "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE",
                    "Ad Unit Group": "CASE WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Outstream%' THEN 'Sticky_Outstream' WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Video%' THEN 'Video' ELSE 'Other' END AS AD_UNIT_GROUP",
                    "Refresh": "body[0]:slotTargeting:refresh[0]::varchar AS REFRESH",
                }

            # 3b) Inject all selected dimension snippets, matching both the
            # lowercase and uppercase spellings of the count column.
            select_snippets = [snippet_map[dim] for dim in to_plot]
            if not select_snippets:
                # Without at least one snippet the .replace below would emit a
                # trailing comma and invalid SQL.
                st.warning("Select at least one dimension (step 1) and re-fetch values.")
                st.stop()
            dynamic_cte = (
                base
                .replace(
                    "count(*) as CNT",
                    f"count(*) as CNT, {', '.join(select_snippets)}",
                )
                .replace(
                    "COUNT(*) AS CNT",
                    f"COUNT(*) AS CNT, {', '.join(select_snippets)}",
                )
            )

            # 3c) Build WHERE clauses from the filters. Values are escaped
            # (see _sql_quote); an empty selection matches no rows instead
            # of generating the invalid "IN ()".
            where_clauses = []
            for dim, selected in filters.items():
                # Bare columns (e.g. "bucket") have no " AS "; split still
                # yields the column name itself.
                alias = snippet_map[dim].split(" AS ")[-1]
                if selected:
                    in_list = ", ".join(_sql_quote(v) for v in selected)
                    where_clauses.append(f"{alias} IN ({in_list})")
                else:
                    where_clauses.append("FALSE")
            final_sql = (
                f"SELECT *\n"
                f"FROM (\n{dynamic_cte}\n) sub\n"
                f"WHERE {' AND '.join(where_clauses) if where_clauses else 'TRUE'}"
            )

            # 3d) Execute & display
            df_final = fetch_df(final_sql)
            for dim, snippet in snippet_map.items():
                alias = snippet.split(" AS ")[-1]  # e.g. "bucket", "BROWSER", etc.
                # Find the actual DataFrame column (Snowflake returns uppercase).
                match = next(
                    (c for c in df_final.columns if c.upper() == alias.upper()),
                    None,
                )
                if match:
                    df_final.rename(columns={match: dim}, inplace=True)

            # Build the minute-precision datetime index
            df_final["EST_DATETIME"] = (
                pd.to_datetime(df_final["EST_DATE"])
                + pd.to_timedelta(df_final["EST_HOUR"], unit="h")
                + pd.to_timedelta(df_final["EST_MINUTE"], unit="m")
            )

            st.subheader("Deep Dive Results")
            st.dataframe(df_final)

            # Build the Series column off the filtered dims
            df_final["Series"] = (
                df_final[list(filters.keys())]
                .astype(str)
                .agg(":".join, axis=1)
            )

            # Pivot on EST_DATETIME instead of EST_DATE
            pivot = (
                df_final
                .pivot_table(
                    index="EST_DATETIME",  # minute-level axis
                    columns="Series",
                    values="CNT",
                    aggfunc="sum",
                )
                .fillna(0)
                .sort_index()
            )
            pivot.columns = [col.replace(":", "_") for col in pivot.columns]
            pivot_df = (
                pivot
                .reset_index()
                .melt(id_vars="EST_DATETIME", var_name="Series", value_name="CNT")
            )

            # Build an Altair line chart
            chart = (
                alt.Chart(pivot_df)
                .mark_line(point=True)
                .encode(
                    x=alt.X(
                        "EST_DATETIME:T",
                        axis=alt.Axis(
                            title="Time (NY)",
                            format="%H:%M",     # show hour:minute on the axis
                            tickCount="hour",   # one tick per hour
                        ),
                    ),
                    y=alt.Y("CNT:Q", title="Count"),
                    color=alt.Color("Series:N", title="Dimension"),
                    tooltip=[
                        alt.Tooltip("EST_DATETIME:T", title="Timestamp",
                                    format="%Y-%m-%d %H:%M"),
                        alt.Tooltip("Series:N", title="Series"),
                        alt.Tooltip("CNT:Q", title="Count"),
                    ],
                )
                .properties(width=700, height=400)
                .interactive()  # allow pan/zoom
            )
            st.subheader("Deep Dive Trend")
            st.altair_chart(chart, use_container_width=True)