""" Cross-Domain Association Builder (Phase 5.0 — Agent 3) ======================================================= Automatically links memories across three semantic domains: strategic – goals, decisions, roadmaps, strategies operational – code, bugs, documentation, tasks personal – preferences, habits, relationships, context Cross-domain synapses improve holistic reasoning: when a strategic goal changes, the system can surface related operational tasks or personal context without being explicitly queried. Implementation: - Each memory is tagged with a `domain` in its metadata (or inferred) - CrossDomainSynapseBuilder monitors recently stored memories - Co-occurrence within a time window → create a cross-domain synapse - Synapse weight is damped (0.2×) relative to intra-domain (1.0×) Integration with RippleContext: - ripple_context.py uses domain_weight when propagating context - Cross-domain propagation uses CROSS_DOMAIN_WEIGHT as the multiplier Public API: builder = CrossDomainSynapseBuilder(engine) await builder.process_new_memory(node) # call after /store pairs = await builder.scan_recent(hours=1) # background scan """ from __future__ import annotations from datetime import datetime, timezone, timedelta from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple from loguru import logger if TYPE_CHECKING: from .node import MemoryNode # ------------------------------------------------------------------ # # Constants # # ------------------------------------------------------------------ # DOMAINS = {"strategic", "operational", "personal"} DEFAULT_DOMAIN = "operational" # Weight applied to cross-domain synapses (vs 1.0 for intra-domain) CROSS_DOMAIN_WEIGHT: float = 0.2 # Time window for co-occurrence detection (hours) COOCCURRENCE_WINDOW_HOURS: float = 2.0 # Keywords used to infer domain automatically if not tagged DOMAIN_KEYWORDS: Dict[str, List[str]] = { "strategic": [ "goal", "strategy", "roadmap", "vision", "mission", "objective", "decision", "priority", "kpi", "okr", "plan", "budget", "market", ], "personal": [ "prefer", "habit", "feel", "emotion", "prefer", "like", "dislike", "relationship", "trust", "colleague", "friend", "name", "remember me", ], "operational": [ "code", "bug", "fix", "implement", "test", "deploy", "api", "function", "class", "module", "error", "exception", "task", "ticket", ], } # ------------------------------------------------------------------ # # Domain inference # # ------------------------------------------------------------------ # def infer_domain(content: str, metadata: Optional[Dict] = None) -> str: """ Infer the semantic domain of a memory from its content and metadata. Priority: 1. metadata["domain"] if set 2. keyword match in content (highest score wins) 3. DEFAULT_DOMAIN ("operational") """ if metadata and "domain" in metadata: d = metadata["domain"].lower() return d if d in DOMAINS else DEFAULT_DOMAIN content_lower = content.lower() best_domain = DEFAULT_DOMAIN best_count = 0 for domain, keywords in DOMAIN_KEYWORDS.items(): count = sum(1 for kw in keywords if kw in content_lower) if count > best_count: best_count = count best_domain = domain return best_domain # ------------------------------------------------------------------ # # CrossDomainSynapseBuilder # # ------------------------------------------------------------------ # class CrossDomainSynapseBuilder: """ Detects cross-domain co-occurrences and requests synapse creation. Works by maintaining a rolling buffer of recently stored memories, then pairing memories from different domains that appeared within COOCCURRENCE_WINDOW_HOURS of each other. """ def __init__( self, engine=None, # HAIMEngine window_hours: float = COOCCURRENCE_WINDOW_HOURS, cross_domain_weight: float = CROSS_DOMAIN_WEIGHT, ) -> None: self.engine = engine self.window = timedelta(hours=window_hours) self.weight = cross_domain_weight # Buffer: list of (node_id, domain, stored_at) self._buffer: List[Tuple[str, str, datetime]] = [] # ---- Domain helpers ------------------------------------------ # def tag_domain(self, node: "MemoryNode") -> str: """Infer and write domain tag to node.metadata. Returns domain string.""" domain = infer_domain(node.content, getattr(node, "metadata", {})) if hasattr(node, "metadata"): node.metadata["domain"] = domain return domain # ---- Synapse creation --------------------------------------- # async def _create_synapse(self, id_a: str, id_b: str) -> None: """ Request synapse creation between two nodes via the engine's synapse index. Weight is damped by CROSS_DOMAIN_WEIGHT. """ if self.engine is None: logger.debug(f"CrossDomain: no engine, skipping synapse {id_a[:8]} ↔ {id_b[:8]}") return try: synapse_index = getattr(self.engine, "synapse_index", None) if synapse_index is not None: synapse_index.add_or_strengthen(id_a, id_b, delta=self.weight) logger.debug( f"CrossDomain synapse created: {id_a[:8]} ↔ {id_b[:8]} weight={self.weight}" ) except Exception as exc: logger.debug(f"CrossDomain synapse creation failed: {exc}") # ---- Main API ----------------------------------------------- # async def process_new_memory(self, node: "MemoryNode") -> List[Tuple[str, str]]: """ Called after a new memory is stored. Tags its domain and checks for cross-domain co-occurrences in the buffer. Returns list of (id_a, id_b) pairs for which synapses were created. """ domain = self.tag_domain(node) now = datetime.now(timezone.utc) # Cut stale entries from buffer cutoff = now - self.window self._buffer = [(nid, d, ts) for nid, d, ts in self._buffer if ts >= cutoff] # Find cross-domain pairs with current node pairs: List[Tuple[str, str]] = [] already_seen: Set[str] = set() for existing_id, existing_domain, _ts in self._buffer: if existing_domain != domain and existing_id not in already_seen: await self._create_synapse(node.id, existing_id) pairs.append((node.id, existing_id)) already_seen.add(existing_id) # Add current node to buffer self._buffer.append((node.id, domain, now)) if pairs: logger.info( f"CrossDomain: {len(pairs)} cross-domain synapses created for node {node.id[:8]} (domain={domain})" ) return pairs async def scan_recent(self, hours: float = 1.0) -> List[Tuple[str, str]]: """ Scan the current buffer for any unpaired cross-domain co-occurrences. Returns all cross-domain pairs. """ now = datetime.now(timezone.utc) cutoff = now - timedelta(hours=hours) recent = [(nid, d, ts) for nid, d, ts in self._buffer if ts >= cutoff] pairs: List[Tuple[str, str]] = [] n = len(recent) for i in range(n): for j in range(i + 1, n): id_i, dom_i, _ = recent[i] id_j, dom_j, _ = recent[j] if dom_i != dom_j: await self._create_synapse(id_i, id_j) pairs.append((id_i, id_j)) return pairs def clear_buffer(self) -> None: """Reset the co-occurrence buffer.""" self._buffer.clear()