import os
import re
from typing import Optional

import pandas as pd
import snowflake.connector
import streamlit as st
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from dateutil.relativedelta import relativedelta
from openai import OpenAI

# Static fallback list of primary verticals; merged later with whatever
# verticals can be mined out of INSTRUCTIONS.md.
STATIC_PRIMARY_VERTICALS = [
    "Arts & Creativity", "Auto", "Baby", "Beauty", "Business", "Careers",
    "Clean Eating", "Crafts", "Deals", "Education", "Entertainment",
    "Family and Parenting", "Fitness", "Food", "Gaming", "Gardening",
    "Green Living", "Health and Wellness", "History & Culture",
    "Hobbies & Interests", "Home Decor and Design", "Law, Gov't & Politics",
    "Lifestyle", "Mens Style and Grooming", "Natural Parenting", "News",
    "Other", "Personal Finance", "Pets", "Pregnancy", "Professional Finance",
    "Real Estate", "Religion & Spirituality", "Science", "Shopping", "Sports",
    "Tech", "Toddler", "Travel", "Vegetarian", "Wedding", "Womens Style",
]


def extract_primary_verticals(text: str) -> list[str]:
    """Mine candidate primary-vertical names out of the system-prompt text.

    Recognizes two phrasings: "themes like a, b and c — ..." and
    "topic (a, b, etc.)". Returns the unique candidates, title-cased
    and sorted.
    """
    text = text.lower()
    candidates = set()
    m = re.search(r"themes like ([^—]+)", text)
    if m:
        # BUGFIX: the original split on r",|and", which also split *inside*
        # words containing "and" (e.g. "gardening" -> "g" + "ening").
        # Word-boundary anchors restrict the split to the standalone word.
        for part in re.split(r",|\band\b", m.group(1)):
            w = part.strip()
            if w and w not in {"more"}:
                candidates.add(w)
    m2 = re.search(r"topic \(([^)]+)\)", text)
    if m2:
        for part in m2.group(1).split(","):
            # strip(" etc.") removes any mix of ' ', 'e', 't', 'c', '.'
            # from the ends — enough to drop a trailing "etc." marker.
            w = part.strip().strip(" etc.")
            if w:
                candidates.add(w)
    return [w.title() for w in sorted(candidates)]


# ——————————————
# 1) STREAMLIT PAGE CONFIG
# ——————————————
st.set_page_config(page_title="Content Analysis Workflow", layout="wide")
st.title("Content Analysis Workflow Automation")

# ——————————————
# 2) LOAD SYSTEM PROMPT
# ——————————————
INSTRUCTIONS_PATH = os.path.join(os.path.dirname(__file__), "INSTRUCTIONS.md")
try:
    with open(INSTRUCTIONS_PATH, "r", encoding="utf-8") as f:
        SYSTEM_PROMPT = f.read()
    extracted_verticals = extract_primary_verticals(SYSTEM_PROMPT)
except FileNotFoundError:
    # Degrade gracefully: run with an empty prompt and only static verticals.
    SYSTEM_PROMPT = ""
    extracted_verticals = []
    st.warning(f"Could not find INSTRUCTIONS.md at {INSTRUCTIONS_PATH}")
# Union of the static vertical list and whatever the prompt file mentioned.
PRIMARY_VERTICALS = sorted(set(STATIC_PRIMARY_VERTICALS) | set(extracted_verticals))

# ——————————————
# 3) DATE RANGE FILTERS
# ——————————————
col1, col2 = st.columns(2)
with col1:
    start_date = st.date_input("Start date", value=pd.to_datetime("2025-02-01"))
with col2:
    end_date = st.date_input("End date", value=pd.to_datetime("2025-03-01"))

# Prior-year range defaults to the same window shifted back twelve months.
col3, col4 = st.columns(2)
with col3:
    prior_start = st.date_input(
        "Prior year start date",
        value=start_date - relativedelta(years=1),
    )
with col4:
    prior_end = st.date_input(
        "Prior year end date",
        value=end_date - relativedelta(years=1),
    )

