Michael-Antony's picture
feat: task geofence validation against destination merchant, shared haversine utility
9c3b067
"""
Service layer for attendance operations.
Handles business logic for check-in/check-out.
"""
from typing import Optional, Dict, Any
from datetime import datetime, date
import uuid
from motor.motor_asyncio import AsyncIOMotorDatabase
from app.core.logging import get_logger
from app.core.geo import haversine_distance_km
from app.postgres import get_postgres_connection, release_postgres_connection
from app.tracker.attendance.schemas import CheckInRequest, CheckInResponse, CheckOutRequest, CheckOutResponse
from app.tracker.attendance.constants import (
EMPLOYEES_COLLECTION,
ERROR_DUPLICATE_CHECKIN,
ERROR_GPS_DISABLED,
ERROR_EMPLOYEE_NOT_FOUND,
ERROR_NO_CHECKIN_FOUND,
ERROR_ALREADY_CHECKED_OUT,
ERROR_OUTSIDE_GEOFENCE,
SUCCESS_CHECKIN,
SUCCESS_CHECKOUT
)
logger = get_logger(__name__)
class AttendanceService:
"""Service for attendance operations"""
def __init__(self, mongo_db: AsyncIOMotorDatabase):
self.mongo_db = mongo_db
self.employees_collection = mongo_db[EMPLOYEES_COLLECTION]
async def check_location_tracking_consent(self, employee_id: str) -> bool:
"""
Check if employee has location tracking enabled.
Args:
employee_id: Employee UUID
Returns:
True if location tracking is enabled, False otherwise
Raises:
ValueError: If employee not found or validation fails
"""
try:
employee = await self.employees_collection.find_one(
{"user_id": employee_id},
{"location_settings.location_tracking_consent": 1}
)
if not employee:
logger.warning(f"Employee not found: {employee_id}")
raise ValueError(ERROR_EMPLOYEE_NOT_FOUND)
# Check location_settings.location_tracking_consent
location_settings = employee.get("location_settings", {})
consent = location_settings.get("location_tracking_consent", False)
logger.info(f"Location tracking consent for {employee_id}: {consent}")
return consent
except ValueError:
# Re-raise ValueError (employee not found)
raise
except Exception as e:
logger.error(f"Error checking location tracking consent: {e}", exc_info=e)
raise
async def validate_geofence(
self,
employee_id: str,
latitude: float,
longitude: float
) -> None:
"""
Validate that the employee's GPS coordinates are within the allowed
geofence boundary defined in location_settings.geo_location with
precision_level (in km) as the allowed radius.
If geo_location is not configured, validation is skipped.
Args:
employee_id: Employee UUID
latitude: Employee's current latitude
longitude: Employee's current longitude
Raises:
ValueError: If employee is outside the allowed geofence boundary
"""
try:
employee = await self.employees_collection.find_one(
{"user_id": employee_id},
{
"geo_location": 1,
"location_settings.precision_level": 1
}
)
if not employee:
raise ValueError(ERROR_EMPLOYEE_NOT_FOUND)
# geo_location is a top-level field on the employee document
geo_location = employee.get("geo_location")
# Skip geofence validation if geo_location is not configured
if not geo_location:
logger.info(
f"No geo_location configured for employee {employee_id}, skipping geofence validation"
)
return
geo_lat = geo_location.get("lat")
geo_lng = geo_location.get("lng")
if geo_lat is None or geo_lng is None:
logger.info(
f"Incomplete geo_location for employee {employee_id}, skipping geofence validation"
)
return
# Get precision_level in km from location_settings (default 1.0 km if not set)
location_settings = employee.get("location_settings", {})
precision_km = float(location_settings.get("precision_level", 1.0))
# Calculate distance between employee location and configured geo_location
distance_km = haversine_distance_km(latitude, longitude, geo_lat, geo_lng)
logger.info(
"Geofence validation",
extra={
"employee_id": employee_id,
"employee_lat": latitude,
"employee_lng": longitude,
"geo_lat": geo_lat,
"geo_lng": geo_lng,
"distance_km": round(distance_km, 3),
"precision_km": precision_km,
"within_boundary": distance_km <= precision_km
}
)
if distance_km > precision_km:
raise ValueError(
f"{ERROR_OUTSIDE_GEOFENCE} "
f"(distance: {round(distance_km, 2)} km, allowed: {precision_km} km)"
)
except ValueError:
raise
except Exception as e:
logger.error(f"Error validating geofence: {e}", exc_info=e)
raise
async def check_duplicate_checkin(
self,
employee_id: str,
work_date: date
) -> bool:
"""
Check if employee has already checked in today.
Args:
employee_id: Employee UUID (as string)
work_date: Date to check
Returns:
True if already checked in, False otherwise
"""
conn = None
try:
conn = await get_postgres_connection()
query = """
SELECT id FROM trans.scm_attendance
WHERE employee_id = $1::uuid AND work_date = $2
"""
result = await conn.fetchval(query, employee_id, work_date)
return result is not None
except Exception as e:
logger.error(f"Error checking duplicate check-in: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
async def create_checkin(
self,
employee_id: str,
merchant_id: str,
payload: CheckInRequest
) -> CheckInResponse:
"""
Create a check-in record.
Args:
employee_id: Employee UUID
merchant_id: Merchant UUID
payload: Check-in request data
Returns:
CheckInResponse with attendance record ID
Raises:
ValueError: If validation fails
"""
# Use server time for timestamp and work_date
now = datetime.now()
timestamp_ms = int(now.timestamp() * 1000)
work_date = now.date()
# 1. Check location tracking consent
has_consent = await self.check_location_tracking_consent(employee_id)
if not has_consent:
logger.warning(f"GPS disabled for employee {employee_id}")
raise ValueError(ERROR_GPS_DISABLED)
# 2. Validate geofence boundary
await self.validate_geofence(employee_id, payload.latitude, payload.longitude)
# 3. Check for duplicate check-in
already_checked_in = await self.check_duplicate_checkin(employee_id, work_date)
if already_checked_in:
logger.warning(f"Duplicate check-in attempt for employee {employee_id} on {work_date}")
raise ValueError(ERROR_DUPLICATE_CHECKIN)
# 4. Create attendance record
conn = None
try:
conn = await get_postgres_connection()
attendance_id = uuid.uuid4()
query = """
INSERT INTO trans.scm_attendance (
id,
merchant_id,
employee_id,
work_date,
check_in_time,
check_in_lat,
check_in_lon,
check_in_geofence_id,
created_at,
updated_at
) VALUES ($1, $2::uuid, $3::uuid, $4, $5, $6, $7, $8::uuid, NOW(), NOW())
RETURNING id
"""
result = await conn.fetchval(
query,
attendance_id,
merchant_id,
employee_id,
work_date,
timestamp_ms,
payload.latitude,
payload.longitude,
payload.location_id
)
logger.info(
f"Check-in created successfully",
extra={
"attendance_id": str(result),
"employee_id": employee_id,
"merchant_id": merchant_id,
"work_date": str(work_date),
"timestamp_ms": timestamp_ms,
"geofence_id": payload.location_id if payload.location_id else None
}
)
return CheckInResponse(
success=True,
id=str(result),
message=SUCCESS_CHECKIN
)
except Exception as e:
logger.error(f"Error creating check-in: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
async def get_attendance_by_id(self, attendance_id: str) -> Optional[Dict[str, Any]]:
"""
Get attendance record by ID.
Args:
attendance_id: Attendance UUID (as string)
Returns:
Attendance record or None
"""
conn = None
try:
conn = await get_postgres_connection()
query = """
SELECT
id,
merchant_id,
employee_id,
work_date,
check_in_time,
check_in_lat,
check_in_lon,
check_in_geofence_id,
check_out_time,
check_out_lat,
check_out_lon,
total_minutes,
created_at,
updated_at
FROM trans.scm_attendance
WHERE id = $1::uuid
"""
row = await conn.fetchrow(query, attendance_id)
if not row:
return None
return dict(row)
except Exception as e:
logger.error(f"Error fetching attendance: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
async def create_checkout(
self,
employee_id: str,
merchant_id: str,
payload: CheckOutRequest
) -> CheckOutResponse:
"""
Create a check-out record and calculate total working time.
Args:
employee_id: Employee ID
merchant_id: Merchant ID
payload: Check-out request data
Returns:
CheckOutResponse with total working minutes
Raises:
ValueError: If validation fails or no check-in found
"""
# Use server time for timestamp and work_date
now = datetime.now()
timestamp_ms = int(now.timestamp() * 1000)
work_date = now.date()
# 1. Check if employee exists and has location tracking consent
has_consent = await self.check_location_tracking_consent(employee_id)
if not has_consent:
logger.warning(f"GPS disabled for employee {employee_id}")
raise ValueError(ERROR_GPS_DISABLED)
# 2. Validate geofence boundary
await self.validate_geofence(employee_id, payload.latitude, payload.longitude)
# 3. Get today's attendance record
conn = None
try:
conn = await get_postgres_connection()
# Find today's attendance record
query = """
SELECT
id,
check_in_time,
check_out_time
FROM trans.scm_attendance
WHERE employee_id = $1::uuid
AND work_date = $2
"""
record = await conn.fetchrow(query, employee_id, work_date)
if not record:
logger.warning(f"No check-in found for employee {employee_id} on {work_date}")
raise ValueError(ERROR_NO_CHECKIN_FOUND)
# Check if already checked out
if record['check_out_time'] is not None:
logger.warning(f"Employee {employee_id} already checked out on {work_date}")
raise ValueError(ERROR_ALREADY_CHECKED_OUT)
# Calculate total working time in minutes
check_in_time_ms = record['check_in_time']
check_out_time_ms = timestamp_ms
total_minutes = int((check_out_time_ms - check_in_time_ms) / (1000 * 60))
# Ensure total_minutes is not negative
if total_minutes < 0:
logger.error(f"Negative working time calculated: {total_minutes} minutes")
raise ValueError("Check-out time cannot be before check-in time")
# Update attendance record with check-out details
update_query = """
UPDATE trans.scm_attendance
SET
check_out_time = $1,
check_out_lat = $2,
check_out_lon = $3,
total_minutes = $4,
updated_at = NOW()
WHERE id = $5
RETURNING id
"""
result = await conn.fetchval(
update_query,
timestamp_ms,
payload.latitude,
payload.longitude,
total_minutes,
record['id']
)
logger.info(
f"Check-out created successfully",
extra={
"attendance_id": str(result),
"employee_id": employee_id,
"merchant_id": merchant_id,
"work_date": str(work_date),
"timestamp_ms": timestamp_ms,
"total_minutes": total_minutes
}
)
return CheckOutResponse(
success=True,
total_minutes=total_minutes,
message=SUCCESS_CHECKOUT
)
except ValueError:
# Re-raise ValueError (business logic errors)
raise
except Exception as e:
logger.error(f"Error creating check-out: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
async def get_daily_attendance(
self,
employee_id: str,
work_date: date
) -> Optional[Dict[str, Any]]:
"""
Get daily attendance summary for an employee.
Args:
employee_id: Employee UUID (as string)
work_date: Date to retrieve attendance for
Returns:
Dictionary with attendance summary or None if no record found
{
'date': str,
'on_duty': bool,
'check_in_time': int or None,
'check_out_time': int or None,
'total_hours': float
}
"""
conn = None
try:
conn = await get_postgres_connection()
query = """
SELECT
work_date,
check_in_time,
check_out_time,
total_minutes
FROM trans.scm_attendance
WHERE employee_id = $1::uuid AND work_date = $2
"""
row = await conn.fetchrow(query, employee_id, work_date)
if not row:
return None
# Calculate on-duty status (checked in but not checked out)
on_duty = row['check_in_time'] is not None and row['check_out_time'] is None
# Calculate total hours
total_hours = 0.0
if row['total_minutes'] is not None:
total_hours = round(row['total_minutes'] / 60.0, 2)
return {
'date': row['work_date'].strftime("%Y-%m-%d"),
'on_duty': on_duty,
'check_in_time': row['check_in_time'],
'check_out_time': row['check_out_time'],
'total_hours': total_hours
}
except Exception as e:
logger.error(f"Error fetching daily attendance: {e}", exc_info=e)
raise
finally:
if conn:
await release_postgres_connection(conn)
def get_attendance_service(mongo_db: AsyncIOMotorDatabase) -> AttendanceService:
"""Factory function to create AttendanceService instance"""
return AttendanceService(mongo_db)