| from typing import TypeVar, Generic, Callable, Any |
| from pydantic import BaseModel, Field |
|
|
| A = TypeVar('A') |
| B = TypeVar('B') |
|
|
|
|
| class CognitiveState(BaseModel): |
| """The continuous variables tracked by PEDI.""" |
| coherence: float = Field(default=0.8, ge=0.0, le=1.0) |
| resonance: float = Field(default=0.5, ge=0.0, le=1.0) |
| tension: float = Field(default=0.3, ge=0.0, le=1.0) |
| shadow_depth: float = Field(default=0.2, ge=0.0, le=1.0) |
|
|
|
|
| class CognitivePayload(BaseModel): |
| """Structured payload for the comonadic cognitive pipeline. |
| |
| Each step in the pipeline writes to its own field instead of |
| overwriting a raw string. This prevents the ‘junk drawer’ effect |
| when the pipeline grows to support tool calls, embeddings, and |
| multi-step reasoning. |
| """ |
| user_input: str = "" |
| internal_log: str = "" |
| response: str = "" |
| metadata: dict = Field(default_factory=dict) |
|
|
| def model_copy(self, update: dict | None = None, **kwargs) -> "CognitivePayload": |
| """Immutable copy with optional field overrides.""" |
| copied = super().model_copy(update=update, **kwargs) |
| |
| if update is None or "metadata" not in update: |
| copied.metadata = {k: v for k, v in self.metadata.items()} |
| return copied |
|
|
|
|
| class Context(BaseModel, Generic[A]): |
| """The immutable container holding both the state and the current computation value.""" |
| state: CognitiveState |
| history: list[CognitiveState] = Field(default_factory=list) |
| value: A |
|
|
|
|
| class ContextWorker(Generic[A]): |
| """The Comonadic wrapper that executes operations in context.""" |
|
|
| def __init__(self, context: Context[A]): |
| self._ctx = context |
|
|
| def current(self) -> A: |
| """Extracts the current focused value.""" |
| return self._ctx.value |
|
|
| @property |
| def state(self) -> CognitiveState: |
| """The current cognitive state (PEDI variables).""" |
| return self._ctx.state |
|
|
| @property |
| def history(self) -> list[CognitiveState]: |
| """The full chain of previous states (read-only via copy).""" |
| return list(self._ctx.history) |
|
|
| def extend(self, operation: Callable[["ContextWorker[A]"], B]) -> "ContextWorker[B]": |
| """ |
| Takes a context-dependent operation, applies it, and returns a NEW |
| ContextWorker with the updated history and newly computed value. |
| """ |
| result = operation(self) |
|
|
| |
| if ( |
| isinstance(result, tuple) |
| and len(result) == 2 |
| and isinstance(result[1], CognitiveState) |
| ): |
| new_value, new_state = result |
| else: |
| new_value = result |
| new_state = self._ctx.state.model_copy() |
|
|
| |
| new_ctx = Context[B]( |
| state=new_state, |
| history=self._ctx.history + [self._ctx.state], |
| value=new_value, |
| ) |
| return ContextWorker[B](new_ctx) |
|
|
| def fork( |
| self, operations: list[Callable[["ContextWorker[A]"], Any]] |
| ) -> list["ContextWorker[Any]"]: |
| """ |
| Branching cognition. Runs parallel operations from the same |
| initial context and returns a list of new workers. |
| |
| Useful for: |
| - logical path vs intuitive path |
| - low tension vs high tension response |
| - different prompt strategies |
| """ |
| return [self.extend(op) for op in operations] |
|
|
| def merge( |
| self, |
| branches: list["ContextWorker[Any]"], |
| selector: Callable[[list["ContextWorker[Any]"]], "ContextWorker[Any]"], |
| ) -> "ContextWorker[Any]": |
| """ |
| Merge multiple branched workers back into a single lineage. |
| |
| The selector receives the list of branch workers and picks one |
| (or synthesises a new payload) to continue the pipeline. |
| """ |
| return selector(branches) |
|
|