Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import time | |
| import pandas as pd | |
| import plotly.express as px | |
| import snowflake.connector | |
| import base64 | |
| from datetime import timedelta, datetime | |
| from cryptography.hazmat.primitives import serialization | |
| from cryptography.hazmat.backends import default_backend | |
| import concurrent.futures | |
| # Import SQL query functions | |
| from delivery_queries import ( | |
| get_main_query, | |
| get_main_int_sov_query, | |
| get_bidder_query, | |
| get_flex_bucket_query, | |
| get_device_query, | |
| get_ad_unit_query, | |
| get_refresh_query, | |
| ) | |
| from delivery_section_utils import update_section_generic_drop | |
| # Import the NEXT_STEPS_INSTRUCTIONS for delivery drops | |
| from delivery_instructions import NEXT_STEPS_INSTRUCTIONS | |
# Seed every session-state key this page reads. ``setdefault`` only writes a
# key that is missing, so values carried over from an earlier rerun survive.
for _state_key, _state_default in (
    ("query_run", False),
    ("findings_messages", []),
    ("query_df", None),
    ("agg_df", None),
    ("top_level_drop_time", None),
    ("key_findings_output", None),
):
    st.session_state.setdefault(_state_key, _state_default)
def cached_run_query(
    query,
    private_key_b64: str,
    user: str,
    account_identifier: str,
    warehouse: str,
    database: str,
    schema: str,
    role: str,
):
    """Run a Snowflake query and return the result as a DataFrame.

    A new connection is opened for every call (despite the name, nothing in
    this function caches results), the session statement timeout is raised to
    1800 seconds, and all rows are fetched into memory.

    Args:
        query: SQL text to execute.
        private_key_b64: Base64-encoded private key. The decoded bytes are
            handed to the connector as ``private_key`` (presumably a
            DER-encoded key -- confirm against the secret's format).
        user: Snowflake user name.
        account_identifier: Snowflake account identifier.
        warehouse: Warehouse to run the query on.
        database: Default database for the session.
        schema: Default schema for the session.
        role: Role to assume for the session.

    Returns:
        pandas.DataFrame whose columns are named after the first field of
        each entry in the cursor description.

    Raises:
        Any exception raised while decoding the key, connecting, or executing
        the query is propagated. The cursor and connection are closed in all
        cases.
    """
    der = base64.b64decode(private_key_b64)
    conn = snowflake.connector.connect(
        user=user,
        account=account_identifier,
        warehouse=warehouse,
        database=database,
        schema=schema,
        role=role,
        private_key=der,
    )
    # try/finally so a failing statement cannot leak the cursor or connection.
    try:
        cs = conn.cursor()
        try:
            cs.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 1800")
            cs.execute(query)
            rows = cs.fetchall()
            cols = [c[0] for c in cs.description]
        finally:
            cs.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=cols)
def _add_five_minute_bucket(frame):
    """Add ``timestamp`` and ``5min`` columns built from the EST_* columns.

    ``timestamp`` combines EST_DATE with zero-padded EST_HOUR:EST_MINUTE and
    ``5min`` is that timestamp floored to a five-minute boundary. The frame is
    modified in place and returned for convenience.
    """
    frame["timestamp"] = pd.to_datetime(
        frame["EST_DATE"].astype(str)
        + " "
        + frame["EST_HOUR"].astype(str).str.zfill(2)
        + ":"
        + frame["EST_MINUTE"].astype(str).str.zfill(2)
    )
    # "5min" is the non-deprecated spelling of the old "5T" alias.
    frame["5min"] = frame["timestamp"].dt.floor("5min")
    return frame


def _add_normalized_time(frame, base_date, start_hour):
    """Add ``normalized_time``: each ``5min`` bucket projected onto ``base_date``.

    Only the time of day of every bucket is kept, so rows from different
    calendar dates line up on one axis. Buckets whose hour is earlier than
    ``start_hour`` are pushed forward by 24 hours. The frame is modified in
    place and returned for convenience.
    """
    time_of_day = frame["5min"] - frame["5min"].dt.normalize()
    projected = base_date + time_of_day
    frame["normalized_time"] = projected.where(
        projected.dt.hour >= start_hour, projected + pd.Timedelta(hours=24)
    )
    return frame


