import streamlit as st import time import pandas as pd import plotly.express as px import snowflake.connector import base64 from datetime import timedelta, datetime from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend import concurrent.futures # Import SQL query functions from delivery_queries import ( get_main_query, get_main_int_sov_query, get_bidder_query, get_flex_bucket_query, get_device_query, get_ad_unit_query, get_refresh_query, ) from delivery_section_utils import update_section_generic_drop # Import the NEXT_STEPS_INSTRUCTIONS for delivery drops from delivery_instructions import NEXT_STEPS_INSTRUCTIONS # Initialize session state st.session_state.setdefault("query_run", False) st.session_state.setdefault("findings_messages", []) st.session_state.setdefault("query_df", None) st.session_state.setdefault("agg_df", None) st.session_state.setdefault("top_level_drop_time", None) st.session_state.setdefault("key_findings_output", None) @st.cache_data(show_spinner=False) def cached_run_query( query, private_key_b64: str, user: str, account_identifier: str, warehouse: str, database: str, schema: str, role: str, ): """Run a Snowflake query and return a DataFrame.""" der = base64.b64decode(private_key_b64) conn = snowflake.connector.connect( user=user, account=account_identifier, warehouse=warehouse, database=database, schema=schema, role=role, private_key=der, ) cs = conn.cursor() cs.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 1800") cs.execute(query) rows = cs.fetchall() cols = [c[0] for c in cs.description] df = pd.DataFrame(rows, columns=cols) cs.close() conn.close() return df def run_drop_query( table, start_datetime, end_datetime, message_filter, campaign_id, private_key_str, user, account_identifier, warehouse, database, schema, role, client, integration_filter=None, ad_format_filter=None, ): """ Universal drop analysis for any Integration + Ad_Format. """ # 1) Build SQL statements with filters main_sql = get_main_query( table, start_datetime, end_datetime, message_filter, campaign_id, integration_filter, ad_format_filter, ) flex_sql = get_flex_bucket_query( table, start_datetime, end_datetime, message_filter, campaign_id, integration_filter, ad_format_filter, ) bidder_sql = get_bidder_query( table, start_datetime, end_datetime, message_filter, campaign_id, integration_filter, ad_format_filter, ) device_sql = get_device_query( table, start_datetime, end_datetime, message_filter, campaign_id, integration_filter, ad_format_filter, ) ad_unit_sql = get_ad_unit_query( table, start_datetime, end_datetime, message_filter, campaign_id, integration_filter, ad_format_filter, ) refresh_sql = get_refresh_query( table, start_datetime, end_datetime, message_filter, campaign_id, integration_filter, ad_format_filter, ) # 2) Run top-level query once if not st.session_state["query_run"]: try: t0 = time.time() with st.spinner("Running top-level impressions query..."): df = cached_run_query( main_sql, private_key_str, user, account_identifier, warehouse, database, schema, role, ) elapsed = time.time() - t0 mins, secs = divmod(elapsed, 60) st.info(f"Query ran in {int(mins)}m {secs:.2f}s") # Normalize timestamps df.columns = [c.upper() for c in df.columns] df = df.sort_values(["EST_HOUR", "EST_MINUTE"]) df["timestamp"] = pd.to_datetime( df["EST_DATE"].astype(str) + " " + df["EST_HOUR"].astype(str).str.zfill(2) + ":" + df["EST_MINUTE"].astype(str).str.zfill(2) ) df["5min"] = df["timestamp"].dt.floor("5T") base_date = ( df[df["TIMEFRAME"] == "TODAY"]["5min"].iloc[0].normalize() if not df[df["TIMEFRAME"] == "TODAY"].empty else pd.Timestamp("today").normalize() ) start_hour = int(st.session_state.get("start_hour", 23)) def norm(ts): return ts + pd.Timedelta(hours=24) if ts.hour < start_hour else ts df["normalized_time"] = ( base_date + (df["5min"] - df["5min"].dt.normalize()) ).apply(norm) # Aggregate agg_df = df.groupby(["TIMEFRAME", "normalized_time"], as_index=False)[ "CNT" ].sum() # Save to state st.session_state.update( query_df=df, agg_df=agg_df, query_run=True, top_level_drop_time=None ) except Exception as e: st.error(f"Main query error: {e}") return else: df = st.session_state["query_df"] agg_df = st.session_state["agg_df"] # 3) Display top-level st.header("Top-Level Impressions Data") drop_time = None for ts in sorted(agg_df["normalized_time"].unique()): today_cnt = agg_df[ (agg_df["normalized_time"] == ts) & (agg_df["TIMEFRAME"] == "TODAY") ]["CNT"] other_cnt = agg_df[ (agg_df["normalized_time"] == ts) & (agg_df["TIMEFRAME"] != "TODAY") ]["CNT"] if ( not today_cnt.empty and not other_cnt.empty and today_cnt.values[0] <= 0.9 * other_cnt.mean() ): drop_time = ts break if drop_time: msg = f"Top-Level: Delivery drop detected at {drop_time.strftime('%I:%M %p')}." st.warning(msg) else: msg = "Top-Level: No significant delivery drop detected." st.info(msg) # Append message once findings_messages = st.session_state.setdefault("findings_messages", []) if msg not in findings_messages: findings_messages.append(msg) st.session_state["top_level_drop_time"] = drop_time with st.expander("Raw Data"): st.dataframe(df) with st.expander("Aggregated Data"): st.dataframe(agg_df) fig = px.line( agg_df, x="normalized_time", y="CNT", color="TIMEFRAME", labels={"normalized_time": "Time of Day", "CNT": "Impressions"}, ) fig.update_xaxes(tickformat="%I:%M %p") st.plotly_chart(fig, use_container_width=True) # 4) Share-of-Voice st.markdown("