""" Memory tools for Project Memory - complete_task and memory_search. Dependencies (from Dev A): - app.models: Task, LogEntry, TaskStatus, ActorType, ActionType - app.database: get_db These imports will work once Dev A completes models.py and database.py. """ from datetime import datetime from typing import Optional from sqlalchemy.orm import Session # Dev A's imports (will work when their files are ready) from app.models import Task, LogEntry, TaskStatus, ActorType, ActionType from app.database import get_db # Dev B's imports from app.llm import generate_documentation, synthesize_answer, get_embedding from app.vectorstore import add_embedding, search # Tool definitions for MCP server TOOLS = [ { "name": "complete_task", "description": "Mark a task as complete with AI-generated documentation", "inputSchema": { "type": "object", "properties": { "task_id": {"type": "string", "description": "ID of the task to complete"}, "project_id": {"type": "string", "description": "Project ID"}, "user_id": {"type": "string", "description": "User completing the task"}, "what_i_did": {"type": "string", "description": "Description of work done"}, "code_snippet": {"type": "string", "description": "Optional code snippet"} }, "required": ["task_id", "project_id", "user_id", "what_i_did"] } }, { "name": "memory_search", "description": "Search project memory with natural language and get AI-synthesized answers", "inputSchema": { "type": "object", "properties": { "project_id": {"type": "string", "description": "Project to search in"}, "query": {"type": "string", "description": "Natural language search query"} }, "required": ["project_id", "query"] } } ] async def complete_task( task_id: str, project_id: str, user_id: str, what_i_did: str, code_snippet: Optional[str] = None, actor_type: str = "human", db: Optional[Session] = None ) -> dict: """ Complete a task and create searchable memory. Pipeline: 1. Update task status to done 2. Generate documentation via LLM 3. Create LogEntry record 4. Create embedding and store in vector DB """ if db is None: db = next(get_db()) try: # 1. Get and update task task = db.query(Task).filter(Task.id == task_id).first() if not task: return {"success": False, "error": "Task not found"} task.status = TaskStatus.done task.completed_at = datetime.now() # 2. Generate documentation via LLM doc = await generate_documentation( task_title=task.title, what_i_did=what_i_did, code_snippet=code_snippet ) # 3. Create LogEntry log_entry = LogEntry( project_id=project_id, task_id=task_id, user_id=user_id, actor_type=ActorType(actor_type), action_type=ActionType.task_completed, raw_input=what_i_did, code_snippet=code_snippet, generated_doc=doc["details"], tags=doc.get("tags", []) ) db.add(log_entry) db.commit() db.refresh(log_entry) # 4. Create embedding and store text_to_embed = f""" Task: {task.title} Summary: {doc['summary']} Details: {doc['details']} Code: {code_snippet or ''} """ embedding = await get_embedding(text_to_embed) add_embedding( log_entry_id=str(log_entry.id), text=text_to_embed, embedding=embedding, metadata={ "project_id": project_id, "user_id": user_id, "task_id": task_id, "created_at": log_entry.created_at.isoformat() } ) return { "success": True, "log_entry_id": str(log_entry.id), "summary": doc["summary"], "tags": doc.get("tags", []) } except Exception as e: db.rollback() return {"success": False, "error": str(e)} async def memory_search( project_id: str, query: str, filters: Optional[dict] = None, db: Optional[Session] = None ) -> dict: """ Search project memory and synthesize answer. Pipeline: 1. Get query embedding 2. Vector similarity search (with optional filters) 3. Fetch full log entries 4. LLM synthesis of answer with citations Args: project_id: Project to search in query: Natural language search query filters: Optional filters dict with keys: userId, dateFrom, dateTo, tags db: Database session (optional) """ if db is None: db = next(get_db()) try: # 1. Get query embedding query_embedding = await get_embedding(query) # 2. Convert camelCase filter keys to snake_case for vectorstore search_filters = None if filters: search_filters = {} if filters.get("userId"): search_filters["user_id"] = filters["userId"] if filters.get("dateFrom"): search_filters["date_from"] = filters["dateFrom"] if filters.get("dateTo"): search_filters["date_to"] = filters["dateTo"] # Note: tags filtering not yet implemented in vectorstore # 3. Vector search with filters results = search( query_embedding=query_embedding, project_id=project_id, n_results=10, filters=search_filters ) if not results: return { "answer": "No relevant information found in project memory.", "sources": [] } # 3. Get full log entries log_entry_ids = [r["id"] for r in results] log_entries = db.query(LogEntry).filter(LogEntry.id.in_(log_entry_ids)).all() if not log_entries: # Fallback to vector store text if log entries not found context = "\n---\n".join([ f"Entry: {r['metadata'].get('text', '')}" for r in results ]) else: # 4. Build context from log entries context = "\n---\n".join([ f"Date: {e.created_at}\nTask: {e.raw_input}\nDoc: {e.generated_doc}" for e in log_entries ]) # 5. Synthesize answer answer = await synthesize_answer(context, query) return { "answer": answer, "sources": [ { "id": str(e.id), "summary": e.raw_input, "date": e.created_at.isoformat() } for e in log_entries ] if log_entries else [ { "id": r["id"], "summary": r["metadata"].get("text", "")[:100], "date": r["metadata"].get("created_at", "") } for r in results ] } except Exception as e: return {"answer": f"Error searching memory: {str(e)}", "sources": []} # Handler map for MCP server HANDLERS = { "complete_task": complete_task, "memory_search": memory_search, }