File size: 5,633 Bytes
5f07cd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
DreamEngine — Vitalis FSI

Runs during idle time. Clusters recent hypervectors,
compresses them into HelixMemory prototypes.
This is how Vitalis consolidates experience into long-term patterns.
No external dependencies. Pure HDC clustering.
"""
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional, Tuple

import numpy as np

from src.dream_engine.helix_memory import HelixMemory


class DreamEngine:
    """Buffer recent hypervectors and periodically consolidate them.

    Vectors passed to :meth:`ingest` accumulate in a bounded episodic buffer.
    :meth:`dream` clusters the buffer with a bipolar k-means (Hamming distance,
    majority-vote centroids) and hands each non-empty cluster's centroid plus
    merged metadata to ``self.helix.add``.  Afterwards the buffer is reset and
    a few slightly perturbed copies of rare-cluster members are re-inserted so
    they take part in the next cycle (generative replay).
    """

    DREAM_INTERVAL_MINUTES = 30  # minimum wall-clock gap between non-forced dreams
    MIN_BUFFER_SIZE = 50         # fewer buffered vectors than this -> skip the cycle
    N_CLUSTERS = 4               # upper bound on k for k-means
    CLUSTER_ITERATIONS = 8       # maximum k-means iterations

    def __init__(
        self,
        helix: "HelixMemory",
        buffer_max: int = 500,
    ):
        """
        Args:
            helix: store receiving consolidated prototypes via ``add(centroid, meta)``.
            buffer_max: capacity of the episodic buffer; the oldest entries are
                evicted first once it is full (``deque(maxlen=...)``).
        """
        self.helix = helix
        # Entries are (naive-UTC timestamp, hypervector, metadata dict).
        self.buffer: deque = deque(maxlen=buffer_max)
        # datetime.min guarantees the first non-forced dream passes the interval gate.
        self.last_dream: datetime = datetime.min
        self.dream_count: int = 0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def ingest(self, hv: np.ndarray, meta: Optional[dict] = None) -> None:
        """Accept one hypervector (and optional metadata) into the episodic buffer."""
        self.buffer.append((self._utcnow(), hv, meta or {}))

    def dream(self, force: bool = False) -> bool:
        """
        Consolidate the episodic buffer into HelixMemory.

        Args:
            force: bypass the DREAM_INTERVAL_MINUTES gate (the minimum buffer
                size still applies).

        Returns:
            True if consolidation ran, False if it was skipped.
        """
        now = self._utcnow()
        interval = timedelta(minutes=self.DREAM_INTERVAL_MINUTES)
        if not force and now - self.last_dream < interval:
            return False
        if len(self.buffer) < self.MIN_BUFFER_SIZE:
            print(f"[DREAM] Buffer too small ({len(self.buffer)}). Skipping.")
            return False

        print(f"[DREAM] Consolidating {len(self.buffer)} vectors...")

        hvs = np.stack([hv for _, hv, _ in self.buffer]).astype(np.int8)
        metas = [meta for _, _, meta in self.buffer]

        centroids, assignments = self._cluster(hvs)
        consolidated = self._consolidate(centroids, assignments, metas)

        # Reset the buffer BEFORE replay: replay appends to self.buffer, so
        # clearing afterwards would silently discard every replayed vector.
        self.buffer.clear()
        self._replay(hvs, assignments)

        self.last_dream = now
        self.dream_count += 1
        print(f"[DREAM] Cycle {self.dream_count} complete. "
              f"{consolidated} prototypes stored in HelixMemory.")
        return True

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------
    @staticmethod
    def _utcnow() -> datetime:
        """Current UTC time as a naive datetime (replacement for deprecated utcnow())."""
        # Kept naive so it stays comparable with datetime.min and stored timestamps.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _consolidate(
        self, centroids: np.ndarray, assignments: np.ndarray, metas: List[dict]
    ) -> int:
        """Pass each non-empty cluster's centroid to helix.add; return how many were added."""
        stored = 0
        for cluster_id, centroid in enumerate(centroids):
            members = np.flatnonzero(assignments == cluster_id)
            if members.size == 0:
                continue
            merged_meta = self._merge_meta([metas[j] for j in members])
            merged_meta["cluster_size"] = int(members.size)
            merged_meta["dream_cycle"] = self.dream_count
            self.helix.add(centroid, merged_meta)
            stored += 1
        return stored

    def _cluster(self, hvs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Bipolar k-means over the rows of ``hvs``.

        Distance is Hamming (count of differing positions).  Centroids are the
        element-wise sign of their members' sum, with ties broken to +1.
        Initial centroids are k distinct rows drawn via ``np.random.choice``.

        Returns:
            (centroids, assignments): centroids has k = min(N_CLUSTERS, len(hvs))
            rows; assignments[j] is the centroid index nearest to row j.
        """
        k = min(self.N_CLUSTERS, len(hvs))
        seeds = np.random.choice(len(hvs), k, replace=False)
        centroids = hvs[seeds].copy().astype(np.int8)

        assignments = np.zeros(len(hvs), dtype=np.int32)
        previous = None
        for _ in range(self.CLUSTER_ITERATIONS):
            # (n, k) matrix of Hamming distances to each centroid.
            diffs = np.stack(
                [np.sum(hvs != c, axis=1) for c in centroids],
                axis=1,
            )
            assignments = np.argmin(diffs, axis=1)
            # Same membership as last round -> the centroid update is a no-op,
            # so every remaining iteration would repeat this exact state.
            if previous is not None and np.array_equal(assignments, previous):
                break
            previous = assignments

            # Majority-vote update; empty clusters keep their previous centroid.
            for i in range(k):
                mask = assignments == i
                if np.any(mask):
                    summed = hvs[mask].astype(np.int32).sum(axis=0)
                    new_centroid = np.sign(summed).astype(np.int8)
                    new_centroid[new_centroid == 0] = 1
                    centroids[i] = new_centroid

        return centroids, assignments

    def _replay(self, hvs: np.ndarray, assignments: np.ndarray) -> None:
        """
        Generative replay: re-ingest lightly perturbed members of rare clusters.

        A cluster is "rare" when its size is at or below the 25th percentile
        of the occupied cluster sizes.  Up to two members per rare cluster are
        appended to the buffer, each with ~2% of positions sign-flipped, and
        tagged ``{"replayed": True}``.
        """
        cluster_sizes = np.bincount(assignments, minlength=self.N_CLUSTERS)
        occupied = cluster_sizes[cluster_sizes > 0]
        if occupied.size == 0:
            return
        # Percentile over occupied clusters only: empty/unused slots would
        # otherwise drag the threshold below the genuinely rarest cluster.
        rare_threshold = np.percentile(occupied, 25)

        rare_ids = np.flatnonzero((cluster_sizes > 0) & (cluster_sizes <= rare_threshold))
        for cluster_id in rare_ids:
            for hv in hvs[assignments == cluster_id][:2]:  # at most 2 per rare cluster
                # Multiplying by -1 flips a bipolar component; ~2% are flipped.
                noise = np.random.choice(
                    [-1, 1],
                    size=len(hv),
                    p=[0.02, 0.98],
                ).astype(np.int8)
                perturbed = (hv * noise).astype(np.int8)
                self.buffer.append((self._utcnow(), perturbed, {"replayed": True}))

    @staticmethod
    def _merge_meta(metas: list) -> dict:
        """Merge metadata dicts into one; the first occurrence of each key wins."""
        merged: dict = {}
        for meta in metas:
            for key, value in meta.items():
                merged.setdefault(key, value)
        return merged