# NOTE(review): the following lines were web-scrape residue (hosting-page chrome,
# file-size banner, and the line-number gutter) and are not part of the program.
import streamlit as st
import os
import pandas as pd
import pytz
import base64
import altair as alt
from datetime import datetime, date, time, timedelta
from zoneinfo import ZoneInfo
import snowflake.connector
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
# --- Secrets and Key Handling ---
def _require_env(name: str) -> str:
    """Return the value of environment variable *name*, failing fast if unset.

    Raises:
        RuntimeError: with an explicit message instead of the opaque
            ``AttributeError`` the previous unchecked ``os.getenv(...)`` caused.
    """
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# The PEM key is stored in the env var with literal "\n" sequences;
# restore real newlines before parsing.
private_key_pem = _require_env("SNOWFLAKE_PRIVATE_KEY").replace('\\n', "\n").encode()
private_key_obj = serialization.load_pem_private_key(
    private_key_pem,
    password=None,
    backend=default_backend()
)
# Re-serialize to unencrypted PKCS#8 DER for the Snowflake connector.
private_key_der = private_key_obj.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
# Base64-encode so the key can be handed around as a plain string.
private_key_b64 = base64.b64encode(private_key_der).decode('utf-8')
# Connection params: read every Snowflake setting from the environment in one pass.
# Any variable that is unset simply comes back as None.
(account_identifier, user, warehouse, database, schema, role,
 table, message_filter, campaign_id) = (
    os.getenv(env_key)
    for env_key in (
        "SNOWFLAKE_ACCOUNT_IDENTIFIER",
        "SNOWFLAKE_USER",
        "SNOWFLAKE_WAREHOUSE",
        "SNOWFLAKE_DATABASE",
        "SNOWFLAKE_SCHEMA",
        "SNOWFLAKE_ROLE",
        "SNOWFLAKE_TABLE",
        "SNOWFLAKE_MESSAGE_FILTER",
        "SNOWFLAKE_CAMPAIGN_ID",
    )
)
# Import query builders
from house_ad_main import run_house_ad_spike_query
from delivery_main import run_drop_query
from delivery_queries import (
get_main_query as get_main_delivery_query,
get_main_int_sov_query,
get_bidder_query as get_bidder_delivery_query,
get_flex_bucket_query,
get_device_query as get_device_delivery_query,
get_ad_unit_query as get_ad_unit_delivery_query,
get_refresh_query
)
from house_ad_queries import (
get_main_query as get_main_house_query,
get_flex_query as get_flex_house_query,
get_bidder_query as get_bidder_house_query,
get_deal_query,
get_ad_unit_query as get_ad_unit_house_query,
get_browser_query,
get_device_query as get_device_house_query,
get_random_integer_query,
get_hb_pb_query,
get_hb_size_query
)
# OpenAI (if required)
from openai import OpenAI
# Client handed to the query runners for LLM-assisted analysis.
# NOTE(review): assumes OPENAI_API_KEY is set — the OpenAI constructor
# raises if no key can be resolved; confirm the deployment environment.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Session defaults: the deep-dive tab stores its fetched dimension values here.
if "deep_values" not in st.session_state:
    st.session_state["deep_values"] = {}

# Sidebar filters --------------------------------------------------------
st.sidebar.title("Red Alert Investigations Filters")
analysis_type = st.sidebar.radio(
    "Analysis Type",
    ["House_Ads","Display_Prebid","Display_OB","Display_AdX","Display_HBT_OB","Display_TAM",
     "Video_Prebid","Video_OB","Video_AdX","Video_TAM"]
)
# Every non-house option encodes "<format>_<integration>" in its name.
if analysis_type == "House_Ads":
    ad_format_filter = integration_filter = None
else:
    ad_format_filter, integration_filter = analysis_type.split("_", 1)

