# Streamlit app: Content Analysis Workflow Automation
# (removed scrape residue "Spaces: / Configuration error" from the hosting page)
# Standard library.
import os
import re
from typing import Optional

# Third party.
import pandas as pd
import snowflake.connector
import streamlit as st
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from dateutil.relativedelta import relativedelta
from openai import OpenAI

# Canonical primary-vertical names used to populate the include/exclude
# filter widgets.  This static list is later merged with any verticals
# parsed out of INSTRUCTIONS.md (see PRIMARY_VERTICALS below).
STATIC_PRIMARY_VERTICALS = [
    "Arts & Creativity",
    "Auto",
    "Baby",
    "Beauty",
    "Business",
    "Careers",
    "Clean Eating",
    "Crafts",
    "Deals",
    "Education",
    "Entertainment",
    "Family and Parenting",
    "Fitness",
    "Food",
    "Gaming",
    "Gardening",
    "Green Living",
    "Health and Wellness",
    "History & Culture",
    "Hobbies & Interests",
    "Home Decor and Design",
    "Law, Gov't & Politics",
    "Lifestyle",
    "Mens Style and Grooming",
    "Natural Parenting",
    "News",
    "Other",
    "Personal Finance",
    "Pets",
    "Pregnancy",
    "Professional Finance",
    "Real Estate",
    "Religion & Spirituality",
    "Science",
    "Shopping",
    "Sports",
    "Tech",
    "Toddler",
    "Travel",
    "Vegetarian",
    "Wedding",
    "Womens Style",
]
def extract_primary_verticals(text: str) -> list[str]:
    """Parse candidate primary-vertical names out of free-form prompt text.

    Two patterns are recognised (on the lowercased text):
      * "themes like A, B and C" — a comma/"and"-separated list,
        terminated by the 'β' marker character used in INSTRUCTIONS.md
      * "topic (A, B, etc.)" — a parenthesised, comma-separated list

    Returns the unique candidates, title-cased and sorted.
    """
    text = text.lower()
    candidates: set[str] = set()

    # Pattern 1: "themes like <list>".  Split only on commas and the
    # *word* "and" (\band\b) so that words merely containing "and"
    # (e.g. "brand") are not chopped apart.
    m = re.search(r"themes like ([^β]+)", text)
    if m:
        for part in re.split(r",|\band\b", m.group(1)):
            w = part.strip()
            if w and w not in {"more"}:
                candidates.add(w)

    # Pattern 2: "topic (<list>)".  Remove a trailing "etc."/"etc" token
    # explicitly; the previous str.strip(" etc.") stripped *characters*
    # from both ends and corrupted words (e.g. "tech" -> "h").
    m2 = re.search(r"topic \(([^)]+)\)", text)
    if m2:
        for part in m2.group(1).split(","):
            w = re.sub(r"\betc\.?$", "", part.strip()).strip()
            if w:
                candidates.add(w)

    return [w.title() for w in sorted(candidates)]
# --------------------------------------------------------------------------
# 1) STREAMLIT PAGE CONFIG
# --------------------------------------------------------------------------
st.set_page_config(page_title="Content Analysis Workflow", layout="wide")
st.title("Content Analysis Workflow Automation")

# --------------------------------------------------------------------------
# 2) LOAD SYSTEM PROMPT
# --------------------------------------------------------------------------
# INSTRUCTIONS.md (next to this script) supplies the system prompt for the
# OpenAI calls; any vertical names mentioned in it are merged with the
# static list to drive the filter widgets.
INSTRUCTIONS_PATH = os.path.join(os.path.dirname(__file__), "INSTRUCTIONS.md")
try:
    with open(INSTRUCTIONS_PATH, "r", encoding="utf-8") as f:
        SYSTEM_PROMPT = f.read()
except FileNotFoundError:
    # Degrade gracefully: run with an empty system prompt and warn.
    SYSTEM_PROMPT = ""
    extracted_verticals = []
    st.warning(f"Could not find INSTRUCTIONS.md at {INSTRUCTIONS_PATH}")
else:
    extracted_verticals = extract_primary_verticals(SYSTEM_PROMPT)

PRIMARY_VERTICALS = sorted(set(STATIC_PRIMARY_VERTICALS) | set(extracted_verticals))
# --------------------------------------------------------------------------
# 3) DATE RANGE FILTERS
# --------------------------------------------------------------------------
# Current-year analysis window.
col1, col2 = st.columns(2)
with col1:
    start_date = st.date_input("Start date", value=pd.to_datetime("2025-02-01"))
with col2:
    end_date = st.date_input("End date", value=pd.to_datetime("2025-03-01"))

