| """ |
| Service layer for attendance operations. |
| Handles business logic for check-in/check-out. |
| """ |
| from typing import Optional, Dict, Any |
| from datetime import datetime, date |
| import uuid |
| from motor.motor_asyncio import AsyncIOMotorDatabase |
|
|
| from app.core.logging import get_logger |
| from app.core.geo import haversine_distance_km |
| from app.postgres import get_postgres_connection, release_postgres_connection |
| from app.tracker.attendance.schemas import CheckInRequest, CheckInResponse, CheckOutRequest, CheckOutResponse |
| from app.tracker.attendance.constants import ( |
| EMPLOYEES_COLLECTION, |
| ERROR_DUPLICATE_CHECKIN, |
| ERROR_GPS_DISABLED, |
| ERROR_EMPLOYEE_NOT_FOUND, |
| ERROR_NO_CHECKIN_FOUND, |
| ERROR_ALREADY_CHECKED_OUT, |
| ERROR_OUTSIDE_GEOFENCE, |
| SUCCESS_CHECKIN, |
| SUCCESS_CHECKOUT |
| ) |
|
|
# Module-level logger, named after this module, used by every function and method below.
| logger = get_logger(__name__) |
|
|
|
|
class AttendanceService:
    """Business logic for employee attendance check-in and check-out.

    Employee location settings are read from the MongoDB employees
    collection; attendance rows live in the PostgreSQL table
    ``trans.scm_attendance``. Every PostgreSQL connection obtained through
    ``get_postgres_connection`` is handed back through
    ``release_postgres_connection`` in a ``finally`` block.
    """

    # Geofence radius (km) applied when the employee document carries no
    # usable ``location_settings.precision_level``.
    DEFAULT_GEOFENCE_RADIUS_KM = 1.0

    def __init__(self, mongo_db: AsyncIOMotorDatabase):
        """Bind the service to a MongoDB database handle.

        Args:
            mongo_db: Database from which the employees collection is taken.
        """
        self.mongo_db = mongo_db
        self.employees_collection = mongo_db[EMPLOYEES_COLLECTION]

    async def check_location_tracking_consent(self, employee_id: str) -> bool:
        """Report whether the employee has enabled location tracking.

        Args:
            employee_id: Value matched against the ``user_id`` field of the
                employee document.

        Returns:
            True if ``location_settings.location_tracking_consent`` is
            truthy, False if it is falsy, null or absent.

        Raises:
            ValueError: If no employee document matches ``employee_id``.
        """
        try:
            employee = await self.employees_collection.find_one(
                {"user_id": employee_id},
                {"location_settings.location_tracking_consent": 1},
            )

            if not employee:
                logger.warning(f"Employee not found: {employee_id}")
                raise ValueError(ERROR_EMPLOYEE_NOT_FOUND)

            # ``or {}`` also covers a stored null, which ``.get(key, {})``
            # would hand back as None and then fail on ``.get``.
            location_settings = employee.get("location_settings") or {}
            # Coerce so the declared ``bool`` return type always holds.
            consent = bool(
                location_settings.get("location_tracking_consent", False)
            )

            logger.info(f"Location tracking consent for {employee_id}: {consent}")
            return consent

        except ValueError:
            # Validation failures were already logged as warnings above.
            raise
        except Exception as e:
            logger.error(f"Error checking location tracking consent: {e}", exc_info=e)
            raise

    async def validate_geofence(
        self,
        employee_id: str,
        latitude: float,
        longitude: float
    ) -> None:
        """Reject coordinates that fall outside the employee's geofence.

        The fence is a circle centred on the employee document's
        ``geo_location`` (``lat``/``lng``) whose radius, in kilometres, is
        ``location_settings.precision_level``. A missing or null radius
        falls back to ``DEFAULT_GEOFENCE_RADIUS_KM``. When ``geo_location``
        is absent or lacks either coordinate, validation is skipped.

        NOTE(review): an earlier docstring placed the centre at
        ``location_settings.geo_location``, but the query reads the
        top-level ``geo_location`` field - confirm which path the employee
        schema actually uses.

        Args:
            employee_id: Value matched against the ``user_id`` field.
            latitude: Employee's current latitude.
            longitude: Employee's current longitude.

        Raises:
            ValueError: If the employee is not found, or the distance to the
                fence centre exceeds the allowed radius.
        """
        try:
            employee = await self.employees_collection.find_one(
                {"user_id": employee_id},
                {
                    "geo_location": 1,
                    "location_settings.precision_level": 1,
                },
            )

            if not employee:
                logger.warning(f"Employee not found: {employee_id}")
                raise ValueError(ERROR_EMPLOYEE_NOT_FOUND)

            geo_location = employee.get("geo_location")
            if not geo_location:
                logger.info(
                    f"No geo_location configured for employee {employee_id}, skipping geofence validation"
                )
                return

            geo_lat = geo_location.get("lat")
            geo_lng = geo_location.get("lng")
            if geo_lat is None or geo_lng is None:
                logger.info(
                    f"Incomplete geo_location for employee {employee_id}, skipping geofence validation"
                )
                return

            # Tolerate a stored null for either the sub-document or the radius.
            location_settings = employee.get("location_settings") or {}
            raw_precision = location_settings.get("precision_level")
            if raw_precision is None:
                precision_km = float(self.DEFAULT_GEOFENCE_RADIUS_KM)
            else:
                precision_km = float(raw_precision)

            distance_km = haversine_distance_km(latitude, longitude, geo_lat, geo_lng)

            logger.info(
                "Geofence validation",
                extra={
                    "employee_id": employee_id,
                    "employee_lat": latitude,
                    "employee_lng": longitude,
                    "geo_lat": geo_lat,
                    "geo_lng": geo_lng,
                    "distance_km": round(distance_km, 3),
                    "precision_km": precision_km,
                    "within_boundary": distance_km <= precision_km,
                },
            )

            if distance_km > precision_km:
                raise ValueError(
                    f"{ERROR_OUTSIDE_GEOFENCE} "
                    f"(distance: {round(distance_km, 2)} km, allowed: {precision_km} km)"
                )

        except ValueError:
            raise
        except Exception as e:
            logger.error(f"Error validating geofence: {e}", exc_info=e)
            raise

    async def check_duplicate_checkin(
        self,
        employee_id: str,
        work_date: date
    ) -> bool:
        """Report whether an attendance row already exists for the day.

        Args:
            employee_id: Employee UUID (as string).
            work_date: Date to check.

        Returns:
            True if a row exists for ``employee_id`` on ``work_date``,
            False otherwise.
        """
        conn = None
        try:
            conn = await get_postgres_connection()

            query = """
                SELECT id FROM trans.scm_attendance
                WHERE employee_id = $1::uuid AND work_date = $2
            """
            existing_id = await conn.fetchval(query, employee_id, work_date)
            return existing_id is not None

        except Exception as e:
            logger.error(f"Error checking duplicate check-in: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    @staticmethod
    def _current_timestamp_and_date() -> tuple:
        """Return ``(epoch_milliseconds, date)`` for the current moment.

        NOTE(review): ``datetime.now()`` is naive, so the date component
        follows the server's local clock - confirm that matches the
        intended definition of a work day.
        """
        now = datetime.now()
        return int(now.timestamp() * 1000), now.date()

    async def _ensure_location_allowed(
        self,
        employee_id: str,
        latitude: float,
        longitude: float
    ) -> None:
        """Run the consent check, then the geofence check, for one request."""
        if not await self.check_location_tracking_consent(employee_id):
            logger.warning(f"GPS disabled for employee {employee_id}")
            raise ValueError(ERROR_GPS_DISABLED)

        await self.validate_geofence(employee_id, latitude, longitude)

    async def create_checkin(
        self,
        employee_id: str,
        merchant_id: str,
        payload: CheckInRequest
    ) -> CheckInResponse:
        """Validate the request and insert a check-in row.

        Checks run in this order: location-tracking consent, geofence,
        duplicate check-in for today's date.

        NOTE(review): the duplicate check and the INSERT are separate
        statements on separate connections, so two concurrent requests can
        both pass the check. Whether the table has a unique constraint on
        (employee_id, work_date) is not visible here - confirm.

        Args:
            employee_id: Employee UUID.
            merchant_id: Merchant UUID.
            payload: Check-in request data; ``latitude``, ``longitude`` and
                ``location_id`` are read from it.

        Returns:
            CheckInResponse carrying the new attendance record ID.

        Raises:
            ValueError: If consent is off, the position is outside the
                geofence, the employee is unknown, or a row already exists
                for today.
        """
        # Capture the time first so it reflects when the request started,
        # not when validation finished.
        timestamp_ms, work_date = self._current_timestamp_and_date()

        await self._ensure_location_allowed(
            employee_id, payload.latitude, payload.longitude
        )

        if await self.check_duplicate_checkin(employee_id, work_date):
            logger.warning(f"Duplicate check-in attempt for employee {employee_id} on {work_date}")
            raise ValueError(ERROR_DUPLICATE_CHECKIN)

        # A falsy location_id (e.g. "") is stored as NULL rather than being
        # sent to the ``::uuid`` cast; this matches what the log line records.
        geofence_id = payload.location_id or None

        conn = None
        try:
            conn = await get_postgres_connection()

            attendance_id = uuid.uuid4()

            query = """
                INSERT INTO trans.scm_attendance (
                    id,
                    merchant_id,
                    employee_id,
                    work_date,
                    check_in_time,
                    check_in_lat,
                    check_in_lon,
                    check_in_geofence_id,
                    created_at,
                    updated_at
                ) VALUES ($1, $2::uuid, $3::uuid, $4, $5, $6, $7, $8::uuid, NOW(), NOW())
                RETURNING id
            """

            inserted_id = await conn.fetchval(
                query,
                attendance_id,
                merchant_id,
                employee_id,
                work_date,
                timestamp_ms,
                payload.latitude,
                payload.longitude,
                geofence_id,
            )

            logger.info(
                "Check-in created successfully",
                extra={
                    "attendance_id": str(inserted_id),
                    "employee_id": employee_id,
                    "merchant_id": merchant_id,
                    "work_date": str(work_date),
                    "timestamp_ms": timestamp_ms,
                    "geofence_id": geofence_id,
                },
            )

            return CheckInResponse(
                success=True,
                id=str(inserted_id),
                message=SUCCESS_CHECKIN
            )

        except Exception as e:
            logger.error(f"Error creating check-in: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    async def get_attendance_by_id(self, attendance_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one attendance row by its primary key.

        Args:
            attendance_id: Attendance UUID (as string).

        Returns:
            The row as a plain dict, or None if no row has that ID.
        """
        conn = None
        try:
            conn = await get_postgres_connection()

            query = """
                SELECT
                    id,
                    merchant_id,
                    employee_id,
                    work_date,
                    check_in_time,
                    check_in_lat,
                    check_in_lon,
                    check_in_geofence_id,
                    check_out_time,
                    check_out_lat,
                    check_out_lon,
                    total_minutes,
                    created_at,
                    updated_at
                FROM trans.scm_attendance
                WHERE id = $1::uuid
            """

            row = await conn.fetchrow(query, attendance_id)
            if row is None:
                return None

            return dict(row)

        except Exception as e:
            logger.error(f"Error fetching attendance: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    async def create_checkout(
        self,
        employee_id: str,
        merchant_id: str,
        payload: CheckOutRequest
    ) -> CheckOutResponse:
        """Close today's attendance row and compute the minutes worked.

        Args:
            employee_id: Employee ID.
            merchant_id: Merchant ID (only logged; the queries filter on
                employee and date).
            payload: Check-out request data; ``latitude`` and ``longitude``
                are read from it.

        Returns:
            CheckOutResponse with the total whole minutes worked.

        Raises:
            ValueError: If consent is off, the position is outside the
                geofence, no check-in exists for today, the row is already
                checked out, or the check-out time precedes the check-in.
        """
        timestamp_ms, work_date = self._current_timestamp_and_date()

        await self._ensure_location_allowed(
            employee_id, payload.latitude, payload.longitude
        )

        conn = None
        try:
            conn = await get_postgres_connection()

            select_query = """
                SELECT
                    id,
                    check_in_time,
                    check_out_time
                FROM trans.scm_attendance
                WHERE employee_id = $1::uuid
                    AND work_date = $2
            """

            record = await conn.fetchrow(select_query, employee_id, work_date)

            if record is None:
                logger.warning(f"No check-in found for employee {employee_id} on {work_date}")
                raise ValueError(ERROR_NO_CHECKIN_FOUND)

            if record['check_out_time'] is not None:
                logger.warning(f"Employee {employee_id} already checked out on {work_date}")
                raise ValueError(ERROR_ALREADY_CHECKED_OUT)

            check_in_time_ms = record['check_in_time']
            if check_in_time_ms is None:
                # A row without a check-in time cannot be closed; report it
                # the same way as a missing row instead of failing on the
                # subtraction below.
                logger.warning(f"No check-in found for employee {employee_id} on {work_date}")
                raise ValueError(ERROR_NO_CHECKIN_FOUND)

            # Compare raw milliseconds: truncating to minutes first would
            # round a slightly negative span to 0 and let it through.
            elapsed_ms = timestamp_ms - check_in_time_ms
            if elapsed_ms < 0:
                logger.error(f"Negative working time calculated: {elapsed_ms} ms")
                raise ValueError("Check-out time cannot be before check-in time")

            total_minutes = int(elapsed_ms // 60_000)

            # ``check_out_time IS NULL`` makes the write conditional, so a
            # concurrent check-out that committed after the SELECT above is
            # not overwritten.
            update_query = """
                UPDATE trans.scm_attendance
                SET
                    check_out_time = $1,
                    check_out_lat = $2,
                    check_out_lon = $3,
                    total_minutes = $4,
                    updated_at = NOW()
                WHERE id = $5
                    AND check_out_time IS NULL
                RETURNING id
            """

            updated_id = await conn.fetchval(
                update_query,
                timestamp_ms,
                payload.latitude,
                payload.longitude,
                total_minutes,
                record['id'],
            )

            if updated_id is None:
                # No row matched: another request closed it in the meantime.
                logger.warning(f"Employee {employee_id} already checked out on {work_date}")
                raise ValueError(ERROR_ALREADY_CHECKED_OUT)

            logger.info(
                "Check-out created successfully",
                extra={
                    "attendance_id": str(updated_id),
                    "employee_id": employee_id,
                    "merchant_id": merchant_id,
                    "work_date": str(work_date),
                    "timestamp_ms": timestamp_ms,
                    "total_minutes": total_minutes,
                },
            )

            return CheckOutResponse(
                success=True,
                total_minutes=total_minutes,
                message=SUCCESS_CHECKOUT
            )

        except ValueError:
            # Validation failures were already logged at the raise site.
            raise
        except Exception as e:
            logger.error(f"Error creating check-out: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)

    async def get_daily_attendance(
        self,
        employee_id: str,
        work_date: date
    ) -> Optional[Dict[str, Any]]:
        """Summarise one employee's attendance for one date.

        Args:
            employee_id: Employee UUID (as string).
            work_date: Date to retrieve attendance for.

        Returns:
            None if no row exists, otherwise a dict with:
                'date': ``work_date`` formatted as YYYY-MM-DD,
                'on_duty': True when checked in and not yet checked out,
                'check_in_time': stored value or None,
                'check_out_time': stored value or None,
                'total_hours': ``total_minutes / 60`` rounded to two
                    decimals, or 0.0 when ``total_minutes`` is null.
        """
        conn = None
        try:
            conn = await get_postgres_connection()

            query = """
                SELECT
                    work_date,
                    check_in_time,
                    check_out_time,
                    total_minutes
                FROM trans.scm_attendance
                WHERE employee_id = $1::uuid AND work_date = $2
            """

            row = await conn.fetchrow(query, employee_id, work_date)
            if row is None:
                return None

            on_duty = row['check_in_time'] is not None and row['check_out_time'] is None

            total_hours = 0.0
            if row['total_minutes'] is not None:
                total_hours = round(row['total_minutes'] / 60.0, 2)

            return {
                'date': row['work_date'].strftime("%Y-%m-%d"),
                'on_duty': on_duty,
                'check_in_time': row['check_in_time'],
                'check_out_time': row['check_out_time'],
                'total_hours': total_hours,
            }

        except Exception as e:
            logger.error(f"Error fetching daily attendance: {e}", exc_info=e)
            raise
        finally:
            if conn:
                await release_postgres_connection(conn)
|
|
|
|
|
|
def get_attendance_service(mongo_db: AsyncIOMotorDatabase) -> AttendanceService:
    """Build an AttendanceService bound to the given MongoDB database.

    Args:
        mongo_db: Database handle passed straight to the constructor.

    Returns:
        A newly constructed AttendanceService.
    """
    service = AttendanceService(mongo_db)
    return service
|
|