# ProjectMemory / backend / app / smart_query.py
# Amal Nimmy Lal
# feat : updated pages
# 698b2c1
"""
Smart Query - LLM-First implementation using Gemini function calling.
Enables natural language queries like:
- "What did I do yesterday?"
- "What did dev_a do today?"
- "Did dev_b complete task abc?"
- "How did the xyz api get implemented?"
- "Task 13 status?"
"""
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
import google.generativeai as genai
import json
import os
import logging
from dotenv import load_dotenv
# Module logger for smart_query; DEBUG level so each tool call can be traced.
logger = logging.getLogger("smart_query")
logger.setLevel(logging.DEBUG)

# Attach the console handler only when the logger has none yet, so the
# handler is not added a second time if this setup runs again.
if len(logger.handlers) == 0:
    formatter = logging.Formatter('[SmartQuery] %(asctime)s - %(levelname)s - %(message)s')
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    ch.setLevel(logging.DEBUG)
    logger.addHandler(ch)
# Load environment variables
load_dotenv()
from app.database import SessionLocal
from app.models import Task, LogEntry, User, TaskStatus, ProjectMembership
from app.llm import get_embedding
from app.vectorstore import search
from app.model_router import router as model_router
# Configure Gemini (reuse existing config pattern from llm.py).
# This runs once at import time. os.getenv() yields None when GEMINI_API_KEY
# is not set, in which case None is what gets passed as the API key.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
@dataclass
class QueryContext:
    """Context from frontend for query processing.

    Instances are built positionally by ``smart_query`` and handed to
    ``execute_tool``, so the order of the fields below must not change.
    """
    # ID of the user who issued the query; tools substitute it for "current".
    current_user_id: str
    # Reference "now" for the query (parsed from the request, else server time).
    current_datetime: datetime
    # Project that the tool queries are scoped to.
    project_id: str
# Gemini Function Declarations
def _tool_param(description, kind=genai.protos.Type.STRING):
    """Build the schema of one scalar tool argument (a string by default)."""
    return genai.protos.Schema(type=kind, description=description)


def _tool_declaration(name, description, properties, required=None):
    """Build a FunctionDeclaration whose arguments form an OBJECT schema."""
    schema_kwargs = {
        "type": genai.protos.Type.OBJECT,
        "properties": properties,
    }
    if required is not None:
        # Leave `required` out entirely for tools with no mandatory arguments.
        schema_kwargs["required"] = required
    return genai.protos.FunctionDeclaration(
        name=name,
        description=description,
        parameters=genai.protos.Schema(**schema_kwargs),
    )


