aur / app.py
Andrewstivan's picture
Update app.py
6aeec30 verified
import gradio as gr
import random
from typing import List, Set, Tuple, Optional
from dataclasses import dataclass
import time
# ========== КОНФИГУРАЦИЯ ==========
@dataclass
class Config:
num_neurons: int = 1000
depth: int = 3
input_size: int = 256
output_size: int = 256
spike_threshold: int = 1000
input_signal_strength: int = 1000
signal_scale_shift: int = 8
explore_noise_scale: int = 64
sparsity: float = 0.05
neighbor_radius: int = 10
hebbian_step_threshold: int = 10
sigma_decay_shift: int = 8
dopamine_decay: int = 9
homeostasis_interval_steps: int = 100
prune_interval_steps: int = 200
min_neurons: int = 100
target_activity: int = 100
# ========== НЕЙРОНЫ ==========
class Neuron:
def __init__(self, idx: int, threshold: int = 1000, noise_scale: int = 0):
self.idx = idx
self.potential = 0
self.threshold = threshold
self.fired = False
self.refractory = 0
self.neuron_sigma = 0
self.noise_scale = noise_scale
self.noise_accum = (idx * 1103515245 + 12345) & 0x7FFFFFFF
self.outgoing_synapses = []
self.outgoing_neurons = []
self.total_fires = 0
self.neuron_type = "basic"
def tick(self, step_count: int) -> Tuple[bool, int]:
if self.refractory > 0:
self.refractory -= 1
return False, 0
if self.potential >= self.threshold:
self.fired = True
output = self.potential
self.refractory = 2
self.neuron_sigma += 1
self.total_fires += 1
return True, output
self.fired = False
return False, 0
def integrate(self, signal: int):
if self.refractory == 0:
self.potential += signal
def fire_ternary(self) -> int:
if self.fired:
if self.potential >= self.threshold + (self.threshold >> 2):
return 1
elif self.potential <= self.threshold - (self.threshold >> 2):
return -1
return 0
def propagate(self, shift: int) -> List[Tuple[int, int]]:
if not self.fired:
return []
ternary = self.fire_ternary()
if ternary == 0:
self.potential = 0
return []
updates = []
for tgt, syn in zip(self.outgoing_neurons, self.outgoing_synapses):
signal = ternary * syn.weight
if signal != 0:
updates.append((tgt, signal << shift))
self.potential = 0
return updates
def get_logit_with_noise(self) -> int:
self.noise_accum = (self.noise_accum * 1103515245 + 12345) & 0x7FFFFFFF
if self.noise_scale == 0:
return self.potential
noise = (self.noise_accum % (self.noise_scale * 2 + 1)) - self.noise_scale
return self.potential + noise
def is_protected(self) -> bool:
return False
class ExcitatoryNeuron(Neuron):
def __init__(self, idx: int, threshold: int = 1000):
super().__init__(idx, threshold)
self.neuron_type = "excitatory"
def fire_ternary(self) -> int:
return 1 if self.fired else 0
class InhibitoryNeuron(Neuron):
def __init__(self, idx: int, threshold: int = 1000):
super().__init__(idx, threshold)
self.neuron_type = "inhibitory"
def fire_ternary(self) -> int:
return -1 if self.fired else 0
class OscillatorNeuron(Neuron):
def __init__(self, idx: int, period_steps: int, threshold: int = 1000):
super().__init__(idx, threshold)
self.neuron_type = "oscillator"
self.period_steps = period_steps
self.step_counter = 0
def tick(self, step_count: int) -> Tuple[bool, int]:
self.step_counter += 1
if self.step_counter >= self.period_steps:
self.step_counter = 0
self.fired = True
return True, self.threshold
return False, 0
def is_protected(self) -> bool:
return True
class CategoryDetectorNeuron(Neuron):
def __init__(self, idx: int, category_bytes: List[int], threshold: int = 1000):
super().__init__(idx, threshold)
self.neuron_type = "category_detector"
self.category_bytes = set(category_bytes)
def integrate(self, signal: int, input_byte: Optional[int] = None):
if input_byte is not None and input_byte in self.category_bytes:
super().integrate(signal)
def is_protected(self) -> bool:
return True
# ========== СИНАПС ==========
class Synapse:
def __init__(self, source: int, target: int, weight: int = 0):
self.source = source
self.target = target
self.weight = weight
self.G = 0
self.sigma = 0
self.bcm_theta = 512
def hebbian_update(self, pre_fired: bool, post_fired: bool,
post_potential: int, dopamine_gain: int = 512):
if pre_fired and post_fired:
self.sigma += dopamine_gain
if abs(self.sigma) >= 10:
delta = 1 if self.sigma > 0 else -1
self.G = max(-1, min(1, self.G + delta))
self.sigma = 0
else:
self.sigma = (self.sigma * 255) >> 8
def homeostasis(self, target_activity: int = 100):
current = abs(self.weight) * 100
if current > target_activity:
scale = (target_activity << 8) // current
self.weight = (self.weight * scale) >> 8
# ========== ГРАФ ==========
class Graph:
def __init__(self, config: Config):
self.config = config
self._n = config.num_neurons
self.synapses: List[Synapse] = []
self._incoming_count = [0] * self._n
self._build()
def _build(self):
for i in range(self._n):
for j in range(max(0, i - self.config.neighbor_radius),
min(self._n, i + self.config.neighbor_radius + 1)):
if i != j and random.random() < self.config.sparsity:
self.synapses.append(Synapse(i, j))
self._update_incoming_counts()
def _update_incoming_counts(self):
self._incoming_count = [0] * self._n
for syn in self.synapses:
self._incoming_count[syn.target] += 1
def get_incoming_count(self, idx: int) -> int:
return self._incoming_count[idx] if idx < len(self._incoming_count) else 0
def get_outgoing_count(self, idx: int) -> int:
return sum(1 for s in self.synapses if s.source == idx)
def get_all_synapses(self) -> List[Synapse]:
return self.synapses
def remove_neuron(self, idx: int):
self.synapses = [s for s in self.synapses if s.source != idx and s.target != idx]
self._update_incoming_counts()
@property
def n(self) -> int:
return self._n
@property
def num_edges(self) -> int:
return len(self.synapses)
# ========== ПЛАСТИЧНОСТЬ ==========
class Plasticity:
def __init__(self, graph: Graph, config: Config):
self.graph = graph
self.config = config
def hebbian_update(self, active_neurons: Set[int], potentials: List[int],
dopamine_gain: int = 512):
for src in active_neurons:
for syn in self.graph.synapses:
if syn.source == src:
post_active = syn.target in active_neurons
post_pot = potentials[syn.target] if post_active else 0
syn.hebbian_update(True, post_active, post_pot, dopamine_gain)
def homeostasis_all(self):
for syn in self.graph.synapses:
syn.homeostasis(self.config.target_activity)
# ========== ЯДРО AURA ==========
class AuraCore:
def __init__(self, config: Config):
self.config = config
self.graph = Graph(config)
self.plasticity = Plasticity(self.graph, config)
self.neurons: List[Neuron] = []
self._init_neurons()
self._init_category_detectors()
self._init_oscillators()
self._build_caches()
self.input_neurons = list(range(min(config.input_size, len(self.neurons))))
self.output_neurons = list(range(min(config.output_size, len(self.neurons))))
self.step_count = 0
self.active_neurons: Set[int] = set()
self.dopamine_trace = 0
self.dopamine_gain = 512
self.homeostasis_osc = self.neurons[-2]
self.prune_osc = self.neurons[-1]
def _init_neurons(self):
n = self.config.num_neurons
n_exc = int(n * 0.70)
n_inh = int(n * 0.15)
idx = 0
for _ in range(n_exc):
self.neurons.append(ExcitatoryNeuron(idx, self.config.spike_threshold))
idx += 1
for _ in range(n_inh):
self.neurons.append(InhibitoryNeuron(idx, self.config.spike_threshold))
idx += 1
while idx < n:
self.neurons.append(Neuron(idx, self.config.spike_threshold))
idx += 1
def _init_category_detectors(self):
self.neurons.append(CategoryDetectorNeuron(
len(self.neurons), list(range(48, 58)), self.config.spike_threshold
))
self.neurons.append(CategoryDetectorNeuron(
len(self.neurons), list(range(65, 91)), self.config.spike_threshold
))
self.neurons.append(CategoryDetectorNeuron(
len(self.neurons), list(range(97, 123)), self.config.spike_threshold
))
self.neurons.append(CategoryDetectorNeuron(
len(self.neurons), [9, 10, 32], self.config.spike_threshold
))
def _init_oscillators(self):
self.neurons.append(OscillatorNeuron(
len(self.neurons), self.config.homeostasis_interval_steps, self.config.spike_threshold
))
self.neurons.append(OscillatorNeuron(
len(self.neurons), self.config.prune_interval_steps, self.config.spike_threshold
))
def _build_caches(self):
for n in self.neurons:
n.outgoing_synapses = []
n.outgoing_neurons = []
for syn in self.graph.synapses:
if syn.source < len(self.neurons):
self.neurons[syn.source].outgoing_synapses.append(syn)
self.neurons[syn.source].outgoing_neurons.append(syn.target)
def forward(self, input_byte: int, reward: int = 0, explore: bool = True) -> int:
self.dopamine_trace = (self.dopamine_trace * self.config.dopamine_decay + reward * 10) // 10
self.dopamine_trace = max(0, min(1024, self.dopamine_trace))
self.dopamine_gain = 512 + self.dopamine_trace
if self.homeostasis_osc.tick(self.step_count)[0]:
self.plasticity.homeostasis_all()
if self.prune_osc.tick(self.step_count)[0]:
self._prune_isolated()
input_idx = self.input_neurons[input_byte % len(self.input_neurons)]
input_neuron = self.neurons[input_idx]
if isinstance(input_neuron, CategoryDetectorNeuron):
input_neuron.integrate(self.config.input_signal_strength, input_byte=input_byte)
else:
input_neuron.integrate(self.config.input_signal_strength)
self.active_neurons.clear()
for _ in range(self.config.depth):
fired_this_step = []
for i, n in enumerate(self.neurons):
if n.tick(self.step_count)[0]:
fired_this_step.append(n)
self.active_neurons.add(i)
for src in fired_this_step:
updates = src.propagate(self.config.signal_scale_shift)
for tgt, signal in updates:
if tgt < len(self.neurons):
self.neurons[tgt].integrate(signal)
potentials = [n.potential for n in self.neurons]
self.plasticity.hebbian_update(self.active_neurons, potentials, self.dopamine_gain)
logits = [0] * self.config.output_size
for byte in range(self.config.output_size):
idx = self.output_neurons[byte]
if idx < len(self.neurons):
logits[byte] = self.neurons[idx].get_logit_with_noise() if explore else self.neurons[idx].potential
next_byte = max(range(self.config.output_size), key=lambda i: logits[i])
self.step_count += 1
return next_byte
def _prune_isolated(self):
if self.graph.n <= self.config.min_neurons:
return
dead = []
for i, n in enumerate(self.neurons):
if i < self.config.input_size or n.is_protected():
continue
if self.graph.get_incoming_count(i) == 0 and self.graph.get_outgoing_count(i) == 0:
dead.append(i)
if dead and self.graph.n - len(dead) >= self.config.min_neurons:
for idx in sorted(dead, reverse=True):
self.graph.remove_neuron(idx)
del self.neurons[idx]
self._build_caches()
# ========== ИНИЦИАЛИЗАЦИЯ МОДЕЛИ ==========
print("🚀 Инициализация AURA...")
config = Config()
config.num_neurons = 200
config.depth = 3
core = AuraCore(config)
print(f"✅ Модель готова: {len(core.neurons)} нейронов, {core.graph.num_edges} синапсов")
# ========== ФУНКЦИИ ДЛЯ GRADIO ==========
def generate_text(prompt: str, steps: int = 100, temperature: float = 1.0, reward: int = 0) -> str:
"""Генерация текста из промпта."""
if not prompt:
prompt = "A"
# Устанавливаем шум в зависимости от температуры
core.config.explore_noise_scale = int(64 * temperature)
start_time = time.time()
# Подаём промпт
for ch in prompt:
core.forward(ord(ch), reward=reward, explore=True)
# Генерируем
result = list(prompt)
current = ord(prompt[-1])
for _ in range(steps):
current = core.forward(current, reward=reward, explore=True)
if 32 <= current <= 126 or current in (9, 10, 13):
result.append(chr(current))
elif current == 0:
result.append(' ')
else:
result.append(chr(32 + (current % 95)))
elapsed = time.time() - start_time
stats = f"""
⏱️ Время: {elapsed:.2f} сек
📊 Шагов: {steps}
🔥 Активных нейронов: {len(core.active_neurons)}
💪 Дофамин: {core.dopamine_trace}
"""
return ''.join(result), stats
def reset_model():
"""Сброс модели."""
global core
core = AuraCore(config)
return "✅ Модель сброшена"
# ========== ИНТЕРФЕЙС GRADIO ==========
with gr.Blocks(title="AURA — Спайковая нейросеть") as demo:
gr.Markdown("""
# 🧠 AURA — Спайковая нейросеть с Hebbian-обучением
Минимальная версия ядра AURA. Модель обучается на лету через локальную пластичность.
""")
with gr.Row():
with gr.Column(scale=2):
prompt_input = gr.Textbox(
label="📝 Промпт",
placeholder="Введите текст для генерации...",
value="Hello",
lines=2
)
with gr.Row():
steps_slider = gr.Slider(
label="📏 Длина генерации",
minimum=10,
maximum=500,
value=100,
step=10
)
temp_slider = gr.Slider(
label="🌡️ Temperature (креативность)",
minimum=0.1,
maximum=2.0,
value=1.0,
step=0.1
)
reward_slider = gr.Slider(
label="🏆 Reward (награда за шаг)",
minimum=0,
maximum=100,
value=0,
step=10
)
generate_btn = gr.Button("🚀 Сгенерировать", variant="primary", size="lg")
reset_btn = gr.Button("🔄 Сбросить модель", variant="secondary")
with gr.Column(scale=3):
output_text = gr.Textbox(
label="✨ Сгенерированный текст",
lines=10,
max_lines=20
)
stats_text = gr.Textbox(
label="📊 Статистика",
lines=5
)
generate_btn.click(
fn=generate_text,
inputs=[prompt_input, steps_slider, temp_slider, reward_slider],
outputs=[output_text, stats_text]
)
reset_btn.click(
fn=reset_model,
inputs=[],
outputs=[output_text]
)
gr.Markdown("""
---
### О модели
- **Архитектура**: Спайковая нейросеть на разреженном графе
- **Обучение**: Hebbian-пластичность (локальные правила)
- **Нейронов**: 800 (70% excitatory, 15% inhibitory)
- **Категориальные детекторы**: цифры, буквы, пробелы
""")
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=7860,
theme=gr.themes.Soft()
)