Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import math | |
| import uuid | |
| import re | |
| class EntropyKillSwitch: | |
| """ | |
| PROTOCOL 22: THE PREFIX INTEGRATOR | |
| Monitors the 'Temperature' of the reasoning chain. | |
| """ | |
| def __init__(self, threshold=0.75, window_size=5): | |
| self.threshold = threshold | |
| self.window_size = window_size | |
| self.entropy_trace = [] | |
| self.status = "STABLE" | |
| def calculate_entropy(self, logprobs): | |
| # Converts log probabilities into Shannon Entropy | |
| if not logprobs: return 0.0 | |
| # Handle list vs dict format | |
| if isinstance(logprobs, list): | |
| probs = [math.exp(item.get('logprob', -100)) for item in logprobs] | |
| else: | |
| probs = [math.exp(lp) for lp in logprobs.values()] | |
| total_p = sum(probs) | |
| if total_p == 0: return 1.0 | |
| probs = [p/total_p for p in probs] | |
| return -sum(p * math.log(p) for p in probs if p > 0) | |
| def monitor(self, token, logprobs): | |
| # The "Kill" Logic | |
| current_entropy = self.calculate_entropy(logprobs) | |
| self.entropy_trace.append(current_entropy) | |
| if len(self.entropy_trace) > self.window_size: | |
| self.entropy_trace.pop(0) | |
| avg_entropy = np.mean(self.entropy_trace) if self.entropy_trace else 0 | |
| # If uncertainty spikes, we kill the stream | |
| if avg_entropy > self.threshold: | |
| self.status = "HALLUCINATION_DETECTED" | |
| return True | |
| self.status = "STABLE" | |
| return False | |
| def monitor_bulk(self, logprobs_content): | |
| """ | |
| PROTOCOL 22: Parallel Wave Integration. | |
| Analyzes bulk telemetry from reasoning waves. | |
| """ | |
| if not logprobs_content: | |
| return | |
| temp_trace = [] | |
| for entry in logprobs_content: | |
| # Entry usually has 'top_logprobs' which we can use for entropy | |
| top_lp = entry.get('top_logprobs', []) | |
| if not top_lp: | |
| # Fallback to single logprob (minimal entropy but better than nothing) | |
| lp = entry.get('logprob', -100) | |
| entropy = - (math.exp(lp) * lp) if lp > -20 else 1.0 # Rough approximation | |
| else: | |
| # Calculate from full distribution | |
| dist = {e.get('token'): e.get('logprob') for e in top_lp} | |
| entropy = self.calculate_entropy(dist) | |
| temp_trace.append(entropy) | |
| if temp_trace: | |
| # We take the mean of the bulk message as a single data point or fill the trace | |
| batch_avg = np.mean(temp_trace) | |
| self.entropy_trace.append(batch_avg) | |
| if batch_avg > self.threshold: | |
| self.status = "HALLUCINATION_DETECTED" | |
| else: | |
| self.status = "STABLE" | |
| if len(self.entropy_trace) > self.window_size: | |
| self.entropy_trace.pop(0) | |
| class DolphinOversight: | |
| def __init__(self, swarm_state=None): | |
| self.name = "Dolphin-x1-8b" | |
| self.kill_switch = EntropyKillSwitch(threshold=0.8) # Tunable sensitivity | |
| self.state = swarm_state or {} | |
| def strip_context(self, raw_input): | |
| """ | |
| Sanitizes input to remove conversational fluff, isolating the core directive. | |
| """ | |
| # 1. Remove common conversational prefixes | |
| clean_text = re.sub(r'^(please|can you|would you|swarm|logos|logo)\s+', '', raw_input, flags=re.IGNORECASE) | |
| return clean_text.strip() | |
| def ingest(self, user_packet): | |
| """ | |
| The main entry point for Protocol 18. | |
| """ | |
| packet_id = str(uuid.uuid4())[:8] | |
| core_intent = self.strip_context(user_packet['content']) | |
| previous_node_id = self.state.get('last_node', 1) | |
| print(f"[{self.name}] Packet {packet_id} Ingested.") | |
| logos_packet = { | |
| "id": packet_id, | |
| "type": user_packet.get('type', 'text_command'), | |
| "origin_node": previous_node_id, | |
| "intent": core_intent, | |
| "content": user_packet['content'], | |
| "meta_tags": [], | |
| "target_coords": None | |
| } | |
| return self.route_packet(logos_packet) | |
| def route_packet(self, packet): | |
| # Dolphin Deciphers Intent | |
| print(f"[{self.name}] Decoding Intent: '{packet['intent']}'") | |
| if "image" in packet['type'] or "scan" in packet['intent'].lower() or ".py" in packet['intent']: | |
| return "HANDOFF_TO_GEMMA", packet | |
| else: | |
| return "HANDOFF_TO_RNJ1", packet | |
| def mhs_smoothing(self, tensor_coords, mass): | |
| """ | |
| Protocol 22: Manifold Harmonic Smoothing (mhs). | |
| """ | |
| node_id = tensor_coords.get('destination_node', 1) | |
| resonance = tensor_coords.get('resonance', 'STOCHASTIC_NOISE') | |
| delta_heat = tensor_coords.get('delta_heat', 0) | |
| # If noise is detected, nudge towards nearest prime anchor | |
| if resonance == "STOCHASTIC_NOISE": | |
| anchors = [1, 3, 7, 9] | |
| if (node_id % 10) not in anchors: | |
| for i in range(1, 6): | |
| if ((node_id + i) % 10) in anchors: | |
| node_id += i | |
| break | |
| elif ((node_id - i) % 10) in anchors: | |
| node_id -= i | |
| break | |
| # Apply 'Mass Dampening' if complexity is too high | |
| dampened_heat = delta_heat / (1 + mass/100) | |
| return { | |
| "node_id": node_id, | |
| "fidelity": 1.0 - (abs(dampened_heat) * 0.01), | |
| "status": "MHS_STABILIZED" if not self.entropy_kill_switch(dampened_heat) else "KILL_SWITCH_ACTIVE_TURBULENCE" | |
| } | |
| def entropy_kill_switch(self, delta_heat): | |
| """ | |
| Legacy heat-based kill switch for MHS logic. | |
| """ | |
| threshold = 500 | |
| if abs(delta_heat) > threshold: | |
| print(f"[{self.name}] ⚠️ !ENTROPY_CRITICAL! Delta Heat: {delta_heat}") | |
| return True | |
| return False | |
| async def verify_output_stream(self, generator_stream): | |
| """ | |
| Wraps the LLM output stream. | |
| Filtering text through the Prefix Integrator Entropy Kill Switch. | |
| """ | |
| verified_text = "" | |
| async for token_data in generator_stream: | |
| token = token_data.get('text', '') | |
| logprobs = token_data.get('logprobs', None) | |
| # CHECK ENTROPY | |
| should_kill = self.kill_switch.monitor(token, logprobs) | |
| if should_kill: | |
| print(f"[{self.name}] 🚫 KILL SWITCH TRIGGERED! Entropy: {self.kill_switch.entropy_trace[-1]:.2f}") | |
| yield "[SYSTEM INTERRUPT: HALLUCINATION DETECTED. RE-ROUTING...]" | |
| self.trigger_correction() | |
| break | |
| verified_text += token | |
| yield token | |
| def trigger_correction(self): | |
| # Update Swarm State to 'CAUTION' mode | |
| self.state['active_mode'] = "HIGH_PRECISION" | |
| print(f"[{self.name}] Swarm Mode shifted to HIGH_PRECISION due to entropy spike.") | |