phi-drift / core /memory.py
crexs's picture
Upload folder using huggingface_hub
914e970 verified
Raw
History Blame Contribute Delete
15.2 kB
import uuid
import datetime
import json
import re
import asyncio
from pathlib import Path
from typing import List, Tuple
from infj_bot.core.config import PERSIST_DIRECTORY, DRIFT_USE_LOCAL_EMBEDDINGS
from infj_bot.core.embeddings import (
get_default_embedding_function,
LocalEmbeddingFunction,
SemanticEmbeddingFunction,
)
from infj_bot.core.unified_memory import MemoryManager, Event
# ── Secret scrubbing ──────────────────────────────────────────────
# SECRET_PATTERNS is applied in list order, most specific pattern first.
# Each regex match is passed to `_looks_like_secret`, which consults
# LEGIT_HEX_ALLOWLIST so that benign hex-looking values are not redacted.
SECRET_PATTERNS = [
    # PEM private keys (most specific); re.S lets `.` span the key's newlines.
    re.compile(
        r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----", re.S
    ),
    # `name = value` / `name: value` pairs where the name is a well-known
    # credential label and the value is at least 8 key-like characters.
    re.compile(
        r"(?i)(api[_-]?key|auth[_-]?token|access[_-]?token|bearer\s+|password|secret|private[_-]?key)\s*[=:]\s*['\"]?[A-Za-z0-9_\-/+=]{8,}['\"]?"
    ),
    # Bare lowercase hex runs that look like keys (less specific — guarded by
    # the allowlist below).
    re.compile(r"\b[a-f0-9]{64}\b"),  # 64-char hex (SHA-256, common API key length)
    re.compile(r"\b[a-f0-9]{40}\b"),  # 40-char hex (SHA-1, GitHub token-like)
    # Long base64-ish values (24+ chars) assigned to a suspicious-looking name.
    re.compile(
        r"(?i)(key|token|secret|password)\s*[=:]\s*['\"]?[A-Za-z0-9_\-/+=]{24,}['\"]?"
    ),
]
# Shapes that resemble secrets but are treated as normal content. Every entry
# is anchored with ^...$, so it must describe the whole matched text.
# NOTE(review): the `{7,40}` entry also covers every bare 40-char hex string,
# so a match of the 40-char pattern above is always allowlisted and never
# redacted — confirm this is intended.
LEGIT_HEX_ALLOWLIST = [
    re.compile(
        r"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$"
    ),  # UUID
    re.compile(r"^[a-f0-9]{7,40}$"),  # short git hashes
    re.compile(r"^0x[a-f0-9]+$"),  # Ethereum / hex addresses
]
def _looks_like_secret(value: str) -> bool:
    """Decide whether a regex hit should be redacted.

    Args:
        value: The exact text matched by one of the secret patterns.

    Returns:
        False when any LEGIT_HEX_ALLOWLIST pattern matches ``value`` from its
        start, True otherwise.
    """
    is_allowlisted = any(rule.match(value) for rule in LEGIT_HEX_ALLOWLIST)
    return not is_allowlisted
# Strong references to in-flight fire-and-forget tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be garbage
# collected before it finishes; entries remove themselves on completion.
_BACKGROUND_TASKS = set()


