# legislation-tracker / data_updating_scripts / generate_newsletter.py
# Deploy: newsletter display polish (commit 3cc39aa, by ramanna)
#!/usr/bin/env python3
"""
generate_newsletter.py
----------------------
Reads the latest weekly changes CSV and bill data, then uses the configured
OpenAI chat model (MODEL_NAME env var, default "gpt-5") to generate a
Markdown newsletter draft summarizing the week's legislative changes.
Output: data/newsletter_drafts/newsletter_YYYY-MM-DD.md
"""
from __future__ import annotations
import csv
import json
import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import dotenv
dotenv.load_dotenv()
# Pipeline status tracking (no-op when running standalone)
_PIPELINE_SCRIPT = os.environ.get("PIPELINE_CURRENT_SCRIPT")
_pipeline = None  # PipelineStatus instance, or None when tracking is unavailable
_last_status_write = 0.0  # timestamp of the last status write (used for throttling)
if _PIPELINE_SCRIPT:
    # Prefer the package-style import (running from the repo root); fall back
    # to a flat import (running from inside data_updating_scripts/).
    try:
        sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
        from data_updating_scripts.pipeline_status import PipelineStatus
        _pipeline = PipelineStatus()
    except Exception:
        try:
            from pipeline_status import PipelineStatus
            _pipeline = PipelineStatus()
        except Exception:
            # Status tracking is best-effort: run standalone without it.
            pass
def _update_pipeline_progress(current, total, unit="items", message=""):
    """Report progress to the pipeline tracker, best-effort.

    Writes are throttled to at most one every 3 seconds and skipped
    entirely when no pipeline tracker is configured.
    """
    global _last_status_write
    if not _pipeline:
        return
    stamp = time.time()
    if stamp - _last_status_write >= 3.0:
        _last_status_write = stamp
        try:
            _pipeline.update_progress(_PIPELINE_SCRIPT, current, total, unit, message)
        except Exception:
            pass  # progress reporting must never break the run
def _log_pipeline_error(error, bill_id="", bill_key=""):
    """Record *error* with the pipeline tracker; a no-op without a tracker."""
    if _pipeline:
        try:
            _pipeline.log_error(_PIPELINE_SCRIPT, error, bill_id, bill_key)
        except Exception:
            pass  # error reporting is best-effort
# Create logs directory
os.makedirs("data_updating_scripts/logs", exist_ok=True)
# LangChain imports -- optional at import time: fall back to None so the
# module still loads; generation fails later with a clearer error instead.
try:
    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate
except ImportError:
    ChatOpenAI = None  # type: ignore
    ChatPromptTemplate = None  # type: ignore
# Configure logging (both stderr and a per-script log file)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("data_updating_scripts/logs/generate_newsletter.log"),
    ],
)
logger = logging.getLogger(__name__)
# Paths
DATA_DIR = Path("data")
CHANGES_DIR = DATA_DIR / "weekly_changes"  # input: weekly_changes_*.csv
DRAFTS_DIR = DATA_DIR / "newsletter_drafts"  # output: newsletter_YYYY-MM-DD.md
BILLS_FILE = DATA_DIR / "known_bills_visualize.json"
PROMPT_FILE = Path("data_updating_scripts/PROMPTS/newsletter_prompt.md")
DRAFTS_DIR.mkdir(parents=True, exist_ok=True)
# ── LLM setup ────────────────────────────────────────────────────────
def _ensure_llm() -> ChatOpenAI:
    """Build the chat model client, validating configuration first.

    Returns:
        A configured ChatOpenAI client (model taken from MODEL_NAME,
        default "gpt-5") with a 10-minute request timeout.

    Raises:
        RuntimeError: if the langchain packages are not installed, or if
            OPENAI_API_KEY is not set.
    """
    if ChatOpenAI is None:
        # The optional import at the top of the file failed; raise a clear
        # error instead of the opaque TypeError that calling None would give.
        raise RuntimeError(
            "langchain_openai is not installed - cannot build the LLM client"
        )
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY environment variable not set")
    model_name = os.getenv("MODEL_NAME", "gpt-5")
    return ChatOpenAI(model=model_name, temperature=0.3, request_timeout=600)
def _load_prompt() -> str:
    """Return the newsletter system prompt text from PROMPT_FILE."""
    return PROMPT_FILE.read_text(encoding="utf-8")
