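"""Streamlit app: turn a free-text content description into a Snowflake SQL
template via OpenAI, run it for current and prior-year date ranges, and report
per-term pageview and share-of-voice (SOV) metrics."""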
import os
import re
import streamlit as st
import pandas as pd
import snowflake.connector
from openai import OpenAI
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from dateutil.relativedelta import relativedelta
from typing import Optional
STATIC_PRIMARY_VERTICALS = [
"Arts & Creativity",
"Auto",
"Baby",
"Beauty",
"Business",
"Careers",
"Clean Eating",
"Crafts",
"Deals",
"Education",
"Entertainment",
"Family and Parenting",
"Fitness",
"Food",
"Gaming",
"Gardening",
"Green Living",
"Health and Wellness",
"History & Culture",
"Hobbies & Interests",
"Home Decor and Design",
"Law, Gov't & Politics",
"Lifestyle",
"Mens Style and Grooming",
"Natural Parenting",
"News",
"Other",
"Personal Finance",
"Pets",
"Pregnancy",
"Professional Finance",
"Real Estate",
"Religion & Spirituality",
"Science",
"Shopping",
"Sports",
"Tech",
"Toddler",
"Travel",
"Vegetarian",
"Wedding",
"Womens Style",
]
def extract_primary_verticals(text: str) -> list[str]:
    """Pull candidate vertical names out of the system prompt text.

    Looks for "themes like a, b and c — ..." and "topic (a, b etc.)" patterns.
    """
    text = text.lower()
    candidates = set()
    m = re.search(r"themes like ([^—]+)", text)
    if m:
        # \b keeps words that merely contain "and" (e.g. "brand") intact
        for part in re.split(r",|\band\b", m.group(1)):
            w = part.strip()
            if w and w not in {"more"}:
                candidates.add(w)
    m2 = re.search(r"topic \(([^)]+)\)", text)
    if m2:
        for part in m2.group(1).split(","):
            # drop a trailing "etc." marker; str.strip(" etc.") would strip
            # those characters from real words (e.g. "tech" -> "h")
            w = part.strip().removesuffix("etc.").strip()
            if w:
                candidates.add(w)
    return [w.title() for w in sorted(candidates)]
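# Example (matches the two patterns above):
#   extract_primary_verticals("themes like Food, Travel and Tech — any topic (Beauty, Gaming etc.)")
#   -> ['Beauty', 'Food', 'Gaming', 'Tech', 'Travel']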
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 1) STREAMLIT PAGE CONFIG
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
st.set_page_config(page_title="Content Analysis Workflow", layout="wide")
st.title("Content Analysis Workflow Automation")
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 2) LOAD SYSTEM PROMPT
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
INSTRUCTIONS_PATH = os.path.join(os.path.dirname(__file__), "INSTRUCTIONS.md")
try:
with open(INSTRUCTIONS_PATH, "r", encoding="utf-8") as f:
SYSTEM_PROMPT = f.read()
extracted_verticals = extract_primary_verticals(SYSTEM_PROMPT)
except FileNotFoundError:
SYSTEM_PROMPT = ""
extracted_verticals = []
st.warning(f"Could not find INSTRUCTIONS.md at {INSTRUCTIONS_PATH}")
PRIMARY_VERTICALS = sorted(set(STATIC_PRIMARY_VERTICALS) | set(extracted_verticals))
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 3) DATE RANGE FILTERS
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
col1, col2 = st.columns(2)
with col1:
start_date = st.date_input("Start date", value=pd.to_datetime("2025-02-01"))
with col2:
end_date = st.date_input("End date", value=pd.to_datetime("2025-03-01"))
col3, col4 = st.columns(2)
with col3:
prior_start = st.date_input(
"Prior year start date", value=start_date - relativedelta(years=1)
)
with col4:
prior_end = st.date_input(
"Prior year end date", value=end_date - relativedelta(years=1)
)
if start_date > end_date or prior_start > prior_end:
st.error("Start date must be on or before end date for both ranges.")
st.stop()
col5, col6 = st.columns(2)
with col5:
include_verticals = st.multiselect(
"Filter to primary vertical", PRIMARY_VERTICALS, default=[]
)
with col6:
exclude_verticals = st.multiselect(
"Exclude primary vertical", PRIMARY_VERTICALS, default=[]
)
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 4) CHECK ENVIRONMENT VARIABLES
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
REQUIRED_VARS = [
"snowflake_user",
"snowflake_account_identifier",
"snowflake_warehouse",
"snowflake_database",
"snowflake_role",
"snowflake_private_key",
"OPENAI_API_KEY",
]
missing = [v for v in REQUIRED_VARS if not os.getenv(v)]
if missing:
st.error("Missing required secrets: " + ", ".join(missing))
st.stop()
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 5) INSTANTIATE OPENAI CLIENT
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 6) PARSE PRIVATE KEY β†’ DER BYTES
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
pem_bytes = os.getenv("snowflake_private_key").encode("utf-8")
try:
key_obj = serialization.load_pem_private_key(
pem_bytes, password=None, backend=default_backend()
)
private_key_der = key_obj.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
except Exception as e:
st.error(f"Failed to load Snowflake private key: {e}")
st.stop()
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 7) BUILD SNOWFLAKE CONFIG
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
SNOWFLAKE_CONFIG = {
"user": os.getenv("snowflake_user"),
"account": os.getenv("snowflake_account_identifier"),
"warehouse": os.getenv("snowflake_warehouse"),
"database": os.getenv("snowflake_database"),
"role": os.getenv("snowflake_role"),
"private_key": private_key_der,
}
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 8) HELPERS
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
def extract_sql_block(text: str) -> str:
"""Extract SQL from the first ```sql …``` fence."""
m = re.search(r"```(?:sql)?\s*([\s\S]*?)```", text, re.IGNORECASE)
return m.group(1).strip() if m else text.strip()
def extract_keywords(sql: str) -> list[str]:
    """Collect keywords from LIKE '%…%' patterns (skipping NOT LIKE), in order of first appearance."""
    found = re.findall(r"(?<!NOT\s)LIKE\s+'%([^%]+)%'", sql, flags=re.IGNORECASE)
seen, kws = set(), []
for kw in found:
if kw not in seen:
seen.add(kw)
kws.append(kw)
return kws
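# e.g. extract_keywords("... url LIKE '%recipe%' OR url NOT LIKE '%forum%' ...") -> ['recipe']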
def extract_title_words(df: pd.DataFrame) -> list[str]:
"""Split article titles into unique lowercase words."""
seen = set()
words = []
for title in df.get("article_title", []):
for w in re.split(r"\W+", str(title)):
w = w.lower().strip()
if not w or w.isdigit():
continue
if w not in seen:
seen.add(w)
words.append(w)
return words
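# e.g. extract_title_words(pd.DataFrame({"article_title": ["10 Easy Vegan Recipes"]}))
# -> ['easy', 'vegan', 'recipes']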
def apply_vertical_filter(
    sql: str,
    include: Optional[list[str]],
    exclude: Optional[list[str]],
) -> str:
    """Rewrite the WHERE clause to enforce the UI's include/exclude vertical selections."""
    clauses = []
if include:
inc_clauses = []
for v in include:
# sanitize any single-quotes by doubling them
sanitized = v.lower().replace("'", "''")
inc_clauses.append(
f"LOWER(primary_vertical) LIKE '%{sanitized}%'"
)
clauses.append("(" + " OR ".join(inc_clauses) + ")")
if exclude:
exc_clauses = []
for v in exclude:
sanitized = v.lower().replace("'", "''")
exc_clauses.append(
f"LOWER(primary_vertical) NOT LIKE '%{sanitized}%'"
)
clauses.append("(" + " AND ".join(exc_clauses) + ")")
if not clauses:
return sql
full_clause = "AND " + " AND ".join(clauses)
# strip any old single-vertical filters
sql = re.sub(
r"\s+AND\s+LOWER\(primary_vertical\)[^\n]*", "", sql, flags=re.IGNORECASE
)
sql = re.sub(
r"\s+AND\s+r\.primary_vertical\s*=\s*'[^']*'", "", sql, flags=re.IGNORECASE
)
    # inject before GROUP BY; warn instead of silently dropping the filter
    new_sql, n_subs = re.subn(
        r"(WHERE[\s\S]*?)(GROUP BY)",
        lambda m: f"{m.group(1)} {full_clause}\n{m.group(2)}",
        sql,
        count=1,
        flags=re.IGNORECASE,
    )
    if n_subs == 0:
        st.warning("Vertical filter not applied: no GROUP BY clause found in the generated SQL.")
        return sql
    return new_sql
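# e.g. include=["Food"] injects "AND (LOWER(primary_vertical) LIKE '%food%')"
# into the WHERE clause just ahead of GROUP BY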
def highlight_sov(val: float) -> str:
    """Color SOV change green for positive, red for negative; zero and NaN stay unstyled."""
    if pd.isna(val) or val == 0:
        return ""
    color = "green" if val > 0 else "red"
    return f"color: {color};"
def get_sql_template_from_openai(user_text: str) -> str:
prompt = f"""
You are a SQL maestro.
1) From the user’s description:
\"\"\"{user_text}\"\"\"
identify the top **25** keywords.
2) Generate one complete SQL query that:
β€’ Selects domain, article_title, url, pageviews, primary_vertical
β€’ Filters date BETWEEN '{{START_DATE}}' AND '{{END_DATE}}'
β€’ Filters only active sites
β€’ Only includes pageviews > 9 and pmp_enabled = 'true'
β€’ Excludes unwanted URLs (e.g. '%atlanta%', '%forum%', etc.)
β€’ Uses **at least 20** lines of:
`OR parse_url(...):"path" LIKE '%<keyword>%'`
all wrapped in a single `AND ( … )` block
β€’ GROUPs and ORDERs as needed
Return *only* the SQL, with the placeholders literally in the BETWEEN clause, inside a ```sql …``` fenceβ€”no extra text.
"""
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
)
return extract_sql_block(resp.choices[0].message.content)
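# The template keeps literal {START_DATE}/{END_DATE} placeholders so the same
# SQL can be reused for both the current and the prior-year date range (step 9).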
def run_query(sql: str) -> pd.DataFrame:
    """Execute SQL on Snowflake and return a lowercase-column DataFrame."""
    conn = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
        cols = [c[0].lower() for c in cur.description]
    finally:
        # release the connection even if the query raises
        conn.close()
    return pd.DataFrame(rows, columns=cols)
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
# 9) USER INPUT & EXECUTION
# β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
user_prompt = st.text_area(
"Describe the content or keywords for your analysis:",
height=150,
)
if st.button("Generate Table"):
if not user_prompt.strip():
st.warning("Enter some analysis keywords or description.")
else:
# Generate SQL once and swap the date range for prior-year query
template_sql = get_sql_template_from_openai(user_prompt)
sql_current = template_sql.replace(
"{START_DATE}", start_date.isoformat()
).replace("{END_DATE}", end_date.isoformat())
sql_prior = template_sql.replace(
"{START_DATE}", prior_start.isoformat()
).replace("{END_DATE}", prior_end.isoformat())
include_sel = include_verticals or None
exclude_sel = exclude_verticals or None
sql_current = apply_vertical_filter(sql_current, include_sel, exclude_sel)
sql_prior = apply_vertical_filter(sql_prior, include_sel, exclude_sel)
# Run queries
df_current = run_query(sql_current)
df_prior = run_query(sql_prior)
# Extract terms
url_kws = extract_keywords(sql_current)
if len(url_kws) < 20:
st.warning(
"Fewer than 20 keywords detected; refine your prompt for broader coverage."
)
title_kws = extract_title_words(df_current) + extract_title_words(df_prior)
all_terms = []
seen = set()
for term in url_kws + title_kws:
term = term.strip()
if len(term) <= 3 or term in seen:
continue
seen.add(term)
all_terms.append(term)
# Totals for pageview display
total_cy = df_current["pageviews"].sum()
total_py = df_prior["pageviews"].sum()
# Build metrics without a totals row
metrics = []
for term in all_terms:
            # regex=False: match terms as plain substrings, not regex patterns
            cy = df_current[
                df_current["article_title"].str.contains(term, case=False, na=False, regex=False)
                | df_current["url"].str.contains(term, case=False, na=False, regex=False)
            ]["pageviews"].sum()
            py = df_prior[
                df_prior["article_title"].str.contains(term, case=False, na=False, regex=False)
                | df_prior["url"].str.contains(term, case=False, na=False, regex=False)
            ]["pageviews"].sum()
yoy = (cy - py) / py * 100 if py else float("nan")
metrics.append(
{
"term": term,
"CY pageviews": cy,
"PY pageviews": py,
"YoY %": yoy,
}
)
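        # Share of voice (SOV): each term's fraction of total pageviews across all matched terms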
sum_cy_terms = sum(m["CY pageviews"] for m in metrics)
sum_py_terms = sum(m["PY pageviews"] for m in metrics)
for m in metrics:
m["SOV CY"] = (
m["CY pageviews"] / sum_cy_terms if sum_cy_terms else float("nan")
)
m["SOV PY"] = (
m["PY pageviews"] / sum_py_terms if sum_py_terms else float("nan")
)
m["SOV % Change"] = (
(m["SOV CY"] / m["SOV PY"] - 1)
if (not pd.isna(m["SOV CY"]) and not pd.isna(m["SOV PY"]))
else float("nan")
)
metrics_df = pd.DataFrame(metrics).sort_values("CY pageviews", ascending=False)
# Display SQL in a hidden expander above metrics
with st.expander("Show SQL Queries"):
st.subheader("Current Year SQL")
st.code(sql_current, language="sql")
st.subheader("Prior Year SQL")
st.code(sql_prior, language="sql")
# Format percentages
fmt = {
"CY pageviews": "{:,}", # add thousand separators
"PY pageviews": "{:,}", # add thousand separators
"YoY %": "{:.1f}%",
"SOV CY": "{:.1%}",
"SOV PY": "{:.1%}",
"SOV % Change": "{:.1%}",
}
    # Display with conditional formatting (Styler.applymap was renamed to
    # Styler.map in pandas 2.1; switch to .map on newer pandas)
    st.subheader("Term Performance Metrics")
    styled = metrics_df.style.format(fmt, na_rep="-").applymap(
        highlight_sov, subset=["SOV % Change"]
    )
st.dataframe(styled, height=400)
# Show raw result tables with totals
with st.expander(f"Current Year Results: {start_date} to {end_date}"):
st.dataframe(df_current.style.format({"pageviews": "{:,}"}))
st.write(f"Total pageviews: {total_cy:,}")
with st.expander(f"Prior Year Results: {prior_start} to {prior_end}"):
st.dataframe(df_prior.style.format({"pageviews": "{:,}"}))
st.write(f"Total pageviews: {total_py:,}")