"""Command-line interface for the joke corpus: scrape, browse, curate, export."""

import importlib
import logging

import click

from scraper import db
from scraper.config import TOPICS, STYLES, SOURCE_CATEGORIES
from scraper.scoring import rescore_source
from scraper.utils import truncate

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("joke-corpus")

# Quality tiers accepted by the --tier options, best first.
_QUALITY_TIERS = ["S", "A", "B", "C"]

# SQL expression ranking tiers best-first. A plain ORDER BY on the tier
# letter sorts alphabetically, which would put 'S' after 'A', 'B' and 'C'.
_TIER_ORDER_SQL = (
    "CASE e.quality_tier "
    "WHEN 'S' THEN 0 WHEN 'A' THEN 1 WHEN 'B' THEN 2 WHEN 'C' THEN 3 "
    "ELSE 4 END"
)

# Engagement counters shown by `show`, in display order.
_ENGAGEMENT_FIELDS = ("likes", "shares", "saves", "quotes", "replies")

# Platforms that have a module under scraper.platforms.
_SCRAPER_PLATFORMS = frozenset(
    {"bluesky", "x_twitter", "reddit", "youtube", "guardian", "instagram"}
)


# NOTE: the docstrings of click commands are printed verbatim as --help text,
# so they are kept short; implementation notes live in comments instead.


@click.group()
def cli():
    """Joke corpus — scrape, score, and curate comedy."""
    db.init_db()


# --- Scrape commands ---


@cli.command()
@click.option("--source", "platform", default=None, help="Scrape a single platform")
@click.option("--backfill", is_flag=True, help="Collect historical posts")
def scrape(platform, backfill):
    """Scrape all configured sources for new content."""
    sources = db.list_sources(platform)
    if not sources:
        click.echo("No sources configured. Use 'add' to add some.")
        return

    for source in sources:
        plat = source["platform"]
        handle = source["handle_or_url"]
        click.echo(f"Scraping {plat}: {handle}...")
        # One failing source must not abort the whole run: the error is
        # recorded in the scrape log and the loop moves on.
        try:
            scraper = _get_scraper(plat)
            if scraper is None:
                click.echo(f" No scraper implemented for '{plat}' yet. Skipping.")
                continue
            found, new = scraper.scrape_source(dict(source), backfill=backfill)
            db.update_source_last_scraped(source["id"])
            db.log_scrape(source["id"], found, new, "success")
            rescore_source(source["id"])
            click.echo(f" Found {found}, {new} new entries.")
        except Exception as e:
            db.log_scrape(source["id"], 0, 0, f"error: {e}")
            logger.error(" Error scraping %s: %s", handle, e)


@cli.command("import-datasets")
def import_datasets():
    """One-time import of pre-built comedy datasets."""
    # Imported lazily so the dataset importer is only loaded when needed.
    from scraper.static import import_datasets as imp

    imp.run()


# --- Browse commands ---


@cli.command()
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None, help="Comma-separated style names")
@click.option("--tier", type=click.Choice(_QUALITY_TIERS), default=None)
@click.option("--platform", default=None)
@click.option("--days", default=7, help="Look back N days (0 = all time)")
@click.option("--limit", default=20)
def top(topic, style, tier, platform, days, limit):
    """Show top-scored entries."""
    # Only the first listed style is passed on as the filter; surrounding
    # whitespace is stripped so "a, b" and "a,b" behave the same.
    style_val = style.split(",")[0].strip() if style else None
    # A non-positive --days means "all time", expressed to the db as None.
    days_val = days if days > 0 else None
    entries = db.top_entries(
        topic=topic,
        style=style_val,
        tier=tier,
        platform=platform,
        days=days_val,
        limit=limit,
    )
    _display_entries(entries)


@cli.command()
@click.argument("query")
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None)
@click.option("--tier", type=click.Choice(_QUALITY_TIERS), default=None)
@click.option("--platform", default=None)
@click.option("--limit", default=20)
def search(query, topic, style, tier, platform, limit):
    """Full-text search the corpus."""
    entries = db.search_entries(
        query=query,
        topic=topic,
        style=style,
        tier=tier,
        platform=platform,
        limit=limit,
    )
    _display_entries(entries)


