Spaces:
Running
Running
| import gradio as gr | |
| import random | |
| from typing import List, Set, Tuple, Optional | |
| from dataclasses import dataclass | |
| import time | |
# ========== CONFIGURATION ==========
@dataclass
class Config:
    """Tunable parameters of the AURA network.

    Declared as a dataclass so a configuration can be created either with the
    defaults (``Config()``) and mutated afterwards, or with keyword overrides
    (``Config(num_neurons=200)``).
    """

    # Network size and wiring.
    num_neurons: int = 1000
    depth: int = 3
    input_size: int = 256
    output_size: int = 256

    # Spiking and signal scaling (integer arithmetic).
    spike_threshold: int = 1000
    input_signal_strength: int = 1000
    signal_scale_shift: int = 8
    explore_noise_scale: int = 64

    # Graph construction: connection probability within a neighbour window.
    sparsity: float = 0.05
    neighbor_radius: int = 10

    # Plasticity and neuromodulation.
    hebbian_step_threshold: int = 10
    sigma_decay_shift: int = 8
    dopamine_decay: int = 9

    # Maintenance schedule and limits.
    homeostasis_interval_steps: int = 100
    prune_interval_steps: int = 200
    min_neurons: int = 100
    target_activity: int = 100
# ========== NEURONS ==========
class Neuron:
    """Integer integrate-and-fire unit with a ternary (-1 / 0 / +1) output.

    The neuron accumulates incoming signals in ``potential``. ``tick`` decides
    whether it spikes, ``propagate`` turns a spike into per-target signals and
    resets the potential.
    """

    def __init__(self, idx: int, threshold: int = 1000, noise_scale: int = 0):
        self.idx = idx
        self.potential = 0
        self.threshold = threshold
        self.fired = False
        self.refractory = 0
        self.neuron_sigma = 0
        self.noise_scale = noise_scale
        # State of a per-neuron linear congruential generator, seeded from idx.
        self.noise_accum = (idx * 1103515245 + 12345) & 0x7FFFFFFF
        self.outgoing_synapses = []
        self.outgoing_neurons = []
        self.total_fires = 0
        self.neuron_type = "basic"

    def tick(self, step_count: int) -> Tuple[bool, int]:
        """Advance one step and report ``(fired, output)``.

        While refractory the neuron only counts down. Otherwise it fires when
        the potential has reached the threshold, returning that potential as
        the output and entering a two-tick refractory period.
        ``step_count`` is accepted for interface compatibility and not used.
        """
        if self.refractory > 0:
            self.refractory -= 1
            # A refractory tick is never a spike; clear the flag so it does not
            # keep reporting the spike from an earlier tick.
            self.fired = False
            return False, 0
        if self.potential >= self.threshold:
            self.fired = True
            output = self.potential
            self.refractory = 2
            self.neuron_sigma += 1
            self.total_fires += 1
            return True, output
        self.fired = False
        return False, 0

    def integrate(self, signal: int):
        """Add ``signal`` to the potential; ignored while refractory."""
        if self.refractory == 0:
            self.potential += signal

    def fire_ternary(self) -> int:
        """Return the sign of the current spike, or 0 when there is none.

        A fired neuron yields +1 when the potential is at least 1.25x the
        threshold, -1 when it is at most 0.75x the threshold, and 0 in between
        (the quarter is computed with ``threshold >> 2``).
        """
        if self.fired:
            if self.potential >= self.threshold + (self.threshold >> 2):
                return 1
            elif self.potential <= self.threshold - (self.threshold >> 2):
                return -1
        return 0

    def propagate(self, shift: int) -> List[Tuple[int, int]]:
        """Return ``(target_index, signal)`` pairs for the current spike.

        Each signal is ``ternary * weight`` shifted left by ``shift``; zero
        signals are skipped. The potential is reset to 0 whenever the neuron
        had fired, even if nothing is emitted.
        """
        if not self.fired:
            return []
        ternary = self.fire_ternary()
        if ternary == 0:
            self.potential = 0
            return []
        updates = []
        for tgt, syn in zip(self.outgoing_neurons, self.outgoing_synapses):
            signal = ternary * syn.weight
            if signal != 0:
                updates.append((tgt, signal << shift))
        self.potential = 0
        return updates

    def get_logit_with_noise(self) -> int:
        """Return the potential plus uniform noise in ``[-noise_scale, noise_scale]``.

        The generator state advances on every call, including when
        ``noise_scale`` is 0 and the bare potential is returned.
        """
        self.noise_accum = (self.noise_accum * 1103515245 + 12345) & 0x7FFFFFFF
        if self.noise_scale == 0:
            return self.potential
        noise = (self.noise_accum % (self.noise_scale * 2 + 1)) - self.noise_scale
        return self.potential + noise

    def is_protected(self) -> bool:
        """Return whether the neuron must be exempt from pruning (never, here)."""
        return False
