tillu-daemon / app /api /memory.py
tillu-AI's picture
upload app/api/memory.py
27ca8e7 verified
"""
Memory API endpoints for semantic search and storage
"""
import time
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends, Header
from app.models.api import (
MemorySearchRequest, MemorySearchResponse, MemoryItem
)
from app.utils.database import db
from app.utils.cache import cache
from app.utils.logging import get_logger
logger = get_logger("memory_api")
router = APIRouter(prefix="/api/v1/memory")
async def verify_auth(authorization: Optional[str] = Header(None)):
"""Verify bearer token authentication"""
if not authorization:
raise HTTPException(status_code=401, detail="Authorization header missing")
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid authorization format")
return {"user_id": "test-user-id"}
@router.post("/search", response_model=MemorySearchResponse)
async def search_memory(
request: MemorySearchRequest,
auth: dict = Depends(verify_auth)
):
"""
Semantic memory query across all stores.
Uses pgvector similarity search with embeddings.
"""
user_id = auth["user_id"]
start_time = time.time()
logger.info("Memory search", query=request.query, limit=request.limit)
try:
# PHASE 2: Semantic search with embeddings
from app.memory.semantic_search import semantic_search
results = await semantic_search.search_all(
user_id=user_id,
query=request.query,
max_results_per_source=request.limit
)
# Format results from all sources
items = []
# Add knowledge base results
for item in results.get("knowledge", []):
items.append(MemoryItem(
id=item["id"],
content=item["content"],
content_type=item.get("content_type", "fact"),
category=item.get("category"),
source_type="knowledge_base",
confidence_score=item.get("confidence_score", 0.8),
similarity=item.get("similarity", 0.75),
created_at=item["created_at"]
))
# Add news results
for item in results.get("news", []):
items.append(MemoryItem(
id=item["id"],
content=f"{item.get('title', '')}: {item.get('summary', '')}",
content_type="news",
category="current_events",
source_type="news_article",
confidence_score=item.get("relevance_score", 0.7),
similarity=item.get("similarity", 0.7),
created_at=item["fetched_at"]
))
# Add research results
for item in results.get("research", []):
items.append(MemoryItem(
id=item["id"],
content=item.get("executive_summary", item.get("query", "")),
content_type="research",
category="research_session",
source_type="research",
confidence_score=0.85,
similarity=item.get("similarity", 0.75),
created_at=item["created_at"]
))
# Sort by similarity
items.sort(key=lambda x: x.similarity, reverse=True)
# Apply limit
items = items[:request.limit]
search_time_ms = int((time.time() - start_time) * 1000)
logger.info(
"Memory search complete",
knowledge=len(results.get("knowledge", [])),
news=len(results.get("news", [])),
research=len(results.get("research", [])),
search_time_ms=search_time_ms
)
return MemorySearchResponse(
query=request.query,
results=items,
total_found=len(items),
search_time_ms=search_time_ms
)
except Exception as e:
logger.error("Memory search error", error=str(e))
raise HTTPException(status_code=500, detail="Search failed")
@router.post("/store")
async def store_memory(
content: str,
content_type: str = "fact",
category: Optional[str] = None,
auth: dict = Depends(verify_auth)
):
"""
Explicitly store a knowledge item.
Generates embedding automatically (Phase 2).
"""
user_id = auth["user_id"]
logger.info("Storing memory", content_type=content_type, category=category)
try:
# PHASE 2: Store with embedding
from app.memory.semantic_search import semantic_search
result = await semantic_search.store_with_embedding(
user_id=user_id,
content=content,
content_type=content_type,
category=category,
source_type="user",
source_metadata={"api_endpoint": "/memory/store"}
)
if result:
return {
"success": True,
"memory_id": result["id"],
"embedding_generated": True,
"message": "Memory stored with embedding successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to store memory")
except Exception as e:
logger.error("Memory store error", error=str(e))
raise HTTPException(status_code=500, detail="Store failed")
@router.delete("/{memory_id}")
async def delete_memory(
memory_id: str,
auth: dict = Depends(verify_auth)
):
"""Delete a memory item"""
user_id = auth["user_id"]
logger.info("Deleting memory", memory_id=memory_id)
result = await db.delete(
"knowledge_base",
{"id": memory_id, "user_id": user_id}
)
if result:
return {"success": True, "message": "Memory deleted"}
else:
raise HTTPException(status_code=404, detail="Memory not found")