ai-agent-app / scripts /audit_copyright.py
MinhTai's picture
deploy: 5e8a959
f0a6334
"""Task 9: Detect near-verbatim extractions from Paul's Online Math Notes.
Fetches the actual Paul's Notes pages that were crawled (from crawl_progress.json),
extracts reference sentences, then scores every manual wiki unit in the topics
Paul's Notes covers by the fraction of its word 5-grams found in that reference text.
Usage:
python3 scripts/audit_copyright.py --threshold 0.75 > flagged_copyright.csv
python3 scripts/audit_copyright.py --threshold 0.75 --reviewed-ok reviewed_ok.csv
python3 scripts/audit_copyright.py --threshold 0.75 --no-fetch # use cached sentences
"""
import argparse
import csv
import difflib
import json
import re
import sqlite3
import sys
from pathlib import Path
try:
import httpx
from bs4 import BeautifulSoup
_HTTP_AVAILABLE = True
except ImportError:
_HTTP_AVAILABLE = False
# --- Local files -----------------------------------------------------------
# SQLite database opened by main(); resolved against the working directory.
DB_PATH = "math_wiki.db"
# JSON file scanned for Paul's Notes URLs.
PROGRESS_FILE = Path("scripts/crawl_progress.json")
# JSON list of reference sentences saved between runs.
SENTENCE_CACHE = Path("scripts/.pauls_sentences_cache.json")

# --- Remote source ---------------------------------------------------------
# Substring used to recognise Paul's Notes URLs among the crawled ones.
PAULS_HOST = "tutorial.math.lamar.edu"
# Request headers sent with every page fetch.
_HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; MathWikiCrawler/1.0)",
}

# --- Audit scope -----------------------------------------------------------
# Topics whose manual units might have come from Paul's Notes.
PAULS_TOPICS = frozenset(
    (
        "algebra",
        "calculus",
        "trigonometry",
        "differential_equations",
    )
)
def _load_pauls_urls() -> list[str]:
    """Return the entries of the crawl progress file that mention Paul's Notes.

    ``PROGRESS_FILE`` is parsed as JSON and iterated; every entry containing
    ``PAULS_HOST`` as a substring is kept, in file order. A missing progress
    file yields an empty list.
    """
    if PROGRESS_FILE.exists():
        crawled = json.loads(PROGRESS_FILE.read_text())
        pauls_urls = []
        for url in crawled:
            if PAULS_HOST in url:
                pauls_urls.append(url)
        return pauls_urls
    return []
def _fetch_sentences(urls: list[str]) -> list[str]:
    """Download every URL and harvest sentence-sized snippets of its text.

    For each page, ``script``/``style``/``nav``/``header``/``footer`` elements
    are removed, the remaining text is split after ``.``, ``!`` or ``?``, and
    only stripped pieces of 40 to 300 characters are kept. A page that fails to
    download or parse is reported on stderr and skipped.

    Returns:
        The collected sentences with duplicates removed, in first-seen order.
        An empty list when httpx/bs4 could not be imported.
    """
    if not _HTTP_AVAILABLE:
        print("httpx/bs4 not available — cannot fetch reference text.", file=sys.stderr)
        return []

    print(f"Fetching {len(urls)} Paul's Notes pages for reference text...", file=sys.stderr)
    collected: list[str] = []
    for url in urls:
        try:
            response = httpx.get(url, headers=_HEADERS, timeout=15, follow_redirects=True)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            # Strip page chrome and non-content markup before extracting text.
            for tag in ["script", "style", "nav", "header", "footer"]:
                for element in soup.find_all(tag):
                    element.decompose()
            page_text = soup.get_text(" ")
            for piece in re.split(r"(?<=[.!?])\s+", page_text):
                candidate = piece.strip()
                if 40 <= len(candidate) <= 300:
                    collected.append(candidate)
        except Exception as exc:
            # Best effort: one bad page must not abort the whole crawl.
            print(f"  warn: {url}: {exc}", file=sys.stderr)

    # dict keys keep insertion order, so this drops repeats but preserves order.
    unique = list(dict.fromkeys(collected))
    print(f"Collected {len(unique)} unique reference sentences.", file=sys.stderr)
    return unique
