"""
Service layer for tasks operations.
Handles business logic for fetching and managing tasks.
"""

import re
import uuid
from typing import List, Optional
from datetime import date, datetime, time

from app.core.logging import get_logger
from app.postgres import get_postgres_connection, release_postgres_connection
from app.tracker.tasks.schemas import TaskResponse, TaskStatus, UpdateTaskStatusRequest, AttachmentType
from app.storage import get_storage_adapter

logger = get_logger(__name__)

# Characters outside this set are replaced with "_" when building object keys.
_UNSAFE_FILENAME_CHARS = re.compile(r'[^A-Za-z0-9._-]')

# Longest sanitized file name kept in an object key.
_MAX_FILENAME_LENGTH = 100

# Name used when sanitizing leaves nothing (empty name, trailing separator).
_FALLBACK_FILENAME = "file"

_SELECT_TODAY_TASKS_SQL = """
    SELECT
        id,
        title,
        description,
        status,
        latitude,
        longitude,
        address,
        scheduled_at
    FROM trans.scm_tasks
    WHERE assigned_to = $1::uuid
      AND merchant_id = $2::uuid
      AND scheduled_at >= $3
      AND scheduled_at <= $4
    ORDER BY scheduled_at ASC
"""

_SELECT_TASK_FOR_AUTH_SQL = """
    SELECT id, status, assigned_to, merchant_id
    FROM trans.scm_tasks
    WHERE id = $1::uuid
"""

_UPDATE_STARTED_SQL = """
    UPDATE trans.scm_tasks
    SET status = $1,
        started_at = $2,
        latitude = $3,
        longitude = $4,
        updated_at = now()
    WHERE id = $5::uuid
"""

_UPDATE_COMPLETED_SQL = """
    UPDATE trans.scm_tasks
    SET status = $1,
        completed_at = $2,
        latitude = $3,
        longitude = $4,
        updated_at = now()
    WHERE id = $5::uuid
"""

_UPDATE_STATUS_ONLY_SQL = """
    UPDATE trans.scm_tasks
    SET status = $1,
        latitude = $2,
        longitude = $3,
        updated_at = now()
    WHERE id = $4::uuid
"""

_INSERT_ATTACHMENT_SQL = """
    INSERT INTO trans.scm_task_attachments (
        id,
        task_id,
        attachment_type,
        file_url,
        file_name,
        mime_type,
        file_size,
        uploaded_by,
        uploaded_at
    ) VALUES (
        $1::uuid,
        $2::uuid,
        $3,
        $4,
        $5,
        $6,
        $7,
        $8::uuid,
        now()
    )
    RETURNING id, uploaded_at
"""