# The single Tool advertised to the model; every name here must have a
# matching branch in execute_tool().
GEMINI_TOOLS = [
    genai.protos.Tool(
        function_declarations=[
            _tool_declaration(
                "get_user_activity",
                "Get activity/log entries for a user within a date range",
                {
                    "user_id": _tool_param("User ID (use 'current' for the current user)"),
                    "user_name": _tool_param("User name to look up"),
                    "date_from": _tool_param("Start date ISO format"),
                    "date_to": _tool_param("End date ISO format"),
                },
                required=["date_from", "date_to"],
            ),
            _tool_declaration(
                "get_task_status",
                "Get status and details of a specific task by ID or title",
                {
                    "task_id": _tool_param("Task ID"),
                    "task_title": _tool_param("Task title to search"),
                },
            ),
            _tool_declaration(
                "check_task_completion",
                "Check if a task was completed, optionally by a specific user",
                {
                    "task_title": _tool_param("Task title"),
                    "user_name": _tool_param("User to check"),
                },
                required=["task_title"],
            ),
            _tool_declaration(
                "semantic_search",
                "Search project memory semantically for concepts, implementations, or how things work",
                {
                    "search_query": _tool_param("Semantic search query"),
                },
                required=["search_query"],
            ),
            _tool_declaration(
                "list_users",
                "List all users/members in the project",
                {},
            ),
            _tool_declaration(
                "list_tasks",
                "List tasks in the project, optionally filtered by status, who completed them, or completion date range. Use this for queries like 'what tasks are done', 'tasks completed by X', 'pending tasks', 'tasks completed in last 5 minutes'",
                {
                    "status": _tool_param(
                        "Filter by status: 'todo', 'in_progress', 'done', or 'all' for no filter"
                    ),
                    "completed_by": _tool_param(
                        "Filter by who completed the task - user name or user ID (e.g., 'sunny', 'John', 'ai-agent')"
                    ),
                    "completed_after": _tool_param(
                        "Filter tasks completed after this datetime (ISO format)"
                    ),
                    "completed_before": _tool_param(
                        "Filter tasks completed before this datetime (ISO format)"
                    ),
                    "limit": _tool_param(
                        "Max number of tasks to return (default 20)",
                        kind=genai.protos.Type.INTEGER,
                    ),
                },
            ),
        ]
    )
]
async def smart_query(
    project_id: str,
    query: str,
    current_user_id: str,
    current_datetime: Optional[str] = None
) -> dict:
    """
    Main entry point for LLM-first query processing.

    Sends the query to a Gemini model together with GEMINI_TOOLS, runs the
    tools the model asks for (at most five rounds), and returns the text of
    the model's last response.

    Args:
        project_id: Project to query
        query: Natural language query
        current_user_id: ID of user making the query
        current_datetime: ISO datetime string (optional, defaults to now).
            A trailing "Z" is accepted. A value that cannot be parsed is
            logged and replaced by the server's current time instead of
            raising.

    Returns:
        {answer: str, tools_used: list[str], sources: list[dict]}
        Failures while talking to the model or running tools are reported
        in "answer" rather than raised.
    """
    logger.info("=== NEW QUERY ===")
    logger.info(f"Query: {query}")
    logger.info(f"Project: {project_id}, User: {current_user_id}")

    # Parse datetime. A bad client timestamp must not abort the whole query.
    dt = None
    if current_datetime:
        try:
            stamp = current_datetime.strip()
            # datetime.fromisoformat() only understands a "Z" suffix from
            # Python 3.11 on, so spell it out as an explicit UTC offset.
            if stamp.endswith(("Z", "z")):
                stamp = stamp[:-1] + "+00:00"
            dt = datetime.fromisoformat(stamp)
        except (ValueError, TypeError, AttributeError):
            logger.warning(
                f"Invalid current_datetime {current_datetime!r}; using server time instead"
            )
    if dt is None:
        dt = datetime.now()

    context = QueryContext(current_user_id, dt, project_id)

    # Build system prompt with context
    yesterday = (dt - timedelta(days=1)).date()
    today = dt.date()
    system_prompt = f"""You are a project memory assistant helping users find information about their project work.
CONTEXT:
- Current User ID: {context.current_user_id}
- Current Date/Time: {dt.isoformat()}
- Project ID: {context.project_id}
RULES:
1. When user says "I", "me", or "my" - use user_id="{context.current_user_id}"
2. "today" = {today}
3. "yesterday" = {yesterday}
4. Convert relative dates to absolute ISO format before calling tools
5. Use the tools to fetch real data - NEVER make up information
6. If you need to resolve a user name to ID, call list_users first
7. Always cite sources in your answer when available
"""
    try:
        # Get best available model from router for smart_query task
        model_name = model_router.get_model_for_task("smart_query")
        logger.info(f"Selected model: {model_name}")
        if not model_name:
            logger.warning("All models rate limited!")
            return {
                "answer": "All models are rate limited. Please try again in a minute.",
                "tools_used": [],
                "sources": []
            }

        # Record usage for rate limiting
        model_router._record_usage(model_name)

        # Create model with function calling using selected model
        model = genai.GenerativeModel(
            model_name,
            tools=GEMINI_TOOLS,
            system_instruction=system_prompt
        )

        # Start chat and send query.
        # NOTE(review): send_message is called without await inside this
        # coroutine, so it presumably blocks the event loop while the model
        # responds - consider the SDK's async variant; confirm before changing.
        chat = model.start_chat()
        logger.debug("Sending query to model...")
        response = chat.send_message(query)

        # Tool calling loop
        tool_results = []
        max_iterations = 5
        for iteration in range(max_iterations):
            # Collect the function calls requested in the current response
            function_calls = [
                part.function_call
                for part in response.candidates[0].content.parts
                if hasattr(part, 'function_call') and part.function_call.name
            ]
            if not function_calls:
                logger.debug(f"No more function calls after iteration {iteration}")
                break  # No more function calls, we have final answer

            logger.info(f"--- Iteration {iteration + 1}: {len(function_calls)} tool call(s) ---")

            # Execute each function call
            function_responses = []
            for fn_call in function_calls:
                # Materialise the arguments once and reuse the plain dict
                call_args = dict(fn_call.args)
                logger.info(f" TOOL: {fn_call.name}")
                logger.info(f" ARGS: {json.dumps(call_args, default=str)}")
                result = await execute_tool(fn_call.name, call_args, context)
                tool_results.append({"tool": fn_call.name, "args": call_args, "result": result})

                # Log result summary
                if "error" in result:
                    logger.error(f" ERROR: {result['error']}")
                elif "count" in result:
                    logger.info(f" RESULT: {result.get('count', 'N/A')} items returned")
                elif "found" in result:
                    logger.info(f" RESULT: found={result['found']}")
                else:
                    logger.info(f" RESULT: {list(result.keys())}")

                function_responses.append(
                    genai.protos.Part(
                        function_response=genai.protos.FunctionResponse(
                            name=fn_call.name,
                            response={"result": json.dumps(result, default=str)}
                        )
                    )
                )

            # Send function results back to model
            response = chat.send_message(function_responses)
        else:
            # Every round ended in tool calls; whatever the model sent last
            # is used as the answer, even if it still asks for more tools.
            logger.warning(f"Tool-call budget of {max_iterations} rounds exhausted")

        # Extract final text answer
        final_answer = "".join(
            part.text
            for part in response.candidates[0].content.parts
            if hasattr(part, 'text')
        )
        if not final_answer.strip():
            logger.warning("Model returned no text in its final response")

        tools_used = [tr["tool"] for tr in tool_results]
        logger.info("=== QUERY COMPLETE ===")
        logger.info(f"Tools used: {tools_used}")
        logger.info(f"Answer preview: {final_answer[:200]}...")
        return {
            "answer": final_answer,
            "tools_used": tools_used,
            "sources": extract_sources(tool_results)
        }
    except Exception as e:
        # Top-level boundary: log with traceback and report in the answer.
        logger.error(f"Error processing query: {str(e)}", exc_info=True)
        return {
            "answer": f"Error processing query: {str(e)}",
            "tools_used": [],
            "sources": []
        }
