allyboyboy's picture
Initial deployment of The Daily Snorter
555aaab verified
import click
import logging
from scraper import db
from scraper.config import TOPICS, STYLES, SOURCE_CATEGORIES
from scraper.scoring import rescore_source
from scraper.utils import truncate
# Configure the root logger once at import time: INFO and above, with a
# clock-only timestamp in front of the level and message.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Named logger shared by the commands and helpers in this module.
logger = logging.getLogger("joke-corpus")
@click.group()
def cli() -> None:
    """Joke corpus — scrape, score, and curate comedy."""
    # The docstring above is what click shows as the group's --help text.
    # This group callback calls db.init_db() before a subcommand runs.
    db.init_db()
# --- Scrape commands ---
@cli.command()
@click.option("--source", "platform", default=None, help="Scrape a single platform")
@click.option("--backfill", is_flag=True, help="Collect historical posts")
def scrape(platform, backfill):
    """Scrape all configured sources for new content."""
    # `platform` (the --source option) is handed to db.list_sources as its
    # filter; None is passed through unchanged when the option is omitted.
    configured = db.list_sources(platform)
    if not configured:
        click.echo("No sources configured. Use 'add' to add some.")
        return

    for src in configured:
        platform_name, handle = src["platform"], src["handle_or_url"]
        click.echo(f"Scraping {platform_name}: {handle}...")
        try:
            module = _get_scraper(platform_name)
            if module is not None:
                # The scraper receives a plain-dict copy of the source row and
                # returns a (found, new) pair.
                found, new = module.scrape_source(dict(src), backfill=backfill)
                db.update_source_last_scraped(src["id"])
                db.log_scrape(src["id"], found, new, "success")
                rescore_source(src["id"])
                click.echo(f" Found {found}, {new} new entries.")
            else:
                click.echo(f" No scraper implemented for '{platform_name}' yet. Skipping.")
        except Exception as e:
            # A failure on one source is recorded and logged; the loop then
            # moves on to the next source.
            db.log_scrape(src["id"], 0, 0, f"error: {e}")
            logger.error(f" Error scraping {handle}: {e}")
@cli.command("import-datasets")
def import_datasets():
    """One-time import of pre-built comedy datasets."""
    # The importer module is loaded only when this command is invoked.
    from scraper.static import import_datasets as dataset_importer

    dataset_importer.run()
# --- Browse commands ---
@cli.command()
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None, help="Comma-separated style names")
@click.option("--tier", type=click.Choice(["S", "A", "B", "C"]), default=None)
@click.option("--platform", default=None)
@click.option("--days", default=7, help="Look back N days (0 = all time)")
@click.option("--limit", default=20)
def top(topic, style, tier, platform, days, limit):
    """Show top-scored entries."""
    # NOTE(review): the help text advertises comma-separated styles, but only
    # the text before the first comma is forwarded to db.top_entries.
    style_val = None
    if style:
        style_val = style.partition(",")[0]

    # A non-positive --days means "no time window" and is sent as None.
    days_val = None if days <= 0 else days

    filters = {
        "topic": topic,
        "style": style_val,
        "tier": tier,
        "platform": platform,
        "days": days_val,
        "limit": limit,
    }
    _display_entries(db.top_entries(**filters))
@cli.command()
@click.argument("query")
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None)
@click.option("--tier", type=click.Choice(["S", "A", "B", "C"]), default=None)
@click.option("--platform", default=None)
@click.option("--limit", default=20)
def search(query, topic, style, tier, platform, limit):
    """Full-text search the corpus."""
    # Every CLI value is forwarded to db.search_entries under the same name.
    criteria = {
        "query": query,
        "topic": topic,
        "style": style,
        "tier": tier,
        "platform": platform,
        "limit": limit,
    }
    matches = db.search_entries(**criteria)
    _display_entries(matches)
@cli.command()
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None)
@click.option("--tier", type=click.Choice(["S", "A", "B", "C"]), default=None)
def random(topic, style, tier):
    """Show a random entry (inspiration mode)."""
    picked = db.random_entry(topic=topic, style=style, tier=tier)
    if not picked:
        # db.random_entry returned nothing usable for these filters.
        click.echo("No entries match those filters.")
        return
    _display_entry_detail(picked)