class ExcitatoryNeuron(Neuron):
    """Neuron whose spikes always carry a positive sign."""

    def __init__(self, idx: int, threshold: int = 1000):
        super().__init__(idx, threshold)
        self.neuron_type = "excitatory"

    def fire_ternary(self) -> int:
        """Return +1 for a spike and 0 otherwise, regardless of potential."""
        if self.fired:
            return 1
        return 0
class InhibitoryNeuron(Neuron):
    """Neuron whose spikes always carry a negative sign."""

    def __init__(self, idx: int, threshold: int = 1000):
        super().__init__(idx, threshold)
        self.neuron_type = "inhibitory"

    def fire_ternary(self) -> int:
        """Return -1 for a spike and 0 otherwise, regardless of potential."""
        if self.fired:
            return -1
        return 0
class OscillatorNeuron(Neuron):
    """Clock neuron that fires once every ``period_steps`` ticks.

    It ignores its potential entirely; firing is driven by an internal counter.
    """

    def __init__(self, idx: int, period_steps: int, threshold: int = 1000):
        super().__init__(idx, threshold)
        self.neuron_type = "oscillator"
        self.period_steps = period_steps
        self.step_counter = 0

    def tick(self, step_count: int) -> Tuple[bool, int]:
        """Count one tick and report ``(fired, output)``.

        On every ``period_steps``-th call the counter resets and the result is
        ``(True, threshold)``; otherwise ``(False, 0)``. ``step_count`` is
        accepted for interface compatibility and not used.
        """
        self.step_counter += 1
        if self.step_counter >= self.period_steps:
            self.step_counter = 0
            self.fired = True
            return True, self.threshold
        # Clear the flag on quiet ticks so ``fired`` describes this tick only
        # instead of staying True forever after the first period.
        self.fired = False
        return False, 0

    def is_protected(self) -> bool:
        """Oscillators are exempt from pruning."""
        return True
class CategoryDetectorNeuron(Neuron):
    """Neuron that only accepts input accompanied by a byte of its category."""

    def __init__(self, idx: int, category_bytes: List[int], threshold: int = 1000):
        super().__init__(idx, threshold)
        self.neuron_type = "category_detector"
        self.category_bytes = set(category_bytes)

    def integrate(self, signal: int, input_byte: Optional[int] = None):
        """Integrate ``signal`` only when ``input_byte`` belongs to the category.

        Calls without ``input_byte`` (or with a byte outside the category) leave
        the potential untouched.
        """
        if input_byte is None:
            return
        if input_byte not in self.category_bytes:
            return
        super().integrate(signal)

    def is_protected(self) -> bool:
        """Category detectors are exempt from pruning."""
        return True
# ========== SYNAPSE ==========
class Synapse:
    """Directed connection between two neuron indices with integer state.

    Attributes:
        source, target: indices of the pre- and post-synaptic neurons.
        weight: integer weight used when a spike is propagated.
        G: ternary state in ``{-1, 0, 1}`` stepped by ``hebbian_update``.
        sigma: accumulator that drives the steps of ``G``.
        bcm_theta: initialised to 512; not read by any method of this class.
    """

    def __init__(self, source: int, target: int, weight: int = 0):
        self.source = source
        self.target = target
        self.weight = weight
        self.G = 0
        self.sigma = 0
        self.bcm_theta = 512

    def hebbian_update(self, pre_fired: bool, post_fired: bool,
                       post_potential: int, dopamine_gain: int = 512,
                       step_threshold: int = 10, decay_shift: int = 8):
        """Apply one plasticity step.

        When both neurons fired, ``dopamine_gain`` is added to ``sigma``; once
        ``abs(sigma)`` reaches ``step_threshold`` the state ``G`` moves one step
        in the direction of ``sigma`` (clamped to ``[-1, 1]``) and ``sigma``
        is cleared. In every other case ``sigma`` decays by the factor
        ``(2**decay_shift - 1) / 2**decay_shift``.

        Args:
            pre_fired: whether the source neuron fired.
            post_fired: whether the target neuron fired.
            post_potential: accepted for interface compatibility; unused.
            dopamine_gain: increment applied to ``sigma`` on co-activation.
            step_threshold: magnitude of ``sigma`` that triggers a step of
                ``G``; the default reproduces the previous fixed value 10.
            decay_shift: bit shift defining the decay factor; the default 8
                reproduces the previous fixed factor 255/256.
        """
        if pre_fired and post_fired:
            self.sigma += dopamine_gain
            if abs(self.sigma) >= step_threshold:
                delta = 1 if self.sigma > 0 else -1
                self.G = max(-1, min(1, self.G + delta))
                self.sigma = 0
        else:
            self.sigma = (self.sigma * ((1 << decay_shift) - 1)) >> decay_shift

    def homeostasis(self, target_activity: int = 100):
        """Scale the weight down when ``abs(weight) * 100`` exceeds the target.

        The scale factor is computed in 8-bit fixed point. ``>>`` floors, so a
        negative weight rounds away from zero while a positive one rounds
        toward zero.
        """
        current = abs(self.weight) * 100
        if current > target_activity:
            scale = (target_activity << 8) // current
            self.weight = (self.weight * scale) >> 8
