| import uuid |
| import datetime |
| import json |
| import re |
| import asyncio |
| from pathlib import Path |
| from typing import List, Tuple |
|
|
| from infj_bot.core.config import PERSIST_DIRECTORY, DRIFT_USE_LOCAL_EMBEDDINGS |
| from infj_bot.core.embeddings import ( |
| get_default_embedding_function, |
| LocalEmbeddingFunction, |
| SemanticEmbeddingFunction, |
| ) |
| from infj_bot.core.unified_memory import MemoryManager, Event |
|
|
|
|
| |
| |
| |
|
|
| SECRET_PATTERNS = [ |
| |
| re.compile( |
| r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----", re.S |
| ), |
| |
| re.compile( |
| r"(?i)(api[_-]?key|auth[_-]?token|access[_-]?token|bearer\s+|password|secret|private[_-]?key)\s*[=:]\s*['\"]?[A-Za-z0-9_\-/+=]{8,}['\"]?" |
| ), |
| |
| re.compile(r"\b[a-f0-9]{64}\b"), |
| re.compile(r"\b[a-f0-9]{40}\b"), |
| |
| re.compile( |
| r"(?i)(key|token|secret|password)\s*[=:]\s*['\"]?[A-Za-z0-9_\-/+=]{24,}['\"]?" |
| ), |
| ] |
|
|
| |
| LEGIT_HEX_ALLOWLIST = [ |
| re.compile( |
| r"^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$" |
| ), |
| re.compile(r"^[a-f0-9]{7,40}$"), |
| re.compile(r"^0x[a-f0-9]+$"), |
| ] |
|
|
|
|
| def _looks_like_secret(value: str) -> bool: |
| """Check if a matched string is likely a secret (not allowlisted).""" |
| for pattern in LEGIT_HEX_ALLOWLIST: |
| if pattern.match(value): |
| return False |
| return True |
|
|
|
|
| def _run_async(coro): |
| """Helper to run async code from sync methods.""" |
| try: |
| loop = asyncio.get_running_loop() |
| |
| loop.create_task(coro) |
| except RuntimeError: |
| |
| asyncio.run(coro) |
|
|
|
|
| class DriftMemory: |
| LEGACY_COLLECTION = "infj_companion_memories" |
| SEMANTIC_COLLECTION = "infj_semantic_memories" |
|
|
| def __init__( |
| self, persist_directory=None, embedding_function=None, use_semantic=True |
| ): |
| if persist_directory is None: |
| persist_directory = str(PERSIST_DIRECTORY) |
|
|
| self.use_semantic = use_semantic and not DRIFT_USE_LOCAL_EMBEDDINGS |
| if embedding_function is None: |
| if DRIFT_USE_LOCAL_EMBEDDINGS: |
| embedding_function = LocalEmbeddingFunction() |
| elif use_semantic: |
| embedding_function = get_default_embedding_function() |
| else: |
| embedding_function = LocalEmbeddingFunction() |
|
|
| self.embedding_function = embedding_function |
|
|
| |
| self.unified_manager = MemoryManager( |
| chroma_path=persist_directory, |
| db_path=str(Path(persist_directory) / "unified_memory.db"), |
| ) |
|
|
| |
| if isinstance(self.embedding_function, SemanticEmbeddingFunction): |
| self.collection_name = self.SEMANTIC_COLLECTION |
| else: |
| self.collection_name = self.LEGACY_COLLECTION |
|
|
| def scrub_text(self, text: str) -> str: |
| """Redact secrets from text, with allowlist protection.""" |
| scrubbed = text |
| for pattern in SECRET_PATTERNS: |
| for match in pattern.finditer(scrubbed): |
| matched_text = match.group() |
| if _looks_like_secret(matched_text): |
| scrubbed = ( |
| scrubbed[: match.start()] |
| + "[REDACTED]" |
| + scrubbed[match.end() :] |
| ) |
| return scrubbed |
|
|
| def save_interaction( |
| self, |
| user_input, |
| bot_output, |
| mode="companion", |
| emotion=None, |
| importance=0.5, |
| dissonance=None, |
| ): |
| timestamp = datetime.datetime.now().isoformat() |
| safe_user_input = self.scrub_text(user_input) |
| safe_bot_output = self.scrub_text(bot_output) |
| content = f"user: {safe_user_input}\nBot: {safe_bot_output}" |
|
|
| emotion = emotion or {"label": "neutral"} |
| dissonance = dissonance or {"score": 0.0, "values": [], "markers": []} |
|
|
| metadata = { |
| "type": "interaction", |
| "timestamp": timestamp, |
| "last_updated": timestamp, |
| "mode": mode, |
| "emotion": emotion.get("label", "neutral"), |
| "emotion_secondary": emotion.get("secondary", "neutral"), |
| "emotion_confidence": float(emotion.get("confidence", 0.0)), |
| "emotion_valence": float(emotion.get("valence", 0.0)), |
| "emotion_arousal": float(emotion.get("arousal", 0.0)), |
| "emotion_intensity": float(emotion.get("intensity", 0.0)), |
| "emotion_needs": emotion.get("needs", ""), |
| "emotion_detector": emotion.get("detector", "unknown"), |
| "dissonance_score": float(dissonance.get("score", 0.0)), |
| "dissonance_values": ",".join(dissonance.get("values", [])), |
| "dissonance_markers": ",".join(dissonance.get("markers", [])), |
| "dissonance_detector": dissonance.get("detector", "unknown"), |
| "importance": float(importance), |
| } |
|
|
| |
| event = Event( |
| type="interaction", |
| content=content, |
| timestamp=datetime.datetime.fromisoformat(timestamp), |
| ) |
| _run_async(self.unified_manager.remember(event, metadata)) |
|
|
| def learn_concept(self, concept_name, description, tags=None, importance=0.8): |
| timestamp = datetime.datetime.now().isoformat() |
| content = f"Concept: {concept_name}\nDescription: {description}" |
|
|
| metadata = { |
| "type": "learned_knowledge", |
| "timestamp": timestamp, |
| "last_updated": timestamp, |
| "concept": concept_name, |
| "tags": ",".join(tags or []), |
| "importance": float(importance), |
| } |
|
|
| event = Event( |
| type="learned_knowledge", |
| content=content, |
| timestamp=datetime.datetime.fromisoformat(timestamp), |
| ) |
| _run_async(self.unified_manager.remember(event, metadata)) |
|
|
| def save_reflection(self, title, summary, tags=None, importance=0.9): |
| timestamp = datetime.datetime.now().isoformat() |
| title = title or f"reflection-{timestamp}" |
| content = f"Reflection: {title}\nSummary: {summary}" |
|
|
| metadata = { |
| "type": "reflection", |
| "timestamp": timestamp, |
| "last_updated": timestamp, |
| "title": title, |
| "tags": ",".join(tags or []), |
| "importance": float(importance), |
| } |
|
|
| event = Event( |
| type="reflection", |
| content=content, |
| timestamp=datetime.datetime.fromisoformat(timestamp), |
| ) |
| _run_async(self.unified_manager.remember(event, metadata)) |
|
|
| def save_thought( |
| self, |
| thought_text, |
| thought_type="autonomous", |
| source="being", |
| emotion_tag=None, |
| importance=0.6, |
| ): |
| """Save a bot thought to semantic memory so it can be retrieved later.""" |
| timestamp = datetime.datetime.now().isoformat() |
| safe_text = self.scrub_text(thought_text) |
| content = f"Thought ({thought_type} from {source}): {safe_text}" |
|
|
| metadata = { |
| "type": "thought", |
| "timestamp": timestamp, |
| "last_updated": timestamp, |
| "thought_type": thought_type, |
| "source": source, |
| "emotion": emotion_tag or "neutral", |
| "importance": float(importance), |
| } |
|
|
| event = Event( |
| type="thought", |
| content=content, |
| timestamp=datetime.datetime.fromisoformat(timestamp), |
| ) |
| _run_async(self.unified_manager.remember(event, metadata)) |
|
|
| def save_bug_record( |
| self, title, document, record_type="bug_note", tags=None, importance=0.85 |
| ): |
| timestamp = datetime.datetime.now().isoformat() |
| safe_title = title.strip() or f"{record_type}-{timestamp}" |
| safe_document = self.scrub_text(document) |
| record_id = str( |
| uuid.uuid5( |
| uuid.NAMESPACE_DNS, f"infj-{record_type}:{safe_title}:{timestamp}" |
| ) |
| ) |
|
|
| metadata = { |
| "type": record_type, |
| "timestamp": timestamp, |
| "last_updated": timestamp, |
| "title": safe_title, |
| "tags": ",".join(tags or []), |
| "importance": float(importance), |
| } |
|
|
| event = Event( |
| type=record_type, |
| content=safe_document, |
| timestamp=datetime.datetime.fromisoformat(timestamp), |
| ) |
| _run_async(self.unified_manager.remember(event, metadata)) |
|
|
| return record_id |
|
|
| def retrieve_thoughts(self, query="", n_results=5): |
| """Retrieve the bot's own thoughts, optionally filtered by semantic similarity.""" |
| if query: |
| entries = self.unified_manager.recall_sync(query, limit=n_results) |
| |
| entries = [e for e in entries if e.metadata.get("type") == "thought"] |
| return [(e.event.content, e.metadata) for e in entries] |
| else: |
| entries = self.unified_manager.get_recent_sync("thought", limit=n_results) |
| return [(e.event.content, e.metadata) for e in entries] |
|
|
| def recent_records(self, record_type, limit=5): |
| entries = self.unified_manager.get_recent_sync(record_type, limit=limit) |
| return [(e.event.content, e.metadata) for e in entries] |
|
|
| def retrieve_context(self, query, n_results=5, include_metadata=False, rerank=True): |
| """Retrieve memory with hybrid reranking (semantic + importance + recency).""" |
| |
| |
| entries = self.unified_manager.recall_sync(query, limit=n_results) |
|
|
| if not include_metadata: |
| return "\n---\n".join([e.event.content for e in entries]) |
| return [(e.event.content, e.metadata) for e in entries] |
|
|
| def retrieve_context_ranked(self, query, n_results=5): |
| """ |
| Retrieve memory context re-ranked by the DMU (Dynamic Memory Unit). |
| |
| This applies a second re-ranking pass on top of the Unified Memory Spine's |
| internal DMU scoring, using an alternative time-decay model with explicit |
| emotional-weight damping. Results are logged to the DMU telemetry database. |
| |
| Falls back to standard `retrieve_context` if the DMU module is unavailable. |
| """ |
| try: |
| from infj_bot.memory.dmu import rank_memory_entries, format_ranked_entries |
|
|
| entries = self.unified_manager.recall_sync(query, limit=n_results * 2) |
| if not entries: |
| return "" |
| ranked = rank_memory_entries(entries, query=query, top_k=n_results) |
| return format_ranked_entries(ranked) |
| except Exception: |
| |
| return self.retrieve_context(query, n_results=n_results) |
|
|
| def _rerank( |
| self, documents, metadatas, distances, top_k=5 |
| ) -> Tuple[List[str], List[dict]]: |
| |
| pass |
|
|
| def search(self, query, n_results=5): |
| return self.retrieve_context(query, n_results=n_results, include_metadata=True) |
|
|
| def recent_interactions(self, limit=10): |
| entries = self.unified_manager.get_recent_sync("interaction", limit=limit) |
| return [e.event.content for e in entries] |
|
|
| def interaction_count(self): |
| return self.unified_manager.count_sync("interaction") |
|
|
| def forget_concept(self, concept_name): |
| self.unified_manager.forget_concept_sync(concept_name) |
|
|
| def edit_concept(self, concept_name, new_description): |
| """Update an existing concept's description.""" |
| |
| self.unified_manager.forget_concept_sync(concept_name) |
|
|
| timestamp = datetime.datetime.now().isoformat() |
| content = f"Concept: {concept_name}\nDescription: {new_description}" |
|
|
| metadata = { |
| "type": "learned_knowledge", |
| "timestamp": timestamp, |
| "last_updated": timestamp, |
| "concept": concept_name, |
| "tags": "edited", |
| "importance": 0.8, |
| } |
|
|
| event = Event( |
| type="learned_knowledge", |
| content=content, |
| timestamp=datetime.datetime.fromisoformat(timestamp), |
| ) |
| _run_async(self.unified_manager.remember(event, metadata)) |
|
|
| def export_json(self, path): |
| |
| |
| pass |
| return 0 |
|
|
| def import_json(self, path): |
| payload = json.loads(Path(path).read_text(encoding="utf-8")) |
| records = payload.get("records", []) |
| if not records: |
| return 0 |
| bad = [r for r in records if not all(k in r for k in ("id", "document"))] |
| if bad: |
| raise ValueError( |
| f"Import failed: {len(bad)} records missing required fields." |
| ) |
| |
| return 0 |
|
|
| def count(self): |
| return self.unified_manager.count_sync() |
|
|
| def prune_interactions(self, max_age_days=30, max_importance=0.4, force=False): |
| """Remove old interactions with low importance. Returns count removed.""" |
| now = datetime.datetime.now() |
| stats = self.unified_manager.prune_sync( |
| now=now, threshold=0.1, force=force |
| ) |
| return stats.sqlite_deleted |
|
|
| def auto_prune(self, turn_count: int = 0, force: bool = False) -> int: |
| """Auto-prune low-value memories based on turn count or time elapsed. |
| |
| Returns number of memories pruned. |
| """ |
| stats = self.unified_manager.auto_prune_sync(turn_count=turn_count, force=force) |
| return stats.sqlite_deleted |
|
|
| def migrate_from_legacy(self) -> int: |
| """Deprecated.""" |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| |
| memory = DriftMemory() |
| print("Memory System Initialized.") |
|
|