MukeshKapoor25 commited on
Commit
edd1bb3
·
1 Parent(s): 0f64783

feat(staff): Add dependency injection and enforce merchant_id from authentication token

Browse files

- Add AsyncIOMotorDatabase and AsyncSession dependency injection to all staff endpoints
- Inject get_database and get_postgres_session dependencies into router handlers
- Refactor StaffService to accept db and pg_session as constructor parameters
- Enforce merchant_id from authentication token in create_staff endpoint
- Add merchant_id validation to get_staff_by_code and get_staff_by_id endpoints
- Add merchant_id enforcement to update_staff endpoint with payload sanitization
- Update list_staff to use service instance with injected dependencies
- Add security checks to prevent merchant_id tampering across all endpoints
- Update docstrings to clarify merchant_id is taken from token and cannot be overridden
- Refactor sync service to support dependency injection pattern
- Update staff schema to align with merchant access control requirements

app/staff/controllers/router.py CHANGED
@@ -5,6 +5,8 @@ Consolidated POS staff management.
5
  from typing import Optional, List
6
  from fastapi import APIRouter, HTTPException, Query, status, Header, Depends
7
  import logging
 
 
8
 
9
  from app.dependencies.auth import TokenUser
10
  from app.dependencies.pos_permissions import require_pos_permission
@@ -17,6 +19,8 @@ from app.staff.schemas.staff_schema import (
17
  StaffListResponse
18
  )
19
  from app.staff.services.staff_service import StaffService
 
 
20
 
21
  logger = logging.getLogger(__name__)
22
 
@@ -35,13 +39,14 @@ router = APIRouter(
35
  )
36
  async def create_staff(
37
  payload: StaffCreateSchema,
38
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "create"))
 
 
39
  ) -> StaffResponseSchema:
40
  """
41
  Create a new staff member.
42
 
43
  **Required fields:**
44
- - merchant_id: Store/merchant identifier (optional - taken from token if not provided)
45
  - name: Full name
46
  - role: Position (e.g., Stylist, Cashier, Manager)
47
  - contact: Phone and email
@@ -52,17 +57,20 @@ async def create_staff(
52
  - working_hours: Weekly schedule
53
  - photo_url: Profile photo URL (HTTPS only)
54
  - notes: Additional information
 
 
 
55
  """
56
- # Use merchant_id from token if not provided in request
57
- merchant_id = payload.merchant_id
58
- if merchant_id is None:
59
- if not current_user.merchant_id:
60
- raise HTTPException(status_code=400, detail="merchant_id must be provided in request or token")
61
- merchant_id = current_user.merchant_id
62
 
63
- # Update payload with resolved merchant_id
64
- payload.merchant_id = merchant_id
65
- return await StaffService.create_staff(payload)
 
 
 
66
 
67
 
68
  @router.post(
@@ -72,7 +80,9 @@ async def create_staff(
72
  )
73
  async def list_staff(
74
  req: StaffListRequest,
75
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "view"))
 
 
76
  ):
77
  """
78
  List staff members with optional filters, pagination, and projection support.
@@ -88,7 +98,9 @@ async def list_staff(
88
  if not current_user.merchant_id:
89
  raise HTTPException(status_code=400, detail="merchant_id must be available in token")
90
 
91
- staff_list, total = await StaffService.list_staff(
 
 
92
  merchant_id=current_user.merchant_id,
93
  designation=req.designation,
94
  manager_id=req.manager_id,
@@ -124,12 +136,21 @@ async def list_staff(
124
  )
125
  async def get_staff_by_code(
126
  staff_code: str,
127
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "view"))
 
 
128
  ) -> StaffResponseSchema:
129
  """
130
  Get staff member by their unique staff code (case-insensitive).
 
131
  """
132
- staff = await StaffService.get_staff_by_code(staff_code)
 
 
 
 
 
 
133
  if not staff:
134
  raise HTTPException(
135
  status_code=status.HTTP_404_NOT_FOUND,
@@ -145,12 +166,21 @@ async def get_staff_by_code(
145
  )
146
  async def get_staff(
147
  staff_id: str,
148
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "view"))
 
 
149
  ) -> StaffResponseSchema:
150
  """
151
  Get detailed information about a specific staff member by ID.
 
152
  """
153
- staff = await StaffService.get_staff_by_id(staff_id)
 
 
 
 
 
 
154
  if not staff:
155
  raise HTTPException(
156
  status_code=status.HTTP_404_NOT_FOUND,
@@ -168,19 +198,36 @@ async def update_staff(
168
  staff_id: str,
169
  payload: StaffUpdateSchema,
170
  x_user_id: Optional[str] = Header(None, description="User ID making the update"),
171
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "update"))
 
 
172
  ) -> StaffResponseSchema:
173
  """
