File size: 4,975 Bytes
ae227b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import json
import logging
import httpx
from openai import AsyncOpenAI

from crawl.topic_map import AOPS_QUERIES, AOPS_CATEGORIES, PAULS_INDEX_URLS, GENERIC_HTML_SOURCES
from crawl.cleaner import html_to_chunks
from crawl.http_utils import close_client
from crawl.progress import load_seen, mark_seen, flush_seen, reset
from crawl.sources.aops import fetch_aops
from crawl.sources.pauls import fetch_pauls
from crawl.sources.generic_html import fetch_generic_html

logger = logging.getLogger(__name__)


async def fetch_gap_topics(api_base: str, limit: int) -> list[str]:
    """Return up to ``limit`` crawlable topics reported by the ``/math-gaps`` API.

    Sends ``GET {api_base}/math-gaps`` and keeps, in response order, the
    ``topic`` value of each item whose topic is a key of ``AOPS_QUERIES``.
    Repeated topics are kept only once so they do not consume ``limit`` slots.

    This is best-effort: if the request fails, returns a non-2xx status, or the
    body does not have the expected shape, every key of ``AOPS_QUERIES`` is
    returned instead. The same fallback applies when no reported topic is known.

    Args:
        api_base: Base URL of the API. A trailing slash is tolerated.
        limit: Maximum number of gap topics to return. It is applied only to
            the API result, not to the fallback list. It is applied as a plain
            slice, so ``0`` yields an empty selection and therefore the fallback.

    Returns:
        A list of topic keys from ``AOPS_QUERIES``.
    """
    url = f"{api_base.rstrip('/')}/math-gaps"
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(url)
            resp.raise_for_status()
            data = resp.json()
        # Expected shape (inferred from usage): a list of objects with a "topic"
        # key. dict.fromkeys drops duplicates while preserving response order.
        known = dict.fromkeys(
            item["topic"] for item in data if item.get("topic") in AOPS_QUERIES
        )
        labels = list(known)[:limit]
    except Exception as exc:
        # Deliberate best-effort boundary: any failure (network, HTTP status,
        # unexpected JSON shape) degrades to crawling every known topic.
        logger.warning("Could not fetch /math-gaps: %s — using all topics", exc)
        return list(AOPS_QUERIES.keys())
    if labels:
        return labels
    logger.info("/math-gaps reported no known topics — using all topics")
    return list(AOPS_QUERIES.keys())


async def _collect_pages(topic: str, sources, seen) -> tuple[list[tuple[str, str, str]], int]:
    """Fetch pages for ``topic`` from every enabled source, in a fixed order.

    ``sources`` is the collection of enabled source families (``"aops"``,
    ``"pauls"``, ``"generic"``). ``seen`` is the progress set from
    ``load_seen()`` and is handed to each fetcher unchanged.

    Returns ``(pages, skipped)``. Each page is a ``(url, html, source_tag)``
    tuple, and ``skipped`` is the sum of the skip counts the fetchers report.
    """
    collected: list[tuple[str, str, str]] = []
    skipped_total = 0

    if "aops" in sources:
        pages, skipped = await fetch_aops(
            AOPS_QUERIES.get(topic, []),
            seen,
            category=AOPS_CATEGORIES.get(topic),
        )
        collected += [(url, html, "aops") for url, html in pages]
        skipped_total += skipped

    if "pauls" in sources:
        pages, skipped = await fetch_pauls(PAULS_INDEX_URLS.get(topic), seen)
        collected += [(url, html, "pauls") for url, html in pages]
        skipped_total += skipped

    if "generic" in sources:
        for source_cfg in GENERIC_HTML_SOURCES.get(topic, []):
            pages, skipped = await fetch_generic_html(
                index_url=source_cfg["index_url"],
                seen=seen,
                link_pattern=source_cfg["link_pattern"],
                max_pages=source_cfg.get("max_pages", 150),
                crawl_delay=source_cfg.get("crawl_delay_seconds", 0.0),
            )
            tag = source_cfg["source_tag"]
            collected += [(url, html, tag) for url, html in pages]
            skipped_total += skipped

    return collected, skipped_total