# Time defaults: the last three hours, in US/Eastern.
now_edt = datetime.now(ZoneInfo("America/New_York"))
default_start = now_edt - timedelta(hours=3)
default_end = now_edt
start_date = st.sidebar.date_input("Start Date", default_start.date())
start_hour = st.sidebar.selectbox("Start Hour", list(range(24)), index=default_start.hour)
end_date = st.sidebar.date_input("End Date", default_end.date())
end_hour = st.sidebar.selectbox("End Hour", list(range(24)), index=default_end.hour)
# Window spans from the top of the start hour to the last second of the end hour.
start_dt = datetime.combine(start_date, time(start_hour))
end_dt = datetime.combine(end_date, time(end_hour, 59, 59))
start_str = start_dt.strftime('%Y-%m-%d %H:%M:%S')
end_str = end_dt.strftime('%Y-%m-%d %H:%M:%S')
st.session_state["start_date"] = start_date
st.session_state["end_date"] = end_date
# NOTE(review): the file mixes zoneinfo (above) and pytz (here) for the same
# timezone — presumably a downstream consumer expects a pytz object; confirm.
st.session_state["eastern"] = pytz.timezone("America/New_York")
# Data fetch helper
def fetch_df(sql: str) -> pd.DataFrame:
    """Run *sql* against Snowflake and return the result set as a DataFrame.

    Opens a fresh key-pair-authenticated connection per call using the
    module-level connection parameters, and always closes it — the previous
    version leaked a connection on every invocation.

    Args:
        sql: A complete SQL statement to execute.

    Returns:
        The query result as a pandas DataFrame.
    """
    conn = snowflake.connector.connect(
        account=account_identifier,
        user=user,
        private_key=private_key_b64,
        warehouse=warehouse,
        database=database,
        schema=schema,
        role=role,
    )
    try:
        return pd.read_sql(sql, conn)
    finally:
        # Ensure cleanup even when the query raises.
        conn.close()
# Tabs layout
tab_auto, tab_deep = st.tabs(["Auto-Analysis","Deep Dive"])

# Auto-Analysis Tab: one button kicks off the canned spike/drop analysis
# for the currently selected analysis type and time window.
with tab_auto:
    st.title("Red Alert Investigations")
    if analysis_type == "House_Ads":
        st.header("House Ad Analysis")
        if st.button("Run Analysis"):
            # Reset the flag so a rerun is treated as a fresh query.
            st.session_state["query_run"] = False
            run_house_ad_spike_query(
                table, start_str, end_str,
                message_filter, campaign_id,
                private_key_b64, user,
                account_identifier, warehouse,
                database, schema, role,
                client
            )
    else:
        st.header(f"{ad_format_filter} {integration_filter} Analysis")
        if st.button("Run Analysis"):
            st.session_state["query_run"] = False
            run_drop_query(
                table, start_str, end_str,
                message_filter, campaign_id,
                private_key_b64, user,
                account_identifier, warehouse,
                database, schema, role,
                client,
                integration_filter, ad_format_filter
            )