class TaskService:
    """Service for task operations"""

    # Status changes a caller may request, keyed by the task's current status.
    # A request for the status the task already has is always accepted
    # (see _validate_status_transition).
    _ALLOWED_TRANSITIONS = {
        TaskStatus.NOT_STARTED.value: (
            TaskStatus.IN_PROGRESS.value,
            TaskStatus.COMPLETED.value,  # Allow direct completion
        ),
        TaskStatus.IN_PROGRESS.value: (
            TaskStatus.COMPLETED.value,
            TaskStatus.NOT_STARTED.value,  # Allow reverting
        ),
        TaskStatus.COMPLETED.value: (
            TaskStatus.IN_PROGRESS.value,  # Allow reopening
        ),
    }

    async def get_today_tasks(self, employee_id: str, merchant_id: str) -> List[TaskResponse]:
        """
        Get all tasks assigned to an employee for today.

        Args:
            employee_id: Employee UUID
            merchant_id: Merchant UUID

        Returns:
            List of TaskResponse objects, ordered by scheduled time ascending

        Raises:
            Exception: Any error from the database layer is logged and re-raised
        """
        conn = None
        try:
            conn = await get_postgres_connection()

            # "Today" is date.today() of this process, bounded by naive
            # datetimes. NOTE(review): confirm this matches the timezone
            # convention of scheduled_at.
            today = date.today()
            start_of_day = datetime.combine(today, time.min)
            end_of_day = datetime.combine(today, time.max)

            logger.info(f"Fetching today's tasks for employee {employee_id}", extra={
                "employee_id": employee_id,
                "merchant_id": merchant_id,
                "date": str(today)
            })

            rows = await conn.fetch(
                _SELECT_TODAY_TASKS_SQL,
                employee_id,
                merchant_id,
                start_of_day,
                end_of_day
            )

            # Convert rows to response objects
            tasks = [
                TaskResponse(
                    id=str(row['id']),
                    title=row['title'],
                    description=row['description'],
                    status=row['status'],
                    lat=row['latitude'],
                    lon=row['longitude'],
                    address=row['address'],
                    scheduledAt=row['scheduled_at'].isoformat() if row['scheduled_at'] else None
                )
                for row in rows
            ]

            logger.info(f"Found {len(tasks)} tasks for today", extra={
                "employee_id": employee_id,
                "task_count": len(tasks)
            })

            return tasks

        except Exception as e:
            logger.error(f"Error fetching today's tasks: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    async def update_task_status(
        self,
        task_id: str,
        employee_id: str,
        merchant_id: str,
        payload: UpdateTaskStatusRequest
    ) -> None:
        """
        Update the status of a task with location and timestamp tracking.

        Args:
            task_id: Task UUID
            employee_id: Employee UUID (for authorization)
            merchant_id: Merchant UUID (for authorization)
            payload: Update request with status, timestamp, and location

        Raises:
            ValueError: If task not found, not assigned to employee, or invalid status transition
            Exception: For database errors
        """
        conn = None
        try:
            conn = await get_postgres_connection()

            logger.info("Updating task status", extra={
                "task_id": task_id,
                "employee_id": employee_id,
                "new_status": payload.status,
                "timestamp": payload.timestamp
            })

            # Verify the task exists and belongs to this employee and merchant
            task = await self._fetch_authorized_task(
                conn,
                task_id,
                employee_id,
                merchant_id,
                "You are not authorized to update this task"
            )

            current_status = task['status']
            new_status = payload.status.value

            # Validate status transition
            self._validate_status_transition(current_status, new_status)

            await self._apply_status_update(conn, task_id, new_status, payload)

            logger.info("Task status updated successfully", extra={
                "task_id": task_id,
                "old_status": current_status,
                "new_status": new_status,
                "employee_id": employee_id
            })

        except ValueError:
            # Re-raise validation errors
            raise
        except Exception as e:
            logger.error(f"Error updating task status: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    @staticmethod
    async def _fetch_authorized_task(
        conn,
        task_id: str,
        employee_id: str,
        merchant_id: str,
        denied_message: str
    ):
        """
        Load a task and check that it belongs to the given employee and merchant.

        Args:
            conn: Open database connection
            task_id: Task UUID
            employee_id: Employee UUID the task must be assigned to
            merchant_id: Merchant UUID the task must belong to
            denied_message: Message of the ValueError raised on an ownership mismatch

        Returns:
            The task row (id, status, assigned_to, merchant_id)

        Raises:
            ValueError: If the task does not exist or an ownership check fails
        """
        task = await conn.fetchrow(_SELECT_TASK_FOR_AUTH_SQL, task_id)

        if not task:
            logger.warning(f"Task not found: {task_id}")
            raise ValueError(f"Task with ID {task_id} not found")

        # Verify task is assigned to this employee
        assigned_to = str(task['assigned_to'])
        if assigned_to != employee_id:
            logger.warning("Task not assigned to employee", extra={
                "task_id": task_id,
                "employee_id": employee_id,
                "assigned_to": assigned_to
            })
            raise ValueError(denied_message)

        # Verify merchant matches
        task_merchant_id = str(task['merchant_id'])
        if task_merchant_id != merchant_id:
            logger.warning("Task merchant mismatch", extra={
                "task_id": task_id,
                "merchant_id": merchant_id,
                "task_merchant_id": task_merchant_id
            })
            raise ValueError(denied_message)

        return task

    @staticmethod
    async def _apply_status_update(
        conn,
        task_id: str,
        new_status: str,
        payload: UpdateTaskStatusRequest
    ) -> None:
        """
        Write the new status and location, plus the timestamp column that matches the status.

        IN_PROGRESS stores payload.timestamp in started_at, COMPLETED stores it
        in completed_at; any other status updates only status and location.

        Args:
            conn: Open database connection
            task_id: Task UUID
            new_status: Status value to store
            payload: Update request supplying timestamp, latitude, and longitude
        """
        if new_status == TaskStatus.IN_PROGRESS.value:
            timestamped_query = _UPDATE_STARTED_SQL
        elif new_status == TaskStatus.COMPLETED.value:
            timestamped_query = _UPDATE_COMPLETED_SQL
        else:
            # NOT_STARTED or any other status: just update status and location
            timestamped_query = None

        if timestamped_query is None:
            await conn.execute(
                _UPDATE_STATUS_ONLY_SQL,
                new_status,
                payload.latitude,
                payload.longitude,
                task_id
            )
        else:
            await conn.execute(
                timestamped_query,
                new_status,
                payload.timestamp,
                payload.latitude,
                payload.longitude,
                task_id
            )

    def _validate_status_transition(self, current_status: str, new_status: str) -> None:
        """
        Validate that the status transition is allowed.

        Args:
            current_status: Current task status
            new_status: Requested new status

        Raises:
            ValueError: If the transition is not allowed
        """
        # Allow same status (idempotent updates)
        if current_status == new_status:
            return

        # A current status missing from the table allows no transitions at all
        if new_status not in self._ALLOWED_TRANSITIONS.get(current_status, ()):
            logger.warning("Invalid status transition", extra={
                "current_status": current_status,
                "new_status": new_status
            })
            raise ValueError(
                f"Cannot transition from '{current_status}' to '{new_status}'"
            )

    async def upload_attachment(
        self,
        task_id: str,
        employee_id: str,
        merchant_id: str,
        file_data: bytes,
        file_name: str,
        mime_type: str,
        attachment_type: AttachmentType
    ) -> dict:
        """
        Upload an attachment for a task.

        Args:
            task_id: Task UUID
            employee_id: Employee UUID (for authorization)
            merchant_id: Merchant UUID (for authorization)
            file_data: File content as bytes
            file_name: Original file name
            mime_type: MIME type of the file
            attachment_type: Type of attachment (photo or signature)

        Returns:
            Dictionary with attachment details (id, url, file_name, uploaded_at)

        Raises:
            ValueError: If task not found, not assigned to employee, or invalid file
            Exception: For database or storage errors
        """
        conn = None
        try:
            conn = await get_postgres_connection()

            logger.info("Uploading attachment for task", extra={
                "task_id": task_id,
                "employee_id": employee_id,
                "attachment_type": attachment_type.value,
                "file_name": file_name,
                "file_size": len(file_data)
            })

            # Verify the task exists and belongs to this employee and merchant
            await self._fetch_authorized_task(
                conn,
                task_id,
                employee_id,
                merchant_id,
                "You are not authorized to upload attachments for this task"
            )

            # Validate file type (images only). A missing MIME type is treated
            # as an empty string so it is rejected here with a ValueError
            # instead of crashing on .lower().
            normalized_mime = (mime_type or "").lower()
            if not normalized_mime.startswith("image/"):
                raise ValueError(f"Only image files are allowed. Received: {mime_type}")

            # NOTE(review): settings is imported locally, presumably to avoid
            # a circular import — confirm before moving it to module level.
            from app.core.config import settings

            # Validate file size against the configured limit (in MB)
            max_size = settings.STORAGE_MAX_IMAGE_MB * 1024 * 1024
            if len(file_data) > max_size:
                raise ValueError(f"File size exceeds maximum allowed size of {settings.STORAGE_MAX_IMAGE_MB}MB")

            # Validate MIME type against allowed types
            allowed_mimes = [m.lower() for m in settings.STORAGE_ALLOWED_IMAGE_MIME]
            if normalized_mime not in allowed_mimes:
                raise ValueError(f"File type {mime_type} is not allowed. Allowed types: {', '.join(allowed_mimes)}")

            # Generate unique object key
            attachment_id = str(uuid.uuid4())
            safe_filename = self._sanitize_filename(file_name)
            object_key = f"{employee_id}/tasks/{task_id}/{attachment_type.value}_{attachment_id}_{safe_filename}"

            # Upload through the storage adapter
            storage = get_storage_adapter()
            await storage.upload_file(object_key, file_data, mime_type)

            # Get public URL
            file_url = await storage.get_public_url(object_key)

            # Save attachment record to database.
            # NOTE(review): if this INSERT fails, the object uploaded above
            # stays in storage without a database record; consider cleanup.
            result = await conn.fetchrow(
                _INSERT_ATTACHMENT_SQL,
                attachment_id,
                task_id,
                attachment_type.value,
                file_url,
                file_name,
                mime_type,
                len(file_data),
                employee_id
            )

            logger.info("Attachment uploaded successfully", extra={
                "attachment_id": attachment_id,
                "task_id": task_id,
                "file_url": file_url
            })

            return {
                "id": str(result['id']),
                "url": file_url,
                "file_name": file_name,
                "uploaded_at": result['uploaded_at'].isoformat()
            }

        except ValueError:
            # Re-raise validation errors
            raise
        except Exception as e:
            logger.error(f"Error uploading attachment: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        """
        Sanitize filename to remove unsafe characters.

        Keeps only the last path component, replaces every character outside
        ``A-Za-z0-9._-`` with ``_``, and truncates to 100 characters. When
        nothing is left (empty or missing name, or a name ending in a path
        separator) the placeholder ``"file"`` is returned.

        Args:
            filename: Original filename

        Returns:
            Sanitized, non-empty filename
        """
        # Remove path separators (both Windows and POSIX style)
        name = (filename or "").replace("\\", "/").split("/")[-1]

        # Remove unsafe characters
        name = _UNSAFE_FILENAME_CHARS.sub('_', name)

        # Limit length, never returning an empty name
        return name[:_MAX_FILENAME_LENGTH] or _FALLBACK_FILENAME


def get_task_service() -> TaskService:
    """Factory function to create TaskService instance"""
    return TaskService()