| """ |
| download_classics.py |
| ==================== |
| Curated classical text downloader for MicroGPT Trivium/Quadrivium training corpus. |
| |
| Reads source_manifest.json and fetches each enabled source: |
| - Project Gutenberg : downloads plain .txt files directly |
| - MIT Internet Classics Archive: downloads HTML, strips markup with BeautifulSoup |
| |
| Downloaded files land in pipeline/inbox/ so the main pipeline picks them up. |
| A download_log.json is written alongside this script recording every run. |
| |
| Usage |
| ----- |
| python download_classics.py # download all enabled sources |
| python download_classics.py --list # list all available sources |
| python download_classics.py --dry-run # show what would download without doing it |
| python download_classics.py --art logic # download only logic texts |
| |
| Requirements |
| ------------ |
| pip install requests beautifulsoup4 |
| """ |
|
|
| import argparse |
| import json |
| import logging |
| import re |
| import sys |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
| |
| |
| |
|
|
# Filesystem layout, all anchored to this script's own directory.
SCRIPT_DIR = Path(__file__).resolve().parent
MANIFEST_PATH = SCRIPT_DIR / "source_manifest.json"  # catalogue of downloadable sources
INBOX_DIR = SCRIPT_DIR.parent / "inbox"              # where the main pipeline picks files up
LOG_PATH = SCRIPT_DIR / "download_log.json"          # cumulative record of every run
|
|
| |
| |
| |
|
|
# Polite HTTP client settings: identify ourselves and bound each request.
HEADERS = {
    "User-Agent": "MicroGPT-Classics-Downloader/1.0",
    "Accept": "text/html,text/plain,*/*",
}
REQUEST_TIMEOUT = 30        # seconds before a request is abandoned
INTER_REQUEST_DELAY = 1.0   # seconds of pause between downloads (be kind to the archives)
|
|
| |
| |
| |
|
|
# Module-wide logger: terse, time-stamped lines (no date — runs are short).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("download_classics")
|
|
| |
| |
| |
|
|
def _require_requests():
    """Import and return the ``requests`` module, exiting with an install hint if absent."""
    try:
        import requests
    except ImportError:
        log.error("'requests' is not installed. Run: pip install requests")
        sys.exit(1)
    return requests
|
|
|
|
def _require_bs4():
    """Import and return ``BeautifulSoup``, exiting with an install hint if absent."""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        log.error("'beautifulsoup4' is not installed. Run: pip install beautifulsoup4")
        sys.exit(1)
    return BeautifulSoup
|
|
|
|
| |
| |
| |
|
|
def load_manifest() -> dict:
    """Parse and return source_manifest.json; exit with an error if it is missing."""
    if not MANIFEST_PATH.exists():
        log.error("Manifest not found: %s", MANIFEST_PATH)
        sys.exit(1)
    return json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