def _load_or_fetch_sentences(no_fetch: bool) -> list[str]:
    """Return the reference sentences, preferring the on-disk cache.

    A non-empty ``SENTENCE_CACHE`` is returned as-is. Otherwise the sentences
    are fetched from the Paul's Notes URLs listed in the crawl progress file
    and, when any were collected, saved to the cache for later runs.

    Args:
        no_fetch: When true, never touch the network; only the cache is used.

    Returns:
        The list of reference sentences, or an empty list when none are
        available (no usable cache with ``no_fetch``, no URLs, or nothing
        fetched).
    """
    if SENTENCE_CACHE.exists():
        try:
            # The cache is written with ensure_ascii=False, so it must be read
            # and written as UTF-8 regardless of the platform's locale encoding.
            cached = json.loads(SENTENCE_CACHE.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers both malformed JSON and undecodable bytes;
            # treat an unreadable cache as absent instead of crashing.
            print(f"Ignoring unreadable sentence cache {SENTENCE_CACHE}: {exc}", file=sys.stderr)
            cached = None
        if cached:
            print(f"Using {len(cached)} cached reference sentences.", file=sys.stderr)
            return cached
    if no_fetch:
        print("--no-fetch specified but no cache found. Run without --no-fetch first.", file=sys.stderr)
        return []
    urls = _load_pauls_urls()
    if not urls:
        print("No Paul's Notes URLs in crawl_progress.json.", file=sys.stderr)
        return []
    sentences = _fetch_sentences(urls)
    if sentences:
        # An empty cache is ignored on read anyway, so only persist real data.
        SENTENCE_CACHE.write_text(
            json.dumps(sentences, ensure_ascii=False), encoding="utf-8"
        )
    return sentences
def _word_ngrams(text: str, n: int = 5) -> set[tuple]:
words = re.sub(r"[^a-z0-9 ]", " ", text.lower()).split()
return {tuple(words[i:i+n]) for i in range(len(words) - n + 1)}
def _build_reference_index(references: list[str], n: int = 5) -> tuple[set[tuple], dict[tuple, str]]:
    """Index the reference sentences by their word n-grams.

    Returns:
        A pair ``(ngrams, ngram_to_sentence)``: the set of every n-gram found
        in *references*, and a map from each n-gram to the first sentence in
        *references* that contains it.
    """
    first_sentence_for: dict[tuple, str] = {}
    for sentence in references:
        for gram in _word_ngrams(sentence, n):
            # Keep the earliest sentence as the representative for this n-gram.
            if gram not in first_sentence_for:
                first_sentence_for[gram] = sentence
    # Every indexed n-gram is a key of the map, so the set is just its keys.
    return set(first_sentence_for), first_sentence_for
def _ngram_score(unit_content: str, ref_ngrams: set[tuple], ngram_to_sent: dict[tuple, str], n: int = 5) -> tuple[float, str]:
    """Score how much of a unit's text also appears in the reference corpus.

    Args:
        unit_content: Text of the wiki unit being audited.
        ref_ngrams: Every word n-gram present in the reference sentences.
        ngram_to_sent: Map from a reference n-gram to a sentence containing it.
        n: N-gram length; must match the one used to build the index.

    Returns:
        ``(score, best_ref)`` where ``score`` is the fraction of the unit's
        distinct n-grams found in ``ref_ngrams`` and ``best_ref`` is the
        reference sentence that accounts for the most shared n-grams.
        ``(0.0, "")`` when the unit has no n-grams or shares none.
    """
    unit_ngrams = _word_ngrams(unit_content, n)
    if not unit_ngrams:
        return 0.0, ""
    hits = unit_ngrams & ref_ngrams
    if not hits:
        return 0.0, ""
    score = len(hits) / len(unit_ngrams)

    # Tally shared n-grams per representative sentence so the reported match is
    # the most relevant one rather than whichever set element comes first.
    hits_per_sentence: dict[str, int] = {}
    for gram in hits:
        sentence = ngram_to_sent.get(gram, "")
        hits_per_sentence[sentence] = hits_per_sentence.get(sentence, 0) + 1
    # Highest count wins; ties break on the sentence text so that repeated
    # runs over the same data report the same sentence.
    best_ref = min(hits_per_sentence, key=lambda s: (-hits_per_sentence[s], s))
    return score, best_ref
def _load_reviewed_ok(path: str | None) -> set[str]:
if not path or not Path(path).exists():
return set()
with open(path) as f:
reader = csv.DictReader(f)
return {row["id"] for row in reader if row.get("status") == "reviewed_ok"}
def main(threshold: float, reviewed_ok_path: str | None, no_fetch: bool) -> int:
    """Audit manual wiki units against the reference text and print a CSV.

    Loads the reference sentences, reads every non-deleted manual unit whose
    topic is in ``PAULS_TOPICS`` from the ``wiki_units`` table, scores each one
    by n-gram overlap, and writes the units scoring at or above *threshold* to
    stdout as CSV. Progress and the final summary go to stderr.

    Args:
        threshold: Minimum overlap score for a unit to be flagged.
        reviewed_ok_path: Optional CSV of already-reviewed unit IDs to skip.
        no_fetch: When true, use only cached reference sentences.

    Returns:
        0 when no unit is flagged; 1 when at least one unit is flagged or when
        no reference sentences are available.
    """
    references = _load_or_fetch_sentences(no_fetch)
    if not references:
        print("No reference sentences — cannot audit. Exiting.", file=sys.stderr)
        return 1

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        # Audit all manual units in topics that Paul's Notes covers.
        topic_placeholders = ",".join("?" * len(PAULS_TOPICS))
        rows = conn.execute(
            f"SELECT id, topic, subtopic, content FROM wiki_units "
            f"WHERE deleted=0 AND source='manual' AND topic IN ({topic_placeholders})",
            list(PAULS_TOPICS),
        ).fetchall()
    finally:
        # fetchall() has materialised every row, so the connection can be
        # released here even when the query itself raised.
        conn.close()

    print(f"Building n-gram index from {len(references)} reference sentences...", file=sys.stderr)
    ref_ngrams, ngram_to_sent = _build_reference_index(references)
    print(f"Index has {len(ref_ngrams)} unique 5-grams.", file=sys.stderr)
    print(f"Checking {len(rows)} manual units in topics: {sorted(PAULS_TOPICS)}", file=sys.stderr)

    reviewed_ok = _load_reviewed_ok(reviewed_ok_path)
    flagged: list[dict] = []
    for row in rows:
        # IDs read from the CSV are strings, while the database may return
        # integers; compare on the string form so the exclusion can match.
        if str(row["id"]) in reviewed_ok:
            continue
        # A NULL content column has nothing to compare; score it as empty text.
        score, matched_ref = _ngram_score(row["content"] or "", ref_ngrams, ngram_to_sent)
        if score >= threshold:
            flagged.append({
                "id": row["id"],
                "topic": row["topic"],
                "subtopic": row["subtopic"],
                "score": f"{score:.3f}",
                "matched_ref": matched_ref[:120],
                "content": row["content"],
            })

    writer = csv.DictWriter(
        sys.stdout,
        fieldnames=["id", "topic", "subtopic", "score", "matched_ref", "content"],
    )
    writer.writeheader()
    writer.writerows(flagged)

    print(
        f"Flagged: {len(flagged)} / {len(rows)} units at threshold {threshold}",
        file=sys.stderr,
    )
    # A non-zero exit status signals that flagged units need review.
    return 1 if flagged else 0
if __name__ == "__main__":
    # Command-line entry point: parse the options, run the audit, and use its
    # return value as the process exit status.
    cli = argparse.ArgumentParser()
    cli.add_argument("--threshold", default=0.75, type=float)
    cli.add_argument(
        "--reviewed-ok",
        default=None,
        help="CSV of reviewed_ok IDs to exclude",
    )
    cli.add_argument(
        "--no-fetch",
        action="store_true",
        help="Use cached sentences only",
    )
    options = cli.parse_args()
    exit_status = main(options.threshold, options.reviewed_ok, options.no_fetch)
    sys.exit(exit_status)