swiftops-backend / src /app /services /ticket_assignment_service.py
kamau1's picture
Trigger real-time timesheet updates on journey start and arrival actions
c001b0a
"""
Ticket Assignment Service
Business logic for ticket assignments:
- Assign tickets to agents (individual and team)
- Agent actions (accept, reject, drop, complete)
- GPS tracking (journey + arrival)
- Queue management (reorder execution)
- Capacity limits (max 4 tickets per agent)
- Team coordination (multiple active assignments)
"""
from datetime import datetime
from typing import List, Optional, Tuple
from uuid import UUID
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, func, desc
from fastapi import HTTPException, status
import logging
logger = logging.getLogger(__name__)
from app.models.ticket import Ticket
from app.models.ticket_assignment import TicketAssignment
from app.models.incident import Incident
from app.models.user import User
from app.models.project_team import ProjectTeam
from app.models.enums import TicketStatus, TicketSource, AppRole, AssignmentAction
from app.schemas.ticket_assignment import (
TicketAssignCreate,
TicketAssignTeamCreate,
TicketSelfAssign,
AssignmentAccept,
AssignmentReject,
AssignmentStartJourney,
AssignmentUpdateLocation,
AssignmentArrived,
AssignmentCustomerUnavailable,
AssignmentDrop,
AssignmentComplete,
AssignmentReorder,
TicketAssignmentResponse,
AssignmentQueueResponse,
TeamAssignmentResult,
AssignmentStats,
AvailableTicketsResponse,
AvailableTicketInfo,
)
class TicketAssignmentService:
"""Service for managing ticket assignments"""
MAX_AGENT_CAPACITY = 4 # Maximum active tickets per agent (minimum 3 per day + 1 for ambitious agents)
def __init__(self, db: Session):
self.db = db
# ============================================
# ASSIGNMENT CREATION
# ============================================
def assign_ticket(
self,
ticket_id: UUID,
user_id: UUID,
assigned_by_user_id: UUID,
data: TicketAssignCreate
) -> TicketAssignmentResponse:
"""
Assign ticket to single agent.
Validates:
- Ticket exists and is assignable
- Agent is on project team
- Agent has capacity (< 4 active tickets)
- No duplicate active assignment
"""
# Validate ticket
ticket = self._get_ticket_or_404(ticket_id)
self._validate_ticket_assignable(ticket)
# Validate agent
agent = self._get_user_or_404(user_id)
self._validate_agent_on_project(agent.id, ticket.project_id)
# Check agent capacity
self._validate_agent_capacity(agent.id)
# Check for duplicate assignment
existing = self._get_active_assignment(ticket_id, user_id)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Agent {agent.full_name} is already assigned to this ticket"
)
# Create assignment
assignment = TicketAssignment(
ticket_id=ticket_id,
user_id=user_id,
assigned_by_user_id=assigned_by_user_id,
action="assigned",
is_self_assigned=False,
execution_order=data.execution_order,
planned_start_time=data.planned_start_time,
notes=data.notes,
assigned_at=datetime.utcnow(),
)
self.db.add(assignment)
self.db.flush() # Flush to get assignment.id before creating status history
# Update ticket status
old_status = ticket.status
if ticket.status == TicketStatus.OPEN.value:
ticket.status = TicketStatus.ASSIGNED.value
# Create status history entry with assignment_id
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.ASSIGNED.value,
changed_by_user_id=assigned_by_user_id,
assignment_id=assignment.id,
reason=f"Assigned to {agent.full_name}",
location_lat=data.location.latitude if data.location else None,
location_lng=data.location.longitude if data.location else None
)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=ticket.project_id,
work_date=date.today(),
event_type='assignment_created',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for assignment {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for assignment {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for assignment {assignment.id}: {e}", exc_info=True)
# Send notification to agent (non-blocking)
try:
logger.info(f"Ticket {ticket.id} assigned to {agent.full_name} - notification queued")
except Exception as e:
logger.warning(f"Failed to queue assignment notification: {str(e)}")
return self._to_response(assignment)
def assign_team(
self,
ticket_id: UUID,
assigned_by_user_id: UUID,
data: TicketAssignTeamCreate
) -> TeamAssignmentResult:
"""
Assign ticket to team (multiple agents).
Creates multiple assignment rows (one per agent).
Updates ticket.required_team_size.
"""
# Validate ticket
ticket = self._get_ticket_or_404(ticket_id)
self._validate_ticket_assignable(ticket)
# Validate all agents
agents = []
for user_id in data.user_ids:
agent = self._get_user_or_404(user_id)
self._validate_agent_on_project(agent.id, ticket.project_id)
self._validate_agent_capacity(agent.id)
# Check duplicate
existing = self._get_active_assignment(ticket_id, agent.id)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Agent {agent.full_name} is already assigned to this ticket"
)
agents.append(agent)
# Set required team size
required_size = data.required_team_size or len(data.user_ids)
ticket.required_team_size = required_size
# Create assignments
assignments = []
for agent in agents:
assignment = TicketAssignment(
ticket_id=ticket_id,
user_id=agent.id,
assigned_by_user_id=assigned_by_user_id,
action="assigned",
is_self_assigned=False,
notes=data.notes,
assigned_at=datetime.utcnow(),
)
self.db.add(assignment)
assignments.append(assignment)
# Update ticket status
if ticket.status == TicketStatus.OPEN.value:
ticket.status = TicketStatus.ASSIGNED.value
self.db.commit()
# Refresh assignments
for assignment in assignments:
self.db.refresh(assignment)
# Send notifications to all team members (non-blocking)
try:
logger.info(f"Ticket {ticket.id} assigned to team of {len(agents)} - notifications queued")
except Exception as e:
logger.warning(f"Failed to queue team assignment notifications: {str(e)}")
return TeamAssignmentResult(
ticket_id=ticket_id,
required_team_size=ticket.required_team_size,
assigned_team_size=ticket.assigned_team_size,
is_fully_assigned=ticket.is_fully_assigned,
assignments=[self._to_response(a) for a in assignments]
)
# ============================================
# SELF-ASSIGNMENT (Agent picks their own work)
# ============================================
def get_available_tickets_for_agent(self, user_id: UUID) -> AvailableTicketsResponse:
"""
Get tickets available for agent to self-assign.
Returns tickets that are:
- In agent's region (based on project_team.project_region_id)
- Status = OPEN (not assigned, in_progress, completed, or cancelled)
- From projects agent is on
This creates a "picking basket" for agents.
"""
# Get agent info
agent = self._get_user_or_404(user_id)
# Check agent capacity
active_count = self.db.query(func.count(TicketAssignment.id)).filter(
TicketAssignment.user_id == user_id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.deleted_at.is_(None)
).scalar()
remaining_capacity = self.MAX_AGENT_CAPACITY - active_count
# Get agent's regional assignments across all projects
agent_project_teams = self.db.query(ProjectTeam).filter(
ProjectTeam.user_id == user_id,
ProjectTeam.removed_at.is_(None),
ProjectTeam.deleted_at.is_(None)
).all()
if not agent_project_teams:
# Agent not on any project
return AvailableTicketsResponse(
agent_id=user_id,
agent_region_id=None,
current_active_count=active_count,
max_capacity=self.MAX_AGENT_CAPACITY,
remaining_capacity=remaining_capacity,
available_tickets=[],
total_available=0
)
# Build query for available tickets
# Get all project IDs agent is on
project_ids = [pt.project_id for pt in agent_project_teams]
# Get region IDs agent can work in (NULL means all regions in that project)
agent_regions = {} # project_id -> region_id (or None for all regions)
for pt in agent_project_teams:
agent_regions[pt.project_id] = pt.project_region_id
# Query tickets
query = self.db.query(Ticket).filter(
Ticket.project_id.in_(project_ids),
Ticket.status == TicketStatus.OPEN.value,
Ticket.deleted_at.is_(None)
)
# Apply region filters
region_conditions = []
for project_id, region_id in agent_regions.items():
if region_id is None:
# Agent can work all regions in this project
region_conditions.append(Ticket.project_id == project_id)
else:
# Agent limited to specific region
region_conditions.append(
and_(
Ticket.project_id == project_id,
Ticket.project_region_id == region_id
)
)
if region_conditions:
query = query.filter(or_(*region_conditions))
# Order by priority and due date
query = query.order_by(
Ticket.priority.desc(), # High priority first
Ticket.due_date.asc().nullslast(), # Urgent first
Ticket.created_at.asc() # Oldest first
)
tickets = query.all()
# Convert to response
available_ticket_info = [
AvailableTicketInfo(
id=t.id,
ticket_name=t.ticket_name,
ticket_type=t.ticket_type,
service_type=t.service_type,
priority=t.priority,
status=t.status,
work_description=t.work_description,
scheduled_date=t.scheduled_date,
scheduled_time_slot=t.scheduled_time_slot,
due_date=t.due_date,
sla_violated=t.sla_violated,
project_region_id=t.project_region_id,
created_at=t.created_at
)
for t in tickets
]
# Determine agent's primary region (for display)
primary_region_id = None
if agent_project_teams:
primary_region_id = agent_project_teams[0].project_region_id
return AvailableTicketsResponse(
agent_id=user_id,
agent_region_id=primary_region_id,
current_active_count=active_count,
max_capacity=self.MAX_AGENT_CAPACITY,
remaining_capacity=remaining_capacity,
available_tickets=available_ticket_info,
total_available=len(available_ticket_info)
)
def self_assign_ticket(
self,
ticket_id: UUID,
user_id: UUID,
data: TicketSelfAssign
) -> TicketAssignmentResponse:
"""
Agent self-assigns ticket from available pool.
Validates:
- Ticket is in OPEN status (not already assigned)
- Agent is on project team
- Agent's region matches ticket region
- Agent has capacity (< 4 active tickets)
- No duplicate active assignment
"""
# Validate ticket
ticket = self._get_ticket_or_404(ticket_id)
# Check ticket is OPEN (not already assigned)
if ticket.status != TicketStatus.OPEN.value:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Ticket is not available for self-assignment (status: {ticket.status})"
)
# Validate agent
agent = self._get_user_or_404(user_id)
self._validate_agent_on_project(agent.id, ticket.project_id)
# Note: No region validation - agents can work across regions
# Region is for filtering/preference, not access control
# Check agent capacity
self._validate_agent_capacity(agent.id)
# Check for duplicate assignment
existing = self._get_active_assignment(ticket_id, user_id)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"You are already assigned to this ticket"
)
# Create self-assignment (auto-accepted since agent chose it)
assignment = TicketAssignment(
ticket_id=ticket_id,
user_id=user_id,
assigned_by_user_id=user_id, # Self-assigned
action=AssignmentAction.ACCEPTED.value, # Auto-accepted
is_self_assigned=True,
execution_order=data.execution_order,
planned_start_time=data.planned_start_time,
notes=data.notes,
assigned_at=datetime.utcnow(),
responded_at=datetime.utcnow(), # Auto-accepted
)
self.db.add(assignment)
self.db.flush() # Flush to get assignment.id before creating status history
# Update ticket status
old_status = ticket.status
ticket.status = TicketStatus.ASSIGNED.value
# Create status history entry with assignment_id
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.ASSIGNED.value,
changed_by_user_id=user_id,
assignment_id=assignment.id, # Now has ID from flush
reason="Self-assigned by agent",
location_lat=data.location.latitude if data.location else None,
location_lng=data.location.longitude if data.location else None
)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=ticket.project_id,
work_date=date.today(),
event_type='self_assignment_created',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for self-assignment {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for self-assignment {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for self-assignment {assignment.id}: {e}", exc_info=True)
# Notify PM/Dispatcher about self-assignment
# Note: Notifications are non-blocking and failures don't affect the operation
try:
from app.services.notification_helper import NotificationHelper
notify_users = NotificationHelper.get_project_managers_and_dispatchers(self.db, ticket.project_id)
if notify_users:
# Use background task or queue for async notifications
# For now, we'll skip to avoid blocking the request
logger.info(f"Ticket {ticket.id} self-assigned by {agent.full_name} - notification queued")
except Exception as e:
logger.warning(f"Failed to queue self-assignment notification: {str(e)}")
return self._to_response(assignment)
# ============================================
# AGENT ACTIONS
# ============================================
def accept_assignment(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentAccept
) -> TicketAssignmentResponse:
"""Agent accepts assignment"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
if assignment.responded_at:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Assignment already responded to"
)
assignment.mark_accepted(data.notes)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=assignment.ticket.project_id,
work_date=date.today(),
event_type='assignment_accepted',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for accepted assignment {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for accepted assignment {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for accepted assignment {assignment.id}: {e}", exc_info=True)
return self._to_response(assignment)
def reject_assignment(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentReject
) -> TicketAssignmentResponse:
"""
Agent rejects assignment.
Closes assignment immediately.
Ticket status reverts to OPEN if no other active assignments.
"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
if assignment.responded_at:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Assignment already responded to"
)
assignment.mark_rejected(data.reason)
# Check if ticket has other active assignments
ticket = assignment.ticket
active_assignments = self._count_active_assignments(ticket.id, exclude_assignment_id=assignment.id)
if active_assignments == 0:
# No other active assignments - revert to OPEN
old_status = ticket.status
ticket.status = TicketStatus.OPEN.value
# Create status history for reversion to OPEN
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.OPEN.value,
changed_by_user_id=user_id,
assignment_id=assignment.id,
reason="Assignment rejected - ticket reverted to open"
)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=ticket.project_id,
work_date=date.today(),
event_type='assignment_rejected',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for rejected assignment {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for rejected assignment {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for rejected assignment {assignment.id}: {e}", exc_info=True)
# Notify dispatcher/PM about rejection (non-blocking)
try:
logger.info(f"Assignment {assignment.id} rejected - notification queued")
except Exception as e:
logger.warning(f"Failed to queue rejection notification: {str(e)}")
return self._to_response(assignment)
def start_journey(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentStartJourney
) -> TicketAssignmentResponse:
"""
Agent starts journey to site.
Updates ticket status to IN_PROGRESS.
Begins GPS tracking.
"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
if not assignment.responded_at:
# Auto-accept if not yet responded
assignment.mark_accepted("Auto-accepted when journey started")
if assignment.journey_started_at:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Journey already started"
)
assignment.start_journey(
latitude=data.location.latitude,
longitude=data.location.longitude
)
# Update ticket status and set started_at
ticket = assignment.ticket
if ticket.status != TicketStatus.IN_PROGRESS.value:
old_status = ticket.status
ticket.status = TicketStatus.IN_PROGRESS.value
# Set started_at timestamp if not already set
if not ticket.started_at:
ticket.started_at = datetime.utcnow()
# Create status history
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.IN_PROGRESS.value,
changed_by_user_id=user_id,
assignment_id=assignment.id,
reason="Agent started journey to site",
location_lat=data.location.latitude,
location_lng=data.location.longitude
)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=ticket.project_id,
work_date=date.today(),
event_type='journey_started',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for journey start {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for journey start {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for journey start {assignment.id}: {e}", exc_info=True)
return self._to_response(assignment)
def update_location(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentUpdateLocation
) -> TicketAssignmentResponse:
"""
Update agent location during journey (GPS breadcrumb).
"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
if not assignment.journey_started_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Journey not started. Call /start-journey first."
)
if assignment.arrived_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Already arrived. No more location updates needed."
)
assignment.add_location_breadcrumb(
latitude=data.location.latitude,
longitude=data.location.longitude,
accuracy=data.location.accuracy,
speed=data.location.speed,
battery=data.location.battery,
network=data.location.network,
)
# CRITICAL: Flag JSONB column as modified so SQLAlchemy detects the change
from sqlalchemy.orm.attributes import flag_modified
flag_modified(assignment, 'journey_location_history')
self.db.commit()
self.db.refresh(assignment)
return self._to_response(assignment)
def mark_arrived(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentArrived
) -> TicketAssignmentResponse:
"""
Agent arrives at site.
No ticket status change (stays IN_PROGRESS).
Arrival verification is manual (no distance check).
"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
if not assignment.journey_started_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Journey not started. Call /start-journey first."
)
if assignment.arrived_at:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Already marked as arrived"
)
assignment.mark_arrived(
latitude=data.location.latitude,
longitude=data.location.longitude
)
# Note: arrival_verified is set manually by admin (no auto verification)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=assignment.ticket.project_id,
work_date=date.today(),
event_type='arrived_at_site',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for arrival {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for arrival {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for arrival {assignment.id}: {e}", exc_info=True)
return self._to_response(assignment)
def mark_customer_unavailable(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentCustomerUnavailable
) -> TicketAssignmentResponse:
"""
Customer not available at site.
Agent chooses:
- drop: Close assignment (ended_at set)
- keep: Keep assignment active (try again later same day)
Ticket status reverts to ASSIGNED if assignment dropped.
"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
# Auto-arrive if not already arrived
if not assignment.arrived_at and assignment.journey_started_at:
# Use last breadcrumb or journey start location
if assignment.journey_location_history:
last_point = assignment.journey_location_history[-1]
assignment.mark_arrived(
latitude=last_point["lat"],
longitude=last_point["lng"]
)
elif assignment.journey_start_latitude and assignment.journey_start_longitude:
assignment.mark_arrived(
latitude=float(assignment.journey_start_latitude),
longitude=float(assignment.journey_start_longitude)
)
if data.action == "drop":
# Drop assignment
assignment.mark_dropped(data.reason)
# Check if ticket has other active assignments
ticket = assignment.ticket
active_assignments = self._count_active_assignments(ticket.id, exclude_assignment_id=assignment.id)
if active_assignments == 0:
# No other active assignments - revert to ASSIGNED
old_status = ticket.status
ticket.status = TicketStatus.ASSIGNED.value
# Create status history
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.ASSIGNED.value,
changed_by_user_id=user_id,
assignment_id=assignment.id,
reason=f"Customer unavailable - {data.reason}"
)
# Notify dispatcher/PM about customer unavailability (non-blocking)
try:
logger.info(f"Customer unavailable for ticket {ticket.id} - notification queued")
except Exception as e:
logger.warning(f"Failed to queue customer unavailable notification: {str(e)}")
else: # keep
# Keep assignment active, just add note
assignment.notes = f"[Customer Unavailable] {data.reason}\n{assignment.notes or ''}"
self.db.commit()
self.db.refresh(assignment)
return self._to_response(assignment)
def drop_assignment(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentDrop
) -> TicketAssignmentResponse:
"""
Agent drops ticket (can't complete).
Closes assignment and sets action to DROPPED.
Ticket status goes to PENDING_REVIEW for PM/dispatcher to handle.
For team tickets: drops ALL active assignments (team rule).
"""
from app.models.enums import AssignmentAction
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
# Mark assignment as dropped
assignment.mark_dropped(f"[{data.drop_type}] {data.reason}")
assignment.action = AssignmentAction.DROPPED.value
# Get ticket
ticket = assignment.ticket
# Get all other active assignments (team members)
other_active_assignments = self.db.query(TicketAssignment).filter(
TicketAssignment.ticket_id == ticket.id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.id != assignment.id,
TicketAssignment.deleted_at.is_(None)
).all()
# If team ticket, drop all other assignments too
if other_active_assignments:
for other_assignment in other_active_assignments:
other_assignment.ended_at = datetime.utcnow()
other_assignment.action = AssignmentAction.DROPPED.value
other_assignment.reason = f"[TEAM DROP] Team member dropped ticket: {data.reason}"
# Set ticket to pending_review (PM/dispatcher will handle)
old_status = ticket.status
ticket.status = TicketStatus.PENDING_REVIEW.value
# Create status history
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.PENDING_REVIEW.value,
changed_by_user_id=user_id,
assignment_id=assignment.id,
reason=f"Ticket dropped by agent - {data.drop_type}: {data.reason}"
)
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=ticket.project_id,
work_date=date.today(),
event_type='assignment_dropped',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for dropped assignment {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for dropped assignment {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for dropped assignment {assignment.id}: {e}", exc_info=True)
# Notification is handled in the API endpoint (async context)
logger.info(f"Ticket {ticket.id} dropped by agent {user_id} - status set to PENDING_REVIEW")
return self._to_response(assignment)
def complete_assignment(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentComplete
) -> TicketAssignmentResponse:
"""
Agent completes work.
Closes assignment AND ticket.
Closes ALL active assignments for this ticket (team rule).
Sets assignment action to COMPLETED.
Ticket status → COMPLETED.
"""
from app.models.enums import AssignmentAction
from app.services.ticket_location_service import TicketLocationService
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
# Auto-arrive if location provided and not already arrived
if data.location and not assignment.arrived_at:
assignment.mark_arrived(
latitude=data.location.latitude,
longitude=data.location.longitude
)
# Validate arrived (with warning, not block)
if not assignment.arrived_at:
# Auto-set arrival to journey start or current time
if assignment.journey_start_latitude and assignment.journey_start_longitude:
assignment.mark_arrived(
latitude=float(assignment.journey_start_latitude),
longitude=float(assignment.journey_start_longitude)
)
else:
# No location data - set as unverified
assignment.arrived_at = datetime.utcnow()
assignment.arrival_verified = False
# Mark assignment complete
assignment.mark_completed()
assignment.action = AssignmentAction.COMPLETED.value
assignment.notes = f"[COMPLETED] {data.work_notes}\n{assignment.notes or ''}"
# Mark ticket complete
ticket = assignment.ticket
old_status = ticket.status
ticket.status = TicketStatus.COMPLETED.value
ticket.completed_at = datetime.utcnow()
# Set started_at if not already set
if not ticket.started_at:
ticket.started_at = assignment.journey_started_at or datetime.utcnow()
# Update or verify work location from arrival coordinates
location_result = TicketLocationService.update_work_location_on_completion(
ticket=ticket,
assignment=assignment
)
logger.info(
f"Work location update for ticket {ticket.id}: "
f"action={location_result['action']}, success={location_result['success']}, "
f"message={location_result['message']}"
)
# Create status history
location_lat = None
location_lng = None
if data.location:
location_lat = data.location.latitude
location_lng = data.location.longitude
self._create_status_history(
ticket=ticket,
old_status=old_status,
new_status=TicketStatus.COMPLETED.value,
changed_by_user_id=user_id,
assignment_id=assignment.id,
reason="Ticket completed by agent (legacy method)",
location_lat=location_lat,
location_lng=location_lng
)
# CRITICAL: Handle ticket source on completion
# - sales_order → Create subscription (installation complete)
# - incident → Mark incident resolved (support complete, NO subscription)
# - task → Just complete (infrastructure work, NO subscription)
if ticket.source == TicketSource.INCIDENT.value and ticket.source_id:
# Mark incident as resolved (support ticket completed)
incident = self.db.query(Incident).filter(
Incident.id == ticket.source_id,
Incident.deleted_at.is_(None)
).first()
if incident:
incident.mark_resolved(data.work_notes)
# Note: NO subscription creation - service already exists via incident.subscription_id
# TODO: Handle sales_order source → Create subscription
# TODO: Handle task source → Mark task complete
# Close ALL active assignments for this ticket (team rule)
# Note: Current assignment already marked complete above via mark_completed()
# This closes OTHER team members' assignments
other_active_assignments = self.db.query(TicketAssignment).filter(
TicketAssignment.ticket_id == ticket.id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.id != assignment.id, # Exclude current (already closed)
TicketAssignment.deleted_at.is_(None)
).all()
for other_assignment in other_active_assignments:
other_assignment.ended_at = datetime.utcnow()
other_assignment.action = AssignmentAction.COMPLETED.value
other_assignment.notes = f"[AUTO-CLOSED] Ticket completed by another team member\n{other_assignment.notes or ''}"
self.db.commit()
self.db.refresh(assignment)
# Real-time timesheet update (best-effort, don't block on failure)
try:
from app.services.timesheet_realtime_service import update_timesheet_for_event
from datetime import date
timesheet_id = update_timesheet_for_event(
db=self.db,
user_id=user_id,
project_id=ticket.project_id,
work_date=date.today(),
event_type='assignment_completed',
entity_type='ticket_assignment',
entity_id=assignment.id
)
if timesheet_id:
logger.debug(f"Timesheet {timesheet_id} updated for completed assignment {assignment.id}")
else:
logger.warning(f"Failed to update timesheet for completed assignment {assignment.id}")
except Exception as e:
logger.error(f"Timesheet update error for completed assignment {assignment.id}: {e}", exc_info=True)
# Notify PM/Dispatcher about ticket completion (non-blocking)
try:
logger.info(f"Ticket {ticket.id} completed - notification queued")
except Exception as e:
logger.warning(f"Failed to queue completion notification: {str(e)}")
return self._to_response(assignment)
# ============================================
# QUEUE MANAGEMENT
# ============================================
def reorder_execution(
self,
assignment_id: UUID,
user_id: UUID,
data: AssignmentReorder
) -> TicketAssignmentResponse:
"""Agent reorders their ticket queue"""
assignment = self._get_assignment_or_404(assignment_id)
self._validate_assignment_ownership(assignment, user_id)
self._validate_assignment_active(assignment)
assignment.update_execution_order(data.execution_order, data.planned_start_time)
self.db.commit()
self.db.refresh(assignment)
return self._to_response(assignment)
def get_agent_queue(self, user_id: UUID) -> AssignmentQueueResponse:
"""Get agent's assignment queue (ordered by execution_order)"""
active_assignments = self.db.query(TicketAssignment).options(
joinedload(TicketAssignment.ticket),
joinedload(TicketAssignment.user),
joinedload(TicketAssignment.assigned_by)
).filter(
TicketAssignment.user_id == user_id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.deleted_at.is_(None)
).order_by(
TicketAssignment.execution_order.asc().nullsfirst(),
TicketAssignment.assigned_at.desc()
).all()
return AssignmentQueueResponse(
user_id=user_id,
active_count=len(active_assignments),
max_capacity=self.MAX_AGENT_CAPACITY,
can_accept_more=len(active_assignments) < self.MAX_AGENT_CAPACITY,
assignments=[self._to_response(a) for a in active_assignments]
)
# ============================================
# QUERIES
# ============================================
def get_assignment(self, assignment_id: UUID) -> TicketAssignmentResponse:
"""Get single assignment"""
assignment = self._get_assignment_or_404(assignment_id)
return self._to_response(assignment)
def get_location_trail(self, assignment_id: UUID, current_user: User):
"""
Get GPS location trail for map visualization.
Authorization:
- Agent can view their own trail
- PM/Dispatcher/Admin can view any trail
"""
from app.schemas.ticket_assignment import LocationTrailResponse, LocationBreadcrumb
assignment = self.db.query(TicketAssignment).options(
joinedload(TicketAssignment.user)
).filter(
TicketAssignment.id == assignment_id,
TicketAssignment.deleted_at.is_(None)
).first()
if not assignment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Assignment not found"
)
# Authorization check
is_own_assignment = assignment.user_id == current_user.id
is_authorized_role = current_user.role in [
AppRole.PLATFORM_ADMIN,
AppRole.PROJECT_MANAGER,
AppRole.DISPATCHER
]
if not (is_own_assignment or is_authorized_role):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to view this location trail"
)
# Build journey start point
journey_start = None
if assignment.journey_start_latitude and assignment.journey_start_longitude:
journey_start = {
"lat": float(assignment.journey_start_latitude),
"lng": float(assignment.journey_start_longitude)
}
# Build arrival point
arrival_point = None
if assignment.arrival_latitude and assignment.arrival_longitude:
arrival_point = {
"lat": float(assignment.arrival_latitude),
"lng": float(assignment.arrival_longitude)
}
# Convert trail to LocationBreadcrumb objects
trail = []
if isinstance(assignment.journey_location_history, list):
for point in assignment.journey_location_history:
if isinstance(point, dict) and "lat" in point and "lng" in point:
trail.append(LocationBreadcrumb(**point))
# Fetch ticket info
from app.schemas.ticket_assignment import LocationTrailTicketInfo, LocationTrailExpense, LocationTrailStatusChange
from app.models.ticket_expense import TicketExpense
from app.models.ticket_status_history import TicketStatusHistory
ticket = assignment.ticket
ticket_info = None
if ticket:
# Get customer info from source
customer_name = None
customer_phone = None
installation_address = None
if ticket.source == "sales_order" and ticket.source_id:
from app.models.sales_order import SalesOrder
sales_order = self.db.query(SalesOrder).options(
joinedload(SalesOrder.customer)
).filter(SalesOrder.id == ticket.source_id).first()
if sales_order:
if sales_order.customer:
customer_name = sales_order.customer.customer_name
customer_phone = sales_order.customer.phone_primary
# Build installation address
installation_address = sales_order.installation_address_line1 or ""
if sales_order.installation_address_line2:
installation_address += f", {sales_order.installation_address_line2}"
installation_address = installation_address.strip() or None
ticket_info = LocationTrailTicketInfo(
id=ticket.id,
ticket_name=ticket.ticket_name or "Unknown",
ticket_type=ticket.ticket_type,
service_type=ticket.service_type,
work_description=ticket.work_description,
status=ticket.status,
customer_name=customer_name,
customer_phone=customer_phone,
installation_address=installation_address
)
# Fetch expenses for this assignment
expenses_list = []
total_expenses_amount = 0.0
expenses = self.db.query(TicketExpense).options(
joinedload(TicketExpense.incurred_by_user)
).filter(
TicketExpense.ticket_assignment_id == assignment.id,
TicketExpense.deleted_at.is_(None)
).order_by(TicketExpense.expense_date.desc()).all()
for expense in expenses:
expenses_list.append(LocationTrailExpense(
id=expense.id,
category=expense.category,
description=expense.description,
total_cost=float(expense.total_cost),
expense_date=expense.expense_date.isoformat(),
is_approved=expense.is_approved
))
total_expenses_amount += float(expense.total_cost)
# Fetch status changes for this assignment
status_changes_list = []
status_changes = self.db.query(TicketStatusHistory).options(
joinedload(TicketStatusHistory.changed_by)
).filter(
TicketStatusHistory.ticket_assignment_id == assignment.id,
TicketStatusHistory.deleted_at.is_(None)
).order_by(TicketStatusHistory.changed_at.asc()).all()
for change in status_changes:
status_changes_list.append(LocationTrailStatusChange(
id=change.id,
old_status=change.old_status,
new_status=change.new_status,
changed_at=change.changed_at,
changed_by_user_name=change.changed_by.full_name if change.changed_by else None,
change_reason=change.change_reason,
location_latitude=float(change.location_latitude) if change.location_latitude else None,
location_longitude=float(change.location_longitude) if change.location_longitude else None
))
return LocationTrailResponse(
assignment_id=assignment.id,
ticket_id=assignment.ticket_id,
user_id=assignment.user_id,
user_name=assignment.user.full_name if assignment.user else "Unknown",
journey_started_at=assignment.journey_started_at,
arrived_at=assignment.arrived_at,
ended_at=assignment.ended_at,
journey_start=journey_start,
arrival_point=arrival_point,
trail=trail,
total_points=len(trail),
journey_distance_km=assignment.journey_distance_km,
travel_time_minutes=assignment.travel_time_minutes,
work_time_minutes=assignment.work_time_minutes,
action=assignment.action,
reason=assignment.reason,
notes=assignment.notes,
ticket=ticket_info,
expenses=expenses_list,
total_expenses=total_expenses_amount,
status_changes=status_changes_list
)
def get_ticket_assignments(self, ticket_id: UUID) -> dict:
"""Get all assignments for ticket (current + past)"""
assignments = self.db.query(TicketAssignment).options(
joinedload(TicketAssignment.user),
joinedload(TicketAssignment.assigned_by)
).filter(
TicketAssignment.ticket_id == ticket_id,
TicketAssignment.deleted_at.is_(None)
).order_by(
TicketAssignment.assigned_at.desc()
).all()
current = [self._to_response(a) for a in assignments if a.is_active]
past = [self._to_response(a) for a in assignments if not a.is_active]
return {
"current_assignments": current,
"past_assignments": past
}
def get_assignment_stats(self, project_id: Optional[UUID] = None) -> AssignmentStats:
"""Get assignment statistics"""
query = self.db.query(TicketAssignment).filter(
TicketAssignment.deleted_at.is_(None)
)
if project_id:
query = query.join(Ticket).filter(Ticket.project_id == project_id)
assignments = query.all()
active = [a for a in assignments if a.is_active]
completed = [a for a in assignments if a.ended_at and a.action != "rejected"]
dropped = [a for a in assignments if a.action == "dropped"]
rejected = [a for a in assignments if a.action == "rejected"]
# Performance metrics
travel_times = [a.travel_time_minutes for a in completed if a.travel_time_minutes]
work_times = [a.work_time_minutes for a in completed if a.work_time_minutes]
total_times = [a.total_time_minutes for a in completed if a.total_time_minutes]
distances = [a.journey_distance_km for a in completed if a.journey_distance_km]
return AssignmentStats(
total_assignments=len(assignments),
active_assignments=len(active),
completed_assignments=len(completed),
dropped_assignments=len(dropped),
rejected_assignments=len(rejected),
avg_travel_time_minutes=sum(travel_times) / len(travel_times) if travel_times else None,
avg_work_time_minutes=sum(work_times) / len(work_times) if work_times else None,
avg_total_time_minutes=sum(total_times) / len(total_times) if total_times else None,
avg_journey_distance_km=sum(distances) / len(distances) if distances else None,
)
# ============================================
# HELPER METHODS
# ============================================
def _get_ticket_or_404(self, ticket_id: UUID) -> Ticket:
"""Get ticket or raise 404"""
ticket = self.db.query(Ticket).options(
joinedload(Ticket.assignments)
).filter(
Ticket.id == ticket_id,
Ticket.deleted_at.is_(None)
).first()
if not ticket:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ticket {ticket_id} not found"
)
return ticket
def _get_user_or_404(self, user_id: UUID) -> User:
"""Get user or raise 404"""
user = self.db.query(User).filter(
User.id == user_id,
User.deleted_at.is_(None)
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User {user_id} not found"
)
return user
def _get_assignment_or_404(self, assignment_id: UUID) -> TicketAssignment:
"""Get assignment or raise 404"""
assignment = self.db.query(TicketAssignment).options(
joinedload(TicketAssignment.ticket),
joinedload(TicketAssignment.user),
joinedload(TicketAssignment.assigned_by)
).filter(
TicketAssignment.id == assignment_id,
TicketAssignment.deleted_at.is_(None)
).first()
if not assignment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Assignment {assignment_id} not found"
)
return assignment
def _validate_ticket_assignable(self, ticket: Ticket):
"""Validate ticket can be assigned"""
if ticket.status == TicketStatus.COMPLETED.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot assign completed ticket"
)
if ticket.status == TicketStatus.CANCELLED.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot assign cancelled ticket"
)
def _validate_agent_on_project(self, user_id: UUID, project_id: UUID):
"""Validate agent is on project team"""
team_member = self.db.query(ProjectTeam).filter(
ProjectTeam.user_id == user_id,
ProjectTeam.project_id == project_id,
ProjectTeam.deleted_at.is_(None)
).first()
if not team_member:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Agent is not on this project's team"
)
def _validate_agent_region_match(self, user_id: UUID, project_id: UUID, ticket_region_id: Optional[UUID]):
"""Validate agent's region matches ticket region for self-assignment"""
team_member = self.db.query(ProjectTeam).filter(
ProjectTeam.user_id == user_id,
ProjectTeam.project_id == project_id,
ProjectTeam.deleted_at.is_(None)
).first()
if not team_member:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Agent is not on this project's team"
)
# If agent has no regional restriction (project_region_id is NULL), they can take any ticket
if team_member.project_region_id is None:
return
# If agent is restricted to a region, ticket must be in that region
if team_member.project_region_id != ticket_region_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="This ticket is not in your assigned region"
)
def _validate_agent_capacity(self, user_id: UUID):
"""Validate agent has capacity for more tickets"""
active_count = self.db.query(func.count(TicketAssignment.id)).filter(
TicketAssignment.user_id == user_id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.deleted_at.is_(None)
).scalar()
if active_count >= self.MAX_AGENT_CAPACITY:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Agent has reached capacity ({self.MAX_AGENT_CAPACITY} active tickets)"
)
def _get_active_assignment(self, ticket_id: UUID, user_id: UUID) -> Optional[TicketAssignment]:
"""Get active assignment for ticket + user"""
return self.db.query(TicketAssignment).filter(
TicketAssignment.ticket_id == ticket_id,
TicketAssignment.user_id == user_id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.deleted_at.is_(None)
).first()
def _count_active_assignments(self, ticket_id: UUID, exclude_assignment_id: Optional[UUID] = None) -> int:
"""Count active assignments for ticket"""
query = self.db.query(func.count(TicketAssignment.id)).filter(
TicketAssignment.ticket_id == ticket_id,
TicketAssignment.ended_at.is_(None),
TicketAssignment.deleted_at.is_(None)
)
if exclude_assignment_id:
query = query.filter(TicketAssignment.id != exclude_assignment_id)
return query.scalar()
def _validate_assignment_ownership(self, assignment: TicketAssignment, user_id: UUID):
"""Validate user owns assignment"""
if assignment.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not assigned to this ticket"
)
def _validate_assignment_active(self, assignment: TicketAssignment):
"""Validate assignment is active"""
if not assignment.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Assignment is already closed"
)
def _create_status_history(
self,
ticket: Ticket,
old_status: str,
new_status: str,
changed_by_user_id: UUID,
assignment_id: Optional[UUID] = None,
reason: Optional[str] = None,
location_lat: Optional[float] = None,
location_lng: Optional[float] = None
):
"""Create ticket status history entry"""
from app.models.ticket_status_history import TicketStatusHistory
history = TicketStatusHistory(
ticket_id=ticket.id,
old_status=old_status,
new_status=new_status,
changed_by_user_id=changed_by_user_id,
ticket_assignment_id=assignment_id,
changed_at=datetime.utcnow(),
change_reason=reason, # DB column is 'change_reason', not 'reason'
location_latitude=location_lat,
location_longitude=location_lng,
location_verified=False,
communication_method='app'
)
self.db.add(history)
def _to_response(self, assignment: TicketAssignment) -> TicketAssignmentResponse:
"""Convert assignment to response"""
from app.schemas.ticket_assignment import AssignmentUserInfo
user_info = None
if assignment.user:
user_info = AssignmentUserInfo(
id=assignment.user.id,
full_name=assignment.user.full_name,
email=assignment.user.email,
phone=assignment.user.phone
)
assigned_by_info = None
if assignment.assigned_by:
assigned_by_info = AssignmentUserInfo(
id=assignment.assigned_by.id,
full_name=assignment.assigned_by.full_name,
email=assignment.assigned_by.email,
phone=assignment.assigned_by.phone
)
# Check if location trail exists
has_location_trail = (
isinstance(assignment.journey_location_history, list) and
len(assignment.journey_location_history) > 0
)
return TicketAssignmentResponse(
id=assignment.id,
ticket_id=assignment.ticket_id,
user_id=assignment.user_id,
assigned_by_user_id=assignment.assigned_by_user_id,
action=assignment.action,
is_self_assigned=assignment.is_self_assigned,
execution_order=assignment.execution_order,
planned_start_time=assignment.planned_start_time,
assigned_at=assignment.assigned_at,
responded_at=assignment.responded_at,
journey_started_at=assignment.journey_started_at,
arrived_at=assignment.arrived_at,
ended_at=assignment.ended_at,
journey_start_latitude=assignment.journey_start_latitude,
journey_start_longitude=assignment.journey_start_longitude,
arrival_latitude=assignment.arrival_latitude,
arrival_longitude=assignment.arrival_longitude,
arrival_verified=assignment.arrival_verified,
journey_location_history=assignment.journey_location_history,
has_location_trail=has_location_trail,
status=assignment.status,
is_active=assignment.is_active,
travel_time_minutes=assignment.travel_time_minutes,
work_time_minutes=assignment.work_time_minutes,
total_time_minutes=assignment.total_time_minutes,
journey_distance_km=assignment.journey_distance_km,
reason=assignment.reason,
notes=assignment.notes,
user=user_info,
assigned_by=assigned_by_info,
created_at=assignment.created_at,
updated_at=assignment.updated_at
)