mkkbms
feat: initial foundation β€” workforce profiles + live tracking & route replay
3818b61
"""
MongoDB document models for Live Tracking.
Two collections:
workforce_location_live β€” one doc per employee (upserted), real-time state
workforce_location_history β€” append-only time-series, TTL 90 days
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
class GeoPoint(BaseModel):
"""GeoJSON Point β€” stored as { type: 'Point', coordinates: [lng, lat] }"""
type: str = "Point"
coordinates: list[float] = Field(..., description="[longitude, latitude]")
class LiveLocationModel(BaseModel):
"""
workforce_location_live collection.
One document per (profile_id, merchant_id). Upserted on every ping.
"""
profile_id: str
merchant_id: str
location: GeoPoint # GeoJSON for $near queries
latitude: float
longitude: float
accuracy_meters: Optional[float] = None # GPS accuracy
altitude_meters: Optional[float] = None
speed_kmh: Optional[float] = None # Instantaneous speed
bearing_degrees: Optional[float] = None # Direction of travel 0-360
battery_level: Optional[int] = None # 0-100
battery_charging: Optional[bool] = None
is_online: bool = True
last_seen_at: datetime = Field(default_factory=datetime.utcnow)
app_version: Optional[str] = None
device_id: Optional[str] = None
class LocationHistoryModel(BaseModel):
"""
workforce_location_history collection.
Append-only. One doc per ping. TTL index on recorded_at (90 days).
"""
profile_id: str
merchant_id: str
location: GeoPoint
latitude: float
longitude: float
accuracy_meters: Optional[float] = None
altitude_meters: Optional[float] = None
speed_kmh: Optional[float] = None
bearing_degrees: Optional[float] = None
battery_level: Optional[int] = None
battery_charging: Optional[bool] = None
recorded_at: datetime = Field(default_factory=datetime.utcnow)
device_id: Optional[str] = None