Paritosh Upadhyay
Neural Core Maturation: Self-Evolution Initiated
64cc56e
"""
Project Friday — Deep Memory Service (Cloud Sovereign)
Utilizes Supabase PgVector via SQLAlchemy to store and retrieve conversational snippets.
"""
import logging
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import select, func
from app.core.database import SessionLocal, engine
from app.models.entities import ConversationMemory
from app.services import llm
logger = logging.getLogger("friday.memory")
def init_memory():
"""Ensures the cloud schema is ready."""
try:
from app.models import entities
from app.core.database import Base
Base.metadata.create_all(bind=engine)
logger.info("Deep Memory: Cloud Sovereign Schema Verified.")
except Exception as e:
logger.error(f"Deep Memory: Schema verification failed: {e}")
def embed_conversation(topic: str, content: str, entities: str = ""):
"""Store a piece of text into semantic cloud memory."""
db = SessionLocal()
try:
# 1. Generate Embedding
vector = llm.get_embedding(content)
memo = ConversationMemory(
topic=topic,
content=content,
entities=entities,
embedding=vector
)
db.add(memo)
db.commit()
except Exception as e:
logger.error(f"Deep Memory storage failed: {e}")
db.rollback()
finally:
db.close()
def bulk_embed_conversations(entries: list[dict], use_local: bool = False):
"""
Hyper-velocity Parallel batch ingestion for large-scale neural distillation.
Entries: list of {topic, content, entities}
"""
from concurrent.futures import ThreadPoolExecutor
from app.services import llm
logger.info(f"Memory: Parallel extraction engaged for {len(entries)} patterns...")
# 1. Parallel Embedding Generation
# Saturate API/Core capacity with concurrent workers
def get_vec(entry):
return {
**entry,
"embedding": llm.get_embedding(entry["content"])
}
with ThreadPoolExecutor(max_workers=20) as executor:
memos_with_vecs = list(executor.map(get_vec, entries))
# 2. Sequential SQL Persistence (Short-duration transaction)
db = SessionLocal()
try:
memos = []
for mv in memos_with_vecs:
memos.append(ConversationMemory(
topic=mv["topic"],
content=mv["content"],
entities=mv.get("entities", ""),
embedding=mv["embedding"]
))
db.bulk_save_objects(memos)
db.commit()
logger.info(f"Deep Memory: Parallel pulse complete. Ingested {len(entries)} Master Patterns.")
except Exception as e:
logger.error(f"Deep Memory parallel batch storage failed: {e}")
db.rollback()
finally:
db.close()
def recall_memory(query: str, n_results: int = 3) -> str:
"""Searches cloud memory for semantically related conversations using PgVector."""
db = SessionLocal()
try:
# 1. Generate Search Vector
search_vector = llm.get_embedding(query)
# 2. Execute Semantic Search with Engine Awareness
is_sqlite = "sqlite" in str(engine.url)
try:
if is_sqlite:
# SQLite: No PgVector. Use timestamp/keyword fallback.
logger.debug("Memory: Local Grid (SQLite) detected. Using optimized index search.")
stmt = select(ConversationMemory).order_by(ConversationMemory.timestamp.desc()).limit(n_results)
results = db.execute(stmt).scalars().all()
else:
from pgvector.sqlalchemy import Vector
# Optimized search via PgVector (Postgres Only)
stmt = select(ConversationMemory).order_by(
ConversationMemory.embedding.cosine_distance(search_vector)
).limit(n_results)
results = db.execute(stmt).scalars().all()
except Exception as se:
# Fallback if PgVector extension is not enabled or library issue
logger.warning(f"Memory Search Issue: {se}. Falling back to recent context.")
stmt = select(ConversationMemory).order_by(ConversationMemory.timestamp.desc()).limit(n_results)
results = db.execute(stmt).scalars().all()
if not results:
return "Nothing found in recent deep memory about that, Sir."
output = "Retrieved Context from Sovereign Grid:\n"
for i, res in enumerate(results):
output += f"[{i+1}] (Topic: {res.topic}) {res.content}\n"
return output
except Exception as e:
logger.error(f"Deep Memory recall failed: {e}")
return "Failed to recall memory due to an error."
finally:
db.close()
def summarize_and_consolidate(topic: str, thread_summary: str, entities: str = "") -> str:
"""Consolidates a conversation thread into a high-quality semantic memory."""
try:
embed_conversation(topic, thread_summary, entities)
return f"Successfully consolidated memory for topic: {topic}"
except Exception as e:
logger.error(f"Consolidation failed: {e}")
return f"Failed to consolidate memory: {str(e)}"