import importlib
import logging

import click

from scraper import db
from scraper.config import SOURCE_CATEGORIES, STYLES, TOPICS
from scraper.scoring import rescore_source
from scraper.utils import truncate
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(message)s", | |
| datefmt="%H:%M:%S", | |
| ) | |
| logger = logging.getLogger("joke-corpus") | |
| def cli(): | |
| """Joke corpus — scrape, score, and curate comedy.""" | |
| db.init_db() | |
# --- Scrape commands ---
| def scrape(platform, backfill): | |
| """Scrape all configured sources for new content.""" | |
| sources = db.list_sources(platform) | |
| if not sources: | |
| click.echo("No sources configured. Use 'add' to add some.") | |
| return | |
| for source in sources: | |
| plat = source["platform"] | |
| handle = source["handle_or_url"] | |
| click.echo(f"Scraping {plat}: {handle}...") | |
| try: | |
| scraper = _get_scraper(plat) | |
| if scraper is None: | |
| click.echo(f" No scraper implemented for '{plat}' yet. Skipping.") | |
| continue | |
| found, new = scraper.scrape_source(dict(source), backfill=backfill) | |
| db.update_source_last_scraped(source["id"]) | |
| db.log_scrape(source["id"], found, new, "success") | |
| rescore_source(source["id"]) | |
| click.echo(f" Found {found}, {new} new entries.") | |
| except Exception as e: | |
| db.log_scrape(source["id"], 0, 0, f"error: {e}") | |
| logger.error(f" Error scraping {handle}: {e}") | |
| def import_datasets(): | |
| """One-time import of pre-built comedy datasets.""" | |
| from scraper.static import import_datasets as imp | |
| imp.run() | |
# --- Browse commands ---
| def top(topic, style, tier, platform, days, limit): | |
| """Show top-scored entries.""" | |
| style_val = style.split(",")[0] if style else None | |
| days_val = days if days > 0 else None | |
| entries = db.top_entries( | |
| topic=topic, style=style_val, tier=tier, | |
| platform=platform, days=days_val, limit=limit, | |
| ) | |
| _display_entries(entries) | |
| def search(query, topic, style, tier, platform, limit): | |
| """Full-text search the corpus.""" | |
| entries = db.search_entries( | |
| query=query, topic=topic, style=style, | |
| tier=tier, platform=platform, limit=limit, | |
| ) | |
| _display_entries(entries) | |
| def random(topic, style, tier): | |
| """Show a random entry (inspiration mode).""" | |
| entry = db.random_entry(topic=topic, style=style, tier=tier) | |
| if entry: | |
| _display_entry_detail(entry) | |
| else: | |
| click.echo("No entries match those filters.") | |
# --- Classify command ---
| def classify(entry_id, topic, style): | |
| """Set topic and/or styles for an entry.""" | |
| style_names = None | |
| if style: | |
| style_names = [s.strip() for s in style.split(",")] | |
| invalid = [s for s in style_names if s not in STYLES] | |
| if invalid: | |
| click.echo(f"Unknown styles: {', '.join(invalid)}") | |
| click.echo(f"Valid styles: {', '.join(STYLES)}") | |
| return | |
| db.classify_entry(entry_id, topic=topic, style_names=style_names) | |
| click.echo(f"Entry {entry_id} classified.") | |
| if topic: | |
| click.echo(f" Topic: {topic}") | |
| if style_names: | |
| click.echo(f" Styles: {', '.join(style_names)}") | |
# --- Curate commands ---
| def rate(entry_id, rating): | |
| """Rate an entry 1-5.""" | |
| db.rate_entry(entry_id, rating) | |
| click.echo(f"Entry {entry_id} rated {rating}/5.") | |
| def fav(entry_id): | |
| """Toggle favourite on an entry.""" | |
| result = db.favourite_entry(entry_id) | |
| if result is not None: | |
| state = "favourited" if result else "unfavourited" | |
| click.echo(f"Entry {entry_id} {state}.") | |
| else: | |
| click.echo(f"Entry {entry_id} not found.") | |
| def notes(entry_id, text): | |
| """Add notes to an entry.""" | |
| db.add_notes(entry_id, text) | |
| click.echo(f"Notes added to entry {entry_id}.") | |
| def show(entry_id): | |
| """Show full detail for an entry.""" | |
| entry, styles = db.get_entry(entry_id) | |
| if entry: | |
| _display_entry_full(entry, styles) | |
| else: | |
| click.echo(f"Entry {entry_id} not found.") | |
# --- Source management ---
| def add_source(category, platform, handle_or_url, topic, name, notes_text): | |
| """Add a source to track.""" | |
| try: | |
| source_id = db.add_source( | |
| category=category, | |
| platform=platform, | |
| handle_or_url=handle_or_url, | |
| display_name=name, | |
| default_topic=topic, | |
| notes=notes_text, | |
| ) | |
| click.echo(f"Source added (id={source_id}): {platform} / {handle_or_url}") | |
| if topic: | |
| click.echo(f" Default topic: {topic}") | |
| except Exception as e: | |
| click.echo(f"Error: {e}") | |
| def list_sources(platform): | |
| """List all tracked sources.""" | |
| sources = db.list_sources(platform) | |
| if not sources: | |
| click.echo("No sources configured.") | |
| return | |
| for s in sources: | |
| topic_str = f" [{s['default_topic']}]" if s["default_topic"] else "" | |
| last = s["last_scraped"] or "never" | |
| click.echo( | |
| f" {s['id']:>3} {s['platform']:<12} {s['handle_or_url']:<30}" | |
| f"{topic_str:<15} last: {last}" | |
| ) | |
| if s["notes"]: | |
| click.echo(f" {s['notes']}") | |
| def remove_source(source_id): | |
| """Remove a source and all its entries.""" | |
| source = db.get_source(source_id) | |
| if source: | |
| db.remove_source(source_id) | |
| click.echo(f"Removed: {source['platform']} / {source['handle_or_url']}") | |
| else: | |
| click.echo(f"Source {source_id} not found.") | |
# --- Stats ---
| def stats(): | |
| """Show corpus statistics.""" | |
| s = db.get_stats() | |
| click.echo(f"\nCorpus: {s['total_entries']} entries from {s['total_sources']} sources") | |
| click.echo(f"Favourites: {s['favourites']}") | |
| if s["by_tier"]: | |
| click.echo("\nBy tier:") | |
| for row in s["by_tier"]: | |
| click.echo(f" {row['quality_tier']}: {row['n']}") | |
| if s["by_platform"]: | |
| click.echo("\nBy platform:") | |
| for row in s["by_platform"]: | |
| click.echo(f" {row['platform']:<15} {row['n']}") | |
| if s["by_topic"]: | |
| click.echo("\nBy topic:") | |
| for row in s["by_topic"]: | |
| click.echo(f" {row['topic']:<15} {row['n']}") | |
# --- Export ---
| def export(topic, style, favourites_only, output): | |
| """Export entries to markdown.""" | |
| with db.get_db() as conn: | |
| conditions = ["e.quality_tier IN ('S', 'A')"] | |
| params = [] | |
| if favourites_only: | |
| conditions = ["e.favourite = 1"] | |
| if topic: | |
| conditions.append("e.topic = ?") | |
| params.append(topic) | |
| if style: | |
| conditions.append( | |
| """e.id IN (SELECT es.entry_id FROM entry_styles es | |
| JOIN styles s ON s.id = es.style_id | |
| WHERE s.name = ?)""" | |
| ) | |
| params.append(style) | |
| where = " AND ".join(conditions) | |
| entries = conn.execute( | |
| f"""SELECT e.*, s.display_name as source_name | |
| FROM entries e | |
| LEFT JOIN sources s ON e.source_id = s.id | |
| WHERE {where} | |
| ORDER BY e.quality_tier, e.normalised_score DESC""", | |
| params, | |
| ).fetchall() | |
| lines = [f"# Joke Corpus Export\n"] | |
| lines.append(f"Entries: {len(entries)}\n") | |
| if topic: | |
| lines.append(f"Topic: {topic}\n") | |
| if style: | |
| lines.append(f"Style: {style}\n") | |
| lines.append("---\n") | |
| for e in entries: | |
| lines.append(f"\n## [{e['quality_tier']}] {e['source_name'] or e['platform']} — {e['author'] or 'unknown'}\n") | |
| if e["topic"]: | |
| lines.append(f"**Topic:** {e['topic']}") | |
| if e["url"]: | |
| lines.append(f"**URL:** {e['url']}") | |
| if e["ally_rating"]: | |
| lines.append(f"**Rating:** {'★' * e['ally_rating']}") | |
| lines.append(f"\n{e['text']}\n") | |
| if e["ally_notes"]: | |
| lines.append(f"\n> {e['ally_notes']}\n") | |
| lines.append("---\n") | |
| text = "\n".join(lines) | |
| if output: | |
| with open(output, "w") as f: | |
| f.write(text) | |
| click.echo(f"Exported {len(entries)} entries to {output}") | |
| else: | |
| click.echo(text) | |
# --- Display helpers ---
| def _display_entries(entries): | |
| if not entries: | |
| click.echo("No entries found.") | |
| return | |
| for e in entries: | |
| fav_str = " ★" if e["favourite"] else "" | |
| rating_str = f" [{e['ally_rating']}/5]" if e["ally_rating"] else "" | |
| topic_str = f" [{e['topic']}]" if e["topic"] else "" | |
| source_str = e["source_name"] or e["platform"] | |
| click.echo( | |
| f"\n {e['id']:>5} [{e['quality_tier']}]{fav_str}{rating_str}" | |
| f" {source_str}{topic_str}" | |
| ) | |
| click.echo(f" {truncate(e['text'], 200)}") | |
| click.echo(f"\n ({len(entries)} entries)") | |
| def _display_entry_detail(entry): | |
| fav_str = " ★" if entry["favourite"] else "" | |
| click.echo(f"\n [{entry['quality_tier']}]{fav_str} {entry['platform']} id={entry['id']}") | |
| if entry["topic"]: | |
| click.echo(f" Topic: {entry['topic']}") | |
| click.echo(f"\n {entry['text']}\n") | |
| if entry["url"]: | |
| click.echo(f" {entry['url']}") | |
| def _display_entry_full(entry, styles): | |
| fav_str = " ★" if entry["favourite"] else "" | |
| click.echo(f"\n [{entry['quality_tier']}]{fav_str} {entry['platform']} id={entry['id']}") | |
| if entry["topic"]: | |
| click.echo(f" Topic: {entry['topic']}") | |
| if styles: | |
| click.echo(f" Styles: {', '.join(styles)}") | |
| if entry["ally_rating"]: | |
| click.echo(f" Rating: {entry['ally_rating']}/5") | |
| click.echo(f"\n {entry['text']}\n") | |
| if entry["url"]: | |
| click.echo(f" URL: {entry['url']}") | |
| if entry["ally_notes"]: | |
| click.echo(f" Notes: {entry['ally_notes']}") | |
| # Engagement | |
| has_engagement = any( | |
| entry.get(k) for k in ["likes", "shares", "saves", "quotes", "replies"] | |
| ) | |
| if has_engagement: | |
| click.echo( | |
| f" Engagement: {entry.get('likes', 0)} likes, " | |
| f"{entry.get('shares', 0)} shares, " | |
| f"{entry.get('saves', 0)} saves, " | |
| f"{entry.get('quotes', 0)} quotes, " | |
| f"{entry.get('replies', 0)} replies" | |
| ) | |
| click.echo( | |
| f" Score: raw={entry['raw_score']:.0f} " | |
| f"norm={entry['normalised_score']:.1f}" | |
| ) | |
# --- Scraper registry ---
| def _get_scraper(platform): | |
| """Return the scraper module for a platform, or None.""" | |
| try: | |
| if platform == "bluesky": | |
| from scraper.platforms import bluesky | |
| return bluesky | |
| elif platform == "x_twitter": | |
| from scraper.platforms import x_twitter | |
| return x_twitter | |
| elif platform == "reddit": | |
| from scraper.platforms import reddit | |
| return reddit | |
| elif platform == "youtube": | |
| from scraper.platforms import youtube | |
| return youtube | |
| elif platform == "rss": | |
| logger.info("RSS scraping disabled (no engagement data)") | |
| return None | |
| elif platform == "guardian": | |
| from scraper.platforms import guardian | |
| return guardian | |
| elif platform == "instagram": | |
| from scraper.platforms import instagram | |
| return instagram | |
| except ImportError: | |
| pass | |
| return None | |