Spaces:
Runtime error
Runtime error
File size: 4,769 Bytes
66b508d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
import ast
import asyncio
import math
import sys
# Ensure the `logos` package is importable: the parent of this file's directory
# is placed at the front of sys.path (presumably the project root that contains
# `logos/` -- confirm against the repository layout). This has to run before
# the `logos` import below, so the two statements must stay in this order.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from logos.swarm_engine import AsyncSwarmEngine
# CONFIGURATION
# Directories to sweep, adjusted to the existing project structure.
# RecursiveShaderMapper.interfere_and_map joins each entry onto the parent of
# this file's directory and walks it recursively, visiting entries in list order.
# NOTE: "./logos" is the parent directory of the two entries listed before it.
TARGET_SECTORS = ["./logos/network", "./logos/agents", "./logos"]
class RecursiveShaderMapper:
    """
    Scores the project's Python files and broadcasts one packet per file.

    Every file receives two heuristic scores in the range 0.0 - 1.0: a
    structural one (get_geometric_complexity) and a comment-based one
    (get_semantic_density). interfere_and_map combines them into a
    status/colour and sends the result through self.swarm.broadcast.
    """

    def __init__(self):
        self.swarm = AsyncSwarmEngine()
        print("[MAPPER] Recursive Lens Calibrated. Target: Swarm Architecture.")

    def get_geometric_complexity(self, source_code):
        """
        RNJ-1 PULSE: Calculates the 'Hardness' of a piece of source code.

        Counts branching statements (if / for / while / try) and definitions
        (functions, async functions, classes) anywhere in the parsed tree.
        Note that definitions are counted, not their nesting level.

        Args:
            source_code: Python source text to analyse.

        Returns:
            A value 0.0 - 1.0. Source that cannot be parsed scores 0.0.
        """
        try:
            tree = ast.parse(source_code)
        except (SyntaxError, ValueError, TypeError, RecursionError, MemoryError):
            # Best effort: unparseable input is treated as "no structure".
            # Only parse failures are swallowed, so KeyboardInterrupt and
            # SystemExit still propagate (a bare except would eat them).
            return 0.0

        complexity = 0
        depth = 0
        for node in ast.walk(tree):
            # Branching adds complexity
            if isinstance(node, (ast.If, ast.For, ast.While, ast.Try)):
                complexity += 1
            # Every function/class definition adds Depth
            if isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)):
                depth += 1

        # Normalize (Heuristic): definitions weigh three times a branch, and
        # a weighted total of 50 or more saturates the score at 1.0.
        score = (complexity * 0.5 + depth * 1.5) / 50
        return min(score, 1.0)

    def get_semantic_density(self, source_code):
        """
        GEMMA PULSE: Calculates the 'Texture' (approximate comment density).

        Every triple-double-quote marker counts as 50 characters of comment
        mass and every '#' character as 20, wherever they occur in the text
        (including inside string literals). The mass is divided by the total
        length of the source and boosted by a factor of 5.

        Args:
            source_code: Python source text to analyse.

        Returns:
            A value 0.0 - 1.0. Empty source scores 0.0.
        """
        total_len = len(source_code)
        if total_len == 0:
            return 0.0
        # Count Docstrings and Comments (Approximation)
        comment_mass = source_code.count('"""') * 50 + source_code.count('#') * 20
        density = comment_mass / total_len
        return min(density * 5, 1.0)  # Boost signal

    @staticmethod
    def _classify_interference(geo_wave, sem_wave):
        """Combine the two scores into (resonance, dissonance, status, color)."""
        resonance = (geo_wave + sem_wave) / 2
        dissonance = abs(geo_wave - sem_wave)
        status = "STABLE"
        color = "GOLD"
        # A large gap between the scores takes precedence over a low average.
        if dissonance > 0.4:
            status = "UNSTABLE (High Friction)"
            color = "RED"
        elif resonance < 0.1:
            status = "HOLLOW (Placeholder)"
            color = "BLUE"
        return resonance, dissonance, status, color

    async def interfere_and_map(self):
        """
        The Holographic Sweep.

        Walks every existing directory in TARGET_SECTORS (resolved against
        the parent of this file's directory), scores each ``.py`` file, prints
        a summary line and broadcasts a TENSOR_UPDATE packet for it. A file
        reachable through more than one sector is mapped once, under the
        first sector that reaches it. Unreadable files are skipped.
        """
        print("[SWEEP] Firing Dual-Pulse Beams...")
        # One absolute root for both sector resolution and the displayed
        # relative path, so the two agree even when __file__ is relative.
        project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
        found_files = 0
        # Real paths already mapped; the configured sectors may overlap.
        seen = set()
        for sector in TARGET_SECTORS:
            sector_path = os.path.abspath(os.path.join(project_root, sector))
            if not os.path.exists(sector_path):
                continue
            for root, _, files in os.walk(sector_path):
                for file in files:
                    if not file.endswith(".py"):
                        continue
                    filepath = os.path.join(root, file)
                    identity = os.path.realpath(filepath)
                    if identity in seen:
                        continue
                    seen.add(identity)
                    rel_path = os.path.relpath(filepath, project_root)
                    try:
                        # Undecodable bytes are dropped rather than raising.
                        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                            code = f.read()
                    except OSError:
                        # Best effort: skip files that cannot be opened/read.
                        continue

                    # 1. PARALLEL PULSE
                    geo_wave = self.get_geometric_complexity(code)  # RNJ-1
                    sem_wave = self.get_semantic_density(code)  # GEMMA

                    # 2. CALCULATE INTERFERENCE
                    resonance, dissonance, status, color = self._classify_interference(
                        geo_wave, sem_wave
                    )

                    # 3. PULSE TO UI
                    packet = {
                        "type": "TENSOR_UPDATE",
                        "node": file,
                        "origin": "MAPPER",
                        "sector": sector,
                        "depth": round(resonance, 3),
                        "friction": round(dissonance, 3),
                        "visual_color": color,
                        "status": status,
                        "agent_route": "RECURSIVE_MAPPER",
                        # Compatibility with LogosGraph
                        "tensor": f"RES: {resonance:.2f} | DIS: {dissonance:.2f}"
                    }
                    print(f"[{color}] {rel_path} | Depth: {resonance:.2f} | Friction: {dissonance:.2f} -> {status}")
                    await self.swarm.broadcast(packet)
                    found_files += 1
                    # Short pause between files to pace the broadcasts.
                    await asyncio.sleep(0.02)
        print(f"\n[COMPLETE] {found_files} files mapped to the manifold.")
if __name__ == "__main__":
    # Script entry point: build the mapper and run one full sweep to completion.
    asyncio.run(RecursiveShaderMapper().interfere_and_map())
|