# AI-Package-Doctor / src/memory.py
# Author: Yash030 — commit fc92f80 ("Memory issue, Pinecone solution issue")
import os
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional
from google.adk.memory.base_memory_service import BaseMemoryService, SearchMemoryResponse
from google.adk.memory.memory_entry import MemoryEntry
from google.genai import types as genai_types
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from .utils import logger
class PineconeMemoryService(BaseMemoryService):
    """
    Custom Memory Service using Pinecone for long-term vector storage.

    Embeddings are generated locally with 'all-MiniLM-L6-v2', which emits
    384-dimensional vectors — hence the ``dimension`` default.

    NOTE(review): the ADK ``BaseMemoryService`` contract may expect
    ``search_memory`` to return a ``SearchMemoryResponse`` of
    ``MemoryEntry`` objects (those names are imported at the top of this
    file but never used); this implementation returns plain strings —
    confirm against the callers before changing.
    """

    def __init__(
        self,
        api_key: str,
        index_name: str = "adk-memory",
        dimension: int = 384,
        score_threshold: float = 0.5,
    ):
        """
        Args:
            api_key: Pinecone API key.
            index_name: Pinecone index name; the index is created if missing.
            dimension: Embedding dimension; must match the embedding model.
            score_threshold: Minimum cosine-similarity score for a search
                hit to be returned (previously hard-coded to 0.5).
        """
        self.api_key = api_key
        self.index_name = index_name
        self.dimension = dimension
        self.score_threshold = score_threshold

        # Initialize the Pinecone client and create the index on first run.
        self.pc = Pinecone(api_key=self.api_key)
        if self.index_name not in self.pc.list_indexes().names():
            logger.info(f"🌲 Creating Pinecone index: {self.index_name}")
            self.pc.create_index(
                name=self.index_name,
                dimension=self.dimension,
                metric="cosine",
                # Default free-tier serverless region.
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
        self.index = self.pc.Index(self.index_name)

        # Load the local embedding model. A failure is tolerated on purpose:
        # the service degrades to a no-op (saves/searches are skipped with a
        # warning) instead of crashing the whole agent at startup.
        logger.info("🧠 Loading embedding model: all-MiniLM-L6-v2... (This may take a while if downloading)")
        try:
            self.model = SentenceTransformer('all-MiniLM-L6-v2')
            logger.info("✅ SentenceTransformer loaded.")
        except Exception:
            logger.exception("❌ Failed to load SentenceTransformer")
            self.model = None
        logger.info("✅ Pinecone Memory Service initialized")

    @staticmethod
    def _session_to_text(session: Any) -> str:
        """Flatten a session's conversation history into plain text.

        Supports both ``session.turns`` (role/content pairs) and
        ``session.events`` (objects with author/content-or-text attributes).
        Returns "" when the session exposes neither.
        """
        lines: List[str] = []
        if hasattr(session, 'turns'):
            for turn in session.turns:
                lines.append(f"{turn.role}: {turn.content}\n")
        elif hasattr(session, 'events'):
            for event in session.events:
                author = getattr(event, 'author', 'unknown')
                # NOTE(review): in ADK, event.content may be a Content
                # object rather than a str, in which case the f-string
                # records its repr — confirm whether parts should be
                # extracted instead.
                content = getattr(event, 'content', getattr(event, 'text', ''))
                lines.append(f"{author}: {content}\n")
        return "".join(lines)

    async def add_session_to_memory(self, session: Any):
        """
        Embed the session history — plus any solution/requirements stored in
        session state and a timestamp — and upsert it into Pinecone.

        Best-effort by design: every failure is logged and swallowed so a
        memory problem can never break the main agent flow.
        """
        if not self.model:
            logger.warning("⚠️ Embedding model not loaded. Skipping memory save.")
            return
        try:
            # Resolve a stable record id. Fall back to a fresh UUID so that
            # distinct sessions without an id do not all upsert under one
            # shared 'UNKNOWN' key and overwrite each other (this is what
            # the module-level ``uuid`` import is for).
            session_id = (
                getattr(session, 'id', None)
                or getattr(session, 'session_id', None)
                or str(uuid.uuid4())
            )
            logger.info(f"💾 Attempting to save session to Pinecone. Session ID: {session_id}")

            # 1. Convert the conversation history to plain text.
            text_content = self._session_to_text(session)
            if not text_content.strip():
                logger.warning("⚠️ Session content is empty. Skipping Pinecone save.")
                return

            # 1.5. Append solution/context from state. The Code Surgeon
            # saves the solution to tool_context.state, which maps to
            # session.state.
            if hasattr(session, 'state') and session.state:
                solution = session.state.get('solution')
                requirements = session.state.get('requirements')
                if solution:
                    logger.info("💡 Found solution in session state. Appending to memory.")
                    text_content += f"\n\n--- FINAL SOLUTION ---\n{solution}\n"
                if requirements:
                    text_content += f"\n\n--- REQUIREMENTS ---\n{requirements}\n"

            # 1.6. Append a timestamp: session.created_at when present and
            # non-empty, otherwise "now".
            if hasattr(session, 'created_at') and session.created_at:
                timestamp = str(session.created_at)
            else:
                timestamp = datetime.now().isoformat()
            text_content += f"\n\n--- TIMESTAMP ---\n{timestamp}\n"

            # 2. Generate the embedding locally.
            # NOTE(review): model.encode() is CPU-bound and blocks the event
            # loop; consider asyncio.to_thread if this becomes a bottleneck.
            vector = self.model.encode(text_content).tolist()

            # 3. Metadata: only a snippet of the text is stored, to stay
            # within Pinecone's per-record metadata size limits.
            metadata = {
                "session_id": session_id,
                "text": text_content[:1000],
                "timestamp": timestamp,
            }

            # 4. Upsert keyed by session id — re-saving the same session
            # updates its record in place.
            self.index.upsert(vectors=[(session_id, vector, metadata)])
            logger.info(f"💾 Saved session {session_id} to Pinecone")
        except Exception:
            # logger.exception records the full traceback, which the plain
            # error message previously lost.
            logger.exception("❌ Failed to save to Pinecone")

    async def search_memory(
        self,
        query: str,
        limit: int = 3,
        **kwargs
    ) -> List[str]:
        """
        Search Pinecone for past sessions relevant to ``query``.

        Args:
            query: Free-text query to embed and match.
            limit: Maximum number of matches to request from Pinecone.
            **kwargs: Ignored; absorbs extra keyword arguments the caller
                (e.g. the ADK framework) may pass, such as app_name/user_id.

        Returns:
            Text snippets of the matching sessions whose cosine score
            exceeds ``self.score_threshold``. Returns [] when the embedding
            model is unavailable or on any search failure.
        """
        if not self.model:
            return []
        try:
            # Embed the query with the same local model used for storage.
            query_vector = self.model.encode(query).tolist()
            results = self.index.query(
                vector=query_vector,
                top_k=limit,
                include_metadata=True,
            )
            # Keep only matches above the relevance threshold.
            return [
                match['metadata'].get('text', '')
                for match in results['matches']
                if match['score'] > self.score_threshold
            ]
        except Exception:
            logger.exception("❌ Failed to search Pinecone")
            return []