Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import pandas as pd | |
| import pytz | |
| import base64 | |
| import altair as alt | |
| from datetime import datetime, date, time, timedelta | |
| from zoneinfo import ZoneInfo | |
| import snowflake.connector | |
| from cryptography.hazmat.primitives import serialization | |
| from cryptography.hazmat.backends import default_backend | |
# --- Secrets and Key Handling ---
# The Snowflake key-pair credential is read from SNOWFLAKE_PRIVATE_KEY as an
# unencrypted PEM string. Literal "\n" sequences (presumably from storing the
# multi-line PEM as a single-line secret) are turned back into real newlines.
_private_key_env = os.getenv("SNOWFLAKE_PRIVATE_KEY")
if not _private_key_env:
    # Without this check a missing secret surfaces as an opaque
    # AttributeError ("'NoneType' object has no attribute 'replace'").
    raise RuntimeError(
        "SNOWFLAKE_PRIVATE_KEY is not set (or empty); it must hold the "
        "PEM-encoded Snowflake private key."
    )
private_key_pem = _private_key_env.replace('\\n', "\n").encode()
private_key_obj = serialization.load_pem_private_key(
    private_key_pem,
    password=None,  # the key is expected to be unencrypted
    backend=default_backend(),
)
# Re-serialize as unencrypted PKCS#8 DER and base64-encode it; the resulting
# string is what fetch_df and both analysis runners hand to Snowflake.
private_key_der = private_key_obj.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
private_key_b64 = base64.b64encode(private_key_der).decode('utf-8')
# Connection params -- each value comes straight from the environment and is
# None when the variable is unset. The last three (table, message filter,
# campaign id) are query inputs rather than connection settings.
(
    account_identifier,
    user,
    warehouse,
    database,
    schema,
    role,
    table,
    message_filter,
    campaign_id,
) = (
    os.environ.get(env_name)
    for env_name in (
        "SNOWFLAKE_ACCOUNT_IDENTIFIER",
        "SNOWFLAKE_USER",
        "SNOWFLAKE_WAREHOUSE",
        "SNOWFLAKE_DATABASE",
        "SNOWFLAKE_SCHEMA",
        "SNOWFLAKE_ROLE",
        "SNOWFLAKE_TABLE",
        "SNOWFLAKE_MESSAGE_FILTER",
        "SNOWFLAKE_CAMPAIGN_ID",
    )
)
| # Import query builders | |
| from house_ad_main import run_house_ad_spike_query | |
| from delivery_main import run_drop_query | |
| from delivery_queries import ( | |
| get_main_query as get_main_delivery_query, | |
| get_main_int_sov_query, | |
| get_bidder_query as get_bidder_delivery_query, | |
| get_flex_bucket_query, | |
| get_device_query as get_device_delivery_query, | |
| get_ad_unit_query as get_ad_unit_delivery_query, | |
| get_refresh_query | |
| ) | |
| from house_ad_queries import ( | |
| get_main_query as get_main_house_query, | |
| get_flex_query as get_flex_house_query, | |
| get_bidder_query as get_bidder_house_query, | |
| get_deal_query, | |
| get_ad_unit_query as get_ad_unit_house_query, | |
| get_browser_query, | |
| get_device_query as get_device_house_query, | |
| get_random_integer_query, | |
| get_hb_pb_query, | |
| get_hb_size_query | |
| ) | |
# OpenAI client handed to both analysis runners (run_house_ad_spike_query /
# run_drop_query). It is constructed unconditionally at start-up, with the key
# taken from the OPENAI_API_KEY environment variable (None when unset).
from openai import OpenAI
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# Session defaults: "deep_values" holds the per-dimension values fetched on the
# Deep Dive tab; a fresh session starts with none.
st.session_state.setdefault("deep_values", {})
# Sidebar filters: the analysis type picks both the auto-analysis runner and
# the deep-dive dimensions. Every option except "House_Ads" has the form
# "<AdFormat>_<Integration>".
st.sidebar.title("Red Alert Investigations Filters")
analysis_type = st.sidebar.radio(
    "Analysis Type",
    [
        "House_Ads",
        "Display_Prebid", "Display_OB", "Display_AdX", "Display_HBT_OB", "Display_TAM",
        "Video_Prebid", "Video_OB", "Video_AdX", "Video_TAM",
    ],
)
# House ads carry no format/integration pair. For all other options, split on
# the first underscore only, e.g. "Display_HBT_OB" -> ("Display", "HBT_OB").
ad_format_filter, integration_filter = (
    (None, None)
    if analysis_type == "House_Ads"
    else analysis_type.split("_", maxsplit=1)
)
# Time defaults: the sidebar pre-selects the last three hours in New York time
# (named "edt", but America/New_York also covers EST), at whole-hour
# granularity.
now_edt = datetime.now(ZoneInfo("America/New_York"))
default_start = now_edt - timedelta(hours=3)
default_end = now_edt
start_date = st.sidebar.date_input("Start Date", default_start.date())
start_hour = st.sidebar.selectbox("Start Hour", list(range(24)), index=default_start.hour)
end_date = st.sidebar.date_input("End Date", default_end.date())
end_hour = st.sidebar.selectbox("End Hour", list(range(24)), index=default_end.hour)
# The window starts at HH:00:00 of the start hour and includes the whole end
# hour (up to HH:59:59). These datetimes are naive; presumably the query
# builders treat them as New York local time -- TODO confirm.
start_dt = datetime.combine(start_date, time(start_hour))
end_dt = datetime.combine(end_date, time(end_hour, 59, 59))
if start_dt > end_dt:
    # An inverted window would otherwise be queried silently and return nothing.
    st.sidebar.error("Start date/hour must not be after end date/hour.")
    st.stop()
