# github-actions[bot]
# sync: automatic content update from github
# 08c9602
import streamlit as st
import os
import pandas as pd
import pytz
import base64
import altair as alt
from datetime import datetime, date, time, timedelta
from zoneinfo import ZoneInfo
import snowflake.connector
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
# --- Secrets and Key Handling ---
# Convert the PEM private key from the environment into a base64-encoded
# PKCS8 DER blob, the form the downstream Snowflake helpers expect.
_raw_key = os.getenv("SNOWFLAKE_PRIVATE_KEY")
if _raw_key is None:
    # Fail fast with a clear message instead of an AttributeError on None.
    raise RuntimeError("SNOWFLAKE_PRIVATE_KEY environment variable is not set")
# Secrets are commonly stored with literal "\n" sequences; restore real newlines.
private_key_pem = _raw_key.replace('\\n', "\n").encode()
private_key_obj = serialization.load_pem_private_key(
    private_key_pem,
    password=None,   # key is stored unencrypted in the secret store
    backend=default_backend()
)
private_key_der = private_key_obj.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
# Base64 so the key can be passed around as a plain string.
private_key_b64 = base64.b64encode(private_key_der).decode('utf-8')
# Connection parameters — every value is pulled from the environment.
_env = os.getenv
account_identifier = _env("SNOWFLAKE_ACCOUNT_IDENTIFIER")
user = _env("SNOWFLAKE_USER")
warehouse = _env("SNOWFLAKE_WAREHOUSE")
database = _env("SNOWFLAKE_DATABASE")
schema = _env("SNOWFLAKE_SCHEMA")
role = _env("SNOWFLAKE_ROLE")
table = _env("SNOWFLAKE_TABLE")
message_filter = _env("SNOWFLAKE_MESSAGE_FILTER")
campaign_id = _env("SNOWFLAKE_CAMPAIGN_ID")
# Import query builders
from house_ad_main import run_house_ad_spike_query
from delivery_main import run_drop_query
from delivery_queries import (
get_main_query as get_main_delivery_query,
get_main_int_sov_query,
get_bidder_query as get_bidder_delivery_query,
get_flex_bucket_query,
get_device_query as get_device_delivery_query,
get_ad_unit_query as get_ad_unit_delivery_query,
get_refresh_query
)
from house_ad_queries import (
get_main_query as get_main_house_query,
get_flex_query as get_flex_house_query,
get_bidder_query as get_bidder_house_query,
get_deal_query,
get_ad_unit_query as get_ad_unit_house_query,
get_browser_query,
get_device_query as get_device_house_query,
get_random_integer_query,
get_hb_pb_query,
get_hb_size_query
)
# OpenAI (if required)
from openai import OpenAI
# Shared OpenAI client handed to the query runners (used for LLM summaries).
# NOTE(review): if OPENAI_API_KEY is unset this passes api_key=None — the
# client constructor is expected to raise in that case; confirm.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Session defaults: seed the Deep Dive value cache once per session.
st.session_state.setdefault("deep_values", {})
# Sidebar filters
st.sidebar.title("Red Alert Investigations Filters")
_ANALYSIS_OPTIONS = [
    "House_Ads","Display_Prebid","Display_OB","Display_AdX","Display_HBT_OB","Display_TAM",
    "Video_Prebid","Video_OB","Video_AdX","Video_TAM",
]
analysis_type = st.sidebar.radio(
    "Analysis Type",
    _ANALYSIS_OPTIONS
)
# Every option except House_Ads encodes "<format>_<integration>" in its name;
# split on the first underscore only so e.g. "Display_HBT_OB" keeps "HBT_OB".
if analysis_type == "House_Ads":
    ad_format_filter = None
    integration_filter = None
else:
    ad_format_filter, _, integration_filter = analysis_type.partition("_")