async def crawl_and_ingest(
    client: AsyncOpenAI,
    topics: list[str],
    sources: list[str] = ("aops", "pauls", "generic"),
    dry_run: bool = False,
    reset_progress: bool = False,
    pool=None,
) -> dict[str, int]:
    """Crawl each topic's sources and feed the extracted chunks to ``concept_ingest``.

    For every topic, pages are collected from the enabled sources and split with
    ``html_to_chunks``. Each chunk is then passed to ``concept_ingest``. Once all
    of a page's chunks have been processed, its URL is recorded with
    ``mark_seen``. Progress is flushed after each topic, and again on exit if
    anything is still unflushed. This keeps completed pages recorded even when
    an exception or cancellation interrupts a topic.

    Args:
        client: OpenAI client passed through to ``concept_ingest``.
        topics: Topic keys to crawl (looked up in the topic-map tables).
        sources: Enabled source families: any of ``"aops"``, ``"pauls"``,
            ``"generic"``.
        dry_run: Fetch and chunk pages, but skip ``concept_ingest`` and do not
            record or flush progress.
        reset_progress: Call ``reset()`` before loading the seen set.
        pool: Passed through unchanged to ``concept_ingest``.

    Returns:
        Counters ``topics``, ``pages_fetched``, ``chunks_sent``,
        ``wiki_units_added``, ``skipped_seen`` and ``errors``.

    Raises:
        Exceptions from the fetchers, and any ``concept_ingest`` exception other
        than ``ValueError``/``json.JSONDecodeError``, propagate. Before that,
        pending progress is flushed and ``close_client()`` is awaited.
    """
    from collections import Counter

    from app.math_wiki.agents.concept_ingest import concept_ingest

    if reset_progress:
        reset()

    seen = load_seen()
    stats = {
        "topics": len(topics),
        "pages_fetched": 0,
        "chunks_sent": 0,
        "wiki_units_added": 0,  # LLM output count; may overcount on re-crawl due to DB upserts
        "skipped_seen": 0,
        "errors": 0,
    }
    # True while mark_seen() has recorded URLs that flush_seen() has not yet persisted.
    unflushed = False

    try:
        for topic in topics:
            all_pages, skipped = await _collect_pages(topic, sources, seen)
            stats["skipped_seen"] += skipped

            topic_units = 0
            page_count = len(all_pages)

            for page_idx, (url, html, page_source) in enumerate(all_pages, 1):
                chunks = html_to_chunks(html, source=page_source)
                stats["pages_fetched"] += 1
                page_units = 0
                print(f"  [{topic}/{page_source}] page {page_idx}/{page_count}: {url[:80]}", flush=True)
                print(f"    chunks: {len(chunks)}", flush=True)
                for chunk in chunks:
                    stats["chunks_sent"] += 1
                    if dry_run:
                        continue
                    try:
                        out = await concept_ingest(client, chunk, pool=pool, source=page_source, source_url=url, fallback_topic=topic)
                    except (ValueError, json.JSONDecodeError) as exc:
                        # Per-chunk parse/validation failures are counted and skipped;
                        # any other exception aborts the crawl (after the finally below).
                        logger.warning("concept_ingest error: %s", exc)
                        stats["errors"] += 1
                        continue
                    added = len(out.wiki_units)
                    stats["wiki_units_added"] += added
                    topic_units += added
                    page_units += added
                if not dry_run:
                    # Only reached after every chunk of the page was attempted.
                    mark_seen(url)
                    seen.add(url)
                    unflushed = True
                    print(f"    → {page_units} units added (total: {stats['wiki_units_added']})", flush=True)

            if not dry_run:
                flush_seen()
                unflushed = False

            # Counter keeps first-seen order, matching the fetch order above.
            source_counts = Counter(tag for _, _, tag in all_pages)
            counts_str = " | ".join(f"{tag}: {n}" for tag, n in source_counts.items()) or "none"
            print(f"[{topic}] pages — {counts_str} | units: {topic_units}")

    finally:
        try:
            # Persist progress for pages completed before an interruption.
            if unflushed:
                flush_seen()
        finally:
            await close_client()

    return stats