# Prior-year comparison window; defaults to the same range shifted back
# one calendar year (relativedelta handles leap-day edge cases).
col3, col4 = st.columns(2)
with col3:
    prior_start = st.date_input(
        "Prior year start date", value=start_date - relativedelta(years=1)
    )
with col4:
    prior_end = st.date_input(
        "Prior year end date", value=end_date - relativedelta(years=1)
    )

# Both ranges must be well-ordered before any SQL is built.
if start_date > end_date or prior_start > prior_end:
    st.error("Start date must be on or before end date for both ranges.")
    st.stop()

# Optional include/exclude vertical filters (empty selection = no filter).
col5, col6 = st.columns(2)
with col5:
    include_verticals = st.multiselect(
        "Filter to primary vertical", PRIMARY_VERTICALS, default=[]
    )
with col6:
    exclude_verticals = st.multiselect(
        "Exclude primary vertical", PRIMARY_VERTICALS, default=[]
    )
# --------------------------------------------------------------------------
# 4) CHECK ENVIRONMENT VARIABLES
# --------------------------------------------------------------------------
# Every secret the app needs; abort early with a clear message if any are
# missing so we never hit a confusing mid-run failure.
REQUIRED_VARS = [
    "snowflake_user",
    "snowflake_account_identifier",
    "snowflake_warehouse",
    "snowflake_database",
    "snowflake_role",
    "snowflake_private_key",
    "OPENAI_API_KEY",
]
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    st.error(f"Missing required secrets: {', '.join(missing)}")
    st.stop()

# --------------------------------------------------------------------------
# 5) INSTANTIATE OPENAI CLIENT
# --------------------------------------------------------------------------
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# --------------------------------------------------------------------------
# 6) PARSE PRIVATE KEY -> DER BYTES
# --------------------------------------------------------------------------
# The Snowflake connector expects the key-pair auth key as unencrypted
# PKCS#8 DER bytes; the secret is stored as a PEM string.
# NOTE(review): assumes the PEM in the env var has real newlines (not
# escaped "\n" sequences) and no passphrase — confirm how the secret is set.
pem_bytes = os.getenv("snowflake_private_key").encode("utf-8")
try:
    key_obj = serialization.load_pem_private_key(
        pem_bytes, password=None, backend=default_backend()
    )
    private_key_der = key_obj.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
except Exception as e:
    # Any parse failure is fatal: surface it in the UI and halt the script.
    st.error(f"Failed to load Snowflake private key: {e}")
    st.stop()

# --------------------------------------------------------------------------
# 7) BUILD SNOWFLAKE CONFIG
# --------------------------------------------------------------------------
# Connection kwargs passed verbatim to snowflake.connector.connect().
SNOWFLAKE_CONFIG = {
    "user": os.getenv("snowflake_user"),
    "account": os.getenv("snowflake_account_identifier"),
    "warehouse": os.getenv("snowflake_warehouse"),
    "database": os.getenv("snowflake_database"),
    "role": os.getenv("snowflake_role"),
    "private_key": private_key_der,
}
# --------------------------------------------------------------------------
# 8) HELPERS
# --------------------------------------------------------------------------
def extract_sql_block(text: str) -> str:
    """Return the contents of the first ``` / ```sql fenced block in *text*,
    or the whole text (stripped) when no fence is present."""
    match = re.search(r"```(?:sql)?\s*([\s\S]*?)```", text, re.IGNORECASE)
    if match:
        return match.group(1).strip()
    return text.strip()
def extract_keywords(sql: str) -> list[str]:
    """Pull the distinct '%keyword%' terms from LIKE predicates in *sql*.

    NOT LIKE predicates are skipped via the negative lookbehind; order of
    first appearance is preserved.
    """
    matches = re.findall(r"(?<!NOT\s)LIKE\s+'%([^%]+)%'", sql, flags=re.IGNORECASE)
    # dict preserves insertion order, so this de-duplicates while keeping
    # each keyword's first-seen position.
    return list(dict.fromkeys(matches))
def extract_title_words(df: pd.DataFrame) -> list[str]:
    """Return unique lowercase words from df['article_title'].

    Words keep first-seen order; empty tokens and purely numeric tokens
    are dropped.  A missing 'article_title' column yields [].
    """
    ordered: dict[str, None] = {}
    for title in df.get("article_title", []):
        for token in re.split(r"\W+", str(title)):
            token = token.lower().strip()
            if token and not token.isdigit():
                # setdefault on a dict keeps insertion order = dedup + order.
                ordered.setdefault(token, None)
    return list(ordered)