# ========== GRAPH ==========
class Graph:
    """Sparse directed synapse graph over neuron indices ``0 .. n-1``.

    Per-neuron in-degree and out-degree are cached and refreshed by
    ``_update_incoming_counts``; call it after changing ``synapses`` directly.
    """

    def __init__(self, config: "Config"):
        self.config = config
        self._n = config.num_neurons
        self.synapses: List["Synapse"] = []
        self._incoming_count = [0] * self._n
        self._outgoing_count = {}
        self._build()

    def _build(self):
        """Randomly connect each neuron to neighbours within ``neighbor_radius``.

        Every ordered pair inside the window (excluding self-loops) receives a
        synapse with probability ``sparsity``.
        """
        radius = self.config.neighbor_radius
        for i in range(self._n):
            for j in range(max(0, i - radius), min(self._n, i + radius + 1)):
                if i != j and random.random() < self.config.sparsity:
                    self.synapses.append(Synapse(i, j))
        self._update_incoming_counts()

    def _update_incoming_counts(self):
        """Recompute the cached in-degree and out-degree of every neuron."""
        incoming = [0] * self._n
        # A dict keeps out-degree lookups O(1) without constraining source ids.
        outgoing = {}
        for syn in self.synapses:
            incoming[syn.target] += 1
            outgoing[syn.source] = outgoing.get(syn.source, 0) + 1
        self._incoming_count = incoming
        self._outgoing_count = outgoing

    def get_incoming_count(self, idx: int) -> int:
        """Return the number of synapses targeting ``idx`` (0 if out of range)."""
        if 0 <= idx < len(self._incoming_count):
            return self._incoming_count[idx]
        return 0

    def get_outgoing_count(self, idx: int) -> int:
        """Return the number of synapses leaving ``idx`` (0 if it has none)."""
        return self._outgoing_count.get(idx, 0)

    def get_all_synapses(self) -> List["Synapse"]:
        """Return the live synapse list (not a copy)."""
        return self.synapses

    def remove_neuron(self, idx: int):
        """Drop every synapse touching ``idx``; other indices are unchanged."""
        self.synapses = [
            s for s in self.synapses if s.source != idx and s.target != idx
        ]
        self._update_incoming_counts()

    def n(self) -> int:
        """Return the number of neuron slots in the graph."""
        return self._n

    def num_edges(self) -> int:
        """Return the number of synapses."""
        return len(self.synapses)
# ========== PLASTICITY ==========
class Plasticity:
    """Applies the per-synapse learning rules across a whole graph."""

    def __init__(self, graph: "Graph", config: "Config"):
        self.graph = graph
        self.config = config

    def hebbian_update(self, active_neurons: Set[int], potentials: List[int],
                       dopamine_gain: int = 512):
        """Run the Hebbian rule on every synapse whose source neuron is active.

        Args:
            active_neurons: indices of neurons that fired.
            potentials: potential of every neuron, indexed by neuron index;
                read only for targets that are active.
            dopamine_gain: gain forwarded to each synapse update.
        """
        if not active_neurons:
            return
        # One pass over the synapses; each synapse has exactly one source, so
        # this visits the same synapses as scanning the list once per active
        # neuron, at a fraction of the cost.
        for syn in self.graph.synapses:
            if syn.source not in active_neurons:
                continue
            post_active = syn.target in active_neurons
            post_pot = potentials[syn.target] if post_active else 0
            syn.hebbian_update(True, post_active, post_pot, dopamine_gain)

    def homeostasis_all(self):
        """Apply weight homeostasis to every synapse with the configured target."""
        for syn in self.graph.synapses:
            syn.homeostasis(self.config.target_activity)
