# pre-punctuation-processor / sources / download_classics.py
# LisaMegaWatts's picture
# Upload sources/download_classics.py with huggingface_hub
# b9e58da verified
"""
download_classics.py
====================
Curated classical text downloader for MicroGPT Trivium/Quadrivium training corpus.
Reads source_manifest.json and fetches each enabled source:
- Project Gutenberg : downloads plain .txt files directly
- MIT Internet Classics Archive: downloads HTML, strips markup with BeautifulSoup
Downloaded files land in pipeline/inbox/ so the main pipeline picks them up.
A download_log.json is written alongside this script recording every run.
Usage
-----
python download_classics.py # download all enabled sources
python download_classics.py --list # list all available sources
python download_classics.py --dry-run # show what would download without doing it
python download_classics.py --art logic # download only logic texts
Requirements
------------
pip install requests beautifulsoup4
"""
import argparse
import json
import logging
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# All paths are anchored to this script's directory so the tool behaves the
# same regardless of the caller's current working directory.
SCRIPT_DIR = Path(__file__).resolve().parent
MANIFEST_PATH = SCRIPT_DIR / "source_manifest.json"  # catalogue of sources to fetch
INBOX_DIR = SCRIPT_DIR.parent / "inbox"  # downloads land here for the main pipeline
LOG_PATH = SCRIPT_DIR / "download_log.json"  # append-only record of every run
# ---------------------------------------------------------------------------
# HTTP configuration
# ---------------------------------------------------------------------------
# Identifying User-Agent; some archives reject the default requests UA.
HEADERS = {
    "User-Agent": "MicroGPT-Classics-Downloader/1.0",
    "Accept": "text/html,text/plain,*/*",
}
REQUEST_TIMEOUT = 30  # seconds per HTTP request
INTER_REQUEST_DELAY = 1.0  # seconds between downloads (rate-limit courtesy)
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("download_classics")
# ---------------------------------------------------------------------------
# Lazy imports with helpful error messages
# ---------------------------------------------------------------------------
def _require_requests():
    """Return the ``requests`` module, exiting with install advice if missing."""
    try:
        import requests as _requests
    except ImportError:
        log.error("'requests' is not installed. Run: pip install requests")
        sys.exit(1)
    return _requests
def _require_bs4():
    """Return the ``BeautifulSoup`` class, exiting with install advice if missing."""
    try:
        from bs4 import BeautifulSoup as _BeautifulSoup
    except ImportError:
        log.error("'beautifulsoup4' is not installed. Run: pip install beautifulsoup4")
        sys.exit(1)
    return _BeautifulSoup
# ---------------------------------------------------------------------------
# Manifest loading
# ---------------------------------------------------------------------------
def load_manifest() -> dict:
    """Parse and return source_manifest.json, aborting if the file is absent."""
    if not MANIFEST_PATH.exists():
        log.error("Manifest not found: %s", MANIFEST_PATH)
        sys.exit(1)
    return json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
