MnemoCore / src /mnemocore /core /provenance.py
Granis87's picture
Upload folder using huggingface_hub
c3a3710 verified
"""
Provenance Tracking Module (Phase 5.0)
=======================================
W3C PROV-inspired source tracking for MnemoCore memories.
Tracks the full lifecycle of every MemoryNode:
- origin: where/how the memory was created
- lineage: ordered list of transformation events
- version: incremented on each significant mutation
This is the foundation for:
- Trust & audit trails (AI Governance)
- Contradiction resolution
- Memory-as-a-Service lineage API
- Source reliability scoring
Public API:
record = ProvenanceRecord.new(origin_type="observation", agent_id="agent-001")
record.add_event("consolidated", source_memories=["mem_a", "mem_b"])
serialized = record.to_dict()
restored = ProvenanceRecord.from_dict(serialized)
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# ------------------------------------------------------------------ #
# Origin types #
# ------------------------------------------------------------------ #
# Recognised values for ``ProvenanceOrigin.type``.  ``ProvenanceRecord.new``
# falls back to "observation" for any origin type that is not listed here.
ORIGIN_TYPES = {
    # Direct input from agent or user
    "observation",
    # Derived/reasoned by LLM or engine
    "inference",
    # Produced by SubconsciousAI dream cycle
    "dream",
    # Result of SemanticConsolidation merge
    "consolidation",
    # Fetched from external source (RSS, API, etc.)
    "external_sync",
    # Explicit user override
    "user_correction",
    # Stored as a future prediction
    "prediction",
}
# ------------------------------------------------------------------ #
# Lineage event #
# ------------------------------------------------------------------ #
@dataclass
class LineageEvent:
    """
    A single step in a memory's transformation history.

    Examples:
        created      – initial storage
        accessed     – retrieved by a query
        consolidated – merged into or from a proto-memory cluster
        verified     – reliability confirmed externally
        contradicted – flagged as contradicting another memory
        updated      – content or metadata modified
        archived     – moved to COLD tier
        expired      – TTL reached or evicted

    The serialized form (see ``to_dict``) is sparse: only ``event`` and
    ``timestamp`` are always present; optional fields are omitted when unset.
    """

    event: str
    timestamp: str  # ISO 8601
    actor: Optional[str] = None  # agent_id, "system", "user", etc.
    source_memories: List[str] = field(default_factory=list)  # for consolidation
    outcome: Optional[bool] = None  # for verification events
    notes: Optional[str] = None
    extra: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the event into a plain dict.

        Returns:
            A dict that always contains ``event`` and ``timestamp``.
            ``actor`` and ``outcome`` are included when they are not None;
            ``source_memories``, ``notes`` and ``extra`` are included only
            when non-empty.  The containers in the result are shallow copies,
            so mutating the returned dict does not alter this event.
        """
        d: Dict[str, Any] = {
            "event": self.event,
            "timestamp": self.timestamp,
        }
        if self.actor is not None:
            d["actor"] = self.actor
        if self.source_memories:
            # Copy: the serialized form must not alias the event's own list.
            d["source_memories"] = list(self.source_memories)
        if self.outcome is not None:
            # ``is not None`` keeps an explicit False outcome in the output.
            d["outcome"] = self.outcome
        if self.notes:
            d["notes"] = self.notes
        if self.extra:
            # Shallow copy for the same reason as ``source_memories``.
            d["extra"] = dict(self.extra)
        return d

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "LineageEvent":
        """
        Rebuild an event from the dict produced by ``to_dict``.

        Args:
            d: Mapping with required keys ``event`` and ``timestamp``; all
               other keys are optional.

        Returns:
            A new ``LineageEvent``.  ``source_memories`` and ``extra`` are
            copied from ``d`` and default to empty containers when the key is
            missing or explicitly null.

        Raises:
            KeyError: If ``event`` or ``timestamp`` is missing.
        """
        return cls(
            event=d["event"],
            timestamp=d["timestamp"],
            actor=d.get("actor"),
            # ``or`` covers both a missing key and an explicit null; the copy
            # keeps the event independent of the caller's containers.
            source_memories=list(d.get("source_memories") or []),
            outcome=d.get("outcome"),
            notes=d.get("notes"),
            extra=dict(d.get("extra") or {}),
        )
