"""
Memory tools for Project Memory - complete_task and memory_search.
Dependencies (from Dev A):
- app.models: Task, LogEntry, TaskStatus, ActorType, ActionType
- app.database: get_db
These imports will work once Dev A completes models.py and database.py.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Session
# Dev A's imports (will work when their files are ready)
from app.models import Task, LogEntry, TaskStatus, ActorType, ActionType
from app.database import get_db
# Dev B's imports
from app.llm import generate_documentation, synthesize_answer, get_embedding
from app.vectorstore import add_embedding, search
# Tool definitions for MCP server
TOOLS = [
{
"name": "complete_task",
"description": "Mark a task as complete with AI-generated documentation",
"inputSchema": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "ID of the task to complete"},
"project_id": {"type": "string", "description": "Project ID"},
"user_id": {"type": "string", "description": "User completing the task"},
"what_i_did": {"type": "string", "description": "Description of work done"},
"code_snippet": {"type": "string", "description": "Optional code snippet"}
},
"required": ["task_id", "project_id", "user_id", "what_i_did"]
}
},
{
"name": "memory_search",
"description": "Search project memory with natural language and get AI-synthesized answers",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "Project to search in"},
"query": {"type": "string", "description": "Natural language search query"}
},
"required": ["project_id", "query"]
}
}
]
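# For reference, a tools/call request for complete_task carries arguments
# matching the schema above, e.g. (illustrative values only):
#
#   {"name": "complete_task",
#    "arguments": {"task_id": "t-123", "project_id": "p-1",
#                  "user_id": "u-42", "what_i_did": "Added JWT auth"}}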
async def complete_task(
task_id: str,
project_id: str,
user_id: str,
what_i_did: str,
code_snippet: Optional[str] = None,
actor_type: str = "human",
db: Optional[Session] = None
) -> dict:
"""
Complete a task and create searchable memory.
Pipeline:
1. Update task status to done
2. Generate documentation via LLM
3. Create LogEntry record
4. Create embedding and store in vector DB
"""
    # Track session ownership so locally created sessions get closed on exit.
    owns_db = db is None
    if owns_db:
        db = next(get_db())
try:
# 1. Get and update task
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
return {"success": False, "error": "Task not found"}
task.status = TaskStatus.done
task.completed_at = datetime.now()
# 2. Generate documentation via LLM
doc = await generate_documentation(
task_title=task.title,
what_i_did=what_i_did,
code_snippet=code_snippet
)
# 3. Create LogEntry
log_entry = LogEntry(
project_id=project_id,
task_id=task_id,
user_id=user_id,
actor_type=ActorType(actor_type),
action_type=ActionType.task_completed,
raw_input=what_i_did,
code_snippet=code_snippet,
generated_doc=doc["details"],
tags=doc.get("tags", [])
)
db.add(log_entry)
db.commit()
db.refresh(log_entry)
# 4. Create embedding and store
text_to_embed = f"""
Task: {task.title}
Summary: {doc['summary']}
Details: {doc['details']}
Code: {code_snippet or ''}
"""
embedding = await get_embedding(text_to_embed)
add_embedding(
log_entry_id=str(log_entry.id),
text=text_to_embed,
embedding=embedding,
metadata={
"project_id": project_id,
"user_id": user_id,
"task_id": task_id,
"created_at": log_entry.created_at.isoformat()
}
)
return {
"success": True,
"log_entry_id": str(log_entry.id),
"summary": doc["summary"],
"tags": doc.get("tags", [])
}
    except Exception as e:
        db.rollback()
        return {"success": False, "error": str(e)}
    finally:
        # Close the session only if this function opened it.
        if owns_db:
            db.close()
async def memory_search(
project_id: str,
query: str,
filters: Optional[dict] = None,
db: Optional[Session] = None
) -> dict:
"""
Search project memory and synthesize answer.
    Pipeline:
    1. Get query embedding
    2. Normalize filter keys (camelCase -> snake_case)
    3. Vector similarity search (with optional filters)
    4. Fetch full log entries
    5. Build context from entries (vector store text as fallback)
    6. LLM synthesis of answer with citations
Args:
project_id: Project to search in
query: Natural language search query
filters: Optional filters dict with keys: userId, dateFrom, dateTo, tags
db: Database session (optional)
"""
    # Track session ownership so locally created sessions get closed on exit.
    owns_db = db is None
    if owns_db:
        db = next(get_db())
try:
# 1. Get query embedding
query_embedding = await get_embedding(query)
# 2. Convert camelCase filter keys to snake_case for vectorstore
search_filters = None
if filters:
search_filters = {}
if filters.get("userId"):
search_filters["user_id"] = filters["userId"]
if filters.get("dateFrom"):
search_filters["date_from"] = filters["dateFrom"]
if filters.get("dateTo"):
search_filters["date_to"] = filters["dateTo"]
# Note: tags filtering not yet implemented in vectorstore
# 3. Vector search with filters
results = search(
query_embedding=query_embedding,
project_id=project_id,
n_results=10,
filters=search_filters
)
if not results:
return {
"answer": "No relevant information found in project memory.",
"sources": []
}
        # 4. Fetch full log entries
log_entry_ids = [r["id"] for r in results]
log_entries = db.query(LogEntry).filter(LogEntry.id.in_(log_entry_ids)).all()
if not log_entries:
# Fallback to vector store text if log entries not found
context = "\n---\n".join([
f"Entry: {r['metadata'].get('text', '')}"
for r in results
])
else:
            # 5. Build context from log entries
context = "\n---\n".join([
f"Date: {e.created_at}\nTask: {e.raw_input}\nDoc: {e.generated_doc}"
for e in log_entries
])
        # 6. Synthesize answer
        answer = await synthesize_answer(context, query)
        # Build citation sources, falling back to vector store metadata
        # when matching log entries are missing from the database.
        if log_entries:
            sources = [
                {
                    "id": str(e.id),
                    "summary": e.raw_input,
                    "date": e.created_at.isoformat()
                }
                for e in log_entries
            ]
        else:
            sources = [
                {
                    "id": r["id"],
                    "summary": r["metadata"].get("text", "")[:100],
                    "date": r["metadata"].get("created_at", "")
                }
                for r in results
            ]
        return {"answer": answer, "sources": sources}
    except Exception as e:
        return {"answer": f"Error searching memory: {str(e)}", "sources": []}
    finally:
        # Close the session only if this function opened it.
        if owns_db:
            db.close()
# Handler map for MCP server
HANDLERS = {
"complete_task": complete_task,
"memory_search": memory_search,
}
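# A minimal dispatch sketch for the MCP server side (assumes a call_tool
# entry point receiving the tool name plus an arguments dict; the actual
# server wiring lives elsewhere):
#
#   async def call_tool(name: str, arguments: dict) -> dict:
#       handler = HANDLERS.get(name)
#       if handler is None:
#           return {"error": f"Unknown tool: {name}"}
#       return await handler(**arguments)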