phi-drift / core /context_engine.py
crexs's picture
sync: update core/context_engine.py
f99bc7c verified
Raw
History Blame Contribute Delete
3.98 kB
from typing import TypeVar, Generic, Callable, Any
from pydantic import BaseModel, Field
A = TypeVar('A')
B = TypeVar('B')
class CognitiveState(BaseModel):
"""The continuous variables tracked by PEDI."""
coherence: float = Field(default=0.8, ge=0.0, le=1.0)
resonance: float = Field(default=0.5, ge=0.0, le=1.0)
tension: float = Field(default=0.3, ge=0.0, le=1.0)
shadow_depth: float = Field(default=0.2, ge=0.0, le=1.0)
class CognitivePayload(BaseModel):
"""Structured payload for the comonadic cognitive pipeline.
Each step in the pipeline writes to its own field instead of
overwriting a raw string. This prevents the ‘junk drawer’ effect
when the pipeline grows to support tool calls, embeddings, and
multi-step reasoning.
"""
user_input: str = ""
internal_log: str = ""
response: str = ""
metadata: dict = Field(default_factory=dict)
def model_copy(self, update: dict | None = None, **kwargs) -> "CognitivePayload":
"""Immutable copy with optional field overrides."""
copied = super().model_copy(update=update, **kwargs)
# Deep-copy metadata so mutations don't leak across contexts
if update is None or "metadata" not in update:
copied.metadata = {k: v for k, v in self.metadata.items()}
return copied
class Context(BaseModel, Generic[A]):
"""The immutable container holding both the state and the current computation value."""
state: CognitiveState
history: list[CognitiveState] = Field(default_factory=list)
value: A
class ContextWorker(Generic[A]):
"""The Comonadic wrapper that executes operations in context."""
def __init__(self, context: Context[A]):
self._ctx = context
def current(self) -> A:
"""Extracts the current focused value."""
return self._ctx.value
@property
def state(self) -> CognitiveState:
"""The current cognitive state (PEDI variables)."""
return self._ctx.state
@property
def history(self) -> list[CognitiveState]:
"""The full chain of previous states (read-only via copy)."""
return list(self._ctx.history)
def extend(self, operation: Callable[["ContextWorker[A]"], B]) -> "ContextWorker[B]":
"""
Takes a context-dependent operation, applies it, and returns a NEW
ContextWorker with the updated history and newly computed value.
"""
result = operation(self)
# Unpack result if the operation modified the state
if (
isinstance(result, tuple)
and len(result) == 2
and isinstance(result[1], CognitiveState)
):
new_value, new_state = result
else:
new_value = result
new_state = self._ctx.state.model_copy()
# Build the new context immutably
new_ctx = Context[B](
state=new_state,
history=self._ctx.history + [self._ctx.state],
value=new_value,
)
return ContextWorker[B](new_ctx)
def fork(
self, operations: list[Callable[["ContextWorker[A]"], Any]]
) -> list["ContextWorker[Any]"]:
"""
Branching cognition. Runs parallel operations from the same
initial context and returns a list of new workers.
Useful for:
- logical path vs intuitive path
- low tension vs high tension response
- different prompt strategies
"""
return [self.extend(op) for op in operations]
def merge(
self,
branches: list["ContextWorker[Any]"],
selector: Callable[[list["ContextWorker[Any]"]], "ContextWorker[Any]"],
) -> "ContextWorker[Any]":
"""
Merge multiple branched workers back into a single lineage.
The selector receives the list of branch workers and picks one
(or synthesises a new payload) to continue the pipeline.
"""
return selector(branches)