with tab_deep:
    st.header("Deep Dive")

    # 1) Select dimensions --------------------------------------------------
    # The available dimensions differ by analysis type; each maps to a
    # unique-values query (step 2) and a SQL SELECT snippet (step 3a).
    if analysis_type == "House_Ads":
        all_dims = [
            "Flex Bucket","Bidder","Deal","Ad Unit","Browser",
            "Device","Random Integer","HB Price Buckets","HB Size"
        ]
    else:
        all_dims = [
            "Integration SOV","Bidder","Flex Bucket",
            "Device","Ad Unit Group","Refresh"
        ]
    to_plot = st.multiselect("1. Select dimensions", all_dims, key="dims")

    # 2) Fetch unique values per dimension ----------------------------------
    if st.button("2. Fetch Values") and to_plot:
        vals = {}
        for dim in to_plot:
            if dim == "Integration SOV" and analysis_type != "House_Ads":
                dfv = fetch_df(get_main_int_sov_query(
                    table, start_str, end_str, message_filter,
                    campaign_id, ad_format_filter
                ))
                col = "Integration"
            elif analysis_type == "House_Ads":
                fn_map = {
                    "Flex Bucket": get_flex_house_query,
                    "Bidder": get_bidder_house_query,
                    "Deal": get_deal_query,
                    "Ad Unit": get_ad_unit_house_query,
                    "Browser": get_browser_query,
                    "Device": get_device_house_query,
                    "Random Integer": get_random_integer_query,
                    "HB Price Buckets": get_hb_pb_query,
                    "HB Size": get_hb_size_query,
                }
                dfv = fetch_df(fn_map[dim](
                    table, start_str, end_str, message_filter, campaign_id
                ))
                # The dimension column is whatever remains after the fixed
                # time-bucket/count columns.
                col = [c for c in dfv.columns
                       if c not in ("EST_DATE","EST_HOUR","EST_MINUTE","CNT")][0]
            else:
                fn_map = {
                    "Bidder": get_bidder_delivery_query,
                    "Flex Bucket": get_flex_bucket_query,
                    "Device": get_device_delivery_query,
                    "Ad Unit Group": get_ad_unit_delivery_query,
                    "Refresh": get_refresh_query,
                }
                dfv = fetch_df(fn_map[dim](
                    table, start_str, end_str, message_filter,
                    campaign_id, integration_filter, ad_format_filter
                ))
                col = [c for c in dfv.columns
                       if c not in ("EST_DATE","EST_HOUR","EST_MINUTE","CNT")][0]
            vals[dim] = sorted(dfv[col].dropna().unique())
        st.session_state["deep_values"] = vals

    # 3) Select filters & run the combined query ----------------------------
    if st.session_state.get("deep_values"):
        filters = {}
        for dim, options in st.session_state["deep_values"].items():
            filters[dim] = st.multiselect(
                f"Filter {dim}", options, default=options,
                key=f"fv_{dim}"
            )
        if st.button("3. Run Deep Dive"):
            # 3a) Build the base CTE and the per-dimension SELECT snippets.
            if analysis_type == "House_Ads":
                base = get_main_house_query(
                    table, start_str, end_str, message_filter, campaign_id
                )
                snippet_map = {
                    "Flex Bucket": "bucket",
                    "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS BIDDER",
                    "Deal": "body[0]:slotTargeting:hb_deal[0]::varchar AS HB_DEAL",
                    "Ad Unit": "split(body[0]['adUnitPath'],'/')[2]::varchar AS AD_UNIT",
                    "Browser": "CASE WHEN lower(useragent) LIKE '%edg%' THEN 'Edge' WHEN lower(useragent) LIKE '%chrome%' THEN 'Chrome' WHEN lower(useragent) LIKE '%firefox%' THEN 'Firefox' WHEN lower(useragent) LIKE '%safari%' THEN 'Safari' ELSE 'Other' END AS BROWSER",
                    "Device": "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE",
                    "Random Integer": "body[0]:siteTargeting:ri[0]::varchar AS RANDOM_INTEGER",
                    "HB Price Buckets": "body[0]:slotTargeting:hb_pb[0]::varchar AS HB_PB",
                    "HB Size": "body[0]:slotTargeting:hb_size[0]::varchar AS HB_SIZE",
                }
            else:
                base = get_main_delivery_query(
                    table, start_str, end_str,
                    message_filter, campaign_id,
                    integration_filter, ad_format_filter
                )
                snippet_map = {
                    "Integration SOV": "INTEGRATION",
                    "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS HB_BIDDER",
                    "Flex Bucket": "bucket",
                    "Device": "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE",
                    "Ad Unit Group": "CASE WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Outstream%' THEN 'Sticky_Outstream' WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Video%' THEN 'Video' ELSE 'Other' END AS AD_UNIT_GROUP",
                    "Refresh": "body[0]:slotTargeting:refresh[0]::varchar AS REFRESH",
                }

            # 3b) Inject all selected dimension snippets. The base query may
            # spell the count column lowercase or uppercase, so patch both.
            select_snippets = [snippet_map[dim] for dim in to_plot]
            joined_snippets = ", ".join(select_snippets)
            dynamic_cte = (
                base
                .replace(
                    "count(*) as CNT",
                    f"count(*) as CNT, {joined_snippets}"
                )
                .replace(
                    "COUNT(*) AS CNT",
                    f"COUNT(*) AS CNT, {joined_snippets}"
                )
            )

            # 3c) Build WHERE clauses from the filter selections.
            # Single quotes in values are doubled before interpolation so a
            # value containing a quote cannot break — or inject into — the
            # generated SQL (the previous version interpolated raw values).
            where_clauses = []
            for dim, selected in filters.items():
                alias = snippet_map[dim].split(" AS ")[-1]
                val_list = ", ".join(
                    "'{}'".format(str(v).replace("'", "''")) for v in selected
                )
                where_clauses.append(f"{alias} IN ({val_list})")
            final_sql = (
                f"SELECT *\n"
                f"FROM (\n{dynamic_cte}\n) sub\n"
                f"WHERE {' AND '.join(where_clauses)}"
            )

            # 3d) Execute & display.
            df_final = fetch_df(final_sql)
            # Rename each dimension's SQL alias (uppercase in the result set)
            # back to its human-readable dimension label.
            for dim, snippet in snippet_map.items():
                alias = snippet.split(" AS ")[-1]  # e.g. "bucket", "BROWSER"
                match = next(
                    (c for c in df_final.columns if c.upper() == alias.upper()),
                    None
                )
                if match:
                    df_final.rename(columns={match: dim}, inplace=True)
            # Build the minute-precision datetime index.
            df_final["EST_DATETIME"] = (
                pd.to_datetime(df_final["EST_DATE"]) +
                pd.to_timedelta(df_final["EST_HOUR"], unit="h") +
                pd.to_timedelta(df_final["EST_MINUTE"], unit="m")
            )
            st.subheader("Deep Dive Results")
            st.dataframe(df_final)

            # One series per unique combination of the filtered dimensions.
            df_final["Series"] = (
                df_final[list(filters.keys())]
                .astype(str)
                .agg(":".join, axis=1)
            )
            # Pivot on EST_DATETIME for a minute-level time axis.
            pivot = (
                df_final
                .pivot_table(
                    index="EST_DATETIME",
                    columns="Series",
                    values="CNT",
                    aggfunc="sum"
                )
                .fillna(0)
                .sort_index()
            )
            pivot.columns = [c.replace(":", "_") for c in pivot.columns]
            pivot_df = (
                pivot
                .reset_index()
                .melt(id_vars="EST_DATETIME", var_name="Series", value_name="CNT")
            )

            # Altair line chart: one line per series over the minute axis.
            chart = (
                alt.Chart(pivot_df)
                .mark_line(point=True)
                .encode(
                    x=alt.X(
                        "EST_DATETIME:T",
                        axis=alt.Axis(
                            title="Time (NY)",
                            format="%H:%M",   # show hour:minute on the axis
                            tickCount="hour"  # one tick per hour
                        )
                    ),
                    y=alt.Y("CNT:Q", title="Count"),
                    color=alt.Color("Series:N", title="Dimension"),
                    tooltip=[
                        alt.Tooltip("EST_DATETIME:T", title="Timestamp", format="%Y-%m-%d %H:%M"),
                        alt.Tooltip("Series:N", title="Series"),
                        alt.Tooltip("CNT:Q", title="Count"),
                    ]
                )
                .properties(width=700, height=400)
                .interactive()  # allow pan/zoom
            )
            st.subheader("Deep Dive Trend")
            st.altair_chart(chart, use_container_width=True)
# NOTE(review): trailing scrape artifact removed (stray gutter character).