def filter_sources(sources: list[dict], *, art: str | None = None, enabled_only: bool = True) -> list[dict]:
result = sources
if enabled_only:
result = [s for s in result if s.get("enabled", True)]
if art:
result = [s for s in result if s.get("art", "").lower() == art.lower()]
return result
# ---------------------------------------------------------------------------
# Text cleaning helpers
# ---------------------------------------------------------------------------
def _strip_gutenberg_boilerplate(text: str) -> str:
"""Remove Project Gutenberg header and footer legalese from raw .txt files."""
# The header ends at a line matching "*** START OF..." or similar
start_markers = [
r"\*\*\* START OF (THE|THIS) PROJECT GUTENBERG",
r"\*\*\*START OF (THE|THIS) PROJECT GUTENBERG",
]
end_markers = [
r"\*\*\* END OF (THE|THIS) PROJECT GUTENBERG",
r"\*\*\*END OF (THE|THIS) PROJECT GUTENBERG",
]
lines = text.splitlines()
start_idx = 0
end_idx = len(lines)
for i, line in enumerate(lines):
for pat in start_markers:
if re.search(pat, line, re.IGNORECASE):
start_idx = i + 1
break
for i, line in enumerate(lines):
for pat in end_markers:
if re.search(pat, line, re.IGNORECASE):
end_idx = i
break
body = lines[start_idx:end_idx]
return "\n".join(body).strip()
def _extract_mit_classics_text(html: str, source_id: str) -> str:
    """
    Reduce an MIT Classics HTML page to plain prose.

    The pages use a simple layout: navigation links at the top, the text
    itself inside a <pre> block or ordinary body paragraphs, and a small
    footer.  Scripts, styles, the head, and nav blocks are discarded; bare
    image-only anchors are dropped when falling back to body extraction.
    (``source_id`` is accepted for interface parity with the downloaders
    but is not currently used here.)
    """
    BeautifulSoup = _require_bs4()
    soup = BeautifulSoup(html, "html.parser")
    # Discard non-content elements wholesale.
    for noise in soup(["script", "style", "head", "nav"]):
        noise.decompose()
    # MIT Classics usually wraps the text in a single <pre> block.
    pre_block = soup.find("pre")
    if pre_block:
        raw = pre_block.get_text(separator="\n")
    else:
        container = soup.find("body") or soup
        # Drop navigation anchors that wrap only an image; textual links stay.
        for anchor in container.find_all("a"):
            if anchor.find("img"):
                anchor.decompose()
        raw = container.get_text(separator="\n")
    # Squash runs of three-plus newlines down to a single blank line.
    return re.sub(r"\n{3,}", "\n\n", raw).strip()
# ---------------------------------------------------------------------------
# Downloaders
# ---------------------------------------------------------------------------
def _download_gutenberg(source: dict, requests) -> str:
    """Fetch a plain-text Gutenberg file, decode it, and strip boilerplate.

    Raises ValueError when the stripped body looks implausibly small,
    which usually means boilerplate detection misfired.
    """
    url = source["url"]
    log.info(" GET %s", url)
    response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    # Gutenberg serves UTF-8 but sometimes declares latin-1; try UTF-8
    # (tolerating a BOM) first, then fall back to latin-1, which cannot fail.
    try:
        body = response.content.decode("utf-8-sig")
    except UnicodeDecodeError:
        body = response.content.decode("latin-1")
    stripped = _strip_gutenberg_boilerplate(body)
    if len(stripped) < 1000:
        raise ValueError(
            f"Gutenberg body suspiciously short ({len(stripped)} chars) for {source['id']}; "
            "boilerplate stripping may have failed"
        )
    return stripped
def _download_mit_classics(source: dict, requests) -> str:
    """Fetch an MIT Classics text file (.mb.txt) or HTML page.

    Plain-text URLs are returned as-is (stripped); HTML pages go through
    the BeautifulSoup extractor.  Raises ValueError on implausibly short
    results.
    """
    url = source["url"]
    log.info(" GET %s", url)
    response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    decoded = response.content.decode("utf-8", errors="replace")
    # .mb.txt / .txt endpoints are already plain text - no HTML parsing needed.
    if url.endswith((".mb.txt", ".txt")):
        text = decoded.strip()
    else:
        text = _extract_mit_classics_text(decoded, source["id"])
    if len(text) < 500:
        raise ValueError(
            f"MIT Classics body suspiciously short ({len(text)} chars) for {source['id']}"
        )
    return text
def _download_ia(source: dict, requests) -> str:
    """Fetch plain text from the Internet Archive via sources.ia_search.

    Uses the manifest's "identifier" field (falling back to "id"); the
    ``requests`` argument is unused here but kept for downloader-interface
    parity.  Raises ValueError on a missing identifier or a tiny body.
    """
    from sources.ia_search import get_ia_text

    identifier = source.get("identifier") or source.get("id")
    if not identifier:
        raise ValueError(f"No 'identifier' field for IA source: {source}")
    log.info(" Fetching IA text for: %s", identifier)
    text = get_ia_text(identifier)
    if len(text) < 1000:
        raise ValueError(
            f"IA body suspiciously short ({len(text)} chars) for {identifier}"
        )
    return text
