#!/usr/bin/env python3 """ graphify_rebuild.py — One-shot NudR knowledge graph regeneration. Usage: python graphify_rebuild.py # Full rebuild python graphify_rebuild.py --watch # Watch mode (rebuilds on file change) python graphify_rebuild.py --quick # Skip semantic, AST-only rebuild Outputs (all in graphify-out/): GRAPH_REPORT.md — Full community/audit report graph.html — Interactive force-directed graph (open in browser) graph.json — Raw graph data for tooling manifest.json — File hashes for incremental re-runs cost.json — Token usage tracking """ import sys, io, os, json, ast, hashlib, time, argparse from pathlib import Path from datetime import datetime, timezone # Fix Windows console encoding if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # ─── Configuration ─────────────────────────────────────────────────────────── ROOT = Path(__file__).parent OUT_DIR = ROOT / 'graphify-out' CACHE_DIR = OUT_DIR / 'cache' MANIFEST = OUT_DIR / 'manifest.json' REPORT_PATH = OUT_DIR / 'GRAPH_REPORT.md' HTML_PATH = OUT_DIR / 'graph.html' JSON_PATH = OUT_DIR / 'graph.json' COST_PATH = OUT_DIR / 'cost.json' # Directories and patterns to skip SKIP_DIRS = { '.git', '.venv', 'venv', 'node_modules', '__pycache__', '.mypy_cache', '.pytest_cache', '.graphify', 'graphify-out', '.terraform', '.idea', 'env', 'dist', 'build', 'egg-info', '.tox', '.ruff_cache', } SKIP_EXTENSIONS = {'.pyc', '.pyo', '.whl', '.egg', '.so', '.dll', '.exe'} # File types for AST extraction AST_EXTENSIONS = {'.py'} # File types for corpus (semantic awareness) CORPUS_EXTENSIONS = { '.py', '.md', '.txt', '.html', '.css', '.js', '.ts', '.json', '.yaml', '.yml', '.toml', '.cfg', '.ini', '.proto', '.tf', '.tfvars', } # ─── Step 1: Detect files ──────────────────────────────────────────────────── def detect_files(): """Walk the project and return list of relevant files with metadata.""" files = [] total_words = 0 for dirpath, dirnames, filenames in os.walk(ROOT): # Prune skipped directories dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS] for fname in filenames: fpath = Path(dirpath) / fname ext = fpath.suffix.lower() if ext in SKIP_EXTENSIONS: continue rel = fpath.relative_to(ROOT) if any(part.startswith('.') for part in rel.parts[:-1]): continue try: mtime = fpath.stat().st_mtime size = fpath.stat().st_size except OSError: continue if ext in CORPUS_EXTENSIONS and size < 5_000_000: try: content = fpath.read_text(encoding='utf-8', errors='ignore') word_count = len(content.split()) total_words += word_count except Exception: word_count = 0 else: word_count = 0 files.append({ 'path': str(rel), 'ext': ext, 'mtime': mtime, 'size': size, 'words': word_count, }) return files, total_words def get_changed_files(files): """Compare against manifest to find changed files.""" if MANIFEST.exists(): old_manifest = json.loads(MANIFEST.read_text(encoding='utf-8')) else: old_manifest = {} changed = [] for f in files: old_mtime = old_manifest.get(f['path']) if old_mtime is None or f['mtime'] != old_mtime: changed.append(f) return changed # ─── Step 2: AST Extraction ────────────────────────────────────────────────── def hash_file(path): """SHA-256 hash for cache keying.""" h = hashlib.sha256() try: h.update(Path(path).read_bytes()) except Exception: h.update(path.encode()) return h.hexdigest() def extract_ast_file(filepath): """Extract AST nodes and edges from a single Python file.""" nodes = [] edges = [] rel = str(filepath.relative_to(ROOT)) file_id = rel.replace('\\', '_').replace('/', '_').replace('.', '_') try: source = filepath.read_text(encoding='utf-8', errors='ignore') tree = ast.parse(source, filename=str(filepath)) except SyntaxError: return nodes, edges # File-level node nodes.append({ 'id': file_id, 'label': filepath.name, 'file_type': 'code', 'source_file': rel, }) # Extract module-level docstring docstring = ast.get_docstring(tree) if docstring and len(docstring) > 20: doc_id = f"{file_id}_docstring" nodes.append({ 'id': doc_id, 'label': docstring[:80], 'file_type': 'rationale', 'source_file': rel, }) edges.append({ 'source': file_id, 'target': doc_id, 'relation': 'has_rationale', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 0.5, }) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): func_id = f"{file_id}_{node.name}" label = f"{node.name}()" nodes.append({ 'id': func_id, 'label': label, 'file_type': 'code', 'source_file': rel, 'source_location': f"line {node.lineno}", }) edges.append({ 'source': file_id, 'target': func_id, 'relation': 'defines', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 1.0, }) # Function docstring fdoc = ast.get_docstring(node) if fdoc and len(fdoc) > 20: fdoc_id = f"{func_id}_doc" nodes.append({ 'id': fdoc_id, 'label': fdoc[:80], 'file_type': 'rationale', 'source_file': rel, 'source_location': f"line {node.lineno}", }) edges.append({ 'source': func_id, 'target': fdoc_id, 'relation': 'has_rationale', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 0.5, }) # Calls inside functions for child in ast.walk(node): if isinstance(child, ast.Call): callee = _get_call_name(child) if callee: edges.append({ 'source': func_id, 'target': callee, 'relation': 'calls', 'confidence': 'INFERRED', 'confidence_score': 0.7, 'source_file': rel, 'weight': 0.8, }) elif isinstance(node, ast.ClassDef): class_id = f"{file_id}_{node.name}" nodes.append({ 'id': class_id, 'label': node.name, 'file_type': 'code', 'source_file': rel, 'source_location': f"line {node.lineno}", }) edges.append({ 'source': file_id, 'target': class_id, 'relation': 'defines', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 1.0, }) # Class docstring cdoc = ast.get_docstring(node) if cdoc and len(cdoc) > 20: cdoc_id = f"{class_id}_doc" nodes.append({ 'id': cdoc_id, 'label': cdoc[:80], 'file_type': 'rationale', 'source_file': rel, 'source_location': f"line {node.lineno}", }) edges.append({ 'source': class_id, 'target': cdoc_id, 'relation': 'has_rationale', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 0.5, }) # Base classes for base in node.bases: base_name = _get_name(base) if base_name: edges.append({ 'source': class_id, 'target': base_name, 'relation': 'inherits', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 1.0, }) elif isinstance(node, ast.Import): for alias in node.names: edges.append({ 'source': file_id, 'target': alias.name, 'relation': 'imports', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 0.6, }) elif isinstance(node, ast.ImportFrom) and node.module: edges.append({ 'source': file_id, 'target': node.module, 'relation': 'imports', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': rel, 'weight': 0.6, }) return nodes, edges def _get_call_name(node): """Extract callable name from ast.Call node.""" if isinstance(node.func, ast.Name): return node.func.id elif isinstance(node.func, ast.Attribute): return node.func.attr return None def _get_name(node): """Extract name from various AST node types.""" if isinstance(node, ast.Name): return node.id elif isinstance(node, ast.Attribute): return node.attr return None def _resolve_edges(all_nodes, all_edges): """Post-process edges to resolve bare names to actual node IDs. The per-file AST extraction produces edges with bare targets: - calls: target='get_cached_image' (bare function name) - imports: target='app.core.session' (dotted module path) This function resolves them to actual node IDs so they survive the graph build phase (which drops unresolvable targets). """ node_ids = {n['id'] for n in all_nodes} # Build function name → [node_id, ...] index func_index: dict[str, list[str]] = {} for n in all_nodes: if n.get('file_type') == 'code' and '(' in n.get('label', ''): # label looks like "get_cached_image()" bare_name = n['label'].rstrip('()') func_index.setdefault(bare_name, []).append(n['id']) # Build module path → file node ID map # e.g. 'app.core.session' → 'app_core_session_py' module_index: dict[str, str] = {} for n in all_nodes: src = n.get('source_file', '') if src.endswith('.py'): # Convert 'app/core/session.py' or 'app\core\session.py' # → dotted module: 'app.core.session' mod_path = src.replace('\\', '/').replace('/', '.').removesuffix('.py') # Strip leading __init__ for package imports mod_path_init = mod_path.removesuffix('.__init__') nid = n['id'] # Only map file-level nodes (no functions/classes) if nid == src.replace('\\', '_').replace('/', '_').replace('.', '_'): module_index[mod_path] = nid if mod_path != mod_path_init: module_index[mod_path_init] = nid resolved_edges = [] calls_resolved = 0 imports_resolved = 0 dropped = 0 for edge in all_edges: rel = edge.get('relation', '') if rel == 'calls': target = edge['target'] # Try exact match first if target in node_ids: resolved_edges.append(edge) calls_resolved += 1 continue # Resolve via function index matches = func_index.get(target, []) if matches: for match_id in matches: # Don't create self-edges within the same file if match_id.rsplit('_', 1)[0] != edge['source'].rsplit('_', 1)[0] or len(matches) == 1: resolved_edges.append({ **edge, 'target': match_id, 'confidence': 'INFERRED' if len(matches) > 1 else 'EXTRACTED', 'confidence_score': 0.9 if len(matches) == 1 else 0.6, }) calls_resolved += 1 else: dropped += 1 elif rel == 'imports': target = edge['target'] # Try exact match as node ID first if target in node_ids: resolved_edges.append(edge) imports_resolved += 1 continue # Resolve dotted module path to file node ID resolved_id = module_index.get(target) if resolved_id: resolved_edges.append({**edge, 'target': resolved_id}) imports_resolved += 1 continue # Try progressively shorter prefixes # e.g. 'app.core.session.revoke_all' → 'app.core.session' → 'app.core' → 'app' parts = target.split('.') found = False for i in range(len(parts) - 1, 0, -1): prefix = '.'.join(parts[:i]) resolved_id = module_index.get(prefix) if resolved_id: resolved_edges.append({**edge, 'target': resolved_id}) imports_resolved += 1 found = True break if not found: # External/stdlib import — drop it dropped += 1 else: # defines, has_rationale, etc — keep as-is resolved_edges.append(edge) print(f" Resolved: {calls_resolved} calls, {imports_resolved} imports, {dropped} dropped (external/stdlib)") return resolved_edges def run_ast_extraction(files, use_cache=True): """Run AST extraction on all Python files, with caching.""" CACHE_DIR.mkdir(parents=True, exist_ok=True) all_nodes = [] all_edges = [] cached, extracted = 0, 0 # Collect valid cache hashes for cleanup valid_hashes = set() py_files = [f for f in files if f['ext'] in AST_EXTENSIONS] for f in py_files: fpath = ROOT / f['path'] fhash = hash_file(fpath) valid_hashes.add(fhash) cache_file = CACHE_DIR / f"{fhash}.json" if use_cache and cache_file.exists(): data = json.loads(cache_file.read_text(encoding='utf-8')) all_nodes.extend(data.get('nodes', [])) all_edges.extend(data.get('edges', [])) cached += 1 else: nodes, edges = extract_ast_file(fpath) all_nodes.extend(nodes) all_edges.extend(edges) # Write cache cache_file.write_text(json.dumps({ 'nodes': nodes, 'edges': edges, }, indent=2), encoding='utf-8') extracted += 1 # Clean stale cache entries stale = 0 for cache_file in CACHE_DIR.glob('*.json'): h = cache_file.stem if h not in valid_hashes: cache_file.unlink() stale += 1 print(f" AST: {len(py_files)} Python files ({cached} cached, {extracted} extracted)") if stale: print(f" Cache cleanup: {stale} stale entries removed") print(f" AST: {len(all_nodes)} nodes, {len(all_edges)} edges (raw)") # Resolve bare targets to actual node IDs all_edges = _resolve_edges(all_nodes, all_edges) print(f" AST: {len(all_nodes)} nodes, {len(all_edges)} edges (resolved)") return all_nodes, all_edges # ─── Step 3: Semantic Extraction ───────────────────────────────────────────── def build_semantic_nodes(): """ Build semantic nodes from documentation files. These capture high-level architecture concepts that AST can't see. """ nodes = [] edges = [] hyperedges = [] # Architecture components from README arch_nodes = [ ("nudr_api", "NudR Stateless API", "README.md"), ("fastapi_backend", "FastAPI Stateless Backend", "README.md"), ("supabase_db", "Supabase PostgreSQL Database", "README.md"), ("redis_cache", "Redis Session & Cache Store", "README.md"), ("cloudflare_proxy", "Cloudflare Edge Proxy", "README.md"), ("stripe_payments", "Stripe Payment Integration", "README.md"), ("firebase_fcm", "Firebase FCM Push Notifications", "README.md"), ("e2ee_encryption", "E2EE X25519 Key Exchange", "README.md"), ("protobuf_framing", "Protobuf Binary WebSocket Framing", "README.md"), ("hmac_verification", "HMAC-SHA256 Request Verification", "README.md"), ("origin_secret", "X-Origin-Secret Middleware", "README.md"), ("pow_challenge", "Proof-of-Work Challenge", "README.md"), ("rate_limiting", "Per-IP Rate Limiting", "README.md"), ("aws_secrets", "AWS Secrets Manager Integration", "README.md"), ("terraform_infra", "Terraform AWS Infrastructure", "README.md"), ("vpc_network", "VPC Network Topology", "README.md"), ("alb_autoscaling", "ALB + Auto Scaling Group", "README.md"), ("lambda_rotator", "Lambda Origin Secret Rotator", "README.md"), ("unified_ws", "Unified WebSocket Endpoint /ws", "README.md"), ("feed_ws", "Feed WebSocket Channel", "README.md"), ("chat_ws", "Chat WebSocket Channel", "README.md"), ("keysync_ws", "Keysync WebSocket Channel", "README.md"), ("discovery_ws", "Discovery WebSocket Channel", "README.md"), ("attack_detection", "Attack Detection & IP Risk Management", "README.md"), ] for nid, label, src in arch_nodes: nodes.append({ 'id': f"sem_{nid}", 'label': label, 'file_type': 'document', 'source_file': src, }) # Architecture edges arch_edges = [ ("nudr_api", "fastapi_backend", "implements"), ("fastapi_backend", "supabase_db", "references"), ("fastapi_backend", "redis_cache", "references"), ("cloudflare_proxy", "origin_secret", "references"), ("origin_secret", "lambda_rotator", "references"), ("stripe_payments", "fastapi_backend", "references"), ("firebase_fcm", "fastapi_backend", "references"), ("e2ee_encryption", "keysync_ws", "references"), ("protobuf_framing", "unified_ws", "references"), ("terraform_infra", "vpc_network", "references"), ("terraform_infra", "alb_autoscaling", "references"), ("terraform_infra", "aws_secrets", "references"), ("attack_detection", "rate_limiting", "references"), ("unified_ws", "feed_ws", "conceptually_related_to"), ("unified_ws", "chat_ws", "conceptually_related_to"), ("unified_ws", "keysync_ws", "conceptually_related_to"), ("unified_ws", "discovery_ws", "conceptually_related_to"), ] for src, tgt, rel in arch_edges: edges.append({ 'source': f"sem_{src}", 'target': f"sem_{tgt}", 'relation': rel, 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': 'README.md', 'weight': 1.0, }) # Feed system nodes (from feed_system_documentation.md) feed_nodes = [ ("feed_system", "Feed System Technical Documentation", "PLAN/feed_system_documentation.md"), ("feed_scoring", "Multi-Factor Scoring Algorithm", "PLAN/feed_system_documentation.md"), ("feed_pool", "Feed Pool Computation Pipeline", "PLAN/feed_system_documentation.md"), ("feed_filters", "Feed Hard Filters (12 Rules)", "PLAN/feed_system_documentation.md"), ("feed_heatmap", "Preference Heatmap (Learned AI)", "PLAN/feed_system_documentation.md"), ("feed_reciprocal", "Reciprocal Boost & Injection", "PLAN/feed_system_documentation.md"), ("feed_gradient", "3-Tier Gradient Distribution", "PLAN/feed_system_documentation.md"), ("feed_redis", "Feed Redis Key Schema", "PLAN/feed_system_documentation.md"), ] for nid, label, src in feed_nodes: nodes.append({ 'id': f"sem_{nid}", 'label': label, 'file_type': 'document', 'source_file': src, }) feed_edges = [ ("feed_system", "nudr_api", "references"), ("feed_pool", "redis_cache", "references"), ("feed_pool", "supabase_db", "references"), ("feed_scoring", "feed_pool", "references"), ("feed_filters", "feed_pool", "references"), ("feed_heatmap", "feed_scoring", "references"), ("feed_reciprocal", "feed_scoring", "references"), ("feed_gradient", "feed_scoring", "references"), ("feed_redis", "redis_cache", "references"), ("feed_system", "feed_ws", "references"), ] for src, tgt, rel in feed_edges: edges.append({ 'source': f"sem_{src}", 'target': f"sem_{tgt}", 'relation': rel, 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': 'PLAN/feed_system_documentation.md', 'weight': 1.0, }) # Logic analysis nodes logic_nodes = [ ("logic_analysis", "Logic-Level Async Issue Audit", "PLAN/LOGIC_ANALYSIS.md"), ("id_ws_reuse", "DISASTROUS: id(ws) Memory Reuse Bug", "PLAN/LOGIC_ANALYSIS.md"), ("token_refresh_crash", "DISASTROUS: Token Refresh Crash Window", "PLAN/LOGIC_ANALYSIS.md"), ("pubsub_crash", "DISASTROUS: PubSub Listener Permanent Crash", "PLAN/LOGIC_ANALYSIS.md"), ("redis_pool_exhaustion", "DISASTROUS: Redis Connection Pool Exhaustion", "PLAN/LOGIC_ANALYSIS.md"), ("preference_race", "Race Condition: Preference Merge", "PLAN/LOGIC_ANALYSIS.md"), ] for nid, label, src in logic_nodes: nodes.append({ 'id': f"sem_{nid}", 'label': label, 'file_type': 'document', 'source_file': src, }) logic_edges = [ ("id_ws_reuse", "unified_ws", "references"), ("token_refresh_crash", "unified_ws", "references"), ("pubsub_crash", "redis_cache", "references"), ("redis_pool_exhaustion", "redis_cache", "references"), ("preference_race", "supabase_db", "references"), ("logic_analysis", "nudr_api", "references"), ] for src, tgt, rel in logic_edges: edges.append({ 'source': f"sem_{src}", 'target': f"sem_{tgt}", 'relation': rel, 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': 'PLAN/LOGIC_ANALYSIS.md', 'weight': 1.0, }) # Hyperedges hyperedges = [ { 'id': 'websocket_channels', 'label': 'WebSocket Channel System', 'nodes': ['sem_unified_ws', 'sem_feed_ws', 'sem_chat_ws', 'sem_keysync_ws', 'sem_discovery_ws'], 'relation': 'participate_in', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': 'README.md', }, { 'id': 'security_stack', 'label': 'Security Defense Stack', 'nodes': ['sem_hmac_verification', 'sem_origin_secret', 'sem_pow_challenge', 'sem_rate_limiting', 'sem_attack_detection'], 'relation': 'participate_in', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': 'README.md', }, { 'id': 'feed_pipeline', 'label': 'Feed Recommendation Pipeline', 'nodes': ['sem_feed_pool', 'sem_feed_filters', 'sem_feed_scoring', 'sem_feed_heatmap', 'sem_feed_reciprocal', 'sem_feed_gradient'], 'relation': 'form', 'confidence': 'EXTRACTED', 'confidence_score': 1.0, 'source_file': 'PLAN/feed_system_documentation.md', }, ] print(f" Semantic: {len(nodes)} nodes, {len(edges)} edges, {len(hyperedges)} hyperedges") return nodes, edges, hyperedges # ─── Step 4: Merge & Build Graph ───────────────────────────────────────────── def merge_and_build(ast_nodes, ast_edges, sem_nodes, sem_edges, hyperedges): """Merge AST + semantic, build NetworkX graph, cluster, analyze.""" from graphify.build import build_from_json from graphify.cluster import cluster, score_all from graphify.analyze import god_nodes, surprising_connections, suggest_questions # Merge: AST first, deduplicate semantic by id seen = {n['id'] for n in ast_nodes} merged_nodes = list(ast_nodes) for n in sem_nodes: if n['id'] not in seen: merged_nodes.append(n) seen.add(n['id']) merged_edges = ast_edges + sem_edges extraction = { 'nodes': merged_nodes, 'edges': merged_edges, 'hyperedges': hyperedges, } G = build_from_json(extraction) communities = cluster(G) cohesion = score_all(G, communities) gods = god_nodes(G) surprises = surprising_connections(G, communities) # Auto-label communities labels = {} for cid, members in communities.items(): names = " ".join(members[:10]).lower() if 'feed' in names and 'service' in names: labels[cid] = "Feed System" elif 'feed' in names and ('score' in names or 'pool' in names): labels[cid] = "Feed Scoring & Pool" elif 'chat' in names and ('ws' in names or 'websocket' in names): labels[cid] = "Chat WebSocket" elif 'keysync' in names or 'key_exchange' in names: labels[cid] = "Key Exchange & Sync" elif 'discovery' in names and ('match' in names or 'like' in names): labels[cid] = "Discovery & Matching" elif 'auth' in names or 'signup' in names or 'signin' in names: labels[cid] = "Authentication" elif 'payment' in names or 'stripe' in names: labels[cid] = "Payments & Billing" elif 'setting' in names or 'profile' in names or 'preference' in names: labels[cid] = "Settings & Profiles" elif 'consent' in names: labels[cid] = "Consent System" elif 'report' in names or 'violation' in names: labels[cid] = "Reporting & Moderation" elif 'notification' in names or 'fcm' in names: labels[cid] = "Push Notifications" elif 'redis' in names or 'cache' in names: labels[cid] = "Redis & Caching" elif 'supabase' in names or 'migration' in names: labels[cid] = "Database Layer" elif 'terraform' in names or 'aws' in names or 'vpc' in names: labels[cid] = "Infrastructure (Terraform)" elif 'security' in names or 'rate_limit' in names or 'attack' in names: labels[cid] = "Security & Rate Limiting" elif 'codec' in names or 'hmac' in names or 'protobuf' in names: labels[cid] = "WebSocket Codec" elif 'unified' in names and 'ws' in names: labels[cid] = "Unified WebSocket" elif 'token' in names: labels[cid] = "Token Management" elif 'image' in names: labels[cid] = "Image Processing" elif 'event' in names or 'pending' in names: labels[cid] = "Event Queue" elif 'linkup' in names: labels[cid] = "Linkup System" elif 'test' in names: labels[cid] = "Tests" elif 'nuke' in names or 'script' in names: labels[cid] = "Utility Scripts" elif 'email' in names or 'otp' in names: labels[cid] = "Email & OTP" elif 'flutter' in names: labels[cid] = "Flutter Directives" elif 'readme' in names: labels[cid] = "API Documentation" else: labels[cid] = f"Module Group {cid}" questions = suggest_questions(G, communities, labels) print(f" Graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {len(communities)} communities") return G, communities, cohesion, labels, gods, surprises, questions, extraction # ─── Step 5: Generate Outputs ──────────────────────────────────────────────── def generate_outputs(G, communities, cohesion, labels, gods, surprises, questions, detection, extraction): """Generate report, HTML, JSON, and manifest.""" from graphify.report import generate from graphify.export import to_json, to_html OUT_DIR.mkdir(parents=True, exist_ok=True) tokens = {'input': 0, 'output': 0} # Report report = generate( G, communities, cohesion, labels, gods, surprises, detection, tokens, str(ROOT), suggested_questions=questions, ) REPORT_PATH.write_text(report, encoding='utf-8') print(f" -> {REPORT_PATH.relative_to(ROOT)}") # JSON to_json(G, communities, str(JSON_PATH)) print(f" -> {JSON_PATH.relative_to(ROOT)}") # HTML if G.number_of_nodes() <= 5000: to_html(G, communities, str(HTML_PATH), community_labels=labels) print(f" -> {HTML_PATH.relative_to(ROOT)}") else: print(f" !! Graph too large for HTML ({G.number_of_nodes()} nodes)") # Manifest manifest = {} for f in detection.get('files', []): manifest[f['path']] = f.get('mtime', 0) MANIFEST.write_text(json.dumps(manifest, indent=2), encoding='utf-8') # Cost tracker if COST_PATH.exists(): cost = json.loads(COST_PATH.read_text(encoding='utf-8')) else: cost = {'runs': [], 'total_input_tokens': 0, 'total_output_tokens': 0} cost['runs'].append({ 'date': datetime.now(timezone.utc).isoformat(), 'nodes': G.number_of_nodes(), 'edges': G.number_of_edges(), 'communities': len(communities), }) COST_PATH.write_text(json.dumps(cost, indent=2), encoding='utf-8') # ─── Main Pipeline ─────────────────────────────────────────────────────────── def run_pipeline(skip_semantic=False): """Execute the full graphify pipeline.""" start = time.time() print("=" * 60) print(f"graphify rebuild — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60) # Step 1: Detect print("\n[1/5] Detecting files...") files, total_words = detect_files() changed = get_changed_files(files) print(f" Found {len(files)} files ({total_words:,} words)") print(f" Changed since last build: {len(changed)}") detection = { 'files': files, 'total_files': len(files), 'total_words': total_words, 'changed_files': len(changed), } # Step 2: AST extraction print("\n[2/5] AST extraction...") ast_nodes, ast_edges = run_ast_extraction(files) # Step 3: Semantic extraction if skip_semantic: print("\n[3/5] Semantic extraction... SKIPPED (--quick)") sem_nodes, sem_edges, hyperedges = [], [], [] else: print("\n[3/5] Semantic extraction...") sem_nodes, sem_edges, hyperedges = build_semantic_nodes() # Step 4: Merge & build print("\n[4/5] Building graph...") G, communities, cohesion, labels, gods, surprises, questions, extraction = \ merge_and_build(ast_nodes, ast_edges, sem_nodes, sem_edges, hyperedges) # Step 5: Generate outputs print("\n[5/5] Generating outputs...") generate_outputs(G, communities, cohesion, labels, gods, surprises, questions, detection, extraction) elapsed = time.time() - start print(f"\n{'=' * 60}") print(f"Done in {elapsed:.1f}s") print(f" {G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {len(communities)} communities") print(f" Open graphify-out/graph.html in your browser") print(f"{'=' * 60}") def watch_mode(): """Watch for file changes and rebuild automatically.""" print("Watching for changes... (Ctrl+C to stop)") last_mtimes = {} while True: try: changed = False for dirpath, dirnames, filenames in os.walk(ROOT): dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS] for fname in filenames: fpath = Path(dirpath) / fname if fpath.suffix.lower() not in CORPUS_EXTENSIONS: continue try: mtime = fpath.stat().st_mtime except OSError: continue key = str(fpath) if key in last_mtimes and last_mtimes[key] != mtime: rel = fpath.relative_to(ROOT) print(f"\n Changed: {rel}") changed = True last_mtimes[key] = mtime if changed: run_pipeline() time.sleep(3) except KeyboardInterrupt: print("\nStopped watching.") break if __name__ == '__main__': parser = argparse.ArgumentParser(description='NudR Knowledge Graph Rebuild') parser.add_argument('--watch', action='store_true', help='Watch mode: rebuild on file change') parser.add_argument('--quick', action='store_true', help='Quick mode: AST-only, skip semantic') args = parser.parse_args() if args.watch: run_pipeline(skip_semantic=args.quick) watch_mode() else: run_pipeline(skip_semantic=args.quick)