|
|
|
|
| def filter_sources(sources: list[dict], *, art: str | None = None, enabled_only: bool = True) -> list[dict]: |
| result = sources |
| if enabled_only: |
| result = [s for s in result if s.get("enabled", True)] |
| if art: |
| result = [s for s in result if s.get("art", "").lower() == art.lower()] |
| return result |
|
|
|
|
| |
| |
| |
|
|
| def _strip_gutenberg_boilerplate(text: str) -> str: |
| """Remove Project Gutenberg header and footer legalese from raw .txt files.""" |
| |
| start_markers = [ |
| r"\*\*\* START OF (THE|THIS) PROJECT GUTENBERG", |
| r"\*\*\*START OF (THE|THIS) PROJECT GUTENBERG", |
| ] |
| end_markers = [ |
| r"\*\*\* END OF (THE|THIS) PROJECT GUTENBERG", |
| r"\*\*\*END OF (THE|THIS) PROJECT GUTENBERG", |
| ] |
|
|
| lines = text.splitlines() |
| start_idx = 0 |
| end_idx = len(lines) |
|
|
| for i, line in enumerate(lines): |
| for pat in start_markers: |
| if re.search(pat, line, re.IGNORECASE): |
| start_idx = i + 1 |
| break |
|
|
| for i, line in enumerate(lines): |
| for pat in end_markers: |
| if re.search(pat, line, re.IGNORECASE): |
| end_idx = i |
| break |
|
|
| body = lines[start_idx:end_idx] |
| return "\n".join(body).strip() |
|
|
|
|
def _extract_mit_classics_text(html: str, source_id: str) -> str:
    """
    Strip MIT Classics HTML down to the main prose body.

    Prefers the contents of a non-empty <pre> element when one exists (many
    MIT Classics texts are preformatted).  Otherwise falls back to the <body>
    text after dropping <script>/<style>/<head>/<nav> and any image-bearing
    anchors (navigation buttons).  *source_id* is accepted for interface
    compatibility with the other extractors but is not used here.
    """
    BeautifulSoup = _require_bs4()
    soup = BeautifulSoup(html, "html.parser")

    # Non-content elements contribute nothing to the training text.
    for junk in soup(["script", "style", "head", "nav"]):
        junk.decompose()

    pre_block = soup.find("pre")
    if pre_block:
        raw = pre_block.get_text(separator="\n")
    else:
        container = soup.find("body") or soup
        # Links wrapping images are navigation chrome (prev/next buttons).
        for anchor in container.find_all("a"):
            if anchor.find("img"):
                anchor.decompose()
        raw = container.get_text(separator="\n")

    # Collapse runs of three-plus newlines into single paragraph breaks.
    return re.sub(r"\n{3,}", "\n\n", raw).strip()
|
|
|
|
| |
| |
| |
|
|
def _download_gutenberg(source: dict, requests) -> str:
    """Fetch a plain-text Gutenberg file and strip boilerplate.

    Decodes as UTF-8 (BOM-tolerant) with a Latin-1 fallback for older scans.
    Raises ValueError when the stripped body looks implausibly short, which
    usually means the boilerplate markers were not found.
    """
    url = source["url"]
    log.info(" GET %s", url)
    response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    payload = response.content
    try:
        text = payload.decode("utf-8-sig")
    except UnicodeDecodeError:
        # Older Gutenberg files predate UTF-8 and are Latin-1 encoded.
        text = payload.decode("latin-1")

    body = _strip_gutenberg_boilerplate(text)
    if len(body) < 1000:
        raise ValueError(
            f"Gutenberg body suspiciously short ({len(body)} chars) for {source['id']}; "
            "boilerplate stripping may have failed"
        )
    return body
|
|
|
|
def _download_mit_classics(source: dict, requests) -> str:
    """Fetch an MIT Classics text file (.mb.txt) or HTML page.

    Plain-text URLs are decoded directly; HTML pages are reduced to prose
    via _extract_mit_classics_text.  Raises ValueError when the result is
    implausibly short (typically a redirect or error page).
    """
    url = source["url"]
    log.info(" GET %s", url)
    response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    payload = response.content.decode("utf-8", errors="replace")
    if url.endswith((".mb.txt", ".txt")):
        text = payload.strip()
    else:
        text = _extract_mit_classics_text(payload, source["id"])

    if len(text) < 500:
        raise ValueError(
            f"MIT Classics body suspiciously short ({len(text)} chars) for {source['id']}"
        )
    return text
|
|
|
|
def _download_ia(source: dict, requests) -> str:
    """Fetch plain text from Internet Archive.

    Uses the project-local sources.ia_search helper; the *requests* argument
    is unused but kept so all downloaders share one call signature.
    """
    from sources.ia_search import get_ia_text

    identifier = source.get("identifier") or source.get("id")
    if not identifier:
        raise ValueError(f"No 'identifier' field for IA source: {source}")

    log.info(" Fetching IA text for: %s", identifier)
    body = get_ia_text(identifier)

    # Heuristic sanity check against empty or failed fetches.
    if len(body) >= 1000:
        return body
    raise ValueError(
        f"IA body suspiciously short ({len(body)} chars) for {identifier}"
    )