async def execute_tool(name: str, args: dict, context: QueryContext) -> dict:
    """Dispatch one model-requested tool call to its implementation.

    Opens a database session for the duration of the call and always closes
    it. An unrecognised tool name yields an ``{"error": ...}`` dict.
    """
    session = SessionLocal()
    try:
        project = context.project_id
        if name == "get_user_activity":
            return _tool_get_user_activity(session, context, args)
        if name == "get_task_status":
            return _tool_get_task_status(session, project, args)
        if name == "check_task_completion":
            return _tool_check_completion(session, project, args)
        if name == "semantic_search":
            # The only coroutine tool; it does not take the session.
            return await _tool_semantic_search(project, args)
        if name == "list_users":
            return _tool_list_users(session, project)
        if name == "list_tasks":
            return _tool_list_tasks(session, project, args)
        return {"error": f"Unknown tool: {name}"}
    finally:
        session.close()
def _get_recent_work_hint(db, user_id: str, project_id: str) -> str:
    """Return a one-line description of the user's latest log entry.

    Used to tell apart project members whose names match the same search.
    Falls back to "no recent activity" when there is no entry, or when the
    newest entry has neither text nor tags.
    """
    latest = (
        db.query(LogEntry)
        .filter(
            LogEntry.user_id == user_id,
            LogEntry.project_id == project_id
        )
        .order_by(LogEntry.created_at.desc())
        .first()
    )
    if not latest:
        return "no recent activity"
    if latest.raw_input:
        # First 50 characters of what the user wrote
        return f"worked on: {latest.raw_input[:50]}..."
    if latest.tags:
        return f"worked on: {latest.tags[0]}"
    return "no recent activity"