# Dispatch table: manifest "source_type" value -> downloader callable.
# Each downloader takes (source_dict, requests_module) and returns clean text.
DOWNLOADER_MAP = {
    "gutenberg": _download_gutenberg,
    "mit_classics": _download_mit_classics,
    "internet_archive": _download_ia,
}
# ---------------------------------------------------------------------------
# Core download logic
# ---------------------------------------------------------------------------
def download_source(source: dict, requests, dry_run: bool = False) -> dict:
    """
    Download a single source and save it to inbox/.

    Parameters
    ----------
    source : manifest entry; must carry "id" and "filename".  "url" is
        optional — internet_archive entries may supply only "identifier".
    requests : the imported requests module (lazy-imported once in main()).
    dry_run : when True, report what would happen and perform no I/O.

    Returns a result dict suitable for inclusion in download_log.json.
    """
    source_id = source["id"]
    filename = source["filename"]
    dest = INBOX_DIR / filename
    result = {
        "id": source_id,
        "filename": filename,
        # .get(): internet_archive sources may legitimately omit "url"
        # (their downloader works from "identifier"), so a plain key
        # lookup here would raise KeyError before the download even ran.
        "url": source.get("url"),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "status": None,
        "bytes": 0,
        "error": None,
    }
    if dry_run:
        log.info("[DRY-RUN] Would download: %s → %s", source_id, dest)
        result["status"] = "dry_run"
        return result
    source_type = source.get("source_type", "gutenberg")
    downloader = DOWNLOADER_MAP.get(source_type)
    if downloader is None:
        result["status"] = "skipped"
        result["error"] = f"Unknown source_type '{source_type}'"
        log.warning(" Skipping %s: %s", source_id, result["error"])
        return result
    try:
        text = downloader(source, requests)
        INBOX_DIR.mkdir(parents=True, exist_ok=True)
        dest.write_text(text, encoding="utf-8")
        byte_count = dest.stat().st_size
        result["status"] = "ok"
        result["bytes"] = byte_count
        log.info(" Saved %s (%s bytes)", dest.name, f"{byte_count:,}")
    except Exception as exc:
        # Record the failure but keep the run going for the other sources.
        result["status"] = "error"
        result["error"] = str(exc)
        log.error(" FAILED %s: %s", source_id, exc)
    return result
