Spaces:
Running on Zero
Running on Zero
File size: 4,848 Bytes
0d7db8e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | """Read-only observer β the camera crew that watches the ledger and renders to the UI.
Design contract:
- The observer NEVER appends events. It is strictly read-only.
- It maintains its own view-state as a projection, not as shared state.
- It notifies registered callbacks when the view changes.
- The world runs identically whether or not any observer is attached.
- Multiple observers can subscribe to the same ledger simultaneously.
This decoupling gives two guarantees:
1. The cognitive loop is reproducible without a UI.
2. You can run multiple renderers off the same log β a stage view, a
cognition-graph, a plain chat-log β without coupling to each other.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from src.core.events import Event
from src.core.projections import StageProjection
# ββ diff model ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@dataclass
class ViewDiff:
"""What changed between two consecutive observer ticks.
The UI agent computes this diff and streams only the delta to the client,
rather than re-rendering the entire state every turn. This is the right
shape for SSE / WebSocket streaming.
"""
scene_changed: bool = False
new_scene: str = ""
new_agent_notes: list[str] = field(default_factory=list)
new_judge_notes: list[str] = field(default_factory=list)
new_user_artifacts: list[str] = field(default_factory=list)
@property
def has_changes(self) -> bool:
return bool(
self.scene_changed
or self.new_agent_notes
or self.new_judge_notes
or self.new_user_artifacts
)
# ββ observer βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class Observer:
"""Read-only subscriber over the event ledger.
Usage:
observer = Observer()
observer.on_change(lambda diff: send_to_client(diff))
# In the conductor loop:
for event in new_events:
observer.consume(event)
"""
def __init__(self) -> None:
self._view = StageProjection()
self._callbacks: list[Callable[[ViewDiff], None]] = []
# ββ subscription ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def on_change(self, callback: Callable[[ViewDiff], None]) -> None:
"""Register a callback invoked whenever the view changes."""
self._callbacks.append(callback)
def remove_callback(self, callback: Callable[[ViewDiff], None]) -> None:
self._callbacks = [cb for cb in self._callbacks if cb is not callback]
# ββ consumption βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def consume(self, event: Event) -> ViewDiff:
"""Apply one event and return the diff. Notifies callbacks if anything changed."""
prev_scene = self._view.current_scene
prev_notes = list(self._view.agent_notes)
prev_verdicts = list(self._view.judge_notes)
prev_artifacts = list(self._view.user_artifacts)
self._view.apply(event)
diff = ViewDiff(
scene_changed=self._view.current_scene != prev_scene,
new_scene=self._view.current_scene if self._view.current_scene != prev_scene else "",
new_agent_notes=[n for n in self._view.agent_notes if n not in prev_notes],
new_judge_notes=[n for n in self._view.judge_notes if n not in prev_verdicts],
new_user_artifacts=[a for a in self._view.user_artifacts if a not in prev_artifacts],
)
if diff.has_changes:
for cb in self._callbacks:
cb(diff)
return diff
def consume_batch(self, events: tuple[Event, ...]) -> list[ViewDiff]:
"""Consume multiple events in order; returns a diff per event."""
return [self.consume(e) for e in events]
# ββ read access βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@property
def view(self) -> StageProjection:
"""Current materialized view. Read-only; do not mutate directly."""
return self._view
def reset(self) -> None:
self._view = StageProjection()
|