def _resolve_user_in_project(db, project_id: str, user_name: str) -> dict:
    """
    Look up a project member from an ID or a (partial) name.

    An exact user-ID match is tried first (covers IDs such as "ai-agent");
    otherwise first name, last name and "first last" are searched with a
    case-insensitive substring match.

    Returns:
        {"found": True, "user_id": "...", "user_name": "..."} - single match
        {"found": False, "reason": "not_found", "message": "..."} - no matches
        {"found": False, "reason": "ambiguous", "message": "...",
         "options": [...]} - multiple matches
    """
    from sqlalchemy import func, or_

    def _members():
        # Users joined to their membership row, restricted to this project
        return db.query(User, ProjectMembership).join(
            ProjectMembership, User.id == ProjectMembership.user_id
        ).filter(ProjectMembership.project_id == project_id)

    def _resolved(row) -> dict:
        member, _membership = row
        return {
            "found": True,
            "user_id": str(member.id),
            "user_name": f"{member.first_name} {member.last_name}"
        }

    # Step 1: the value may already be a user ID
    by_id = _members().filter(User.id == user_name).first()
    if by_id:
        return _resolved(by_id)

    # Step 2: substring search over the name columns
    pattern = f"%{user_name}%"
    candidates = _members().filter(
        or_(
            User.first_name.ilike(pattern),
            User.last_name.ilike(pattern),
            func.concat(User.first_name, ' ', User.last_name).ilike(pattern)
        )
    ).all()

    if not candidates:
        return {
            "found": False,
            "reason": "not_found",
            "message": f"No project member with ID or name matching '{user_name}' found"
        }
    if len(candidates) == 1:
        return _resolved(candidates[0])

    # Several members match: hand back enough detail to pick one
    return {
        "found": False,
        "reason": "ambiguous",
        "message": f"Found {len(candidates)} project members matching '{user_name}'",
        "options": [
            {
                "user_id": str(member.id),
                "name": member.name,
                "role": membership.role,
                "recent_work": _get_recent_work_hint(db, str(member.id), project_id)
            }
            for member, membership in candidates
        ]
    }
def _tool_get_user_activity(db, context: QueryContext, args: dict) -> dict:
    """Get a user's log entries in the project within a date range.

    Args:
        db: Open database session.
        context: Query context; supplies the project and the current user.
        args: Tool arguments from the model: "date_from" and "date_to"
            (ISO date or datetime strings) plus optional "user_id"
            ("current" means the caller) or "user_name". With neither user
            argument, entries from all users are returned.

    Returns:
        A dict with "user_id", "user_name", "date_range", "count" and
        "activities"; the user-resolution result when the name is unknown
        or ambiguous; or {"error": ...} when a date is missing or invalid.
    """

    def _parse_bound(raw, end_of_day: bool) -> datetime:
        """Parse an ISO date/datetime string; raises ValueError if hopeless."""
        text = str(raw).strip()
        # Drop a UTC marker so the value compares as a naive datetime.
        # NOTE(review): presumably LogEntry.created_at is stored naive - confirm.
        for utc_suffix in ("Z", "+00:00"):
            text = text.removesuffix(utc_suffix)
        try:
            parsed = datetime.fromisoformat(text)
            date_only = len(text) <= 10
        except ValueError:
            # Fall back to just the leading YYYY-MM-DD part
            parsed = datetime.strptime(text[:10], '%Y-%m-%d')
            date_only = True
        if end_of_day and date_only:
            # A bare date parses to 00:00, which as an upper bound would
            # exclude the whole day; extend it to the day's last instant.
            parsed = parsed + timedelta(days=1) - timedelta(microseconds=1)
        return parsed

    user_id = args.get("user_id")
    user_name = args.get("user_name")
    resolved_user_name = None

    # Handle "current" user reference
    if user_id == "current" or user_id == context.current_user_id:
        user_id = context.current_user_id

    # Resolve user name to ID using project-scoped resolver
    if user_name and not user_id:
        resolution = _resolve_user_in_project(db, context.project_id, user_name)
        if not resolution["found"]:
            # Return disambiguation or not found directly
            return resolution
        user_id = resolution["user_id"]
        resolved_user_name = resolution["user_name"]

    # Validate and parse the date range; report problems to the model
    # instead of letting one bad argument abort the whole query.
    missing = [key for key in ("date_from", "date_to") if not args.get(key)]
    if missing:
        return {"error": f"Missing required argument(s): {', '.join(missing)}"}
    try:
        date_from = _parse_bound(args["date_from"], end_of_day=False)
        date_to = _parse_bound(args["date_to"], end_of_day=True)
    except ValueError:
        return {
            "error": "Invalid date_from/date_to. Use ISO format, "
                     "e.g. 2024-01-31 or 2024-01-31T14:30:00"
        }

    # Query LogEntry table
    query = db.query(LogEntry).filter(
        LogEntry.project_id == context.project_id,
        LogEntry.created_at >= date_from,
        LogEntry.created_at <= date_to
    )
    if user_id:
        query = query.filter(LogEntry.user_id == user_id)
    entries = query.order_by(LogEntry.created_at.desc()).all()

    # Get user name for response (use already resolved name if available)
    if not resolved_user_name and user_id:
        resolved_user = db.query(User).filter(User.id == user_id).first()
        resolved_user_name = (
            f"{resolved_user.first_name} {resolved_user.last_name}"
            if resolved_user else None
        )

    return {
        "user_id": user_id,
        "user_name": resolved_user_name,
        # Echo the range exactly as the model supplied it
        "date_range": {"from": args["date_from"], "to": args["date_to"]},
        "count": len(entries),
        "activities": [
            {
                "id": str(e.id),
                "task_id": str(e.task_id) if e.task_id else None,
                "what_was_done": e.raw_input,
                "summary": e.generated_doc[:200] if e.generated_doc else None,
                "tags": e.tags or [],
                "timestamp": e.created_at.isoformat()
            }
            for e in entries
        ]
    }