# Time defaults: a window covering the last 3 hours in US/Eastern.
now_edt = datetime.now(ZoneInfo("America/New_York"))
default_start = now_edt - timedelta(hours=3)
default_end = now_edt
start_date = st.sidebar.date_input("Start Date", default_start.date())
start_hour = st.sidebar.selectbox("Start Hour", list(range(24)), index=default_start.hour)
end_date = st.sidebar.date_input("End Date", default_end.date())
end_hour = st.sidebar.selectbox("End Hour", list(range(24)), index=default_end.hour)
# Naive datetimes: the window runs from HH:00:00 through HH:59:59 so the end
# hour is included in full.
start_dt = datetime.combine(start_date, time(start_hour))
end_dt = datetime.combine(end_date, time(end_hour,59,59))
# String form consumed by the SQL query builders.
start_str = start_dt.strftime('%Y-%m-%d %H:%M:%S')
end_str = end_dt.strftime('%Y-%m-%d %H:%M:%S')
st.session_state["start_date"] = start_date
st.session_state["end_date"] = end_date
# NOTE(review): both zoneinfo (above) and pytz (here) are used for
# America/New_York — consolidating on one library would avoid mixed-tz
# surprises; confirm downstream consumers of "eastern" before changing.
st.session_state["eastern"] = pytz.timezone("America/New_York")
# Data fetch helper
def fetch_df(sql: str) -> pd.DataFrame:
    """Run *sql* on Snowflake and return the result set as a DataFrame.

    A fresh connection is opened per call from the module-level credentials
    and is always closed, even when the query raises — the original version
    leaked one connection per call.
    """
    conn = snowflake.connector.connect(
        account=account_identifier,
        user=user,
        private_key=private_key_b64,
        warehouse=warehouse,
        database=database,
        schema=schema,
        role=role,
    )
    try:
        return pd.read_sql(sql, conn)
    finally:
        conn.close()
# Tabs layout
tab_auto, tab_deep = st.tabs(["Auto-Analysis","Deep Dive"])
# Auto-Analysis Tab: choose the runner and its trailing args by analysis
# type, then fire it from a single button. Only one branch renders per run,
# so the shared "Run Analysis" label never collides.
with tab_auto:
    st.title("Red Alert Investigations")
    if analysis_type == "House_Ads":
        st.header("House Ad Analysis")
        run_analysis = run_house_ad_spike_query
        extra_args = ()
    else:
        st.header(f"{ad_format_filter} {integration_filter} Analysis")
        run_analysis = run_drop_query
        extra_args = (integration_filter, ad_format_filter)
    if st.button("Run Analysis"):
        st.session_state["query_run"] = False
        run_analysis(
            table, start_str, end_str,
            message_filter, campaign_id,
            private_key_b64, user,
            account_identifier, warehouse,
            database, schema, role,
            client,
            *extra_args,
        )
