""" Project Friday — Deep Memory Service (Cloud Sovereign) Utilizes Supabase PgVector via SQLAlchemy to store and retrieve conversational snippets. """ import logging from typing import List, Optional from sqlalchemy.orm import Session from sqlalchemy import select, func from app.core.database import SessionLocal, engine from app.models.entities import ConversationMemory from app.services import llm logger = logging.getLogger("friday.memory") def init_memory(): """Ensures the cloud schema is ready.""" try: from app.models import entities from app.core.database import Base Base.metadata.create_all(bind=engine) logger.info("Deep Memory: Cloud Sovereign Schema Verified.") except Exception as e: logger.error(f"Deep Memory: Schema verification failed: {e}") def embed_conversation(topic: str, content: str, entities: str = ""): """Store a piece of text into semantic cloud memory.""" db = SessionLocal() try: # 1. Generate Embedding vector = llm.get_embedding(content) memo = ConversationMemory( topic=topic, content=content, entities=entities, embedding=vector ) db.add(memo) db.commit() except Exception as e: logger.error(f"Deep Memory storage failed: {e}") db.rollback() finally: db.close() def bulk_embed_conversations(entries: list[dict], use_local: bool = False): """ Hyper-velocity Parallel batch ingestion for large-scale neural distillation. Entries: list of {topic, content, entities} """ from concurrent.futures import ThreadPoolExecutor from app.services import llm logger.info(f"Memory: Parallel extraction engaged for {len(entries)} patterns...") # 1. Parallel Embedding Generation # Saturate API/Core capacity with concurrent workers def get_vec(entry): return { **entry, "embedding": llm.get_embedding(entry["content"]) } with ThreadPoolExecutor(max_workers=20) as executor: memos_with_vecs = list(executor.map(get_vec, entries)) # 2. Sequential SQL Persistence (Short-duration transaction) db = SessionLocal() try: memos = [] for mv in memos_with_vecs: memos.append(ConversationMemory( topic=mv["topic"], content=mv["content"], entities=mv.get("entities", ""), embedding=mv["embedding"] )) db.bulk_save_objects(memos) db.commit() logger.info(f"Deep Memory: Parallel pulse complete. Ingested {len(entries)} Master Patterns.") except Exception as e: logger.error(f"Deep Memory parallel batch storage failed: {e}") db.rollback() finally: db.close() def recall_memory(query: str, n_results: int = 3) -> str: """Searches cloud memory for semantically related conversations using PgVector.""" db = SessionLocal() try: # 1. Generate Search Vector search_vector = llm.get_embedding(query) # 2. Execute Semantic Search with Engine Awareness is_sqlite = "sqlite" in str(engine.url) try: if is_sqlite: # SQLite: No PgVector. Use timestamp/keyword fallback. logger.debug("Memory: Local Grid (SQLite) detected. Using optimized index search.") stmt = select(ConversationMemory).order_by(ConversationMemory.timestamp.desc()).limit(n_results) results = db.execute(stmt).scalars().all() else: from pgvector.sqlalchemy import Vector # Optimized search via PgVector (Postgres Only) stmt = select(ConversationMemory).order_by( ConversationMemory.embedding.cosine_distance(search_vector) ).limit(n_results) results = db.execute(stmt).scalars().all() except Exception as se: # Fallback if PgVector extension is not enabled or library issue logger.warning(f"Memory Search Issue: {se}. Falling back to recent context.") stmt = select(ConversationMemory).order_by(ConversationMemory.timestamp.desc()).limit(n_results) results = db.execute(stmt).scalars().all() if not results: return "Nothing found in recent deep memory about that, Sir." output = "Retrieved Context from Sovereign Grid:\n" for i, res in enumerate(results): output += f"[{i+1}] (Topic: {res.topic}) {res.content}\n" return output except Exception as e: logger.error(f"Deep Memory recall failed: {e}") return "Failed to recall memory due to an error." finally: db.close() def summarize_and_consolidate(topic: str, thread_summary: str, entities: str = "") -> str: """Consolidates a conversation thread into a high-quality semantic memory.""" try: embed_conversation(topic, thread_summary, entities) return f"Successfully consolidated memory for topic: {topic}" except Exception as e: logger.error(f"Consolidation failed: {e}") return f"Failed to consolidate memory: {str(e)}"