def _find_first_drop(agg_df, threshold=0.9):
    """Return the earliest time at which TODAY falls to the drop threshold.

    For each distinct ``normalized_time`` (ascending) the TODAY count is
    compared with the mean count of all other timeframes at that time. The
    first time where TODAY is at most ``threshold`` times that mean is
    returned as a ``pd.Timestamp``; ``None`` if no such time exists. Times
    lacking either a TODAY row or a comparison row are skipped.
    """
    is_today = agg_df["TIMEFRAME"] == "TODAY"
    for ts in sorted(agg_df["normalized_time"].unique()):
        at_ts = agg_df["normalized_time"] == ts
        today_cnt = agg_df.loc[at_ts & is_today, "CNT"]
        other_cnt = agg_df.loc[at_ts & ~is_today, "CNT"]
        if today_cnt.empty or other_cnt.empty:
            continue
        if today_cnt.values[0] <= threshold * other_cnt.mean():
            # unique() may yield numpy.datetime64 depending on the pandas
            # version; coerce so callers can always use Timestamp methods.
            return pd.Timestamp(ts)
    return None


def _generate_key_findings(client):
    """Ask the model for a summary and store it in session state.

    Reads ``findings_messages`` and the optional ``flex_jira_info`` from
    ``st.session_state``, stores the assembled prompt under ``key_findings``
    and the model's answer (or an error string if the call fails) under
    ``key_findings_output``.
    """
    findings = "\n".join(st.session_state.get("findings_messages", []))
    flex_jira_info = st.session_state.get("flex_jira_info", "")
    jira_section = (
        f"\nJira Ticket Information from Flex Bucket section:\n{flex_jira_info}\n"
        if flex_jira_info
        else ""
    )
    prompt = (
        "You are a helpful analyst investigating a drop in ad delivery. "
        "A delivery drop detection dashboard has compiled a list of findings "
        "showing potential drops across different dimensions. Below are the detailed findings "
        "from the dashboard, along with any flagged Jira ticket information. "
        "The NEXT_STEPS_INSTRUCTIONS file contains recommended next steps for each section "
        "depending on the drop(s) flagged in the dashboard:\n\n"
        f"Findings:\n{findings}\n"
        f"{jira_section}\n"
        "Next Steps Instructions:\n"
        f"{NEXT_STEPS_INSTRUCTIONS}\n\n"
        "Using the Findings, Jira section information, and Next Steps Instructions as helpful context, "
        "create a concise summary that identifies the likely cause/causes of any detected delivery drops "
        "and recommends actionable next steps. The summary should be a few sentences long followed by bullet points "
        "with the main findings and recommended next steps. Please output the summary in Markdown format with each bullet "
        "point on a new line, and indent sub-bullets properly. Ensure that each bullet point is on its own line. "
        "There is no need to explicitly mention the Instructions file in the summary; just use it to inform your analysis."
    )
    st.session_state["key_findings"] = prompt
    try:
        response = client.responses.create(
            model="o3-mini",
            instructions="You are a helpful analyst who provides insights and recommends next steps.",
            input=prompt,
        )
        st.session_state["key_findings_output"] = response.output_text.strip()
    except Exception as e:
        # Surface the failure in the findings panel instead of aborting the page.
        st.session_state["key_findings_output"] = f"Error calling OpenAI API: {e}"