@cli.command()
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None)
@click.option("--tier", type=click.Choice(_QUALITY_TIERS), default=None)
def random(topic, style, tier):
    """Show a random entry (inspiration mode)."""
    entry = db.random_entry(topic=topic, style=style, tier=tier)
    if entry:
        _display_entry_detail(entry)
    else:
        click.echo("No entries match those filters.")


# --- Classify command ---


@cli.command()
@click.argument("entry_id", type=int)
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None, help="Comma-separated style names")
def classify(entry_id, topic, style):
    """Set topic and/or styles for an entry."""
    style_names = None
    if style:
        style_names = [s.strip() for s in style.split(",")]
        # Reject the whole request if any style is unknown, before touching
        # the database.
        invalid = [s for s in style_names if s not in STYLES]
        if invalid:
            click.echo(f"Unknown styles: {', '.join(invalid)}")
            click.echo(f"Valid styles: {', '.join(STYLES)}")
            return

    db.classify_entry(entry_id, topic=topic, style_names=style_names)
    click.echo(f"Entry {entry_id} classified.")
    if topic:
        click.echo(f" Topic: {topic}")
    if style_names:
        click.echo(f" Styles: {', '.join(style_names)}")


# --- Curate commands ---


@cli.command()
@click.argument("entry_id", type=int)
@click.argument("rating", type=click.IntRange(1, 5))
def rate(entry_id, rating):
    """Rate an entry 1-5."""
    db.rate_entry(entry_id, rating)
    click.echo(f"Entry {entry_id} rated {rating}/5.")


@cli.command()
@click.argument("entry_id", type=int)
def fav(entry_id):
    """Toggle favourite on an entry."""
    result = db.favourite_entry(entry_id)
    # None signals "no such entry"; otherwise the truthiness of the result
    # is reported as the new favourite state.
    if result is not None:
        state = "favourited" if result else "unfavourited"
        click.echo(f"Entry {entry_id} {state}.")
    else:
        click.echo(f"Entry {entry_id} not found.")


@cli.command()
@click.argument("entry_id", type=int)
@click.argument("text")
def notes(entry_id, text):
    """Add notes to an entry."""
    db.add_notes(entry_id, text)
    click.echo(f"Notes added to entry {entry_id}.")


@cli.command()
@click.argument("entry_id", type=int)
def show(entry_id):
    """Show full detail for an entry."""
    entry, styles = db.get_entry(entry_id)
    if entry:
        _display_entry_full(entry, styles)
    else:
        click.echo(f"Entry {entry_id} not found.")


# --- Source management ---


@cli.command("add")
@click.argument("category", type=click.Choice(SOURCE_CATEGORIES))
@click.argument("platform")
@click.argument("handle_or_url")
@click.option(
    "--topic",
    type=click.Choice(TOPICS, case_sensitive=False),
    default=None,
    help="Default topic for entries from this source",
)
@click.option("--name", default=None, help="Display name")
@click.argument("notes_text", required=False, default=None)
def add_source(category, platform, handle_or_url, topic, name, notes_text):
    """Add a source to track."""
    try:
        source_id = db.add_source(
            category=category,
            platform=platform,
            handle_or_url=handle_or_url,
            display_name=name,
            default_topic=topic,
            notes=notes_text,
        )
        click.echo(f"Source added (id={source_id}): {platform} / {handle_or_url}")
        if topic:
            click.echo(f" Default topic: {topic}")
    except Exception as e:
        # Any failure while adding is reported as a one-line message rather
        # than a traceback.
        click.echo(f"Error: {e}")


@cli.command("list")
@click.option("--platform", default=None)
def list_sources(platform):
    """List all tracked sources."""
    sources = db.list_sources(platform)
    if not sources:
        click.echo("No sources configured.")
        return

    for s in sources:
        topic_str = f" [{s['default_topic']}]" if s["default_topic"] else ""
        last = s["last_scraped"] or "never"
        click.echo(
            f" {s['id']:>3} {s['platform']:<12} {s['handle_or_url']:<30}"
            f"{topic_str:<15} last: {last}"
        )
        if s["notes"]:
            click.echo(f" {s['notes']}")