if start_date > end_date or prior_start > prior_end:
    st.error("Start date must be on or before end date for both ranges.")
    st.stop()

col5, col6 = st.columns(2)
with col5:
    include_verticals = st.multiselect(
        "Filter to primary vertical", PRIMARY_VERTICALS, default=[]
    )
with col6:
    exclude_verticals = st.multiselect(
        "Exclude primary vertical", PRIMARY_VERTICALS, default=[]
    )

# ——————————————
# 4) CHECK ENVIRONMENT VARIABLES
# ——————————————
REQUIRED_VARS = [
    "snowflake_user",
    "snowflake_account_identifier",
    "snowflake_warehouse",
    "snowflake_database",
    "snowflake_role",
    "snowflake_private_key",
    "OPENAI_API_KEY",
]
missing_vars = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing_vars:
    st.error("Missing required secrets: " + ", ".join(missing_vars))
    st.stop()

# ——————————————
# 5) INSTANTIATE OPENAI CLIENT
# ——————————————
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# ——————————————
# 6) PARSE PRIVATE KEY → DER BYTES
# ——————————————
# Snowflake key-pair auth wants the key as unencrypted PKCS8 DER bytes.
raw_pem = os.getenv("snowflake_private_key").encode("utf-8")
try:
    loaded_key = serialization.load_pem_private_key(
        raw_pem, password=None, backend=default_backend()
    )
    private_key_der = loaded_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
except Exception as e:
    st.error(f"Failed to load Snowflake private key: {e}")
    st.stop()
# ——————————————
# 7) BUILD SNOWFLAKE CONFIG
# ——————————————
SNOWFLAKE_CONFIG = {
    "user": os.getenv("snowflake_user"),
    "account": os.getenv("snowflake_account_identifier"),
    "warehouse": os.getenv("snowflake_warehouse"),
    "database": os.getenv("snowflake_database"),
    "role": os.getenv("snowflake_role"),
    "private_key": private_key_der,
}


# ——————————————
# 8) HELPERS
# ——————————————
def extract_sql_block(text: str) -> str:
    """Extract SQL from the first ```sql …``` fence; fall back to the raw text."""
    m = re.search(r"```(?:sql)?\s*([\s\S]*?)```", text, re.IGNORECASE)
    return m.group(1).strip() if m else text.strip()


def extract_keywords(sql: str) -> list[str]:
    """Collect the keyword fragments used in ``LIKE '%kw%'`` patterns of *sql*.

    Returns unique, lowercased keywords in first-seen order.

    NOTE(review): the original body of this function was garbled in the
    source (text after ``re.findall(r"(?`` was lost). Reconstructed from
    usage — callers count these against a 20-keyword minimum matching the
    generated ``LIKE '%…%'`` URL-path clauses. Verify against version
    control before relying on the exact pattern.
    """
    found = re.findall(r"(?<=LIKE '%)[^%']+(?=%')", sql, flags=re.IGNORECASE)
    seen: set[str] = set()
    keywords: list[str] = []
    for kw in found:
        kw = kw.strip().lower()
        if kw and kw not in seen:
            seen.add(kw)
            keywords.append(kw)
    return keywords


def extract_title_words(df: pd.DataFrame) -> list[str]:
    """Split article titles into unique lowercase words.

    Skips empty tokens and pure digits; preserves first-seen order.
    """
    seen = set()
    words = []
    for title in df.get("article_title", []):
        for w in re.split(r"\W+", str(title)):
            w = w.lower().strip()
            if not w or w.isdigit():
                continue
            if w not in seen:
                seen.add(w)
                words.append(w)
    return words