|
|
|
|
# Dispatch table: manifest "source_type" value -> downloader function.
# Each downloader takes (source_dict, requests_module) and returns clean text.
DOWNLOADER_MAP = {
    "gutenberg": _download_gutenberg,
    "mit_classics": _download_mit_classics,
    "internet_archive": _download_ia,
}
|
|
|
|
| |
| |
| |
|
|
def download_source(source: dict, requests, dry_run: bool = False) -> dict:
    """
    Download one manifest entry into inbox/ and report the outcome.

    Never raises: every failure is captured in the returned record, which is
    shaped for direct inclusion in download_log.json.
    """
    sid = source["id"]
    fname = source["filename"]
    target = INBOX_DIR / fname

    record = {
        "id": sid,
        "filename": fname,
        "url": source["url"],
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "status": None,
        "bytes": 0,
        "error": None,
    }

    # Guard clause: in a dry run nothing touches the network or disk.
    if dry_run:
        log.info("[DRY-RUN] Would download: %s → %s", sid, target)
        record["status"] = "dry_run"
        return record

    kind = source.get("source_type", "gutenberg")
    fetch = DOWNLOADER_MAP.get(kind)
    if fetch is None:
        record["status"] = "skipped"
        record["error"] = f"Unknown source_type '{kind}'"
        log.warning(" Skipping %s: %s", sid, record["error"])
        return record

    try:
        body = fetch(source, requests)
        INBOX_DIR.mkdir(parents=True, exist_ok=True)
        target.write_text(body, encoding="utf-8")
        size = target.stat().st_size
    except Exception as exc:  # record the failure; never crash the whole run
        record["status"] = "error"
        record["error"] = str(exc)
        log.error(" FAILED %s: %s", sid, exc)
    else:
        record["status"] = "ok"
        record["bytes"] = size
        log.info(" Saved %s (%s bytes)", target.name, f"{size:,}")

    return record