start_str = start_dt.strftime('%Y-%m-%d %H:%M:%S')
end_str = end_dt.strftime('%Y-%m-%d %H:%M:%S')
st.session_state["start_date"] = start_date
st.session_state["end_date"] = end_date
st.session_state["eastern"] = pytz.timezone("America/New_York")
# Data fetch helper
def fetch_df(sql: str) -> pd.DataFrame:
    """Execute *sql* on Snowflake and return the result set as a DataFrame.

    Each call opens a new connection from the module-level connection settings
    and the base64-encoded DER private key. The connection is closed before
    returning, including when the query raises, so repeated Streamlit reruns
    do not accumulate open sessions.

    Args:
        sql: The complete SQL statement to run.

    Returns:
        The query result as produced by ``pandas.read_sql``.
    """
    conn = snowflake.connector.connect(
        account=account_identifier,
        user=user,
        private_key=private_key_b64,
        warehouse=warehouse,
        database=database,
        schema=schema,
        role=role,
    )
    try:
        return pd.read_sql(sql, conn)
    finally:
        conn.close()
# Tabs layout
tab_auto, tab_deep = st.tabs(["Auto-Analysis", "Deep Dive"])

# Auto-Analysis Tab: a single "Run Analysis" button passes the sidebar
# selection, the Snowflake credentials and the OpenAI client to the runner
# matching the analysis type.
with tab_auto:
    st.title("Red Alert Investigations")
    _is_house_ads = analysis_type == "House_Ads"
    st.header(
        "House Ad Analysis"
        if _is_house_ads
        else f"{ad_format_filter} {integration_filter} Analysis"
    )
    if st.button("Run Analysis"):
        st.session_state["query_run"] = False
        # Positional arguments common to both runners, in the order they take them.
        _runner_args = (
            table, start_str, end_str,
            message_filter, campaign_id,
            private_key_b64, user,
            account_identifier, warehouse,
            database, schema, role,
            client,
        )
        if _is_house_ads:
            run_house_ad_spike_query(*_runner_args)
        else:
            run_drop_query(*_runner_args, integration_filter, ad_format_filter)