def apply_vertical_filter(
    sql: str,
    include: Optional[list[str]],
    exclude: Optional[list[str]],
) -> str:
    """Inject include/exclude primary_vertical LIKE filters into *sql*.

    Builds an ``AND (…)`` clause from the selections, removes any
    pre-existing single-vertical filters the model may have emitted, and
    splices the new clause in just before the first GROUP BY. Returns the
    SQL unchanged when neither list is given.
    """
    clauses = []
    if include:
        inc_clauses = []
        for v in include:
            # sanitize any single-quotes by doubling them
            sanitized = v.lower().replace("'", "''")
            inc_clauses.append(
                f"LOWER(primary_vertical) LIKE '%{sanitized}%'"
            )
        clauses.append("(" + " OR ".join(inc_clauses) + ")")
    if exclude:
        exc_clauses = []
        for v in exclude:
            sanitized = v.lower().replace("'", "''")
            exc_clauses.append(
                f"LOWER(primary_vertical) NOT LIKE '%{sanitized}%'"
            )
        clauses.append("(" + " AND ".join(exc_clauses) + ")")
    if not clauses:
        return sql
    full_clause = "AND " + " AND ".join(clauses)
    # strip any old single-vertical filters
    sql = re.sub(
        r"\s+AND\s+LOWER\(primary_vertical\)[^\n]*", "", sql, flags=re.IGNORECASE
    )
    sql = re.sub(
        r"\s+AND\s+r\.primary_vertical\s*=\s*'[^']*'", "", sql, flags=re.IGNORECASE
    )
    # inject before GROUP BY (first occurrence only)
    return re.sub(
        r"(WHERE[\s\S]*?)(GROUP BY)",
        lambda m: f"{m.group(1)} {full_clause}\n{m.group(2)}",
        sql,
        count=1,
        flags=re.IGNORECASE,
    )
def highlight_sov(val: float) -> str:
    """Color SOV change green for positive, red for negative.

    Returns a CSS fragment for pandas Styler; empty string for NaN.
    NOTE(review): exactly zero is colored red (``val > 0`` test) — confirm
    that is intended.
    """
    if pd.isna(val):
        return ""
    color = "green" if val > 0 else "red"
    return f"color: {color};"


def get_sql_template_from_openai(user_text: str) -> str:
    """Ask the chat model for one templated SQL query.

    The returned SQL keeps literal {START_DATE}/{END_DATE} placeholders so
    the caller can substitute both the current and prior-year ranges.
    """
    prompt = f"""
You are a SQL maestro.
1) From the user’s description: \"\"\"{user_text}\"\"\" identify the top **25** keywords.
2) Generate one complete SQL query that:
   • Selects domain, article_title, url, pageviews, primary_vertical
   • Filters date BETWEEN '{{START_DATE}}' AND '{{END_DATE}}'
   • Filters only active sites
   • Only includes pageviews > 9 and pmp_enabled = 'true'
   • Excludes unwanted URLs (e.g. '%atlanta%', '%forum%', etc.)
   • Uses **at least 20** lines of: `OR parse_url(...):"path" LIKE '%%'`
     all wrapped in a single `AND ( … )` block
   • GROUPs and ORDERs as needed
Return *only* the SQL, with the placeholders literally in the BETWEEN clause,
inside a ```sql …``` fence—no extra text.
"""
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
    )
    return extract_sql_block(resp.choices[0].message.content)


def run_query(sql: str) -> pd.DataFrame:
    """Execute SQL on Snowflake and return a lowercase-column DataFrame."""
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
        cols = [c[0].lower() for c in cur.description]
    finally:
        # BUGFIX: the connection was leaked if execute/fetch raised.
        conn.close()
    return pd.DataFrame(rows, columns=cols)


# ——————————————
# 9) USER INPUT & EXECUTION
# ——————————————
user_prompt = st.text_area(
    "Describe the content or keywords for your analysis:",
    height=150,
)

