Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Deterministic parser for Karpathy-pattern LLM wikis. | |
| Detects the three-layer pattern (raw sources + wiki markdown + schema), | |
| extracts structure from markdown files, resolves wikilinks, and derives | |
| categories from index.md section headings. | |
| Usage: | |
| python parse-knowledge-base.py <wiki-directory> | |
| Output: | |
| Writes scan-manifest.json to <wiki-directory>/.understand-anything/intermediate/ | |
| """ | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from pathlib import Path | |
# ---------------------------------------------------------------------------
# Regex patterns
# ---------------------------------------------------------------------------
# [[target]] or [[target|display]]: group 1 is the target, group 2 the
# optional display text after the pipe.
WIKILINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|([^\]]+))?\]\]")
# Leading "---" delimited block; no MULTILINE flag, so "^" anchors to the very
# start of the text. Group 1 is everything between the two delimiter lines.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)
# Triple backtick followed by optional word characters (group 1). Matches
# opening and closing fences alike; closing fences yield an empty group.
CODE_BLOCK_RE = re.compile(r"```(\w*)")
# One to six "#" at the start of a line, whitespace, then the heading text.
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
# Exactly two "#" followed by whitespace, i.e. level-2 section headings.
INDEX_SECTION_RE = re.compile(r"^##\s+(.+)$", re.MULTILINE)
# Files that are part of wiki infrastructure, not content articles
# (names are lowercase; callers compare against a lowercased filename).
INFRA_FILES = {"index.md", "log.md", "claude.md", "agents.md", "soul.md"}
| # --------------------------------------------------------------------------- | |
| # Detection: is this a Karpathy-pattern wiki? | |
| # --------------------------------------------------------------------------- | |
def detect_format(root: Path) -> dict:
    """Report whether ``root`` looks like a Karpathy-pattern LLM wiki.

    Marker files (index, log, schema) are accepted either directly in
    ``root`` or inside a ``wiki/`` subdirectory. The wiki content root is
    ``root/wiki`` when that directory exists, otherwise ``root`` itself.

    Returns a dict with the boolean signals ``has_index``, ``has_log``,
    ``has_raw`` and ``has_schema``, the markdown file count ``md_count``
    (recursive, under the wiki root), ``wiki_root`` as a string, and the
    verdict in ``detected`` / ``format`` ("karpathy" or "unknown").
    """
    nested = root / "wiki"

    def _present(filename: str) -> bool:
        # A marker counts whether it sits in root or in root/wiki.
        return (root / filename).is_file() or (nested / filename).is_file()

    wiki_root = nested if nested.is_dir() else root
    md_count = sum(1 for _ in wiki_root.rglob("*.md"))

    # Primary signal: an index plus a meaningful number of markdown files.
    has_index = _present("index.md")
    detected = has_index and md_count >= 3

    return {
        "has_index": has_index,
        "has_log": _present("log.md"),
        "has_raw": (root / "raw").is_dir(),
        "has_schema": _present("CLAUDE.md") or _present("AGENTS.md"),
        "md_count": md_count,
        "wiki_root": str(wiki_root),
        "detected": detected,
        "format": "karpathy" if detected else "unknown",
    }
| # --------------------------------------------------------------------------- | |
| # Markdown extraction helpers | |
| # --------------------------------------------------------------------------- | |
def extract_frontmatter(text: str) -> dict:
    """Extract YAML frontmatter as a flat ``str -> str`` dict.

    This is a deliberately small line-based reader, not a YAML parser.
    It understands:

    * ``key: value`` scalars (surrounding quotes removed);
    * flow lists ``key: [a, "b"]``, returned as ``"a, b"``;
    * block lists (``key:`` followed by ``- item`` lines), also returned
      as a comma-separated string.

    List values are flattened to comma-separated text so that callers
    which split on "," see clean items instead of ``"[a"`` / ``"b]"``.
    Blank lines and ``#`` comment lines are ignored.

    Returns an empty dict when the text has no leading frontmatter block.
    """
    m = FRONTMATTER_RE.match(text)
    if not m:
        return {}

    def _unquote(value: str) -> str:
        return value.strip().strip('"').strip("'")

    fm: dict[str, str] = {}
    # Key whose value was empty and may therefore be followed by "- item" lines.
    open_list_key = None
    for line in m.group(1).split("\n"):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        if open_list_key is not None and stripped.startswith("-"):
            item = _unquote(stripped[1:])
            if item:
                previous = fm[open_list_key]
                fm[open_list_key] = f"{previous}, {item}" if previous else item
            continue

        open_list_key = None
        if ":" not in line:
            continue

        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()
        if val.startswith("[") and val.endswith("]"):
            # Flow sequence: drop the brackets and unquote each element.
            items = (_unquote(part) for part in val[1:-1].split(","))
            fm[key] = ", ".join(item for item in items if item)
        else:
            fm[key] = _unquote(val)
            if not val:
                open_list_key = key
    return fm
def extract_wikilinks(text: str) -> list[dict]:
    """Return every ``[[target]]`` / ``[[target|display]]`` link in ``text``.

    Each result is ``{"target": str, "display": str | None}`` with both
    values whitespace-trimmed; ``display`` is ``None`` when the link has no
    ``|display`` part. Links are returned in document order.
    """
    found = []
    # findall yields "" for the display group when the pipe part is absent.
    for target, display in WIKILINK_RE.findall(text):
        shown = display.strip() if display else None
        found.append({"target": target.strip(), "display": shown})
    return found
def extract_headings(text: str) -> list[dict]:
    """Return all markdown headings as ``{"level": int, "text": str}`` dicts.

    ``level`` is the number of leading ``#`` characters (1-6) and ``text``
    is the trimmed heading text. Headings are returned in document order.
    """
    headings = []
    for match in HEADING_RE.finditer(text):
        hashes, title = match.groups()
        headings.append({"level": len(hashes), "text": title.strip()})
    return headings
def extract_code_blocks(text: str) -> list[str]:
    """Return the language tag of every fenced code block that declares one.

    Fences without a language tag (including closing fences) are skipped.
    """
    # With a single capture group, findall returns the group strings directly.
    return [language for language in CODE_BLOCK_RE.findall(text) if language]
def extract_first_paragraph(text: str) -> str:
    """Return a short summary: the first paragraph after frontmatter and H1.

    The paragraph following the first ``# `` heading that has one is
    preferred; if no such heading yields text, the first paragraph of the
    whole document is used. Blockquote lines, horizontal rules and headings
    are not part of a paragraph. Results longer than 200 characters are cut
    to 197 characters plus ``"..."``. Returns ``""`` when nothing is found.
    """
    body = FRONTMATTER_RE.sub("", text).strip()
    if not body:
        return ""
    lines = body.split("\n")

    def _first_paragraph(candidates: list[str]) -> str:
        """Join the first run of plain text lines in ``candidates``."""
        collected: list[str] = []
        for raw in candidates:
            s = raw.strip()
            if not s:
                if collected:
                    break  # A blank line ends the paragraph
                continue  # Still skipping leading blank lines
            if s.startswith(">"):
                continue  # Blockquote
            if re.match(r"^[-*_]{3,}\s*$", s):
                continue  # Horizontal rule
            if s.startswith("#"):
                if collected:
                    break  # The next heading ends the paragraph
                continue  # Heading before any text
            collected.append(s)
        return " ".join(collected)

    def _clip(paragraph: str) -> str:
        """Limit ``paragraph`` to 200 characters, marking truncation."""
        if len(paragraph) > 200:
            return paragraph[:197] + "..."
        return paragraph

    # Preferred: text that follows an H1.
    for idx, line in enumerate(lines):
        if not line.strip().startswith("# "):
            continue
        paragraph = _first_paragraph(lines[idx + 1:])
        if paragraph:
            return _clip(paragraph)

    # Fallback: no H1 produced text, so take the first paragraph from the top.
    return _clip(_first_paragraph(lines))
def extract_h1(text: str) -> str:
    """Return the text of the first level-1 heading, or ``""`` if none.

    The heading text is whitespace-trimmed but otherwise returned as
    written; subtitles such as " — subtitle" are kept.
    """
    h1_titles = (
        match.group(2).strip()
        for match in HEADING_RE.finditer(text)
        if len(match.group(1)) == 1
    )
    return next(h1_titles, "")
| # --------------------------------------------------------------------------- | |
| # Index.md parsing — categories come from section headings | |
| # --------------------------------------------------------------------------- | |
def parse_index(index_path: Path) -> list[dict]:
    """Parse index.md into categories derived from its ``##`` headings.

    Every level-2 heading starts a new category; all wikilink targets on
    the lines that follow (until the next level-2 heading) are recorded as
    that category's articles. Wikilinks that appear before the first
    level-2 heading are ignored.

    Returns a list of ``{"name": str, "articles": list[str]}`` dicts in
    document order, or an empty list when ``index_path`` is not a file.
    """
    if not index_path.is_file():
        return []

    text = index_path.read_text(encoding="utf-8", errors="replace")
    categories: list[dict] = []
    current_category = None
    for line in text.split("\n"):
        # Reuse the module-level section pattern rather than repeating it here.
        sec_match = INDEX_SECTION_RE.match(line)
        if sec_match:
            current_category = {
                "name": sec_match.group(1).strip(),
                "articles": [],
            }
            categories.append(current_category)
            continue

        if current_category is not None:
            current_category["articles"].extend(
                wl.group(1).strip() for wl in WIKILINK_RE.finditer(line)
            )
    return categories
| # --------------------------------------------------------------------------- | |
| # Log.md parsing — extract operation timeline | |
| # --------------------------------------------------------------------------- | |
def parse_log(log_path: Path) -> list[dict]:
    """Parse log.md into a list of chronological entries.

    An entry is a heading of the form ``## [YYYY-MM-DD] operation | title``.
    Returns ``{"date", "operation", "title"}`` dicts in file order, or an
    empty list when ``log_path`` is not a file.
    """
    if not log_path.is_file():
        return []

    entry_pattern = re.compile(
        r"^##\s+\[(\d{4}-\d{2}-\d{2})\]\s+(\w+)\s*\|\s*(.+)$", re.MULTILINE
    )
    content = log_path.read_text(encoding="utf-8", errors="replace")
    return [
        {"date": date, "operation": operation, "title": title.strip()}
        for date, operation, title in entry_pattern.findall(content)
    ]
| # --------------------------------------------------------------------------- | |
| # Main pipeline | |
| # --------------------------------------------------------------------------- | |
def build_name_to_stem_map(wiki_root: Path) -> dict[str, str]:
    """Build a case-insensitive map from page name to relative stem path.

    Keys are lowercased; values keep the on-disk casing, e.g.
    ``"decisions/decision-foo"``.

    * Every file is reachable by its full relative stem.
    * A bare basename is added only when exactly one file has it, so an
      ambiguous short name never silently resolves to the wrong page.
    * A full-path key is never removed: a root-level ``foo.md`` stays
      reachable as ``foo`` even if ``sub/foo.md`` also exists.
    """
    name_map: dict[str, str] = {}
    # basename (lowercased) -> every stem that carries it
    stems_by_basename: dict[str, list[str]] = {}

    # Sorted so the result does not depend on directory listing order.
    for md_file in sorted(wiki_root.rglob("*.md")):
        stem = str(md_file.relative_to(wiki_root).with_suffix(""))
        name_map[stem.lower()] = stem
        stems_by_basename.setdefault(md_file.stem.lower(), []).append(stem)

    for key, stems in stems_by_basename.items():
        # setdefault keeps an existing full-path entry (root-level pages)
        # instead of overwriting or deleting it.
        if len(stems) == 1:
            name_map.setdefault(key, stems[0])
    return name_map
| def resolve_wikilink(target: str, name_map: dict[str, str], node_ids: set[str] | None = None) -> str | None: | |
| """Resolve a wikilink target to an article node ID. | |
| If node_ids is provided, only resolve to IDs that exist in the set. | |
| """ | |
| key = target.lower().strip() | |
| # Skip targets that are clearly not page names (shell flags, etc.) | |
| if key.startswith("-"): | |
| return None | |
| stem = name_map.get(key) | |
| if stem: | |
| candidate = f"article:{stem}" | |
| # If we have a node set, verify the target exists | |
| if node_ids is not None and candidate not in node_ids: | |
| return None | |
| return candidate | |
| # Try without directory prefix | |
| for stored_key, stored_stem in name_map.items(): | |
| if stored_key.endswith("/" + key) or stored_key == key: | |
| candidate = f"article:{stored_stem}" | |
| if node_ids is not None and candidate not in node_ids: | |
| return None | |
| return candidate | |
| return None | |
def parse_wiki(root: Path) -> dict:
    """Parse a Karpathy-pattern wiki and produce the scan manifest.

    Pipeline: detect the layout, parse index.md (categories) and log.md,
    turn every content markdown file into an ``article`` node with
    ``related`` edges for its resolvable wikilinks, add ``topic`` nodes
    with ``categorized_under`` edges from the index, add ``source`` nodes
    for files under ``raw/``, then de-duplicate edges and attach backlinks.

    Returns a dict with the keys ``format``, ``stats``, ``categories``,
    ``logEntries``, ``nodes``, ``edges`` and ``warnings`` (capped at 50).

    If ``root`` is not detected as a wiki, an error JSON is printed to
    stderr and the process exits with status 1.
    """
    detection = detect_format(root)
    if not detection["detected"]:
        print(json.dumps({"error": "Not a Karpathy-pattern wiki", "detection": detection}),
              file=sys.stderr)
        sys.exit(1)

    wiki_root = Path(detection["wiki_root"])
    raw_root = root / "raw"

    # Build name resolution map
    name_map = build_name_to_stem_map(wiki_root)

    # index.md and log.md may live in the wiki root or one level up
    index_path = wiki_root / "index.md"
    if not index_path.is_file():
        index_path = root / "index.md"
    log_path = wiki_root / "log.md"
    if not log_path.is_file():
        log_path = root / "log.md"

    categories = parse_index(index_path)
    log_entries = parse_log(log_path)

    # Category lookup: wikilink target (lowercased) -> category name
    category_lookup: dict[str, str] = {}
    for cat in categories:
        for article_target in cat["articles"]:
            category_lookup[article_target.lower()] = cat["name"]

    # --- Collect content files once ---
    # Infra files are skipped only at the wiki root level
    # (wiki/index.md is infra, wiki/concepts/index.md is content).
    article_files: list[tuple[Path, Path, str]] = []
    for md_file in sorted(wiki_root.rglob("*.md")):
        rel = md_file.relative_to(wiki_root)
        if rel.parent == Path(".") and rel.name.lower() in INFRA_FILES:
            continue
        article_files.append((md_file, rel, str(rel.with_suffix(""))))

    # Known IDs, so edges only ever point at nodes that will exist
    article_ids: set[str] = {f"article:{stem}" for _, _, stem in article_files}

    # --- Build article nodes ---
    nodes = []
    edges = []
    warnings = []
    stats = {"articles": 0, "sources": 0, "topics": 0, "wikilinks": 0, "unresolved": 0}

    for md_file, rel, stem in article_files:
        basename = md_file.stem
        text = md_file.read_text(encoding="utf-8", errors="replace")

        h1 = extract_h1(text)
        frontmatter = extract_frontmatter(text)
        wikilinks = extract_wikilinks(text)
        summary = extract_first_paragraph(text)

        # Category from the index: try the bare name first, then the full stem
        category = category_lookup.get(basename.lower(), "")
        if not category:
            category = category_lookup.get(stem.lower(), "")

        # Derive tags (deduplicated)
        tag_set: set[str] = set()
        if category:
            tag_set.add(category.lower())
        if rel.parent != Path("."):
            tag_set.add(str(rel.parent))
        fm_tags = frontmatter.get("tags", "")
        if fm_tags:
            # Tolerate YAML flow lists such as `[a, "b"]`: drop the brackets
            # and per-item quotes so tags do not come out as "[a" / "b]".
            for raw_tag in fm_tags.strip("[]").split(","):
                tag = raw_tag.strip().strip("\"'").strip()
                if tag:
                    tag_set.add(tag)
        tags = sorted(tag_set)

        # Complexity from wikilink density
        wl_count = len(wikilinks)
        if wl_count > 15:
            complexity = "complex"
        elif wl_count > 5:
            complexity = "moderate"
        else:
            complexity = "simple"

        node_id = f"article:{stem}"
        nodes.append({
            "id": node_id,
            "type": "article",
            "name": h1 or basename,
            "filePath": str(rel),
            "summary": summary or f"Wiki article: {h1 or basename}",
            "tags": tags,
            "complexity": complexity,
            "knowledgeMeta": {
                "wikilinks": [wl["target"] for wl in wikilinks],
                "category": category or None,
                "content": text[:3000],  # First 3000 chars for LLM analysis
            },
        })
        stats["articles"] += 1
        stats["wikilinks"] += wl_count

        # Edges from wikilinks (self-links are neither edges nor warnings)
        for wl in wikilinks:
            target_id = resolve_wikilink(wl["target"], name_map, article_ids)
            if target_id and target_id != node_id:
                edges.append({
                    "source": node_id,
                    "target": target_id,
                    "type": "related",
                    "direction": "forward",
                    "weight": 0.7,
                })
            elif not target_id:
                warnings.append(f"Unresolved wikilink: [[{wl['target']}]] in {rel}")
                stats["unresolved"] += 1

    # --- Build topic nodes from index.md categories ---
    for cat in categories:
        topic_id = f"topic:{cat['name'].lower().replace(' ', '-')}"
        nodes.append({
            "id": topic_id,
            "type": "topic",
            "name": cat["name"],
            "summary": f"Category from index: {cat['name']} ({len(cat['articles'])} articles)",
            "tags": ["category"],
            "complexity": "simple",
        })
        stats["topics"] += 1

        # categorized_under edges (only resolve to known article nodes)
        for article_target in cat["articles"]:
            article_id = resolve_wikilink(article_target, name_map, article_ids)
            if article_id:
                edges.append({
                    "source": article_id,
                    "target": topic_id,
                    "type": "categorized_under",
                    "direction": "forward",
                    "weight": 0.6,
                })

    # --- Build source nodes from raw/ ---
    if raw_root.is_dir():
        for raw_file in sorted(raw_root.rglob("*")):
            if raw_file.is_file() and not raw_file.name.startswith("."):
                rel_raw = raw_file.relative_to(root)
                ext = raw_file.suffix.lower()
                size_kb = raw_file.stat().st_size / 1024
                source_id = f"source:{raw_file.relative_to(raw_root).with_suffix('')}"
                nodes.append({
                    "id": source_id,
                    "type": "source",
                    "name": raw_file.name,
                    "filePath": str(rel_raw),
                    "summary": f"Raw source ({ext or 'unknown'}, {size_kb:.0f} KB)",
                    "tags": ["raw", ext.lstrip(".") or "unknown"],
                    "complexity": "simple",
                })
                stats["sources"] += 1

    # --- Deduplicate edges ---
    seen_edges: set[tuple[str, str, str]] = set()
    deduped_edges = []
    for edge in edges:
        edge_key = (edge["source"], edge["target"], edge["type"])
        if edge_key not in seen_edges:
            seen_edges.add(edge_key)
            deduped_edges.append(edge)

    # --- Compute backlinks ---
    # Built from the de-duplicated edges so a page that links to the same
    # target several times is listed as a backlink only once.
    backlink_map: dict[str, list[str]] = {}
    for edge in deduped_edges:
        if edge["type"] == "related":
            backlink_map.setdefault(edge["target"], []).append(edge["source"])
    for node in nodes:
        if node["type"] == "article" and "knowledgeMeta" in node:
            node["knowledgeMeta"]["backlinks"] = backlink_map.get(node["id"], [])

    return {
        "format": "karpathy",
        "stats": stats,
        "categories": [{"name": c["name"], "count": len(c["articles"])} for c in categories],
        "logEntries": len(log_entries),
        "nodes": nodes,
        "edges": deduped_edges,
        "warnings": warnings[:50],  # Cap warnings
    }
def main():
    """Command-line entry point: parse the wiki given as the first argument.

    Writes the manifest to
    ``<wiki-directory>/.understand-anything/intermediate/scan-manifest.json``
    and prints a short report to stderr. Exits with status 1 when the
    argument is missing or is not a directory.
    """
    args = sys.argv[1:]
    if not args:
        print("Usage: parse-knowledge-base.py <wiki-directory>", file=sys.stderr)
        sys.exit(1)

    root = Path(args[0]).resolve()
    if not root.is_dir():
        print(f"Error: {root} is not a directory", file=sys.stderr)
        sys.exit(1)

    manifest = parse_wiki(root)

    # Write output
    out_path = root / ".understand-anything" / "intermediate" / "scan-manifest.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")

    # Report to stderr
    counts = manifest["stats"]
    report = (
        f"[parse] Karpathy wiki: {counts['articles']} articles, {counts['sources']} sources, "
        f"{counts['topics']} topics, {counts['wikilinks']} wikilinks "
        f"({counts['unresolved']} unresolved)"
    )
    print(report, file=sys.stderr)
    print(f"[parse] Output: {out_path}", file=sys.stderr)


if __name__ == "__main__":
    main()