# ========== AURA CORE ==========
class AuraCore:
    """Network core: neurons, their synapse graph, and byte-per-step inference.

    The neuron list holds ``config.num_neurons`` ordinary neurons followed by
    four category detectors and two oscillators. The oscillators clock the
    periodic homeostasis and pruning passes.
    """

    def __init__(self, config: Config):
        self.config = config
        self.graph = Graph(config)
        self.plasticity = Plasticity(self.graph, config)
        self.neurons: List[Neuron] = []
        self._init_neurons()
        self._init_category_detectors()
        self._init_oscillators()
        self._build_caches()
        # Input/output bytes map onto the first neurons of the list; both lists
        # are truncated when the network is smaller than the byte range.
        self.input_neurons = list(range(min(config.input_size, len(self.neurons))))
        self.output_neurons = list(range(min(config.output_size, len(self.neurons))))
        self.step_count = 0
        self.active_neurons: Set[int] = set()
        self.dopamine_trace = 0
        self.dopamine_gain = 512
        # _init_oscillators appends exactly these two, in this order.
        self.homeostasis_osc = self.neurons[-2]
        self.prune_osc = self.neurons[-1]

    def _init_neurons(self):
        """Create the ordinary neurons: 70% excitatory, 15% inhibitory, rest basic."""
        n = self.config.num_neurons
        n_exc = int(n * 0.70)
        n_inh = int(n * 0.15)
        threshold = self.config.spike_threshold
        idx = 0
        for _ in range(n_exc):
            self.neurons.append(ExcitatoryNeuron(idx, threshold))
            idx += 1
        for _ in range(n_inh):
            self.neurons.append(InhibitoryNeuron(idx, threshold))
            idx += 1
        while idx < n:
            self.neurons.append(Neuron(idx, threshold))
            idx += 1

    def _init_category_detectors(self):
        """Append detectors for digits, upper-case, lower-case and whitespace bytes."""
        categories = (
            list(range(48, 58)),    # '0'-'9'
            list(range(65, 91)),    # 'A'-'Z'
            list(range(97, 123)),   # 'a'-'z'
            [9, 10, 32],            # tab, newline, space
        )
        for category_bytes in categories:
            self.neurons.append(CategoryDetectorNeuron(
                len(self.neurons), category_bytes, self.config.spike_threshold
            ))

    def _init_oscillators(self):
        """Append the homeostasis oscillator, then the pruning oscillator."""
        for period in (self.config.homeostasis_interval_steps,
                       self.config.prune_interval_steps):
            self.neurons.append(OscillatorNeuron(
                len(self.neurons), period, self.config.spike_threshold
            ))

    def _build_caches(self):
        """Rebuild each neuron's list of outgoing synapses and target indices."""
        for neuron in self.neurons:
            neuron.outgoing_synapses = []
            neuron.outgoing_neurons = []
        for syn in self.graph.synapses:
            if syn.source < len(self.neurons):
                source = self.neurons[syn.source]
                source.outgoing_synapses.append(syn)
                source.outgoing_neurons.append(syn.target)

    def forward(self, input_byte: int, reward: int = 0, explore: bool = True) -> int:
        """Feed one byte through the network and return the predicted next byte.

        Args:
            input_byte: value used to pick the input neuron (taken modulo the
                number of input neurons).
            reward: reward for this step; feeds the decaying dopamine trace.
            explore: when true, output logits receive per-neuron noise scaled
                by ``config.explore_noise_scale``.

        Returns:
            Index of the output neuron with the highest logit (first one wins
            on ties).
        """
        cfg = self.config

        # Dopamine trace: integer exponential decay plus reward, clamped.
        self.dopamine_trace = (self.dopamine_trace * cfg.dopamine_decay + reward * 10) // 10
        self.dopamine_trace = max(0, min(1024, self.dopamine_trace))
        self.dopamine_gain = 512 + self.dopamine_trace

        # The oscillators are clocked here, exactly once per forward step.
        if self.homeostasis_osc.tick(self.step_count)[0]:
            self.plasticity.homeostasis_all()
        if self.prune_osc.tick(self.step_count)[0]:
            self._prune_isolated()

        input_idx = self.input_neurons[input_byte % len(self.input_neurons)]
        input_neuron = self.neurons[input_idx]
        if isinstance(input_neuron, CategoryDetectorNeuron):
            input_neuron.integrate(cfg.input_signal_strength, input_byte=input_byte)
        else:
            input_neuron.integrate(cfg.input_signal_strength)

        self.active_neurons.clear()
        for _ in range(cfg.depth):
            fired_this_step = []
            for i, neuron in enumerate(self.neurons):
                # Oscillators were already ticked above. Ticking them again here
                # would shorten their period and can consume the firing tick
                # without triggering the maintenance pass.
                if isinstance(neuron, OscillatorNeuron):
                    continue
                if neuron.tick(self.step_count)[0]:
                    fired_this_step.append(neuron)
                    self.active_neurons.add(i)
            for src in fired_this_step:
                for tgt, signal in src.propagate(cfg.signal_scale_shift):
                    if tgt < len(self.neurons):
                        self.neurons[tgt].integrate(signal)

        potentials = [neuron.potential for neuron in self.neurons]
        self.plasticity.hebbian_update(self.active_neurons, potentials, self.dopamine_gain)

        # Score only the output neurons that exist: the list is shorter than
        # config.output_size when the network is small, and indexing it by
        # every byte value would raise IndexError.
        noise_scale = max(0, cfg.explore_noise_scale)
        logits = []
        for idx in self.output_neurons:
            if idx >= len(self.neurons):
                logits.append(0)
                continue
            neuron = self.neurons[idx]
            if explore:
                # Neurons are created with noise_scale 0; apply the configured
                # exploration noise here so changing it takes effect.
                neuron.noise_scale = noise_scale
                logits.append(neuron.get_logit_with_noise())
            else:
                logits.append(neuron.potential)
        next_byte = max(range(len(logits)), key=logits.__getitem__)

        self.step_count += 1
        return next_byte

    def _prune_isolated(self):
        """Remove unprotected neurons that have no synapses at all.

        Input neurons, output neurons and protected neurons are never removed.
        Nothing is pruned when that would leave fewer than
        ``config.min_neurons`` graph nodes. Surviving neurons are renumbered
        and synapse endpoints are remapped so indices stay aligned with list
        positions.
        """
        graph = self.graph
        if graph.n() <= self.config.min_neurons:
            return

        sources = {syn.source for syn in graph.synapses}
        reserved = set(self.input_neurons) | set(self.output_neurons)
        dead = {
            i for i, neuron in enumerate(self.neurons)
            if i >= self.config.input_size
            and i not in reserved
            and not neuron.is_protected()
            and graph.get_incoming_count(i) == 0
            and i not in sources
        }
        if not dead or graph.n() - len(dead) < self.config.min_neurons:
            return

        # Compact the neuron list in place and record old -> new positions.
        dead_graph_nodes = sum(1 for i in dead if i < graph.n())
        remap = {}
        survivors = []
        for old_idx, neuron in enumerate(self.neurons):
            if old_idx in dead:
                continue
            remap[old_idx] = len(survivors)
            neuron.idx = len(survivors)
            survivors.append(neuron)
        self.neurons[:] = survivors

        # Synapses refer to neurons by position, so they must follow the shift.
        graph.synapses = [
            syn for syn in graph.synapses
            if syn.source not in dead and syn.target not in dead
        ]
        for syn in graph.synapses:
            syn.source = remap.get(syn.source, syn.source)
            syn.target = remap.get(syn.target, syn.target)
        # Keep the graph's node count in step with the compacted neuron list.
        graph._n -= dead_graph_nodes
        graph._update_incoming_counts()
        self._build_caches()