174
  Update staff member information.
175
 
176
  All fields are optional - only provided fields will be updated.
 
177
 
178
  **Validations:**
 
179
  - Email uniqueness (if changing email)
180
  - Phone uniqueness (if changing phone)
181
  - Manager validation (if changing manager)
 
 
 
182
  """
183
- return await StaffService.update_staff(staff_id, payload, x_user_id)
 
 
 
 
 
 
 
 
 
 
184
 
185
 
186
  @router.patch(
@@ -192,12 +239,15 @@ async def update_staff_status(
192
  staff_id: str,
193
  new_status: str,
194
  x_user_id: Optional[str] = Header(None, description="User ID making the update"),
195
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "update"))
 
 
196
  ) -> StaffResponseSchema:
197
  """
198
  Update staff status.
199
 
200
  Convenience endpoint for status changes (e.g., onboarding → active, active → suspended).
 
201
 
202
  **Status Transitions:**
203
  - onboarding → active (activation)
@@ -205,8 +255,14 @@ async def update_staff_status(
205
  - active → suspended (disciplinary)
206
  - active/inactive/suspended → terminated (termination)
207
  """
 
 
 
 
208
  update_payload = StaffUpdateSchema(status=new_status)
209
- return await StaffService.update_staff(staff_id, update_payload, x_user_id)
 
 
210
 
211
 
212
  @router.delete(
@@ -216,11 +272,20 @@ async def update_staff_status(
216
  async def delete_staff(
217
  staff_id: str,
218
  x_user_id: Optional[str] = Header(None, description="User ID performing the deletion"),
219
- current_user: TokenUser = Depends(require_pos_permission("retail_staff", "delete"))
 
 
220
  ):
221
  """
222
  Delete a staff member (soft delete - sets status to inactive/terminated).
 
223
 
224
  Cannot delete staff with active direct reports.
225
  """
226
- return await StaffService.delete_staff(staff_id, x_user_id)
 
 
 
 
 
 
 
5
  from typing import Optional, List
6
  from fastapi import APIRouter, HTTPException, Query, status, Header, Depends
7
  import logging
8
+ from motor.core import AgnosticDatabase as AsyncIOMotorDatabase
9
+ from sqlalchemy.ext.asyncio import AsyncSession
10
 
11
  from app.dependencies.auth import TokenUser
12
  from app.dependencies.pos_permissions import require_pos_permission
 
19
  StaffListResponse
20
  )
21
  from app.staff.services.staff_service import StaffService
22
+ from app.nosql import get_database
23
+ from app.sql import get_postgres_session
24
 
25
  logger = logging.getLogger(__name__)
26
 
 
39
  )
40
  async def create_staff(
41
  payload: StaffCreateSchema,
42
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "create")),
43
+ db: AsyncIOMotorDatabase = Depends(get_database),
44
+ pg_session: AsyncSession = Depends(get_postgres_session)
45
  ) -> StaffResponseSchema:
46
  """
47
  Create a new staff member.
48
 
49
  **Required fields:**
 
50
  - name: Full name
51
  - role: Position (e.g., Stylist, Cashier, Manager)
52
  - contact: Phone and email
 
57
  - working_hours: Weekly schedule
58
  - photo_url: Profile photo URL (HTTPS only)
59
  - notes: Additional information
60
+
61
+ **Security Note:**
62
+ merchant_id is automatically taken from authentication token and cannot be overridden.
63
  """
64
+ # SECURITY: Always use merchant_id from token, ignore any UI-provided value
65
+ if not current_user.merchant_id:
66
+ raise HTTPException(status_code=400, detail="merchant_id must be available in authentication token")
 
 
 
67
 
68
+ # Force merchant_id from token (ignore any UI-provided value)
69
+ payload.merchant_id = current_user.merchant_id
70
+
71
+ # Create service instance with dependencies
72
+ service = StaffService(db, pg_session)
73
+ return await service.create_staff(payload)
74
 
75
 
76
  @router.post(
 
80
  )
81
  async def list_staff(
82
  req: StaffListRequest,
83
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "view")),
84
+ db: AsyncIOMotorDatabase = Depends(get_database),
85
+ pg_session: AsyncSession = Depends(get_postgres_session)
86
  ):
87
  """
88
  List staff members with optional filters, pagination, and projection support.
 
98
  if not current_user.merchant_id:
99
  raise HTTPException(status_code=400, detail="merchant_id must be available in token")
100
 
101
+ # Create service instance with dependencies
102
+ service = StaffService(db, pg_session)
103
+ staff_list, total = await service.list_staff(
104
  merchant_id=current_user.merchant_id,
105
  designation=req.designation,
106
  manager_id=req.manager_id,
 
136
  )
137
  async def get_staff_by_code(
138
  staff_code: str,
139
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "view")),
140
+ db: AsyncIOMotorDatabase = Depends(get_database),
141
+ pg_session: AsyncSession = Depends(get_postgres_session)
142
  ) -> StaffResponseSchema:
143
  """
144
  Get staff member by their unique staff code (case-insensitive).
145
+ Only returns staff belonging to the authenticated merchant.
146
  """
147
+ # SECURITY: Ensure merchant_id is available in token
148
+ if not current_user.merchant_id:
149
+ raise HTTPException(status_code=400, detail="merchant_id must be available in authentication token")
150
+
151
+ # Create service instance with dependencies
152
+ service = StaffService(db, pg_session)
153
+ staff = await service.get_staff_by_code(staff_code, current_user.merchant_id)
154
  if not staff:
155
  raise HTTPException(
156
  status_code=status.HTTP_404_NOT_FOUND,
 
166
  )
167
  async def get_staff(
168
  staff_id: str,
169
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "view")),
170
+ db: AsyncIOMotorDatabase = Depends(get_database),
171
+ pg_session: AsyncSession = Depends(get_postgres_session)
172
  ) -> StaffResponseSchema:
173
  """
174
  Get detailed information about a specific staff member by ID.
175
+ Only returns staff belonging to the authenticated merchant.
176
  """
177
+ # SECURITY: Ensure merchant_id is available in token
178
+ if not current_user.merchant_id:
179
+ raise HTTPException(status_code=400, detail="merchant_id must be available in authentication token")
180
+
181
+ # Create service instance with dependencies
182
+ service = StaffService(db, pg_session)
183
+ staff = await service.get_staff_by_id(staff_id, current_user.merchant_id)
184
  if not staff:
185
  raise HTTPException(
186
  status_code=status.HTTP_404_NOT_FOUND,
 
198
  staff_id: str,
199
  payload: StaffUpdateSchema,
200
  x_user_id: Optional[str] = Header(None, description="User ID making the update"),
201
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "update")),
202
+ db: AsyncIOMotorDatabase = Depends(get_database),
203
+ pg_session: AsyncSession = Depends(get_postgres_session)
204
  ) -> StaffResponseSchema:
205
  """
206
  Update staff member information.
207
 
208
  All fields are optional - only provided fields will be updated.
209
+ Only allows updating staff belonging to the authenticated merchant.
210
 
211
  **Validations:**
212
+ - Staff must belong to authenticated merchant
213
  - Email uniqueness (if changing email)
214
  - Phone uniqueness (if changing phone)
215
  - Manager validation (if changing manager)
216
+
217
+ **Security Note:**
218
+ merchant_id cannot be changed and is enforced from authentication token.
219
  """
220
+ # SECURITY: Ensure merchant_id is available in token
221
+ if not current_user.merchant_id:
222
+ raise HTTPException(status_code=400, detail="merchant_id must be available in authentication token")
223
+
224
+ # SECURITY: Remove any merchant_id from payload to prevent tampering
225
+ if hasattr(payload, 'merchant_id'):
226
+ payload.merchant_id = None
227
+
228
+ # Create service instance with dependencies
229
+ service = StaffService(db, pg_session)
230
+ return await service.update_staff(staff_id, payload, current_user.merchant_id, x_user_id)
231
 
232
 
233
  @router.patch(
 
239
  staff_id: str,
240
  new_status: str,
241
  x_user_id: Optional[str] = Header(None, description="User ID making the update"),
242
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "update")),
243
+ db: AsyncIOMotorDatabase = Depends(get_database),
244
+ pg_session: AsyncSession = Depends(get_postgres_session)
245
  ) -> StaffResponseSchema:
246
  """
247
  Update staff status.
248
 
249
  Convenience endpoint for status changes (e.g., onboarding → active, active → suspended).
250
+ Only allows updating staff belonging to the authenticated merchant.
251
 
252
  **Status Transitions:**
253
  - onboarding → active (activation)
 
255
  - active → suspended (disciplinary)
256
  - active/inactive/suspended → terminated (termination)
257
  """
258
+ # SECURITY: Ensure merchant_id is available in token
259
+ if not current_user.merchant_id:
260
+ raise HTTPException(status_code=400, detail="merchant_id must be available in authentication token")
261
+
262
  update_payload = StaffUpdateSchema(status=new_status)