def apply_vertical_filter(
    sql: str,
    include: Optional[list[str]],
    exclude: Optional[list[str]],
) -> str:
    """Inject primary-vertical include/exclude LIKE filters into *sql*.

    Any pre-existing single-vertical filters are stripped first, then one
    combined AND clause is spliced in just before the first GROUP BY.
    With neither list supplied the SQL is returned untouched.
    """

    def like_clause(vertical: str, negated: bool) -> str:
        # Double single-quotes so the value is safe inside a SQL literal.
        safe = vertical.lower().replace("'", "''")
        op = "NOT LIKE" if negated else "LIKE"
        return f"LOWER(primary_vertical) {op} '%{safe}%'"

    clauses = []
    if include:
        clauses.append(
            "(" + " OR ".join(like_clause(v, False) for v in include) + ")"
        )
    if exclude:
        clauses.append(
            "(" + " AND ".join(like_clause(v, True) for v in exclude) + ")"
        )
    if not clauses:
        return sql

    full_clause = "AND " + " AND ".join(clauses)

    # Drop any old single-vertical filters the template may already carry.
    sql = re.sub(
        r"\s+AND\s+LOWER\(primary_vertical\)[^\n]*", "", sql, flags=re.IGNORECASE
    )
    sql = re.sub(
        r"\s+AND\s+r\.primary_vertical\s*=\s*'[^']*'", "", sql, flags=re.IGNORECASE
    )

    # Insert the combined clause immediately before the first GROUP BY.
    return re.sub(
        r"(WHERE[\s\S]*?)(GROUP BY)",
        lambda m: f"{m.group(1)} {full_clause}\n{m.group(2)}",
        sql,
        count=1,
        flags=re.IGNORECASE,
    )
def highlight_sov(val: float) -> str:
    """CSS for an SOV-change cell: green when positive, red otherwise.

    NaN yields an empty string so missing values stay unstyled.
    """
    if pd.isna(val):
        return ""
    return "color: green;" if val > 0 else "color: red;"
def get_sql_template_from_openai(user_text: str) -> str:
    """Ask OpenAI for a keyword-rich SQL template for the user's description.

    The model is instructed to leave literal {START_DATE}/{END_DATE}
    placeholders in the BETWEEN clause (substituted later for each date
    range) and to return only a ```sql fenced block, which is unwrapped
    with extract_sql_block().  Uses the module-level `client` and
    SYSTEM_PROMPT.
    """
    prompt = f"""
You are a SQL maestro.
1) From the userβs description:
\"\"\"{user_text}\"\"\"
identify the top **25** keywords.
2) Generate one complete SQL query that:
β’ Selects domain, article_title, url, pageviews, primary_vertical
β’ Filters date BETWEEN '{{START_DATE}}' AND '{{END_DATE}}'
β’ Filters only active sites
β’ Only includes pageviews > 9 and pmp_enabled = 'true'
β’ Excludes unwanted URLs (e.g. '%atlanta%', '%forum%', etc.)
β’ Uses **at least 20** lines of:
`OR parse_url(...):"path" LIKE '%<keyword>%'`
all wrapped in a single `AND ( β¦ )` block
β’ GROUPs and ORDERs as needed
Return *only* the SQL, with the placeholders literally in the BETWEEN clause, inside a ```sql β¦``` fenceβno extra text.
"""
    # NOTE(review): the β/β’ characters in the prompt look mojibake-damaged
    # (likely bullets/dashes originally) — confirm against the pre-paste
    # source before editing; they are runtime text sent to the model.
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
    )
    return extract_sql_block(resp.choices[0].message.content)
def run_query(sql: str) -> pd.DataFrame:
    """Execute *sql* on Snowflake and return a DataFrame with lowercase columns.

    Opens a fresh connection per call using the module-level
    SNOWFLAKE_CONFIG.  The cursor and connection are always closed —
    previously they leaked whenever execute()/fetchall() raised.
    """
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql)
            rows = cur.fetchall()
            # Snowflake reports column names upper-case; normalise for the
            # downstream df["pageviews"] / df["article_title"] lookups.
            cols = [c[0].lower() for c in cur.description]
        finally:
            cur.close()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=cols)