if st.button("Generate Table"):
    if not user_prompt.strip():
        st.warning("Enter some analysis keywords or description.")
    else:
        # Generate SQL once and swap the date range for prior-year query
        template_sql = get_sql_template_from_openai(user_prompt)
        sql_current = template_sql.replace(
            "{START_DATE}", start_date.isoformat()
        ).replace("{END_DATE}", end_date.isoformat())
        sql_prior = template_sql.replace(
            "{START_DATE}", prior_start.isoformat()
        ).replace("{END_DATE}", prior_end.isoformat())

        include_sel = include_verticals or None
        exclude_sel = exclude_verticals or None
        sql_current = apply_vertical_filter(sql_current, include_sel, exclude_sel)
        sql_prior = apply_vertical_filter(sql_prior, include_sel, exclude_sel)

        # Run queries
        df_current = run_query(sql_current)
        df_prior = run_query(sql_prior)

        # Extract terms
        url_kws = extract_keywords(sql_current)
        if len(url_kws) < 20:
            st.warning(
                "Fewer than 20 keywords detected; refine your prompt for broader coverage."
            )
        title_kws = extract_title_words(df_current) + extract_title_words(df_prior)

        # Dedupe and drop very short terms (<= 3 chars) which are too noisy.
        all_terms = []
        seen = set()
        for term in url_kws + title_kws:
            term = term.strip()
            if len(term) <= 3 or term in seen:
                continue
            seen.add(term)
            all_terms.append(term)

        # Totals for pageview display
        total_cy = df_current["pageviews"].sum()
        total_py = df_prior["pageviews"].sum()

        # Build metrics without a totals row.
        # BUGFIX: contains() used regex matching by default, so a term with
        # regex metacharacters could crash or mis-match; terms are plain
        # substrings, so match literally with regex=False.
        metrics = []
        for term in all_terms:
            cy = df_current[
                df_current["article_title"].str.contains(
                    term, case=False, na=False, regex=False
                )
                | df_current["url"].str.contains(
                    term, case=False, na=False, regex=False
                )
            ]["pageviews"].sum()
            py = df_prior[
                df_prior["article_title"].str.contains(
                    term, case=False, na=False, regex=False
                )
                | df_prior["url"].str.contains(
                    term, case=False, na=False, regex=False
                )
            ]["pageviews"].sum()
            # Year-over-year percent change; NaN when there is no prior data.
            yoy = (cy - py) / py * 100 if py else float("nan")
            metrics.append(
                {
                    "term": term,
                    "CY pageviews": cy,
                    "PY pageviews": py,
                    "YoY %": yoy,
                }
            )

        # Share-of-voice: each term's share of the summed term pageviews.
        sum_cy_terms = sum(m["CY pageviews"] for m in metrics)
        sum_py_terms = sum(m["PY pageviews"] for m in metrics)
        for m in metrics:
            m["SOV CY"] = (
                m["CY pageviews"] / sum_cy_terms if sum_cy_terms else float("nan")
            )
            m["SOV PY"] = (
                m["PY pageviews"] / sum_py_terms if sum_py_terms else float("nan")
            )
            m["SOV % Change"] = (
                (m["SOV CY"] / m["SOV PY"] - 1)
                if (not pd.isna(m["SOV CY"]) and not pd.isna(m["SOV PY"]))
                else float("nan")
            )
        metrics_df = pd.DataFrame(metrics).sort_values(
            "CY pageviews", ascending=False
        )

        # Display SQL in a hidden expander above metrics
        with st.expander("Show SQL Queries"):
            st.subheader("Current Year SQL")
            st.code(sql_current, language="sql")
            st.subheader("Prior Year SQL")
            st.code(sql_prior, language="sql")

        # Format percentages
        fmt = {
            "CY pageviews": "{:,}",  # add thousand separators
            "PY pageviews": "{:,}",  # add thousand separators
            "YoY %": "{:.1f}%",
            "SOV CY": "{:.1%}",
            "SOV PY": "{:.1%}",
            "SOV % Change": "{:.1%}",
        }

        # Display with conditional formatting
        st.subheader("Term Performance Metrics")
        styled = metrics_df.style.format(fmt, na_rep="-").applymap(
            highlight_sov, subset=["SOV % Change"]
        )
        st.dataframe(styled, height=400)

        # Show raw result tables with totals
        with st.expander(f"Current Year Results: {start_date} to {end_date}"):
            st.dataframe(df_current.style.format({"pageviews": "{:,}"}))
            st.write(f"Total pageviews: {total_cy:,}")
        with st.expander(f"Prior Year Results: {prior_start} to {prior_end}"):
            st.dataframe(df_prior.style.format({"pageviews": "{:,}"}))
            st.write(f"Total pageviews: {total_py:,}")