# --- Classify command ---
@cli.command()
@click.argument("entry_id", type=int)
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None, help="Comma-separated style names")
def classify(entry_id, topic, style):
    """Set topic and/or styles for an entry."""
    # Split --style on commas and trim each name; stays None when not given.
    style_names = [chunk.strip() for chunk in style.split(",")] if style else None

    if style_names is not None:
        # Reject the whole request if any name is missing from STYLES
        # (the comparison is an exact, case-sensitive match).
        unknown = [name for name in style_names if name not in STYLES]
        if unknown:
            click.echo(f"Unknown styles: {', '.join(unknown)}")
            click.echo(f"Valid styles: {', '.join(STYLES)}")
            return

    db.classify_entry(entry_id, topic=topic, style_names=style_names)

    click.echo(f"Entry {entry_id} classified.")
    if topic:
        click.echo(f" Topic: {topic}")
    if style_names:
        click.echo(f" Styles: {', '.join(style_names)}")
# --- Curate commands ---
@cli.command()
@click.argument("entry_id", type=int)
@click.argument("rating", type=click.IntRange(1, 5))
def rate(entry_id, rating):
    """Rate an entry 1-5."""
    # click.IntRange has already constrained `rating` to 1..5 at this point.
    db.rate_entry(entry_id, rating)
    confirmation = f"Entry {entry_id} rated {rating}/5."
    click.echo(confirmation)
@cli.command()
@click.argument("entry_id", type=int)
def fav(entry_id):
    """Toggle favourite on an entry."""
    # db.favourite_entry's result is read as: None -> entry not found,
    # truthy -> now a favourite, falsy -> no longer a favourite.
    outcome = db.favourite_entry(entry_id)
    if outcome is None:
        click.echo(f"Entry {entry_id} not found.")
        return

    if outcome:
        state = "favourited"
    else:
        state = "unfavourited"
    click.echo(f"Entry {entry_id} {state}.")
@cli.command()
@click.argument("entry_id", type=int)
@click.argument("text")
def notes(entry_id, text):
    """Add notes to an entry."""
    # The note text is passed to db.add_notes exactly as given on the CLI.
    db.add_notes(entry_id, text)
    confirmation = f"Notes added to entry {entry_id}."
    click.echo(confirmation)
@cli.command()
@click.argument("entry_id", type=int)
def show(entry_id):
    """Show full detail for an entry."""
    # db.get_entry returns an (entry, styles) pair; a falsy entry is treated
    # as "not found".
    record, style_list = db.get_entry(entry_id)
    if not record:
        click.echo(f"Entry {entry_id} not found.")
        return
    _display_entry_full(record, style_list)
# --- Source management ---
@cli.command("add")
@click.argument("category", type=click.Choice(SOURCE_CATEGORIES))
@click.argument("platform")
@click.argument("handle_or_url")
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None,
              help="Default topic for entries from this source")
@click.option("--name", default=None, help="Display name")
@click.argument("notes_text", required=False, default=None)
def add_source(category, platform, handle_or_url, topic, name, notes_text):
    """Add a source to track."""
    # Map the CLI parameter names onto the keyword names db.add_source takes.
    new_source = {
        "category": category,
        "platform": platform,
        "handle_or_url": handle_or_url,
        "display_name": name,
        "default_topic": topic,
        "notes": notes_text,
    }
    try:
        source_id = db.add_source(**new_source)
        click.echo(f"Source added (id={source_id}): {platform} / {handle_or_url}")
        if topic:
            click.echo(f" Default topic: {topic}")
    except Exception as e:
        # Any failure is reported as a one-line message instead of a traceback.
        click.echo(f"Error: {e}")
@cli.command("list")
@click.option("--platform", default=None)
def list_sources(platform):
    """List all tracked sources."""
    rows = db.list_sources(platform)
    if not rows:
        click.echo("No sources configured.")
        return

    for src in rows:
        # Optional pieces: the default topic in brackets, and "never" when the
        # source has no last_scraped value.
        default_topic = src["default_topic"]
        topic_str = f" [{default_topic}]" if default_topic else ""
        last = src["last_scraped"] or "never"

        summary = (
            f" {src['id']:>3} {src['platform']:<12} {src['handle_or_url']:<30}"
            f"{topic_str:<15} last: {last}"
        )
        click.echo(summary)

        # Notes, when present, go on their own line under the summary.
        note = src["notes"]
        if note:
            click.echo(f" {note}")
@cli.command("remove")
@click.argument("source_id", type=int)
@click.confirmation_option(prompt="Delete this source and all its entries?")
def remove_source(source_id):
    """Remove a source and all its entries."""
    # Look the source up first so its platform/handle can be echoed after it
    # has been removed.
    target = db.get_source(source_id)
    if not target:
        click.echo(f"Source {source_id} not found.")
        return

    db.remove_source(source_id)
    click.echo(f"Removed: {target['platform']} / {target['handle_or_url']}")