# ========== MODEL INITIALIZATION ==========
# Build the shared demo model at import time with a reduced neuron count.
print("🚀 Инициализация AURA...")
config = Config()
config.num_neurons = 200
config.depth = 3
core = AuraCore(config)
# num_edges is a method: call it so the synapse count is printed rather than
# the bound-method repr.
print(f"✅ Модель готова: {len(core.neurons)} нейронов, {core.graph.num_edges()} синапсов")
# ========== GRADIO FUNCTIONS ==========
def generate_text(prompt: str, steps: int = 100, temperature: float = 1.0,
                  reward: int = 0) -> Tuple[str, str]:
    """Generate text from a prompt with the shared ``core`` model.

    Args:
        prompt: seed text; an empty prompt is replaced by ``"A"``.
        steps: number of characters to generate.
        temperature: multiplier applied to the base exploration noise of 64.
        reward: reward passed to the model on every step.

    Returns:
        A ``(text, stats)`` pair: the prompt followed by the generated
        characters, and a short statistics report.
    """
    if not prompt:
        prompt = "A"
    # UI sliders may deliver floats; range() and the integer model need ints.
    steps = int(steps)
    reward = int(reward)
    # Set the exploration noise according to the temperature.
    core.config.explore_noise_scale = int(64 * temperature)
    start_time = time.time()
    # Feed the prompt through the model.
    for ch in prompt:
        core.forward(ord(ch), reward=reward, explore=True)
    # Generate, feeding each predicted byte back in as the next input.
    result = list(prompt)
    current = ord(prompt[-1])
    for _ in range(steps):
        current = core.forward(current, reward=reward, explore=True)
        if 32 <= current <= 126 or current in (9, 10, 13):
            result.append(chr(current))
        elif current == 0:
            result.append(' ')
        else:
            # Fold any other value into the printable ASCII range.
            result.append(chr(32 + (current % 95)))
    elapsed = time.time() - start_time
    stats = f"""
⏱️ Время: {elapsed:.2f} сек
📊 Шагов: {steps}
🔥 Активных нейронов: {len(core.active_neurons)}
💪 Дофамин: {core.dopamine_trace}
"""
    return ''.join(result), stats