with tab_deep:
    st.header("Deep Dive")
    # 1) Select dimensions — the menu depends on the analysis type.
    house_dims = [
        "Flex Bucket","Bidder","Deal","Ad Unit","Browser",
        "Device","Random Integer","HB Price Buckets","HB Size"
    ]
    delivery_dims = [
        "Integration SOV","Bidder","Flex Bucket",
        "Device","Ad Unit Group","Refresh"
    ]
    all_dims = house_dims if analysis_type == "House_Ads" else delivery_dims
    to_plot = st.multiselect("1. Select dimensions", all_dims, key="dims")
    # 2) Fetch the distinct values of each selected dimension so step 3 can
    #    offer them as filters. Results are cached in session state so they
    #    survive Streamlit reruns.
    if st.button("2. Fetch Values") and to_plot:
        vals = {}
        for dim in to_plot:
            if dim == "Integration SOV" and analysis_type != "House_Ads":
                # Integration SOV has a dedicated query and a known column name.
                dfv = fetch_df(get_main_int_sov_query(
                    table, start_str, end_str, message_filter,
                    campaign_id, ad_format_filter
                ))
                col = "Integration"
            elif analysis_type == "House_Ads":
                # House-ad dimensions: dispatch to the matching query builder.
                fn_map = {
                    "Flex Bucket": get_flex_house_query,
                    "Bidder": get_bidder_house_query,
                    "Deal": get_deal_query,
                    "Ad Unit": get_ad_unit_house_query,
                    "Browser": get_browser_query,
                    "Device": get_device_house_query,
                    "Random Integer": get_random_integer_query,
                    "HB Price Buckets":get_hb_pb_query,
                    "HB Size": get_hb_size_query,
                }
                dfv = fetch_df(fn_map[dim](
                    table, start_str, end_str, message_filter, campaign_id
                ))
                # The dimension column is whichever column is not one of the
                # fixed time/count columns every query returns.
                col = [c for c in dfv.columns
                       if c not in ("EST_DATE","EST_HOUR","EST_MINUTE","CNT")][0]
            else:
                # Delivery dimensions for the non-house analyses; these query
                # builders also take the integration/format filters.
                fn_map = {
                    "Bidder": get_bidder_delivery_query,
                    "Flex Bucket": get_flex_bucket_query,
                    "Device": get_device_delivery_query,
                    "Ad Unit Group": get_ad_unit_delivery_query,
                    "Refresh": get_refresh_query,
                }
                dfv = fetch_df(fn_map[dim](
                    table, start_str, end_str, message_filter,
                    campaign_id, integration_filter, ad_format_filter
                ))
                col = [c for c in dfv.columns
                       if c not in ("EST_DATE","EST_HOUR","EST_MINUTE","CNT")][0]
            # Unique, sorted, non-null values become the filter options.
            vals[dim] = sorted(dfv[col].dropna().unique())
        st.session_state["deep_values"] = vals
    # 3) Once values are cached, render a per-dimension filter widget and,
    #    on demand, assemble and run the combined deep-dive query.
    if st.session_state.get("deep_values"):
        filters = {}
        for dim, options in st.session_state["deep_values"].items():
            # Default to all values selected so the first run is unfiltered.
            filters[dim] = st.multiselect(
                f"Filter {dim}", options, default=options,
                key=f"fv_{dim}"
            )
        if st.button("3. Run Deep Dive"):
            # 3a) Build the base CTE plus a map of dimension -> SQL SELECT
            #     snippet for that dimension's column.
            # NOTE(review): `to_plot` is read below from the live widget; if
            # the user changes the dimension selection after "Fetch Values",
            # snippet_map[dim] can KeyError on a dim that belongs to the
            # other analysis type — confirm whether that flow is reachable.
            if analysis_type == "House_Ads":
                base = get_main_house_query(
                    table, start_str, end_str, message_filter, campaign_id
                )
                snippet_map = {
                    "Flex Bucket": "bucket",
                    "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS BIDDER",
                    "Deal": "body[0]:slotTargeting:hb_deal[0]::varchar AS HB_DEAL",
                    "Ad Unit": "split(body[0]['adUnitPath'],'/')[2]::varchar AS AD_UNIT",
                    "Browser": "CASE WHEN lower(useragent) LIKE '%edg%' THEN 'Edge' WHEN lower(useragent) LIKE '%chrome%' THEN 'Chrome' WHEN lower(useragent) LIKE '%firefox%' THEN 'Firefox' WHEN lower(useragent) LIKE '%safari%' THEN 'Safari' ELSE 'Other' END AS BROWSER",
                    "Device": "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE",
                    "Random Integer": "body[0]:siteTargeting:ri[0]::varchar AS RANDOM_INTEGER",
                    "HB Price Buckets": "body[0]:slotTargeting:hb_pb[0]::varchar AS HB_PB",
                    "HB Size": "body[0]:slotTargeting:hb_size[0]::varchar AS HB_SIZE",
                }
            else:
                base = get_main_delivery_query(
                    table, start_str, end_str,
                    message_filter, campaign_id,
                    integration_filter, ad_format_filter
                )
                snippet_map = {
                    "Integration SOV":"INTEGRATION",
                    "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS HB_BIDDER",
                    "Flex Bucket": "bucket",
                    "Device": "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE",
                    "Ad Unit Group": "CASE WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Outstream%' THEN 'Sticky_Outstream' WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Video%' THEN 'Video' ELSE 'Other' END AS AD_UNIT_GROUP",
                    "Refresh": "body[0]:slotTargeting:refresh[0]::varchar AS REFRESH",
                }
            # 3b) Inject all selected dimension snippets into the base query,
            #     matching whichever casing of "count(*) as CNT" it uses.
            select_snippets = [snippet_map[dim] for dim in to_plot]
            dynamic_cte = (
                base
                .replace(
                    "count(*) as CNT",
                    f"count(*) as CNT, {', '.join(select_snippets)}"
                )
                .replace(
                    "COUNT(*) AS CNT",
                    f"COUNT(*) AS CNT, {', '.join(select_snippets)}"
                )
            )
            # 3c) Build WHERE clauses from the filter selections.
            # NOTE(review): values are interpolated directly into the IN (...)
            # list — a value containing a quote would break the SQL, and this
            # is an injection risk if the values ever become user-typed rather
            # than database-sourced. Parameterized queries would be safer.
            where_clauses = []
            for dim, vals in filters.items():
                # For snippets without " AS " (e.g. "bucket", "INTEGRATION")
                # split(" AS ")[-1] returns the snippet itself, which is
                # already the column name.
                alias = snippet_map[dim].split(" AS ")[-1]
                val_list = ", ".join(f"'{v}'" for v in vals)
                where_clauses.append(f"{alias} IN ({val_list})")
            final_sql = (
                f"SELECT *\n"
                f"FROM (\n{dynamic_cte}\n) sub\n"
                f"WHERE {' AND '.join(where_clauses)}"
            )
            # 3d) Execute the final SQL and display/reshape the results.
            df_final = fetch_df(final_sql)
            # Rename the SQL alias columns back to their friendly dimension
            # names; Snowflake returns column names uppercased.
            for dim, snippet in snippet_map.items():
                alias = snippet.split(" AS ")[-1]  # e.g. "bucket", "BROWSER", etc.
                # find the actual DataFrame column (which will be uppercase)
                match = next((c for c in df_final.columns if c.upper() == alias.upper()), None)
                if match:
                    df_final.rename(columns={match: dim}, inplace=True)
            # Build the minute-precision datetime index from the split
            # date/hour/minute columns returned by the query.
            df_final["EST_DATETIME"] = (
                pd.to_datetime(df_final["EST_DATE"]) +
                pd.to_timedelta(df_final["EST_HOUR"], unit="h") +
                pd.to_timedelta(df_final["EST_MINUTE"], unit="m")
            )
            st.subheader("Deep Dive Results")
            st.dataframe(df_final)
            # One series per unique combination of the filtered dimension
            # values, e.g. "Chrome:desktop".
            df_final["Series"] = (
                df_final[list(filters.keys())]
                .astype(str)
                .agg(":".join, axis=1)
            )
            # Pivot counts onto the minute-level time axis, one column per series.
            pivot = (
                df_final
                .pivot_table(
                    index="EST_DATETIME",  # minute-level axis
                    columns="Series",
                    values="CNT",
                    aggfunc="sum"
                )
                .fillna(0)
                .sort_index()
            )
            # ":" is the join separator above; use "_" in the display labels.
            pivot.columns = [col.replace(":", "_") for col in pivot.columns]
            # Melt back to long form for Altair: one row per (timestamp, series).
            pivot_df = (
                pivot
                .reset_index()
                .melt(id_vars="EST_DATETIME", var_name="Series", value_name="CNT")
            )
# Build an Altair line chart:
chart = (
alt.Chart(pivot_df)
.mark_line(point=True)
.encode(
x=alt.X(
"EST_DATETIME:T",
axis=alt.Axis(
title="Time (NY)",
format="%H:%M", # show hour:minute on the axis
tickCount="hour" # one tick per hour
)
),
y=alt.Y("CNT:Q", title="Count"),
color=alt.Color("Series:N", title="Dimension"),
tooltip=[
alt.Tooltip("EST_DATETIME:T", title="Timestamp", format="%Y-%m-%d %H:%M"),
alt.Tooltip("Series:N", title="Series"),
alt.Tooltip("CNT:Q", title="Count"),
]
)
.properties(width=700, height=400)
.interactive() # allow pan/zoom
)
st.subheader("Deep Dive Trend")
st.altair_chart(chart, use_container_width=True)