# --- Stats ---
@cli.command()
def stats():
    """Show corpus statistics."""
    summary = db.get_stats()
    click.echo(
        f"\nCorpus: {summary['total_entries']} entries "
        f"from {summary['total_sources']} sources"
    )
    click.echo(f"Favourites: {summary['favourites']}")

    def _label(value):
        # None cannot take a width spec such as "<15": format() raises
        # TypeError. A group row whose label is NULL (e.g. entries with no
        # topic -- assumption, get_stats' query is not visible here) would
        # otherwise crash the whole command.
        return "(none)" if value is None else str(value)

    # (stats key, heading, row renderer) -- printed in this order, and only
    # when the section has rows.
    sections = (
        ("by_tier", "\nBy tier:",
         lambda row: f" {row['quality_tier']}: {row['n']}"),
        ("by_platform", "\nBy platform:",
         lambda row: f" {_label(row['platform']):<15} {row['n']}"),
        ("by_topic", "\nBy topic:",
         lambda row: f" {_label(row['topic']):<15} {row['n']}"),
    )
    for key, heading, render in sections:
        rows = summary[key]
        if not rows:
            continue
        click.echo(heading)
        for row in rows:
            click.echo(render(row))
# --- Export ---
def _render_export_markdown(entries, topic, style):
    """Build the markdown document written by the ``export`` command.

    Args:
        entries: Sized sequence of rows indexable by column name; the keys
            read are quality_tier, source_name, platform, author, topic, url,
            ally_rating, text and ally_notes.
        topic: Topic filter that was applied, or a falsy value for none.
        style: Style filter that was applied, or a falsy value for none.

    Returns:
        The full document as one string, lines joined with newlines.
    """
    lines = ["# Joke Corpus Export\n", f"Entries: {len(entries)}\n"]
    if topic:
        lines.append(f"Topic: {topic}\n")
    if style:
        lines.append(f"Style: {style}\n")
    lines.append("---\n")

    for e in entries:
        source = e["source_name"] or e["platform"]
        author = e["author"] or "unknown"
        # Source and author used to be concatenated with nothing in between
        # ("Sourceauthor"); separate them so the heading is readable.
        lines.append(f"\n## [{e['quality_tier']}] {source} — {author}\n")
        if e["topic"]:
            lines.append(f"**Topic:** {e['topic']}")
        if e["url"]:
            lines.append(f"**URL:** {e['url']}")
        if e["ally_rating"]:
            lines.append(f"**Rating:** {'★' * e['ally_rating']}")
        lines.append(f"\n{e['text']}\n")
        if e["ally_notes"]:
            lines.append(f"\n> {e['ally_notes']}\n")
        lines.append("---\n")

    return "\n".join(lines)


@cli.command()
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None)
@click.option("--favourites-only", is_flag=True)
@click.option("--output", "-o", default=None, help="Output file (default: stdout)")
def export(topic, style, favourites_only, output):
    """Export entries to markdown."""
    # Default selection is tiers S and A; --favourites-only replaces that
    # filter with "favourite = 1", whatever the tier.
    if favourites_only:
        conditions = ["e.favourite = 1"]
    else:
        conditions = ["e.quality_tier IN ('S', 'A')"]
    params = []
    if topic:
        conditions.append("e.topic = ?")
        params.append(topic)
    if style:
        conditions.append(
            """e.id IN (SELECT es.entry_id FROM entry_styles es
                        JOIN styles s ON s.id = es.style_id
                        WHERE s.name = ?)"""
        )
        params.append(style)
    # Only fixed SQL fragments are interpolated; user values are bound via `?`.
    where = " AND ".join(conditions)

    # A plain "ORDER BY quality_tier" sorts alphabetically and puts 'A' ahead
    # of 'S'. Rank the tiers explicitly in the S, A, B, C order the CLI tier
    # choices use; anything else (including NULL) sorts last.
    tier_rank = (
        "CASE e.quality_tier "
        "WHEN 'S' THEN 0 WHEN 'A' THEN 1 WHEN 'B' THEN 2 WHEN 'C' THEN 3 "
        "ELSE 4 END"
    )

    with db.get_db() as conn:
        entries = conn.execute(
            f"""SELECT e.*, s.display_name as source_name
                FROM entries e
                LEFT JOIN sources s ON e.source_id = s.id
                WHERE {where}
                ORDER BY {tier_rank}, e.normalised_score DESC""",
            params,
        ).fetchall()

    text = _render_export_markdown(entries, topic, style)

    if output:
        # The document contains non-ASCII characters ("★", "—"); pin UTF-8 so
        # the write does not depend on the platform's locale encoding.
        with open(output, "w", encoding="utf-8") as f:
            f.write(text)
        click.echo(f"Exported {len(entries)} entries to {output}")
    else:
        click.echo(text)
