Michael-Antony's picture
feat: implement tasks API with GET today and PATCH status endpoints
32eb084
"""
Service layer for tasks operations.
Handles business logic for fetching and managing tasks.
"""
from typing import List, Optional
from datetime import date, datetime, time
from app.core.logging import get_logger
from app.postgres import get_postgres_connection, release_postgres_connection
from app.tracker.tasks.schemas import TaskResponse, TaskStatus, UpdateTaskStatusRequest
logger = get_logger(__name__)
class TaskService:
"""Service for task operations"""
async def get_today_tasks(self, employee_id: str, merchant_id: str) -> List[TaskResponse]:
"""
Get all tasks assigned to an employee for today.
Args:
employee_id: Employee UUID
merchant_id: Merchant UUID
Returns:
List of TaskResponse objects
"""
conn = None
try:
conn = await get_postgres_connection()
# Get today's date range
today = date.today()
start_of_day = datetime.combine(today, time.min)
end_of_day = datetime.combine(today, time.max)
logger.info(f"Fetching today's tasks for employee {employee_id}", extra={
"employee_id": employee_id,
"merchant_id": merchant_id,
"date": str(today)
})
# Query tasks for today
query = """
SELECT
id,
title,
description,
status,
latitude,
longitude,
address,
scheduled_at
FROM trans.scm_tasks
WHERE assigned_to = $1::uuid
AND merchant_id = $2::uuid
AND scheduled_at >= $3
AND scheduled_at <= $4
ORDER BY scheduled_at ASC
"""
rows = await conn.fetch(
query,
employee_id,
merchant_id,
start_of_day,
end_of_day
)
# Convert to response objects
tasks = []
for row in rows:
task = TaskResponse(
id=str(row['id']),
title=row['title'],
description=row['description'],
status=row['status'],
lat=row['latitude'],
lon=row['longitude'],
address=row['address'],
scheduledAt=row['scheduled_at'].isoformat() if row['scheduled_at'] else None
)
tasks.append(task)
logger.info(f"Found {len(tasks)} tasks for today", extra={
"employee_id": employee_id,
"task_count": len(tasks)
})
return tasks
except Exception as e:
logger.error(f"Error fetching today's tasks: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
async def update_task_status(
self,
task_id: str,
employee_id: str,
merchant_id: str,
payload: UpdateTaskStatusRequest
) -> None:
"""
Update the status of a task with location and timestamp tracking.
Args:
task_id: Task UUID
employee_id: Employee UUID (for authorization)
merchant_id: Merchant UUID (for authorization)
payload: Update request with status, timestamp, and location
Raises:
ValueError: If task not found, not assigned to employee, or invalid status transition
Exception: For database errors
"""
conn = None
try:
conn = await get_postgres_connection()
logger.info(f"Updating task status", extra={
"task_id": task_id,
"employee_id": employee_id,
"new_status": payload.status,
"timestamp": payload.timestamp
})
# First, verify the task exists and is assigned to this employee
verify_query = """
SELECT id, status, assigned_to, merchant_id
FROM trans.scm_tasks
WHERE id = $1::uuid
"""
task = await conn.fetchrow(verify_query, task_id)
if not task:
logger.warning(f"Task not found: {task_id}")
raise ValueError(f"Task with ID {task_id} not found")
# Verify task is assigned to this employee
if str(task['assigned_to']) != employee_id:
logger.warning(f"Task not assigned to employee", extra={
"task_id": task_id,
"employee_id": employee_id,
"assigned_to": str(task['assigned_to'])
})
raise ValueError("You are not authorized to update this task")
# Verify merchant matches
if str(task['merchant_id']) != merchant_id:
logger.warning(f"Task merchant mismatch", extra={
"task_id": task_id,
"merchant_id": merchant_id,
"task_merchant_id": str(task['merchant_id'])
})
raise ValueError("You are not authorized to update this task")
current_status = task['status']
new_status = payload.status.value
# Validate status transition
self._validate_status_transition(current_status, new_status)
# Build update query based on status
if new_status == TaskStatus.IN_PROGRESS.value:
# Update started_at timestamp
update_query = """
UPDATE trans.scm_tasks
SET
status = $1,
started_at = $2,
latitude = $3,
longitude = $4,
updated_at = now()
WHERE id = $5::uuid
"""
await conn.execute(
update_query,
new_status,
payload.timestamp,
payload.latitude,
payload.longitude,
task_id
)
elif new_status == TaskStatus.COMPLETED.value:
# Update completed_at timestamp
update_query = """
UPDATE trans.scm_tasks
SET
status = $1,
completed_at = $2,
latitude = $3,
longitude = $4,
updated_at = now()
WHERE id = $5::uuid
"""
await conn.execute(
update_query,
new_status,
payload.timestamp,
payload.latitude,
payload.longitude,
task_id
)
else: # NOT_STARTED or any other status
# Just update status and location
update_query = """
UPDATE trans.scm_tasks
SET
status = $1,
latitude = $2,
longitude = $3,
updated_at = now()
WHERE id = $4::uuid
"""
await conn.execute(
update_query,
new_status,
payload.latitude,
payload.longitude,
task_id
)
logger.info(f"Task status updated successfully", extra={
"task_id": task_id,
"old_status": current_status,
"new_status": new_status,
"employee_id": employee_id
})
except ValueError:
# Re-raise validation errors
raise
except Exception as e:
logger.error(f"Error updating task status: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
def _validate_status_transition(self, current_status: str, new_status: str) -> None:
"""
Validate that the status transition is allowed.
Args:
current_status: Current task status
new_status: Requested new status
Raises:
ValueError: If the transition is not allowed
"""
# Define allowed transitions
allowed_transitions = {
TaskStatus.NOT_STARTED.value: [
TaskStatus.IN_PROGRESS.value,
TaskStatus.COMPLETED.value # Allow direct completion
],
TaskStatus.IN_PROGRESS.value: [
TaskStatus.COMPLETED.value,
TaskStatus.NOT_STARTED.value # Allow reverting
],
TaskStatus.COMPLETED.value: [
TaskStatus.IN_PROGRESS.value # Allow reopening
]
}
# Allow same status (idempotent updates)
if current_status == new_status:
return
# Check if transition is allowed
if new_status not in allowed_transitions.get(current_status, []):
logger.warning(f"Invalid status transition", extra={
"current_status": current_status,
"new_status": new_status
})
raise ValueError(
f"Cannot transition from '{current_status}' to '{new_status}'"
)
def get_task_service() -> TaskService:
"""Factory function to create TaskService instance"""
return TaskService()