|
|
|
|
| |
| |
| |
|
|
def load_download_log() -> list:
    """Return previously logged download entries, or [] if the log is absent or unreadable."""
    if not LOG_PATH.exists():
        return []
    try:
        return json.loads(LOG_PATH.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        # A corrupt or unreadable log just starts the history afresh.
        return []
|
|
|
|
def save_download_log(entries: list) -> None:
    """Overwrite download_log.json with *entries*, pretty-printed as UTF-8 JSON."""
    LOG_PATH.write_text(
        json.dumps(entries, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
|
|
|
|
| |
| |
| |
|
|
# ANSI escape sequences for --list output, keyed by manifest "category".
# Applied only when stdout is a TTY (see _coloured).
CATEGORY_COLOURS = {
    "trivium": "\033[96m",     # bright cyan
    "quadrivium": "\033[93m",  # bright yellow
    "bridging": "\033[95m",    # bright magenta
}
RESET = "\033[0m"  # clears all attributes
BOLD = "\033[1m"
|
|
|
|
| def _coloured(text: str, colour: str) -> str: |
| """Apply ANSI colour if stdout is a TTY.""" |
| if sys.stdout.isatty(): |
| return f"{colour}{text}{RESET}" |
| return text |
|
|
|
|
def print_source_list(sources: list[dict]) -> None:
    """Pretty-print the full catalogue of sources, grouped by art."""
    by_art: dict[str, list[dict]] = {}
    for src in sources:
        by_art.setdefault(src.get("art", "unknown"), []).append(src)

    word_total = sum(src.get("estimated_words", 0) for src in sources)
    n_enabled = sum(1 for src in sources if src.get("enabled", True))

    print(f"\n{BOLD}MicroGPT Classical Corpus - {len(sources)} sources "
          f"({n_enabled} enabled, ~{word_total:,} words){RESET}\n")

    for art_name, members in sorted(by_art.items()):
        # Colour each heading by the category of its first member.
        category = members[0].get("category", "")
        colour = CATEGORY_COLOURS.get(category, "")
        heading = _coloured(f"[{art_name.upper()}]", BOLD + colour)
        print(f" {heading}")
        for src in members:
            marker = " " if src.get("enabled", True) else " [DISABLED] "
            words = src.get("estimated_words", 0)
            print(
                f" {marker}{src['author']}: {src['title']}"
                f" ({words:,} words) -> {src['filename']}"
            )
        print()
|
|
|
|
def print_summary(results: list[dict]) -> None:
    """Print a download-run summary table: counts, total size, and any failures."""
    succeeded = [r for r in results if r["status"] == "ok"]
    errored = [r for r in results if r["status"] == "error"]
    passed_over = [r for r in results if r["status"] in ("skipped", "dry_run")]

    size_sum = sum(r["bytes"] for r in succeeded)

    rule = "-" * 60
    print(f"\n{rule}")
    print(f" Downloaded : {len(succeeded)}")
    print(f" Failed : {len(errored)}")
    print(f" Skipped : {len(passed_over)}")
    print(f" Total size : {size_sum / 1024:.1f} KB ({size_sum:,} bytes)")

    if errored:
        print(f"\n {BOLD}Failures:{RESET}")
        for r in errored:
            print(f" - {r['id']}: {r['error']}")
    print(f"{rule}\n")
|
|
|
|
| |
| |
| |
|
|
def main() -> None:
    """CLI entry point: parse arguments, select sources, download, and report.

    Exits early for --list and for empty selections; dry runs skip all
    network/disk writes and do not touch the download log.
    """
    parser = argparse.ArgumentParser(
        description="Download classical Trivium/Quadrivium texts for MicroGPT training.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List all available sources and exit",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be downloaded without actually downloading",
    )
    parser.add_argument(
        "--art",
        metavar="ART",
        help="Download only sources for the given art (e.g. logic, rhetoric, geometry)",
    )
    parser.add_argument(
        "--include-disabled",
        action="store_true",
        help="Include sources marked enabled=false in the manifest",
    )
    args = parser.parse_args()

    manifest = load_manifest()
    all_sources = manifest.get("sources", [])

    if args.list:
        print_source_list(all_sources)
        return

    enabled_only = not args.include_disabled
    sources = filter_sources(all_sources, art=args.art, enabled_only=enabled_only)

    if not sources:
        filter_desc = f" with art='{args.art}'" if args.art else ""
        log.warning("No enabled sources found%s. Use --list to see all.", filter_desc)
        return

    if args.dry_run:
        log.info("DRY RUN - nothing will be written")

    art_desc = f" (art={args.art})" if args.art else ""
    log.info(
        "Processing %d source(s)%s - inbox: %s",
        len(sources), art_desc, INBOX_DIR
    )

    requests = _require_requests()
    if not args.dry_run:
        # Preflight: fail fast on missing bs4 before any network traffic.
        _require_bs4()

    results: list[dict] = []
    for i, source in enumerate(sources, start=1):
        log.info(
            "[%d/%d] %s - %s (%s)",
            i, len(sources),
            source["id"],
            source["title"],
            # Fix: direct indexing raised KeyError for manifest entries with
            # no "source_type"; default to "gutenberg" like download_source.
            source.get("source_type", "gutenberg"),
        )

        result = download_source(source, requests, dry_run=args.dry_run)
        results.append(result)

        # Be polite to the archives: pause between real downloads.
        if i < len(sources) and not args.dry_run:
            time.sleep(INTER_REQUEST_DELAY)

    # Persist the run history (dry runs leave the log untouched).
    if not args.dry_run:
        log_entries = load_download_log()
        log_entries.extend(results)
        save_download_log(log_entries)
        log.info("Download log updated: %s", LOG_PATH)

    print_summary(results)
|
|
|
|
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
|