File size: 3,982 Bytes
f99bc7c
b70e36c
 
 
 
 
f99bc7c
b70e36c
 
 
 
 
 
 
f99bc7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b70e36c
 
 
 
 
 
f99bc7c
b70e36c
 
f99bc7c
b70e36c
 
 
 
 
 
 
f99bc7c
 
 
 
 
 
 
 
 
 
 
b70e36c
f99bc7c
b70e36c
 
 
f99bc7c
 
 
 
 
 
 
b70e36c
 
 
 
 
 
 
 
 
f99bc7c
b70e36c
 
 
f99bc7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing import TypeVar, Generic, Callable, Any
from pydantic import BaseModel, Field

A = TypeVar('A')
B = TypeVar('B')


class CognitiveState(BaseModel):
    """The continuous variables tracked by PEDI."""
    coherence: float = Field(default=0.8, ge=0.0, le=1.0)
    resonance: float = Field(default=0.5, ge=0.0, le=1.0)
    tension: float = Field(default=0.3, ge=0.0, le=1.0)
    shadow_depth: float = Field(default=0.2, ge=0.0, le=1.0)


class CognitivePayload(BaseModel):
    """Structured payload for the comonadic cognitive pipeline.

    Each step in the pipeline writes to its own field instead of
    overwriting a raw string. This prevents the ‘junk drawer’ effect
    when the pipeline grows to support tool calls, embeddings, and
    multi-step reasoning.
    """
    user_input: str = ""
    internal_log: str = ""
    response: str = ""
    metadata: dict = Field(default_factory=dict)

    def model_copy(self, update: dict | None = None, **kwargs) -> "CognitivePayload":
        """Immutable copy with optional field overrides."""
        copied = super().model_copy(update=update, **kwargs)
        # Deep-copy metadata so mutations don't leak across contexts
        if update is None or "metadata" not in update:
            copied.metadata = {k: v for k, v in self.metadata.items()}
        return copied


class Context(BaseModel, Generic[A]):
    """The immutable container holding both the state and the current computation value."""
    state: CognitiveState
    history: list[CognitiveState] = Field(default_factory=list)
    value: A


class ContextWorker(Generic[A]):
    """The Comonadic wrapper that executes operations in context."""

    def __init__(self, context: Context[A]):
        self._ctx = context

    def current(self) -> A:
        """Extracts the current focused value."""
        return self._ctx.value

    @property
    def state(self) -> CognitiveState:
        """The current cognitive state (PEDI variables)."""
        return self._ctx.state

    @property
    def history(self) -> list[CognitiveState]:
        """The full chain of previous states (read-only via copy)."""
        return list(self._ctx.history)

    def extend(self, operation: Callable[["ContextWorker[A]"], B]) -> "ContextWorker[B]":
        """
        Takes a context-dependent operation, applies it, and returns a NEW
        ContextWorker with the updated history and newly computed value.
        """
        result = operation(self)

        # Unpack result if the operation modified the state
        if (
            isinstance(result, tuple)
            and len(result) == 2
            and isinstance(result[1], CognitiveState)
        ):
            new_value, new_state = result
        else:
            new_value = result
            new_state = self._ctx.state.model_copy()

        # Build the new context immutably
        new_ctx = Context[B](
            state=new_state,
            history=self._ctx.history + [self._ctx.state],
            value=new_value,
        )
        return ContextWorker[B](new_ctx)

    def fork(
        self, operations: list[Callable[["ContextWorker[A]"], Any]]
    ) -> list["ContextWorker[Any]"]:
        """
        Branching cognition. Runs parallel operations from the same
        initial context and returns a list of new workers.

        Useful for:
        - logical path vs intuitive path
        - low tension vs high tension response
        - different prompt strategies
        """
        return [self.extend(op) for op in operations]

    def merge(
        self,
        branches: list["ContextWorker[Any]"],
        selector: Callable[[list["ContextWorker[Any]"]], "ContextWorker[Any]"],
    ) -> "ContextWorker[Any]":
        """
        Merge multiple branched workers back into a single lineage.

        The selector receives the list of branch workers and picks one
        (or synthesises a new payload) to continue the pipeline.
        """
        return selector(branches)