@cli.command("remove")
@click.argument("source_id", type=int)
@click.confirmation_option(prompt="Delete this source and all its entries?")
def remove_source(source_id):
    """Remove a source and all its entries."""
    # Fetched before deleting so the confirmation message can name the source.
    source = db.get_source(source_id)
    if source:
        db.remove_source(source_id)
        click.echo(f"Removed: {source['platform']} / {source['handle_or_url']}")
    else:
        click.echo(f"Source {source_id} not found.")


# --- Stats ---


@cli.command()
def stats():
    """Show corpus statistics."""
    s = db.get_stats()
    click.echo(f"\nCorpus: {s['total_entries']} entries from {s['total_sources']} sources")
    click.echo(f"Favourites: {s['favourites']}")

    if s["by_tier"]:
        click.echo("\nBy tier:")
        for row in s["by_tier"]:
            click.echo(f" {row['quality_tier']}: {row['n']}")

    if s["by_platform"]:
        click.echo("\nBy platform:")
        for row in s["by_platform"]:
            click.echo(f" {row['platform']:<15} {row['n']}")

    if s["by_topic"]:
        click.echo("\nBy topic:")
        for row in s["by_topic"]:
            click.echo(f" {row['topic']:<15} {row['n']}")


# --- Export ---


def _build_export_query(topic, style, favourites_only):
    """Return ``(sql, params)`` selecting the entries to export.

    By default only tier S and A entries are selected. With
    ``favourites_only`` that tier filter is replaced (not extended) by a
    favourites filter. ``topic`` and ``style`` add further conditions; their
    values are always bound as parameters, never interpolated into the SQL.
    """
    if favourites_only:
        conditions = ["e.favourite = 1"]
    else:
        conditions = ["e.quality_tier IN ('S', 'A')"]
    params = []

    if topic:
        conditions.append("e.topic = ?")
        params.append(topic)
    if style:
        conditions.append(
            """e.id IN (SELECT es.entry_id
                        FROM entry_styles es
                        JOIN styles s ON s.id = es.style_id
                        WHERE s.name = ?)"""
        )
        params.append(style)

    where = " AND ".join(conditions)
    # `where` is assembled only from the fixed fragments above.
    sql = f"""SELECT e.*, s.display_name as source_name
              FROM entries e
              LEFT JOIN sources s ON e.source_id = s.id
              WHERE {where}
              ORDER BY {_TIER_ORDER_SQL}, e.normalised_score DESC"""
    return sql, params


def _render_export_markdown(entries, topic, style):
    """Render ``entries`` as a markdown document and return it as a string.

    ``topic`` and ``style`` are echoed in the header when set. Each entry
    becomes a ``##`` section followed by a horizontal rule.
    """
    lines = ["# Joke Corpus Export\n"]
    lines.append(f"Entries: {len(entries)}\n")
    if topic:
        lines.append(f"Topic: {topic}\n")
    if style:
        lines.append(f"Style: {style}\n")
    lines.append("---\n")

    for e in entries:
        lines.append(
            f"\n## [{e['quality_tier']}] {e['source_name'] or e['platform']}"
            f" — {e['author'] or 'unknown'}\n"
        )
        if e["topic"]:
            lines.append(f"**Topic:** {e['topic']}")
        if e["url"]:
            lines.append(f"**URL:** {e['url']}")
        if e["ally_rating"]:
            lines.append(f"**Rating:** {'★' * e['ally_rating']}")
        lines.append(f"\n{e['text']}\n")
        if e["ally_notes"]:
            lines.append(f"\n> {e['ally_notes']}\n")
        lines.append("---\n")

    return "\n".join(lines)


@cli.command()
@click.option("--topic", type=click.Choice(TOPICS, case_sensitive=False), default=None)
@click.option("--style", default=None)
@click.option("--favourites-only", is_flag=True)
@click.option("--output", "-o", default=None, help="Output file (default: stdout)")
def export(topic, style, favourites_only, output):
    """Export entries to markdown."""
    sql, params = _build_export_query(topic, style, favourites_only)
    with db.get_db() as conn:
        entries = conn.execute(sql, params).fetchall()

    text = _render_export_markdown(entries, topic, style)

    if output:
        # The export contains non-ASCII characters ("★", "—"), so the file
        # encoding is pinned instead of relying on the platform default.
        with open(output, "w", encoding="utf-8") as f:
            f.write(text)
        click.echo(f"Exported {len(entries)} entries to {output}")
    else:
        click.echo(text)