def _tool_get_task_status(db, project_id: str, args: dict) -> dict:
    """Look up one task in the project by ID or by title.

    "task_id" wins when both arguments are given: an exact ID match is tried
    first, then a substring match. Titles are matched as a case-insensitive
    substring. The first LogEntry linked to the task, if any, is reported as
    its completion details.
    """
    wanted_id = args.get("task_id")
    wanted_title = args.get("task_title")
    if not wanted_id and not wanted_title:
        return {"error": "Provide either task_id or task_title"}

    in_project = db.query(Task).filter(Task.project_id == project_id)
    if wanted_id:
        # Exact match first, then partial
        task = (
            in_project.filter(Task.id == wanted_id).first()
            or in_project.filter(Task.id.like(f"%{wanted_id}%")).first()
        )
    else:
        task = in_project.filter(Task.title.ilike(f"%{wanted_title}%")).first()

    if not task:
        return {"found": False, "message": "Task not found"}

    # Completion log, when one exists
    log_entry = db.query(LogEntry).filter(LogEntry.task_id == task.id).first()
    if log_entry:
        completion_details = {
            "what_was_done": log_entry.raw_input,
            "completed_by": str(log_entry.user_id) if log_entry.user_id else None,
            "documentation": log_entry.generated_doc[:300] if log_entry.generated_doc else None
        }
    else:
        completion_details = None

    task_info = {
        "id": str(task.id),
        "title": task.title,
        "description": task.description,
        "status": task.status.value,
        "assigned_to": task.assigned_to,
        "created_at": task.created_at.isoformat(),
        "completed_at": task.completed_at.isoformat() if task.completed_at else None
    }
    return {
        "found": True,
        "task": task_info,
        "completion_details": completion_details
    }
