Spaces:
Build error
Build error
| """ | |
| Memory tools for Project Memory - complete_task and memory_search. | |
| Dependencies (from Dev A): | |
| - app.models: Task, LogEntry, TaskStatus, ActorType, ActionType | |
| - app.database: get_db | |
| These imports will work once Dev A completes models.py and database.py. | |
| """ | |
| from datetime import datetime | |
| from typing import Optional | |
| from sqlalchemy.orm import Session | |
| # Dev A's imports (will work when their files are ready) | |
| from app.models import Task, LogEntry, TaskStatus, ActorType, ActionType | |
| from app.database import get_db | |
| # Dev B's imports | |
| from app.llm import generate_documentation, synthesize_answer, get_embedding | |
| from app.vectorstore import add_embedding, search | |
| # Tool definitions for MCP server | |
| TOOLS = [ | |
| { | |
| "name": "complete_task", | |
| "description": "Mark a task as complete with AI-generated documentation", | |
| "inputSchema": { | |
| "type": "object", | |
| "properties": { | |
| "task_id": {"type": "string", "description": "ID of the task to complete"}, | |
| "project_id": {"type": "string", "description": "Project ID"}, | |
| "user_id": {"type": "string", "description": "User completing the task"}, | |
| "what_i_did": {"type": "string", "description": "Description of work done"}, | |
| "code_snippet": {"type": "string", "description": "Optional code snippet"} | |
| }, | |
| "required": ["task_id", "project_id", "user_id", "what_i_did"] | |
| } | |
| }, | |
| { | |
| "name": "memory_search", | |
| "description": "Search project memory with natural language and get AI-synthesized answers", | |
| "inputSchema": { | |
| "type": "object", | |
| "properties": { | |
| "project_id": {"type": "string", "description": "Project to search in"}, | |
| "query": {"type": "string", "description": "Natural language search query"} | |
| }, | |
| "required": ["project_id", "query"] | |
| } | |
| } | |
| ] | |
| async def complete_task( | |
| task_id: str, | |
| project_id: str, | |
| user_id: str, | |
| what_i_did: str, | |
| code_snippet: Optional[str] = None, | |
| actor_type: str = "human", | |
| db: Optional[Session] = None | |
| ) -> dict: | |
| """ | |
| Complete a task and create searchable memory. | |
| Pipeline: | |
| 1. Update task status to done | |
| 2. Generate documentation via LLM | |
| 3. Create LogEntry record | |
| 4. Create embedding and store in vector DB | |
| """ | |
| if db is None: | |
| db = next(get_db()) | |
| try: | |
| # 1. Get and update task | |
| task = db.query(Task).filter(Task.id == task_id).first() | |
| if not task: | |
| return {"success": False, "error": "Task not found"} | |
| task.status = TaskStatus.done | |
| task.completed_at = datetime.now() | |
| # 2. Generate documentation via LLM | |
| doc = await generate_documentation( | |
| task_title=task.title, | |
| what_i_did=what_i_did, | |
| code_snippet=code_snippet | |
| ) | |
| # 3. Create LogEntry | |
| log_entry = LogEntry( | |
| project_id=project_id, | |
| task_id=task_id, | |
| user_id=user_id, | |
| actor_type=ActorType(actor_type), | |
| action_type=ActionType.task_completed, | |
| raw_input=what_i_did, | |
| code_snippet=code_snippet, | |
| generated_doc=doc["details"], | |
| tags=doc.get("tags", []) | |
| ) | |
| db.add(log_entry) | |
| db.commit() | |
| db.refresh(log_entry) | |
| # 4. Create embedding and store | |
| text_to_embed = f""" | |
| Task: {task.title} | |
| Summary: {doc['summary']} | |
| Details: {doc['details']} | |
| Code: {code_snippet or ''} | |
| """ | |
| embedding = await get_embedding(text_to_embed) | |
| add_embedding( | |
| log_entry_id=str(log_entry.id), | |
| text=text_to_embed, | |
| embedding=embedding, | |
| metadata={ | |
| "project_id": project_id, | |
| "user_id": user_id, | |
| "task_id": task_id, | |
| "created_at": log_entry.created_at.isoformat() | |
| } | |
| ) | |
| return { | |
| "success": True, | |
| "log_entry_id": str(log_entry.id), | |
| "summary": doc["summary"], | |
| "tags": doc.get("tags", []) | |
| } | |
| except Exception as e: | |
| db.rollback() | |
| return {"success": False, "error": str(e)} | |
| async def memory_search( | |
| project_id: str, | |
| query: str, | |
| filters: Optional[dict] = None, | |
| db: Optional[Session] = None | |
| ) -> dict: | |
| """ | |
| Search project memory and synthesize answer. | |
| Pipeline: | |
| 1. Get query embedding | |
| 2. Vector similarity search (with optional filters) | |
| 3. Fetch full log entries | |
| 4. LLM synthesis of answer with citations | |
| Args: | |
| project_id: Project to search in | |
| query: Natural language search query | |
| filters: Optional filters dict with keys: userId, dateFrom, dateTo, tags | |
| db: Database session (optional) | |
| """ | |
| if db is None: | |
| db = next(get_db()) | |
| try: | |
| # 1. Get query embedding | |
| query_embedding = await get_embedding(query) | |
| # 2. Convert camelCase filter keys to snake_case for vectorstore | |
| search_filters = None | |
| if filters: | |
| search_filters = {} | |
| if filters.get("userId"): | |
| search_filters["user_id"] = filters["userId"] | |
| if filters.get("dateFrom"): | |
| search_filters["date_from"] = filters["dateFrom"] | |
| if filters.get("dateTo"): | |
| search_filters["date_to"] = filters["dateTo"] | |
| # Note: tags filtering not yet implemented in vectorstore | |
| # 3. Vector search with filters | |
| results = search( | |
| query_embedding=query_embedding, | |
| project_id=project_id, | |
| n_results=10, | |
| filters=search_filters | |
| ) | |
| if not results: | |
| return { | |
| "answer": "No relevant information found in project memory.", | |
| "sources": [] | |
| } | |
| # 3. Get full log entries | |
| log_entry_ids = [r["id"] for r in results] | |
| log_entries = db.query(LogEntry).filter(LogEntry.id.in_(log_entry_ids)).all() | |
| if not log_entries: | |
| # Fallback to vector store text if log entries not found | |
| context = "\n---\n".join([ | |
| f"Entry: {r['metadata'].get('text', '')}" | |
| for r in results | |
| ]) | |
| else: | |
| # 4. Build context from log entries | |
| context = "\n---\n".join([ | |
| f"Date: {e.created_at}\nTask: {e.raw_input}\nDoc: {e.generated_doc}" | |
| for e in log_entries | |
| ]) | |
| # 5. Synthesize answer | |
| answer = await synthesize_answer(context, query) | |
| return { | |
| "answer": answer, | |
| "sources": [ | |
| { | |
| "id": str(e.id), | |
| "summary": e.raw_input, | |
| "date": e.created_at.isoformat() | |
| } | |
| for e in log_entries | |
| ] if log_entries else [ | |
| { | |
| "id": r["id"], | |
| "summary": r["metadata"].get("text", "")[:100], | |
| "date": r["metadata"].get("created_at", "") | |
| } | |
| for r in results | |
| ] | |
| } | |
| except Exception as e: | |
| return {"answer": f"Error searching memory: {str(e)}", "sources": []} | |
| # Handler map for MCP server | |
| HANDLERS = { | |
| "complete_task": complete_task, | |
| "memory_search": memory_search, | |
| } | |