def _run_async(coro):
    """Run a coroutine from synchronous code.

    When called from inside a running event loop the coroutine is scheduled as
    a background task and this function returns immediately without waiting
    for it. When no loop is running, the coroutine is executed to completion
    with ``asyncio.run`` before returning.

    Args:
        coro: The coroutine object to execute.

    Returns:
        None in both cases; the coroutine's result is discarded.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop — use a fresh loop per call (safest for threads).
        asyncio.run(coro)
        return
    task = loop.create_task(coro)
    # Hold the task until it is done so it cannot disappear mid-execution.
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
class DriftMemory:
    """Synchronous facade over the unified memory manager.

    Writes are wrapped in an ``Event`` plus a flat metadata dict and handed to
    ``MemoryManager.remember`` through ``_run_async``; reads go through the
    manager's ``*_sync`` helpers. User-supplied text for interactions, thoughts
    and bug records is passed through ``scrub_text`` before being stored.
    """

    LEGACY_COLLECTION = "infj_companion_memories"
    SEMANTIC_COLLECTION = "infj_semantic_memories"

    def __init__(
        self, persist_directory=None, embedding_function=None, use_semantic=True
    ):
        """Create the memory facade and its backing ``MemoryManager``.

        Args:
            persist_directory: Storage directory; defaults to PERSIST_DIRECTORY.
            embedding_function: Explicit embedding function. When omitted, the
                default semantic function is used if semantic mode is active,
                otherwise ``LocalEmbeddingFunction``.
            use_semantic: Request semantic embeddings. Overridden (disabled)
                when DRIFT_USE_LOCAL_EMBEDDINGS is set.
        """
        if persist_directory is None:
            persist_directory = str(PERSIST_DIRECTORY)
        self.use_semantic = use_semantic and not DRIFT_USE_LOCAL_EMBEDDINGS
        if embedding_function is None:
            # Semantic embeddings only when requested AND not forced local.
            if self.use_semantic:
                embedding_function = get_default_embedding_function()
            else:
                embedding_function = LocalEmbeddingFunction()
        self.embedding_function = embedding_function
        # Phase 4.1/4.3: Initialize Unified Memory Spine directly
        self.unified_manager = MemoryManager(
            chroma_path=persist_directory,
            db_path=str(Path(persist_directory) / "unified_memory.db"),
        )
        # For backwards compatibility with external scripts, expose the collection name
        if isinstance(self.embedding_function, SemanticEmbeddingFunction):
            self.collection_name = self.SEMANTIC_COLLECTION
        else:
            self.collection_name = self.LEGACY_COLLECTION

    def _store_event(self, record_type, content, moment, extra):
        """Build the Event/metadata pair for one record and queue it for storage.

        Args:
            record_type: Value used for both ``Event.type`` and metadata "type".
            content: Text body of the record.
            moment: ``datetime`` used for the event and, in ISO form, for the
                "timestamp" and "last_updated" metadata fields.
            extra: Record-specific metadata appended after the common fields.
        """
        stamp = moment.isoformat()
        metadata = {
            "type": record_type,
            "timestamp": stamp,
            "last_updated": stamp,
            **extra,
        }
        event = Event(type=record_type, content=content, timestamp=moment)
        _run_async(self.unified_manager.remember(event, metadata))

    def scrub_text(self, text: str) -> str:
        """Redact secrets from text, with allowlist protection.

        Each pattern in SECRET_PATTERNS is applied in order to the output of
        the previous one. A match is replaced by "[REDACTED]" only when
        ``_looks_like_secret`` accepts it; other matches are left untouched.

        Args:
            text: Text to sanitise.

        Returns:
            The text with every accepted match replaced.
        """

        def _redact(match):
            found = match.group()
            return "[REDACTED]" if _looks_like_secret(found) else found

        scrubbed = text
        for pattern in SECRET_PATTERNS:
            # re.sub rebuilds the string in one pass, so several matches of the
            # same pattern cannot corrupt each other's offsets.
            scrubbed = pattern.sub(_redact, scrubbed)
        return scrubbed

    def save_interaction(
        self,
        user_input,
        bot_output,
        mode="companion",
        emotion=None,
        importance=0.5,
        dissonance=None,
    ):
        """Store one user/bot exchange with its emotion and dissonance metadata.

        Args:
            user_input: User text; scrubbed before storage.
            bot_output: Bot reply; scrubbed before storage.
            mode: Conversation mode label stored in metadata.
            emotion: Optional dict; the keys read are label, secondary,
                confidence, valence, arousal, intensity, needs and detector.
            importance: Importance score, stored as a float.
            dissonance: Optional dict; the keys read are score, values,
                markers and detector.
        """
        moment = datetime.datetime.now()
        safe_user_input = self.scrub_text(user_input)
        safe_bot_output = self.scrub_text(bot_output)
        content = f"user: {safe_user_input}\nBot: {safe_bot_output}"
        emotion = emotion or {"label": "neutral"}
        dissonance = dissonance or {"score": 0.0, "values": [], "markers": []}
        self._store_event(
            "interaction",
            content,
            moment,
            {
                "mode": mode,
                "emotion": emotion.get("label", "neutral"),
                "emotion_secondary": emotion.get("secondary", "neutral"),
                "emotion_confidence": float(emotion.get("confidence", 0.0)),
                "emotion_valence": float(emotion.get("valence", 0.0)),
                "emotion_arousal": float(emotion.get("arousal", 0.0)),
                "emotion_intensity": float(emotion.get("intensity", 0.0)),
                "emotion_needs": emotion.get("needs", ""),
                "emotion_detector": emotion.get("detector", "unknown"),
                "dissonance_score": float(dissonance.get("score", 0.0)),
                "dissonance_values": ",".join(dissonance.get("values", [])),
                "dissonance_markers": ",".join(dissonance.get("markers", [])),
                "dissonance_detector": dissonance.get("detector", "unknown"),
                "importance": float(importance),
            },
        )

    def learn_concept(self, concept_name, description, tags=None, importance=0.8):
        """Store a named concept and its description as "learned_knowledge".

        Args:
            concept_name: Concept identifier, also stored in metadata.
            description: Free-text description (stored without scrubbing).
            tags: Optional iterable of tag strings, stored comma-joined.
            importance: Importance score, stored as a float.
        """
        moment = datetime.datetime.now()
        content = f"Concept: {concept_name}\nDescription: {description}"
        self._store_event(
            "learned_knowledge",
            content,
            moment,
            {
                "concept": concept_name,
                "tags": ",".join(tags or []),
                "importance": float(importance),
            },
        )

    def save_reflection(self, title, summary, tags=None, importance=0.9):
        """Store a reflection record.

        Args:
            title: Reflection title; a falsy title is replaced by
                "reflection-<timestamp>".
            summary: Reflection body (stored without scrubbing).
            tags: Optional iterable of tag strings, stored comma-joined.
            importance: Importance score, stored as a float.
        """
        moment = datetime.datetime.now()
        title = title or f"reflection-{moment.isoformat()}"
        content = f"Reflection: {title}\nSummary: {summary}"
        self._store_event(
            "reflection",
            content,
            moment,
            {
                "title": title,
                "tags": ",".join(tags or []),
                "importance": float(importance),
            },
        )

    def save_thought(
        self,
        thought_text,
        thought_type="autonomous",
        source="being",
        emotion_tag=None,
        importance=0.6,
    ):
        """Save a bot thought to semantic memory so it can be retrieved later.

        Args:
            thought_text: Thought body; scrubbed before storage.
            thought_type: Label stored in metadata and in the content prefix.
            source: Origin label stored in metadata and in the content prefix.
            emotion_tag: Optional emotion label; defaults to "neutral".
            importance: Importance score, stored as a float.
        """
        moment = datetime.datetime.now()
        safe_text = self.scrub_text(thought_text)
        content = f"Thought ({thought_type} from {source}): {safe_text}"
        self._store_event(
            "thought",
            content,
            moment,
            {
                "thought_type": thought_type,
                "source": source,
                "emotion": emotion_tag or "neutral",
                "importance": float(importance),
            },
        )

    def save_bug_record(
        self, title, document, record_type="bug_note", tags=None, importance=0.85
    ):
        """Store a bug-related document and return a deterministic record id.

        Args:
            title: Record title; None, empty or whitespace-only titles fall
                back to "<record_type>-<timestamp>".
            document: Record body; scrubbed before storage.
            record_type: Stored as both the event type and metadata "type".
            tags: Optional iterable of tag strings, stored comma-joined.
            importance: Importance score, stored as a float.

        Returns:
            A UUIDv5 string derived from record type, title and timestamp. The
            id is computed here and is not passed to the memory manager.
        """
        moment = datetime.datetime.now()
        timestamp = moment.isoformat()
        # `title or ""` keeps a None title from raising on .strip().
        safe_title = (title or "").strip() or f"{record_type}-{timestamp}"
        safe_document = self.scrub_text(document)
        record_id = str(
            uuid.uuid5(
                uuid.NAMESPACE_DNS, f"infj-{record_type}:{safe_title}:{timestamp}"
            )
        )
        self._store_event(
            record_type,
            safe_document,
            moment,
            {
                "title": safe_title,
                "tags": ",".join(tags or []),
                "importance": float(importance),
            },
        )
        return record_id

    def retrieve_thoughts(self, query="", n_results=5):
        """Retrieve the bot's own thoughts, optionally filtered by semantic similarity.

        Args:
            query: Search text. When empty, the most recent thoughts are
                fetched instead of running a recall.
            n_results: Limit passed to the memory manager.

        Returns:
            A list of ``(content, metadata)`` tuples.
        """
        if query:
            entries = self.unified_manager.recall_sync(query, limit=n_results)
            # The limit is applied before this type filter, so fewer than
            # n_results thoughts may come back.
            entries = [e for e in entries if e.metadata.get("type") == "thought"]
        else:
            entries = self.unified_manager.get_recent_sync("thought", limit=n_results)
        return [(e.event.content, e.metadata) for e in entries]

    def recent_records(self, record_type, limit=5):
        """Return ``(content, metadata)`` tuples for recent records of one type."""
        entries = self.unified_manager.get_recent_sync(record_type, limit=limit)
        return [(e.event.content, e.metadata) for e in entries]

    def retrieve_context(self, query, n_results=5, include_metadata=False, rerank=True):
        """Retrieve memory with hybrid reranking (semantic + importance + recency).

        Ranking is delegated entirely to ``MemoryManager.recall_sync``; the
        ``rerank`` argument is accepted for backwards compatibility and is not
        read here.

        Args:
            query: Search text.
            n_results: Limit passed to the memory manager.
            include_metadata: Selects the return shape (see Returns).
            rerank: Unused.

        Returns:
            When ``include_metadata`` is false, a single string with entry
            contents joined by "\\n---\\n"; otherwise a list of
            ``(content, metadata)`` tuples.
        """
        entries = self.unified_manager.recall_sync(query, limit=n_results)
        if not include_metadata:
            return "\n---\n".join(e.event.content for e in entries)
        return [(e.event.content, e.metadata) for e in entries]

    def retrieve_context_ranked(self, query, n_results=5):
        """Retrieve memory context re-ranked by the DMU (Dynamic Memory Unit).

        Recalls ``n_results * 2`` candidates, passes them to
        ``rank_memory_entries`` with ``top_k=n_results`` and returns the output
        of ``format_ranked_entries``. Returns "" when nothing is recalled.
        Falls back to `retrieve_context` if the DMU module cannot be imported
        or any step raises; the failure is logged rather than hidden.
        """
        try:
            from infj_bot.memory.dmu import rank_memory_entries, format_ranked_entries

            entries = self.unified_manager.recall_sync(query, limit=n_results * 2)
            if not entries:
                return ""
            ranked = rank_memory_entries(entries, query=query, top_k=n_results)
            return format_ranked_entries(ranked)
        except Exception:
            # Deliberate best-effort boundary: keep retrieval working, but
            # leave a trace so DMU failures are diagnosable.
            import logging

            logging.getLogger(__name__).warning(
                "DMU ranking failed; falling back to standard retrieval",
                exc_info=True,
            )
            return self.retrieve_context(query, n_results=n_results)

    def _rerank(
        self, documents, metadatas, distances, top_k=5
    ) -> Tuple[List[str], List[dict]]:
        """Deprecated passthrough kept for backwards compatibility.

        Re-ranking is now handled internally by ``MemoryManager.recall``, so
        this returns the first ``top_k`` documents and metadatas in the order
        given. ``distances`` is ignored. Returning a tuple (rather than None)
        honours the declared return type for any remaining caller.
        """
        return list(documents or [])[:top_k], list(metadatas or [])[:top_k]

    def search(self, query, n_results=5):
        """Return ``(content, metadata)`` tuples for ``query`` via `retrieve_context`."""
        return self.retrieve_context(query, n_results=n_results, include_metadata=True)

    def recent_interactions(self, limit=10):
        """Return the content strings of the most recent interaction records."""
        entries = self.unified_manager.get_recent_sync("interaction", limit=limit)
        return [e.event.content for e in entries]

    def interaction_count(self):
        """Return the manager's count of "interaction" records."""
        return self.unified_manager.count_sync("interaction")

    def forget_concept(self, concept_name):
        """Ask the memory manager to forget the named concept."""
        self.unified_manager.forget_concept_sync(concept_name)

    def edit_concept(self, concept_name, new_description):
        """Update an existing concept's description.

        The old concept is forgotten first to avoid duplicates, then a new
        "learned_knowledge" record is stored with tags "edited" and
        importance 0.8.
        """
        self.unified_manager.forget_concept_sync(concept_name)
        moment = datetime.datetime.now()
        content = f"Concept: {concept_name}\nDescription: {new_description}"
        self._store_event(
            "learned_knowledge",
            content,
            moment,
            {
                "concept": concept_name,
                "tags": "edited",
                "importance": 0.8,
            },
        )

    def export_json(self, path):
        """Placeholder export kept for backwards compatibility.

        Nothing is written to ``path``; the method always returns 0.
        """
        # Will implement fully later if needed; for now nothing is exported.
        return 0

    def import_json(self, path):
        """Validate a JSON export file without importing its records.

        Args:
            path: File containing a JSON object with a "records" list.

        Returns:
            Always 0: records are validated but not stored (skipped for
            phase 4.2).

        Raises:
            ValueError: If any record lacks an "id" or "document" field.
        """
        payload = json.loads(Path(path).read_text(encoding="utf-8"))
        records = payload.get("records", [])
        if not records:
            return 0
        bad = [r for r in records if not all(k in r for k in ("id", "document"))]
        if bad:
            raise ValueError(
                f"Import failed: {len(bad)} records missing required fields."
            )
        # Skipping for phase 4.2 unless required, but validation passed
        return 0

    def count(self):
        """Return the manager's total record count."""
        return self.unified_manager.count_sync()

    def prune_interactions(self, max_age_days=30, max_importance=0.4, force=False):
        """Remove old interactions with low importance. Returns count removed.

        ``max_age_days`` and ``max_importance`` are accepted for backwards
        compatibility but are not read; pruning is delegated to
        ``prune_sync`` with a fixed threshold of 0.1.
        """
        now = datetime.datetime.now()
        stats = self.unified_manager.prune_sync(
            now=now, threshold=0.1, force=force
        )  # Uses standard Ebbinghaus
        return stats.sqlite_deleted

    def auto_prune(self, turn_count: int = 0, force: bool = False) -> int:
        """Auto-prune low-value memories based on turn count or time elapsed.

        Returns number of memories pruned (``sqlite_deleted`` from the stats
        object returned by ``auto_prune_sync``).
        """
        stats = self.unified_manager.auto_prune_sync(turn_count=turn_count, force=force)
        return stats.sqlite_deleted

    def migrate_from_legacy(self) -> int:
        """Deprecated. Performs no migration and always returns 0."""
        return 0
if __name__ == "__main__":
    # Manual smoke test: build a DriftMemory with default settings and report
    # that construction succeeded.
    memory = DriftMemory()
    print("Memory System Initialized.")