MukeshKapoor25 commited on
Commit
8d1fdf5
Β·
1 Parent(s): 7c601e8

feat(sync): Implement warehouse synchronization from MongoDB to PostgreSQL

Browse files

- Added WarehouseSyncService for managing async sync operations.
- Created WarehouseSyncHandler for handling warehouse data fetching and validation.
- Introduced models for field mapping and required fields for warehouse data.
- Developed service methods for upserting warehouse data into PostgreSQL.
- Implemented async task queue and background workers for processing sync operations.
- Updated warehouse service to trigger async sync on create, update, and delete operations.
- Created SQL script for the new scm_warehouse_ref table in PostgreSQL.
- Added migration scripts for updating existing warehouse documents in MongoDB.
- Implemented tests for verifying warehouse sync functionality.
- Updated documentation to reflect changes in MongoDB collection naming and sync operations.

app/sync/warehouses/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ """Warehouse sync package."""
2
+ from .service import WarehouseSyncService
3
+
4
+ __all__ = ["WarehouseSyncService"]
app/sync/warehouses/handler.py ADDED
@@ -0,0 +1,253 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Warehouse sync handler for MongoDB to PostgreSQL synchronization.
3
+ """
4
+ from typing import Dict, Any, Optional
5
+ from sqlalchemy.ext.asyncio import AsyncConnection
6
+ from sqlalchemy import text
7
+ from motor.motor_asyncio import AsyncIOMotorDatabase
8
+ from datetime import datetime
9
+ from app.core.logging import get_logger
10
+
11
+ from app.sync.common.handler import SyncHandler
12
+ from app.sync.warehouses.models import WAREHOUSE_FIELD_MAPPING, WAREHOUSE_REQUIRED_FIELDS
13
+ from app.warehouses.constants import SCM_WAREHOUSES_COLLECTION
14
+
15
+ logger = get_logger(__name__)
16
+
17
+
18
+ class WarehouseSyncHandler(SyncHandler):
19
+ """
20
+ Handler for syncing warehouse data from MongoDB to PostgreSQL.
21
+
22
+ Implements entity-specific logic for warehouse synchronization including
23
+ field mapping, validation, and upsert operations.
24
+ """
25
+
26
+ def __init__(self):
27
+ super().__init__(entity_type="warehouse")
28
+
29
+ async def fetch_from_mongodb(
30
+ self,
31
+ entity_id: str,
32
+ mongo_db: AsyncIOMotorDatabase
33
+ ) -> Optional[Dict[str, Any]]:
34
+ """
35
+ Fetch warehouse from MongoDB by warehouse_id.
36
+
37
+ Args:
38
+ entity_id: warehouse_id to fetch
39
+ mongo_db: MongoDB database instance
40
+
41
+ Returns:
42
+ Warehouse document or None if not found
43
+ """
44
+ try:
45
+ collection = mongo_db[SCM_WAREHOUSES_COLLECTION]
46
+ warehouse = await collection.find_one({"warehouse_id": entity_id})
47
+ return warehouse
48
+ except Exception as e:
49
+ logger.error(
50
+ "Error fetching warehouse from MongoDB",
51
+ exc_info=e,
52
+ extra={
53
+ "entity_type": self.entity_type,
54
+ "entity_id": entity_id
55
+ }
56
+ )
57
+ raise
58
+
59
+ def get_field_mapping(self) -> Dict[str, str]:
60
+ """
61
+ Get field mapping from MongoDB to PostgreSQL.
62
+
63
+ Returns:
64
+ Dictionary mapping MongoDB field names to PostgreSQL column names
65
+ """
66
+ return WAREHOUSE_FIELD_MAPPING
67
+
68
+ def validate_required_fields(self, entity: Dict[str, Any]) -> bool:
69
+ """
70
+ Validate that all required fields are present in warehouse document.
71
+
72
+ Required fields: warehouse_id, warehouse_code, warehouse_name,
73
+ warehouse_type, merchant_id, merchant_type, status
74
+
75
+ Args:
76
+ entity: Warehouse document from MongoDB
77
+
78
+ Returns:
79
+ True if all required fields present, False otherwise
80
+ """
81
+ missing_fields = []
82
+
83
+ for field in WAREHOUSE_REQUIRED_FIELDS:
84
+ if field not in entity or entity[field] is None:
85
+ missing_fields.append(field)
86
+
87
+ if missing_fields:
88
+ logger.error(
89
+ "Warehouse missing required fields",
90
+ extra={
91
+ "entity_type": self.entity_type,
92
+ "entity_id": entity.get("warehouse_id"),
93
+ "missing_fields": missing_fields
94
+ }
95
+ )
96
+ return False
97
+
98
+ return True
99
+
100
+ def transform_field_value(self, field_name: str, value: Any) -> Any:
101
+ """
102
+ Transform field value for PostgreSQL.
103
+
104
+ Handles type conversions and nested field extraction.
105
+
106
+ Args:
107
+ field_name: Name of the field
108
+ value: Value from MongoDB
109
+
110
+ Returns:
111
+ Transformed value for PostgreSQL
112
+ """
113
+ # Convert warehouse_type enum to string if needed
114
+ if field_name == "warehouse_type" and hasattr(value, 'value'):
115
+ return value.value
116
+
117
+ # Convert status enum to string if needed
118
+ if field_name == "status" and hasattr(value, 'value'):
119
+ return value.value
120
+
121
+ # Handle datetime objects
122
+ if isinstance(value, datetime):
123
+ return value
124
+
125
+ return value
126
+
127
+ def extract_nested_fields(self, entity: Dict[str, Any]) -> Dict[str, Any]:
128
+ """
129
+ Extract nested fields from warehouse document.
130
+
131
+ Extracts:
132
+ - pincode from address
133
+ - contact_name, contact_phone, contact_email from contact
134
+
135
+ Args:
136
+ entity: Warehouse document from MongoDB
137
+
138
+ Returns:
139
+ Dictionary with flattened fields
140
+ """
141
+ flattened = {}
142
+
143
+ # Extract address fields (only pincode)
144
+ if "address" in entity and entity["address"]:
145
+ address = entity["address"]
146
+ flattened["pincode"] = address.get("pincode")
147
+ else:
148
+ flattened["pincode"] = None
149
+
150
+ # Extract contact fields
151
+ if "contact" in entity and entity["contact"]:
152
+ contact = entity["contact"]
153
+ flattened["contact_name"] = contact.get("name")
154
+ flattened["contact_phone"] = contact.get("phone")
155
+ flattened["contact_email"] = contact.get("email")
156
+ else:
157
+ flattened["contact_name"] = None
158
+ flattened["contact_phone"] = None
159
+ flattened["contact_email"] = None
160
+
161
+ return flattened
162
+
163
+ async def upsert_to_postgres(
164
+ self,
165
+ entity: Dict[str, Any],
166
+ pg_conn: AsyncConnection
167
+ ) -> bool:
168
+ """
169
+ Upsert warehouse to PostgreSQL trans.scm_warehouse_ref table.
170
+
171
+ Performs timestamp-based conflict resolution:
172
+ - If record doesn't exist, insert
173
+ - If MongoDB updated_at >= PostgreSQL updated_at, update
174
+ - Otherwise, skip update
175
+
176
+ Args:
177
+ entity: Warehouse document from MongoDB
178
+ pg_conn: PostgreSQL connection
179
+
180
+ Returns:
181
+ True if upsert successful, False otherwise
182
+ """
183
+ try:
184
+ warehouse_id = entity["warehouse_id"]
185
+ updated_at = entity.get("updated_at") or entity.get("created_at")
186
+
187
+ # Check timestamp conflict
188
+ should_update = await self.check_timestamp_conflict(
189
+ entity_id=warehouse_id,
190
+ mongo_updated_at=updated_at,
191
+ pg_conn=pg_conn,
192
+ table_name="trans.scm_warehouse_ref",
193
+ id_column="warehouse_id"
194
+ )
195
+
196
+ if not should_update:
197
+ logger.debug(
198
+ "Skipping warehouse sync due to timestamp conflict",
199
+ extra={
200
+ "entity_type": self.entity_type,
201
+ "entity_id": warehouse_id,
202
+ "mongo_updated_at": updated_at
203
+ }
204
+ )
205
+ return True # Not an error, just skipped
206
+
207
+ # Extract nested fields
208
+ nested_fields = self.extract_nested_fields(entity)
209
+
210
+ # Merge nested fields into entity for mapping
211
+ entity_with_nested = {**entity, **nested_fields}
212
+
213
+ # Map fields
214
+ mapped_entity = self.map_fields(entity_with_nested)
215
+
216
+ # Build UPSERT query
217
+ columns = list(mapped_entity.keys())
218
+ placeholders = [f":{col}" for col in columns]
219
+
220
+ # Build UPDATE clause (exclude primary key)
221
+ update_columns = [col for col in columns if col != "warehouse_id"]
222
+ update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
223
+
224
+ query = text(f"""
225
+ INSERT INTO trans.scm_warehouse_ref ({', '.join(columns)})
226
+ VALUES ({', '.join(placeholders)})
227
+ ON CONFLICT (warehouse_id)
228
+ DO UPDATE SET {update_clause}
229
+ """)
230
+
231
+ await pg_conn.execute(query, mapped_entity)
232
+
233
+ logger.debug(
234
+ "Warehouse upserted to PostgreSQL",
235
+ extra={
236
+ "entity_type": self.entity_type,
237
+ "entity_id": warehouse_id
238
+ }
239
+ )
240
+
241
+ return True
242
+
243
+ except Exception as e:
244
+ logger.error(
245
+ "Error upserting warehouse to PostgreSQL",
246
+ exc_info=e,
247
+ extra={
248
+ "entity_type": self.entity_type,
249
+ "entity_id": entity.get("warehouse_id"),
250
+ "error": str(e)
251
+ }
252
+ )
253
+ raise
app/sync/warehouses/models.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Data models for warehouse sync operations.
3
+ """
4
+
5
+ # Field mapping from MongoDB to PostgreSQL
6
+ WAREHOUSE_FIELD_MAPPING = {
7
+ "warehouse_id": "warehouse_id", # TEXT PRIMARY KEY
8
+ "warehouse_code": "warehouse_code", # TEXT NOT NULL
9
+ "warehouse_name": "warehouse_name", # TEXT NOT NULL
10
+ "warehouse_type": "warehouse_type", # TEXT NOT NULL
11
+ "merchant_id": "merchant_id", # TEXT NOT NULL
12
+ "merchant_type": "merchant_type", # TEXT NOT NULL
13
+ "status": "status", # TEXT NOT NULL
14
+ "pincode": "pincode", # TEXT (from address.pincode)
15
+ "contact_name": "contact_name", # TEXT (from contact.name)
16
+ "contact_phone": "contact_phone", # TEXT (from contact.phone)
17
+ "contact_email": "contact_email", # TEXT (from contact.email)
18
+ "created_at": "created_at", # TIMESTAMP NOT NULL
19
+ "updated_at": "updated_at" # TIMESTAMP
20
+ }
21
+
22
+ # Required fields that must be present
23
+ WAREHOUSE_REQUIRED_FIELDS = [
24
+ "warehouse_id",
25
+ "warehouse_code",
26
+ "warehouse_name",
27
+ "warehouse_type",
28
+ "merchant_id",
29
+ "merchant_type",
30
+ "status"
31
+ ]
app/sync/warehouses/service.py ADDED
@@ -0,0 +1,341 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Warehouse sync service with async task queue and background workers.
3
+ """
4
+ import asyncio
5
+ from typing import Optional
6
+ from motor.motor_asyncio import AsyncIOMotorDatabase
7
+ from app.core.logging import get_logger
8
+ import time
9
+ import traceback
10
+
11
+ from app.sync.warehouses.handler import WarehouseSyncHandler
12
+ from app.sync.common.retry import RetryManager
13
+ from app.sync.common.monitoring import SyncMonitoringService
14
+ from app.sync.common.models import SyncOperation
15
+ from app.sql import async_engine
16
+
17
+ logger = get_logger(__name__)
18
+
19
+
20
+ class WarehouseSyncService:
21
+ """
22
+ Service for asynchronously syncing warehouses from MongoDB to PostgreSQL.
23
+
24
+ Features:
25
+ - Async task queue for non-blocking sync operations
26
+ - Background worker for processing sync queue
27
+ - Retry logic with exponential backoff
28
+ - Monitoring and metrics tracking
29
+ """
30
+
31
+ def __init__(
32
+ self,
33
+ mongo_db: AsyncIOMotorDatabase,
34
+ max_queue_size: int = 1000,
35
+ worker_count: int = 5,
36
+ max_retries: int = 3
37
+ ):
38
+ """
39
+ Initialize warehouse sync service.
40
+
41
+ Args:
42
+ mongo_db: MongoDB database instance
43
+ max_queue_size: Maximum size of sync queue
44
+ worker_count: Number of background workers
45
+ max_retries: Maximum retry attempts for failed syncs
46
+ """
47
+ self.mongo_db = mongo_db
48
+ self.max_queue_size = max_queue_size
49
+ self.worker_count = worker_count
50
+
51
+ # Initialize components
52
+ self.handler = WarehouseSyncHandler()
53
+ self.retry_manager = RetryManager(max_retries=max_retries)
54
+ self.monitoring = SyncMonitoringService()
55
+
56
+ # Async queue for sync operations
57
+ self.sync_queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
58
+
59
+ # Background workers
60
+ self.workers: list[asyncio.Task] = []
61
+ self.is_running = False
62
+
63
+ async def start_workers(self) -> None:
64
+ """
65
+ Start background workers for processing sync queue.
66
+
67
+ Creates worker_count background tasks that continuously process
68
+ the sync queue.
69
+ """
70
+ if self.is_running:
71
+ logger.warning("Warehouse sync workers already running")
72
+ return
73
+
74
+ self.is_running = True
75
+
76
+ logger.info(
77
+ "Starting warehouse sync workers",
78
+ extra={
79
+ "worker_count": self.worker_count,
80
+ "max_queue_size": self.max_queue_size
81
+ }
82
+ )
83
+
84
+ for i in range(self.worker_count):
85
+ worker = asyncio.create_task(self._worker(worker_id=i))
86
+ self.workers.append(worker)
87
+
88
+ logger.info(f"Started {self.worker_count} warehouse sync workers")
89
+
90
+ async def stop_workers(self) -> None:
91
+ """
92
+ Stop all background workers gracefully.
93
+
94
+ Waits for workers to finish processing current operations
95
+ before shutting down.
96
+ """
97
+ if not self.is_running:
98
+ logger.warning("Warehouse sync workers not running")
99
+ return
100
+
101
+ logger.info("Stopping warehouse sync workers")
102
+
103
+ self.is_running = False
104
+
105
+ # Cancel all workers
106
+ for worker in self.workers:
107
+ worker.cancel()
108
+
109
+ # Wait for workers to finish
110
+ await asyncio.gather(*self.workers, return_exceptions=True)
111
+
112
+ self.workers.clear()
113
+
114
+ logger.info("Warehouse sync workers stopped")
115
+
116
+ async def sync_warehouse(
117
+ self,
118
+ warehouse_id: str,
119
+ operation: str = "update"
120
+ ) -> None:
121
+ """
122
+ Queue a warehouse for synchronization.
123
+
124
+ This is a fire-and-forget operation that adds the warehouse to the
125
+ sync queue for async processing. Does not wait for sync completion.
126
+
127
+ Args:
128
+ warehouse_id: ID of warehouse to sync
129
+ operation: Type of operation ("create" | "update")
130
+ """
131
+ try:
132
+ sync_op = SyncOperation(
133
+ entity_type="warehouse",
134
+ entity_id=warehouse_id,
135
+ operation=operation
136
+ )
137
+
138
+ # Try to add to queue without blocking
139
+ try:
140
+ self.sync_queue.put_nowait(sync_op)
141
+ logger.debug(
142
+ "Warehouse queued for sync",
143
+ extra={
144
+ "entity_type": "warehouse",
145
+ "entity_id": warehouse_id,
146
+ "operation": operation,
147
+ "queue_size": self.sync_queue.qsize()
148
+ }
149
+ )
150
+ except asyncio.QueueFull:
151
+ # Queue is full, drop oldest and add new
152
+ logger.warning(
153
+ "Sync queue full, dropping oldest operation",
154
+ extra={
155
+ "entity_type": "warehouse",
156
+ "entity_id": warehouse_id,
157
+ "queue_size": self.sync_queue.qsize()
158
+ }
159
+ )
160
+
161
+ # Drop oldest (get without waiting)
162
+ try:
163
+ dropped_op = self.sync_queue.get_nowait()
164
+ logger.warning(
165
+ "Dropped sync operation",
166
+ extra={
167
+ "entity_type": dropped_op.entity_type,
168
+ "entity_id": dropped_op.entity_id
169
+ }
170
+ )
171
+ except asyncio.QueueEmpty:
172
+ pass
173
+
174
+ # Add new operation
175
+ self.sync_queue.put_nowait(sync_op)
176
+
177
+ except Exception as e:
178
+ logger.error(
179
+ "Error queuing warehouse for sync",
180
+ exc_info=e,
181
+ extra={
182
+ "entity_type": "warehouse",
183
+ "entity_id": warehouse_id,
184
+ "error": str(e)
185
+ }
186
+ )
187
+
188
+ async def _worker(self, worker_id: int) -> None:
189
+ """
190
+ Background worker that processes sync queue.
191
+
192
+ Continuously pulls operations from the queue and processes them
193
+ with retry logic and monitoring.
194
+
195
+ Args:
196
+ worker_id: Unique identifier for this worker
197
+ """
198
+ logger.info(f"Warehouse sync worker {worker_id} started")
199
+
200
+ while self.is_running:
201
+ try:
202
+ # Get next operation from queue (with timeout)
203
+ try:
204
+ sync_op = await asyncio.wait_for(
205
+ self.sync_queue.get(),
206
+ timeout=1.0
207
+ )
208
+ except asyncio.TimeoutError:
209
+ # No operations in queue, continue loop
210
+ continue
211
+
212
+ # Process the sync operation
213
+ await self._process_sync_operation(sync_op, worker_id)
214
+
215
+ # Mark task as done
216
+ self.sync_queue.task_done()
217
+
218
+ except asyncio.CancelledError:
219
+ logger.info(f"Warehouse sync worker {worker_id} cancelled")
220
+ break
221
+ except Exception as e:
222
+ logger.error(
223
+ f"Error in warehouse sync worker {worker_id}",
224
+ exc_info=e,
225
+ extra={
226
+ "worker_id": worker_id,
227
+ "error": str(e)
228
+ }
229
+ )
230
+ # Continue processing despite error
231
+ await asyncio.sleep(1.0)
232
+
233
+ logger.info(f"Warehouse sync worker {worker_id} stopped")
234
+
235
+ async def _process_sync_operation(
236
+ self,
237
+ sync_op: SyncOperation,
238
+ worker_id: int
239
+ ) -> None:
240
+ """
241
+ Process a single sync operation with retry logic.
242
+
243
+ Args:
244
+ sync_op: Sync operation to process
245
+ worker_id: ID of worker processing this operation
246
+ """
247
+ start_time = time.time()
248
+
249
+ try:
250
+ logger.debug(
251
+ f"Worker {worker_id} processing warehouse sync",
252
+ extra={
253
+ "worker_id": worker_id,
254
+ "entity_type": sync_op.entity_type,
255
+ "entity_id": sync_op.entity_id,
256
+ "operation": sync_op.operation
257
+ }
258
+ )
259
+
260
+ # Execute sync with retry logic
261
+ await self.retry_manager.execute_with_retry(
262
+ self._sync_warehouse_with_connection,
263
+ sync_op.entity_id,
264
+ entity_type="warehouse",
265
+ entity_id=sync_op.entity_id
266
+ )
267
+
268
+ # Record success
269
+ duration_ms = (time.time() - start_time) * 1000
270
+ self.monitoring.record_sync_success(
271
+ entity_type="warehouse",
272
+ entity_id=sync_op.entity_id,
273
+ duration_ms=duration_ms
274
+ )
275
+
276
+ except Exception as e:
277
+ # Record failure
278
+ error_msg = str(e)
279
+ stack_trace = traceback.format_exc()
280
+
281
+ self.monitoring.record_sync_failure(
282
+ entity_type="warehouse",
283
+ entity_id=sync_op.entity_id,
284
+ error=error_msg,
285
+ stack_trace=stack_trace
286
+ )
287
+
288
+ logger.error(
289
+ f"Worker {worker_id} failed to sync warehouse after retries",
290
+ exc_info=e,
291
+ extra={
292
+ "worker_id": worker_id,
293
+ "entity_type": sync_op.entity_type,
294
+ "entity_id": sync_op.entity_id,
295
+ "error": error_msg
296
+ }
297
+ )
298
+
299
+ async def _sync_warehouse_with_connection(self, warehouse_id: str) -> bool:
300
+ """
301
+ Sync a single warehouse with connection management.
302
+
303
+ Acquires a PostgreSQL connection, performs the sync, and releases
304
+ the connection.
305
+
306
+ Args:
307
+ warehouse_id: ID of warehouse to sync
308
+
309
+ Returns:
310
+ True if sync successful
311
+
312
+ Raises:
313
+ Exception: If sync fails
314
+ """
315
+ async with async_engine.begin() as pg_conn:
316
+ # Perform sync
317
+ success = await self.handler.sync(
318
+ entity_id=warehouse_id,
319
+ mongo_db=self.mongo_db,
320
+ pg_conn=pg_conn
321
+ )
322
+
323
+ return success
324
+
325
+ def get_queue_size(self) -> int:
326
+ """
327
+ Get current size of sync queue.
328
+
329
+ Returns:
330
+ Number of operations in queue
331
+ """
332
+ return self.sync_queue.qsize()
333
+
334
+ def get_metrics(self) -> dict:
335
+ """
336
+ Get sync metrics for warehouse operations.
337
+
338
+ Returns:
339
+ Dictionary containing sync metrics
340
+ """
341
+ return self.monitoring.get_entity_metrics("warehouse")
app/warehouses/README.md CHANGED
@@ -5,7 +5,7 @@
5
  Complete CRUD operations for warehouse management in the SCM system.
6
 
7
  ## Database Collection
8
- - MongoDB Collection: `scm_warehouses`
9
 
10
  ## Data Structure
11
 
@@ -328,7 +328,7 @@ All operations are tracked in the audit object:
328
  - Router: `app/warehouses/controllers/router.py`
329
 
330
  ### Collection Name
331
- - MongoDB: `scm_warehouses`
332
 
333
  ### Warehouse ID Generation
334
  - Format: `wh_{12_char_hex_id}`
 
5
  Complete CRUD operations for warehouse management in the SCM system.
6
 
7
  ## Database Collection
8
+ - MongoDB Collection: `scm_warehouse`
9
 
10
  ## Data Structure
11
 
 
328
  - Router: `app/warehouses/controllers/router.py`
329
 
330
  ### Collection Name
331
+ - MongoDB: `scm_warehouse`
332
 
333
  ### Warehouse ID Generation
334
  - Format: `wh_{12_char_hex_id}`
app/warehouses/constants.py CHANGED
@@ -3,4 +3,4 @@ Warehouse-related constants.
3
  """
