| """ |
| Project Friday — Deep Memory Service (Cloud Sovereign) |
| Utilizes Supabase PgVector via SQLAlchemy to store and retrieve conversational snippets. |
| """ |
| import logging |
| from typing import List, Optional |
| from sqlalchemy.orm import Session |
| from sqlalchemy import select, func |
|
|
| from app.core.database import SessionLocal, engine |
| from app.models.entities import ConversationMemory |
| from app.services import llm |
|
|
| logger = logging.getLogger("friday.memory") |
|
|
| def init_memory(): |
| """Ensures the cloud schema is ready.""" |
| try: |
| from app.models import entities |
| from app.core.database import Base |
| Base.metadata.create_all(bind=engine) |
| logger.info("Deep Memory: Cloud Sovereign Schema Verified.") |
| except Exception as e: |
| logger.error(f"Deep Memory: Schema verification failed: {e}") |
|
|
| def embed_conversation(topic: str, content: str, entities: str = ""): |
| """Store a piece of text into semantic cloud memory.""" |
| db = SessionLocal() |
| try: |
| |
| vector = llm.get_embedding(content) |
| |
| memo = ConversationMemory( |
| topic=topic, |
| content=content, |
| entities=entities, |
| embedding=vector |
| ) |
| db.add(memo) |
| db.commit() |
| except Exception as e: |
| logger.error(f"Deep Memory storage failed: {e}") |
| db.rollback() |
| finally: |
| db.close() |
|
|
| def bulk_embed_conversations(entries: list[dict], use_local: bool = False): |
| """ |
| Hyper-velocity Parallel batch ingestion for large-scale neural distillation. |
| Entries: list of {topic, content, entities} |
| """ |
| from concurrent.futures import ThreadPoolExecutor |
| from app.services import llm |
| |
| logger.info(f"Memory: Parallel extraction engaged for {len(entries)} patterns...") |
| |
| |
| |
| def get_vec(entry): |
| return { |
| **entry, |
| "embedding": llm.get_embedding(entry["content"]) |
| } |
|
|
| with ThreadPoolExecutor(max_workers=20) as executor: |
| memos_with_vecs = list(executor.map(get_vec, entries)) |
|
|
| |
| db = SessionLocal() |
| try: |
| memos = [] |
| for mv in memos_with_vecs: |
| memos.append(ConversationMemory( |
| topic=mv["topic"], |
| content=mv["content"], |
| entities=mv.get("entities", ""), |
| embedding=mv["embedding"] |
| )) |
| |
| db.bulk_save_objects(memos) |
| db.commit() |
| logger.info(f"Deep Memory: Parallel pulse complete. Ingested {len(entries)} Master Patterns.") |
| except Exception as e: |
| logger.error(f"Deep Memory parallel batch storage failed: {e}") |
| db.rollback() |
| finally: |
| db.close() |
|
|
| def recall_memory(query: str, n_results: int = 3) -> str: |
| """Searches cloud memory for semantically related conversations using PgVector.""" |
| db = SessionLocal() |
| try: |
| |
| search_vector = llm.get_embedding(query) |
| |
| |
| is_sqlite = "sqlite" in str(engine.url) |
| |
| try: |
| if is_sqlite: |
| |
| logger.debug("Memory: Local Grid (SQLite) detected. Using optimized index search.") |
| stmt = select(ConversationMemory).order_by(ConversationMemory.timestamp.desc()).limit(n_results) |
| results = db.execute(stmt).scalars().all() |
| else: |
| from pgvector.sqlalchemy import Vector |
| |
| stmt = select(ConversationMemory).order_by( |
| ConversationMemory.embedding.cosine_distance(search_vector) |
| ).limit(n_results) |
| results = db.execute(stmt).scalars().all() |
| except Exception as se: |
| |
| logger.warning(f"Memory Search Issue: {se}. Falling back to recent context.") |
| stmt = select(ConversationMemory).order_by(ConversationMemory.timestamp.desc()).limit(n_results) |
| results = db.execute(stmt).scalars().all() |
|
|
| if not results: |
| return "Nothing found in recent deep memory about that, Sir." |
| |
| output = "Retrieved Context from Sovereign Grid:\n" |
| for i, res in enumerate(results): |
| output += f"[{i+1}] (Topic: {res.topic}) {res.content}\n" |
| return output |
| except Exception as e: |
| logger.error(f"Deep Memory recall failed: {e}") |
| return "Failed to recall memory due to an error." |
| finally: |
| db.close() |
|
|
| def summarize_and_consolidate(topic: str, thread_summary: str, entities: str = "") -> str: |
| """Consolidates a conversation thread into a high-quality semantic memory.""" |
| try: |
| embed_conversation(topic, thread_summary, entities) |
| return f"Successfully consolidated memory for topic: {topic}" |
| except Exception as e: |
| logger.error(f"Consolidation failed: {e}") |
| return f"Failed to consolidate memory: {str(e)}" |
|
|