import asyncio
import json
import logging

import httpx
from openai import AsyncOpenAI

from crawl.topic_map import AOPS_QUERIES, AOPS_CATEGORIES, PAULS_INDEX_URLS, GENERIC_HTML_SOURCES
from crawl.cleaner import html_to_chunks
from crawl.http_utils import close_client
from crawl.progress import load_seen, mark_seen, flush_seen, reset
from crawl.sources.aops import fetch_aops
from crawl.sources.pauls import fetch_pauls
from crawl.sources.generic_html import fetch_generic_html

logger = logging.getLogger(__name__)


async def fetch_gap_topics(api_base: str, limit: int) -> list[str]:
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(f"{api_base}/math-gaps")
            resp.raise_for_status()
            data = resp.json()
            labels = [
                item["topic"] for item in data if item.get("topic") in AOPS_QUERIES
            ][:limit]
            if labels:
                return labels
    except Exception as exc:
        logger.warning("Could not fetch /math-gaps: %s — using all topics", exc)
    return list(AOPS_QUERIES.keys())


async def crawl_and_ingest(
    client: AsyncOpenAI,
    topics: list[str],
    sources: tuple[str, ...] = ("aops", "pauls", "generic"),
    dry_run: bool = False,
    reset_progress: bool = False,
    pool=None,
) -> dict[str, int]:
    from app.math_wiki.agents.concept_ingest import concept_ingest

    if reset_progress:
        reset()
    seen = load_seen()

    stats = {
        "topics": len(topics),
        "pages_fetched": 0,
        "chunks_sent": 0,
        "wiki_units_added": 0,  # LLM output count; may overcount on re-crawl due to DB upserts
        "skipped_seen": 0,
        "errors": 0,
    }

    try:
        for topic in topics:
            # Collect (url, html, source_tag) tuples from all enabled sources
            all_pages: list[tuple[str, str, str]] = []

            if "aops" in sources:
                pages, skipped = await fetch_aops(
                    AOPS_QUERIES.get(topic, []),
                    seen,
                    category=AOPS_CATEGORIES.get(topic),
                )
                all_pages += [(url, html, "aops") for url, html in pages]
                stats["skipped_seen"] += skipped

            if "pauls" in sources:
                pages, skipped = await fetch_pauls(PAULS_INDEX_URLS.get(topic), seen)
                all_pages += [(url, html, "pauls") for url, html in pages]
                stats["skipped_seen"] += skipped

            if "generic" in sources:
                for source_cfg in GENERIC_HTML_SOURCES.get(topic, []):
                    pages, skipped = await fetch_generic_html(
                        index_url=source_cfg["index_url"],
                        seen=seen,
                        link_pattern=source_cfg["link_pattern"],
                        max_pages=source_cfg.get("max_pages", 150),
                        crawl_delay=source_cfg.get("crawl_delay_seconds", 0.0),
                    )
                    tag = source_cfg["source_tag"]
                    all_pages += [(url, html, tag) for url, html in pages]
                    stats["skipped_seen"] += skipped

            topic_units = 0
            for page_idx, (url, html, page_source) in enumerate(all_pages, 1):
                chunks = html_to_chunks(html, source=page_source)
                stats["pages_fetched"] += 1
                page_units = 0
                print(f"  [{topic}/{page_source}] page {page_idx}/{len(all_pages)}: {url[:80]}", flush=True)
                print(f"    chunks: {len(chunks)}", flush=True)

                for chunk in chunks:
                    stats["chunks_sent"] += 1
                    if not dry_run:
                        try:
                            out = await concept_ingest(
                                client,
                                chunk,
                                pool=pool,
                                source=page_source,
                                source_url=url,
                                fallback_topic=topic,
                            )
                            added = len(out.wiki_units)
                            stats["wiki_units_added"] += added
                            topic_units += added
                            page_units += added
                        except (ValueError, json.JSONDecodeError) as exc:
                            logger.warning("concept_ingest error: %s", exc)
                            stats["errors"] += 1

                # Record the page as processed so re-crawls skip it
                if not dry_run:
                    mark_seen(url)
                    seen.add(url)
                print(f"    → {page_units} units added (total: {stats['wiki_units_added']})", flush=True)

            # Persist progress once per topic
            if not dry_run:
                flush_seen()

            # Per-topic summary of pages by source
            source_counts = {}
            for _, _, tag in all_pages:
                source_counts[tag] = source_counts.get(tag, 0) + 1
            counts_str = " | ".join(f"{tag}: {n}" for tag, n in source_counts.items())
            print(f"[{topic}] pages — {counts_str} | units: {topic_units}")
    finally:
        await close_client()

    return stats