4
 
5
  # Collection names
6
- SCM_WAREHOUSES_COLLECTION = "scm_warehouses"
 
3
  """
4
 
5
  # Collection names
6
+ SCM_WAREHOUSES_COLLECTION = "scm_warehouse"
app/warehouses/services/service.py CHANGED
@@ -16,6 +16,23 @@ from app.warehouses.schemas.schema import (
16
  WarehouseResponse,
17
  )
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  logger = get_logger(__name__)
20
 
21
 
@@ -141,6 +158,16 @@ class WarehouseService:
141
  "created_by": payload.created_by
142
  }
143
  )
 
 
 
 
 
 
 
 
 
 
144
  return WarehouseResponse(**warehouse_dict)
145
 
146
 
@@ -210,6 +237,15 @@ class WarehouseService:
210
  extra={"warehouse_id": warehouse_id, "fields_updated": list(update_data.keys())}
211
  )
212
 
 
 
 
 
 
 
 
 
 
213
  return WarehouseResponse(**updated_warehouse)
214
 
215
  except HTTPException:
@@ -445,6 +481,15 @@ class WarehouseService:
445
 
446
  logger.info(f"Deleted (soft) warehouse {warehouse_id}")
447
 
 
 
 
 
 
 
 
 
 
448
  return {"message": f"Warehouse {warehouse_id} has been deleted"}
449
 
450
  except HTTPException:
 
16
  WarehouseResponse,
17
  )
18
 
19
+ # Import sync service
20
+ _warehouse_sync_service = None
21
+
22
+
23
+ def get_warehouse_sync_service():
24
+ """Get or initialize the warehouse sync service."""
25
+ global _warehouse_sync_service
26
+ if _warehouse_sync_service is None:
27
+ from app.sync.warehouses.service import WarehouseSyncService
28
+ _warehouse_sync_service = WarehouseSyncService(
29
+ mongo_db=get_database(),
30
+ max_queue_size=1000,
31
+ worker_count=3,
32
+ max_retries=3
33
+ )
34
+ return _warehouse_sync_service
35
+
36
  logger = get_logger(__name__)
37
 
38
 
 
158
  "created_by": payload.created_by
159
  }
160
  )
161
+
162
+ # Trigger async sync to PostgreSQL
163
+ try:
164
+ sync_service = get_warehouse_sync_service()
165
+ await sync_service.sync_warehouse(warehouse_id, operation="create")
166
+ logger.debug(f"Warehouse {warehouse_id} queued for sync")
167
+ except Exception as e:
168
+ logger.warning(f"Failed to queue warehouse sync: {e}", exc_info=e)
169
+ # Don't fail the operation if sync fails
170
+
171
  return WarehouseResponse(**warehouse_dict)
172
 
173
 
 
237
  extra={"warehouse_id": warehouse_id, "fields_updated": list(update_data.keys())}
238
  )
239
 
240
+ # Trigger async sync to PostgreSQL
241
+ try:
242
+ sync_service = get_warehouse_sync_service()
243
+ await sync_service.sync_warehouse(warehouse_id, operation="update")
244
+ logger.debug(f"Warehouse {warehouse_id} queued for sync")
245
+ except Exception as e:
246
+ logger.warning(f"Failed to queue warehouse sync: {e}", exc_info=e)
247
+ # Don't fail the operation if sync fails
248
+
249
  return WarehouseResponse(**updated_warehouse)
250
 
251
  except HTTPException:
 
481
 
482
  logger.info(f"Deleted (soft) warehouse {warehouse_id}")
483
 
484
+ # Trigger async sync to PostgreSQL to update status
485
+ try:
486
+ sync_service = get_warehouse_sync_service()
487
+ await sync_service.sync_warehouse(warehouse_id, operation="update")
488
+ logger.debug(f"Warehouse {warehouse_id} queued for sync after delete")
489
+ except Exception as e:
490
+ logger.warning(f"Failed to queue warehouse sync: {e}", exc_info=e)
491
+ # Don't fail the operation if sync fails
492
+
493
  return {"message": f"Warehouse {warehouse_id} has been deleted"}
494
 
495
  except HTTPException:
create_warehouse_ref_table.sql ADDED
@@ -0,0 +1,176 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- =====================================================
2
+ -- Warehouse Reference Table Creation Script
3
+ -- =====================================================
4
+ -- This script creates the scm_warehouse_ref table in the trans schema
5
+ -- for syncing warehouse data from MongoDB to PostgreSQL
6
+
7
+ -- Create trans schema if it doesn't exist
8
+ CREATE SCHEMA IF NOT EXISTS trans;
9
+
10
+ -- =====================================================
11
+ -- SCM Warehouse Reference Table
12
+ -- =====================================================
13
+ -- This table stores warehouse reference data synced from MongoDB
14
+ -- Used for fast lookups and reporting in PostgreSQL
15
+ CREATE TABLE IF NOT EXISTS trans.scm_warehouse_ref (
16
+ -- Primary identifiers
17
+ warehouse_id TEXT PRIMARY KEY,
18
+ warehouse_code TEXT NOT NULL,
19
+ warehouse_name TEXT NOT NULL,
20
+ warehouse_type TEXT NOT NULL,
21
+
22
+ -- Merchant information
23
+ merchant_id TEXT NOT NULL,
24
+ merchant_type TEXT NOT NULL,
25
+
26
+ -- Status
27
+ status TEXT NOT NULL,
28
+
29
+ pincode TEXT,
30
+
31
+ -- Contact information (flattened from nested structure)
32
+ contact_name TEXT,
33
+ contact_phone TEXT,
34
+ contact_email TEXT,
35
+
36
+ -- Audit fields
37
+ created_at TIMESTAMPTZ NOT NULL,
38
+ updated_at TIMESTAMPTZ,
39
+
40
+ -- Constraints
41
+ CONSTRAINT chk_warehouse_status CHECK (status IN ('draft', 'active', 'inactive')),
42
+ CONSTRAINT chk_warehouse_type CHECK (warehouse_type IN ('MAIN', 'SECONDARY', 'STORE', 'TEMP')),
43
+ CONSTRAINT chk_merchant_type CHECK (merchant_type IN ('ncnf', 'cnf', 'distributor', 'retail', 'company'))
44
+ );
45
+
46
+ -- =====================================================
47
+ -- Indexes for Performance
48
+ -- =====================================================
49
+
50
+ -- Index on merchant_id for filtering warehouses by merchant
51
+ CREATE INDEX IF NOT EXISTS idx_scm_warehouse_ref_merchant_id
52
+ ON trans.scm_warehouse_ref (merchant_id);
53
+
54
+ -- Index on warehouse_code for unique code lookups
55
+ CREATE INDEX IF NOT EXISTS idx_scm_warehouse_ref_warehouse_code
56
+ ON trans.scm_warehouse_ref (warehouse_code);
57
+
58
+ -- Index on status for filtering active/inactive warehouses
59
+ CREATE INDEX IF NOT EXISTS idx_scm_warehouse_ref_status
60
+ ON trans.scm_warehouse_ref (status);
61
+
62
+ -- Composite index for merchant + status queries
63
+ CREATE INDEX IF NOT EXISTS idx_scm_warehouse_ref_merchant_status
64
+ ON trans.scm_warehouse_ref (merchant_id, status);
65
+
66
+ -- Index on warehouse_type for filtering by type
67
+ CREATE INDEX IF NOT EXISTS idx_scm_warehouse_ref_warehouse_type
68
+ ON trans.scm_warehouse_ref (warehouse_type);
69
+
70
+ -- =====================================================
71
+ -- Comments for Documentation
72
+ -- =====================================================
73
+
74
+ COMMENT ON TABLE trans.scm_warehouse_ref IS
75
+ 'Warehouse reference table synced from MongoDB. Contains flattened warehouse data for fast lookups and reporting.';
76
+
77
+ COMMENT ON COLUMN trans.scm_warehouse_ref.warehouse_id IS
78
+ 'Unique warehouse identifier (synced from MongoDB)';
79
+
80
+ COMMENT ON COLUMN trans.scm_warehouse_ref.warehouse_code IS
81
+ 'Unique warehouse code (e.g., WH-DIST-0001)';
82
+
83
+ COMMENT ON COLUMN trans.scm_warehouse_ref.warehouse_name IS
84
+ 'Display name of the warehouse';
85
+
86
+ COMMENT ON COLUMN trans.scm_warehouse_ref.warehouse_type IS
87
+ 'Type of warehouse: MAIN, SECONDARY, STORE, TEMP';
88
+
89
+ COMMENT ON COLUMN trans.scm_warehouse_ref.merchant_id IS
90
+ 'Merchant who owns this warehouse';
91
+
92
+ COMMENT ON COLUMN trans.scm_warehouse_ref.merchant_type IS
93
+ 'Type of merchant: ncnf, cnf, distributor, retail, company';
94
+
95
+ COMMENT ON COLUMN trans.scm_warehouse_ref.status IS
96
+ 'Warehouse status: draft, active, inactive';
97
+
98
+ COMMENT ON COLUMN trans.scm_warehouse_ref.city IS
99
+ 'City from warehouse address';
100
+
101
+ COMMENT ON COLUMN trans.scm_warehouse_ref.state IS
102
+ 'State from warehouse address';
103
+
104
+ COMMENT ON COLUMN trans.scm_warehouse_ref.pincode IS
105
+ 'Pincode from warehouse address';
106
+
107
+ COMMENT ON COLUMN trans.scm_warehouse_ref.contact_name IS
108
+ 'Contact person name';
109
+
110
+ COMMENT ON COLUMN trans.scm_warehouse_ref.contact_phone IS
111
+ 'Contact phone number';pincode IS
112
+ 'Pincode from warehouse address';
113
+
114
+ COMMENT ON COLUMN trans.scm_warehouse_ref.contact_name IS
115
+ 'Contact person name';
116
+
117
+ COMMENT ON COLUMN trans.scm_warehouse_ref.contact_phone IS
118
+ 'Contact phone number';
119
+
120
+ COMMENT ON COLUMN trans.scm_warehouse_ref.contact_email IS
121
+ 'Contact email address
122
+ column_name,
123
+ data_type,
124
+ is_nullable,
125
+ column_default
126
+ FROM information_schema.columns
127
+ WHERE table_schema = 'trans'
128
+ AND table_name = 'scm_warehouse_ref'
129
+ ORDER BY ordinal_position;
130
+
131
+ -- Check indexes
132
+ SELECT
133
+ indexname,
134
+ indexdef
135
+ FROM pg_indexes
136
+ WHERE schemaname = 'trans'
137
+ AND tablename = 'scm_warehouse_ref';
138
+
139
+ -- Check constraints
140
+ SELECT
141
+ conname AS constraint_name,
142
+ contype AS constraint_type,
143
+ pg_get_constraintdef(oid) AS constraint_definition
144
+ FROM pg_constraint
145
+ WHERE conrelid = 'trans.scm_warehouse_ref'::regclass;
146
+
147
+ -- Check row count (should be 0 after creation)
148
+ SELECT COUNT(*) as warehouse_ref_count
149
+ FROM trans.scm_warehouse_ref;
150
+
151
+ -- =====================================================
152
+ -- Sample Query Examples
153
+ -- =====================================================
154
+
155
+ -- Get all active warehouses for a merchant
156
+ -- SELECT * FROM trans.scm_warehouse_ref
157
+ -- WHERE merchant_id = 'mch_DIST_0004' AND status = 'active';
158
+
159
+ -- Get all warehouses in a specific city
160
+ -- SELECT * FROM trans.scm_warehouse_ref
161
+ -- WHERE city = 'Mumbai' AND status = 'active';
162
+
163
+ -- Get all warehouses that can receive goods
164
+ -- SELECT * FROM trans.scm_warehouse_ref
165
+ -- WHERE can_receive = true AND status = 'active';
166
+
167
+ -- Get warehouses by type
168
+ -- SELECT * FROM trans.scm_warehouse_ref
169
+ -- WHERE warehouse_type = 'MAIN' AND status = 'active';
170
+ warehouses by type
171
+ -- SELECT * FROM trans.scm_warehouse_ref
172
+ -- WHERE warehouse_type = 'MAIN' AND status = 'active';
173
+
174
+ -- Get warehouses by pincode
175
+ -- SELECT * FROM trans.scm_warehouse_ref
176
+ -- WHERE pincode = '400001
docs/database/migrations/migration_warehouse_fields.sql CHANGED
@@ -2,7 +2,7 @@
2
  -- This script should be run in MongoDB shell or via MongoDB Compass
3
 
4
  -- Update warehouses missing created_at field
5
- db.scm_warehouses.updateMany(
6
  { "created_at": { $exists: false } },
7
  {
8
  $set: {
@@ -13,7 +13,7 @@ db.scm_warehouses.updateMany(
13
  );
14
 
15
  -- Update warehouses with null created_at
16
- db.scm_warehouses.updateMany(
17
  { "created_at": null },
18
  {
19
  $set: {
@@ -24,31 +24,31 @@ db.scm_warehouses.updateMany(
24
  );
25
 
26
  -- Update warehouses missing created_by field
27
- db.scm_warehouses.updateMany(
28
  { "created_by": { $exists: false } },
29
  { $set: { "created_by": "system" } }
30
  );
31
 
32
  -- Update warehouses with null created_by
33
- db.scm_warehouses.updateMany(
34
  { "created_by": null },
35
  { $set: { "created_by": "system" } }
36
  );
37
 
38
  -- Update warehouses missing updated_at field
39
- db.scm_warehouses.updateMany(
40
  { "updated_at": { $exists: false } },
41
  { $set: { "updated_at": null } }
42
  );
43
 
44
  -- Update warehouses missing updated_by field
45
- db.scm_warehouses.updateMany(
46
  { "updated_by": { $exists: false } },
47
  { $set: { "updated_by": null } }
48
  );
49
 
50
  -- Update warehouses missing capabilities field
51
- db.scm_warehouses.updateMany(
52
  { "capabilities": { $exists: false } },
53
  {
54
  $set: {
@@ -64,7 +64,7 @@ db.scm_warehouses.updateMany(
64
  );
65
 
66
  -- Update warehouses with null capabilities
67
- db.scm_warehouses.updateMany(
68
  { "capabilities": null },
69
  {
70
  $set: {
@@ -83,25 +83,25 @@ db.scm_warehouses.updateMany(
83
  print("=== Migration Verification ===");
84
 
85
  print("Warehouses missing created_at:");
86
- db.scm_warehouses.countDocuments({ "created_at": { $exists: false } });
87
 
88
  print("Warehouses with null created_at:");
89
- db.scm_warehouses.countDocuments({ "created_at": null });
90
 
91
  print("Warehouses missing created_by:");
92
- db.scm_warehouses.countDocuments({ "created_by": { $exists: false } });
93
 
94
  print("Warehouses with null created_by:");
95
- db.scm_warehouses.countDocuments({ "created_by": null });
96
 
97
  print("Warehouses missing capabilities:");
98
- db.scm_warehouses.countDocuments({ "capabilities": { $exists: false } });
99
 
100
  print("Warehouses with null capabilities:");
101
- db.scm_warehouses.countDocuments({ "capabilities": null });
102
 
103
  print("Total warehouses:");
104
- db.scm_warehouses.countDocuments({});
105
 
106
  print("=== Sample Documents ===");
107
- db.scm_warehouses.find({}).limit(2).pretty();
 
2
  -- This script should be run in MongoDB shell or via MongoDB Compass
3
 
4
  -- Update warehouses missing created_at field
5
+ db.scm_warehouse.updateMany(
6
  { "created_at": { $exists: false } },
7
  {
8
  $set: {
 
13
  );
14
 
15
  -- Update warehouses with null created_at
16
+ db.scm_warehouse.updateMany(
17
  { "created_at": null },
18
  {
19
  $set: {
 
24
  );
25
 
26
  -- Update warehouses missing created_by field
27
+ db.scm_warehouse.updateMany(
28
  { "created_by": { $exists: false } },
29
  { $set: { "created_by": "system" } }
30
  );
31
 
32
  -- Update warehouses with null created_by
33
+ db.scm_warehouse.updateMany(
34
  { "created_by": null },
35
  { $set: { "created_by": "system" } }
36
  );
37
 
38
  -- Update warehouses missing updated_at field
39
+ db.scm_warehouse.updateMany(
40
  { "updated_at": { $exists: false } },
41
  { $set: { "updated_at": null } }
42
  );
43
 
44
  -- Update warehouses missing updated_by field
45
+ db.scm_warehouse.updateMany(
46
  { "updated_by": { $exists: false } },
47
  { $set: { "updated_by": null } }
48
  );
49
 
50
  -- Update warehouses missing capabilities field
51
+ db.scm_warehouse.updateMany(
52
  { "capabilities": { $exists: false } },
53
  {
54
  $set: {
 
64
  );
65
 
66
  -- Update warehouses with null capabilities
67
+ db.scm_warehouse.updateMany(
68
  { "capabilities": null },
69
  {
70
  $set: {
 
83
  print("=== Migration Verification ===");
84
 
85
  print("Warehouses missing created_at:");
86
+ db.scm_warehouse.countDocuments({ "created_at": { $exists: false } });
87
 
88
  print("Warehouses with null created_at:");
89
+ db.scm_warehouse.countDocuments({ "created_at": null });
90
 
91
  print("Warehouses missing created_by:");
92
+ db.scm_warehouse.countDocuments({ "created_by": { $exists: false } });
93
 
94
  print("Warehouses with null created_by:");
95
+ db.scm_warehouse.countDocuments({ "created_by": null });
96
 
97
  print("Warehouses missing capabilities:");
98
+ db.scm_warehouse.countDocuments({ "capabilities": { $exists: false } });
99
 
100
  print("Warehouses with null capabilities:");
101
+ db.scm_warehouse.countDocuments({ "capabilities": null });
102
 
103
  print("Total warehouses:");
104
+ db.scm_warehouse.countDocuments({});
105
 
106
  print("=== Sample Documents ===");
107
+ db.scm_warehouse.find({}).limit(2).pretty();
test_warehouse_sync.py ADDED
@@ -0,0 +1,229 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Test script for warehouse sync functionality.
3
+
4
+ This script tests the warehouse sync implementation by:
5
+ 1. Creating a test warehouse in MongoDB
6
+ 2. Verifying sync to PostgreSQL
7
+ 3. Updating the warehouse
8
+ 4. Verifying sync of updates
9
+ 5. Deleting the warehouse
10
+ 6. Verifying sync of delete (status change)
11
+ """
12
+ import asyncio
13
+ from datetime import datetime, timezone
14
+ from sqlalchemy import text
15
+
16
+ from app.nosql import get_database
17
+ from app.sql import async_engine
18
+ from app.warehouses.services.service import WarehouseService, get_warehouse_sync_service
19
+ from app.warehouses.schemas.schema import WarehouseCreate, WarehouseUpdate
20
+ from app.warehouses.models.model import AddressModel, GeoModel, ContactModel
21
+
22
+
23
+ async def test_warehouse_sync():
24
+ """Test warehouse sync functionality."""
25
+
26
+ print("\n" + "="*60)
27
+ print("WAREHOUSE SYNC TEST")
28
+ print("="*60 + "\n")
29
+
30
+ # Test data
31
+ test_warehouse_code = f"WH-TEST-{datetime.now().strftime('%Y%m%d%H%M%S')}"
32
+ test_warehouse_id = None
33
+
34
+ try:
35
+ # ===== TEST 1: Create Warehouse =====
36
+ print("TEST 1: Creating warehouse...")
37
+
38
+ payload = WarehouseCreate(
39
+ warehouse_code=test_warehouse_code,
40
+ warehouse_name="Test Sync Warehouse",
41
+ warehouse_type="MAIN",
42
+ address=AddressModel(
43
+ line1="123 Test Street",
44
+ line2="Suite 100",
45
+ city="Mumbai",
46
+ state="Maharashtra",
47
+ country="India",
48
+ pincode="400001"
49
+ ),
50
+ geo=GeoModel(
51
+ lat=19.0760,
52
+ lng=72.8777
53
+ ),
54
+ contact=ContactModel(
55
+ name="Test Contact",
56
+ phone="+91-9876543210",
57
+ email="test@example.com"
58
+ ),
59
+ notes="Test warehouse for sync verification",
60
+ merchant_id="mch_test_001",
61
+ merchant_type="distributor",
62
+ created_by="test_user"
63
+ )
64
+
65
+ warehouse = await WarehouseService.create_warehouse(payload)
66
+ test_warehouse_id = warehouse.warehouse_id
67
+
68
+ print(f"βœ“ Warehouse created: {test_warehouse_id}")
69
+ print(f" Code: {warehouse.warehouse_code}")
70
+ print(f" Name: {warehouse.warehouse_name}")
71
+ print(f" Status: {warehouse.status}")
72
+
73
+ # Wait for async sync
74
+ print("\n Waiting for sync (3 seconds)...")
75
+ await asyncio.sleep(3)
76
+
77
+ # Check PostgreSQL
78
+ print("\n Checking PostgreSQL...")
79
+ async with async_engine.begin() as conn:
80
+ result = await conn.execute(
81
+ text("SELECT * FROM trans.scm_warehouse_ref WHERE warehouse_id = :id"),
82
+ {"id": test_warehouse_id}
83
+ )
84
+ row = result.fetchone()
85
+
86
+ if row:
87
+ print(" βœ“ Warehouse synced to PostgreSQL")
88
+ print(f" - warehouse_code: {row[1]}")
89
+ print(f" - warehouse_name: {row[2]}")
90
+ print(f" - city: {row[8]}")
91
+ print(f" - status: {row[6]}")
92
+ else:
93
+ print(" βœ— Warehouse NOT found in PostgreSQL")
94
+ return False
95
+
96
+ # ===== TEST 2: Update Warehouse =====
97
+ print("\n" + "-"*60)
98
+ print("TEST 2: Updating warehouse...")
99
+
100
+ update_payload = WarehouseUpdate(
101
+ warehouse_name="Test Sync Warehouse Updated",
102
+ notes="Updated notes for sync test",
103
+ updated_by="test_user"
104
+ )
105
+
106
+ updated_warehouse = await WarehouseService.update_warehouse(
107
+ warehouse_id=test_warehouse_id,
108
+ payload=update_payload,
109
+ merchant_id="mch_test_001"
110
+ )
111
+
112
+ print(f"βœ“ Warehouse updated: {test_warehouse_id}")
113
+ print(f" New name: {updated_warehouse.warehouse_name}")
114
+
115
+ # Wait for async sync
116
+ print("\n Waiting for sync (3 seconds)...")
117
+ await asyncio.sleep(3)
118
+
119
+ # Check PostgreSQL
120
+ print("\n Checking PostgreSQL...")
121
+ async with async_engine.begin() as conn:
122
+ result = await conn.execute(
123
+ text("SELECT warehouse_name, updated_at FROM trans.scm_warehouse_ref WHERE warehouse_id = :id"),
124
+ {"id": test_warehouse_id}
125
+ )
126
+ row = result.fetchone()
127
+
128
+ if row and row[0] == "Test Sync Warehouse Updated":
129
+ print(" βœ“ Warehouse update synced to PostgreSQL")
130
+ print(f" - warehouse_name: {row[0]}")
131
+ print(f" - updated_at: {row[1]}")
132
+ else:
133
+ print(" βœ— Warehouse update NOT synced")
134
+ return False
135
+
136
+ # ===== TEST 3: Delete Warehouse (Soft Delete) =====
137
+ print("\n" + "-"*60)
138
+ print("TEST 3: Deleting warehouse (soft delete)...")
139
+
140
+ result = await WarehouseService.delete_warehouse(
141
+ warehouse_id=test_warehouse_id,
142
+ merchant_id="mch_test_001"
143
+ )
144
+
145
+ print(f"βœ“ Warehouse deleted: {result['message']}")
146
+
147
+ # Wait for async sync
148
+ print("\n Waiting for sync (3 seconds)...")
149
+ await asyncio.sleep(3)
150
+
151
+ # Check PostgreSQL
152
+ print("\n Checking PostgreSQL...")
153
+ async with async_engine.begin() as conn:
154
+ result = await conn.execute(
155
+ text("SELECT status FROM trans.scm_warehouse_ref WHERE warehouse_id = :id"),
156
+ {"id": test_warehouse_id}
157
+ )
158
+ row = result.fetchone()
159
+
160
+ if row and row[0] == "inactive":
161
+ print(" βœ“ Warehouse delete (status change) synced to PostgreSQL")
162
+ print(f" - status: {row[0]}")
163
+ else:
164
+ print(" βœ— Warehouse delete NOT synced")
165
+ return False
166
+
167
+ # ===== TEST 4: Check Sync Metrics =====
168
+ print("\n" + "-"*60)
169
+ print("TEST 4: Checking sync metrics...")
170
+
171
+ sync_service = get_warehouse_sync_service()
172
+ metrics = sync_service.get_metrics()
173
+ queue_size = sync_service.get_queue_size()
174
+
175
+ print(f"βœ“ Sync metrics retrieved:")
176
+ print(f" - Total operations: {metrics.get('total', 0)}")
177
+ print(f" - Success: {metrics.get('success', 0)}")
178
+ print(f" - Failure: {metrics.get('failure', 0)}")
179
+ print(f" - Avg duration: {metrics.get('avg_duration_ms', 0):.2f}ms")
180
+ print(f" - Queue size: {queue_size}")
181
+
182
+ print("\n" + "="*60)
183
+ print("ALL TESTS PASSED βœ“")
184
+ print("="*60 + "\n")
185
+
186
+ return True
187
+
188
+ except Exception as e:
189
+ print(f"\nβœ— TEST FAILED: {str(e)}")
190
+ import traceback
191
+ traceback.print_exc()
192
+ return False
193
+
194
+ finally:
195
+ # Cleanup: Remove test data from MongoDB and PostgreSQL
196
+ print("\nCleaning up test data...")
197
+ try:
198
+ if test_warehouse_id:
199
+ # Remove from MongoDB
200
+ await get_database()["scm_warehouses"].delete_one({"warehouse_id": test_warehouse_id})
201
+ print(f" βœ“ Removed from MongoDB: {test_warehouse_id}")
202
+
203
+ # Remove from PostgreSQL
204
+ async with async_engine.begin() as conn:
205
+ await conn.execute(
206
+ text("DELETE FROM trans.scm_warehouse_ref WHERE warehouse_id = :id"),
207
+ {"id": test_warehouse_id}
208
+ )
209
+ print(f" βœ“ Removed from PostgreSQL: {test_warehouse_id}")
210
+ except Exception as e:
211
+ print(f" Warning: Cleanup error: {e}")
212
+
213
+
214
+ if __name__ == "__main__":
215
+ print("\nStarting warehouse sync tests...")
216
+ print("Make sure:")
217
+ print("1. MongoDB is running and accessible")
218
+ print("2. PostgreSQL is running and accessible")
219
+ print("3. trans.scm_warehouse_ref table exists")
220
+ print()
221
+
222
+ result = asyncio.run(test_warehouse_sync())
223
+
224
+ if result:
225
+ print("\nβœ“ All tests passed successfully!")
226
+ exit(0)
227
+ else:
228
+ print("\nβœ— Tests failed!")
229
+ exit(1)