| """
|
| Contradiction Detection Module (Phase 5.0)
|
| ==========================================
|
| Detects contradicting memories in MnemoCore using a two-stage pipeline:
|
|
|
| Stage 1: TextEncoder similarity search (fast, vector-based)
|
| - At /store time: compare new memory against top-5 existing memories
|
| - If similarity > SIMILARITY_THRESHOLD (0.80) → proceed to Stage 2
|
|
|
| Stage 2: LLM-based semantic comparison (accurate, but heavier)
|
| - Uses SubconsciousAI connector to evaluate if two memories actually contradict
|
| - Avoids false positives from paraphrases (similarity doesn't mean contradiction)
|
|
|
| On confirmed contradiction:
|
| - Both memories receive a 'contradiction_group_id' in their provenance lineage
|
| - Both are flagged in their metadata
|
| - The API returns an alert in the store response
|
| - Entries are added to a ContradictionRegistry for the /contradictions endpoint
|
|
|
| Background scan:
|
| - ContradictionDetector.scan(nodes) can be called from ConsolidationWorker
|
|
|
| Public API:
|
| detector = ContradictionDetector(engine)
|
| result = await detector.check_on_store(new_content, new_node, existing_nodes)
|
| all = detector.registry.list_all()
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import uuid
|
| from dataclasses import dataclass, field
|
| from datetime import datetime, timezone
|
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
|
|
|
| from loguru import logger
|
|
|
| if TYPE_CHECKING:
|
| from .node import MemoryNode
|
|
|
|
|
|
|
|
|
|
|
|
|
| SIMILARITY_THRESHOLD: float = 0.80
|
| LLM_CONFIRM_MIN_SCORE: float = 0.70
|
|
|
|
|
|
|
|
|
|
|
|
|
| @dataclass
|
| class ContradictionRecord:
|
| """A detected contradiction between two memories."""
|
| group_id: str = field(default_factory=lambda: f"cg_{uuid.uuid4().hex[:12]}")
|
| memory_a_id: str = ""
|
| memory_b_id: str = ""
|
| similarity_score: float = 0.0
|
| llm_confirmed: bool = False
|
| detected_at: str = field(
|
| default_factory=lambda: datetime.now(timezone.utc).isoformat()
|
| )
|
| resolved: bool = False
|
| resolution_note: Optional[str] = None
|
|
|
| def to_dict(self) -> Dict[str, Any]:
|
| return {
|
| "group_id": self.group_id,
|
| "memory_a_id": self.memory_a_id,
|
| "memory_b_id": self.memory_b_id,
|
| "similarity_score": round(self.similarity_score, 4),
|
| "llm_confirmed": self.llm_confirmed,
|
| "detected_at": self.detected_at,
|
| "resolved": self.resolved,
|
| "resolution_note": self.resolution_note,
|
| }
|
|
|
|
|
|
|
|
|
|
|
|
|
| class ContradictionRegistry:
|
| """In-memory store of detected contradictions (survives until restart)."""
|
|
|
| def __init__(self) -> None:
|
| self._records: Dict[str, ContradictionRecord] = {}
|
|
|
| def register(self, record: ContradictionRecord) -> None:
|
| self._records[record.group_id] = record
|
|
|
| def resolve(self, group_id: str, note: Optional[str] = None) -> bool:
|
| if group_id in self._records:
|
| self._records[group_id].resolved = True
|
| self._records[group_id].resolution_note = note
|
| return True
|
| return False
|
|
|
| def list_all(self, unresolved_only: bool = True) -> List[ContradictionRecord]:
|
| recs = list(self._records.values())
|
| if unresolved_only:
|
| recs = [r for r in recs if not r.resolved]
|
| return sorted(recs, key=lambda r: r.detected_at, reverse=True)
|
|
|
| def list_for_memory(self, memory_id: str) -> List[ContradictionRecord]:
|
| return [
|
| r for r in self._records.values()
|
| if r.memory_a_id == memory_id or r.memory_b_id == memory_id
|
| ]
|
|
|
| def __len__(self) -> int:
|
| return len([r for r in self._records.values() if not r.resolved])
|
|
|
|
|
|
|
|
|
|
|
|
|
| class ContradictionDetector:
|
| """
|
| Two-stage contradiction detector.
|
|
|
| Stage 1: Vector similarity via the engine's binary HDV comparison.
|
| Stage 2: LLM semantic check via SubconsciousAI (optional).
|
| """
|
|
|
| def __init__(
|
| self,
|
| engine=None,
|
| similarity_threshold: float = SIMILARITY_THRESHOLD,
|
| top_k: int = 5,
|
| use_llm: bool = True,
|
| ) -> None:
|
| self.engine = engine
|
| self.similarity_threshold = similarity_threshold
|
| self.top_k = top_k
|
| self.use_llm = use_llm
|
| self.registry = ContradictionRegistry()
|
|
|
|
|
|
|
| def _hamming_similarity(self, node_a: "MemoryNode", node_b: "MemoryNode") -> float:
|
| """
|
| Compute binary HDV similarity between two nodes.
|
| Similarity = 1 - normalized_hamming_distance.
|
| """
|
| try:
|
| import numpy as np
|
| a = node_a.hdv.data
|
| b = node_b.hdv.data
|
| xor = np.bitwise_xor(a, b)
|
| ham = float(bin(int.from_bytes(xor.tobytes(), "little")).count("1"))
|
| dim = len(a) * 8
|
| return 1.0 - ham / dim
|
| except Exception:
|
| return 0.0
|
|
|
|
|
|
|
| async def _llm_contradicts(
|
| self, content_a: str, content_b: str
|
| ) -> Tuple[bool, float]:
|
| """
|
| Ask SubconsciousAI if two contents contradict each other.
|
| Returns (is_contradiction, confidence_score).
|
| Falls back to False if LLM is unavailable.
|
| """
|
| if not self.engine or not self.use_llm:
|
| return False, 0.0
|
|
|
| try:
|
| subcon = getattr(self.engine, "subconscious_ai", None)
|
| if subcon is None:
|
| return False, 0.0
|
|
|
| prompt = (
|
| "Do the following two statements contradict each other? "
|
| "Answer with a JSON object: {\"contradiction\": true/false, \"confidence\": 0.0-1.0}.\n\n"
|
| f"Statement A: {content_a[:500]}\n"
|
| f"Statement B: {content_b[:500]}"
|
| )
|
| raw = await subcon.generate(prompt, max_tokens=64)
|
| import json as _json
|
| parsed = _json.loads(raw.strip())
|
| return bool(parsed.get("contradiction", False)), float(parsed.get("confidence", 0.0))
|
| except Exception as exc:
|
| logger.debug(f"LLM contradiction check failed: {exc}")
|
| return False, 0.0
|
|
|
|
|
|
|
| def _flag_node(self, node: "MemoryNode", group_id: str) -> None:
|
| """Attach contradiction metadata to a node's provenance and metadata fields."""
|
| node.metadata["contradiction_group_id"] = group_id
|
| node.metadata["contradicted_at"] = datetime.now(timezone.utc).isoformat()
|
|
|
| prov = getattr(node, "provenance", None)
|
| if prov is not None:
|
| prov.mark_contradicted(group_id)
|
|
|
|
|
|
|
| async def check_on_store(
|
| self,
|
| new_node: "MemoryNode",
|
| candidates: Optional[List["MemoryNode"]] = None,
|
| ) -> Optional[ContradictionRecord]:
|
| """
|
| Check a newly stored node against existing memories.
|
|
|
| Args:
|
| new_node: The node just stored.
|
| candidates: Optional pre-fetched list of nodes to compare against.
|
| If None and engine is available, fetches via HDV search.
|
|
|
| Returns:
|
| ContradictionRecord if a contradiction was detected, else None.
|
| """
|
|
|
| if candidates is None and self.engine is not None:
|
| try:
|
| results = await self.engine.query(
|
| new_node.content, top_k=self.top_k
|
| )
|
| nodes = []
|
| for mem_id, _score in results:
|
| n = await self.engine.get_memory(mem_id)
|
| if n and n.id != new_node.id:
|
| nodes.append(n)
|
| candidates = nodes
|
| except Exception as e:
|
| logger.debug(f"ContradictionDetector: candidate fetch failed: {e}")
|
| candidates = []
|
|
|
| if not candidates:
|
| return None
|
|
|
|
|
| high_sim_candidates = []
|
| for cand in candidates:
|
| sim = self._hamming_similarity(new_node, cand)
|
| if sim >= self.similarity_threshold:
|
| high_sim_candidates.append((cand, sim))
|
|
|
| if not high_sim_candidates:
|
| return None
|
|
|
|
|
| high_sim_candidates.sort(key=lambda x: x[1], reverse=True)
|
| top_cand, top_sim = high_sim_candidates[0]
|
|
|
| is_contradiction = False
|
| llm_confirmed = False
|
|
|
| if self.use_llm:
|
| is_contradiction, conf = await self._llm_contradicts(
|
| new_node.content, top_cand.content
|
| )
|
| llm_confirmed = is_contradiction and conf >= LLM_CONFIRM_MIN_SCORE
|
| else:
|
|
|
| is_contradiction = top_sim >= 0.90
|
| llm_confirmed = False
|
|
|
| if not is_contradiction:
|
| return None
|
|
|
|
|
| record = ContradictionRecord(
|
| memory_a_id=new_node.id,
|
| memory_b_id=top_cand.id,
|
| similarity_score=top_sim,
|
| llm_confirmed=llm_confirmed,
|
| )
|
| self.registry.register(record)
|
| self._flag_node(new_node, record.group_id)
|
| self._flag_node(top_cand, record.group_id)
|
|
|
| logger.warning(
|
| f"⚠️ Contradiction detected: {new_node.id[:8]} ↔ {top_cand.id[:8]} "
|
| f"(sim={top_sim:.3f}, llm_confirmed={llm_confirmed}, group={record.group_id})"
|
| )
|
| return record
|
|
|
| async def scan(self, nodes: "List[MemoryNode]") -> List[ContradictionRecord]:
|
| """
|
| Background scan: compare each node against its peers in the provided list.
|
| Called periodically from ConsolidationWorker.
|
|
|
| Returns all newly detected contradiction records.
|
| """
|
| found: List[ContradictionRecord] = []
|
| n = len(nodes)
|
| for i in range(n):
|
| for j in range(i + 1, n):
|
| sim = self._hamming_similarity(nodes[i], nodes[j])
|
| if sim < self.similarity_threshold:
|
| continue
|
| is_contradiction, _ = await self._llm_contradicts(
|
| nodes[i].content, nodes[j].content
|
| )
|
| if not is_contradiction:
|
| continue
|
| record = ContradictionRecord(
|
| memory_a_id=nodes[i].id,
|
| memory_b_id=nodes[j].id,
|
| similarity_score=sim,
|
| llm_confirmed=True,
|
| )
|
| self.registry.register(record)
|
| self._flag_node(nodes[i], record.group_id)
|
| self._flag_node(nodes[j], record.group_id)
|
| found.append(record)
|
|
|
| if found:
|
| logger.info(f"ContradictionDetector background scan: {len(found)} contradictions found in {n} nodes")
|
| return found
|
|
|
|
|
|
|
|
|
|
|
|
|
| _DETECTOR: ContradictionDetector | None = None
|
|
|
|
|
| def get_contradiction_detector(engine=None) -> ContradictionDetector:
|
| """Return the shared ContradictionDetector singleton."""
|
| global _DETECTOR
|
| if _DETECTOR is None:
|
| _DETECTOR = ContradictionDetector(engine=engine)
|
| elif engine is not None and _DETECTOR.engine is None:
|
| _DETECTOR.engine = engine
|
| return _DETECTOR
|
|
|