# ── Data loading ─────────────────────────────────────────────────────
def get_most_recent_csv() -> Optional[Path]:
    """Locate the newest weekly-changes CSV, or None when there is none.

    Files are ranked by filename (which embeds the date) rather than by
    mtime, which is unreliable on HF containers.
    """
    if not CHANGES_DIR.exists():
        return None
    return max(
        CHANGES_DIR.glob("weekly_changes_*.csv"),
        key=lambda candidate: candidate.name,
        default=None,
    )
def load_changes_csv(csv_path: Path) -> List[Dict[str, str]]:
    """Parse a weekly changes CSV into a list of row dicts.

    Args:
        csv_path: Path to a weekly_changes_*.csv file with a header row.

    Returns:
        One dict per data row, keyed by the header column names.
    """
    # newline="" is required by the csv module so embedded newlines inside
    # quoted fields are parsed correctly (see csv.reader docs).
    with open(csv_path, "r", encoding="utf-8", newline="") as f:
        return [dict(row) for row in csv.DictReader(f)]
def load_bills_lookup() -> Dict[str, Dict]:
    """Load known_bills_visualize.json into a dict keyed by "state_billnumber".

    Returns an empty dict (with a warning) when the file is missing or
    not valid JSON.
    """
    try:
        with open(BILLS_FILE, "r", encoding="utf-8") as handle:
            bill_records = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        logger.warning(f"Could not load {BILLS_FILE}")
        return {}
    # Duplicate keys resolve to the last record seen, as before.
    return {
        f"{record.get('state', 'Unknown')}_{record.get('bill_number', 'Unknown')}": record
        for record in bill_records
    }
# ── Build structured payload for the LLM ─────────────────────────────
def build_newsletter_data(changes: List[Dict[str, str]], bills_lookup: Dict[str, Dict]) -> Dict[str, Any]:
    """Group raw CSV change rows into the structured payload sent to the LLM.

    Each change is enriched with details from known_bills_visualize.json
    (via *bills_lookup*) and with up to three cached news articles from
    data/bill_news.json when that cache exists.
    """
    # Optional news cache for article enrichment (best-effort load).
    news_cache = {}
    news_path = Path("data/bill_news.json")
    if news_path.exists():
        try:
            with open(news_path, "r", encoding="utf-8") as handle:
                news_cache = json.load(handle)
        except Exception:
            pass

    # One bucket per change category; anything unrecognized is treated as
    # a plain status change.
    buckets: Dict[str, List[Dict[str, Any]]] = {
        "new_bill": [],
        "signed_into_law": [],
        "vetoed": [],
        "status_change": [],
    }
    for change in changes:
        key = f"{change.get('state', 'Unknown')}_{change.get('bill_number', 'Unknown')}"
        bill = bills_lookup.get(key, {})
        entry = {
            "bill_number": change.get("bill_number", ""),
            "state": change.get("state", ""),
            "title": change.get("title", "") or bill.get("title", ""),
            "description": (bill.get("description") or "")[:300],
            "sponsors": bill.get("sponsors", []),
            "bill_url": bill.get("bill_url", ""),
            "session_year": change.get("session_year", ""),
            "change_type": change.get("change_type", ""),
            "old_status": change.get("old_status", ""),
            "new_status": change.get("new_status", ""),
            "status_change_detail": change.get("status_change_detail", ""),
            "recent_news": news_cache.get(key, {}).get("articles", [])[:3],
        }
        category = change.get("change_type", "")
        if category not in ("new_bill", "signed_into_law", "vetoed"):
            category = "status_change"
        buckets[category].append(entry)

    # Per-state counts for the "most active states" summary (top 5).
    state_counts: Dict[str, int] = {}
    for change in changes:
        state = change.get("state", "")
        state_counts[state] = state_counts.get(state, 0) + 1
    most_active = sorted(state_counts.items(), key=lambda item: item[1], reverse=True)[:5]

    return {
        "date": datetime.now().strftime("%B %d, %Y"),
        "summary": {
            "total_changes": len(changes),
            "new_bills": len(buckets["new_bill"]),
            "status_changes": len(buckets["status_change"]),
            "signed_into_law": len(buckets["signed_into_law"]),
            "vetoed": len(buckets["vetoed"]),
            "most_active_states": most_active,
        },
        "new_bills": buckets["new_bill"],
        "status_changes": buckets["status_change"],
        "signed_into_law": buckets["signed_into_law"],
        "vetoed": buckets["vetoed"],
    }
