|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
Mathematical Foundation & Conceptual Documentation |
|
|
------------------------------------------------- |
|
|
|
|
|
CORE PRINCIPLE: |
|
|
Combines Hebbian learning ("neurons that fire together, wire together") with |
|
|
Bloom filter probabilistic membership testing to create self-organizing |
|
|
associative memory systems that adapt based on usage patterns. |
|
|
|
|
|
MATHEMATICAL FOUNDATION: |
|
|
======================= |
|
|
|
|
|
1. HEBBIAN LEARNING RULE: |
|
|
Δw_ij = η * a_i * a_j |
|
|
|
|
|
Where: |
|
|
- w_ij: connection strength between neurons i and j |
|
|
- η: learning rate (plasticity parameter) |
|
|
- a_i, a_j: activation levels of neurons i and j |
|
|
|
|
|
In our context: |
|
|
- Strengthens hash function weights for co-occurring patterns |
|
|
- Adapts activation thresholds based on usage frequency |
|
|
- Creates associative links between related items |
|
|
|
|
|
2. BLOOM FILTER MATHEMATICS: |
|
|
|
|
|
Optimal bit array size: m = -n * ln(p) / (ln(2))² |
|
|
Optimal hash functions: k = (m/n) * ln(2) |
|
|
|
|
|
Where: |
|
|
- n: expected number of items |
|
|
- p: desired false positive rate |
|
|
- m: bit array size |
|
|
- k: number of hash functions |
|
|
|
|
|
False positive probability: P_fp ≈ (1 - e^(-kn/m))^k |
|
|
|
|
|
3. CONFIDENCE ESTIMATION: |
|
|
|
|
|
C_total = (C_bit + C_hash + C_access) / 3 |
|
|
|
|
|
Where: |
|
|
- C_bit: confidence from bit array activation strength |
|
|
- C_hash: confidence from hash activation patterns |
|
|
- C_access: confidence from historical access frequency |
|
|
|
|
|
4. TEMPORAL DECAY: |
|
|
|
|
|
w_t+1 = γ * w_t |
|
|
|
|
|
Where γ ∈ [0.9, 0.999] is the decay rate, implementing forgetting. |
|
|
|
|
|
CONCEPTUAL REASONING: |
|
|
==================== |
|
|
|
|
|
WHY HEBBIAN + BLOOM FILTERS? |
|
|
- Traditional Bloom filters use static hash functions |
|
|
- Real-world data has temporal and associative patterns |
|
|
- Hebbian learning enables dynamic adaptation to these patterns |
|
|
- Results in more efficient memory utilization and better retrieval |
|
|
|
|
|
KEY INNOVATIONS: |
|
|
1. **Learnable Hash Functions**: Neural networks that adapt their mappings |
|
|
2. **Associative Strengthening**: Related items develop similar hash patterns |
|
|
3. **Confidence Estimation**: Multi-factor confidence scoring |
|
|
4. **Temporal Adaptation**: Gradual forgetting prevents overfitting |
|
|
5. **Ensemble Filtering**: Multiple filters with voting for robustness |
|
|
|
|
|
APPLICATIONS: |
|
|
- Caching systems that learn access patterns |
|
|
- Recommendation engines with temporal adaptation |
|
|
- Memory systems for neural architectures |
|
|
- Similarity search with learned associations |
|
|
|
|
|
COMPLEXITY ANALYSIS: |
|
|
- Space: O(m + n*d) where m=bit array size, n=items, d=vector dimension |
|
|
- Time: O(k*d) per operation where k=hash functions |
|
|
- Learning: O(d²) for co-activation matrix updates |
|
|
""" |
|
|
|
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
import numpy as np |
|
|
import math |
|
|
import hashlib |
|
|
from collections import defaultdict, deque |
|
|
from typing import List, Dict, Tuple, Optional, Union |
|
|
|
|
|
# Numerical-safety bounds shared by make_safe and the similarity helpers.
SAFE_MIN = -1e6  # lower clamp bound for sanitized tensors
SAFE_MAX = 1e6   # upper clamp bound for sanitized tensors
EPS = 1e-8       # minimum norm/denominator to avoid division by zero
|
|
|
|
|
|
|
|
|
|
|
def make_safe(tensor, min_val=SAFE_MIN, max_val=SAFE_MAX):
    """Sanitize a tensor: replace non-finite values and clamp into [min_val, max_val].

    NaN -> 0.0, +inf -> max_val, -inf -> min_val, then clamp.

    BUG FIX: the previous implementation replaced *every* infinity (detected
    via torch.isinf, which matches both signs) with max_val, so -inf was
    silently flipped to +1e6 instead of being mapped to the lower bound.
    torch.nan_to_num handles the three cases separately in one pass.

    Args:
        tensor: Input tensor of any shape/dtype supported by nan_to_num.
        min_val: Lower bound (also the -inf replacement).
        max_val: Upper bound (also the +inf replacement).

    Returns:
        A tensor of the same shape with all values finite and within bounds.
    """
    cleaned = torch.nan_to_num(tensor, nan=0.0, posinf=max_val, neginf=min_val)
    return torch.clamp(cleaned, min_val, max_val)
|
|
|
|
|
def safe_cosine_similarity(a, b, dim=-1, eps=EPS):
    """Cosine similarity along *dim* with zero-norm protection.

    Both inputs are promoted to float32 when needed. Each norm is clamped to
    at least *eps*, so a zero vector yields 0 rather than NaN. The reduced
    dimension is kept (keepdim=True), matching callers that .squeeze() it.
    """
    a = a if a.dtype == torch.float32 else a.float()
    b = b if b.dtype == torch.float32 else b.float()

    dot = (a * b).sum(dim=dim, keepdim=True)
    denom = (
        torch.norm(a, dim=dim, keepdim=True).clamp(min=eps)
        * torch.norm(b, dim=dim, keepdim=True).clamp(min=eps)
    )
    return dot / denom
|
|
|
|
|
def item_to_vector(item, vector_dim=64):
    """Convert arbitrary item to fixed-size vector representation.

    Uses different encoding strategies:
    - Strings: MD5 hash-based encoding (16 digest bytes scaled to [0, 1])
    - Numbers: Sinusoidal positional encoding
    - Tensors: Flattening with padding/truncation
    - Other: Deterministic hash-based random vector

    Args:
        item: Object to encode (str, int/float, tensor, or anything str()-able).
        vector_dim: Length of the returned 1-D float32 vector.

    Returns:
        A float32 tensor of shape (vector_dim,), sanitized via make_safe.
    """
    if isinstance(item, str):
        # 16 MD5 digest bytes normalized to [0, 1], zero-padded (or truncated)
        # to vector_dim. MD5 is used for speed/determinism, not security.
        hash_obj = hashlib.md5(item.encode())
        hash_bytes = hash_obj.digest()

        vector = torch.tensor([b / 255.0 for b in hash_bytes], dtype=torch.float32)

        if len(vector) < vector_dim:
            padding = torch.zeros(vector_dim - len(vector), dtype=torch.float32)
            vector = torch.cat([vector, padding])
        else:
            vector = vector[:vector_dim]
    elif isinstance(item, (int, float)):
        # Transformer-style sinusoidal encoding of the scalar; for odd
        # vector_dim the final element remains zero.
        vector = torch.zeros(vector_dim, dtype=torch.float32)
        for i in range(vector_dim // 2):
            freq = 10000 ** (-2 * i / vector_dim)
            vector[2*i] = math.sin(item * freq)
            vector[2*i + 1] = math.cos(item * freq)
    elif torch.is_tensor(item):
        # Flatten and pad/truncate on the tensor's own device.
        vector = item.flatten().float()
        if len(vector) < vector_dim:
            padding = torch.zeros(vector_dim - len(vector), dtype=torch.float32, device=vector.device)
            vector = torch.cat([vector, padding])
        else:
            vector = vector[:vector_dim]
    else:
        # BUG FIX: the previous code seeded the generator from the builtin
        # hash(str(item)), which is randomized per process (PYTHONHASHSEED),
        # so the same item mapped to different vectors across runs. Use a
        # stable MD5-derived seed so the encoding is reproducible.
        seed = int(hashlib.md5(str(item).encode()).hexdigest(), 16) % (2**31)
        gen = torch.Generator(device='cpu')
        gen.manual_seed(seed)
        vector = torch.randn(vector_dim, generator=gen, dtype=torch.float32)

    return make_safe(vector)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LearnableHashFunction(nn.Module):
    """Neural hash function with Hebbian plasticity.

    Implements learnable hash functions that adapt through Hebbian learning,
    strengthening patterns that co-occur and developing associative mappings.

    Mathematical Details:
    - Base hash: h = tanh(W2 * tanh(W1 * x + b1) + b2)
    - Hebbian modulation: h_mod = h * tanh(w_hebbian)
    - Threshold adaptation: h_thresh = h_mod - θ
    - Binary conversion: p = sigmoid(10 * h_thresh)
    """

    def __init__(self, input_dim, hash_output_bits=32, learning_rate=0.01):
        """Args:
            input_dim: Dimension of the item vectors fed to the hash network.
            hash_output_bits: Number of hash bits produced per item.
            learning_rate: Initial value of the learnable plasticity rate.
        """
        super().__init__()
        self.input_dim = input_dim
        self.hash_output_bits = hash_output_bits
        self.learning_rate = learning_rate

        # Two-layer tanh MLP mapping an item vector to a real-valued code in (-1, 1).
        self.hash_network = nn.Sequential(
            nn.Linear(input_dim, input_dim * 2),
            nn.LayerNorm(input_dim * 2),
            nn.Tanh(),
            nn.Linear(input_dim * 2, hash_output_bits),
            nn.Tanh()
        )

        # Per-bit Hebbian gains; updated in-place (outside autograd) by hebbian_update.
        self.hebbian_weights = nn.Parameter(torch.ones(hash_output_bits) * 0.1)
        self.plasticity_rate = nn.Parameter(torch.tensor(learning_rate))

        # Ring buffer of the last 100 activation patterns, for similarity lookups.
        self.register_buffer('activity_history', torch.zeros(100, hash_output_bits))
        self.register_buffer('history_pointer', torch.tensor(0, dtype=torch.long))

        # Bit x bit co-activation statistics accumulated from associated items.
        self.coactivation_matrix = nn.Parameter(torch.eye(hash_output_bits) * 0.1)

        # Learnable per-bit firing threshold (subtracted before the sigmoid).
        self.activation_threshold = nn.Parameter(torch.zeros(hash_output_bits))

    def compute_hash_activation(self, item_vector):
        """Compute hash activation pattern for an item.

        Returns:
            (hash_probs, modulated_hash): per-bit firing probabilities in (0, 1)
            and the pre-threshold modulated activations.
        """
        # Ensure a batch dimension and move the input onto the network's device.
        if item_vector.dim() == 1:
            item_vector = item_vector.unsqueeze(0)
        item_vector = item_vector.to(next(self.hash_network.parameters()).device, dtype=torch.float32)

        # NOTE(review): squeeze(0) assumes a single-item batch; a batch of
        # size > 1 would keep its leading dimension here.
        base_hash = self.hash_network(item_vector).squeeze(0)

        # Gate each bit by its Hebbian gain, squashed into (-1, 1).
        hebbian_modulation = torch.tanh(self.hebbian_weights)
        modulated_hash = base_hash * hebbian_modulation

        # Shift by the learnable firing threshold.
        thresholded = modulated_hash - self.activation_threshold

        # Steep sigmoid (slope 10) approximates a step function while staying
        # differentiable.
        hash_probs = torch.sigmoid(thresholded * 10.0)

        return hash_probs, modulated_hash

    def get_hash_bits(self, item_vector, deterministic=False):
        """Get binary hash bits for an item.

        deterministic=True thresholds the probabilities at 0.5; otherwise bits
        are sampled (Bernoulli), so repeated calls may differ.
        """
        hash_probs, _ = self.compute_hash_activation(item_vector)

        if deterministic:
            hash_bits = (hash_probs > 0.5).float()
        else:
            hash_bits = torch.bernoulli(hash_probs)

        return hash_bits

    def hebbian_update(self, item_vector, co_occurring_items=None):
        """Apply Hebbian learning rule: Δw = η * pre * post.

        Strengthens connections between co-activated hash bits and updates
        the co-activation matrix for associative learning.
        """
        hash_probs, modulated_hash = self.compute_hash_activation(item_vector)

        # Record this activation in the ring buffer (pointer wraps at length).
        with torch.no_grad():
            ptr = int(self.history_pointer.item())
            self.activity_history[ptr % self.activity_history.size(0)].copy_(hash_probs.detach())
            self.history_pointer.add_(1)
            self.history_pointer.remainder_(self.activity_history.size(0))

        # Keep the learnable plasticity rate in a sane range.
        plasticity_rate = torch.clamp(self.plasticity_rate, 0.001, 0.1)

        # Hebbian term: activity strength (|pre|) times firing probability (post).
        activity_strength = torch.abs(modulated_hash)
        hebbian_delta = plasticity_rate * activity_strength * hash_probs

        # In-place, autograd-free update; 0.05 damps the step, clamp bounds growth.
        with torch.no_grad():
            self.hebbian_weights.data.add_(hebbian_delta * 0.05)
            self.hebbian_weights.data.clamp_(-2.0, 2.0)

        # Associative learning against co-occurring items, if provided.
        if co_occurring_items is not None:
            self.update_coactivation_matrix(hash_probs, co_occurring_items)

        return hash_probs

    def update_coactivation_matrix(self, current_activation, co_occurring_items):
        """Update co-activation matrix based on items that occur together."""
        with torch.no_grad():
            for co_item in co_occurring_items:
                co_item_vector = item_to_vector(co_item, self.input_dim).to(current_activation.device)
                co_activation, _ = self.compute_hash_activation(co_item_vector)

                # Outer product strengthens every (current-bit, co-item-bit) pair.
                coactivation_update = torch.outer(current_activation, co_activation)

                # Fixed (non-learnable) rate for the co-activation statistics.
                learning_rate = 0.01
                self.coactivation_matrix.data.add_(learning_rate * coactivation_update)
                self.coactivation_matrix.data.clamp_(-1.0, 1.0)

    def get_similar_patterns(self, item_vector, top_k=5):
        """Find historically similar activation patterns.

        Returns:
            Up to top_k (history_index, cosine_similarity) pairs, best first.
        """
        current_probs, _ = self.compute_hash_activation(item_vector)

        # Compare against every non-empty slot in the ring buffer.
        similarities = []
        for i in range(self.activity_history.shape[0]):
            hist_pattern = self.activity_history[i]
            if torch.sum(hist_pattern) > 0:
                similarity = safe_cosine_similarity(
                    current_probs.unsqueeze(0),
                    hist_pattern.unsqueeze(0)
                ).squeeze()
                similarities.append((i, float(similarity.item())))

        # Best match first.
        similarities.sort(key=lambda x: x[1], reverse=True)

        return similarities[:top_k]

    def apply_forgetting(self, forget_rate=0.99):
        """Apply gradual forgetting to prevent overfitting."""
        with torch.no_grad():
            self.hebbian_weights.data.mul_(forget_rate)
            self.coactivation_matrix.data.mul_(forget_rate)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HebbianBloomFilter(nn.Module):
    """Probabilistic set membership filter with Hebbian learning.

    Combines traditional Bloom filter efficiency with adaptive hash functions
    that learn from usage patterns and develop associative mappings.

    Key Features:
    - Learnable hash functions with neural plasticity
    - Confidence-based membership testing
    - Associative learning between related items
    - Temporal decay for forgetting old patterns
    """

    def __init__(self, capacity=10000, error_rate=0.01, vector_dim=64, num_hash_functions=8):
        """Args:
            capacity: Expected number of stored items (n in the Bloom formula).
            error_rate: Target false-positive rate (p).
            vector_dim: Dimension of item vectors fed to the hash functions.
            num_hash_functions: Number of learnable hash functions (k).
        """
        super().__init__()
        self.capacity = capacity
        self.error_rate = error_rate
        self.vector_dim = vector_dim
        self.num_hash_functions = num_hash_functions

        # m = -n * ln(p) / ln(2)^2 — the classical optimal bit-array size.
        self.bit_array_size = self._calculate_bit_array_size(capacity, error_rate)

        # One learnable hash function per classical hash slot.
        self.hash_functions = nn.ModuleList([
            LearnableHashFunction(vector_dim, hash_output_bits=32)
            for _ in range(num_hash_functions)
        ])

        # Bit array plus a parallel per-bit confidence score.
        self.register_buffer('bit_array', torch.zeros(self.bit_array_size))
        self.register_buffer('confidence_array', torch.zeros(self.bit_array_size))

        # NOTE: unlike a classical Bloom filter, the original items and their
        # vectors are retained to support find_similar_items (memory tradeoff).
        self.stored_items = {}
        self.item_vectors = {}

        # Usage statistics feeding the confidence estimate and temporal decay.
        self.register_buffer('access_counts', torch.zeros(self.bit_array_size))
        self.register_buffer('total_items_added', torch.tensor(0, dtype=torch.long))

        # Learnable association/confidence knobs.
        self.association_strength = nn.Parameter(torch.tensor(0.1))
        self.confidence_threshold = nn.Parameter(torch.tensor(0.5))

        # Learnable temporal decay factor (clamped to [0.9, 0.999] when used).
        self.decay_rate = nn.Parameter(torch.tensor(0.999))

    def _calculate_bit_array_size(self, capacity, error_rate):
        """Calculate optimal bit array size for given capacity and error rate."""
        return int(-capacity * math.log(error_rate) / (math.log(2) ** 2))

    def _get_bit_indices(self, item_vector):
        """Get bit indices from all hash functions for an item.

        Returns:
            (indices, confidences): one bit index and one confidence score per
            hash function. Confidence is the mean distance of the activation
            probabilities from the 0.5 decision boundary, rescaled to [0, 1].
        """
        indices = []
        confidences = []

        for hash_func in self.hash_functions:
            hash_bits = hash_func.get_hash_bits(item_vector, deterministic=True)

            # Interpret the bit vector as a base-2 integer, then fold it into
            # the bit-array range.
            weights = (1 << torch.arange(len(hash_bits), device=hash_bits.device, dtype=torch.int64))
            bit_index = int((hash_bits.to(dtype=torch.int64) * weights).sum().item())
            bit_index = bit_index % self.bit_array_size

            # Sharper (more saturated) activations yield higher confidence.
            hash_probs, _ = hash_func.compute_hash_activation(item_vector)
            confidence = torch.mean(torch.abs(hash_probs - 0.5)) * 2

            indices.append(bit_index)
            confidences.append(confidence.item())

        return indices, confidences

    def add(self, item, associated_items=None):
        """Add item to the Hebbian Bloom filter with optional associations.

        Args:
            item: Item to add to the filter
            associated_items: Optional list of items to associate with this item

        Returns:
            List of bit indices that were set for this item
        """
        # Encode the item once; the same vector is reused for all hash functions.
        item_vector = item_to_vector(item, self.vector_dim)

        # Retain the original item and its vector for later similarity search.
        item_key = str(item)
        self.stored_items[item_key] = item
        self.item_vectors[item_key] = item_vector

        indices, confidences = self._get_bit_indices(item_vector)

        # Set bits, keep the best confidence seen per bit, and count the access.
        with torch.no_grad():
            for idx, conf in zip(indices, confidences):
                self.bit_array[idx] = 1.0
                self.confidence_array[idx] = max(float(self.confidence_array[idx].item()), conf)
                self.access_counts[idx] += 1

        # Let every hash function adapt to this item (and its associations).
        for hash_func in self.hash_functions:
            hash_func.hebbian_update(item_vector, associated_items)

        with torch.no_grad():
            self.total_items_added.add_(1)

        # Additional association pass for strongly similar pairs.
        if associated_items:
            self._learn_associations(item, associated_items)

        return indices

    def _learn_associations(self, primary_item, associated_items):
        """Learn associations between items using Hebbian principles."""
        primary_vector = item_to_vector(primary_item, self.vector_dim)

        for assoc_item in associated_items:
            assoc_vector = item_to_vector(assoc_item, self.vector_dim)

            # Vector-space similarity gates the extra reinforcement below.
            similarity = safe_cosine_similarity(
                primary_vector.unsqueeze(0),
                assoc_vector.unsqueeze(0)
            ).squeeze()

            # NOTE(review): association_strength is clamped but otherwise
            # unused in this method — apparently reserved for future weighting.
            association_strength = torch.clamp(self.association_strength, 0.01, 1.0)
            _ = association_strength

            for hash_func in self.hash_functions:
                # Only reinforce pairs whose vectors are already similar.
                if float(similarity.item()) > 0.5:
                    hash_func.hebbian_update(primary_vector, [assoc_item])

    def query(self, item, return_confidence=False):
        """Query membership with optional confidence estimation.

        Args:
            item: Item to query
            return_confidence: Whether to return confidence score

        Returns:
            Boolean membership result, optionally with confidence score
        """
        item_vector = item_to_vector(item, self.vector_dim)
        indices, confidences = self._get_bit_indices(item_vector)

        # Classic Bloom semantics: member only if every probed bit is set.
        bit_checks = [self.bit_array[idx].item() > 0 for idx in indices]
        is_member = all(bit_checks)

        if return_confidence:
            # C_bit: stored per-bit confidences; C_hash: current activation sharpness.
            bit_confidences = [self.confidence_array[idx].item() for idx in indices]
            hash_confidences = confidences

            bit_conf = np.mean(bit_confidences) if bit_confidences else 0.0
            hash_conf = np.mean(hash_confidences) if hash_confidences else 0.0

            # C_access: access frequency, saturating at 10 accesses per bit.
            access_conf = np.mean([self.access_counts[idx].item() for idx in indices])
            access_conf = min(access_conf / 10.0, 1.0)

            # Equal-weight blend of the three confidence sources.
            overall_confidence = (bit_conf + hash_conf + access_conf) / 3.0

            return is_member, overall_confidence

        return is_member

    def find_similar_items(self, query_item, top_k=5):
        """Find items similar to query using learned associations (vector + coactivation)."""
        query_vector = item_to_vector(query_item, self.vector_dim)

        # Precompute, per hash function, the query activation and its
        # projection through the learned co-activation matrix.
        coact_weights = []
        for hash_func in self.hash_functions:
            q_act, _ = hash_func.compute_hash_activation(query_vector)

            q_weight = torch.matmul(hash_func.coactivation_matrix.t(), q_act)
            coact_weights.append((q_act, q_weight))

        similarities = []
        for item_key, item_vector in self.item_vectors.items():
            # Direct vector-space similarity.
            base_sim = safe_cosine_similarity(
                query_vector.unsqueeze(0),
                item_vector.unsqueeze(0)
            ).squeeze().item()

            # Learned co-activation similarity, averaged across hash functions.
            co_sim_sum = 0.0
            for (hash_func, (q_act, q_weight)) in zip(self.hash_functions, coact_weights):
                i_act, _ = hash_func.compute_hash_activation(item_vector)
                co_sim_sum += torch.dot(q_weight, i_act).item() / max(1, len(i_act))
            co_sim = co_sim_sum / max(1, len(self.hash_functions))

            # Fixed blend: 60% raw vector similarity, 40% learned association.
            alpha, beta = 0.6, 0.4
            score = alpha * base_sim + beta * co_sim
            similarities.append((self.stored_items[item_key], score))

        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def get_hash_statistics(self):
        """Get statistics about hash function learning."""
        stats = {
            'total_items': int(self.total_items_added.item()),
            'bit_array_utilization': (self.bit_array > 0).float().mean().item(),
            'average_confidence': self.confidence_array.mean().item(),
            'hash_function_stats': []
        }

        for i, hash_func in enumerate(self.hash_functions):
            hash_stats = {
                'function_id': i,
                'hebbian_weights_mean': hash_func.hebbian_weights.mean().item(),
                'plasticity_rate': hash_func.plasticity_rate.item(),
                'activation_threshold_mean': hash_func.activation_threshold.mean().item()
            }
            stats['hash_function_stats'].append(hash_stats)

        return stats

    def apply_temporal_decay(self):
        """Apply temporal decay to implement forgetting."""
        decay_rate = torch.clamp(self.decay_rate, 0.9, 0.999)

        with torch.no_grad():
            self.confidence_array.mul_(decay_rate)
            self.access_counts.mul_(decay_rate)

            # Clear bits whose confidence has decayed below the floor.
            low_confidence_mask = self.confidence_array < 0.1
            self.bit_array[low_confidence_mask] = 0.0
            self.confidence_array[low_confidence_mask] = 0.0

        # Propagate forgetting into the hash functions themselves.
        for hash_func in self.hash_functions:
            hash_func.apply_forgetting(float(decay_rate.item()))

    def optimize_structure(self):
        """Optimize the filter structure based on usage patterns."""
        with torch.no_grad():
            # Fraction of bits accessed more than average; when usage is
            # concentrated, lower thresholds slightly so bits fire more easily.
            high_access_ratio = (self.access_counts > self.access_counts.mean()).float().mean().item()
            adjustment = -0.01 * high_access_ratio
            for hash_func in self.hash_functions:
                hash_func.activation_threshold.data.add_(adjustment)
                hash_func.activation_threshold.data.clamp_(-1.0, 1.0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AssociativeHebbianBloomSystem(nn.Module):
    """Ensemble of Hebbian Bloom filters with meta-learning.

    Combines multiple Hebbian Bloom filters with learned routing to create
    a robust, scalable associative memory system with ensemble decision making.

    Features:
    - Multiple specialized filters with learned routing
    - Ensemble voting for robust membership decisions
    - Global association learning across filters
    - Automatic system maintenance and optimization
    """

    def __init__(self, capacity=10000, vector_dim=64, num_filters=3):
        """Args:
            capacity: Total item capacity, split evenly across the filters.
            vector_dim: Dimension of item vectors.
            num_filters: Number of member filters in the ensemble.
        """
        super().__init__()
        self.capacity = capacity
        self.vector_dim = vector_dim
        self.num_filters = num_filters

        # Member filters, each sized for an equal share of the total capacity.
        self.filters = nn.ModuleList([
            HebbianBloomFilter(
                capacity=capacity // num_filters,
                error_rate=0.01,
                vector_dim=vector_dim,
                num_hash_functions=6
            ) for _ in range(num_filters)
        ])

        # Learned router: soft distribution over filters for an item vector.
        self.filter_selector = nn.Sequential(
            nn.Linear(vector_dim, vector_dim // 2),
            nn.ReLU(),
            nn.Linear(vector_dim // 2, num_filters),
            nn.Softmax(dim=-1)
        )

        # Pairwise association scorer. NOTE(review): not referenced by any
        # method in this class — apparently reserved for future use.
        self.global_association_net = nn.Sequential(
            nn.Linear(vector_dim * 2, vector_dim),
            nn.Tanh(),
            nn.Linear(vector_dim, 1),
            nn.Sigmoid()
        )

        # Counts add_item calls; drives the periodic global optimization.
        self.register_buffer('global_access_count', torch.tensor(0, dtype=torch.long))

    def add_item(self, item, category=None, associated_items=None):
        """Add item to the most appropriate filter(s).

        Args:
            item: Item to store.
            category: Unused placeholder (kept for interface stability).
            associated_items: Optional related items for Hebbian association.

        Returns:
            List of (filter_index, bit_indices) pairs the item was added to.
        """
        item_vector = item_to_vector(item, self.vector_dim)

        # Router scores for each filter.
        filter_weights = self.filter_selector(item_vector.unsqueeze(0)).squeeze(0)

        # Penalize filters that are already heavily loaded (load balancing).
        with torch.no_grad():
            loads = torch.tensor([f.total_items_added.item() / max(1, f.capacity) for f in self.filters], dtype=filter_weights.dtype, device=filter_weights.device)
            filter_weights = filter_weights - 0.1 * loads

        # Redundancy: store into the two best filters (or all, if fewer).
        top_k_filters = min(2, self.num_filters)
        _, top_indices = torch.topk(filter_weights, top_k_filters)

        added_to_filters = []
        for filter_idx in top_indices:
            filter_obj = self.filters[filter_idx.item()]
            indices = filter_obj.add(item, associated_items)
            added_to_filters.append((filter_idx.item(), indices))

        with torch.no_grad():
            self.global_access_count.add_(1)

        return added_to_filters

    def query_item(self, item, return_detailed=False):
        """Query item across all filters with ensemble confidence."""
        # NOTE(review): item_vector is computed but not used here — each
        # filter re-encodes the item itself inside query().
        item_vector = item_to_vector(item, self.vector_dim)

        results = []
        confidences = []

        # Ask every filter independently.
        for i, filter_obj in enumerate(self.filters):
            is_member, confidence = filter_obj.query(item, return_confidence=True)
            results.append(is_member)
            confidences.append(confidence)

        # Majority vote plus mean confidence across the ensemble.
        positive_votes = sum(results)
        avg_confidence = np.mean(confidences)

        ensemble_decision = positive_votes > len(self.filters) // 2

        if return_detailed:
            return {
                'is_member': ensemble_decision,
                'confidence': avg_confidence,
                'individual_results': list(zip(results, confidences)),
                'positive_votes': positive_votes,
                'total_filters': len(self.filters)
            }

        return ensemble_decision

    def find_associations(self, query_item, top_k=10):
        """Find associated items across all filters.

        Returns:
            Up to top_k (item_key, score) pairs, best first. Keys are the
            str() of the stored items, not the original objects.
        """
        all_similarities = []

        for filter_obj in self.filters:
            similarities = filter_obj.find_similar_items(query_item, top_k)
            all_similarities.extend(similarities)

        # Deduplicate by string key, keeping the best score per item.
        unique_items = {}
        for item, similarity in all_similarities:
            item_key = str(item)
            if item_key in unique_items:
                unique_items[item_key] = max(unique_items[item_key], similarity)
            else:
                unique_items[item_key] = similarity

        # Rank by score, best first.
        ranked_items = sorted(unique_items.items(), key=lambda x: x[1], reverse=True)

        return ranked_items[:top_k]

    def system_maintenance(self):
        """Run periodic decay and structural optimization on every filter."""
        for filter_obj in self.filters:
            filter_obj.apply_temporal_decay()
            filter_obj.optimize_structure()

        # Heavier global pass every 1000 additions.
        if self.global_access_count % 1000 == 0:
            self._global_optimization()

    def _global_optimization(self):
        """Collect ensemble-wide statistics (currently informational only)."""
        print("Performing global Hebbian Bloom system optimization...")

        # NOTE(review): utilizations are gathered but not acted upon —
        # presumably a hook for future rebalancing logic.
        filter_utilizations = []
        for filter_obj in self.filters:
            stats = filter_obj.get_hash_statistics()
            utilization = stats['bit_array_utilization']
            filter_utilizations.append(utilization)

    def get_system_statistics(self):
        """Return a nested dict of system- and per-filter statistics."""
        stats = {
            'global_access_count': int(self.global_access_count.item()),
            'num_filters': self.num_filters,
            'filter_statistics': []
        }

        for i, filter_obj in enumerate(self.filters):
            filter_stats = filter_obj.get_hash_statistics()
            filter_stats['filter_id'] = i
            stats['filter_statistics'].append(filter_stats)

        return stats
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_hebbian_bloom():
    """Smoke-test the full ensemble system end to end (prints its results)."""
    print("Testing Hebbian Bloom Filter - Self-Organizing Probabilistic Memory")
    print("=" * 85)

    # Small system so the test runs quickly.
    system = AssociativeHebbianBloomSystem(
        capacity=1000,
        vector_dim=32,
        num_filters=3
    )

    print(f"Created Hebbian Bloom System:")
    print(f" - Capacity: 1000 items")
    print(f" - Vector dimension: 32")
    print(f" - Number of filters: 3")
    print(f" - Hash functions per filter: 6")

    print("\nAdding related items to demonstrate associative learning...")

    # Fruits paired position-wise with their colors.
    fruits = ["apple", "banana", "orange", "grape", "strawberry"]
    colors = ["red", "yellow", "orange", "purple", "red"]

    # Bidirectional associations: fruit <-> color, plus category tags.
    for fruit, color in zip(fruits, colors):
        system.add_item(fruit, associated_items=[color, "fruit"])
        system.add_item(color, associated_items=[fruit, "color"])

    numbers = [1, 2, 3, 4, 5]
    for num in numbers:
        system.add_item(num, associated_items=["number", "digit"])

    print(f"Added {len(fruits)} fruits with colors and {len(numbers)} numbers")

    print("\nTesting membership queries...")

    # Mix of stored and never-stored items.
    test_items = ["apple", "banana", "pineapple", 1, 3, 7, "red", "blue"]

    for item in test_items:
        result = system.query_item(item, return_detailed=True)
        print(f" '{item}': {result['is_member']} (confidence: {result['confidence']:.3f}, votes: {result['positive_votes']}/{result['total_filters']})")

    print("\nTesting associative retrieval...")

    query_items = ["apple", "red", 2]
    for query in query_items:
        associations = system.find_associations(query, top_k=5)
        print(f"\nItems associated with '{query}':")
        for i, (item, similarity) in enumerate(associations[:3]):
            print(f" {i+1}. {item} (similarity: {similarity:.3f})")

    print("\nTesting Hebbian adaptation with repeated associations...")

    # Repetition should strengthen the apple -> healthy/nutrition links.
    for _ in range(5):
        system.add_item("apple", associated_items=["healthy", "nutrition"])

    updated_associations = system.find_associations("apple", top_k=5)
    print("Updated associations for 'apple' after repeated 'healthy' associations:")
    for item, similarity in updated_associations[:3]:
        print(f" {item}: {similarity:.3f}")

    stats = system.get_system_statistics()
    print(f"\nSystem Statistics:")
    print(f" - Total accesses: {stats['global_access_count']}")

    for filter_stats in stats['filter_statistics']:
        print(f" Filter {filter_stats['filter_id']}:")
        print(f" - Items added: {filter_stats['total_items']}")
        print(f" - Bit utilization: {filter_stats['bit_array_utilization']:.3f}")
        print(f" - Average confidence: {filter_stats['average_confidence']:.3f}")

    print("\nApplying temporal decay...")
    system.system_maintenance()

    print("\nHebbian Bloom Filter test completed!")
    print("✓ Self-organizing hash functions with Hebbian learning")
    print("✓ Associative memory formation")
    print("✓ Adaptive confidence estimation")
    print("✓ Temporal decay and forgetting mechanisms")
    print("✓ Ensemble filtering for robust membership testing")

    return True
|
|
|
|
|
def hebbian_learning_demo():
    """Demonstrate Hebbian learning in action.

    Uses a single HebbianBloomFilter (no ensemble) so the effect of repeated
    associations on the hash functions can be observed directly.
    """
    print("\n" + "="*60)
    print("HEBBIAN LEARNING DEMONSTRATION")
    print("="*60)

    # Small single filter for direct observation of adaptation.
    hb_filter = HebbianBloomFilter(capacity=100, vector_dim=16, num_hash_functions=4)

    print("Phase 1: Adding animal-habitat associations")

    animals_habitats = [
        ("lion", ["savanna", "africa", "predator"]),
        ("tiger", ["jungle", "asia", "predator"]),
        ("penguin", ["antarctica", "ice", "bird"]),
        ("shark", ["ocean", "water", "predator"]),
        ("eagle", ["mountain", "sky", "bird"])
    ]

    # Bidirectional links: animal -> habitats and each habitat -> animal.
    for animal, habitats in animals_habitats:
        hb_filter.add(animal, associated_items=habitats)
        for habitat in habitats:
            hb_filter.add(habitat, associated_items=[animal])

    print("\nInitial associations:")
    similar_to_lion = hb_filter.find_similar_items("lion", top_k=3)
    for item, similarity in similar_to_lion:
        print(f" lion -> {item}: {similarity:.3f}")

    print("\nPhase 2: Strengthening lion-savanna association through repetition")

    # Repeated co-occurrence should raise the lion <-> savanna score.
    for _ in range(10):
        hb_filter.add("lion", associated_items=["savanna"])
        hb_filter.add("savanna", associated_items=["lion"])

    print("\nStrengthened associations:")
    similar_to_lion = hb_filter.find_similar_items("lion", top_k=3)
    for item, similarity in similar_to_lion:
        print(f" lion -> {item}: {similarity:.3f}")

    # Show how the first two hash functions adapted.
    stats = hb_filter.get_hash_statistics()
    print(f"\nHash function adaptation statistics:")
    for hash_stat in stats['hash_function_stats'][:2]:
        print(f" Hash function {hash_stat['function_id']}:")
        print(f" - Hebbian weights mean: {hash_stat['hebbian_weights_mean']:.4f}")
        print(f" - Plasticity rate: {hash_stat['plasticity_rate']:.4f}")

    print("\n Hebbian learning successfully demonstrated")
    print(" Repeated associations strengthen neural pathways in hash functions")
|
|
|
|
|
if __name__ == "__main__":
    # Run the full ensemble smoke test, then the focused learning demo.
    test_hebbian_bloom()
    hebbian_learning_demo()
|
|
|
|
|
|
|
|
|
|
|
|