# LOGOS-SPCW-Matroska / logos/recursive_mapper.py
# GitHub Copilot
# Upgrade to Protocol 26: Gödel-Zeta Datastore & Recursive Manifold
# 66b508d
import os
import ast
import asyncio
import math
import sys
# Ensure logos is in path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from logos.swarm_engine import AsyncSwarmEngine
# CONFIGURATION
# Directories swept by RecursiveShaderMapper.interfere_and_map(). Each entry
# is joined onto the parent of this file's directory and walked recursively
# for ``.py`` files. Adjusted to the existing project structure.
# NOTE: "./logos" is the parent of the two entries that precede it.
TARGET_SECTORS = ["./logos/network", "./logos/agents", "./logos"]
class RecursiveShaderMapper:
    """Score every Python file under TARGET_SECTORS and broadcast the results.

    Two heuristic scores in the range 0.0 - 1.0 are computed per file
    (structural complexity and comment density). Their mean ("resonance")
    and absolute difference ("dissonance") are packed into a
    ``TENSOR_UPDATE`` packet and sent through ``self.swarm.broadcast``.
    """

    def __init__(self):
        """Create the swarm engine used to broadcast packets."""
        self.swarm = AsyncSwarmEngine()
        print("[MAPPER] Recursive Lens Calibrated. Target: Swarm Architecture.")

    def get_geometric_complexity(self, source_code):
        """
        RNJ-1 PULSE: Calculates the 'Hardness' of a piece of source code.

        Counts branching statements (``if``/``for``/``while``/``try``) and
        function/class definitions in the parsed AST, then combines them
        with a fixed heuristic weighting.

        Args:
            source_code: Python source text to analyse.

        Returns:
            A value 0.0 - 1.0. Source that cannot be parsed scores 0.0.
        """
        try:
            tree = ast.parse(source_code)
        except (SyntaxError, ValueError, TypeError, RecursionError, MemoryError):
            # Best effort: unparseable input is scored as zero. Only parser
            # failures are caught so KeyboardInterrupt / SystemExit propagate.
            return 0.0

        complexity = 0
        depth = 0
        for node in ast.walk(tree):
            # Branching adds complexity
            if isinstance(node, (ast.If, ast.For, ast.While, ast.Try)):
                complexity += 1
            # Every function/class definition (nested or not) adds depth
            if isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)):
                depth += 1

        # Normalize (Heuristic)
        score = (complexity * 0.5 + depth * 1.5) / 50
        return min(score, 1.0)

    def get_semantic_density(self, source_code):
        """
        GEMMA PULSE: Calculates the 'Texture' (comment density) of source code.

        This is an approximation: every ``\"\"\"`` occurrence is weighted as
        50 characters and every ``#`` as 20 characters of commentary, and the
        total is taken relative to the length of the source.

        Args:
            source_code: Python source text to analyse.

        Returns:
            A value 0.0 - 1.0. Empty source scores 0.0.
        """
        total_len = len(source_code)
        if total_len == 0:
            return 0.0
        # Count Docstrings and Comments (Approximation)
        comment_mass = source_code.count('"""') * 50 + source_code.count('#') * 20
        density = comment_mass / total_len
        return min(density * 5, 1.0)  # Boost signal

    @staticmethod
    def _classify_interference(resonance, dissonance):
        """Return the ``(status, color)`` pair for a resonance/dissonance reading."""
        # High dissonance takes precedence over low resonance.
        if dissonance > 0.4:
            return "UNSTABLE (High Friction)", "RED"
        if resonance < 0.1:
            return "HOLLOW (Placeholder)", "BLUE"
        return "STABLE", "GOLD"

    async def interfere_and_map(self):
        """
        The Holographic Sweep.

        Walks every directory in TARGET_SECTORS (resolved against the parent
        of this file's directory), scores each ``.py`` file and broadcasts
        one packet per file. Missing sectors and unreadable files are
        skipped. A file reachable through more than one sector is mapped
        only once, under the first sector that reaches it.
        """
        print("[SWEEP] Firing Dual-Pulse Beams...")

        # Resolved once, and made absolute so relative paths stay anchored to
        # the project root even if __file__ is a bare relative name.
        project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

        # Sectors may overlap (one can be a parent of another); remember what
        # was already mapped so no file is broadcast or counted twice.
        visited = set()
        found_files = 0

        for sector in TARGET_SECTORS:
            sector_path = os.path.abspath(os.path.join(project_root, sector))
            if not os.path.exists(sector_path):
                continue

            for root, _, files in os.walk(sector_path):
                for file in files:
                    if not file.endswith(".py"):
                        continue

                    filepath = os.path.join(root, file)
                    if filepath in visited:
                        continue
                    visited.add(filepath)

                    rel_path = os.path.relpath(filepath, project_root)

                    try:
                        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                            code = f.read()
                    except OSError:
                        # Unreadable file: skip it rather than abort the sweep.
                        continue

                    # 1. PARALLEL PULSE
                    geo_wave = self.get_geometric_complexity(code)  # RNJ-1
                    sem_wave = self.get_semantic_density(code)  # GEMMA

                    # 2. CALCULATE INTERFERENCE
                    resonance = (geo_wave + sem_wave) / 2
                    dissonance = abs(geo_wave - sem_wave)
                    status, color = self._classify_interference(resonance, dissonance)

                    # 3. PULSE TO UI
                    packet = {
                        "type": "TENSOR_UPDATE",
                        "node": file,
                        "origin": "MAPPER",
                        "sector": sector,
                        "depth": round(resonance, 3),
                        "friction": round(dissonance, 3),
                        "visual_color": color,
                        "status": status,
                        "agent_route": "RECURSIVE_MAPPER",
                        # Compatibility with LogosGraph
                        "tensor": f"RES: {resonance:.2f} | DIS: {dissonance:.2f}"
                    }
                    print(f"[{color}] {rel_path} | Depth: {resonance:.2f} | Friction: {dissonance:.2f} -> {status}")
                    await self.swarm.broadcast(packet)
                    found_files += 1
                    # Brief pause between packets.
                    await asyncio.sleep(0.02)

        print(f"\n[COMPLETE] {found_files} files mapped to the manifold.")
if __name__ == "__main__":
    # Script entry point: build the mapper, then drive a single sweep to
    # completion on a fresh event loop.
    mapper = RecursiveShaderMapper()
    sweep = mapper.interfere_and_map()
    asyncio.run(sweep)