# ── LLM generation ───────────────────────────────────────────────────
def generate_newsletter(newsletter_data: Dict[str, Any]) -> str:
    """Send the structured weekly data to the configured LLM and return
    the generated Markdown newsletter.

    Args:
        newsletter_data: Payload produced by build_newsletter_data().

    Returns:
        The newsletter body as Markdown, with any wrapping code fences
        stripped.

    Raises:
        TimeoutError: if generation exceeds the 2-hour ceiling.
        RuntimeError: if the LLM client cannot be configured.
    """
    llm = _ensure_llm()
    system_prompt = _load_prompt()
    user_message = (
        "Here is this week's legislative change data in JSON format. "
        "Generate the newsletter draft based on this data.\n\n"
        f"```json\n{json.dumps(newsletter_data, indent=2, ensure_ascii=False)}\n```"
    )
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{user_input}"),
    ])
    chain = prompt | llm

    def _do_invoke():
        return chain.invoke({"user_input": user_message})

    # Run the LLM call on a worker thread so we can enforce a hard timeout.
    # BUGFIX: the previous `with ThreadPoolExecutor(...)` form called
    # shutdown(wait=True) on exit, which blocked until the still-running
    # worker finished -- delaying the TimeoutError indefinitely. Shut down
    # explicitly with wait=False so the timeout propagates immediately.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(_do_invoke)
        try:
            result = future.result(timeout=7200)
        except FuturesTimeout:
            raise TimeoutError("Newsletter generation exceeded 2h timeout")
    finally:
        executor.shutdown(wait=False)
    content = result.content
    # Strip markdown code fences if the model wraps output in them
    if content.startswith("```markdown"):
        content = content[len("```markdown"):].strip()
    if content.startswith("```"):
        content = content[3:].strip()
    if content.endswith("```"):
        content = content[:-3].strip()
    return content
# ── Main ──────────────────────────────────────────────────────────────
def main():
    """Entry point: load the latest weekly changes, generate a newsletter
    draft with the LLM, save it, email it, and sync it to HF."""
    logger.info("Starting newsletter generation...")
    _update_pipeline_progress(0, 1, "newsletter", "Starting...")

    # Locate the input CSV produced by detect_changes.py.
    csv_path = get_most_recent_csv()
    if csv_path is None:
        logger.warning("No weekly changes CSV found β€” skipping newsletter generation")
        print("No weekly changes CSV found. Run detect_changes.py first.")
        _update_pipeline_progress(1, 1, "newsletter", "Skipped β€” no changes CSV")
        return
    logger.info(f"Using changes CSV: {csv_path}")

    # Load inputs and enrich.
    changes = load_changes_csv(csv_path)
    logger.info(f"Loaded {len(changes)} changes from CSV")
    bills_lookup = load_bills_lookup()
    logger.info(f"Loaded {len(bills_lookup)} bills for enrichment")
    newsletter_data = build_newsletter_data(changes, bills_lookup)

    # Generate with the LLM (a zero-change week still gets a newsletter).
    if not changes:
        logger.info("Zero changes detected β€” generating quiet week newsletter")
    _update_pipeline_progress(0, 1, "newsletter", "Generating draft with GPT-4o...")
    try:
        markdown = generate_newsletter(newsletter_data)
    except Exception as exc:
        logger.error(f"Newsletter generation failed: {exc}")
        _log_pipeline_error(str(exc))
        print(f"Newsletter generation failed: {exc}")
        return

    # Persist the draft to disk.
    output_path = DRAFTS_DIR / f"newsletter_{datetime.now().strftime('%Y-%m-%d')}.md"
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(markdown)
    logger.info(f"Newsletter saved to {output_path}")
    print(f"Newsletter draft saved: {output_path}")
    _update_pipeline_progress(1, 1, "newsletter", "Newsletter generated")

    # Notify subscribers, then upload the draft to the HF dataset.
    _send_email_notification(output_path, markdown)
    _upload_newsletter_to_hf(output_path)