# --- Display helpers ---
def _display_entries(entries):
    """Print a compact listing of entries followed by a count line.

    Each entry is indexed by column name (favourite, ally_rating, topic,
    source_name, platform, id, quality_tier, text). A falsy ``entries``
    prints "No entries found." and nothing else.
    """
    if not entries:
        click.echo("No entries found.")
        return

    for row in entries:
        # Optional badges: favourite star and personal rating.
        badges = ""
        if row["favourite"]:
            badges += " ★"
        rating = row["ally_rating"]
        if rating:
            badges += f" [{rating}/5]"

        topic = row["topic"]
        topic_str = f" [{topic}]" if topic else ""
        # Prefer the source's display name; fall back to the platform.
        source_str = row["source_name"] or row["platform"]

        header = (
            f"\n {row['id']:>5} [{row['quality_tier']}]{badges}"
            f" {source_str}{topic_str}"
        )
        click.echo(header)
        # Text is shortened through the project's truncate() helper.
        click.echo(f" {truncate(row['text'], 200)}")

    total = len(entries)
    click.echo(f"\n ({total} entries)")
def _display_entry_detail(entry):
    """Print one entry: header line, optional topic, full text, optional URL.

    ``entry`` is indexed by column name (favourite, quality_tier, platform,
    id, topic, text, url).
    """
    star = " ★" if entry["favourite"] else ""
    header = f"\n [{entry['quality_tier']}]{star} {entry['platform']} id={entry['id']}"
    click.echo(header)

    topic = entry["topic"]
    if topic:
        click.echo(f" Topic: {topic}")

    click.echo(f"\n {entry['text']}\n")

    link = entry["url"]
    if link:
        click.echo(f" {link}")
def _display_entry_full(entry, styles):
    """Print every stored detail for one entry.

    Output order: header (tier, favourite star, platform, id), topic, styles,
    rating, text, URL, notes, engagement counts (only when at least one is
    non-zero) and finally the raw / normalised scores.

    Args:
        entry: Mapping-like row keyed by column name. It is copied into a
            plain dict first so both subscript and ``.get`` access work even
            when the row type has no ``.get`` method (as with sqlite3.Row --
            assumption: the exact type db.get_entry returns is not visible
            from this function).
        styles: Iterable of style-name strings; falsy to skip the line.
    """
    entry = dict(entry)

    fav_str = " ★" if entry["favourite"] else ""
    click.echo(f"\n [{entry['quality_tier']}]{fav_str} {entry['platform']} id={entry['id']}")
    if entry["topic"]:
        click.echo(f" Topic: {entry['topic']}")
    if styles:
        click.echo(f" Styles: {', '.join(styles)}")
    if entry["ally_rating"]:
        click.echo(f" Rating: {entry['ally_rating']}/5")
    click.echo(f"\n {entry['text']}\n")
    if entry["url"]:
        click.echo(f" URL: {entry['url']}")
    if entry["ally_notes"]:
        click.echo(f" Notes: {entry['ally_notes']}")

    # Engagement: a column that is present but NULL must count as 0, not be
    # printed as "None" (``.get(key, 0)`` only covers a *missing* key).
    counts = {
        key: entry.get(key) or 0
        for key in ("likes", "shares", "saves", "quotes", "replies")
    }
    if any(counts.values()):
        click.echo(
            f" Engagement: {counts['likes']} likes, "
            f"{counts['shares']} shares, "
            f"{counts['saves']} saves, "
            f"{counts['quotes']} quotes, "
            f"{counts['replies']} replies"
        )

    # Missing or NULL scores are shown as 0; the float format specs below
    # would raise TypeError on None.
    raw_score = entry.get("raw_score") or 0
    normalised_score = entry.get("normalised_score") or 0
    click.echo(
        f" Score: raw={raw_score:.0f} "
        f"norm={normalised_score:.1f}"
    )
# --- Scraper registry ---
def _get_scraper(platform):
"""Return the scraper module for a platform, or None."""
try:
if platform == "bluesky":
from scraper.platforms import bluesky
return bluesky
elif platform == "x_twitter":
from scraper.platforms import x_twitter
return x_twitter
elif platform == "reddit":
from scraper.platforms import reddit
return reddit
elif platform == "youtube":
from scraper.platforms import youtube
return youtube
elif platform == "rss":
logger.info("RSS scraping disabled (no engagement data)")
return None
elif platform == "guardian":
from scraper.platforms import guardian
return guardian
elif platform == "instagram":
from scraper.platforms import instagram
return instagram
except ImportError:
pass
return None