263
+ # Create service instance with dependencies
264
+ service = StaffService(db, pg_session)
265
+ return await service.update_staff(staff_id, update_payload, current_user.merchant_id, x_user_id)
266
 
267
 
268
  @router.delete(
 
272
  async def delete_staff(
273
  staff_id: str,
274
  x_user_id: Optional[str] = Header(None, description="User ID performing the deletion"),
275
+ current_user: TokenUser = Depends(require_pos_permission("retail_staff", "delete")),
276
+ db: AsyncIOMotorDatabase = Depends(get_database),
277
+ pg_session: AsyncSession = Depends(get_postgres_session)
278
  ):
279
  """
280
  Delete a staff member (soft delete - sets status to inactive/terminated).
281
+ Only allows deleting staff belonging to the authenticated merchant.
282
 
283
  Cannot delete staff with active direct reports.
284
  """
285
+ # SECURITY: Ensure merchant_id is available in token
286
+ if not current_user.merchant_id:
287
+ raise HTTPException(status_code=400, detail="merchant_id must be available in authentication token")
288
+
289
+ # Create service instance with dependencies
290
+ service = StaffService(db, pg_session)
291
+ return await service.delete_staff(staff_id, current_user.merchant_id, x_user_id)
app/staff/schemas/staff_schema.py CHANGED
@@ -35,7 +35,7 @@ class WorkingHoursSchema(BaseModel):
35
 
36
  class StaffCreateSchema(BaseModel):
37
  """Schema for creating a new staff member."""
38
- merchant_id: Optional[str] = Field(None, description="Merchant/store identifier", min_length=3, max_length=50)
39
  name: str = Field(..., description="Staff member name", min_length=1, max_length=100)
40
  role: str = Field(..., description="Role/position", min_length=1, max_length=50)
41
  staff_code: Optional[str] = Field(None, description="Unique staff code (optional)", max_length=20)
@@ -46,6 +46,9 @@ class StaffCreateSchema(BaseModel):
46
  photo_url: Optional[str] = Field(None, description="Profile photo URL")
47
  notes: Optional[str] = Field(None, description="Additional notes", max_length=500)
48
 
 
 
 
49
  @field_validator('photo_url')
50
  @classmethod
51
  def validate_photo_url(cls, v):
 
35
 
36
  class StaffCreateSchema(BaseModel):
37
  """Schema for creating a new staff member."""
38
+ # NOTE: merchant_id is automatically set from authentication token and cannot be provided via API
39
  name: str = Field(..., description="Staff member name", min_length=1, max_length=100)
40
  role: str = Field(..., description="Role/position", min_length=1, max_length=50)
41
  staff_code: Optional[str] = Field(None, description="Unique staff code (optional)", max_length=20)
 
46
  photo_url: Optional[str] = Field(None, description="Profile photo URL")
47
  notes: Optional[str] = Field(None, description="Additional notes", max_length=500)
48
 
49
+ # Internal field set by service layer from token
50
+ merchant_id: Optional[str] = Field(None, exclude=True, description="Internal field - set from authentication token")
51
+
52
  @field_validator('photo_url')
53
  @classmethod
54
  def validate_photo_url(cls, v):
app/staff/services/staff_service.py CHANGED
@@ -8,54 +8,17 @@ from fastapi import HTTPException
8
  from fastapi import status as http_status
9
  import logging
10
  import secrets
11
- from sqlalchemy import text
12
 
13
  from app.nosql import get_database
14
- from app.sql import get_postgres_session
15
  from app.constants.collections import POS_STAFF_COLLECTION
16
  from app.staff.models.staff_model import StaffModel
17
  from app.staff.schemas.staff_schema import StaffCreateSchema, StaffUpdateSchema, StaffResponseSchema
 
18
 
19
  logger = logging.getLogger(__name__)
20
 
21
 
22
-
23
-
24
- async def sync_staff_to_postgres(staff_id: str, merchant_id: str, staff_name: str):
25
- """
26
- Sync staff data to PostgreSQL trans.pos_staff_ref table.
27
-
28
- Args:
29
- staff_id: Staff UUID
30
- merchant_id: Merchant UUID
31
- staff_name: Staff member name
32
- """
33
- try:
34
- async with get_postgres_session() as session:
35
- if session is None:
36
- logger.warning("PostgreSQL not available, skipping staff sync")
37
- return
38
-
39
- query = text("""
40
- INSERT INTO trans.pos_staff_ref (staff_id, merchant_id, staff_name, created_at, updated_at)
41
- VALUES (:staff_id, :merchant_id, :staff_name, NOW(), NOW())
42
- ON CONFLICT (staff_id)
43
- DO UPDATE SET
44
- staff_name = EXCLUDED.staff_name,
45
- updated_at = NOW()
46
- """)
47
-
48
- await session.execute(query, {
49
- "staff_id": staff_id,
50
- "merchant_id": merchant_id,
51
- "staff_name": staff_name
52
- })
53
- await session.commit()
54
- logger.info(f"Synced staff {staff_id} to trans.pos_staff_ref")
55
-
56
- except Exception as e:
57
- logger.error(f"Failed to sync staff {staff_id} to PostgreSQL: {e}")
58
- # Don't raise - PostgreSQL sync is secondary to MongoDB
59
  def generate_staff_id() -> str:
60
  """Generate a unique staff ID."""
61
  return f"staff_{secrets.token_urlsafe(16)}"
@@ -63,9 +26,14 @@ def generate_staff_id() -> str:
63
 
64
  class StaffService:
65
  """Service class for staff operations."""
 
 
 
 
 
 
66
 
67
- @staticmethod
68
- async def create_staff(payload: StaffCreateSchema) -> StaffResponseSchema:
69
  """
70
  Create a new staff member.
71
 
@@ -86,15 +54,30 @@ class StaffService:
86
  staff_data["created_at"] = now
87
  staff_data["updated_at"] = now
88
 
89
- # Insert into database
90
- await get_database()[POS_STAFF_COLLECTION].insert_one(staff_data)
91
 
92
- # Sync to PostgreSQL trans.pos_staff_ref
93
- await sync_staff_to_postgres(
94
- staff_id=staff_id,
95
- merchant_id=payload.merchant_id,
96
- staff_name=payload.name
97
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
 
99
  logger.info(f"Created staff {staff_id} for merchant {payload.merchant_id}")
100
 
@@ -108,11 +91,20 @@ class StaffService:
108
  detail=f"Error creating staff: {str(e)}"
109
  )
110
 
111
- @staticmethod
112
- async def get_staff_by_id(staff_id: str) -> Optional[StaffResponseSchema]:
113
- """Get staff by ID."""
 
 
 
 
 
114
  try:
115
- staff = await get_database()[POS_STAFF_COLLECTION].find_one({"staff_id": staff_id})
 
 
 
 
116
  if not staff:
117
  return None
118
 
@@ -159,11 +151,20 @@ class StaffService:
159
  detail="Error retrieving staff"
160
  )
161
 
162
- @staticmethod
163
- async def get_staff_by_code(staff_code: str) -> Optional[StaffResponseSchema]:
164
- """Get staff by staff code."""
 
 
 
 
 
165
  try:
166
- staff = await get_database()[POS_STAFF_COLLECTION].find_one({"staff_code": staff_code.upper()})
 
 
 
 
167
  if not staff:
168
  return None
169
 
@@ -210,24 +211,27 @@ class StaffService:
210
  detail="Error retrieving staff"
211
  )
212
 
213
- @staticmethod
214
  async def update_staff(
 
215
  staff_id: str,
216
  payload: StaffUpdateSchema,
 
217
  x_user_id: Optional[str] = None
218
  ) -> StaffResponseSchema:
219
  """
220
- Update staff information.
221
 
222
  Args:
223
  staff_id: Staff ID to update
224
  payload: Update data
 
 
225
 
226
  Returns:
227
  Updated staff response
228
  """
229
- # Check staff exists
230
- existing = await StaffService.get_staff_by_id(staff_id)
231
  if not existing:
232
  raise HTTPException(
233
  status_code=http_status.HTTP_404_NOT_FOUND,
@@ -246,20 +250,33 @@ class StaffService:
246
  update_data["updated_at"] = datetime.utcnow()
247
 
248
  try:
249
- # Update in database
250
- result = await get_database()[POS_STAFF_COLLECTION].update_one(
251
- {"staff_id": staff_id},
252
  {"$set": update_data}
253
  )
254
- updated_staff = await StaffService.get_staff_by_id(staff_id)
255
 
256
- # Sync to PostgreSQL if name was updated
257
- if updated_staff and "name" in update_data:
258
- await sync_staff_to_postgres(
259
- staff_id=staff_id,
260
- merchant_id=updated_staff.merchant_id,
261
- staff_name=updated_staff.name
262
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
  if result.modified_count == 0:
265
  logger.warning(f"No changes made to staff {staff_id}")
@@ -275,8 +292,8 @@ class StaffService:
275
  detail=f"Error updating staff: {str(e)}"
276
  )
277
 
278
- @staticmethod
279
  async def list_staff(
 
280
  merchant_id: Optional[str] = None,
281
  designation: Optional[str] = None,
282
  manager_id: Optional[str] = None,
@@ -327,10 +344,10 @@ class StaffService:
327
  projection_dict["_id"] = 0
328
 
329
  # Get total count
330
- total = await get_database()[POS_STAFF_COLLECTION].count_documents(query)
331
 
332
  # Fetch staff
333
- cursor = get_database()[POS_STAFF_COLLECTION].find(query, projection_dict).skip(skip).limit(limit)
334
  staff_list = await cursor.to_list(length=limit)
335
 
336
  # Return raw dict if projection used, model otherwise
@@ -384,9 +401,10 @@ class StaffService:
384
  detail="Error listing staff"
385
  )
386
 
387
- @staticmethod
388
  async def delete_staff(
 
389
  staff_id: str,
 
390
  x_user_id: Optional[str] = None
391
  ) -> Dict[str, str]:
392
  """
@@ -394,12 +412,14 @@ class StaffService:
394
 
395
  Args:
396
  staff_id: Staff ID to delete
 
 
397
 
398
  Returns:
399
  Success message
400
  """
401
- # Check staff exists
402
- existing = await StaffService.get_staff_by_id(staff_id)
403
  if not existing:
404
  raise HTTPException(
405
  status_code=http_status.HTTP_404_NOT_FOUND,
@@ -407,9 +427,9 @@ class StaffService:
407
  )
408
 
409
  try:
410
- # Soft delete - set status to inactive
411
- await get_database()[POS_STAFF_COLLECTION].update_one(
412
- {"staff_id": staff_id},
413
  {
414
  "$set": {
415
  "status": "inactive",
@@ -418,6 +438,15 @@ class StaffService:
418
  }
419
  )
420
 
 
 
 
 
 
 
 
 
 
421
  logger.info(f"Deleted staff {staff_id}")
422
  return {"message": f"Staff {staff_id} deleted successfully"}
423
 
 
8
  from fastapi import status as http_status
9
  import logging
10
  import secrets
11
+ from sqlalchemy.ext.asyncio import AsyncSession
12
 
13
  from app.nosql import get_database
 
14
  from app.constants.collections import POS_STAFF_COLLECTION
15
  from app.staff.models.staff_model import StaffModel
16
  from app.staff.schemas.staff_schema import StaffCreateSchema, StaffUpdateSchema, StaffResponseSchema
17
+ from app.sync.staff.sync_service import StaffSyncService
18
 
19
  logger = logging.getLogger(__name__)
20
 
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  def generate_staff_id() -> str:
23
  """Generate a unique staff ID."""
24
  return f"staff_{secrets.token_urlsafe(16)}"
 
26
 
27
  class StaffService:
28
  """Service class for staff operations."""
29
+
30
+ def __init__(self, db, pg_session: AsyncSession = None):
31
+ self.db = db
32
+ self.collection = db[POS_STAFF_COLLECTION]
33
+ self.pg_session = pg_session
34
+ self.sync_service = StaffSyncService(pg_session) if pg_session else None
35
 
36
+ async def create_staff(self, payload: StaffCreateSchema) -> StaffResponseSchema:
 
37
  """
38
  Create a new staff member.
39
 
 
54
  staff_data["created_at"] = now
55
  staff_data["updated_at"] = now
56
 
57
+ # Insert into MongoDB
58
+ await self.collection.insert_one(staff_data)
59
 
60
+ # Sync to PostgreSQL trans.pos_staff_ref if sync service is available
61
+ if self.sync_service:
62
+ try:
63
+ # Prepare sync data
64
+ sync_data = {
65
+ "staff_id": staff_id,
66
+ "merchant_id": payload.merchant_id,
67
+ "name": payload.name,
68
+ "phone": payload.contact.phone if payload.contact else None,
69
+ "email": payload.contact.email if payload.contact else None,
70
+ "role": payload.role,
71
+ "status": payload.status if hasattr(payload, 'status') else "active",
72
+ "created_at": now,
73
+ "updated_at": now
74
+ }
75
+
76
+ await self.sync_service.create_staff_ref(sync_data)
77
+ logger.info(f"Staff synced to PostgreSQL: {staff_id}")
78
+ except Exception as sync_error:
79
+ logger.error(f"Failed to sync staff to PostgreSQL: {sync_error}")
80
+ # Don't raise - PostgreSQL sync is secondary to MongoDB
81
 
82
  logger.info(f"Created staff {staff_id} for merchant {payload.merchant_id}")
83
 
 
91
  detail=f"Error creating staff: {str(e)}"
92
  )