def _tool_check_completion(db, project_id: str, args: dict) -> dict:
    """Report whether a task is done and, optionally, whether a user logged it.

    The task is found by a case-insensitive substring match on its title.
    When "user_name" is given it is resolved within the project, and an
    unknown or ambiguous name returns the resolver's result unchanged.
    """
    title_query = args["task_title"]
    user_name = args.get("user_name")

    # Find task
    task = db.query(Task).filter(
        Task.project_id == project_id,
        Task.title.ilike(f"%{title_query}%")
    ).first()
    if not task:
        return {"found": False, "message": f"Task matching '{title_query}' not found"}

    # Build the log lookup, narrowed to the named user when one was given
    logs = db.query(LogEntry).filter(LogEntry.task_id == task.id)
    if user_name:
        resolution = _resolve_user_in_project(db, project_id, user_name)
        if not resolution["found"]:
            # Return disambiguation or not found directly
            return resolution
        if resolution["user_id"]:
            logs = logs.filter(LogEntry.user_id == resolution["user_id"])
    log_entry = logs.first()

    # None means "no user was asked about", as opposed to False
    if user_name:
        by_specified_user = log_entry is not None
    else:
        by_specified_user = None

    details = None
    if log_entry:
        details = {
            "what_was_done": log_entry.raw_input,
            "timestamp": log_entry.created_at.isoformat()
        }

    return {
        "found": True,
        "task_title": task.title,
        "task_id": str(task.id),
        "status": task.status.value,
        "is_completed": task.status == TaskStatus.done,
        "completed_by_specified_user": by_specified_user,
        "completion_details": details
    }
async def _tool_semantic_search(project_id: str, args: dict) -> dict:
    """Run a vector-store search and enrich the hits from the database.

    The query text is embedded and the ten nearest entries of the project
    are fetched. Hits with a matching LogEntry row are described from that
    row; the others fall back to the metadata stored with the vector.
    """
    query_text = args["search_query"]

    # Embed the query (same helper as llm.py) and search the vector store
    query_embedding = await get_embedding(query_text)
    hits = search(
        query_embedding=query_embedding,
        project_id=project_id,
        n_results=10
    )
    if not hits:
        return {"query": query_text, "count": 0, "results": []}

    # This tool opens its own session; none is passed in.
    session = SessionLocal()
    try:
        rows = session.query(LogEntry).filter(
            LogEntry.id.in_([hit["id"] for hit in hits])
        ).all()
        rows_by_id = {str(row.id): row for row in rows}

        def _describe(hit) -> dict:
            entry = rows_by_id.get(hit["id"])
            score = round(1 - hit.get("distance", 0), 3)
            if entry:
                return {
                    "id": hit["id"],
                    "relevance_score": score,
                    "what_was_done": entry.raw_input,
                    "documentation": entry.generated_doc[:300] if entry.generated_doc else None,
                    "tags": entry.tags,
                    "timestamp": entry.created_at.isoformat()
                }
            # No database row for this hit: use the vector metadata
            metadata = hit["metadata"]
            return {
                "id": hit["id"],
                "relevance_score": score,
                "what_was_done": metadata.get("text", "")[:200],
                "documentation": None,
                "tags": [],
                "timestamp": metadata.get("created_at")
            }

        enriched = [_describe(hit) for hit in hits]
        return {"query": query_text, "count": len(enriched), "results": enriched}
    finally:
        session.close()
def _tool_list_users(db, project_id: str) -> dict:
    """List all project members.

    Users and their membership rows are fetched with one joined query
    rather than one User lookup per membership. Memberships that have no
    matching User row are left out.

    Returns:
        {"project_id": ..., "count": int, "users": [{"id", "name",
        "first_name", "last_name", "role"}, ...]}
    """
    rows = db.query(User, ProjectMembership).join(
        ProjectMembership, User.id == ProjectMembership.user_id
    ).filter(
        ProjectMembership.project_id == project_id
    ).all()

    users = [
        {
            "id": str(user.id),
            "name": user.name,
            "first_name": user.first_name,
            "last_name": user.last_name,
            "role": membership.role
        }
        for user, membership in rows
    ]
    return {"project_id": project_id, "count": len(users), "users": users}
