import os import ast import asyncio import math import sys # Ensure logos is in path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from logos.swarm_engine import AsyncSwarmEngine # CONFIGURATION # Adjusted to existing project structure TARGET_SECTORS = ["./logos/network", "./logos/agents", "./logos"] class RecursiveShaderMapper: def __init__(self): self.swarm = AsyncSwarmEngine() print("[MAPPER] Recursive Lens Calibrated. Target: Swarm Architecture.") def get_geometric_complexity(self, source_code): """ RNJ-1 PULSE: Calculates the 'Hardness' (Cyclomatic Complexity + Nesting). Returns a value 0.0 - 1.0 """ try: tree = ast.parse(source_code) complexity = 0 depth = 0 for node in ast.walk(tree): # Branching adds complexity if isinstance(node, (ast.If, ast.For, ast.While, ast.Try)): complexity += 1 # Nested functions/classes add Depth if isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)): depth += 1 # Normalize (Heuristic) score = (complexity * 0.5 + depth * 1.5) / 50 return min(score, 1.0) except: return 0.0 def get_semantic_density(self, source_code): """ GEMMA PULSE: Calculates the 'Texture' (Comment density, variable length). Returns a value 0.0 - 1.0 """ total_len = len(source_code) if total_len == 0: return 0.0 # Count Docstrings and Comments (Approximation) comment_mass = source_code.count('"""') * 50 + source_code.count('#') * 20 density = comment_mass / total_len return min(density * 5, 1.0) # Boost signal async def interfere_and_map(self): """ The Holographic Sweep. """ print("[SWEEP] Firing Dual-Pulse Beams...") found_files = 0 for sector in TARGET_SECTORS: sector_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', sector)) if not os.path.exists(sector_path): continue for root, _, files in os.walk(sector_path): for file in files: if not file.endswith(".py"): continue filepath = os.path.join(root, file) rel_path = os.path.relpath(filepath, os.path.dirname(os.path.dirname(__file__))) try: with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: code = f.read() except: continue # 1. PARALLEL PULSE geo_wave = self.get_geometric_complexity(code) # RNJ-1 sem_wave = self.get_semantic_density(code) # GEMMA # 2. CALCULATE INTERFERENCE resonance = (geo_wave + sem_wave) / 2 dissonance = abs(geo_wave - sem_wave) status = "STABLE" color = "GOLD" if dissonance > 0.4: status = "UNSTABLE (High Friction)" color = "RED" elif resonance < 0.1: status = "HOLLOW (Placeholder)" color = "BLUE" # 3. PULSE TO UI packet = { "type": "TENSOR_UPDATE", "node": file, "origin": "MAPPER", "sector": sector, "depth": round(resonance, 3), "friction": round(dissonance, 3), "visual_color": color, "status": status, "agent_route": "RECURSIVE_MAPPER", # Compatibility with LogosGraph "tensor": f"RES: {resonance:.2f} | DIS: {dissonance:.2f}" } print(f"[{color}] {rel_path} | Depth: {resonance:.2f} | Friction: {dissonance:.2f} -> {status}") await self.swarm.broadcast(packet) found_files += 1 await asyncio.sleep(0.02) print(f"\n[COMPLETE] {found_files} files mapped to the manifold.") if __name__ == "__main__": mapper = RecursiveShaderMapper() asyncio.run(mapper.interfere_and_map())