# ---------------------------------------------------------------------------
# Log persistence
# ---------------------------------------------------------------------------
def load_download_log() -> list:
    """Return previously logged entries, or [] when the log is absent or corrupt."""
    if not LOG_PATH.exists():
        return []
    try:
        return json.loads(LOG_PATH.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        # A damaged or unreadable log is treated as empty rather than fatal.
        return []
def save_download_log(entries: list) -> None:
    """Serialize *entries* to download_log.json (pretty-printed, UTF-8)."""
    serialized = json.dumps(entries, indent=2, ensure_ascii=False)
    LOG_PATH.write_text(serialized, encoding="utf-8")
# ---------------------------------------------------------------------------
# CLI presentation helpers
# ---------------------------------------------------------------------------
CATEGORY_COLOURS = {
"trivium": "\033[96m", # cyan
"quadrivium": "\033[93m", # yellow
"bridging": "\033[95m", # magenta
}
RESET = "\033[0m"
BOLD = "\033[1m"
def _coloured(text: str, colour: str) -> str:
"""Apply ANSI colour if stdout is a TTY."""
if sys.stdout.isatty():
return f"{colour}{text}{RESET}"
return text
def print_source_list(sources: list[dict]) -> None:
    """Pretty-print the full catalogue of sources, grouped and sorted by art."""
    grouped: dict[str, list[dict]] = {}
    for src in sources:
        grouped.setdefault(src.get("art", "unknown"), []).append(src)
    total_words = sum(src.get("estimated_words", 0) for src in sources)
    enabled_count = sum(1 for src in sources if src.get("enabled", True))
    print(f"\n{BOLD}MicroGPT Classical Corpus - {len(sources)} sources "
          f"({enabled_count} enabled, ~{total_words:,} words){RESET}\n")
    for art in sorted(grouped):
        art_sources = grouped[art]
        # Heading colour comes from the category of the art's first entry.
        colour = CATEGORY_COLOURS.get(art_sources[0].get("category", ""), "")
        heading = _coloured(f"[{art.upper()}]", BOLD + colour)
        print(f" {heading}")
        for src in art_sources:
            marker = " " if src.get("enabled", True) else " [DISABLED] "
            words = src.get("estimated_words", 0)
            print(
                f" {marker}{src['author']}: {src['title']}"
                f" ({words:,} words) -> {src['filename']}"
            )
    print()
def print_summary(results: list[dict]) -> None:
    """Print counts, total size, and any failure details for a download run."""
    succeeded = [entry for entry in results if entry["status"] == "ok"]
    errored = [entry for entry in results if entry["status"] == "error"]
    passed_over = [entry for entry in results if entry["status"] in ("skipped", "dry_run")]
    size = sum(entry["bytes"] for entry in succeeded)
    divider = "-" * 60
    print(f"\n{divider}")
    print(f" Downloaded : {len(succeeded)}")
    print(f" Failed : {len(errored)}")
    print(f" Skipped : {len(passed_over)}")
    print(f" Total size : {size / 1024:.1f} KB ({size:,} bytes)")
    if errored:
        print(f"\n {BOLD}Failures:{RESET}")
        for entry in errored:
            print(f" - {entry['id']}: {entry['error']}")
    print(f"{divider}\n")
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: parse args, select sources, download, and log results."""
    parser = argparse.ArgumentParser(
        description="Download classical Trivium/Quadrivium texts for MicroGPT training.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List all available sources and exit",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be downloaded without actually downloading",
    )
    parser.add_argument(
        "--art",
        metavar="ART",
        help="Download only sources for the given art (e.g. logic, rhetoric, geometry)",
    )
    parser.add_argument(
        "--include-disabled",
        action="store_true",
        help="Include sources marked enabled=false in the manifest",
    )
    args = parser.parse_args()

    # Load manifest
    manifest = load_manifest()
    all_sources = manifest.get("sources", [])

    # --list just prints and exits
    if args.list:
        print_source_list(all_sources)
        return

    # Determine which sources to process
    enabled_only = not args.include_disabled
    sources = filter_sources(all_sources, art=args.art, enabled_only=enabled_only)
    if not sources:
        filter_desc = f" with art='{args.art}'" if args.art else ""
        log.warning("No enabled sources found%s. Use --list to see all.", filter_desc)
        return

    if args.dry_run:
        log.info("DRY RUN - nothing will be written")
    # Print what we intend to do
    art_desc = f" (art={args.art})" if args.art else ""
    log.info(
        "Processing %d source(s)%s - inbox: %s",
        len(sources), art_desc, INBOX_DIR
    )

    requests = _require_requests()
    if not args.dry_run:
        _require_bs4()  # validate early so we don't fail mid-run

    results: list[dict] = []
    for i, source in enumerate(sources, start=1):
        log.info(
            "[%d/%d] %s - %s (%s)",
            i, len(sources),
            source["id"],
            source["title"],
            # .get(): keep consistent with download_source(), which defaults
            # a missing source_type to "gutenberg" instead of raising
            # KeyError — previously this log line crashed first.
            source.get("source_type", "gutenberg"),
        )
        result = download_source(source, requests, dry_run=args.dry_run)
        results.append(result)
        # Rate-limit courtesy: pause between requests, but not after the last.
        if i < len(sources) and not args.dry_run:
            time.sleep(INTER_REQUEST_DELAY)

    # Persist log (dry runs leave no trace on disk)
    if not args.dry_run:
        log_entries = load_download_log()
        log_entries.extend(results)
        save_download_log(log_entries)
        log.info("Download log updated: %s", LOG_PATH)

    print_summary(results)


if __name__ == "__main__":
    main()