# ------------------------------------------------------------------ #
# Origin #
# ------------------------------------------------------------------ #
@dataclass
class ProvenanceOrigin:
    """Creation context of a memory: its origin type, creator and time."""

    type: str  # Expected to be one of ORIGIN_TYPES (not validated here)
    agent_id: Optional[str] = None
    session_id: Optional[str] = None
    source_url: Optional[str] = None  # For external_sync
    # Defaults to the current UTC time as an ISO 8601 string.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the origin into a plain dict.

        ``type`` and ``timestamp`` are always present; ``agent_id``,
        ``session_id`` and ``source_url`` are added (in that order) only
        when they hold a truthy value.
        """
        payload: Dict[str, Any] = {"type": self.type, "timestamp": self.timestamp}
        for key in ("agent_id", "session_id", "source_url"):
            value = getattr(self, key)
            if value:
                payload[key] = value
        return payload

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "ProvenanceOrigin":
        """
        Rebuild an origin from a dict such as the one ``to_dict`` produces.

        A missing ``type`` becomes "observation"; a missing ``timestamp``
        becomes the current UTC time.  Other missing keys become None.
        """
        if "timestamp" in d:
            stamp = d["timestamp"]
        else:
            stamp = datetime.now(timezone.utc).isoformat()
        return cls(
            type=d.get("type", "observation"),
            agent_id=d.get("agent_id"),
            session_id=d.get("session_id"),
            source_url=d.get("source_url"),
            timestamp=stamp,
        )
# ------------------------------------------------------------------ #
# ProvenanceRecord — the full provenance object on a MemoryNode #
# ------------------------------------------------------------------ #
@dataclass
class ProvenanceRecord:
    """
    Full provenance object attached to a MemoryNode.

    Designed to be serialized into node.metadata["provenance"] for
    backward compatibility with existing storage layers.

    A record holds the creation ``origin``, an ordered ``lineage`` of
    events, and a ``version`` counter that ``add_event`` increments once
    per appended event.
    """

    origin: ProvenanceOrigin
    lineage: List[LineageEvent] = field(default_factory=list)
    version: int = 1
    confidence_source: str = "bayesian_ltp"  # How the confidence score is derived

    # ---- Factory methods ------------------------------------------ #

    @classmethod
    def new(
        cls,
        origin_type: str = "observation",
        agent_id: Optional[str] = None,
        session_id: Optional[str] = None,
        source_url: Optional[str] = None,
        actor: Optional[str] = None,
    ) -> "ProvenanceRecord":
        """
        Create a fresh ProvenanceRecord and log the 'created' event.

        Args:
            origin_type: Origin category; any value not in ORIGIN_TYPES is
                replaced by "observation".
            agent_id: Identifier of the creating agent, if any.
            session_id: Identifier of the creating session, if any.
            source_url: Source location, if any.
            actor: Actor recorded on the 'created' event.  Falls back to
                ``agent_id`` and then to "system".

        Returns:
            A record whose lineage contains exactly one 'created' event.
            Because logging that event goes through ``add_event``, the
            returned record has ``version == 2``.
        """
        now = datetime.now(timezone.utc).isoformat()
        origin = ProvenanceOrigin(
            type=origin_type if origin_type in ORIGIN_TYPES else "observation",
            agent_id=agent_id,
            session_id=session_id,
            source_url=source_url,
            timestamp=now,
        )
        record = cls(origin=origin)
        record.add_event(
            event="created",
            actor=actor or agent_id or "system",
        )
        # Creation is a single instant: reuse the origin timestamp so that
        # ``origin.timestamp`` and the 'created' event agree exactly instead
        # of differing by the time between two clock reads.
        record.lineage[-1].timestamp = now
        return record

    # ---- Mutation ------------------------------------------------- #

    def add_event(
        self,
        event: str,
        actor: Optional[str] = None,
        source_memories: Optional[List[str]] = None,
        outcome: Optional[bool] = None,
        notes: Optional[str] = None,
        **extra: Any,
    ) -> "ProvenanceRecord":
        """
        Append a new lineage event and bump the version counter.

        Args:
            event: Event name (e.g. "consolidated", "verified").
            actor: Who or what triggered the event.
            source_memories: Ids of related memories; copied on entry.
            outcome: Result flag, used by verification events.
            notes: Free-form annotation.
            **extra: Any additional keyword arguments are stored verbatim
                in the event's ``extra`` dict.

        Returns:
            ``self``, so calls can be chained.
        """
        evt = LineageEvent(
            event=event,
            # Stamped with the current UTC time in ISO 8601 form.
            timestamp=datetime.now(timezone.utc).isoformat(),
            actor=actor,
            # Copy the list: keeping the caller's object would let a later
            # mutation on their side silently rewrite recorded history.
            source_memories=list(source_memories) if source_memories else [],
            outcome=outcome,
            notes=notes,
            extra=extra,
        )
        self.lineage.append(evt)
        self.version += 1
        return self

    def mark_consolidated(
        self,
        source_memory_ids: List[str],
        actor: str = "consolidation_worker",
    ) -> "ProvenanceRecord":
        """Log a 'consolidated' event listing ``source_memory_ids``; returns ``self``."""
        return self.add_event(
            event="consolidated",
            actor=actor,
            source_memories=source_memory_ids,
        )

    def mark_verified(
        self,
        success: bool,
        actor: str = "system",
        notes: Optional[str] = None,
    ) -> "ProvenanceRecord":
        """Log a 'verified' event whose ``outcome`` is ``success``; returns ``self``."""
        return self.add_event(
            event="verified",
            actor=actor,
            outcome=success,
            notes=notes,
        )

    def mark_contradicted(
        self,
        contradiction_group_id: str,
        actor: str = "contradiction_detector",
    ) -> "ProvenanceRecord":
        """
        Flag this memory as contradicted; returns ``self``.

        ``contradiction_group_id`` is stored in the event's ``extra`` dict
        under the key "contradiction_group_id".
        """
        return self.add_event(
            event="contradicted",
            actor=actor,
            contradiction_group_id=contradiction_group_id,
        )

    # ---- Serialization -------------------------------------------- #

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the record (origin, lineage, version, confidence_source) to a dict."""
        return {
            "origin": self.origin.to_dict(),
            "lineage": [e.to_dict() for e in self.lineage],
            "version": self.version,
            "confidence_source": self.confidence_source,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "ProvenanceRecord":
        """
        Rebuild a record from a dict such as the one ``to_dict`` produces.

        Every key is optional.  A missing or null ``origin`` becomes an
        "observation" origin, a missing or null ``lineage`` becomes an empty
        list, ``version`` defaults to 1 and ``confidence_source`` to
        "bayesian_ltp".
        """
        return cls(
            # ``or`` treats an explicit null the same as an absent key.
            origin=ProvenanceOrigin.from_dict(d.get("origin") or {"type": "observation"}),
            lineage=[LineageEvent.from_dict(e) for e in (d.get("lineage") or [])],
            version=d.get("version", 1),
            confidence_source=d.get("confidence_source", "bayesian_ltp"),
        )

    # ---- Helpers -------------------------------------------------- #

    @property
    def created_at(self) -> str:
        """ISO timestamp of the first 'created' event, else the origin timestamp."""
        for event in self.lineage:
            if event.event == "created":
                return event.timestamp
        # No 'created' event in the lineage: fall back to the origin timestamp.
        return self.origin.timestamp

    @property
    def last_event(self) -> Optional[LineageEvent]:
        """Most recent lineage event, or None when the lineage is empty."""
        return self.lineage[-1] if self.lineage else None

    def is_contradicted(self) -> bool:
        """Return True if any lineage event is named 'contradicted'."""
        return any(e.event == "contradicted" for e in self.lineage)

    def is_verified(self) -> bool:
        """Return True if any 'verified' event has ``outcome`` exactly True."""
        return any(
            e.event == "verified" and e.outcome is True for e in self.lineage
        )

    def __repr__(self) -> str:
        """Compact summary: origin type, version and number of lineage events."""
        return (
            f"ProvenanceRecord(origin_type={self.origin.type!r}, "
            f"version={self.version}, events={len(self.lineage)})"
        )