def _tool_list_tasks(db, project_id: str, args: dict) -> dict:
    """List tasks in project, optionally filtered by status, who completed them, or date range.

    Args:
        db: Open database session.
        project_id: Project whose tasks are listed.
        args: Tool arguments from the model, all optional: "status"
            ('todo', 'in_progress', 'done' or 'all'), "completed_by" (user
            name or ID), "completed_after" / "completed_before" (ISO date
            or datetime strings) and "limit" (default 20).

    Returns:
        A dict with "project_id", "filter", "total_count", "returned_count",
        "status_summary" (counts over ALL tasks in the project) and "tasks";
        the user-resolution result when "completed_by" is unknown or
        ambiguous; or {"error": ...} for an invalid status or date.

    The completed_by and date filters are evaluated against LogEntry rows
    (user_id / created_at), not against Task.completed_at.
    """
    from sqlalchemy import desc

    def _parse_bound(raw, end_of_day: bool) -> datetime:
        """Parse an ISO date/datetime string; raises ValueError if hopeless."""
        text = str(raw).strip()
        # Drop a UTC marker so the value compares as a naive datetime.
        # NOTE(review): presumably LogEntry.created_at is stored naive - confirm.
        for utc_suffix in ("Z", "+00:00"):
            text = text.removesuffix(utc_suffix)
        try:
            parsed = datetime.fromisoformat(text)
            date_only = len(text) <= 10
        except ValueError:
            # Fall back to just the leading YYYY-MM-DD part
            parsed = datetime.strptime(text[:10], '%Y-%m-%d')
            date_only = True
        if end_of_day and date_only:
            # A bare date parses to 00:00, which as an upper bound would
            # exclude the whole day; extend it to the day's last instant.
            parsed = parsed + timedelta(days=1) - timedelta(microseconds=1)
        return parsed

    status_filter = args.get("status", "all")
    completed_by_name = args.get("completed_by")
    completed_after = args.get("completed_after")
    completed_before = args.get("completed_before")

    # The model may send the number as a float (e.g. 5.0) or not at all;
    # always hand the query a positive int.
    try:
        limit = int(args.get("limit") or 20)
    except (TypeError, ValueError):
        limit = 20
    if limit < 1:
        limit = 20

    # Resolve completed_by user name/ID to user_id
    completed_by_user_id = None
    resolved_user_name = None
    if completed_by_name:
        resolution = _resolve_user_in_project(db, project_id, completed_by_name)
        if not resolution["found"]:
            return resolution  # Return not_found or ambiguous response
        completed_by_user_id = resolution["user_id"]
        resolved_user_name = resolution["user_name"]

    # Parse date filters
    try:
        date_after = _parse_bound(completed_after, end_of_day=False) if completed_after else None
        date_before = _parse_bound(completed_before, end_of_day=True) if completed_before else None
    except ValueError:
        return {
            "error": "Invalid completed_after/completed_before. Use ISO format, "
                     "e.g. 2024-01-31 or 2024-01-31T14:30:00"
        }

    # Echoed back in every response so the model sees what was applied
    filter_info = {
        "status": status_filter,
        "completed_by": resolved_user_name,
        "completed_after": completed_after,
        "completed_before": completed_before
    }

    # Build base query - always filter by project_id
    base_query = db.query(Task).filter(Task.project_id == project_id)

    # Apply status filter
    if status_filter and status_filter != "all":
        try:
            status_enum = TaskStatus(status_filter)
        except ValueError:
            return {"error": f"Invalid status: {status_filter}. Use: todo, in_progress, done, or all"}
        base_query = base_query.filter(Task.status == status_enum)

    # If filtering by completed_by or date range, find task_ids from LogEntry first
    if completed_by_user_id or date_after or date_before:
        log_query = db.query(LogEntry.task_id).filter(LogEntry.project_id == project_id)
        if completed_by_user_id:
            log_query = log_query.filter(LogEntry.user_id == completed_by_user_id)
        if date_after:
            log_query = log_query.filter(LogEntry.created_at >= date_after)
        if date_before:
            log_query = log_query.filter(LogEntry.created_at <= date_before)

        # Log entries that are not attached to a task cannot match a task
        completed_task_ids = [
            row.task_id for row in log_query.all() if row.task_id is not None
        ]
        if not completed_task_ids:
            filter_desc = []
            if resolved_user_name:
                filter_desc.append(f"by {resolved_user_name}")
            if date_after:
                filter_desc.append(f"after {date_after.isoformat()}")
            if date_before:
                filter_desc.append(f"before {date_before.isoformat()}")
            return {
                "project_id": project_id,
                "filter": filter_info,
                "total_count": 0,
                "returned_count": 0,
                "status_summary": {"todo": 0, "in_progress": 0, "done": 0},
                "tasks": [],
                "message": f"No tasks completed {' '.join(filter_desc)}"
            }
        base_query = base_query.filter(Task.id.in_(completed_task_ids))

    # Get TOTAL count before applying limit
    total_count = base_query.count()

    # Now apply ordering and limit for the actual results
    tasks = base_query.order_by(desc(Task.created_at)).limit(limit).all()

    # Get status counts for ALL tasks in project (not filtered)
    status_counts = {"todo": 0, "in_progress": 0, "done": 0}
    for task in db.query(Task).filter(Task.project_id == project_id).all():
        key = task.status.value
        # .get() keeps this from raising on a status outside the three above
        status_counts[key] = status_counts.get(key, 0) + 1

    # Get completion info (who completed each task) from LogEntry
    task_ids = [str(task.id) for task in tasks]
    completion_logs = db.query(LogEntry).filter(LogEntry.task_id.in_(task_ids)).all()
    completion_map = {}  # task_id -> {completed_by_id, completed_by_name, what_was_done}
    users_by_id = {}  # memoised User lookups: one query per distinct user
    for log in completion_logs:
        user = None
        if log.user_id:
            if log.user_id not in users_by_id:
                users_by_id[log.user_id] = db.query(User).filter(User.id == log.user_id).first()
            user = users_by_id[log.user_id]
        completion_map[str(log.task_id)] = {
            "completed_by_id": str(log.user_id) if log.user_id else None,
            "completed_by_name": user.name if user else None,
            "what_was_done": log.raw_input[:100] if log.raw_input else None
        }

    def _task_row(task) -> dict:
        completion = completion_map.get(str(task.id), {})
        return {
            "id": str(task.id),
            "title": task.title,
            "description": task.description[:100] if task.description else None,
            "status": task.status.value,
            "assigned_to": task.assigned_to,
            "created_at": task.created_at.isoformat() if task.created_at else None,
            "completed_at": task.completed_at.isoformat() if task.completed_at else None,
            # Include completion details if available
            "completed_by": completion.get("completed_by_name"),
            "completed_by_id": completion.get("completed_by_id"),
            "what_was_done": completion.get("what_was_done")
        }

    return {
        "project_id": project_id,
        "filter": filter_info,
        "total_count": total_count,  # Actual total matching the filter
        "returned_count": len(tasks),  # How many in this response (may be limited)
        "status_summary": status_counts,  # Counts for ALL tasks in project
        "tasks": [_task_row(task) for task in tasks]
    }