# --- Deep Dive helpers --------------------------------------------------------

# Columns that every per-dimension query returns alongside the dimension itself.
_DEEP_DIVE_NON_DIM_COLS = ("EST_DATE", "EST_HOUR", "EST_MINUTE", "CNT")

# User-agent based device classification shared by both snippet maps.
_DEVICE_CASE_SQL = "CASE WHEN useragent LIKE '%Windows%' OR useragent LIKE '%Macintosh%' THEN 'desktop' WHEN useragent LIKE '%Android%' OR useragent LIKE '%Mobi%' THEN 'phone' WHEN useragent LIKE '%iPad%' OR useragent LIKE '%Tablet%' THEN 'tablet' ELSE 'other' END AS DEVICE"


def _sql_quote(value) -> str:
    """Render *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled, so values containing ``'`` neither
    break the generated SQL nor inject into it.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _snippet_alias(snippet: str) -> str:
    """Return the output column name of a SELECT snippet.

    This is the text after the last `` AS ``, or the whole snippet when it is
    a bare column reference such as ``bucket``.
    """
    return snippet.split(" AS ")[-1]


def _dimension_column(dfv: pd.DataFrame):
    """Return the first column of *dfv* that is not a time/count column, or None."""
    return next((c for c in dfv.columns if c not in _DEEP_DIVE_NON_DIM_COLS), None)


def _fetch_dimension_values(dims) -> dict:
    """Fetch the sorted distinct non-null values of each dimension in *dims*.

    Uses the per-dimension query builders for the current ``analysis_type`` and
    the sidebar time window. A dimension whose query returns no usable value
    column is skipped with a warning.
    """
    is_house = analysis_type == "House_Ads"
    house_queries = {
        "Flex Bucket": get_flex_house_query,
        "Bidder": get_bidder_house_query,
        "Deal": get_deal_query,
        "Ad Unit": get_ad_unit_house_query,
        "Browser": get_browser_query,
        "Device": get_device_house_query,
        "Random Integer": get_random_integer_query,
        "HB Price Buckets": get_hb_pb_query,
        "HB Size": get_hb_size_query,
    }
    delivery_queries = {
        "Bidder": get_bidder_delivery_query,
        "Flex Bucket": get_flex_bucket_query,
        "Device": get_device_delivery_query,
        "Ad Unit Group": get_ad_unit_delivery_query,
        "Refresh": get_refresh_query,
    }
    values = {}
    for dim in dims:
        if dim == "Integration SOV" and not is_house:
            dfv = fetch_df(get_main_int_sov_query(
                table, start_str, end_str, message_filter,
                campaign_id, ad_format_filter
            ))
            # The column may come back as "Integration" or upper-cased
            # "INTEGRATION"; match either.
            col = next((c for c in dfv.columns if c.upper() == "INTEGRATION"), None)
        elif is_house:
            dfv = fetch_df(house_queries[dim](
                table, start_str, end_str, message_filter, campaign_id
            ))
            col = _dimension_column(dfv)
        else:
            dfv = fetch_df(delivery_queries[dim](
                table, start_str, end_str, message_filter,
                campaign_id, integration_filter, ad_format_filter
            ))
            col = _dimension_column(dfv)
        if col is None:
            st.warning(f"No value column returned for '{dim}'; skipping it.")
            continue
        values[dim] = sorted(dfv[col].dropna().unique())
    return values


def _deep_dive_base_and_snippets():
    """Return ``(base_sql, snippet_map)`` for the current analysis type.

    ``snippet_map`` maps each dimension label to the SELECT expression that is
    injected into the base query. Its alias (see ``_snippet_alias``) is the
    column that the WHERE filters and the result renaming refer to.
    """
    if analysis_type == "House_Ads":
        base = get_main_house_query(
            table, start_str, end_str, message_filter, campaign_id
        )
        snippet_map = {
            "Flex Bucket": "bucket",
            "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS BIDDER",
            "Deal": "body[0]:slotTargeting:hb_deal[0]::varchar AS HB_DEAL",
            "Ad Unit": "split(body[0]['adUnitPath'],'/')[2]::varchar AS AD_UNIT",
            "Browser": "CASE WHEN lower(useragent) LIKE '%edg%' THEN 'Edge' WHEN lower(useragent) LIKE '%chrome%' THEN 'Chrome' WHEN lower(useragent) LIKE '%firefox%' THEN 'Firefox' WHEN lower(useragent) LIKE '%safari%' THEN 'Safari' ELSE 'Other' END AS BROWSER",
            "Device": _DEVICE_CASE_SQL,
            "Random Integer": "body[0]:siteTargeting:ri[0]::varchar AS RANDOM_INTEGER",
            "HB Price Buckets": "body[0]:slotTargeting:hb_pb[0]::varchar AS HB_PB",
            "HB Size": "body[0]:slotTargeting:hb_size[0]::varchar AS HB_SIZE",
        }
    else:
        base = get_main_delivery_query(
            table, start_str, end_str,
            message_filter, campaign_id,
            integration_filter, ad_format_filter
        )
        snippet_map = {
            "Integration SOV": "INTEGRATION",
            "Bidder": "body[0]:slotTargeting:hb_bidder[0]::varchar AS HB_BIDDER",
            "Flex Bucket": "bucket",
            "Device": _DEVICE_CASE_SQL,
            "Ad Unit Group": "CASE WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Outstream%' THEN 'Sticky_Outstream' WHEN split(body[0]['adUnitPath'],'/')[2]::varchar LIKE '%Video%' THEN 'Video' ELSE 'Other' END AS AD_UNIT_GROUP",
            "Refresh": "body[0]:slotTargeting:refresh[0]::varchar AS REFRESH",
        }
    return base, snippet_map


def _build_deep_dive_sql(base: str, snippet_map: dict, filters: dict) -> str:
    """Build the deep-dive query from *base* and the per-dimension *filters*.

    The SELECT expression of every filtered dimension is appended after the
    base query's ``count(*) as CNT`` marker (lower- or upper-case). The result
    is wrapped in an outer query that restricts each dimension alias to its
    selected values. Every entry of *filters* must be non-empty, otherwise the
    ``IN ()`` list would be invalid SQL.
    """
    extra_cols = ", ".join(snippet_map[dim] for dim in filters)
    dynamic_cte = (
        base
        .replace("count(*) as CNT", f"count(*) as CNT, {extra_cols}")
        .replace("COUNT(*) AS CNT", f"COUNT(*) AS CNT, {extra_cols}")
    )
    where_clauses = []
    for dim, selected in filters.items():
        in_list = ", ".join(_sql_quote(v) for v in selected)
        where_clauses.append(f"{_snippet_alias(snippet_map[dim])} IN ({in_list})")
    return (
        "SELECT *\n"
        f"FROM (\n{dynamic_cte}\n) sub\n"
        f"WHERE {' AND '.join(where_clauses)}"
    )


def _render_deep_dive(df_final: pd.DataFrame, dims: list, snippet_map: dict) -> None:
    """Show the deep-dive result table and a per-minute trend chart.

    Result columns whose name matches a snippet alias (case-insensitively) are
    renamed to the dimension label. Each row's series name is its *dims*
    values joined with ':'.
    """
    for dim, snippet in snippet_map.items():
        alias = _snippet_alias(snippet).upper()
        match = next((c for c in df_final.columns if c.upper() == alias), None)
        if match is not None:
            df_final = df_final.rename(columns={match: dim})

    st.subheader("Deep Dive Results")
    if df_final.empty:
        # Pivoting/melting an empty frame fails; report instead of crashing.
        st.info("No rows matched the selected filters.")
        return

    # Minute-precision timestamp built from the date / hour / minute columns.
    df_final["EST_DATETIME"] = (
        pd.to_datetime(df_final["EST_DATE"])
        + pd.to_timedelta(df_final["EST_HOUR"], unit="h")
        + pd.to_timedelta(df_final["EST_MINUTE"], unit="m")
    )
    st.dataframe(df_final)

    # One series per combination of the filtered dimension values.
    df_final["Series"] = df_final[list(dims)].astype(str).agg(":".join, axis=1)

    # Sum counts per minute and series; missing combinations become 0.
    pivot = (
        df_final
        .pivot_table(
            index="EST_DATETIME",  # minute-level axis
            columns="Series",
            values="CNT",
            aggfunc="sum",
        )
        .fillna(0)
        .sort_index()
    )
    pivot.columns = [col.replace(":", "_") for col in pivot.columns]
    pivot_df = (
        pivot
        .reset_index()
        .melt(id_vars="EST_DATETIME", var_name="Series", value_name="CNT")
    )

    chart = (
        alt.Chart(pivot_df)
        .mark_line(point=True)
        .encode(
            x=alt.X(
                "EST_DATETIME:T",
                axis=alt.Axis(
                    title="Time (NY)",
                    format="%H:%M",    # hour:minute labels
                    tickCount="hour",  # one tick per hour
                ),
            ),
            y=alt.Y("CNT:Q", title="Count"),
            color=alt.Color("Series:N", title="Dimension"),
            tooltip=[
                alt.Tooltip("EST_DATETIME:T", title="Timestamp", format="%Y-%m-%d %H:%M"),
                alt.Tooltip("Series:N", title="Series"),
                alt.Tooltip("CNT:Q", title="Count"),
            ],
        )
        .properties(width=700, height=400)
        .interactive()  # allow pan/zoom
    )
    st.subheader("Deep Dive Trend")
    st.altair_chart(chart, use_container_width=True)


# --- Deep Dive tab --------------------------------------------------------------
with tab_deep:
    st.header("Deep Dive")

    # 1) Select dimensions (the available set depends on the analysis type)
    if analysis_type == "House_Ads":
        all_dims = [
            "Flex Bucket", "Bidder", "Deal", "Ad Unit", "Browser",
            "Device", "Random Integer", "HB Price Buckets", "HB Size"
        ]
    else:
        all_dims = [
            "Integration SOV", "Bidder", "Flex Bucket",
            "Device", "Ad Unit Group", "Refresh"
        ]
    to_plot = st.multiselect("1. Select dimensions", all_dims, key="dims")

    # 2) Fetch unique values per dimension. The fetched set (not the live
    # multiselect) drives step 3, so SELECT, WHERE and series always agree.
    if st.button("2. Fetch Values") and to_plot:
        st.session_state["deep_values"] = _fetch_dimension_values(to_plot)
        st.session_state["deep_values_type"] = analysis_type

    # 3) Select filters & run the combined query
    deep_values = st.session_state.get("deep_values")
    if deep_values and st.session_state.get("deep_values_type") != analysis_type:
        # Values fetched for another analysis type may name dimensions this
        # type's snippet map does not have.
        st.info("Dimension values were fetched for a different analysis type; "
                "click '2. Fetch Values' again.")
        deep_values = None

    if deep_values:
        filters = {}
        for dim, options in deep_values.items():
            filters[dim] = st.multiselect(
                f"Filter {dim}", options, default=options,
                key=f"fv_{dim}"
            )
        if st.button("3. Run Deep Dive"):
            empty_dims = [dim for dim, selected in filters.items() if not selected]
            if empty_dims:
                st.warning("Select at least one value for: " + ", ".join(empty_dims))
            else:
                base, snippet_map = _deep_dive_base_and_snippets()
                df_final = fetch_df(_build_deep_dive_sql(base, snippet_map, filters))
                _render_deep_dive(df_final, list(filters), snippet_map)