# AICL Example: Distributed Cache
# Implements a distributed cache with consistent hashing, replication, eviction policies,
# cache invalidation, read-through/write-through patterns, and backfill strategies.
# Level 1: Architecture
Goal: Provide a low-latency, highly available distributed caching layer that supports consistent hashing for horizontal scaling, multi-region replication, configurable eviction policies, and transparent read-through/write-through integration with backend data stores.
Constraint: Cache consistency must follow an eventual consistency model with a configurable maximum staleness bound
Constraint: The consistent hashing ring must distribute keys to within 10% of a uniform distribution
Constraint: Replication factor must ensure data survives any single node failure
Constraint: Eviction must occur before memory usage exceeds 95% of allocated capacity
Constraint: All cache operations must complete within 5ms at P99 in the local region
Risk: Cache stampede causing thundering herd on cache miss
Recovery: Coalesce in-flight fetches per key so that only one request per key goes to the backend while concurrent requests wait on a shared future; use probabilistic early expiration to stagger re-fetches
Risk: Hotspot key overwhelming single node despite consistent hashing
Recovery: Use virtual nodes (150+ per physical node) for better distribution; for extreme hotspots, replicate hot keys to multiple nodes as read replicas; add client-side local caching for the top-K hot keys
Risk: Network partition causing cache inconsistency between regions
Recovery: Track causality with version vectors; resolve concurrent writes with last-write-wins using synchronized clocks; on partition heal, an anti-entropy protocol reconciles divergent entries by version comparison; use tombstones for deletions
Risk: Node failure causing temporary data loss and increased backend load
Recovery: Replicas at adjacent ring positions serve reads; background re-replication fills the replacement node from replicas; clients retry against a replica when the primary fails; a circuit breaker prevents load from cascading onto the backend
Risk: Memory pressure causing unpredictable eviction of recently cached data
Recovery: Use a segmented LRU (protected/probationary) with an admission window; items must prove their value before entering the protected segment; size the segments dynamically from hit-rate feedback; enforce a hard eviction threshold for OOM protection
Risk: Consistent hashing rebalancing causing massive key migration on topology change
Recovery: Use consistent hashing with bounded loads to cap per-node load and limit key movement; with vnodes, only the key ranges of affected vnodes move; throttle rebalancing to prevent network saturation; clients update ring topology lazily
Layer: CacheCore
SubLayer: HashRing
SubLayer: StorageEngine
SubLayer: EvictionManager
SubLayer: ReplicationManager
Layer: ClientProtocol
SubLayer: ReadThrough
SubLayer: WriteThrough
SubLayer: CacheAside
SubLayer: Invalidation
Layer: Consistency
SubLayer: VersionTracking
SubLayer: AntiEntropy
SubLayer: ConflictResolution
Validation: Consistent hashing ring must have at least 150 virtual nodes per physical node
Validation: Replication factor must be at least 2 for any write-acknowledged key
Validation: Eviction policy must protect recently admitted items from immediate re-eviction
Validation: Read-through callback must have a timeout of at most 500ms
Validation: Cache invalidation must propagate to all replicas within the consistency window
Validation: Proactive eviction must trigger before memory usage exceeds 95% of allocated capacity
# Level 2: Entities
Entity CacheNode
nodeId: string
host: string
port: integer
region: string
virtualNodeCount: integer
memoryUsedBytes: integer
memoryTotalBytes: integer
cpuUtilization: float
status: string
lastHeartbeat: datetime
Entity CacheEntry
key: string
value: bytes
version: integer
regionVersionVector: dict
createdAt: datetime
accessedAt: datetime
ttlSeconds: integer
evictionTier: string
accessCount: integer
sizeBytes: integer
Entity HashRing
ringId: string
nodes: list
virtualNodes: dict
replicationFactor: integer
totalSlots: integer
version: integer
lastRebalanced: datetime
maxKeysPerVnode: integer
Entity EvictionPolicy
policyName: string
protectedRatio: float
probationaryRatio: float
admissionWindow: integer
maxEntrySizeBytes: integer
dynamicAdjustmentEnabled: boolean
hitRateTarget: float
currentHitRate: float
Entity ReplicationConfig
replicationFactor: integer
writeQuorum: integer
readQuorum: integer
consistencyLevel: string
replicationLagThresholdMs: integer
asyncReplicationBatchSize: integer
conflictResolutionStrategy: string
Entity InvalidationMessage
invalidationId: string
key: string
keyPattern: string
originNode: string
reason: string
version: integer
timestamp: datetime
targetRegions: list
propagated: boolean
# Level 3: Behaviors
Behavior GetEntry
Input: key: string, consistencyLevel: string, timeoutMs: integer
Output: value: bytes, version: integer, hit: boolean, sourceNode: string
Action:
Hash key to locate primary node on the ring
If local hit, return value from storage engine
If miss, check replica nodes based on consistency level
If read-through enabled, coalesce with in-flight fetches for same key
Fetch from backend if all cache nodes miss; store result on primary
Apply TTL check; if an expired entry is still within its grace period, return it as stale
Update access metadata for eviction scoring
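# A minimal C++ sketch of the TTL/grace-period check in GetEntry. The grace
# duration is an assumed parameter (the spec does not fix its length), and
# `Freshness` / `classifyFreshness` are hypothetical names.
```cpp
#include <cassert>
#include <chrono>

// Hypothetical freshness states for a cached entry.
enum class Freshness { Fresh, StaleServable, Expired };

// Fresh before expireAt; servable as stale until expireAt + grace; a miss after that.
inline Freshness classifyFreshness(std::chrono::steady_clock::time_point now,
                                   std::chrono::steady_clock::time_point expireAt,
                                   std::chrono::seconds grace) {
    if (now < expireAt) return Freshness::Fresh;
    if (now < expireAt + grace) return Freshness::StaleServable;
    return Freshness::Expired;
}
```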
Behavior PutEntry
Input: key: string, value: bytes, ttlSeconds: integer, writeThrough: boolean, consistencyLevel: string
Output: version: integer, replicatedNodes: list, latencyMs: float
Action:
Hash key to determine primary and replica nodes
Check admission policy before accepting entry
Write to primary node storage engine
Replicate to N-1 replica nodes based on replication factor
Await write quorum acknowledgment before responding
If write-through enabled, synchronously write to backend data store
Invalidate stale copies on nodes outside the current primary/replica set
Return version and replication confirmation
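# A minimal sketch of "await write quorum acknowledgment": each replica's completion
# callback calls ack(), and the writer blocks until `writeQuorum` acks arrive
# (the field from ReplicationConfig) or a timeout passes. The class name is
# hypothetical and the replica RPCs themselves are out of scope.
```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

class QuorumAckCollector {
public:
    explicit QuorumAckCollector(int writeQuorum) : quorum_(writeQuorum) {}

    // Called from each replica's completion callback (any thread).
    void ack() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            ++acks_;
        }
        cv_.notify_all();
    }

    // Blocks until the quorum is reached; returns false if the timeout expires first.
    bool waitForQuorum(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mu_);
        return cv_.wait_for(lock, timeout, [this] { return acks_ >= quorum_; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int acks_ = 0;
    int quorum_;
};
```
# Using the predicate overload of wait_for guards against spurious wakeups and
# returns the quorum condition directly.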
Behavior InvalidateKey
Input: key: string, pattern: string, reason: string, synchronous: boolean
Output: invalidatedNodes: list, invalidationId: string
Action:
Determine all nodes holding the key or keys matching pattern
Delete entry from primary and all replica nodes
Publish invalidation message to cross-region replication channels
If synchronous, await acknowledgment from all target nodes
If asynchronous, fire-and-forget with best-effort delivery
Log invalidation for audit trail
Emit invalidation metric with latency and scope
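# The spec does not define the syntax of `pattern` / `keyPattern`. Assuming
# glob-style patterns (`*` = any run of characters, `?` = exactly one), key
# selection for InvalidateKey could be sketched as below; `globMatch` is a
# hypothetical helper.
```cpp
#include <cassert>
#include <string>

// Glob match with single-point backtracking on the most recent '*'.
inline bool globMatch(const std::string& pattern, const std::string& key) {
    size_t p = 0, k = 0;
    size_t starP = std::string::npos, starK = 0;
    while (k < key.size()) {
        if (p < pattern.size() && pattern[p] == '*') {
            starP = p++;  // record the star; first try matching it to nothing
            starK = k;
        } else if (p < pattern.size() && (pattern[p] == '?' || pattern[p] == key[k])) {
            ++p;
            ++k;
        } else if (starP != std::string::npos) {
            p = starP + 1;  // backtrack: the last star absorbs one more character
            k = ++starK;
        } else {
            return false;
        }
    }
    while (p < pattern.size() && pattern[p] == '*') ++p;  // trailing stars match empty
    return p == pattern.size();
}
```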
Behavior RebalanceRing
Input: addedNodes: list, removedNodes: list
Output: migratedKeys: integer, durationMs: float, newVersion: integer
Action:
Compute new ring topology with virtual node assignments
Identify keys that must migrate to new primary nodes
Stream key ranges to their new owners
Verify data integrity on destination nodes
Swap the routing table atomically on each node and publish it to clients
Throttle migration rate to avoid impacting production traffic
Increment ring version for client topology refresh
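# One way to "throttle migration rate" is a token bucket on migration bytes. This
# sketch assumes a bytes-per-second budget and burst size (neither is specified
# above); `MigrationThrottle` is a hypothetical name, and time is passed in
# explicitly so the behavior is deterministic.
```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

class MigrationThrottle {
public:
    MigrationThrottle(double bytesPerSec, double burstBytes, double startSec = 0.0)
        : rate_(bytesPerSec), capacity_(burstBytes), tokens_(burstBytes), last_(startSec) {}

    // Consumes tokens and returns true if a chunk of `bytes` may be sent at nowSec.
    // Chunks larger than burstBytes can never pass, so size chunks below it.
    bool tryConsume(uint64_t bytes, double nowSec) {
        if (nowSec > last_) {  // refill for elapsed time, capped at the burst size
            tokens_ = std::min(capacity_, tokens_ + (nowSec - last_) * rate_);
            last_ = nowSec;
        }
        double need = static_cast<double>(bytes);
        if (tokens_ < need) return false;
        tokens_ -= need;
        return true;
    }

private:
    double rate_, capacity_, tokens_, last_;
};
```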
Behavior EvictEntries
Input: targetMemoryBytes: integer, policy: EvictionPolicy
Output: evictedCount: integer, freedBytes: integer
Action:
Scan probationary segment of segmented LRU first
Apply admission filter to protect frequently accessed items
If probationary insufficient, evict from protected segment's tail
Evict TTL-expired entries before applying policy-based eviction
Update hit rate metrics after each eviction batch
Dynamically adjust protected/probationary ratio based on hit rate
Emit eviction count and memory metrics
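# A minimal sketch of the eviction order in EvictEntries: drain the probationary
# tail first and touch the protected tail only if that is not enough. It takes
# bytes-to-free (current usage minus targetMemoryBytes) and assumes TTL-expired
# entries were already removed; `Segment` and `evictEntries` are hypothetical.
```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <list>
#include <string>
#include <utility>

// One LRU segment: front = most recently used, back = next eviction candidate.
using Segment = std::list<std::pair<std::string, uint64_t>>;  // (key, sizeBytes)

struct EvictionResult {
    uint64_t evictedCount = 0;
    uint64_t freedBytes = 0;
};

inline EvictionResult evictEntries(Segment& probationary, Segment& protectedSeg,
                                   uint64_t bytesToFree) {
    EvictionResult r;
    for (Segment* seg : {&probationary, &protectedSeg}) {
        while (r.freedBytes < bytesToFree && !seg->empty()) {
            r.freedBytes += seg->back().second;
            ++r.evictedCount;
            seg->pop_back();
        }
    }
    return r;
}
```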
Behavior ResolveConflict
Input: key: string, localEntry: CacheEntry, remoteEntry: CacheEntry
Output: resolvedEntry: CacheEntry, resolution: string
Action:
Compare version vectors for causal ordering
If one version dominates, use that entry as resolved
If concurrent versions, apply conflict resolution strategy
For last-write-wins, compare timestamps from synchronized clocks
For merge, combine non-overlapping fields if applicable
Write resolved entry and propagate anti-entropy update
Log conflict for analysis of consistency model violations
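# A sketch of the causal comparison in ResolveConflict, with version vectors as
# region -> counter maps (mirroring CacheEntry.regionVersionVector). A dominating
# version wins outright; only concurrent versions fall back to last-write-wins.
# The origin-region tie-breaker for equal timestamps is an assumption, and all
# names are hypothetical.
```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using VersionVector = std::map<std::string, uint64_t>;

enum class Order { Equal, Before, After, Concurrent };

inline uint64_t counterOf(const VersionVector& v, const std::string& region) {
    auto it = v.find(region);
    return it == v.end() ? 0 : it->second;  // a missing region counts as 0
}

// Before: a happened-before b (b dominates); After: a dominates b.
inline Order compareVersions(const VersionVector& a, const VersionVector& b) {
    bool aBehind = false, bBehind = false;
    auto check = [&](const std::string& region) {
        uint64_t x = counterOf(a, region), y = counterOf(b, region);
        if (x < y) aBehind = true;
        if (y < x) bBehind = true;
    };
    for (const auto& kv : a) check(kv.first);
    for (const auto& kv : b) check(kv.first);
    if (aBehind && bBehind) return Order::Concurrent;
    if (aBehind) return Order::Before;
    if (bBehind) return Order::After;
    return Order::Equal;
}

struct Version {
    VersionVector vv;
    int64_t timestampMs;       // from synchronized clocks
    std::string originRegion;  // hypothetical deterministic tie-breaker
};

inline const Version& resolve(const Version& local, const Version& remote) {
    switch (compareVersions(local.vv, remote.vv)) {
        case Order::Before: return remote;
        case Order::After:
        case Order::Equal: return local;
        case Order::Concurrent: break;
    }
    if (local.timestampMs != remote.timestampMs)
        return local.timestampMs > remote.timestampMs ? local : remote;
    return local.originRegion >= remote.originRegion ? local : remote;
}
```
# Note that a causally newer entry wins even when its wall-clock timestamp is
# older, so clock skew only matters for truly concurrent writes.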
# Level 4: Conditions
Condition: MemoryThresholdBreached
When cache node memory usage exceeds 90% of allocated capacity
Then trigger proactive eviction of probationary tier entries; if insufficient, evict from protected tier; throttle new admissions until usage drops below 85%; emit critical memory metric
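# The 90% trigger and 85% release thresholds above form a hysteresis band. A
# minimal sketch of just the admission-throttle part (eviction itself is not
# modeled); `MemoryAdmissionGate` is a hypothetical name.
```cpp
#include <cassert>
#include <cstdint>

class MemoryAdmissionGate {
public:
    MemoryAdmissionGate(double highWater = 0.90, double lowWater = 0.85)
        : high_(highWater), low_(lowWater) {}

    // Updates state from current usage; returns true if new admissions are allowed.
    // Throttling starts above highWater and ends only below lowWater, so the gate
    // does not flap when usage hovers around a single threshold.
    bool update(uint64_t usedBytes, uint64_t totalBytes) {
        double usage = totalBytes ? static_cast<double>(usedBytes) / totalBytes : 1.0;
        if (!throttling_ && usage > high_) throttling_ = true;
        else if (throttling_ && usage < low_) throttling_ = false;
        return !throttling_;
    }

private:
    double high_, low_;
    bool throttling_ = false;
};
```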
Condition: ReplicationLagExceeded
When replica falls behind primary by more than replicationLagThresholdMs
Then mark replica as stale for read quorum calculations; redirect reads to healthy replicas; trigger catchup replication stream; if catchup fails within 5 minutes, mark replica as degraded
Condition: ConsistentHashImbalance
When any node's key count deviates by more than 15% from the cluster average
Then redistribute virtual nodes to balance load; consider adding virtual nodes to underloaded physical nodes; emit imbalance metric; trigger background rebalancing
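# A sketch of the ConsistentHashImbalance trigger: compute the mean key count and
# flag nodes whose relative deviation exceeds the threshold (0.15 here). How the
# per-node key counts are gathered is assumed; `imbalancedNodes` is hypothetical.
```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

inline std::vector<std::string> imbalancedNodes(
        const std::map<std::string, uint64_t>& keyCounts, double maxDeviation) {
    std::vector<std::string> out;
    if (keyCounts.empty()) return out;
    double total = 0;
    for (const auto& kv : keyCounts) total += static_cast<double>(kv.second);
    double mean = total / keyCounts.size();
    if (mean == 0) return out;
    for (const auto& kv : keyCounts) {
        double deviation = std::fabs(static_cast<double>(kv.second) - mean) / mean;
        if (deviation > maxDeviation) out.push_back(kv.first);  // over- or under-loaded
    }
    return out;
}
```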
Condition: BackendUnreachable
When read-through or write-through backend becomes unreachable
Then serve expired cache entries that are still within the grace period; queue write-through writes for asynchronous delivery to the backend; open a circuit breaker on backend calls after 5 consecutive failures; emit a backend unavailability alert
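# A minimal circuit-breaker sketch for the "5 consecutive failures" rule. The
# cooldown before a half-open trial call is an assumed parameter (the spec gives
# none), time is injected in milliseconds for testability, and the class is
# hypothetical.
```cpp
#include <cassert>
#include <cstdint>

class CircuitBreaker {
public:
    enum class State { Closed, Open, HalfOpen };
    CircuitBreaker(int threshold = 5, int64_t cooldownMs = 30000)
        : threshold_(threshold), cooldownMs_(cooldownMs) {}

    // Closed: allow. Open: reject until the cooldown elapses, then allow exactly
    // one trial call (HalfOpen). Further calls are rejected until it resolves.
    bool allowRequest(int64_t nowMs) {
        if (state_ == State::Open && nowMs - openedAtMs_ >= cooldownMs_) {
            state_ = State::HalfOpen;
            return true;
        }
        return state_ == State::Closed;
    }
    void recordSuccess() { failures_ = 0; state_ = State::Closed; }
    void recordFailure(int64_t nowMs) {
        // A failed trial call reopens immediately; otherwise count consecutive failures.
        if (state_ == State::HalfOpen || ++failures_ >= threshold_) {
            state_ = State::Open;
            openedAtMs_ = nowMs;
        }
    }
    State state() const { return state_; }

private:
    int threshold_;
    int64_t cooldownMs_;
    int failures_ = 0;
    int64_t openedAtMs_ = 0;
    State state_ = State::Closed;
};
```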
Condition: StampedeDetected
When more than 10 concurrent requests miss the same key within 100ms
Then coalesce all requests into single backend fetch; return shared future result to all waiters; implement probabilistic early expiration (PER) to stagger future re-fetches; emit stampede metric
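# A sketch of per-key request coalescing with a shared future: the first caller
# for a key runs the backend fetch and later concurrent callers wait on its
# std::shared_future. The detection threshold (10 misses in 100ms) is not modeled,
# and `RequestCoalescer` / `coalescedCount` are hypothetical names.
```cpp
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

class RequestCoalescer {
public:
    using Fetch = std::function<std::string(const std::string&)>;
    explicit RequestCoalescer(Fetch fetch) : fetch_(std::move(fetch)) {}

    std::string get(const std::string& key) {
        std::promise<std::string> promise;
        std::shared_future<std::string> future;
        bool leader = false;
        {
            std::lock_guard<std::mutex> lock(mu_);
            auto it = inflight_.find(key);
            if (it != inflight_.end()) {
                future = it->second;  // join the in-flight fetch
                coalesced_.fetch_add(1);
            } else {
                future = promise.get_future().share();
                inflight_.emplace(key, future);
                leader = true;
            }
        }
        if (leader) {
            try {
                promise.set_value(fetch_(key));
            } catch (...) {
                promise.set_exception(std::current_exception());  // waiters rethrow it too
            }
            std::lock_guard<std::mutex> lock(mu_);
            inflight_.erase(key);  // the next miss starts a fresh fetch
        }
        return future.get();
    }

    // Requests that piggybacked on another caller's fetch (input for the stampede metric).
    int coalescedCount() const { return coalesced_.load(); }

private:
    Fetch fetch_;
    std::mutex mu_;
    std::unordered_map<std::string, std::shared_future<std::string>> inflight_;
    std::atomic<int> coalesced_{0};
};
```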
# Level 5: Events
Event: OnNodeJoined
On new cache node added to the cluster
Action: Register node on hash ring, assign virtual nodes, begin key migration from overloaded neighbors, update client routing tables, start replication streams, emit node-join metric
Event: OnNodeFailed
On cache node declared failed after missing heartbeats
Action: Remove from hash ring, promote replicas to primary for affected keys, trigger re-replication to restore replication factor, update client routing tables, begin data recovery from remaining replicas
Event: OnCacheHitRateDrop
On global cache hit rate remaining below the configured target for 5 consecutive minutes
Action: Analyze eviction patterns, increase protected segment ratio, adjust admission policy, identify churn patterns, consider increasing cache capacity, emit hit rate degradation alert
Event: OnInvalidationBroadcast
On invalidation message received from another region
Action: Apply invalidation to local storage, acknowledge if synchronous, update version vector, propagate to any downstream replicas, emit cross-region invalidation metric
Event: OnRebalanceComplete
On completion of ring rebalancing migration
Action: Verify all key ranges are correctly placed, update global routing table, validate replication factor for all keys, emit rebalance completion metric with migration stats, enable full traffic on new nodes
# Level 6: Concurrency
Parallel:
Independent cache get/put operations across different key partitions
Background replication streams to replica nodes
Anti-entropy reconciliation between regions
Eviction scanning and memory management
Client routing table updates
# Level 7: Optimization
Optimize: Cache hit rate
Priority: Use segmented LRU with dynamic admission policy; implement probabilistic early expiration to prevent stampedes; prefetch keys with predictable access patterns; adjust protected/probationary ratio based on real-time hit rate
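# Probabilistic early expiration is commonly implemented with the "XFetch" rule:
# refresh early when now - delta * beta * ln(u) >= expiry, with u uniform in (0, 1],
# delta the observed recompute time, and beta >= 0 controlling eagerness (1.0 is a
# common default). Since ln(u) <= 0, the chance of refreshing rises as expiry nears.
# A minimal sketch with times in seconds; the function name is hypothetical.
```cpp
#include <cassert>
#include <cmath>
#include <random>

inline bool shouldRefreshEarly(double nowSec, double expirySec, double deltaSec,
                               double beta, std::mt19937_64& rng) {
    std::uniform_real_distribution<double> dist(0.0, 1.0);  // [0, 1)
    double u = 1.0 - dist(rng);                              // (0, 1], so log(u) is finite
    return nowSec - deltaSec * beta * std::log(u) >= expirySec;
}
```
# With delta = 1s and beta = 1, a request 1s before expiry refreshes with
# probability P(u <= e^-1), about 37%, so concurrent readers rarely refresh together.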
Optimize: Replication throughput
Priority: Batch replication updates into pipeline messages; use async replication for non-critical consistency levels; compress replication payloads with lz4; parallelize replication to independent replicas
Optimize: Memory utilization
Priority: Use slab allocation for common value sizes; compress cold entries with zstd; implement object pooling for cache entry metadata; memory-mapped storage for items exceeding threshold size
# Level 8: Learning
Learn: Optimal TTL for different key patterns
Goal: Maximize hit rate while minimizing stale data served
Adapt: default TTL per key prefix pattern
Based: Access frequency, backend update frequency, and staleness tolerance observed over 7-day windows
Learn: Admission policy thresholds
Goal: Prevent cache pollution from one-time access keys while retaining valuable entries
Adapt: admissionWindow and minimumAccessCount for protected tier
Based: Historical eviction patterns showing recently-evicted items being re-requested vs. truly cold items
Learn: Replication factor per key namespace
Goal: Use higher replication for hot keys and lower replication for cold keys to optimize memory
Adapt: dynamicReplicationFactor per key namespace
Based: Access frequency distribution, read quorum requirements, and node failure probability
# Level 9: Security
Security:
Encrypt: All inter-node replication traffic using TLS 1.3
Encrypt: Client-to-cache connections using TLS with SASL authentication
Encrypt: Cache entry values at rest using AES-256-GCM for sensitive key prefixes
Protect: Key-level access control via ACLs (read, write, admin)
Protect: Admin operations (flush, rebalance, config change) via RBAC
Protect: Against cache poisoning by validating write authorization per namespace
Protect: Against timing side-channel attacks via constant-time comparison of authentication credentials
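# A minimal sketch of a constant-time equality check for credentials: it inspects
# every byte instead of returning at the first mismatch, so timing does not reveal
# how long the matching prefix is (length is still revealed). Compilers may still
# transform such loops, so a vetted primitive such as OpenSSL's CRYPTO_memcmp is
# preferable in production; `constantTimeEquals` is a hypothetical name.
```cpp
#include <cassert>
#include <cstddef>
#include <string>

inline bool constantTimeEquals(const std::string& a, const std::string& b) {
    if (a.size() != b.size()) return false;
    unsigned char diff = 0;
    for (size_t i = 0; i < a.size(); ++i) {
        // Accumulate differences; no data-dependent early exit.
        diff |= static_cast<unsigned char>(a[i]) ^ static_cast<unsigned char>(b[i]);
    }
    return diff == 0;
}
```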
# Level 10: Native
Native: C++
{
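#include <algorithm>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

// Deterministic 64-bit FNV-1a hash. std::hash gives no cross-implementation
// (or cross-build) stability guarantee, so nodes and clients could disagree on
// key placement; a distributed ring needs one explicit hash function.
inline uint64_t fnv1a64(const std::string& s) {
    uint64_t h = 14695981039346656037ULL;  // FNV offset basis
    for (unsigned char c : s) {
        h ^= c;
        h *= 1099511628211ULL;  // FNV prime
    }
    return h;
}

class ConsistentHashRing {
    std::map<uint64_t, std::string> ring_;                           // vnode hash -> physical node
    std::unordered_map<std::string, std::vector<uint64_t>> vnodes_;  // physical node -> its vnode hashes
    int replication_factor_;
    int vnodes_per_node_;
public:
    ConsistentHashRing(int replication_factor, int vnodes_per_node = 150)
        : replication_factor_(replication_factor), vnodes_per_node_(vnodes_per_node) {}

    void addNode(const std::string& node_id) {
        if (vnodes_.count(node_id)) return;  // already on the ring
        auto& hashes = vnodes_[node_id];
        for (int i = 0; i < vnodes_per_node_; ++i) {
            uint64_t hash = fnv1a64(node_id + ":" + std::to_string(i));
            // On a (rare) collision keep the existing owner rather than overwrite it.
            if (ring_.emplace(hash, node_id).second) hashes.push_back(hash);
        }
    }

    void removeNode(const std::string& node_id) {
        auto it = vnodes_.find(node_id);
        if (it == vnodes_.end()) return;
        for (uint64_t hash : it->second) ring_.erase(hash);  // only hashes this node owns
        vnodes_.erase(it);
    }

    // Returns up to replication_factor_ distinct physical nodes, primary first,
    // walking clockwise from the key's position on the ring.
    std::vector<std::string> getNodes(const std::string& key) const {
        std::vector<std::string> nodes;
        if (ring_.empty()) return nodes;
        size_t wanted = std::min(static_cast<size_t>(replication_factor_), vnodes_.size());
        std::set<std::string> seen;
        auto it = ring_.lower_bound(fnv1a64(key));
        // Consecutive vnodes often map to the same physical node, so keep walking
        // (at most one full lap) until enough distinct nodes have been collected.
        for (size_t step = 0; step < ring_.size() && nodes.size() < wanted; ++step, ++it) {
            if (it == ring_.end()) it = ring_.begin();
            if (seen.insert(it->second).second) nodes.push_back(it->second);
        }
        return nodes;
    }
};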
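#include <chrono>
#include <cstdint>
#include <iterator>
#include <list>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Segmented LRU. New entries enter the probationary segment; a hit on a
// probationary entry promotes it to the protected segment. Protected overflow is
// demoted to the head of probationary; probationary overflow is evicted from its
// tail. Capacities are entry counts, not bytes.
class SegmentedLRU {
public:
    struct Entry {
        std::string key;
        std::vector<uint8_t> value;
        uint64_t version;
        std::chrono::steady_clock::time_point expire_at;
        size_t access_count;
        int tier;  // 0=probationary, 1=protected
    };
    SegmentedLRU(size_t protected_capacity, size_t probationary_capacity)
        : protected_capacity_(protected_capacity ? protected_capacity : 1),
          probationary_capacity_(probationary_capacity ? probationary_capacity : 1) {}
    void put(const std::string& key, std::vector<uint8_t> value, uint64_t version,
             std::chrono::seconds ttl) {
        erase(key);  // replace any existing copy
        probationary_.push_front(Entry{key, std::move(value), version,
                                       std::chrono::steady_clock::now() + ttl, 1, 0});
        index_[key] = probationary_.begin();
        trimProbationary();
    }
    std::optional<Entry> get(const std::string& key) {
        auto it = index_.find(key);
        if (it == index_.end()) return std::nullopt;
        auto pos = it->second;
        if (std::chrono::steady_clock::now() > pos->expire_at) {
            erase(key);
            return std::nullopt;
        }
        pos->access_count++;
        if (pos->tier == 0 && pos->access_count > 1) {
            promote(pos);
        } else if (pos->tier == 1) {
            protected_.splice(protected_.begin(), protected_, pos);  // refresh recency
        }
        return *pos;  // splice keeps pos valid, so this copies the live entry
    }
    bool erase(const std::string& key) {
        auto it = index_.find(key);
        if (it == index_.end()) return false;
        (it->second->tier == 1 ? protected_ : probationary_).erase(it->second);
        index_.erase(it);
        return true;
    }
private:
    // Moves pos into protected without copying; iterators stored in index_ stay
    // valid because std::list::splice does not invalidate them.
    void promote(std::list<Entry>::iterator pos) {
        pos->tier = 1;
        protected_.splice(protected_.begin(), probationary_, pos);
        while (protected_.size() > protected_capacity_) {
            auto victim = std::prev(protected_.end());
            victim->tier = 0;
            victim->access_count = 1;  // must prove itself again
            probationary_.splice(probationary_.begin(), protected_, victim);
        }
        trimProbationary();
    }
    void trimProbationary() {
        while (probationary_.size() > probationary_capacity_) {
            index_.erase(probationary_.back().key);
            probationary_.pop_back();
        }
    }
    std::list<Entry> protected_;
    std::list<Entry> probationary_;
    std::unordered_map<std::string, std::list<Entry>::iterator> index_;
    size_t protected_capacity_;
    size_t probationary_capacity_;
};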