# File size: 5,944 Bytes | 2147ce8
import math
from dataclasses import dataclass
import site
import sys
from pathlib import Path
from .linalg import Matrix, Vector, identity, invert_matrix, matvec
# Optional vendored packages are looked up in a ".vendor" directory that sits
# in the parent of the directory containing this file.
_VENDOR_ROOT = Path(__file__).resolve().parent.parent / ".vendor"
for _vendor_path in (_VENDOR_ROOT / "python", _VENDOR_ROOT / "sitepkgs"):
    if not _vendor_path.exists():
        continue
    vendor_text = str(_vendor_path)
    if vendor_text in sys.path:
        continue
    # Prepend so that a vendored copy is found before any other entry.
    sys.path.insert(0, vendor_text)
# NumPy is optional: ``np`` stays ``None`` when it cannot be imported, and the
# rest of the module checks ``np is None`` before using it.
np = None
try:
    import numpy as np
except ModuleNotFoundError:
    # Retry once with the per-user site-packages directory on the path.
    user_site = site.getusersitepackages()
    if user_site and user_site not in sys.path:
        sys.path.append(user_site)
    try:
        import numpy as np
    except ModuleNotFoundError:
        pass  # ``np`` keeps its ``None`` placeholder.
def hippo_legs_matrix(order: int) -> tuple[Matrix, Vector]:
    """Build the ``order`` x ``order`` state matrix and its input vector.

    Writing ``s(i) = sqrt(2 * i + 1)``, the returned matrix has
    ``-s(row) * s(col)`` strictly below the diagonal, ``-(row + 1)`` on the
    diagonal and ``0.0`` above it; the returned vector holds ``s(row)``.
    (The function name suggests this is the HiPPO-LegS operator.)

    Args:
        order: Number of rows/columns; a non-positive value yields empty lists.

    Returns:
        ``(a_matrix, b_vector)`` as a list of row lists and a flat list.
    """
    # Each square root is needed by a whole row and a whole column, so
    # compute it once per index.
    root = [math.sqrt(2 * index + 1) for index in range(order)]

    a_matrix = []
    for row in range(order):
        below = [-root[row] * root[col] for col in range(row)]
        above = [0.0] * (order - row - 1)
        # The diagonal entry is kept as an int, as produced by -(row + 1).
        a_matrix.append(below + [-(row + 1)] + above)
    return a_matrix, root
def analytical_embedding_drive(embedding: Vector, state_dim: int) -> Vector:
    """Spread an embedding over ``state_dim`` slots with a fixed 3-tap mix.

    Slot ``i`` receives ``e[i] + 0.5 * e[3i + 1] - 0.25 * e[5i + 2]``, where
    every index into the embedding ``e`` wraps around modulo its length.

    Args:
        embedding: Source values; a falsy (e.g. empty) embedding yields zeros.
        state_dim: Number of output slots.

    Returns:
        A list of ``state_dim`` floats.
    """
    if not embedding:
        return [0.0] * state_dim

    width = len(embedding)
    drive = []
    for slot in range(state_dim):
        primary = embedding[slot % width]
        secondary = embedding[(3 * slot + 1) % width]
        tertiary = embedding[(5 * slot + 2) % width]
        drive.append(primary + 0.5 * secondary - 0.25 * tertiary)
    return drive
def analytical_embedding_drive_fast(embedding: object, state_dim: int) -> object:
    """Vectorised counterpart of ``analytical_embedding_drive``.

    Without NumPy the input is converted to a list and handed to
    ``analytical_embedding_drive``. With NumPy the same three wrapped taps
    (``i``, ``3i + 1``, ``5i + 2``; weights ``1``, ``0.5``, ``-0.25``) are
    gathered by integer-array indexing.

    Args:
        embedding: Anything exposing ``.shape`` is used as-is; other inputs
            are converted with ``np.asarray(..., dtype=np.float64)``.
        state_dim: Number of output slots.

    Returns:
        A NumPy array when NumPy is available, otherwise a list.
    """
    if np is None:
        if hasattr(embedding, "tolist"):
            embedding_vector = embedding.tolist()
        else:
            embedding_vector = list(embedding)
        return analytical_embedding_drive(embedding_vector, state_dim)

    if hasattr(embedding, "shape"):
        values = embedding
    else:
        values = np.asarray(embedding, dtype=np.float64)
    if values.size == 0:
        return np.zeros(state_dim, dtype=np.float64)

    slots = np.arange(state_dim, dtype=np.int64)
    width = int(values.shape[0])
    primary = values[slots % width]
    secondary = values[(3 * slots + 1) % width]
    tertiary = values[(5 * slots + 2) % width]
    return primary + 0.5 * secondary - 0.25 * tertiary
@dataclass(slots=True)
class AnalyticalMemoryUnit:
state_dim: int
timescale: float
def __post_init__(self) -> None:
a_matrix, b_vector = hippo_legs_matrix(self.state_dim)
self.transition, self.input_projection = self._discretize_transition(
a_matrix,
b_vector,
self.timescale,
)
transition: Matrix = None # type: ignore[assignment]
input_projection: Vector = None # type: ignore[assignment]
transition_array: object | None = None # type: ignore[assignment]
input_projection_array: object | None = None # type: ignore[assignment]
@staticmethod
def _discretize_transition(
a_matrix: Matrix,
b_vector: Vector,
step: float,
) -> tuple[Matrix, Vector]:
implicit_system = [
[
identity_value - step * a_value
for identity_value, a_value in zip(identity_row, a_row)
]
for identity_row, a_row in zip(identity(len(a_matrix)), a_matrix)
]
transition = invert_matrix(implicit_system)
input_projection = matvec(transition, [step * value for value in b_vector])
return transition, input_projection
def step(self, state: Vector, scalar_input: float) -> Vector:
if np is not None and self.transition_array is None:
self.transition_array = np.asarray(self.transition, dtype=np.float64)
self.input_projection_array = np.asarray(self.input_projection, dtype=np.float64)
propagated = matvec(self.transition, state)
return [
propagated[index] + self.input_projection[index] * scalar_input
for index in range(self.state_dim)
]
def step_vector(self, state: Vector, drive: Vector) -> Vector:
propagated = matvec(self.transition, state)
return [
propagated[index] + self.input_projection[index] * drive[index]
for index in range(self.state_dim)
]
def step_fast(self, state: object, scalar_input: float) -> object:
if np is None:
state_vector = state.tolist() if hasattr(state, "tolist") else list(state)
return self.step(state_vector, scalar_input)
if self.transition_array is None or self.input_projection_array is None:
self.transition_array = np.asarray(self.transition, dtype=np.float64)
self.input_projection_array = np.asarray(self.input_projection, dtype=np.float64)
state_array = state if hasattr(state, "shape") else np.asarray(state, dtype=np.float64)
return (self.transition_array @ state_array) + (self.input_projection_array * scalar_input)
def step_vector_fast(self, state: object, drive: object) -> object:
if np is None:
state_vector = state.tolist() if hasattr(state, "tolist") else list(state)
drive_vector = drive.tolist() if hasattr(drive, "tolist") else list(drive)
return self.step_vector(state_vector, drive_vector)
if self.transition_array is None or self.input_projection_array is None:
self.transition_array = np.asarray(self.transition, dtype=np.float64)
self.input_projection_array = np.asarray(self.input_projection, dtype=np.float64)
state_array = state if hasattr(state, "shape") else np.asarray(state, dtype=np.float64)
drive_array = drive if hasattr(drive, "shape") else np.asarray(drive, dtype=np.float64)
return (self.transition_array @ state_array) + (self.input_projection_array * drive_array)