93
 
94
+ async def get_staff_by_id(self, staff_id: str, merchant_id: str) -> Optional[StaffResponseSchema]:
95
+ """
96
+ Get staff by ID, ensuring it belongs to the specified merchant.
97
+
98
+ Args:
99
+ staff_id: Staff ID to retrieve
100
+ merchant_id: Merchant ID from authentication token
101
+ """
102
  try:
103
+ # SECURITY: Query with both staff_id AND merchant_id
104
+ staff = await self.collection.find_one({
105
+ "staff_id": staff_id,
106
+ "merchant_id": merchant_id
107
+ })
108
  if not staff:
109
  return None
110
 
 
151
  detail="Error retrieving staff"
152
  )
153
 
154
+ async def get_staff_by_code(self, staff_code: str, merchant_id: str) -> Optional[StaffResponseSchema]:
155
+ """
156
+ Get staff by staff code, ensuring it belongs to the specified merchant.
157
+
158
+ Args:
159
+ staff_code: Staff code to search for
160
+ merchant_id: Merchant ID from authentication token
161
+ """
162
  try:
163
+ # SECURITY: Query with both staff_code AND merchant_id
164
+ staff = await self.collection.find_one({
165
+ "staff_code": staff_code.upper(),
166
+ "merchant_id": merchant_id
167
+ })
168
  if not staff:
169
  return None
170
 
 
211
  detail="Error retrieving staff"
212
  )
213
 
 
214
  async def update_staff(
215
+ self,
216
  staff_id: str,
217
  payload: StaffUpdateSchema,
218
+ merchant_id: str,
219
  x_user_id: Optional[str] = None
220
  ) -> StaffResponseSchema:
221
  """
222
+ Update staff information, ensuring it belongs to the specified merchant.
223
 
224
  Args:
225
  staff_id: Staff ID to update
226
  payload: Update data
227
+ merchant_id: Merchant ID from authentication token
228
+ x_user_id: User ID making the update
229
 
230
  Returns:
231
  Updated staff response
232
  """
233
+ # SECURITY: Check staff exists and belongs to merchant
234
+ existing = await self.get_staff_by_id(staff_id, merchant_id)
235
  if not existing:
236
  raise HTTPException(
237
  status_code=http_status.HTTP_404_NOT_FOUND,
 
250
  update_data["updated_at"] = datetime.utcnow()
251
 
252
  try:
253
+ # SECURITY: Update only for the specific merchant
254
+ result = await self.collection.update_one(
255
+ {"staff_id": staff_id, "merchant_id": merchant_id},
256
  {"$set": update_data}
257
  )
258
+ updated_staff = await self.get_staff_by_id(staff_id, merchant_id)
259
 
260
+ # Sync to PostgreSQL if sync service is available and relevant fields were updated
261
+ if self.sync_service and updated_staff and any(field in update_data for field in ["name", "contact", "role", "status"]):
262
+ try:
263
+ # Prepare sync data
264
+ sync_data = {
265
+ "staff_id": staff_id,
266
+ "merchant_id": updated_staff.merchant_id,
267
+ "name": updated_staff.name,
268
+ "phone": updated_staff.contact.phone if updated_staff.contact else None,
269
+ "email": updated_staff.contact.email if updated_staff.contact else None,
270
+ "role": updated_staff.role,
271
+ "status": updated_staff.status,
272
+ "updated_at": datetime.utcnow()
273
+ }
274
+
275
+ await self.sync_service.update_staff_ref(staff_id, sync_data)
276
+ logger.info(f"Staff update synced to PostgreSQL: {staff_id}")
277
+ except Exception as sync_error:
278
+ logger.error(f"Failed to sync staff update to PostgreSQL: {sync_error}")
279
+ # Don't raise - PostgreSQL sync is secondary to MongoDB
280
 
281
  if result.modified_count == 0:
282
  logger.warning(f"No changes made to staff {staff_id}")
 
292
  detail=f"Error updating staff: {str(e)}"
293
  )
