Spaces:
Runtime error
Runtime error
| import os | |
| import ast | |
| import asyncio | |
| import math | |
| import sys | |
| # Ensure logos is in path | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) | |
| from logos.swarm_engine import AsyncSwarmEngine | |
# CONFIGURATION
# Adjusted to existing project structure.
# Each entry is a directory, relative to the parent of this script's directory,
# that the mapper walks recursively.
# NOTE: "./logos" is the parent of the two entries listed before it, so a
# recursive walk of it also reaches every file under those two sectors.
TARGET_SECTORS = [
    "./logos/network",
    "./logos/agents",
    "./logos",
]
class RecursiveShaderMapper:
    """Score project source files and broadcast the scores through the swarm.

    Every ``.py`` file found under the configured sectors gets two heuristic
    scores in ``[0.0, 1.0]``: a structural one
    (:meth:`get_geometric_complexity`) and a textual one
    (:meth:`get_semantic_density`).  :meth:`interfere_and_map` combines them
    into a ``TENSOR_UPDATE`` packet and awaits ``self.swarm.broadcast(packet)``
    once per file.
    """

    def __init__(self):
        """Create the swarm engine used for broadcasting and announce readiness."""
        self.swarm = AsyncSwarmEngine()
        print("[MAPPER] Recursive Lens Calibrated. Target: Swarm Architecture.")

    def get_geometric_complexity(self, source_code):
        """
        RNJ-1 PULSE: Calculates the 'Hardness' of a piece of Python source.

        Walks the AST and counts branching statements (``if`` / ``for`` /
        ``while`` / ``try``) and definitions (functions, async functions,
        classes).  Every definition counts once, whether or not it is nested.
        The weighted sum ``(branches * 0.5 + definitions * 1.5) / 50`` is a
        heuristic normalisation, capped at 1.0.

        Args:
            source_code: Python source text to analyse.

        Returns:
            A value 0.0 - 1.0.  0.0 is also returned when the source cannot
            be parsed.
        """
        # Only the parse step can fail; keep the guard narrow so that
        # KeyboardInterrupt / SystemExit are never swallowed.
        try:
            tree = ast.parse(source_code)
        except (SyntaxError, ValueError, TypeError, RecursionError, MemoryError):
            return 0.0

        branches = 0
        definitions = 0
        for node in ast.walk(tree):
            # Branching adds complexity
            if isinstance(node, (ast.If, ast.For, ast.While, ast.Try)):
                branches += 1
            # Function / class definitions add depth
            elif isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)):
                definitions += 1

        # Normalize (Heuristic)
        score = (branches * 0.5 + definitions * 1.5) / 50
        return min(score, 1.0)

    def get_semantic_density(self, source_code):
        """
        GEMMA PULSE: Calculates the 'Texture' (approximate comment density).

        Counts occurrences of ``\"\"\"`` (weight 50) and ``#`` (weight 20)
        anywhere in the text -- including inside string literals, so this is
        an approximation -- divides by the text length, then multiplies by 5
        to boost the signal.

        Args:
            source_code: Source text to analyse.

        Returns:
            A value 0.0 - 1.0; 0.0 for empty input.
        """
        if not source_code:
            return 0.0

        # Count Docstrings and Comments (Approximation)
        comment_mass = source_code.count('"""') * 50 + source_code.count('#') * 20
        density = comment_mass / len(source_code)
        return min(density * 5, 1.0)  # Boost signal

    @staticmethod
    def _classify_interference(resonance, dissonance):
        """Return the ``(status, color)`` label for a resonance/dissonance pair."""
        if dissonance > 0.4:
            return "UNSTABLE (High Friction)", "RED"
        if resonance < 0.1:
            return "HOLLOW (Placeholder)", "BLUE"
        return "STABLE", "GOLD"

    @staticmethod
    def _iter_python_files(project_root, sectors):
        """Yield ``(sector, filename, filepath)`` for each distinct ``.py`` file.

        Sectors may overlap (one sector can be the parent of another); a file
        is yielded only for the first sector that reaches it.
        """
        seen = set()
        for sector in sectors:
            sector_path = os.path.abspath(os.path.join(project_root, sector))
            if not os.path.exists(sector_path):
                continue
            for dirpath, _, filenames in os.walk(sector_path):
                for filename in filenames:
                    if not filename.endswith(".py"):
                        continue
                    filepath = os.path.join(dirpath, filename)
                    # Resolve symlinks so the same file is recognised even
                    # when it is reached through different sectors.
                    identity = os.path.realpath(filepath)
                    if identity in seen:
                        continue
                    seen.add(identity)
                    yield sector, filename, filepath

    async def interfere_and_map(self, sectors=None, root=None):
        """
        The Holographic Sweep.

        Walks every sector, scores each ``.py`` file with both pulses, and
        broadcasts one ``TENSOR_UPDATE`` packet per file via ``self.swarm``.
        Unreadable files are skipped.  Each file is mapped at most once, even
        when sectors overlap; it is attributed to the first sector that
        contains it.

        Args:
            sectors: Iterable of sector directories relative to ``root``.
                Defaults to the module-level ``TARGET_SECTORS``.
            root: Directory the sectors are resolved against.  Defaults to
                the parent of this file's directory.
        """
        if sectors is None:
            sectors = TARGET_SECTORS
        if root is None:
            root = os.path.join(os.path.dirname(__file__), '..')
        # Computed once and made absolute so that sector paths and the
        # displayed relative paths share the same base.
        project_root = os.path.abspath(root)

        print("[SWEEP] Firing Dual-Pulse Beams...")
        found_files = 0

        for sector, filename, filepath in self._iter_python_files(project_root, sectors):
            try:
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    code = f.read()
            except OSError:
                # Best effort: a file that cannot be read is simply not mapped.
                continue

            # 1. PARALLEL PULSE
            geo_wave = self.get_geometric_complexity(code)  # RNJ-1
            sem_wave = self.get_semantic_density(code)  # GEMMA

            # 2. CALCULATE INTERFERENCE
            resonance = (geo_wave + sem_wave) / 2
            dissonance = abs(geo_wave - sem_wave)
            status, color = self._classify_interference(resonance, dissonance)

            # 3. PULSE TO UI
            packet = {
                "type": "TENSOR_UPDATE",
                "node": filename,
                "origin": "MAPPER",
                "sector": sector,
                "depth": round(resonance, 3),
                "friction": round(dissonance, 3),
                "visual_color": color,
                "status": status,
                "agent_route": "RECURSIVE_MAPPER",
                # Compatibility with LogosGraph
                "tensor": f"RES: {resonance:.2f} | DIS: {dissonance:.2f}",
            }
            rel_path = os.path.relpath(filepath, project_root)
            print(f"[{color}] {rel_path} | Depth: {resonance:.2f} | Friction: {dissonance:.2f} -> {status}")
            await self.swarm.broadcast(packet)
            found_files += 1
            # Short pause between broadcasts.
            await asyncio.sleep(0.02)

        print(f"\n[COMPLETE] {found_files} files mapped to the manifold.")
if __name__ == "__main__":
    # Script entry point: build the mapper and drive one full sweep to completion.
    asyncio.run(RecursiveShaderMapper().interfere_and_map())