#!/usr/bin/env python3
"""
graphify_rebuild.py - One-shot NudR knowledge graph regeneration.
Usage:
python graphify_rebuild.py # Full rebuild
python graphify_rebuild.py --watch # Watch mode (rebuilds on file change)
python graphify_rebuild.py --quick # Skip semantic, AST-only rebuild
Outputs (all in graphify-out/):
    GRAPH_REPORT.md - Full community/audit report
    graph.html      - Interactive force-directed graph (open in browser)
    graph.json      - Raw graph data for tooling
    manifest.json   - File mtimes for incremental re-runs
    cost.json       - Token usage tracking
"""
import sys, io, os, json, ast, hashlib, time, argparse
from pathlib import Path
from datetime import datetime, timezone
# Fix Windows console encoding
if sys.platform == 'win32':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# ─── Configuration ───────────────────────────────────────────────────────────
ROOT = Path(__file__).parent
OUT_DIR = ROOT / 'graphify-out'
CACHE_DIR = OUT_DIR / 'cache'
MANIFEST = OUT_DIR / 'manifest.json'
REPORT_PATH = OUT_DIR / 'GRAPH_REPORT.md'
HTML_PATH = OUT_DIR / 'graph.html'
JSON_PATH = OUT_DIR / 'graph.json'
COST_PATH = OUT_DIR / 'cost.json'
# Directories and patterns to skip
SKIP_DIRS = {
'.git', '.venv', 'venv', 'node_modules', '__pycache__', '.mypy_cache',
'.pytest_cache', '.graphify', 'graphify-out', '.terraform', '.idea',
'env', 'dist', 'build', 'egg-info', '.tox', '.ruff_cache',
}
SKIP_EXTENSIONS = {'.pyc', '.pyo', '.whl', '.egg', '.so', '.dll', '.exe'}
# File types for AST extraction
AST_EXTENSIONS = {'.py'}
# File types for corpus (semantic awareness)
CORPUS_EXTENSIONS = {
'.py', '.md', '.txt', '.html', '.css', '.js', '.ts', '.json',
'.yaml', '.yml', '.toml', '.cfg', '.ini', '.proto', '.tf', '.tfvars',
}
# ─── Step 1: Detect files ────────────────────────────────────────────────────
def detect_files():
"""Walk the project and return list of relevant files with metadata."""
files = []
total_words = 0
for dirpath, dirnames, filenames in os.walk(ROOT):
# Prune skipped directories
dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
for fname in filenames:
fpath = Path(dirpath) / fname
ext = fpath.suffix.lower()
if ext in SKIP_EXTENSIONS:
continue
rel = fpath.relative_to(ROOT)
if any(part.startswith('.') for part in rel.parts[:-1]):
continue
try:
mtime = fpath.stat().st_mtime
size = fpath.stat().st_size
except OSError:
continue
if ext in CORPUS_EXTENSIONS and size < 5_000_000:
try:
content = fpath.read_text(encoding='utf-8', errors='ignore')
word_count = len(content.split())
total_words += word_count
except Exception:
word_count = 0
else:
word_count = 0
files.append({
'path': str(rel),
'ext': ext,
'mtime': mtime,
'size': size,
'words': word_count,
})
return files, total_words
def get_changed_files(files):
"""Compare against manifest to find changed files."""
if MANIFEST.exists():
old_manifest = json.loads(MANIFEST.read_text(encoding='utf-8'))
else:
old_manifest = {}
changed = []
for f in files:
old_mtime = old_manifest.get(f['path'])
if old_mtime is None or f['mtime'] != old_mtime:
changed.append(f)
return changed
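# Illustrative manifest layout (what get_changed_files compares against): manifest.json
# is a flat {relative_path: mtime} map written at the end of each run, e.g.
#   {"app/core/session.py": 1716741123.45, "README.md": 1716740001.02}
# A file counts as changed when its path is missing from the map or its mtime differs.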
# ─── Step 2: AST Extraction ──────────────────────────────────────────────────
def hash_file(path):
"""SHA-256 hash for cache keying."""
h = hashlib.sha256()
try:
h.update(Path(path).read_bytes())
except Exception:
        h.update(str(path).encode())
return h.hexdigest()
def extract_ast_file(filepath):
"""Extract AST nodes and edges from a single Python file."""
nodes = []
edges = []
rel = str(filepath.relative_to(ROOT))
file_id = rel.replace('\\', '_').replace('/', '_').replace('.', '_')
try:
source = filepath.read_text(encoding='utf-8', errors='ignore')
tree = ast.parse(source, filename=str(filepath))
except SyntaxError:
return nodes, edges
# File-level node
nodes.append({
'id': file_id,
'label': filepath.name,
'file_type': 'code',
'source_file': rel,
})
# Extract module-level docstring
docstring = ast.get_docstring(tree)
if docstring and len(docstring) > 20:
doc_id = f"{file_id}_docstring"
nodes.append({
'id': doc_id,
'label': docstring[:80],
'file_type': 'rationale',
'source_file': rel,
})
edges.append({
'source': file_id, 'target': doc_id,
'relation': 'has_rationale',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 0.5,
})
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
func_id = f"{file_id}_{node.name}"
label = f"{node.name}()"
nodes.append({
'id': func_id,
'label': label,
'file_type': 'code',
'source_file': rel,
'source_location': f"line {node.lineno}",
})
edges.append({
'source': file_id, 'target': func_id,
'relation': 'defines',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 1.0,
})
# Function docstring
fdoc = ast.get_docstring(node)
if fdoc and len(fdoc) > 20:
fdoc_id = f"{func_id}_doc"
nodes.append({
'id': fdoc_id,
'label': fdoc[:80],
'file_type': 'rationale',
'source_file': rel,
'source_location': f"line {node.lineno}",
})
edges.append({
'source': func_id, 'target': fdoc_id,
'relation': 'has_rationale',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 0.5,
})
# Calls inside functions
for child in ast.walk(node):
if isinstance(child, ast.Call):
callee = _get_call_name(child)
if callee:
edges.append({
'source': func_id,
'target': callee,
'relation': 'calls',
'confidence': 'INFERRED', 'confidence_score': 0.7,
'source_file': rel, 'weight': 0.8,
})
elif isinstance(node, ast.ClassDef):
class_id = f"{file_id}_{node.name}"
nodes.append({
'id': class_id,
'label': node.name,
'file_type': 'code',
'source_file': rel,
'source_location': f"line {node.lineno}",
})
edges.append({
'source': file_id, 'target': class_id,
'relation': 'defines',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 1.0,
})
# Class docstring
cdoc = ast.get_docstring(node)
if cdoc and len(cdoc) > 20:
cdoc_id = f"{class_id}_doc"
nodes.append({
'id': cdoc_id,
'label': cdoc[:80],
'file_type': 'rationale',
'source_file': rel,
'source_location': f"line {node.lineno}",
})
edges.append({
'source': class_id, 'target': cdoc_id,
'relation': 'has_rationale',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 0.5,
})
# Base classes
for base in node.bases:
base_name = _get_name(base)
if base_name:
edges.append({
'source': class_id, 'target': base_name,
'relation': 'inherits',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 1.0,
})
elif isinstance(node, ast.Import):
for alias in node.names:
edges.append({
'source': file_id, 'target': alias.name,
'relation': 'imports',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 0.6,
})
elif isinstance(node, ast.ImportFrom) and node.module:
edges.append({
'source': file_id, 'target': node.module,
'relation': 'imports',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': rel, 'weight': 0.6,
})
return nodes, edges
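# Illustrative output shapes for a hypothetical app/main.py containing `def foo(): ...`:
#   nodes -> {'id': 'app_main_py', 'label': 'main.py', 'file_type': 'code', ...}
#            {'id': 'app_main_py_foo', 'label': 'foo()', 'file_type': 'code', ...}
#   edges -> {'source': 'app_main_py', 'target': 'app_main_py_foo', 'relation': 'defines',
#             'confidence': 'EXTRACTED', 'confidence_score': 1.0, ...}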
def _get_call_name(node):
"""Extract callable name from ast.Call node."""
if isinstance(node.func, ast.Name):
return node.func.id
elif isinstance(node.func, ast.Attribute):
return node.func.attr
return None
def _get_name(node):
"""Extract name from various AST node types."""
if isinstance(node, ast.Name):
return node.id
elif isinstance(node, ast.Attribute):
return node.attr
return None
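# Behaviour of the two helpers above (illustrative sketch, assuming standard ast parsing):
#   foo(1)      -> ast.Call whose func is ast.Name      -> _get_call_name returns 'foo'
#   obj.bar(1)  -> ast.Call whose func is ast.Attribute -> _get_call_name returns 'bar'
#                  (attribute name only, so .bar() calls on different objects collapse)
#   class A(pkg.Base): ...  -> _get_name on the base node returns 'Base'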
def _resolve_edges(all_nodes, all_edges):
"""Post-process edges to resolve bare names to actual node IDs.
The per-file AST extraction produces edges with bare targets:
- calls: target='get_cached_image' (bare function name)
- imports: target='app.core.session' (dotted module path)
This function resolves them to actual node IDs so they survive
the graph build phase (which drops unresolvable targets).
"""
node_ids = {n['id'] for n in all_nodes}
    # Build function name → [node_id, ...] index
func_index: dict[str, list[str]] = {}
for n in all_nodes:
if n.get('file_type') == 'code' and '(' in n.get('label', ''):
# label looks like "get_cached_image()"
bare_name = n['label'].rstrip('()')
func_index.setdefault(bare_name, []).append(n['id'])
    # Build module path → file node ID map
    # e.g. 'app.core.session' → 'app_core_session_py'
module_index: dict[str, str] = {}
for n in all_nodes:
src = n.get('source_file', '')
if src.endswith('.py'):
# Convert 'app/core/session.py' or 'app\core\session.py'
            # → dotted module: 'app.core.session'
mod_path = src.replace('\\', '/').replace('/', '.').removesuffix('.py')
# Strip leading __init__ for package imports
mod_path_init = mod_path.removesuffix('.__init__')
nid = n['id']
# Only map file-level nodes (no functions/classes)
if nid == src.replace('\\', '_').replace('/', '_').replace('.', '_'):
module_index[mod_path] = nid
if mod_path != mod_path_init:
module_index[mod_path_init] = nid
resolved_edges = []
calls_resolved = 0
imports_resolved = 0
dropped = 0
for edge in all_edges:
rel = edge.get('relation', '')
if rel == 'calls':
target = edge['target']
# Try exact match first
if target in node_ids:
resolved_edges.append(edge)
calls_resolved += 1
continue
# Resolve via function index
matches = func_index.get(target, [])
if matches:
for match_id in matches:
# Don't create self-edges within the same file
if match_id.rsplit('_', 1)[0] != edge['source'].rsplit('_', 1)[0] or len(matches) == 1:
resolved_edges.append({
**edge,
'target': match_id,
'confidence': 'INFERRED' if len(matches) > 1 else 'EXTRACTED',
'confidence_score': 0.9 if len(matches) == 1 else 0.6,
})
calls_resolved += 1
else:
dropped += 1
elif rel == 'imports':
target = edge['target']
# Try exact match as node ID first
if target in node_ids:
resolved_edges.append(edge)
imports_resolved += 1
continue
# Resolve dotted module path to file node ID
resolved_id = module_index.get(target)
if resolved_id:
resolved_edges.append({**edge, 'target': resolved_id})
imports_resolved += 1
continue
# Try progressively shorter prefixes
            # e.g. 'app.core.session.revoke_all' → 'app.core.session' → 'app.core' → 'app'
parts = target.split('.')
found = False
for i in range(len(parts) - 1, 0, -1):
prefix = '.'.join(parts[:i])
resolved_id = module_index.get(prefix)
if resolved_id:
resolved_edges.append({**edge, 'target': resolved_id})
imports_resolved += 1
found = True
break
if not found:
                # External/stdlib import - drop it
dropped += 1
else:
            # defines, has_rationale, etc. - keep as-is
resolved_edges.append(edge)
print(f" Resolved: {calls_resolved} calls, {imports_resolved} imports, {dropped} dropped (external/stdlib)")
return resolved_edges
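# Worked example of the resolution above (illustrative; node names are assumed):
#   calls edge   {'source': 'app_main_py_handler', 'target': 'get_cached_image'}
#     -> func_index['get_cached_image'] == ['app_images_py_get_cached_image'] (one match)
#     -> target rewritten to that ID with confidence EXTRACTED / 0.9
#   imports edge {'source': 'app_main_py', 'target': 'app.core.session'}
#     -> module_index['app.core.session'] == 'app_core_session_py', target rewritten
#   A stdlib import such as 'os' typically has no file node in the project and is dropped.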
def run_ast_extraction(files, use_cache=True):
"""Run AST extraction on all Python files, with caching."""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
all_nodes = []
all_edges = []
cached, extracted = 0, 0
# Collect valid cache hashes for cleanup
valid_hashes = set()
py_files = [f for f in files if f['ext'] in AST_EXTENSIONS]
for f in py_files:
fpath = ROOT / f['path']
fhash = hash_file(fpath)
valid_hashes.add(fhash)
cache_file = CACHE_DIR / f"{fhash}.json"
if use_cache and cache_file.exists():
data = json.loads(cache_file.read_text(encoding='utf-8'))
all_nodes.extend(data.get('nodes', []))
all_edges.extend(data.get('edges', []))
cached += 1
else:
nodes, edges = extract_ast_file(fpath)
all_nodes.extend(nodes)
all_edges.extend(edges)
# Write cache
cache_file.write_text(json.dumps({
'nodes': nodes, 'edges': edges,
}, indent=2), encoding='utf-8')
extracted += 1
# Clean stale cache entries
stale = 0
for cache_file in CACHE_DIR.glob('*.json'):
h = cache_file.stem
if h not in valid_hashes:
cache_file.unlink()
stale += 1
print(f" AST: {len(py_files)} Python files ({cached} cached, {extracted} extracted)")
if stale:
print(f" Cache cleanup: {stale} stale entries removed")
print(f" AST: {len(all_nodes)} nodes, {len(all_edges)} edges (raw)")
# Resolve bare targets to actual node IDs
all_edges = _resolve_edges(all_nodes, all_edges)
print(f" AST: {len(all_nodes)} nodes, {len(all_edges)} edges (resolved)")
return all_nodes, all_edges
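# Cache layout (illustrative): each Python file is keyed by the SHA-256 of its contents,
#   graphify-out/cache/<sha256-hex>.json  ->  {"nodes": [...], "edges": [...]}
# so files whose contents have not changed are reloaded from disk instead of re-parsed.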
# ─── Step 3: Semantic Extraction ─────────────────────────────────────────────
def build_semantic_nodes():
"""
Build semantic nodes from documentation files.
These capture high-level architecture concepts that AST can't see.
"""
nodes = []
edges = []
hyperedges = []
# Architecture components from README
arch_nodes = [
("nudr_api", "NudR Stateless API", "README.md"),
("fastapi_backend", "FastAPI Stateless Backend", "README.md"),
("supabase_db", "Supabase PostgreSQL Database", "README.md"),
("redis_cache", "Redis Session & Cache Store", "README.md"),
("cloudflare_proxy", "Cloudflare Edge Proxy", "README.md"),
("stripe_payments", "Stripe Payment Integration", "README.md"),
("firebase_fcm", "Firebase FCM Push Notifications", "README.md"),
("e2ee_encryption", "E2EE X25519 Key Exchange", "README.md"),
("protobuf_framing", "Protobuf Binary WebSocket Framing", "README.md"),
("hmac_verification", "HMAC-SHA256 Request Verification", "README.md"),
("origin_secret", "X-Origin-Secret Middleware", "README.md"),
("pow_challenge", "Proof-of-Work Challenge", "README.md"),
("rate_limiting", "Per-IP Rate Limiting", "README.md"),
("aws_secrets", "AWS Secrets Manager Integration", "README.md"),
("terraform_infra", "Terraform AWS Infrastructure", "README.md"),
("vpc_network", "VPC Network Topology", "README.md"),
("alb_autoscaling", "ALB + Auto Scaling Group", "README.md"),
("lambda_rotator", "Lambda Origin Secret Rotator", "README.md"),
("unified_ws", "Unified WebSocket Endpoint /ws", "README.md"),
("feed_ws", "Feed WebSocket Channel", "README.md"),
("chat_ws", "Chat WebSocket Channel", "README.md"),
("keysync_ws", "Keysync WebSocket Channel", "README.md"),
("discovery_ws", "Discovery WebSocket Channel", "README.md"),
("attack_detection", "Attack Detection & IP Risk Management", "README.md"),
]
for nid, label, src in arch_nodes:
nodes.append({
'id': f"sem_{nid}", 'label': label,
'file_type': 'document', 'source_file': src,
})
# Architecture edges
arch_edges = [
("nudr_api", "fastapi_backend", "implements"),
("fastapi_backend", "supabase_db", "references"),
("fastapi_backend", "redis_cache", "references"),
("cloudflare_proxy", "origin_secret", "references"),
("origin_secret", "lambda_rotator", "references"),
("stripe_payments", "fastapi_backend", "references"),
("firebase_fcm", "fastapi_backend", "references"),
("e2ee_encryption", "keysync_ws", "references"),
("protobuf_framing", "unified_ws", "references"),
("terraform_infra", "vpc_network", "references"),
("terraform_infra", "alb_autoscaling", "references"),
("terraform_infra", "aws_secrets", "references"),
("attack_detection", "rate_limiting", "references"),
("unified_ws", "feed_ws", "conceptually_related_to"),
("unified_ws", "chat_ws", "conceptually_related_to"),
("unified_ws", "keysync_ws", "conceptually_related_to"),
("unified_ws", "discovery_ws", "conceptually_related_to"),
]
for src, tgt, rel in arch_edges:
edges.append({
'source': f"sem_{src}", 'target': f"sem_{tgt}",
'relation': rel,
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': 'README.md', 'weight': 1.0,
})
# Feed system nodes (from feed_system_documentation.md)
feed_nodes = [
("feed_system", "Feed System Technical Documentation", "PLAN/feed_system_documentation.md"),
("feed_scoring", "Multi-Factor Scoring Algorithm", "PLAN/feed_system_documentation.md"),
("feed_pool", "Feed Pool Computation Pipeline", "PLAN/feed_system_documentation.md"),
("feed_filters", "Feed Hard Filters (12 Rules)", "PLAN/feed_system_documentation.md"),
("feed_heatmap", "Preference Heatmap (Learned AI)", "PLAN/feed_system_documentation.md"),
("feed_reciprocal", "Reciprocal Boost & Injection", "PLAN/feed_system_documentation.md"),
("feed_gradient", "3-Tier Gradient Distribution", "PLAN/feed_system_documentation.md"),
("feed_redis", "Feed Redis Key Schema", "PLAN/feed_system_documentation.md"),
]
for nid, label, src in feed_nodes:
nodes.append({
'id': f"sem_{nid}", 'label': label,
'file_type': 'document', 'source_file': src,
})
feed_edges = [
("feed_system", "nudr_api", "references"),
("feed_pool", "redis_cache", "references"),
("feed_pool", "supabase_db", "references"),
("feed_scoring", "feed_pool", "references"),
("feed_filters", "feed_pool", "references"),
("feed_heatmap", "feed_scoring", "references"),
("feed_reciprocal", "feed_scoring", "references"),
("feed_gradient", "feed_scoring", "references"),
("feed_redis", "redis_cache", "references"),
("feed_system", "feed_ws", "references"),
]
for src, tgt, rel in feed_edges:
edges.append({
'source': f"sem_{src}", 'target': f"sem_{tgt}",
'relation': rel,
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': 'PLAN/feed_system_documentation.md', 'weight': 1.0,
})
# Logic analysis nodes
logic_nodes = [
("logic_analysis", "Logic-Level Async Issue Audit", "PLAN/LOGIC_ANALYSIS.md"),
("id_ws_reuse", "DISASTROUS: id(ws) Memory Reuse Bug", "PLAN/LOGIC_ANALYSIS.md"),
("token_refresh_crash", "DISASTROUS: Token Refresh Crash Window", "PLAN/LOGIC_ANALYSIS.md"),
("pubsub_crash", "DISASTROUS: PubSub Listener Permanent Crash", "PLAN/LOGIC_ANALYSIS.md"),
("redis_pool_exhaustion", "DISASTROUS: Redis Connection Pool Exhaustion", "PLAN/LOGIC_ANALYSIS.md"),
("preference_race", "Race Condition: Preference Merge", "PLAN/LOGIC_ANALYSIS.md"),
]
for nid, label, src in logic_nodes:
nodes.append({
'id': f"sem_{nid}", 'label': label,
'file_type': 'document', 'source_file': src,
})
logic_edges = [
("id_ws_reuse", "unified_ws", "references"),
("token_refresh_crash", "unified_ws", "references"),
("pubsub_crash", "redis_cache", "references"),
("redis_pool_exhaustion", "redis_cache", "references"),
("preference_race", "supabase_db", "references"),
("logic_analysis", "nudr_api", "references"),
]
for src, tgt, rel in logic_edges:
edges.append({
'source': f"sem_{src}", 'target': f"sem_{tgt}",
'relation': rel,
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': 'PLAN/LOGIC_ANALYSIS.md', 'weight': 1.0,
})
# Hyperedges
hyperedges = [
{
'id': 'websocket_channels',
'label': 'WebSocket Channel System',
'nodes': ['sem_unified_ws', 'sem_feed_ws', 'sem_chat_ws', 'sem_keysync_ws', 'sem_discovery_ws'],
'relation': 'participate_in',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': 'README.md',
},
{
'id': 'security_stack',
'label': 'Security Defense Stack',
'nodes': ['sem_hmac_verification', 'sem_origin_secret', 'sem_pow_challenge', 'sem_rate_limiting', 'sem_attack_detection'],
'relation': 'participate_in',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': 'README.md',
},
{
'id': 'feed_pipeline',
'label': 'Feed Recommendation Pipeline',
'nodes': ['sem_feed_pool', 'sem_feed_filters', 'sem_feed_scoring', 'sem_feed_heatmap', 'sem_feed_reciprocal', 'sem_feed_gradient'],
'relation': 'form',
'confidence': 'EXTRACTED', 'confidence_score': 1.0,
'source_file': 'PLAN/feed_system_documentation.md',
},
]
print(f" Semantic: {len(nodes)} nodes, {len(edges)} edges, {len(hyperedges)} hyperedges")
return nodes, edges, hyperedges
# ─── Step 4: Merge & Build Graph ─────────────────────────────────────────────
def merge_and_build(ast_nodes, ast_edges, sem_nodes, sem_edges, hyperedges):
"""Merge AST + semantic, build NetworkX graph, cluster, analyze."""
from graphify.build import build_from_json
from graphify.cluster import cluster, score_all
from graphify.analyze import god_nodes, surprising_connections, suggest_questions
# Merge: AST first, deduplicate semantic by id
seen = {n['id'] for n in ast_nodes}
merged_nodes = list(ast_nodes)
for n in sem_nodes:
if n['id'] not in seen:
merged_nodes.append(n)
seen.add(n['id'])
merged_edges = ast_edges + sem_edges
extraction = {
'nodes': merged_nodes,
'edges': merged_edges,
'hyperedges': hyperedges,
}
G = build_from_json(extraction)
communities = cluster(G)
cohesion = score_all(G, communities)
gods = god_nodes(G)
surprises = surprising_connections(G, communities)
# Auto-label communities
labels = {}
for cid, members in communities.items():
names = " ".join(members[:10]).lower()
if 'feed' in names and 'service' in names:
labels[cid] = "Feed System"
elif 'feed' in names and ('score' in names or 'pool' in names):
labels[cid] = "Feed Scoring & Pool"
elif 'chat' in names and ('ws' in names or 'websocket' in names):
labels[cid] = "Chat WebSocket"
elif 'keysync' in names or 'key_exchange' in names:
labels[cid] = "Key Exchange & Sync"
elif 'discovery' in names and ('match' in names or 'like' in names):
labels[cid] = "Discovery & Matching"
elif 'auth' in names or 'signup' in names or 'signin' in names:
labels[cid] = "Authentication"
elif 'payment' in names or 'stripe' in names:
labels[cid] = "Payments & Billing"
elif 'setting' in names or 'profile' in names or 'preference' in names:
labels[cid] = "Settings & Profiles"
elif 'consent' in names:
labels[cid] = "Consent System"
elif 'report' in names or 'violation' in names:
labels[cid] = "Reporting & Moderation"
elif 'notification' in names or 'fcm' in names:
labels[cid] = "Push Notifications"
elif 'redis' in names or 'cache' in names:
labels[cid] = "Redis & Caching"
elif 'supabase' in names or 'migration' in names:
labels[cid] = "Database Layer"
elif 'terraform' in names or 'aws' in names or 'vpc' in names:
labels[cid] = "Infrastructure (Terraform)"
elif 'security' in names or 'rate_limit' in names or 'attack' in names:
labels[cid] = "Security & Rate Limiting"
elif 'codec' in names or 'hmac' in names or 'protobuf' in names:
labels[cid] = "WebSocket Codec"
elif 'unified' in names and 'ws' in names:
labels[cid] = "Unified WebSocket"
elif 'token' in names:
labels[cid] = "Token Management"
elif 'image' in names:
labels[cid] = "Image Processing"
elif 'event' in names or 'pending' in names:
labels[cid] = "Event Queue"
elif 'linkup' in names:
labels[cid] = "Linkup System"
elif 'test' in names:
labels[cid] = "Tests"
elif 'nuke' in names or 'script' in names:
labels[cid] = "Utility Scripts"
elif 'email' in names or 'otp' in names:
labels[cid] = "Email & OTP"
elif 'flutter' in names:
labels[cid] = "Flutter Directives"
elif 'readme' in names:
labels[cid] = "API Documentation"
else:
labels[cid] = f"Module Group {cid}"
questions = suggest_questions(G, communities, labels)
print(f" Graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {len(communities)} communities")
return G, communities, cohesion, labels, gods, surprises, questions, extraction
# ─── Step 5: Generate Outputs ────────────────────────────────────────────────
def generate_outputs(G, communities, cohesion, labels, gods, surprises, questions, detection, extraction):
"""Generate report, HTML, JSON, and manifest."""
from graphify.report import generate
from graphify.export import to_json, to_html
OUT_DIR.mkdir(parents=True, exist_ok=True)
tokens = {'input': 0, 'output': 0}
# Report
report = generate(
G, communities, cohesion, labels, gods, surprises,
detection, tokens, str(ROOT), suggested_questions=questions,
)
REPORT_PATH.write_text(report, encoding='utf-8')
print(f" -> {REPORT_PATH.relative_to(ROOT)}")
# JSON
to_json(G, communities, str(JSON_PATH))
print(f" -> {JSON_PATH.relative_to(ROOT)}")
# HTML
if G.number_of_nodes() <= 5000:
to_html(G, communities, str(HTML_PATH), community_labels=labels)
print(f" -> {HTML_PATH.relative_to(ROOT)}")
else:
print(f" !! Graph too large for HTML ({G.number_of_nodes()} nodes)")
# Manifest
manifest = {}
for f in detection.get('files', []):
manifest[f['path']] = f.get('mtime', 0)
MANIFEST.write_text(json.dumps(manifest, indent=2), encoding='utf-8')
# Cost tracker
if COST_PATH.exists():
cost = json.loads(COST_PATH.read_text(encoding='utf-8'))
else:
cost = {'runs': [], 'total_input_tokens': 0, 'total_output_tokens': 0}
cost['runs'].append({
'date': datetime.now(timezone.utc).isoformat(),
'nodes': G.number_of_nodes(),
'edges': G.number_of_edges(),
'communities': len(communities),
})
COST_PATH.write_text(json.dumps(cost, indent=2), encoding='utf-8')
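# cost.json shape (illustrative): a running log appended to on every rebuild, e.g.
#   {"runs": [{"date": "...", "nodes": 1200, "edges": 3400, "communities": 25}],
#    "total_input_tokens": 0, "total_output_tokens": 0}
# Note: the token totals are initialised above but not incremented by this script.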
# ─── Main Pipeline ───────────────────────────────────────────────────────────
def run_pipeline(skip_semantic=False):
"""Execute the full graphify pipeline."""
start = time.time()
print("=" * 60)
print(f"graphify rebuild β€” {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
# Step 1: Detect
print("\n[1/5] Detecting files...")
files, total_words = detect_files()
changed = get_changed_files(files)
print(f" Found {len(files)} files ({total_words:,} words)")
print(f" Changed since last build: {len(changed)}")
detection = {
'files': files,
'total_files': len(files),
'total_words': total_words,
'changed_files': len(changed),
}
# Step 2: AST extraction
print("\n[2/5] AST extraction...")
ast_nodes, ast_edges = run_ast_extraction(files)
# Step 3: Semantic extraction
if skip_semantic:
print("\n[3/5] Semantic extraction... SKIPPED (--quick)")
sem_nodes, sem_edges, hyperedges = [], [], []
else:
print("\n[3/5] Semantic extraction...")
sem_nodes, sem_edges, hyperedges = build_semantic_nodes()
# Step 4: Merge & build
print("\n[4/5] Building graph...")
G, communities, cohesion, labels, gods, surprises, questions, extraction = \
merge_and_build(ast_nodes, ast_edges, sem_nodes, sem_edges, hyperedges)
# Step 5: Generate outputs
print("\n[5/5] Generating outputs...")
generate_outputs(G, communities, cohesion, labels, gods, surprises, questions, detection, extraction)
elapsed = time.time() - start
print(f"\n{'=' * 60}")
print(f"Done in {elapsed:.1f}s")
print(f" {G.number_of_nodes()} nodes, {G.number_of_edges()} edges, {len(communities)} communities")
print(f" Open graphify-out/graph.html in your browser")
print(f"{'=' * 60}")
def watch_mode():
"""Watch for file changes and rebuild automatically."""
print("Watching for changes... (Ctrl+C to stop)")
last_mtimes = {}
while True:
try:
changed = False
for dirpath, dirnames, filenames in os.walk(ROOT):
dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
for fname in filenames:
fpath = Path(dirpath) / fname
if fpath.suffix.lower() not in CORPUS_EXTENSIONS:
continue
try:
mtime = fpath.stat().st_mtime
except OSError:
continue
key = str(fpath)
if key in last_mtimes and last_mtimes[key] != mtime:
rel = fpath.relative_to(ROOT)
print(f"\n Changed: {rel}")
changed = True
last_mtimes[key] = mtime
if changed:
run_pipeline()
time.sleep(3)
except KeyboardInterrupt:
print("\nStopped watching.")
break
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='NudR Knowledge Graph Rebuild')
parser.add_argument('--watch', action='store_true', help='Watch mode: rebuild on file change')
parser.add_argument('--quick', action='store_true', help='Quick mode: AST-only, skip semantic')
args = parser.parse_args()
if args.watch:
run_pipeline(skip_semantic=args.quick)
watch_mode()
else:
run_pipeline(skip_semantic=args.quick)