294
 
 
295
  async def list_staff(
296
+ self,
297
  merchant_id: Optional[str] = None,
298
  designation: Optional[str] = None,
299
  manager_id: Optional[str] = None,
 
344
  projection_dict["_id"] = 0
345
 
346
  # Get total count
347
+ total = await self.collection.count_documents(query)
348
 
349
  # Fetch staff
350
+ cursor = self.collection.find(query, projection_dict).skip(skip).limit(limit)
351
  staff_list = await cursor.to_list(length=limit)
352
 
353
  # Return raw dict if projection used, model otherwise
 
401
  detail="Error listing staff"
402
  )
403
 
 
404
  async def delete_staff(
405
+ self,
406
  staff_id: str,
407
+ merchant_id: str,
408
  x_user_id: Optional[str] = None
409
  ) -> Dict[str, str]:
410
  """
 
412
 
413
  Args:
414
  staff_id: Staff ID to delete
415
+ merchant_id: Merchant ID from authentication token
416
+ x_user_id: User ID performing the deletion
417
 
418
  Returns:
419
  Success message
420
  """
421
+ # SECURITY: Check staff exists and belongs to merchant
422
+ existing = await self.get_staff_by_id(staff_id, merchant_id)
423
  if not existing:
424
  raise HTTPException(
425
  status_code=http_status.HTTP_404_NOT_FOUND,
 
427
  )