def run_drop_query(
    table,
    start_datetime,
    end_datetime,
    message_filter,
    campaign_id,
    private_key_str,
    user,
    account_identifier,
    warehouse,
    database,
    schema,
    role,
    client,
    integration_filter=None,
    ad_format_filter=None,
):
    """Universal drop analysis for any Integration + Ad_Format.

    Renders the whole page in order: top-level impressions and drop
    detection, share-of-voice chart, a key-findings panel, and the
    per-dimension breakdown sections. The key-findings summary is generated
    after the breakdown queries have been processed and then written into
    the panel.

    Args:
        table, start_datetime, end_datetime, message_filter, campaign_id:
            Passed unchanged to every SQL builder.
        private_key_str, user, account_identifier, warehouse, database,
            schema, role: Connection settings forwarded to
            ``cached_run_query``.
        client: Object exposing ``responses.create(model=..., instructions=...,
            input=...)`` whose result has ``output_text``.
        integration_filter: Optional filter forwarded to all builders except
            the share-of-voice one.
        ad_format_filter: Optional filter forwarded to every builder.

    Returns:
        None. Results are rendered through Streamlit and stored in
        ``st.session_state`` (``query_df``, ``agg_df``, ``query_run``,
        ``top_level_drop_time``, ``findings_messages``, ``key_findings``,
        ``key_findings_output``). Returns early if the top-level query fails.
    """
    # Resolved up front because both the top-level and the share-of-voice
    # normalisation need it, including on reruns that reuse the cached
    # top-level frame and skip the first-run branch.
    start_hour = int(st.session_state.get("start_hour", 23))

    builder_args = (
        table,
        start_datetime,
        end_datetime,
        message_filter,
        campaign_id,
        integration_filter,
        ad_format_filter,
    )
    conn_args = (
        private_key_str,
        user,
        account_identifier,
        warehouse,
        database,
        schema,
        role,
    )

    # 1) Build SQL statements with filters
    main_sql = get_main_query(*builder_args)
    queries = {
        "flex_bucket": get_flex_bucket_query(*builder_args),
        "bidder": get_bidder_query(*builder_args),
        "device": get_device_query(*builder_args),
        "ad_unit": get_ad_unit_query(*builder_args),
        "refresh": get_refresh_query(*builder_args),
    }

    # 2) Run top-level query once; later reruns reuse the frames in state
    if not st.session_state["query_run"]:
        try:
            t0 = time.time()
            with st.spinner("Running top-level impressions query..."):
                df = cached_run_query(main_sql, *conn_args)
            mins, secs = divmod(time.time() - t0, 60)
            st.info(f"Query ran in {int(mins)}m {secs:.2f}s")

            # Normalize timestamps
            df.columns = [c.upper() for c in df.columns]
            df = df.sort_values(["EST_HOUR", "EST_MINUTE"])
            df = _add_five_minute_bucket(df)
            today_rows = df[df["TIMEFRAME"] == "TODAY"]
            base_date = (
                today_rows["5min"].iloc[0].normalize()
                if not today_rows.empty
                else pd.Timestamp("today").normalize()
            )
            df = _add_normalized_time(df, base_date, start_hour)

            # Aggregate
            agg_df = df.groupby(["TIMEFRAME", "normalized_time"], as_index=False)[
                "CNT"
            ].sum()

            # Save to state
            st.session_state.update(
                query_df=df, agg_df=agg_df, query_run=True, top_level_drop_time=None
            )
        except Exception as e:
            st.error(f"Main query error: {e}")
            return
    else:
        df = st.session_state["query_df"]
        agg_df = st.session_state["agg_df"]

    # 3) Display top-level
    st.header("Top-Level Impressions Data")
    drop_time = _find_first_drop(agg_df)
    if drop_time is not None:
        msg = f"Top-Level: Delivery drop detected at {drop_time.strftime('%I:%M %p')}."
        st.warning(msg)
    else:
        msg = "Top-Level: No significant delivery drop detected."
        st.info(msg)

    # Append message once so reruns do not duplicate it
    findings_messages = st.session_state.setdefault("findings_messages", [])
    if msg not in findings_messages:
        findings_messages.append(msg)
    st.session_state["top_level_drop_time"] = drop_time

    with st.expander("Raw Data"):
        st.dataframe(df)
    with st.expander("Aggregated Data"):
        st.dataframe(agg_df)
    fig = px.line(
        agg_df,
        x="normalized_time",
        y="CNT",
        color="TIMEFRAME",
        labels={"normalized_time": "Time of Day", "CNT": "Impressions"},
    )
    fig.update_xaxes(tickformat="%I:%M %p")
    st.plotly_chart(fig, use_container_width=True)

    # 4) Share-of-Voice
    st.markdown("<hr>", unsafe_allow_html=True)
    st.header("Share of Voice Analysis")
    sov_sql = get_main_int_sov_query(
        table,
        start_datetime,
        end_datetime,
        message_filter,
        campaign_id,
        ad_format_filter=ad_format_filter,
    )
    try:
        with st.spinner("Running SOV query..."):
            sov_df = cached_run_query(sov_sql, *conn_args)

        # Normalize the same way as the top-level frame
        sov_df.columns = [c.upper() for c in sov_df.columns]
        sov_df = _add_five_minute_bucket(sov_df)
        sov_df = _add_normalized_time(
            sov_df, pd.Timestamp("today").normalize(), start_hour
        )

        # Group, exclude, percent, order
        sov_grp = sov_df.groupby(["normalized_time", "INTEGRATION"], as_index=False)[
            "CNT"
        ].sum()
        sov_grp = sov_grp[~sov_grp["INTEGRATION"].str.contains("Ignore|Affiliate|PG")]
        sov_grp["share"] = sov_grp["CNT"] / sov_grp.groupby("normalized_time")[
            "CNT"
        ].transform("sum")
        order = (
            sov_grp.groupby("INTEGRATION")["share"]
            .sum()
            .sort_values(ascending=False)
            .index.tolist()
        )
        fig2 = px.line(
            sov_grp,
            x="normalized_time",
            y="share",
            color="INTEGRATION",
            category_orders={"INTEGRATION": order},
            labels={"share": "Share of Total Impressions"},
        )
        fig2.update_xaxes(tickformat="%I:%M %p")
        fig2.update_yaxes(tickformat=".2%")
        st.plotly_chart(fig2, use_container_width=True)
    except Exception as e:
        st.error(f"SOV error: {e}")

    # 5) Key Findings via OpenAI
    st.markdown("<hr>", unsafe_allow_html=True)
    st.header("Key Findings and Next Steps")
    # A single-slot placeholder: writing the fresh summary later replaces
    # whatever is shown now instead of stacking a second copy underneath.
    # NOTE(review): model output is rendered with unsafe_allow_html=True.
    key_findings_placeholder = st.empty()
    previous_output = st.session_state.get("key_findings_output")
    if previous_output:
        key_findings_placeholder.markdown(previous_output, unsafe_allow_html=True)
    else:
        key_findings_placeholder.info(
            "Key findings will appear here once additional queries have finished."
        )

    # 6) Breakdown dimensions
    st.markdown("<hr>", unsafe_allow_html=True)
    st.header("Specific Dimensions Data")
    st.info("Running breakdown queries...")
    with st.spinner("Running additional queries..."):
        with concurrent.futures.ThreadPoolExecutor() as ex:
            future_to_key = {
                ex.submit(cached_run_query, sql, *conn_args): key
                for key, sql in queries.items()
            }
            start_ts = {key: time.time() for key in queries}
            conts = {key: st.container() for key in queries}
            # Render each section as soon as its query finishes.
            for fut in concurrent.futures.as_completed(future_to_key):
                key = future_to_key[fut]
                try:
                    df_res = fut.result()
                except Exception as e:
                    # One failed dimension must not block the others.
                    conts[key].error(f"{key} query error: {e}")
                    continue
                update_section_generic_drop(
                    key, df_res, start_ts, conts[key], drop_time
                )

    # Generate the summary only now, so it can draw on whatever the breakdown
    # sections recorded in session state (presumably extra findings_messages
    # and flex_jira_info -- confirm in update_section_generic_drop).
    _generate_key_findings(client)
    key_findings_placeholder.markdown(
        st.session_state.get("key_findings_output") or "",
        unsafe_allow_html=True,
    )