def reset_model():
    """Replace the shared model with a freshly built one and report success."""
    global core
    fresh_core = AuraCore(config)
    core = fresh_core
    status_message = "✅ Модель сброшена"
    return status_message
# ========== GRADIO INTERFACE ==========
# Page layout: controls in the left column, results in the right column.
with gr.Blocks(title="AURA — Спайковая нейросеть") as demo:
    gr.Markdown("""
# 🧠 AURA — Спайковая нейросеть с Hebbian-обучением
Минимальная версия ядра AURA. Модель обучается на лету через локальную пластичность.
""")
    with gr.Row():
        with gr.Column(scale=2):
            prompt_input = gr.Textbox(
                label="📝 Промпт",
                placeholder="Введите текст для генерации...",
                value="Hello",
                lines=2
            )
            with gr.Row():
                steps_slider = gr.Slider(
                    label="📏 Длина генерации",
                    minimum=10,
                    maximum=500,
                    value=100,
                    step=10
                )
                temp_slider = gr.Slider(
                    label="🌡️ Temperature (креативность)",
                    minimum=0.1,
                    maximum=2.0,
                    value=1.0,
                    step=0.1
                )
            reward_slider = gr.Slider(
                label="🏆 Reward (награда за шаг)",
                minimum=0,
                maximum=100,
                value=0,
                step=10
            )
            generate_btn = gr.Button("🚀 Сгенерировать", variant="primary", size="lg")
            reset_btn = gr.Button("🔄 Сбросить модель", variant="secondary")
        with gr.Column(scale=3):
            output_text = gr.Textbox(
                label="✨ Сгенерированный текст",
                lines=10,
                max_lines=20
            )
            stats_text = gr.Textbox(
                label="📊 Статистика",
                lines=5
            )
    generate_btn.click(
        fn=generate_text,
        inputs=[prompt_input, steps_slider, temp_slider, reward_slider],
        outputs=[output_text, stats_text]
    )
    # The reset confirmation is shown in the generated-text box.
    reset_btn.click(
        fn=reset_model,
        inputs=[],
        outputs=[output_text]
    )
    # The neuron count is read from the active configuration so the description
    # cannot drift from the model that is actually built.
    gr.Markdown(f"""
---
### О модели
- **Архитектура**: Спайковая нейросеть на разреженном графе
- **Обучение**: Hebbian-пластичность (локальные правила)
- **Нейронов**: {config.num_neurons} (70% excitatory, 15% inhibitory)
- **Категориальные детекторы**: цифры, буквы, пробелы
""")
if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    # NOTE(review): `theme` is passed to launch() here; confirm the installed
    # Gradio release accepts it there rather than on gr.Blocks().
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "theme": gr.themes.Soft(),
    }
    demo.launch(**launch_options)