def _markdown_to_html(markdown: str) -> str:
    """Convert newsletter markdown to a styled HTML email matching the
    editorial design.

    The markdown is split into ##/### sections, each rendered as stat
    cards ("This Week in Numbers"), bill cards (list items, optionally
    grouped by #### state headings), or plain paragraphs. A fixed intro
    callout, CTA block, header, and footer are added around the body.

    Args:
        markdown: The LLM-generated newsletter markdown.

    Returns:
        A complete HTML document string suitable for an email body.
    """
    import re
    # ── Parse markdown into sections ──
    lines = markdown.split("\n")
    sections = []  # list of (level, title, content_lines)
    current_title = ""
    current_level = 0
    current_lines = []
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("### "):
            if current_title or current_lines:
                sections.append((current_level, current_title, current_lines))
            current_level = 3
            # Drop any "1. " style numbering from the heading text.
            current_title = re.sub(r'^\d+\.\s*', '', stripped[4:]).strip()
            current_lines = []
        elif stripped.startswith("## "):
            if current_title or current_lines:
                sections.append((current_level, current_title, current_lines))
            current_level = 2
            current_title = stripped[3:].strip()
            current_lines = []
        else:
            current_lines.append(line)
    if current_title or current_lines:
        sections.append((current_level, current_title, current_lines))
    # ── Helper: format inline markdown (links, bold, arrows) ──
    def _fmt(text):
        text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)',
            r'<a href="\2" style="color:#0F172A;text-decoration:underline;border-bottom:2px solid #CFB991;">\1</a>', text)
        text = re.sub(r'\*\*([^*]+)\*\*', r'<strong>\1</strong>', text)
        text = text.replace("β†’", '<span style="color:#64748B;font-weight:bold;">&rarr;</span>')
        return text
    # ── Helper: render a state tag ──
    def _state_tag(state):
        return (f'<span style="display:inline-block;background:#E8E4DD;padding:2px 8px;'
                f'border-radius:4px;font-size:11px;font-weight:600;letter-spacing:0.5px;'
                f'border:1px solid #D1CBC0;margin-right:4px;">{state}</span>')
    # ── Helper: render a bill card ──
    def _bill_card(content_html, featured=False):
        border_color = "#CFB991" if featured else "#0F172A"
        return (f'<table width="100%" cellpadding="0" cellspacing="0" style="margin-bottom:10px;">'
                f'<tr><td style="background:#FFFFFF;border:1px solid #E2E8F0;border-left:3px solid {border_color};'
                f'border-radius:6px;padding:14px 16px;font-size:14px;line-height:1.5;color:#1E293B;">'
                f'{content_html}</td></tr></table>')
    # ── Build body sections ──
    body_parts = []
    _intro_added = False
    # NOTE: removed unused local `_total_changes_for_cta` (assigned, never read).
    for level, title, content in sections:
        title_lower = title.lower()
        # Skip preemption watch
        if "preemption" in title_lower:
            continue
        # Insert editorial intro once, just before the first curated bill section
        is_curated = any(k in title_lower for k in ("moved", "governor", "desk", "watch", "new bills"))
        if is_curated and not _intro_added:
            body_parts.append(
                '<table width="100%" cellpadding="0" cellspacing="0" style="margin:20px 0 8px;">'
                '<tr><td style="background:#FAF7EE;border-left:3px solid #CFB991;border-radius:6px;'
                'padding:12px 16px;font-size:13px;line-height:1.5;color:#475569;">'
                '<strong style="color:#0F172A;">Editor\'s picks below.</strong> '
                'The next sections highlight the most notable bills curated by our team. '
                'For the complete record of every change this week β€” grouped by state β€” visit the '
                '<a href="https://huggingface.co/spaces/VAILL/legislation-tracker" '
                'style="color:#0F172A;font-weight:600;text-decoration:underline;border-bottom:2px solid #CFB991;">'
                'full dashboard</a>.'
                '</td></tr></table>'
            )
            _intro_added = True
        # ── Section header ──
        if title:
            body_parts.append(
                f'<table width="100%" cellpadding="0" cellspacing="0" style="margin-top:28px;margin-bottom:14px;">'
                f'<tr><td style="border-left:3px solid #0F172A;padding:6px 14px;">'
                f'<p style="color:#0F172A;font-size:13px;font-weight:700;margin:0;text-transform:uppercase;letter-spacing:2px;">'
                f'{title}</p></td></tr></table>'
            )
        # ── "This Week in Numbers" -> stat cards ──
        if "numbers" in title_lower or "week in numbers" in title_lower:
            stats = {}
            active_states = []
            full_text = "\n".join(content)
            for cline in content:
                s = cline.strip()
                s_lower = s.lower()
                # Match "**Label**: 42" style stat lines.
                m = re.search(r'\*\*.*?\*\*\s*[:\-]?\s*(\d+)', s)
                if m:
                    val = m.group(1)
                    if "total" in s_lower and "change" in s_lower:
                        stats["total"] = val
                    elif "new bill" in s_lower:
                        stats["new"] = val
                    elif "changed status" in s_lower or "status change" in s_lower:
                        stats["status"] = val
                    elif "signed" in s_lower:
                        stats["signed"] = val
                    elif "vetoed" in s_lower:
                        stats["vetoed"] = val
            # Parse active states from full text
            am = re.search(r'[Mm]ost active.*?:(.*?)(?:\n\n|\n###|\Z)', full_text, re.DOTALL)
            if am:
                active_states = re.findall(r'([A-Z][A-Za-z\s]+?)\s*\((\d+)\)', am.group(1))
            if stats:
                stat_items = [
                    (stats.get("total", "0"), "TOTAL CHANGES"),
                    (stats.get("new", "0"), "NEW BILLS"),
                    (stats.get("status", "0"), "STATUS CHANGES"),
                    (stats.get("signed", "0"), "SIGNED INTO LAW"),
                    (stats.get("vetoed", "0"), "VETOED"),
                ]
                cells = ""
                for val, label in stat_items:
                    cells += (f'<td align="center" style="background:#FFFFFF;border:1px solid #E2E8F0;'
                              f'border-radius:8px;padding:16px 8px;width:20%;">'
                              f'<p style="font-size:28px;font-weight:800;color:#0F172A;margin:0;line-height:1;">{val}</p>'
                              f'<p style="font-size:9px;color:#94A3B8;text-transform:uppercase;letter-spacing:1.5px;'
                              f'margin:6px 0 0;font-weight:600;">{label}</p></td>')
                body_parts.append(
                    f'<table width="100%" cellpadding="4" cellspacing="0"><tr>{cells}</tr></table>'
                )
            if active_states:
                tags = " ".join(_state_tag(f"{s} ({c})") for s, c in active_states)
                body_parts.append(
                    f'<table width="100%" cellpadding="0" cellspacing="0" style="margin-top:10px;">'
                    f'<tr><td align="center">'
                    f'<p style="font-size:10px;color:#64748B;text-transform:uppercase;letter-spacing:1px;'
                    f'margin:0 0 6px;font-weight:600;">MOST ACTIVE STATES</p>'
                    f'{tags}</td></tr></table>'
                )
            continue
        # ── Bills sections (Moved, Governor's Desk, New Bills) ──
        featured = "governor" in title_lower
        has_state_groups = any(l.strip().startswith("#### ") for l in content)
        if has_state_groups:
            current_state = ""
            for cline in content:
                s = cline.strip()
                if s.startswith("#### "):
                    current_state = s[5:].strip()
                elif s.startswith("- ") or s.startswith("* "):
                    item = _fmt(s[2:])
                    card_html = ""
                    if current_state:
                        card_html += _state_tag(current_state) + " "
                    card_html += item
                    body_parts.append(_bill_card(card_html, featured=featured))
        else:
            # Plain list items
            for cline in content:
                s = cline.strip()
                if s.startswith("- ") or s.startswith("* "):
                    item = _fmt(s[2:])
                    body_parts.append(_bill_card(item, featured=featured))
                elif s and not s.startswith("#"):
                    text = _fmt(s)
                    if "no " in s.lower() and ("activity" in s.lower() or "bills" in s.lower() or "preemption" in s.lower()):
                        body_parts.append(
                            f'<p style="color:#94A3B8;font-size:13px;font-style:italic;margin:8px 0;">{text}</p>'
                        )
                    elif s.strip() == "---":
                        pass
                    else:
                        body_parts.append(f'<p style="margin:8px 0;line-height:1.6;color:#1E293B;font-size:14px;">{text}</p>')
    # ── CTA: see complete changes on the website ──
    body_parts.append(
        '<table width="100%" cellpadding="0" cellspacing="0" style="margin:32px 0 8px;">'
        '<tr><td align="center" style="background:#0F172A;border-radius:8px;padding:24px 20px;">'
        '<p style="color:#FFFFFF;font-size:14px;font-weight:600;margin:0 0 12px;line-height:1.5;">'
        'Want every bill that moved this week?'
        '</p>'
        '<p style="color:#CFB991;font-size:12px;margin:0 0 16px;line-height:1.5;">'
        'The website has the complete week\'s activity grouped by state, with filters and search.'
        '</p>'
        '<a href="https://huggingface.co/spaces/VAILL/legislation-tracker" '
        'style="display:inline-block;background:#CFB991;color:#0F172A;font-size:13px;font-weight:700;'
        'padding:10px 24px;border-radius:6px;text-decoration:none;text-transform:uppercase;letter-spacing:1px;">'
        'View All Changes &rarr;</a>'
        '</td></tr></table>'
    )
    body = "\n".join(body_parts)
    date_str = datetime.now().strftime("%B %d, %Y")
    return f"""<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1.0"></head>
<body style="margin:0;padding:0;background-color:#F8F7F4;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,Helvetica,Arial,sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0" style="background-color:#F8F7F4;padding:20px 0;">
<tr><td align="center">
<table width="640" cellpadding="0" cellspacing="0" style="background-color:#FFFFFF;border-radius:8px;overflow:hidden;box-shadow:0 2px 8px rgba(0,0,0,0.06);">
<!-- Header -->
<tr><td style="background:#FFFFFF;border-bottom:2px solid #0F172A;padding:32px 40px 24px;text-align:center;">
<h1 style="color:#0F172A;margin:0;font-size:28px;letter-spacing:6px;font-weight:800;text-transform:uppercase;">VAILL AI Governance Weekly</h1>
<p style="color:#64748B;margin:6px 0 0;font-size:12px;font-weight:500;letter-spacing:1px;">Vanderbilt AI Law Lab &bull; Legislation Tracker</p>
<p style="color:#0F172A;margin:8px 0 0;font-size:11px;letter-spacing:3px;text-transform:uppercase;font-weight:600;">{date_str}</p>
<table width="60" cellpadding="0" cellspacing="0" style="margin:12px auto 0;"><tr><td style="height:3px;background:#CFB991;"></td></tr></table>
</td></tr>
<!-- Body -->
<tr><td style="padding:8px 40px 32px;">
{body}
</td></tr>
<!-- Footer -->
<tr><td style="background-color:#0F172A;padding:24px 40px;text-align:center;">
<p style="color:#94A3B8;font-size:12px;margin:0;">
Vanderbilt AI Law Lab &bull; AI Governance Legislation Tracker<br>
<a href="https://huggingface.co/spaces/VAILL/legislation-tracker" style="color:#CFB991;text-decoration:underline;">View Full Dashboard</a>
</p>
</td></tr>
</table>
</td></tr>
</table>
</body>
</html>"""
def _send_email_notification(draft_path: Path, markdown: str):
    """Email the newsletter draft to all subscribers + NOTIFY_EMAIL.
    Tries SendGrid HTTP API first (works on HF Spaces where SMTP is blocked),
    falls back to Gmail SMTP if SendGrid is not configured.

    Note: draft_path is currently unused here; the markdown text itself is
    sent as the plain-text part, with an HTML alternative rendered from it.
    """
    import requests as _req
    # Credentials / configuration from the environment.
    gmail_from = os.environ.get("GMAIL_FROM", "").strip()
    app_password = os.environ.get("GMAIL_APP_PASSWORD", "").replace(" ", "")  # strip any spaces pasted into the app password
    notify_to = os.environ.get("NOTIFY_EMAIL", "").strip()
    sendgrid_key = os.environ.get("SENDGRID_API_KEY", "").strip()
    # Log which credentials are present (never the values themselves).
    logger.info(
        f"Email credentials β€” GMAIL_FROM={'set' if gmail_from else 'MISSING'}, "
        f"APP_PASSWORD={'set' if app_password else 'MISSING'}, "
        f"NOTIFY_EMAIL={'set' if notify_to else 'MISSING'}, "
        f"SENDGRID_API_KEY={'set' if sendgrid_key else 'MISSING'}"
    )
    # Load subscriber list
    subscribers_path = Path("data/subscribers.json")
    subscribers = []
    if subscribers_path.exists():
        try:
            with open(subscribers_path, "r") as f:
                subscribers = json.load(f)
        except Exception:
            # Unreadable subscriber file: fall back to NOTIFY_EMAIL only.
            pass
    # Combine subscribers with NOTIFY_EMAIL (deduped)
    all_recipients = list({e.strip().lower() for e in subscribers + ([notify_to] if notify_to else []) if e.strip()})
    logger.info(f"Newsletter recipients: {all_recipients}")
    if not all_recipients:
        logger.warning("No recipients configured β€” skipping email")
        return
    from_email = gmail_from or "newsletter@vaill.org"
    subject = f"VAILL AI Legislation Weekly β€” {datetime.now().strftime('%B %d, %Y')}"
    html_body = _markdown_to_html(markdown)
    # Try SendGrid HTTP API first (works on HF Spaces)
    if sendgrid_key:
        logger.info("Sending via SendGrid HTTP API...")
        sent, failed = 0, 0
        # One API call per recipient so a single bad address cannot
        # fail the whole batch.
        for recipient in all_recipients:
            payload = {
                "personalizations": [{"to": [{"email": recipient}]}],
                "from": {"email": from_email, "name": "VAILL AI Law Lab"},
                "subject": subject,
                "content": [
                    {"type": "text/plain", "value": markdown},
                    {"type": "text/html", "value": html_body},
                ],
            }
            try:
                resp = _req.post(
                    "https://api.sendgrid.com/v3/mail/send",
                    headers={
                        "Authorization": f"Bearer {sendgrid_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                    timeout=30,
                )
                # SendGrid returns 202 Accepted on success.
                if resp.status_code in (200, 202):
                    logger.info(f"Newsletter emailed to {recipient} via SendGrid")
                    sent += 1
                else:
                    logger.warning(f"SendGrid error for {recipient}: {resp.status_code} {resp.text}")
                    failed += 1
            except Exception as e:
                logger.warning(f"SendGrid request failed for {recipient}: {e}")
                failed += 1
        print(f"Newsletter sent (SendGrid): {sent} succeeded, {failed} failed ({len(all_recipients)} total)")
        return
    # Fall back to Gmail SMTP
    if not gmail_from or not app_password:
        logger.warning("No email service configured β€” set SENDGRID_API_KEY or GMAIL_FROM+GMAIL_APP_PASSWORD")
        return
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    logger.info("Sending via Gmail SMTP...")
    sent, failed = 0, 0
    try:
        # One SMTP connection reused for all recipients.
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(gmail_from, app_password)
            for recipient in all_recipients:
                # multipart/alternative: plain-text markdown plus HTML.
                msg = MIMEMultipart("alternative")
                msg["Subject"] = subject
                msg["From"] = f"VAILL AI Law Lab <{gmail_from}>"
                msg["To"] = recipient
                msg.attach(MIMEText(markdown, "plain", "utf-8"))
                msg.attach(MIMEText(html_body, "html", "utf-8"))
                try:
                    server.sendmail(gmail_from, recipient, msg.as_string())
                    logger.info(f"Newsletter emailed to {recipient}")
                    sent += 1
                except Exception as e:
                    logger.warning(f"Failed to send to {recipient}: {e}")
                    failed += 1
    except Exception as e:
        logger.error(f"SMTP connection/login failed: {e}")
        print(f"Newsletter email failed: {e}")
        return
    print(f"Newsletter sent (SMTP): {sent} succeeded, {failed} failed ({len(all_recipients)} total)")
def _upload_newsletter_to_hf(draft_path: Path):
    """Persist the newsletter draft to the configured HF dataset (best-effort)."""
    try:
        from huggingface_hub import HfApi
        token = os.environ.get("HUGGINGFACE_HUB_TOKEN")
        repo_id = os.environ.get("HF_REPO_ID")
        if not token or not repo_id:
            logger.info("HF credentials not configured β€” skipping newsletter upload")
            return
        destination = f"newsletter_drafts/{draft_path.name}"
        logger.info(f"Uploading newsletter to HF: {destination}")
        HfApi(token=token).upload_file(
            path_or_fileobj=str(draft_path),
            path_in_repo=destination,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update newsletter: {draft_path.name}",
        )
        print(f"Newsletter synced to HuggingFace: {destination}")
    except Exception as e:
        # Upload is non-critical: log and continue.
        logger.warning(f"Failed to upload newsletter to HF: {e}")
        print(f"HF newsletter sync skipped: {e}")
# Script entry point.
if __name__ == "__main__":
    main()