def extract_sources(tool_results: list) -> list:
    """Extract source citations from tool results.

    Args:
        tool_results: Dicts of the form {"tool": ..., "args": ..., "result":
            {...}}, as collected during the tool-calling loop.

    Returns:
        A list of citation dicts in the order first seen. Each ID is cited
        only once, and at most five items are taken from any one list.
        Summaries are cut to 100 characters; a missing or None summary
        becomes "".
    """
    sources = []
    seen_ids = set()

    def _cite(source: dict) -> None:
        # Skip entries without an ID and anything already cited
        source_id = source.get("id")
        if not source_id or source_id in seen_ids:
            return
        seen_ids.add(source_id)
        sources.append(source)

    for tr in tool_results:
        result = tr.get("result") or {}
        if not isinstance(result, dict):
            continue

        # From activity queries
        for a in (result.get("activities") or [])[:5]:
            _cite({
                "id": a.get("id"),
                "type": "activity",
                # what_was_done is None when the log entry has no text
                "summary": (a.get("what_was_done") or "")[:100],
                "date": a.get("timestamp")
            })

        # From semantic search
        for r in (result.get("results") or [])[:5]:
            _cite({
                "id": r.get("id"),
                "type": "memory",
                "summary": (r.get("what_was_done") or "")[:100],
                "date": r.get("timestamp"),
                "relevance": r.get("relevance_score")
            })

        # From task queries (single task)
        if "task" in result and result.get("found"):
            task = result["task"]
            _cite({
                "id": task["id"],
                "type": "task",
                "summary": task["title"],
                "status": task["status"]
            })

        # From list_tasks queries (multiple tasks)
        if isinstance(result.get("tasks"), list):
            for task in result["tasks"][:5]:
                if task.get("id"):
                    _cite({
                        "id": task["id"],
                        "type": "task",
                        "summary": task["title"],
                        "status": task.get("status")
                    })

    return sources