Spaces:
Sleeping
Sleeping
| """ | |
| Service layer for tasks operations. | |
| Handles business logic for fetching and managing tasks. | |
| """ | |
| from typing import List, Optional | |
| from datetime import date, datetime, time | |
| from app.core.logging import get_logger | |
| from app.postgres import get_postgres_connection, release_postgres_connection | |
| from app.tracker.tasks.schemas import TaskResponse, TaskStatus, UpdateTaskStatusRequest | |
| logger = get_logger(__name__) | |
| class TaskService: | |
| """Service for task operations""" | |
| async def get_today_tasks(self, employee_id: str, merchant_id: str) -> List[TaskResponse]: | |
| """ | |
| Get all tasks assigned to an employee for today. | |
| Args: | |
| employee_id: Employee UUID | |
| merchant_id: Merchant UUID | |
| Returns: | |
| List of TaskResponse objects | |
| """ | |
| conn = None | |
| try: | |
| conn = await get_postgres_connection() | |
| # Get today's date range | |
| today = date.today() | |
| start_of_day = datetime.combine(today, time.min) | |
| end_of_day = datetime.combine(today, time.max) | |
| logger.info(f"Fetching today's tasks for employee {employee_id}", extra={ | |
| "employee_id": employee_id, | |
| "merchant_id": merchant_id, | |
| "date": str(today) | |
| }) | |
| # Query tasks for today | |
| query = """ | |
| SELECT | |
| id, | |
| title, | |
| description, | |
| status, | |
| latitude, | |
| longitude, | |
| address, | |
| scheduled_at | |
| FROM trans.scm_tasks | |
| WHERE assigned_to = $1::uuid | |
| AND merchant_id = $2::uuid | |
| AND scheduled_at >= $3 | |
| AND scheduled_at <= $4 | |
| ORDER BY scheduled_at ASC | |
| """ | |
| rows = await conn.fetch( | |
| query, | |
| employee_id, | |
| merchant_id, | |
| start_of_day, | |
| end_of_day | |
| ) | |
| # Convert to response objects | |
| tasks = [] | |
| for row in rows: | |
| task = TaskResponse( | |
| id=str(row['id']), | |
| title=row['title'], | |
| description=row['description'], | |
| status=row['status'], | |
| lat=row['latitude'], | |
| lon=row['longitude'], | |
| address=row['address'], | |
| scheduledAt=row['scheduled_at'].isoformat() if row['scheduled_at'] else None | |
| ) | |
| tasks.append(task) | |
| logger.info(f"Found {len(tasks)} tasks for today", extra={ | |
| "employee_id": employee_id, | |
| "task_count": len(tasks) | |
| }) | |
| return tasks | |
| except Exception as e: | |
| logger.error(f"Error fetching today's tasks: {e}", exc_info=e) | |
| raise | |
| finally: | |
| if conn: | |
| await release_postgres_connection(conn) | |
| async def update_task_status( | |
| self, | |
| task_id: str, | |
| employee_id: str, | |
| merchant_id: str, | |
| payload: UpdateTaskStatusRequest | |
| ) -> None: | |
| """ | |
| Update the status of a task with location and timestamp tracking. | |
| Args: | |
| task_id: Task UUID | |
| employee_id: Employee UUID (for authorization) | |
| merchant_id: Merchant UUID (for authorization) | |
| payload: Update request with status, timestamp, and location | |
| Raises: | |
| ValueError: If task not found, not assigned to employee, or invalid status transition | |
| Exception: For database errors | |
| """ | |
| conn = None | |
| try: | |
| conn = await get_postgres_connection() | |
| logger.info(f"Updating task status", extra={ | |
| "task_id": task_id, | |
| "employee_id": employee_id, | |
| "new_status": payload.status, | |
| "timestamp": payload.timestamp | |
| }) | |
| # First, verify the task exists and is assigned to this employee | |
| verify_query = """ | |
| SELECT id, status, assigned_to, merchant_id | |
| FROM trans.scm_tasks | |
| WHERE id = $1::uuid | |
| """ | |
| task = await conn.fetchrow(verify_query, task_id) | |
| if not task: | |
| logger.warning(f"Task not found: {task_id}") | |
| raise ValueError(f"Task with ID {task_id} not found") | |
| # Verify task is assigned to this employee | |
| if str(task['assigned_to']) != employee_id: | |
| logger.warning(f"Task not assigned to employee", extra={ | |
| "task_id": task_id, | |
| "employee_id": employee_id, | |
| "assigned_to": str(task['assigned_to']) | |
| }) | |
| raise ValueError("You are not authorized to update this task") | |
| # Verify merchant matches | |
| if str(task['merchant_id']) != merchant_id: | |
| logger.warning(f"Task merchant mismatch", extra={ | |
| "task_id": task_id, | |
| "merchant_id": merchant_id, | |
| "task_merchant_id": str(task['merchant_id']) | |
| }) | |
| raise ValueError("You are not authorized to update this task") | |
| current_status = task['status'] | |
| new_status = payload.status.value | |
| # Validate status transition | |
| self._validate_status_transition(current_status, new_status) | |
| # Build update query based on status | |
| if new_status == TaskStatus.IN_PROGRESS.value: | |
| # Update started_at timestamp | |
| update_query = """ | |
| UPDATE trans.scm_tasks | |
| SET | |
| status = $1, | |
| started_at = $2, | |
| latitude = $3, | |
| longitude = $4, | |
| updated_at = now() | |
| WHERE id = $5::uuid | |
| """ | |
| await conn.execute( | |
| update_query, | |
| new_status, | |
| payload.timestamp, | |
| payload.latitude, | |
| payload.longitude, | |
| task_id | |
| ) | |
| elif new_status == TaskStatus.COMPLETED.value: | |
| # Update completed_at timestamp | |
| update_query = """ | |
| UPDATE trans.scm_tasks | |
| SET | |
| status = $1, | |
| completed_at = $2, | |
| latitude = $3, | |
| longitude = $4, | |
| updated_at = now() | |
| WHERE id = $5::uuid | |
| """ | |
| await conn.execute( | |
| update_query, | |
| new_status, | |
| payload.timestamp, | |
| payload.latitude, | |
| payload.longitude, | |
| task_id | |
| ) | |
| else: # NOT_STARTED or any other status | |
| # Just update status and location | |
| update_query = """ | |
| UPDATE trans.scm_tasks | |
| SET | |
| status = $1, | |
| latitude = $2, | |
| longitude = $3, | |
| updated_at = now() | |
| WHERE id = $4::uuid | |
| """ | |
| await conn.execute( | |
| update_query, | |
| new_status, | |
| payload.latitude, | |
| payload.longitude, | |
| task_id | |
| ) | |
| logger.info(f"Task status updated successfully", extra={ | |
| "task_id": task_id, | |
| "old_status": current_status, | |
| "new_status": new_status, | |
| "employee_id": employee_id | |
| }) | |
| except ValueError: | |
| # Re-raise validation errors | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error updating task status: {e}", exc_info=e) | |
| raise | |
| finally: | |
| if conn: | |
| await release_postgres_connection(conn) | |
| def _validate_status_transition(self, current_status: str, new_status: str) -> None: | |
| """ | |
| Validate that the status transition is allowed. | |
| Args: | |
| current_status: Current task status | |
| new_status: Requested new status | |
| Raises: | |
| ValueError: If the transition is not allowed | |
| """ | |
| # Define allowed transitions | |
| allowed_transitions = { | |
| TaskStatus.NOT_STARTED.value: [ | |
| TaskStatus.IN_PROGRESS.value, | |
| TaskStatus.COMPLETED.value # Allow direct completion | |
| ], | |
| TaskStatus.IN_PROGRESS.value: [ | |
| TaskStatus.COMPLETED.value, | |
| TaskStatus.NOT_STARTED.value # Allow reverting | |
| ], | |
| TaskStatus.COMPLETED.value: [ | |
| TaskStatus.IN_PROGRESS.value # Allow reopening | |
| ] | |
| } | |
| # Allow same status (idempotent updates) | |
| if current_status == new_status: | |
| return | |
| # Check if transition is allowed | |
| if new_status not in allowed_transitions.get(current_status, []): | |
| logger.warning(f"Invalid status transition", extra={ | |
| "current_status": current_status, | |
| "new_status": new_status | |
| }) | |
| raise ValueError( | |
| f"Cannot transition from '{current_status}' to '{new_status}'" | |
| ) | |
| def get_task_service() -> TaskService: | |
| """Factory function to create TaskService instance""" | |
| return TaskService() | |