###########################################################################################################################################
#||- - - |8.19.2025| - - - || HEBBIAN BLOOM || - - - | 1990two | - - -||#
###########################################################################################################################################
"""
Mathematical Foundation & Conceptual Documentation
-------------------------------------------------
CORE PRINCIPLE:
Combines Hebbian learning ("neurons that fire together, wire together") with
Bloom filter probabilistic membership testing to create self-organizing
associative memory systems that adapt based on usage patterns.
MATHEMATICAL FOUNDATION:
=======================
1. HEBBIAN LEARNING RULE:
Δw_ij = η * a_i * a_j
Where:
- w_ij: connection strength between neurons i and j
- η: learning rate (plasticity parameter)
- a_i, a_j: activation levels of neurons i and j
In our context:
- Strengthens hash function weights for co-occurring patterns
- Adapts activation thresholds based on usage frequency
- Creates associative links between related items
2. BLOOM FILTER MATHEMATICS:
Optimal bit array size: m = -n * ln(p) / (ln(2))²
Optimal hash functions: k = (m/n) * ln(2)
Where:
- n: expected number of items
- p: desired false positive rate
- m: bit array size
- k: number of hash functions
False positive probability: P_fp ≈ (1 - e^(-kn/m))^k
3. CONFIDENCE ESTIMATION:
C_total = (C_bit + C_hash + C_access) / 3
Where:
- C_bit: confidence from bit array activation strength
- C_hash: confidence from hash activation patterns
- C_access: confidence from historical access frequency
4. TEMPORAL DECAY:
    w_{t+1} = γ * w_t
Where γ ∈ [0.9, 0.999] is the decay rate, implementing forgetting.
CONCEPTUAL REASONING:
====================
WHY HEBBIAN + BLOOM FILTERS?
- Traditional Bloom filters use static hash functions
- Real-world data has temporal and associative patterns
- Hebbian learning enables dynamic adaptation to these patterns
- Results in more efficient memory utilization and better retrieval
KEY INNOVATIONS:
1. **Learnable Hash Functions**: Neural networks that adapt their mappings
2. **Associative Strengthening**: Related items develop similar hash patterns
3. **Confidence Estimation**: Multi-factor confidence scoring
4. **Temporal Adaptation**: Gradual forgetting prevents overfitting
5. **Ensemble Filtering**: Multiple filters with voting for robustness
APPLICATIONS:
- Caching systems that learn access patterns
- Recommendation engines with temporal adaptation
- Memory systems for neural architectures
- Similarity search with learned associations
COMPLEXITY ANALYSIS:
- Space: O(m + n*d) where m=bit array size, n=items, d=vector dimension
- Time: O(k*d) per operation where k=hash functions
- Learning: O(d²) for co-activation matrix updates
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import math
import hashlib
from collections import defaultdict, deque
from typing import List, Dict, Tuple, Optional, Union
SAFE_MIN = -1e6
SAFE_MAX = 1e6
EPS = 1e-8
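# -- Worked example (an illustrative sketch, not used by the classes below):
# -- the docstring's sizing formulas, m = -n*ln(p)/(ln 2)^2 and k = (m/n)*ln 2.
def bloom_sizing(n, p):
    """Return (m, k) for n expected items and target false-positive rate p.

    For n=10_000, p=0.01: m ≈ 95_851 bits and k ≈ 7 hash functions; plugging
    back into P_fp ≈ (1 - e^(-kn/m))^k gives ≈ 0.010, matching the target.
    """
    m = int(-n * math.log(p) / (math.log(2) ** 2))
    k = max(1, round((m / n) * math.log(2)))
    return m, k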
#||- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 𓅸 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -||#
def make_safe(tensor, min_val=SAFE_MIN, max_val=SAFE_MAX):
tensor = torch.where(torch.isnan(tensor), torch.tensor(0.0, device=tensor.device, dtype=tensor.dtype), tensor)
tensor = torch.where(torch.isinf(tensor), torch.tensor(max_val, device=tensor.device, dtype=tensor.dtype), tensor)
return torch.clamp(tensor, min_val, max_val)
def safe_cosine_similarity(a, b, dim=-1, eps=EPS):
if a.dtype != torch.float32:
a = a.float()
if b.dtype != torch.float32:
b = b.float()
a_norm = torch.norm(a, dim=dim, keepdim=True).clamp(min=eps)
b_norm = torch.norm(b, dim=dim, keepdim=True).clamp(min=eps)
return torch.sum(a * b, dim=dim, keepdim=True) / (a_norm * b_norm)
def item_to_vector(item, vector_dim=64):
"""Convert arbitrary item to fixed-size vector representation.
Uses different encoding strategies:
- Strings: MD5 hash-based encoding
- Numbers: Sinusoidal positional encoding
- Tensors: Flattening with padding/truncation
- Other: Deterministic hash-based random vector
"""
if isinstance(item, str):
# String to vector via hashing
hash_obj = hashlib.md5(item.encode())
hash_bytes = hash_obj.digest()
# Convert bytes to float vector
vector = torch.tensor([b / 255.0 for b in hash_bytes], dtype=torch.float32)
# Pad or truncate to desired dimension
if len(vector) < vector_dim:
padding = torch.zeros(vector_dim - len(vector), dtype=torch.float32)
vector = torch.cat([vector, padding])
else:
vector = vector[:vector_dim]
elif isinstance(item, (int, float)):
# Numeric to vector via sinusoidal encoding
vector = torch.zeros(vector_dim, dtype=torch.float32)
for i in range(vector_dim // 2):
freq = 10000 ** (-2 * i / vector_dim)
vector[2*i] = math.sin(item * freq)
vector[2*i + 1] = math.cos(item * freq)
elif torch.is_tensor(item):
# Tensor to vector via projection
vector = item.flatten().float()
if len(vector) < vector_dim:
padding = torch.zeros(vector_dim - len(vector), dtype=torch.float32, device=vector.device)
vector = torch.cat([vector, padding])
else:
vector = vector[:vector_dim]
    else:
        # Default: stable seeded vector from a content hash. md5 keeps the seed
        # stable across processes, unlike built-in hash(), which is salted per
        # run for strings; no global RNG side-effects.
        digest = hashlib.md5(str(item).encode()).digest()
        hash_val = int.from_bytes(digest[:4], 'big')
        gen = torch.Generator(device='cpu')
        gen.manual_seed(hash_val)
        vector = torch.randn(vector_dim, generator=gen, dtype=torch.float32)
return make_safe(vector)
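# -- Minimal usage sketch for item_to_vector (illustrative only; the helper
# -- below is not referenced elsewhere in this file).
def _item_to_vector_demo():
    v1 = item_to_vector("apple", vector_dim=64)
    v2 = item_to_vector("apple", vector_dim=64)
    assert torch.allclose(v1, v2)                # string encoding is deterministic
    v3 = item_to_vector(3.14, vector_dim=64)     # sinusoidal encoding for numbers
    assert v1.shape == v3.shape == (64,)         # all encodings share one shape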
###########################################################################################################################################
###############################################- - - LEARNABLE HASH FUNCTION - - -#####################################################
class LearnableHashFunction(nn.Module):
"""Neural hash function with Hebbian plasticity.
Implements learnable hash functions that adapt through Hebbian learning,
strengthening patterns that co-occur and developing associative mappings.
Mathematical Details:
- Base hash: h = tanh(W2 * tanh(W1 * x + b1) + b2)
- Hebbian modulation: h_mod = h * tanh(w_hebbian)
- Threshold adaptation: h_thresh = h_mod - θ
    - Binary conversion: p = sigmoid(10 * h_thresh)
"""
def __init__(self, input_dim, hash_output_bits=32, learning_rate=0.01):
super().__init__()
self.input_dim = input_dim
self.hash_output_bits = hash_output_bits
self.learning_rate = learning_rate
# Neural hash function
self.hash_network = nn.Sequential(
nn.Linear(input_dim, input_dim * 2),
nn.LayerNorm(input_dim * 2),
nn.Tanh(),
nn.Linear(input_dim * 2, hash_output_bits),
nn.Tanh() # Output in [-1, 1]
)
# Hebbian plasticity parameters
self.hebbian_weights = nn.Parameter(torch.ones(hash_output_bits) * 0.1)
self.plasticity_rate = nn.Parameter(torch.tensor(learning_rate))
# Activity history for Hebbian learning
self.register_buffer('activity_history', torch.zeros(100, hash_output_bits))
self.register_buffer('history_pointer', torch.tensor(0, dtype=torch.long))
# Co-activation tracking
self.coactivation_matrix = nn.Parameter(torch.eye(hash_output_bits) * 0.1)
# Adaptive threshold
self.activation_threshold = nn.Parameter(torch.zeros(hash_output_bits))
def compute_hash_activation(self, item_vector):
"""Compute hash activation pattern for an item."""
# Ensure correct shape/dtype/device
if item_vector.dim() == 1:
item_vector = item_vector.unsqueeze(0)
item_vector = item_vector.to(next(self.hash_network.parameters()).device, dtype=torch.float32)
# Base neural hash
base_hash = self.hash_network(item_vector).squeeze(0)
# Apply Hebbian modulation
hebbian_modulation = torch.tanh(self.hebbian_weights)
modulated_hash = base_hash * hebbian_modulation
# Apply adaptive threshold
thresholded = modulated_hash - self.activation_threshold
# Convert to binary pattern (probabilistic)
hash_probs = torch.sigmoid(thresholded * 10.0) # Sharp sigmoid
return hash_probs, modulated_hash
def get_hash_bits(self, item_vector, deterministic=False):
"""Get binary hash bits for an item."""
hash_probs, _ = self.compute_hash_activation(item_vector)
if deterministic:
hash_bits = (hash_probs > 0.5).float()
else:
hash_bits = torch.bernoulli(hash_probs)
return hash_bits
def hebbian_update(self, item_vector, co_occurring_items=None):
"""Apply Hebbian learning rule: Δw = η * pre * post.
Strengthens connections between co-activated hash bits and updates
the co-activation matrix for associative learning.
"""
hash_probs, modulated_hash = self.compute_hash_activation(item_vector)
# Store activity in history
with torch.no_grad():
ptr = int(self.history_pointer.item())
self.activity_history[ptr % self.activity_history.size(0)].copy_(hash_probs.detach())
self.history_pointer.add_(1)
self.history_pointer.remainder_(self.activity_history.size(0))
# Hebbian weight update: strengthen active bits
plasticity_rate = torch.clamp(self.plasticity_rate, 0.001, 0.1)
# Basic Hebbian rule: Δw = η * pre * post
activity_strength = torch.abs(modulated_hash)
hebbian_delta = plasticity_rate * activity_strength * hash_probs
# Update Hebbian weights
with torch.no_grad():
self.hebbian_weights.data.add_(hebbian_delta * 0.05)
self.hebbian_weights.data.clamp_(-2.0, 2.0)
# Co-activation matrix update if multiple items provided
if co_occurring_items is not None:
self.update_coactivation_matrix(hash_probs, co_occurring_items)
return hash_probs
def update_coactivation_matrix(self, current_activation, co_occurring_items):
"""Update co-activation matrix based on items that occur together."""
with torch.no_grad():
for co_item in co_occurring_items:
co_item_vector = item_to_vector(co_item, self.input_dim).to(current_activation.device)
co_activation, _ = self.compute_hash_activation(co_item_vector)
# Outer product for co-activation strengthening
coactivation_update = torch.outer(current_activation, co_activation)
# Update co-activation matrix
learning_rate = 0.01
self.coactivation_matrix.data.add_(learning_rate * coactivation_update)
self.coactivation_matrix.data.clamp_(-1.0, 1.0)
def get_similar_patterns(self, item_vector, top_k=5):
"""Find historically similar activation patterns."""
current_probs, _ = self.compute_hash_activation(item_vector)
# Compare with history
similarities = []
for i in range(self.activity_history.shape[0]):
hist_pattern = self.activity_history[i]
if torch.sum(hist_pattern) > 0: # Non-zero pattern
similarity = safe_cosine_similarity(
current_probs.unsqueeze(0),
hist_pattern.unsqueeze(0)
).squeeze()
similarities.append((i, float(similarity.item())))
# Sort by similarity
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:top_k]
def apply_forgetting(self, forget_rate=0.99):
"""Apply gradual forgetting to prevent overfitting."""
with torch.no_grad():
self.hebbian_weights.data.mul_(forget_rate)
self.coactivation_matrix.data.mul_(forget_rate)
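# -- Minimal lifecycle sketch for LearnableHashFunction (illustrative only):
# -- repeated Hebbian updates (Δw = η * pre * post) can shift the bit pattern,
# -- and apply_forgetting implements the w_{t+1} = γ * w_t decay.
def _learnable_hash_demo():
    hf = LearnableHashFunction(input_dim=16, hash_output_bits=8)
    vec = item_to_vector("lion", vector_dim=16)
    bits_before = hf.get_hash_bits(vec, deterministic=True)
    for _ in range(20):
        hf.hebbian_update(vec)                   # strengthen co-active bits
    hf.apply_forgetting(forget_rate=0.99)        # gradual decay of weights
    bits_after = hf.get_hash_bits(vec, deterministic=True)
    print("bits before:", bits_before.tolist(), "after:", bits_after.tolist())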
###########################################################################################################################################
################################################- - - HEBBIAN BLOOM FILTER - - -#######################################################
class HebbianBloomFilter(nn.Module):
"""Probabilistic set membership filter with Hebbian learning.
Combines traditional Bloom filter efficiency with adaptive hash functions
that learn from usage patterns and develop associative mappings.
Key Features:
- Learnable hash functions with neural plasticity
- Confidence-based membership testing
- Associative learning between related items
- Temporal decay for forgetting old patterns
"""
def __init__(self, capacity=10000, error_rate=0.01, vector_dim=64, num_hash_functions=8):
super().__init__()
self.capacity = capacity
self.error_rate = error_rate
self.vector_dim = vector_dim
self.num_hash_functions = num_hash_functions
# Calculate optimal bit array size
self.bit_array_size = self._calculate_bit_array_size(capacity, error_rate)
# Learnable hash functions
self.hash_functions = nn.ModuleList([
LearnableHashFunction(vector_dim, hash_output_bits=32)
for _ in range(num_hash_functions)
])
# Bit array with confidence scores (not just binary)
self.register_buffer('bit_array', torch.zeros(self.bit_array_size))
self.register_buffer('confidence_array', torch.zeros(self.bit_array_size))
# Item storage for association learning
self.stored_items = {}
self.item_vectors = {}
# Usage statistics
self.register_buffer('access_counts', torch.zeros(self.bit_array_size))
self.register_buffer('total_items_added', torch.tensor(0, dtype=torch.long))
# Associative learning parameters
self.association_strength = nn.Parameter(torch.tensor(0.1))
self.confidence_threshold = nn.Parameter(torch.tensor(0.5))
# Temporal decay for forgetting
self.decay_rate = nn.Parameter(torch.tensor(0.999))
def _calculate_bit_array_size(self, capacity, error_rate):
"""Calculate optimal bit array size for given capacity and error rate."""
return int(-capacity * math.log(error_rate) / (math.log(2) ** 2))
def _get_bit_indices(self, item_vector):
"""Get bit indices from all hash functions for an item."""
indices = []
confidences = []
for hash_func in self.hash_functions:
hash_bits = hash_func.get_hash_bits(item_vector, deterministic=True)
# Convert hash bits to index in bit array using binary encoding -> integer -> modulo
            weights = torch.pow(2, torch.arange(len(hash_bits), device=hash_bits.device, dtype=torch.int64))
bit_index = int((hash_bits.to(dtype=torch.int64) * weights).sum().item())
bit_index = bit_index % self.bit_array_size
# Compute confidence based on hash activation strength
hash_probs, _ = hash_func.compute_hash_activation(item_vector)
confidence = torch.mean(torch.abs(hash_probs - 0.5)) * 2 # Distance from uncertain (0.5)
indices.append(bit_index)
confidences.append(confidence.item())
return indices, confidences
def add(self, item, associated_items=None):
"""Add item to the Hebbian Bloom filter with optional associations.
Args:
item: Item to add to the filter
associated_items: Optional list of items to associate with this item
Returns:
List of bit indices that were set for this item
"""
# Convert item to vector
item_vector = item_to_vector(item, self.vector_dim)
# Store item information
item_key = str(item)
self.stored_items[item_key] = item
self.item_vectors[item_key] = item_vector
# Get bit indices and confidences
indices, confidences = self._get_bit_indices(item_vector)
# Update bit array and confidence array
with torch.no_grad():
for idx, conf in zip(indices, confidences):
self.bit_array[idx] = 1.0
self.confidence_array[idx] = max(float(self.confidence_array[idx].item()), conf)
self.access_counts[idx] += 1
# Apply Hebbian learning to hash functions
for hash_func in self.hash_functions:
hash_func.hebbian_update(item_vector, associated_items)
# Update item count
with torch.no_grad():
self.total_items_added.add_(1)
# Learn associations if provided
if associated_items:
self._learn_associations(item, associated_items)
return indices
def _learn_associations(self, primary_item, associated_items):
"""Learn associations between items using Hebbian principles."""
primary_vector = item_to_vector(primary_item, self.vector_dim)
for assoc_item in associated_items:
assoc_vector = item_to_vector(assoc_item, self.vector_dim)
# Compute similarity
similarity = safe_cosine_similarity(
primary_vector.unsqueeze(0),
assoc_vector.unsqueeze(0)
).squeeze()
            # If items are sufficiently similar, encourage similar hash patterns
            # (the similarity check is item-level, so it is hoisted out of the loop)
            if float(similarity.item()) > 0.5:
                for hash_func in self.hash_functions:
                    hash_func.hebbian_update(primary_vector, [assoc_item])
def query(self, item, return_confidence=False):
"""Query membership with optional confidence estimation.
Args:
item: Item to query
return_confidence: Whether to return confidence score
Returns:
Boolean membership result, optionally with confidence score
"""
item_vector = item_to_vector(item, self.vector_dim)
indices, confidences = self._get_bit_indices(item_vector)
# Check if all bits are set
bit_checks = [self.bit_array[idx].item() > 0 for idx in indices]
is_member = all(bit_checks)
if return_confidence:
# Compute confidence based on multiple factors
bit_confidences = [self.confidence_array[idx].item() for idx in indices]
hash_confidences = confidences
# Combined confidence
bit_conf = np.mean(bit_confidences) if bit_confidences else 0.0
hash_conf = np.mean(hash_confidences) if hash_confidences else 0.0
# Access frequency confidence
access_conf = np.mean([self.access_counts[idx].item() for idx in indices])
access_conf = min(access_conf / 10.0, 1.0) # Normalize
overall_confidence = (bit_conf + hash_conf + access_conf) / 3.0
return is_member, overall_confidence
return is_member
def find_similar_items(self, query_item, top_k=5):
"""Find items similar to query using learned associations (vector + coactivation)."""
query_vector = item_to_vector(query_item, self.vector_dim)
# Precompute query activations and coactivation weights for each hash function
        coact_weights = []
        for hash_func in self.hash_functions:
            q_act, _ = hash_func.compute_hash_activation(query_vector)
            # act_q^T M act_i = dot(M^T act_q, act_i)
            coact_weights.append(torch.matmul(hash_func.coactivation_matrix.t(), q_act))
similarities = []
for item_key, item_vector in self.item_vectors.items():
# Base cosine similarity in item space
base_sim = safe_cosine_similarity(
query_vector.unsqueeze(0),
item_vector.unsqueeze(0)
).squeeze().item()
# Coactivation similarity averaged over hash functions
co_sim_sum = 0.0
            for hash_func, q_weight in zip(self.hash_functions, coact_weights):
i_act, _ = hash_func.compute_hash_activation(item_vector)
co_sim_sum += torch.dot(q_weight, i_act).item() / max(1, len(i_act))
co_sim = co_sim_sum / max(1, len(self.hash_functions))
# Blend scores (alpha vector, beta coactivation)
alpha, beta = 0.6, 0.4
score = alpha * base_sim + beta * co_sim
similarities.append((self.stored_items[item_key], score))
similarities.sort(key=lambda x: x[1], reverse=True)
return similarities[:top_k]
def get_hash_statistics(self):
"""Get statistics about hash function learning."""
stats = {
'total_items': int(self.total_items_added.item()),
'bit_array_utilization': (self.bit_array > 0).float().mean().item(),
'average_confidence': self.confidence_array.mean().item(),
'hash_function_stats': []
}
for i, hash_func in enumerate(self.hash_functions):
hash_stats = {
'function_id': i,
'hebbian_weights_mean': hash_func.hebbian_weights.mean().item(),
'plasticity_rate': hash_func.plasticity_rate.item(),
'activation_threshold_mean': hash_func.activation_threshold.mean().item()
}
stats['hash_function_stats'].append(hash_stats)
return stats
def apply_temporal_decay(self):
"""Apply temporal decay to implement forgetting."""
decay_rate = torch.clamp(self.decay_rate, 0.9, 0.999)
with torch.no_grad():
self.confidence_array.mul_(decay_rate)
self.access_counts.mul_(decay_rate)
# Remove bits with very low confidence
low_confidence_mask = self.confidence_array < 0.1
self.bit_array[low_confidence_mask] = 0.0
self.confidence_array[low_confidence_mask] = 0.0
# Apply forgetting to hash functions
for hash_func in self.hash_functions:
hash_func.apply_forgetting(float(decay_rate.item()))
def optimize_structure(self):
"""Optimize the filter structure based on usage patterns."""
with torch.no_grad():
# Adjust thresholds based on access patterns (coarse global heuristic)
high_access_ratio = (self.access_counts > self.access_counts.mean()).float().mean().item()
adjustment = -0.01 * high_access_ratio
for hash_func in self.hash_functions:
hash_func.activation_threshold.data.add_(adjustment)
hash_func.activation_threshold.data.clamp_(-1.0, 1.0)
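# -- Minimal add/query sketch for a single HebbianBloomFilter (illustrative
# -- only): membership plus the three-factor confidence from the docstring,
# -- C_total = (C_bit + C_hash + C_access) / 3, followed by one decay step.
def _single_filter_demo():
    bf = HebbianBloomFilter(capacity=100, vector_dim=16, num_hash_functions=4)
    bf.add("apple", associated_items=["fruit"])
    member, conf = bf.query("apple", return_confidence=True)
    absent = bf.query("zebra")                   # may still be a false positive
    bf.apply_temporal_decay()                    # w_{t+1} = γ * w_t forgetting
    print(f"apple: {member} (confidence {conf:.3f}); zebra: {absent}")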
###########################################################################################################################################
############################################- - - ASSOCIATIVE HEBBIAN BLOOM SYSTEM - - -###############################################
class AssociativeHebbianBloomSystem(nn.Module):
"""Ensemble of Hebbian Bloom filters with meta-learning.
Combines multiple Hebbian Bloom filters with learned routing to create
a robust, scalable associative memory system with ensemble decision making.
Features:
- Multiple specialized filters with learned routing
- Ensemble voting for robust membership decisions
- Global association learning across filters
- Automatic system maintenance and optimization
"""
def __init__(self, capacity=10000, vector_dim=64, num_filters=3):
super().__init__()
self.capacity = capacity
self.vector_dim = vector_dim
self.num_filters = num_filters
# Multiple Hebbian Bloom filters for ensemble behavior
self.filters = nn.ModuleList([
HebbianBloomFilter(
capacity=capacity // num_filters,
error_rate=0.01,
vector_dim=vector_dim,
num_hash_functions=6
) for _ in range(num_filters)
])
# Meta-learning for filter selection
self.filter_selector = nn.Sequential(
nn.Linear(vector_dim, vector_dim // 2),
nn.ReLU(),
nn.Linear(vector_dim // 2, num_filters),
nn.Softmax(dim=-1)
)
# Global association learning
self.global_association_net = nn.Sequential(
nn.Linear(vector_dim * 2, vector_dim),
nn.Tanh(),
nn.Linear(vector_dim, 1),
nn.Sigmoid()
)
# System statistics
self.register_buffer('global_access_count', torch.tensor(0, dtype=torch.long))
def add_item(self, item, category=None, associated_items=None):
"""Add item to the most appropriate filter(s)."""
item_vector = item_to_vector(item, self.vector_dim)
# Determine which filter(s) to use
filter_weights = self.filter_selector(item_vector.unsqueeze(0)).squeeze(0)
# Light load-balancing penalty to avoid starving filters
with torch.no_grad():
loads = torch.tensor([f.total_items_added.item() / max(1, f.capacity) for f in self.filters], dtype=filter_weights.dtype, device=filter_weights.device)
filter_weights = filter_weights - 0.1 * loads
# Add to filters based on weights (top-k selection)
top_k_filters = min(2, self.num_filters) # Use top 2 filters
_, top_indices = torch.topk(filter_weights, top_k_filters)
added_to_filters = []
for filter_idx in top_indices:
filter_obj = self.filters[filter_idx.item()]
indices = filter_obj.add(item, associated_items)
added_to_filters.append((filter_idx.item(), indices))
# Update global statistics
with torch.no_grad():
self.global_access_count.add_(1)
return added_to_filters
def query_item(self, item, return_detailed=False):
"""Query item across all filters with ensemble confidence."""
item_vector = item_to_vector(item, self.vector_dim)
results = []
confidences = []
        for filter_obj in self.filters:
is_member, confidence = filter_obj.query(item, return_confidence=True)
results.append(is_member)
confidences.append(confidence)
# Ensemble decision
positive_votes = sum(results)
avg_confidence = np.mean(confidences)
# Final decision based on majority vote and confidence
ensemble_decision = positive_votes > len(self.filters) // 2
if return_detailed:
return {
'is_member': ensemble_decision,
'confidence': avg_confidence,
'individual_results': list(zip(results, confidences)),
'positive_votes': positive_votes,
'total_filters': len(self.filters)
}
return ensemble_decision
def find_associations(self, query_item, top_k=10):
"""Find associated items across all filters."""
all_similarities = []
for filter_obj in self.filters:
similarities = filter_obj.find_similar_items(query_item, top_k)
all_similarities.extend(similarities)
# Remove duplicates and re-rank
unique_items = {}
for item, similarity in all_similarities:
item_key = str(item)
if item_key in unique_items:
unique_items[item_key] = max(unique_items[item_key], similarity)
else:
unique_items[item_key] = similarity
# Sort by similarity
ranked_items = sorted(unique_items.items(), key=lambda x: x[1], reverse=True)
return ranked_items[:top_k]
    def system_maintenance(self):
        """Apply temporal decay and structural optimization across all filters."""
        # Apply temporal decay to all filters
for filter_obj in self.filters:
filter_obj.apply_temporal_decay()
filter_obj.optimize_structure()
# System-level optimization every 1000 accesses
        if int(self.global_access_count.item()) % 1000 == 0:
self._global_optimization()
    def _global_optimization(self):
        """Survey per-filter utilization as a hook for future rebalancing."""
        print("Performing global Hebbian Bloom system optimization...")
# Rebalance filter usage if needed
filter_utilizations = []
for filter_obj in self.filters:
stats = filter_obj.get_hash_statistics()
utilization = stats['bit_array_utilization']
filter_utilizations.append(utilization)
# Could implement filter rebalancing here if needed
    def get_system_statistics(self):
        """Aggregate statistics across the ensemble of filters."""
stats = {
'global_access_count': int(self.global_access_count.item()),
'num_filters': self.num_filters,
'filter_statistics': []
}
for i, filter_obj in enumerate(self.filters):
filter_stats = filter_obj.get_hash_statistics()
filter_stats['filter_id'] = i
stats['filter_statistics'].append(filter_stats)
return stats
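# -- Minimal maintenance-loop sketch (illustrative only): interleave adds with
# -- periodic system_maintenance so temporal decay and threshold tuning run.
def _maintenance_loop_demo(n_items=50):
    system = AssociativeHebbianBloomSystem(capacity=200, vector_dim=16, num_filters=2)
    for i in range(n_items):
        system.add_item(f"item_{i}")
        if (i + 1) % 25 == 0:
            system.system_maintenance()          # decay + structure tuning
    return system.get_system_statistics()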
###########################################################################################################################################
####################################################- - - DEMO AND TESTING - - -#######################################################
def test_hebbian_bloom():
print("Testing Hebbian Bloom Filter - Self-Organizing Probabilistic Memory")
print("=" * 85)
# Create Hebbian Bloom Filter system
system = AssociativeHebbianBloomSystem(
capacity=1000,
vector_dim=32,
num_filters=3
)
print(f"Created Hebbian Bloom System:")
print(f" - Capacity: 1000 items")
print(f" - Vector dimension: 32")
print(f" - Number of filters: 3")
print(f" - Hash functions per filter: 6")
# Test with related items to demonstrate Hebbian learning
print("\nAdding related items to demonstrate associative learning...")
# Add some related items
fruits = ["apple", "banana", "orange", "grape", "strawberry"]
colors = ["red", "yellow", "orange", "purple", "red"]
for fruit, color in zip(fruits, colors):
system.add_item(fruit, associated_items=[color, "fruit"])
system.add_item(color, associated_items=[fruit, "color"])
# Add some numbers
numbers = [1, 2, 3, 4, 5]
for num in numbers:
system.add_item(num, associated_items=["number", "digit"])
print(f"Added {len(fruits)} fruits with colors and {len(numbers)} numbers")
# Test membership queries
print("\nTesting membership queries...")
test_items = ["apple", "banana", "pineapple", 1, 3, 7, "red", "blue"]
for item in test_items:
result = system.query_item(item, return_detailed=True)
print(f" '{item}': {result['is_member']} (confidence: {result['confidence']:.3f}, votes: {result['positive_votes']}/{result['total_filters']})")
# Test associative retrieval
print("\nTesting associative retrieval...")
query_items = ["apple", "red", 2]
for query in query_items:
associations = system.find_associations(query, top_k=5)
print(f"\nItems associated with '{query}':")
for i, (item, similarity) in enumerate(associations[:3]):
print(f" {i+1}. {item} (similarity: {similarity:.3f})")
# Test Hebbian adaptation
print("\nTesting Hebbian adaptation with repeated associations...")
# Repeatedly associate "apple" with "healthy"
for _ in range(5):
system.add_item("apple", associated_items=["healthy", "nutrition"])
# Check if "healthy" becomes more associated with "apple"
updated_associations = system.find_associations("apple", top_k=5)
print("Updated associations for 'apple' after repeated 'healthy' associations:")
for item, similarity in updated_associations[:3]:
print(f" {item}: {similarity:.3f}")
# System statistics
stats = system.get_system_statistics()
print(f"\nSystem Statistics:")
print(f" - Total accesses: {stats['global_access_count']}")
for filter_stats in stats['filter_statistics']:
print(f" Filter {filter_stats['filter_id']}:")
print(f" - Items added: {filter_stats['total_items']}")
print(f" - Bit utilization: {filter_stats['bit_array_utilization']:.3f}")
print(f" - Average confidence: {filter_stats['average_confidence']:.3f}")
# Test temporal decay
print("\nApplying temporal decay...")
system.system_maintenance()
print("\nHebbian Bloom Filter test completed!")
print("✓ Self-organizing hash functions with Hebbian learning")
print("✓ Associative memory formation")
print("✓ Adaptive confidence estimation")
print("✓ Temporal decay and forgetting mechanisms")
print("✓ Ensemble filtering for robust membership testing")
return True
def hebbian_learning_demo():
"""Demonstrate Hebbian learning in action."""
print("\n" + "="*60)
print("HEBBIAN LEARNING DEMONSTRATION")
print("="*60)
# Create simple single filter for clear demonstration
hb_filter = HebbianBloomFilter(capacity=100, vector_dim=16, num_hash_functions=4)
# Add items with strong associations
print("Phase 1: Adding animal-habitat associations")
animals_habitats = [
("lion", ["savanna", "africa", "predator"]),
("tiger", ["jungle", "asia", "predator"]),
("penguin", ["antarctica", "ice", "bird"]),
("shark", ["ocean", "water", "predator"]),
("eagle", ["mountain", "sky", "bird"])
]
for animal, habitats in animals_habitats:
hb_filter.add(animal, associated_items=habitats)
for habitat in habitats:
hb_filter.add(habitat, associated_items=[animal])
# Test initial associations
print("\nInitial associations:")
similar_to_lion = hb_filter.find_similar_items("lion", top_k=3)
for item, similarity in similar_to_lion:
print(f" lion -> {item}: {similarity:.3f}")
# Strengthen specific associations through repetition
print("\nPhase 2: Strengthening lion-savanna association through repetition")
for _ in range(10):
hb_filter.add("lion", associated_items=["savanna"])
hb_filter.add("savanna", associated_items=["lion"])
# Test strengthened associations
print("\nStrengthened associations:")
similar_to_lion = hb_filter.find_similar_items("lion", top_k=3)
for item, similarity in similar_to_lion:
print(f" lion -> {item}: {similarity:.3f}")
# Show hash function adaptation
stats = hb_filter.get_hash_statistics()
print(f"\nHash function adaptation statistics:")
for hash_stat in stats['hash_function_stats'][:2]: # Show first 2
print(f" Hash function {hash_stat['function_id']}:")
print(f" - Hebbian weights mean: {hash_stat['hebbian_weights_mean']:.4f}")
print(f" - Plasticity rate: {hash_stat['plasticity_rate']:.4f}")
print("\n Hebbian learning successfully demonstrated")
print(" Repeated associations strengthen neural pathways in hash functions")
if __name__ == "__main__":
test_hebbian_bloom()
hebbian_learning_demo()
###########################################################################################################################################
###########################################################################################################################################