428
 
429
  try:
430
+ # SECURITY: Soft delete only for the specific merchant
431
+ await self.collection.update_one(
432
+ {"staff_id": staff_id, "merchant_id": merchant_id},
433
  {
434
  "$set": {
435
  "status": "inactive",
 
438
  }
439
  )
440
 
441
+ # Sync status to PostgreSQL if sync service is available
442
+ if self.sync_service:
443
+ try:
444
+ await self.sync_service.sync_staff_status(staff_id, "inactive")
445
+ logger.info(f"Staff deletion synced to PostgreSQL: {staff_id}")
446
+ except Exception as sync_error:
447
+ logger.error(f"Failed to sync staff deletion to PostgreSQL: {sync_error}")
448
+ # Don't raise - PostgreSQL sync is secondary to MongoDB
449
+
450
  logger.info(f"Deleted staff {staff_id}")
451
  return {"message": f"Staff {staff_id} deleted successfully"}
452
 
app/sync/models.py CHANGED
@@ -27,7 +27,7 @@ class CustomerRef(Base):
27
 
28
  class StaffRef(Base):
29
  """PostgreSQL reference table for staff"""
30
- __tablename__ = "staff_ref"
31
  __table_args__ = {"schema": "trans"}
32
 
33
  staff_id = Column(String, primary_key=True)
 
27
 
28
  class StaffRef(Base):
29
  """PostgreSQL reference table for staff"""
30
+ __tablename__ = "pos_staff_ref"
31
  __table_args__ = {"schema": "trans"}
32
 
33
  staff_id = Column(String, primary_key=True)
app/sync/staff/sync_service.py CHANGED
@@ -30,7 +30,7 @@ class StaffSyncService:
30
  SELECT 1
31
  FROM information_schema.tables
32
  WHERE table_schema = 'trans'
33
- AND table_name = 'staff_ref'
34
  LIMIT 1
35
  """)
36
  result = await self.pg_session.execute(check_query)
@@ -39,7 +39,7 @@ class StaffSyncService:
39
  if not exists:
40
  # Create table
41
  create_table_sql = """
42
- CREATE TABLE trans.staff_ref (
43
  staff_id VARCHAR PRIMARY KEY,
44
  merchant_id VARCHAR NOT NULL,
45
  name VARCHAR(150) NOT NULL,
@@ -52,12 +52,12 @@ class StaffSyncService:
52
  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
53
  );
54
 
55
- CREATE INDEX idx_staff_ref_merchant_id ON trans.staff_ref(merchant_id);
56
- CREATE INDEX idx_staff_ref_status ON trans.staff_ref(status);
57
  """
58
  await self.pg_session.execute(text(create_table_sql))
59
  await self.pg_session.commit()
60
- logger.info("✅ Created staff_ref table in trans schema")
61
 
62
  except Exception as e:
63
  logger.warning(f"Schema/table creation failed (may already exist): {e}")
 
30
  SELECT 1
31
  FROM information_schema.tables
32
  WHERE table_schema = 'trans'
33
+ AND table_name = 'pos_staff_ref'
34
  LIMIT 1
35
  """)
36
  result = await self.pg_session.execute(check_query)
 
39
  if not exists:
40
  # Create table
41
  create_table_sql = """
42
+ CREATE TABLE trans.pos_staff_ref (
43
  staff_id VARCHAR PRIMARY KEY,
44
  merchant_id VARCHAR NOT NULL,
45
  name VARCHAR(150) NOT NULL,
 
52
  updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
53
  );
54
 
55
+ CREATE INDEX idx_pos_staff_ref_merchant_id ON trans.pos_staff_ref(merchant_id);
56
+ CREATE INDEX idx_pos_staff_ref_status ON trans.pos_staff_ref(status);
57
  """
58
  await self.pg_session.execute(text(create_table_sql))
59
  await self.pg_session.commit()
60
+ logger.info("✅ Created pos_staff_ref table in trans schema")
61
 
62
  except Exception as e:
63
  logger.warning(f"Schema/table creation failed (may already exist): {e}")