# kamau1's picture
# fix(sqlalchemy): replace deprecated subquery() usages with select().scalar_subquery() across services to eliminate warnings
# c54345d
"""
Map and Location API endpoints.
Provides visualization endpoints for location data and journey tracking.
All endpoints are READ-ONLY - they query existing location data.
"""
from typing import List, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from sqlalchemy import select
from app.api.deps import get_db, get_current_user
from app.models.user import User
from app.models.ticket_assignment import TicketAssignment
from app.models.project_team import ProjectTeam
from app.services.location_service import LocationService
from app.services.journey_service import JourneyService
from app.schemas.map import (
MapEntitiesResponse,
RegionLocationResponse,
JourneyDetails,
NearestRegionResponse,
Coordinates
)
# Router on which every endpoint in this module is registered via @router.get.
router = APIRouter()
@router.get(
    "/entities",
    response_model=MapEntitiesResponse,
    summary="Get aggregated map view",
    description="Get all entity locations for map visualization. Supports filtering by entity types and status."
)
async def get_map_entities(
    project_id: UUID = Query(..., description="Project ID"),
    entity_types: Optional[List[str]] = Query(
        None,
        description="Entity types to include: customers, sales_orders, subscriptions, tickets, tasks, regions, agents"
    ),
    status: Optional[str] = Query(None, description="Status filter for entities"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Get aggregated map view with multiple entity types.

    Returns locations for:
    - Customers (home addresses)
    - Sales Orders (pending installation sites)
    - Subscriptions (active service locations - derived from technician arrival point)
    - Tickets (work locations)
    - Tasks (infrastructure work)
    - Regions (regional hubs)
    - Agents (currently on journeys - privacy-compliant)

    **Authorization**: Must be a member of the project team.

    Args:
        project_id: Project whose entities are returned.
        entity_types: Optional list of entity type names to include.
        status: Optional status filter, forwarded to the service as
            ``status_filter``.
        db: Database session.
        current_user: Authenticated user making the request.

    Raises:
        HTTPException: 403 when the user has no active membership in the
            project and is not a super admin.
    """
    # The ``status`` query parameter shadows the module-level ``fastapi.status``
    # import inside this function. Looking up ``HTTP_403_FORBIDDEN`` on it would
    # raise AttributeError (the parameter is ``None`` or a ``str``), so the HTTP
    # status constants are taken from a locally aliased import instead.
    from fastapi import status as http_status

    # Verify user has access to project (soft-deleted memberships are ignored)
    team_membership = db.query(ProjectTeam).filter(
        ProjectTeam.project_id == project_id,
        ProjectTeam.user_id == current_user.id,
        ProjectTeam.deleted_at.is_(None)
    ).first()
    if not team_membership and current_user.role != "super_admin":
        raise HTTPException(
            status_code=http_status.HTTP_403_FORBIDDEN,
            detail="You do not have access to this project"
        )

    location_service = LocationService(db)
    return location_service.get_entities_map_view(
        project_id=project_id,
        entity_types=entity_types,
        status_filter=status
    )
@router.get(
    "/regions/{project_id}",
    response_model=List[RegionLocationResponse],
    summary="Get regional hub locations",
    description="Get all regional hub locations for a project with coverage information.",
)
async def get_region_locations(
    project_id: UUID,
    include_inactive: bool = Query(False, description="Include inactive regions"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    List the regional hub locations of a project, with coverage statistics.

    Each returned item describes one region:
    - Region coordinates
    - Coverage radius
    - Active tickets count
    - Active agents count

    **Authorization**: Must be a member of the project team.

    Raises:
        HTTPException: 403 when the caller has no active project membership
            and is not a super admin.
    """
    # Look up the caller's non-deleted membership row for this project.
    membership = (
        db.query(ProjectTeam)
        .filter(
            ProjectTeam.project_id == project_id,
            ProjectTeam.user_id == current_user.id,
            ProjectTeam.deleted_at.is_(None),
        )
        .first()
    )

    # Either a membership or the super_admin role grants access.
    if not (membership or current_user.role == "super_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have access to this project",
        )

    service = LocationService(db)
    return service.get_region_locations(
        project_id=project_id,
        include_inactive=include_inactive,
    )
@router.get(
    "/journeys/{assignment_id}",
    response_model=JourneyDetails,
    summary="Get journey details with breadcrumbs",
    description="Get complete journey information including GPS breadcrumb trail and analytics.",
)
async def get_journey_details(
    assignment_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Return the journey recorded for a single ticket assignment.

    The response carries:
    - Timeline (start, arrival, completion times)
    - GPS breadcrumb trail
    - Journey statistics (distance, speed, etc.)
    - Location information

    **Authorization**:
    - Agents can only view their own journeys
    - Managers can view all journeys in their projects
    - Super admins can view all journeys

    **Privacy**: Journey tracking only occurs during active assignments
    (from "Start Journey" to "Arrived"). No surveillance outside work hours.

    Raises:
        HTTPException: 404 when the assignment or its journey details are not
            found, 403 when the caller may not view the journey.
    """
    journey_service = JourneyService(db)

    # The assignment is needed first: authorization depends on its owner/ticket.
    assignment = (
        db.query(TicketAssignment)
        .filter(
            TicketAssignment.id == assignment_id,
            TicketAssignment.deleted_at.is_(None),
        )
        .first()
    )
    if not assignment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Assignment not found",
        )

    # Super admins and the assigned agent are allowed without further lookups.
    allowed = (
        current_user.role == "super_admin"
        or assignment.user_id == current_user.id
    )

    if not allowed:
        # Otherwise the caller must hold a manager role in the ticket's project.
        ticket = assignment.ticket
        if ticket:
            manager_row = (
                db.query(ProjectTeam)
                .filter(
                    ProjectTeam.project_id == ticket.project_id,
                    ProjectTeam.user_id == current_user.id,
                    ProjectTeam.role.in_(["manager", "project_manager"]),
                    ProjectTeam.deleted_at.is_(None),
                )
                .first()
            )
            if manager_row:
                allowed = True

    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to view this journey",
        )

    details = journey_service.get_journey_details(assignment_id)
    if not details:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Journey details not found",
        )
    return details
@router.get(
    "/journeys/agent/{user_id}",
    response_model=List[JourneyDetails],
    summary="Get agent journey history",
    description="Get journey history for a specific agent with optional date filtering.",
)
async def get_agent_journey_history(
    user_id: UUID,
    start_date: Optional[str] = Query(None, description="Start date filter (ISO format)"),
    end_date: Optional[str] = Query(None, description="End date filter (ISO format)"),
    limit: int = Query(50, ge=1, le=100, description="Maximum number of journeys"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Return past journeys of one agent, optionally bounded by a date range.

    **Authorization**:
    - Agents can only view their own history
    - Managers can view history of agents in their projects
    - Super admins can view all history

    Raises:
        HTTPException: 403 when the caller may not view this agent's history,
            400 when ``start_date`` or ``end_date`` is not a valid ISO string.
    """
    from datetime import datetime

    def _parse_iso(raw_value, error_detail):
        """Parse an optional ISO date string; empty/None yields None."""
        if not raw_value:
            return None
        try:
            # A trailing "Z" is rewritten to an explicit UTC offset before parsing.
            return datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=error_detail,
            )

    # Super admins and the agent themselves need no membership lookup.
    allowed = current_user.role == "super_admin" or current_user.id == user_id

    if not allowed:
        # Projects the target agent belongs to (active memberships only) ...
        agent_project_ids = select(ProjectTeam.project_id).filter(
            ProjectTeam.user_id == user_id,
            ProjectTeam.deleted_at.is_(None),
        ).scalar_subquery()

        # ... in any of which the caller must hold a manager role.
        manager_row = (
            db.query(ProjectTeam)
            .filter(
                ProjectTeam.user_id == current_user.id,
                ProjectTeam.project_id.in_(agent_project_ids),
                ProjectTeam.role.in_(["manager", "project_manager"]),
                ProjectTeam.deleted_at.is_(None),
            )
            .first()
        )
        if manager_row:
            allowed = True

    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to view this agent's journey history",
        )

    # Dates are validated only after authorization, start before end.
    start_dt = _parse_iso(
        start_date,
        "Invalid start_date format. Use ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS)",
    )
    end_dt = _parse_iso(
        end_date,
        "Invalid end_date format. Use ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS)",
    )

    history_service = JourneyService(db)
    return history_service.get_agent_journey_history(
        user_id=user_id,
        start_date=start_dt,
        end_date=end_dt,
        limit=limit,
    )
@router.get(
    "/nearest-region",
    response_model=NearestRegionResponse,
    summary="Find nearest regional hub",
    description="Find the nearest regional hub to a given location.",
)
async def find_nearest_region(
    project_id: UUID = Query(..., description="Project ID"),
    latitude: float = Query(..., ge=-90, le=90, description="Latitude"),
    longitude: float = Query(..., ge=-180, le=180, description="Longitude"),
    max_distance_km: Optional[float] = Query(None, gt=0, description="Maximum search distance in km"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Locate the regional hub closest to the given coordinates.

    Used for:
    - Auto-assigning regions to new customers
    - Auto-assigning regions to sales orders
    - Finding closest support hub

    **Authorization**: Must be a member of the project team.

    Raises:
        HTTPException: 403 when the caller lacks project access, 404 when the
            service finds no region or the found region is missing from the
            project's region list.
    """
    # Look up the caller's non-deleted membership row for this project.
    membership = (
        db.query(ProjectTeam)
        .filter(
            ProjectTeam.project_id == project_id,
            ProjectTeam.user_id == current_user.id,
            ProjectTeam.deleted_at.is_(None),
        )
        .first()
    )
    if not (membership or current_user.role == "super_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have access to this project",
        )

    service = LocationService(db)
    nearest = service.find_nearest_region(
        project_id=project_id,
        latitude=latitude,
        longitude=longitude,
        max_distance_km=max_distance_km,
    )
    if not nearest:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No region found within specified distance",
        )

    # The service result unpacks into the region id and its distance.
    region_id, distance_km = nearest

    # Resolve the id to the full region record from the project's region list.
    matched = None
    for candidate in service.get_region_locations(project_id):
        if candidate.region_id == region_id:
            matched = candidate
            break

    if not matched:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Region not found",
        )

    return NearestRegionResponse(
        region_id=matched.region_id,
        region_name=matched.region_name,
        distance_km=round(distance_km, 2),
        coordinates=matched.coordinates,
    )
@router.get(
    "/entities-near",
    response_model=MapEntitiesResponse,
    summary="Find entities near location",
    description="Find all entities within a radius of a given location.",
)
async def find_entities_near_location(
    project_id: UUID = Query(..., description="Project ID"),
    latitude: float = Query(..., ge=-90, le=90, description="Center latitude"),
    longitude: float = Query(..., ge=-180, le=180, description="Center longitude"),
    radius_km: float = Query(..., gt=0, le=100, description="Search radius in km (max 100)"),
    entity_types: Optional[List[str]] = Query(
        None,
        description="Entity types to include",
    ),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """
    Return the project's entities that lie within a radius of a center point.

    Useful for:
    - Finding nearby customers
    - Finding nearby tickets
    - Service coverage analysis

    **Authorization**: Must be a member of the project team.

    Raises:
        HTTPException: 403 when the caller has no active project membership
            and is not a super admin.
    """
    # Look up the caller's non-deleted membership row for this project.
    membership = (
        db.query(ProjectTeam)
        .filter(
            ProjectTeam.project_id == project_id,
            ProjectTeam.user_id == current_user.id,
            ProjectTeam.deleted_at.is_(None),
        )
        .first()
    )

    # Either a membership or the super_admin role grants access.
    if not (membership or current_user.role == "super_admin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have access to this project",
        )

    service = LocationService(db)
    return service.find_entities_near_location(
        project_id=project_id,
        latitude=latitude,
        longitude=longitude,
        radius_km=radius_km,
        entity_types=entity_types,
    )