# --------------------------------------------------------------------------
# 9) USER INPUT & EXECUTION
# --------------------------------------------------------------------------
user_prompt = st.text_area(
    "Describe the content or keywords for your analysis:",
    height=150,
)
if st.button("Generate Table"):
    if not user_prompt.strip():
        st.warning("Enter some analysis keywords or description.")
    else:
        # Generate SQL once and swap the date range for the prior-year query,
        # so both periods run the exact same template.
        template_sql = get_sql_template_from_openai(user_prompt)
        sql_current = template_sql.replace(
            "{START_DATE}", start_date.isoformat()
        ).replace("{END_DATE}", end_date.isoformat())
        sql_prior = template_sql.replace(
            "{START_DATE}", prior_start.isoformat()
        ).replace("{END_DATE}", prior_end.isoformat())
        # Empty multiselects mean "no filter", not "match nothing".
        include_sel = include_verticals or None
        exclude_sel = exclude_verticals or None
        sql_current = apply_vertical_filter(sql_current, include_sel, exclude_sel)
        sql_prior = apply_vertical_filter(sql_prior, include_sel, exclude_sel)
        # Run both queries against Snowflake.
        df_current = run_query(sql_current)
        df_prior = run_query(sql_prior)
        # Extract terms: URL keywords from the generated SQL plus words from
        # result-row titles of both periods.
        url_kws = extract_keywords(sql_current)
        if len(url_kws) < 20:
            st.warning(
                "Fewer than 20 keywords detected; refine your prompt for broader coverage."
            )
        title_kws = extract_title_words(df_current) + extract_title_words(df_prior)
        # Merge, order-preserving and de-duplicated; terms of <= 3 chars are
        # dropped as too noisy to substring-match on.
        all_terms = []
        seen = set()
        for term in url_kws + title_kws:
            term = term.strip()
            if len(term) <= 3 or term in seen:
                continue
            seen.add(term)
            all_terms.append(term)
        # Totals for pageview display at the bottom of each raw table.
        total_cy = df_current["pageviews"].sum()
        total_py = df_prior["pageviews"].sum()
        # Build per-term metrics (no totals row).  A term's pageviews are the
        # sum over rows whose title OR url contains the term, case-insensitive.
        metrics = []
        for term in all_terms:
            cy = df_current[
                df_current["article_title"].str.contains(term, case=False, na=False)
                | df_current["url"].str.contains(term, case=False, na=False)
            ]["pageviews"].sum()
            py = df_prior[
                df_prior["article_title"].str.contains(term, case=False, na=False)
                | df_prior["url"].str.contains(term, case=False, na=False)
            ]["pageviews"].sum()
            # YoY % is NaN when the prior period had zero pageviews.
            yoy = (cy - py) / py * 100 if py else float("nan")
            metrics.append(
                {
                    "term": term,
                    "CY pageviews": cy,
                    "PY pageviews": py,
                    "YoY %": yoy,
                }
            )
        # Share of voice: each term's pageviews over the sum across all terms.
        # NOTE(review): terms can overlap the same rows, so SOV columns need
        # not sum to 100% — presumably acceptable; confirm with stakeholders.
        sum_cy_terms = sum(m["CY pageviews"] for m in metrics)
        sum_py_terms = sum(m["PY pageviews"] for m in metrics)
        for m in metrics:
            m["SOV CY"] = (
                m["CY pageviews"] / sum_cy_terms if sum_cy_terms else float("nan")
            )
            m["SOV PY"] = (
                m["PY pageviews"] / sum_py_terms if sum_py_terms else float("nan")
            )
            m["SOV % Change"] = (
                (m["SOV CY"] / m["SOV PY"] - 1)
                if (not pd.isna(m["SOV CY"]) and not pd.isna(m["SOV PY"]))
                else float("nan")
            )
        # NOTE(review): if all_terms is empty, metrics is empty and
        # sort_values raises on the missing column — confirm upstream
        # guarantees at least one surviving term.
        metrics_df = pd.DataFrame(metrics).sort_values("CY pageviews", ascending=False)
        # Display SQL in a collapsed expander above the metrics table.
        with st.expander("Show SQL Queries"):
            st.subheader("Current Year SQL")
            st.code(sql_current, language="sql")
            st.subheader("Prior Year SQL")
            st.code(sql_prior, language="sql")
        # Number/percentage display formats for the metrics table.
        fmt = {
            "CY pageviews": "{:,}",  # add thousand separators
            "PY pageviews": "{:,}",  # add thousand separators
            "YoY %": "{:.1f}%",
            "SOV CY": "{:.1%}",
            "SOV PY": "{:.1%}",
            "SOV % Change": "{:.1%}",
        }
        # Display with conditional red/green formatting on SOV % Change.
        # NOTE(review): Styler.applymap is deprecated in pandas >= 2.1 in
        # favour of Styler.map — revisit when the pandas pin is bumped.
        st.subheader("Term Performance Metrics")
        styled = metrics_df.style.format(fmt, na_rep="-").applymap(
            highlight_sov, subset=["SOV % Change"]
        )
        st.dataframe(styled, height=400)
        # Raw result tables for each period, with total pageviews beneath.
        with st.expander(f"Current Year Results: {start_date} to {end_date}"):
            st.dataframe(df_current.style.format({"pageviews": "{:,}"}))
            st.write(f"Total pageviews: {total_cy:,}")
        with st.expander(f"Prior Year Results: {prior_start} to {prior_end}"):
            st.dataframe(df_prior.style.format({"pageviews": "{:,}"}))
            st.write(f"Total pageviews: {total_py:,}")