# --- Display helpers ---


def _display_entries(entries):
    """Print a compact listing of ``entries``, one header and one text line each.

    Each entry is read by key: ``id``, ``quality_tier``, ``favourite``,
    ``ally_rating``, ``topic``, ``source_name``, ``platform`` and ``text``.
    The text is shortened with ``truncate(..., 200)``. Prints
    "No entries found." for an empty result.
    """
    if not entries:
        click.echo("No entries found.")
        return

    for e in entries:
        fav_str = " ★" if e["favourite"] else ""
        rating_str = f" [{e['ally_rating']}/5]" if e["ally_rating"] else ""
        topic_str = f" [{e['topic']}]" if e["topic"] else ""
        source_str = e["source_name"] or e["platform"]
        click.echo(
            f"\n {e['id']:>5} [{e['quality_tier']}]{fav_str}{rating_str}"
            f" {source_str}{topic_str}"
        )
        click.echo(f" {truncate(e['text'], 200)}")

    click.echo(f"\n ({len(entries)} entries)")


def _display_entry_detail(entry):
    """Print one entry: tier/platform/id header, optional topic, full text, optional URL."""
    fav_str = " ★" if entry["favourite"] else ""
    click.echo(f"\n [{entry['quality_tier']}]{fav_str} {entry['platform']} id={entry['id']}")
    if entry["topic"]:
        click.echo(f" Topic: {entry['topic']}")
    click.echo(f"\n {entry['text']}\n")
    if entry["url"]:
        click.echo(f" {entry['url']}")


def _display_entry_full(entry, styles):
    """Print every stored detail of one entry.

    ``entry`` may be any mapping-like row (for example a plain ``dict`` or a
    ``sqlite3.Row``); ``styles`` is an iterable of style names, possibly
    empty. Output covers the header, topic, styles, rating, text, URL, notes,
    engagement counters (only when at least one is non-zero) and scores.
    """
    # Normalise to a dict: row objects such as sqlite3.Row support indexing
    # and keys() but have no .get(), which the engagement lookup relies on.
    entry = dict(entry)

    fav_str = " ★" if entry["favourite"] else ""
    click.echo(f"\n [{entry['quality_tier']}]{fav_str} {entry['platform']} id={entry['id']}")
    if entry["topic"]:
        click.echo(f" Topic: {entry['topic']}")
    if styles:
        click.echo(f" Styles: {', '.join(styles)}")
    if entry["ally_rating"]:
        click.echo(f" Rating: {entry['ally_rating']}/5")
    click.echo(f"\n {entry['text']}\n")
    if entry["url"]:
        click.echo(f" URL: {entry['url']}")
    if entry["ally_notes"]:
        click.echo(f" Notes: {entry['ally_notes']}")

    # Engagement: missing or NULL counters are shown as 0 rather than "None".
    engagement = {k: entry.get(k) or 0 for k in _ENGAGEMENT_FIELDS}
    if any(engagement.values()):
        click.echo(
            f" Engagement: {engagement['likes']} likes, "
            f"{engagement['shares']} shares, "
            f"{engagement['saves']} saves, "
            f"{engagement['quotes']} quotes, "
            f"{engagement['replies']} replies"
        )

    click.echo(
        f" Score: raw={entry['raw_score']:.0f} "
        f"norm={entry['normalised_score']:.1f}"
    )


# --- Scraper registry ---


def _get_scraper(platform):
    """Return the scraper module for a platform, or None.

    None is returned for "rss" (deliberately disabled), for platform names
    with no scraper module, and when the module exists but cannot be
    imported. The last case is logged as a warning so a missing dependency
    is not mistaken for an unimplemented scraper.
    """
    if platform == "rss":
        logger.info("RSS scraping disabled (no engagement data)")
        return None
    if platform not in _SCRAPER_PLATFORMS:
        return None

    # Imported on demand so a platform's dependencies are only needed when
    # that platform is actually scraped.
    try:
        return importlib.import_module(f"scraper.platforms.{platform}")
    except ImportError as exc:
        logger.warning("Could not import scraper for '%s': %s", platform, exc)
        return None