"""Task 9: Detect near-verbatim extractions from Paul's Online Math Notes. Fetches the actual Paul's Notes pages that were crawled (from crawl_progress.json), extracts reference sentences, then checks every manual wiki unit for similarity. Usage: python3 scripts/audit_copyright.py --threshold 0.75 > flagged_copyright.csv python3 scripts/audit_copyright.py --threshold 0.75 --reviewed-ok reviewed_ok.csv python3 scripts/audit_copyright.py --threshold 0.75 --no-fetch # use cached sentences """ import argparse import csv import difflib import json import re import sqlite3 import sys from pathlib import Path try: import httpx from bs4 import BeautifulSoup _HTTP_AVAILABLE = True except ImportError: _HTTP_AVAILABLE = False DB_PATH = "math_wiki.db" PROGRESS_FILE = Path("scripts/crawl_progress.json") SENTENCE_CACHE = Path("scripts/.pauls_sentences_cache.json") PAULS_HOST = "tutorial.math.lamar.edu" # Topics whose manual units might have come from Paul's Notes. PAULS_TOPICS = frozenset({"algebra", "calculus", "trigonometry", "differential_equations"}) _HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; MathWikiCrawler/1.0)"} def _load_pauls_urls() -> list[str]: if not PROGRESS_FILE.exists(): return [] seen = json.loads(PROGRESS_FILE.read_text()) return [u for u in seen if PAULS_HOST in u] def _fetch_sentences(urls: list[str]) -> list[str]: """Fetch all Paul's Notes pages and extract content sentences.""" if not _HTTP_AVAILABLE: print("httpx/bs4 not available — cannot fetch reference text.", file=sys.stderr) return [] sentences: list[str] = [] print(f"Fetching {len(urls)} Paul's Notes pages for reference text...", file=sys.stderr) for url in urls: try: r = httpx.get(url, headers=_HEADERS, timeout=15, follow_redirects=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") for tag in ["script", "style", "nav", "header", "footer"]: for el in soup.find_all(tag): el.decompose() text = soup.get_text(" ") page_sents = [ s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if 40 <= len(s.strip()) <= 300 ] sentences.extend(page_sents) except Exception as exc: print(f" warn: {url}: {exc}", file=sys.stderr) # Deduplicate while preserving order seen: set[str] = set() unique = [] for s in sentences: if s not in seen: seen.add(s) unique.append(s) print(f"Collected {len(unique)} unique reference sentences.", file=sys.stderr) return unique def _load_or_fetch_sentences(no_fetch: bool) -> list[str]: if SENTENCE_CACHE.exists(): cached = json.loads(SENTENCE_CACHE.read_text()) if cached: print(f"Using {len(cached)} cached reference sentences.", file=sys.stderr) return cached if no_fetch: print("--no-fetch specified but no cache found. Run without --no-fetch first.", file=sys.stderr) return [] urls = _load_pauls_urls() if not urls: print("No Paul's Notes URLs in crawl_progress.json.", file=sys.stderr) return [] sentences = _fetch_sentences(urls) SENTENCE_CACHE.write_text(json.dumps(sentences, ensure_ascii=False)) return sentences def _word_ngrams(text: str, n: int = 5) -> set[tuple]: words = re.sub(r"[^a-z0-9 ]", " ", text.lower()).split() return {tuple(words[i:i+n]) for i in range(len(words) - n + 1)} def _build_reference_index(references: list[str], n: int = 5) -> tuple[set[tuple], dict[tuple, str]]: """Build a set of all reference n-grams and a map ngram→representative sentence.""" ngram_set: set[tuple] = set() ngram_to_sent: dict[tuple, str] = {} for sent in references: for ng in _word_ngrams(sent, n): ngram_set.add(ng) ngram_to_sent.setdefault(ng, sent) return ngram_set, ngram_to_sent def _ngram_score(unit_content: str, ref_ngrams: set[tuple], ngram_to_sent: dict[tuple, str], n: int = 5) -> tuple[float, str]: """Fraction of unit's n-grams that appear in the reference corpus. O(n) per unit.""" unit_ngrams = _word_ngrams(unit_content, n) if not unit_ngrams: return 0.0, "" hits = unit_ngrams & ref_ngrams if not hits: return 0.0, "" score = len(hits) / len(unit_ngrams) best_ref = ngram_to_sent.get(next(iter(hits)), "") return score, best_ref def _load_reviewed_ok(path: str | None) -> set[str]: if not path or not Path(path).exists(): return set() with open(path) as f: reader = csv.DictReader(f) return {row["id"] for row in reader if row.get("status") == "reviewed_ok"} def main(threshold: float, reviewed_ok_path: str | None, no_fetch: bool) -> int: references = _load_or_fetch_sentences(no_fetch) if not references: print("No reference sentences — cannot audit. Exiting.", file=sys.stderr) return 1 conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row # Audit all manual units in topics that Paul's Notes covers. topic_placeholders = ",".join("?" * len(PAULS_TOPICS)) rows = conn.execute( f"SELECT id, topic, subtopic, content FROM wiki_units " f"WHERE deleted=0 AND source='manual' AND topic IN ({topic_placeholders})", list(PAULS_TOPICS), ).fetchall() print(f"Building n-gram index from {len(references)} reference sentences...", file=sys.stderr) ref_ngrams, ngram_to_sent = _build_reference_index(references) print(f"Index has {len(ref_ngrams)} unique 5-grams.", file=sys.stderr) print(f"Checking {len(rows)} manual units in topics: {sorted(PAULS_TOPICS)}", file=sys.stderr) reviewed_ok = _load_reviewed_ok(reviewed_ok_path) flagged: list[dict] = [] for row in rows: if row["id"] in reviewed_ok: continue score, matched_ref = _ngram_score(row["content"], ref_ngrams, ngram_to_sent) if score >= threshold: flagged.append({ "id": row["id"], "topic": row["topic"], "subtopic": row["subtopic"], "score": f"{score:.3f}", "matched_ref": matched_ref[:120], "content": row["content"], }) writer = csv.DictWriter( sys.stdout, fieldnames=["id", "topic", "subtopic", "score", "matched_ref", "content"], ) writer.writeheader() writer.writerows(flagged) print( f"Flagged: {len(flagged)} / {len(rows)} units at threshold {threshold}", file=sys.stderr, ) conn.close() return 0 if len(flagged) == 0 else 1 if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--threshold", type=float, default=0.75) parser.add_argument("--reviewed-ok", default=None, help="CSV of reviewed_ok IDs to exclude") parser.add_argument("--no-fetch", action="store_true", help="Use cached sentences only") args = parser.parse_args() sys.exit(main(args.threshold, args.reviewed_ok, args.no_fetch))