MukeshKapoor25 commited on
Commit
a7aaf8d
·
1 Parent(s): e6cdbd6

feat: Migrate PostgreSQL interactions to SQLAlchemy's async engine, improving connection management, query parameterization, and URI configuration.

Browse files
app/core/config.py CHANGED
@@ -4,6 +4,7 @@ Loads environment variables and provides application settings.
4
  """
5
  import os
6
  from typing import Optional, List
 
7
  from pydantic_settings import BaseSettings, SettingsConfigDict
8
 
9
 
@@ -35,10 +36,13 @@ class Settings(BaseSettings):
35
  POSTGRES_SSL_CERT: Optional[str] = os.getenv("POSTGRES_SSL_CERT")
36
  POSTGRES_SSL_KEY: Optional[str] = os.getenv("POSTGRES_SSL_KEY")
37
 
38
- @property
39
- def POSTGRES_URI(self) -> str:
40
- """Build PostgreSQL URI from components"""
41
- return f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
 
 
 
42
 
43
  # Redis Configuration
44
  REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
 
4
  """
5
  import os
6
  from typing import Optional, List
7
+ from pydantic import model_validator
8
  from pydantic_settings import BaseSettings, SettingsConfigDict
9
 
10
 
 
36
  POSTGRES_SSL_CERT: Optional[str] = os.getenv("POSTGRES_SSL_CERT")
37
  POSTGRES_SSL_KEY: Optional[str] = os.getenv("POSTGRES_SSL_KEY")
38
 
39
+ POSTGRES_URI: Optional[str] = os.getenv("POSTGRES_URI")
40
+
41
+ @model_validator(mode='after')
42
+ def assemble_db_connection(self) -> 'Settings':
43
+ if not self.POSTGRES_URI:
44
+ self.POSTGRES_URI = f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@{self.POSTGRES_HOST}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
45
+ return self
46
 
47
  # Redis Configuration
48
  REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
app/sql.py CHANGED
@@ -90,11 +90,7 @@ async def connect_to_database() -> None:
90
  try:
91
  async with async_engine.begin() as conn:
92
  await conn.execute(text("SELECT 1"))
93
- logger.info("Successfully connected to PostgreSQL database", extra={
94
- "host": settings.POSTGRES_HOST,
95
- "port": settings.POSTGRES_PORT,
96
- "database": settings.POSTGRES_DB
97
- })
98
  return
99
  except Exception as e:
100
  last_error = e
@@ -104,22 +100,14 @@ async def connect_to_database() -> None:
104
  extra={
105
  "attempt": attempts,
106
  "max_attempts": max_attempts,
107
- "retry_delay_ms": int(delay * 1000),
108
- "host": settings.POSTGRES_HOST,
109
- "port": settings.POSTGRES_PORT,
110
- "database": settings.POSTGRES_DB
111
  }
112
  )
113
  await asyncio.sleep(delay)
114
  delay = min(delay * settings.POSTGRES_CONNECT_BACKOFF_MULTIPLIER, 30.0)
115
  logger.error(
116
  "Failed to connect to PostgreSQL after retries",
117
- exc_info=last_error,
118
- extra={
119
- "host": settings.POSTGRES_HOST,
120
- "port": settings.POSTGRES_PORT,
121
- "database": settings.POSTGRES_DB
122
- }
123
  )
124
  raise last_error
125
 
 
90
  try:
91
  async with async_engine.begin() as conn:
92
  await conn.execute(text("SELECT 1"))
93
+ logger.info("Successfully connected to PostgreSQL database")
 
 
 
 
94
  return
95
  except Exception as e:
96
  last_error = e
 
100
  extra={
101
  "attempt": attempts,
102
  "max_attempts": max_attempts,
103
+ "retry_delay_ms": int(delay * 1000)
 
 
 
104
  }
105
  )
106
  await asyncio.sleep(delay)
107
  delay = min(delay * settings.POSTGRES_CONNECT_BACKOFF_MULTIPLIER, 30.0)
108
  logger.error(
109
  "Failed to connect to PostgreSQL after retries",
110
+ exc_info=last_error
 
 
 
 
 
111
  )
112
  raise last_error
113
 
app/sync/catalogues/handler.py CHANGED
@@ -3,7 +3,8 @@ Catalogue sync handler for MongoDB to PostgreSQL synchronization.
3
  """
4
  from typing import Dict, Any, Optional
5
  from decimal import Decimal, InvalidOperation
6
- import asyncpg
 
7
  from motor.motor_asyncio import AsyncIOMotorDatabase
8
  from datetime import datetime
9
  from insightfy_utils.logging import get_logger
@@ -252,7 +253,7 @@ class CatalogueSyncHandler(SyncHandler):
252
  async def upsert_to_postgres(
253
  self,
254
  entity: Dict[str, Any],
255
- pg_conn: asyncpg.Connection
256
  ) -> bool:
257
  """
258
  Upsert catalogue to PostgreSQL trans.catalogue_ref table.
@@ -312,21 +313,20 @@ class CatalogueSyncHandler(SyncHandler):
312
 
313
  # Build UPSERT query
314
  columns = list(mapped_entity.keys())
315
- placeholders = [f"${i+1}" for i in range(len(columns))]
316
- values = [mapped_entity[col] for col in columns]
317
 
318
  # Build UPDATE clause (exclude primary key)
319
  update_columns = [col for col in columns if col != "catalogue_id"]
320
  update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
321
 
322
- query = f"""
323
  INSERT INTO trans.catalogue_ref ({', '.join(columns)})
324
  VALUES ({', '.join(placeholders)})
325
  ON CONFLICT (catalogue_id)
326
  DO UPDATE SET {update_clause}
327
- """
328
 
329
- await pg_conn.execute(query, *values)
330
 
331
  logger.debug(
332
  "Catalogue upserted to PostgreSQL",
 
3
  """
4
  from typing import Dict, Any, Optional
5
  from decimal import Decimal, InvalidOperation
6
+ from sqlalchemy.ext.asyncio import AsyncConnection
7
+ from sqlalchemy import text
8
  from motor.motor_asyncio import AsyncIOMotorDatabase
9
  from datetime import datetime
10
  from insightfy_utils.logging import get_logger
 
253
  async def upsert_to_postgres(
254
  self,
255
  entity: Dict[str, Any],
256
+ pg_conn: AsyncConnection
257
  ) -> bool:
258
  """
259
  Upsert catalogue to PostgreSQL trans.catalogue_ref table.
 
313
 
314
  # Build UPSERT query
315
  columns = list(mapped_entity.keys())
316
+ placeholders = [f":{col}" for col in columns]
 
317
 
318
  # Build UPDATE clause (exclude primary key)
319
  update_columns = [col for col in columns if col != "catalogue_id"]
320
  update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
321
 
322
+ query = text(f"""
323
  INSERT INTO trans.catalogue_ref ({', '.join(columns)})
324
  VALUES ({', '.join(placeholders)})
325
  ON CONFLICT (catalogue_id)
326
  DO UPDATE SET {update_clause}
327
+ """)
328
 
329
+ await pg_conn.execute(query, mapped_entity)
330
 
331
  logger.debug(
332
  "Catalogue upserted to PostgreSQL",
app/sync/catalogues/service.py CHANGED
@@ -12,7 +12,7 @@ from app.sync.catalogues.handler import CatalogueSyncHandler
12
  from app.sync.common.retry import RetryManager
13
  from app.sync.common.monitoring import SyncMonitoringService
14
  from app.sync.common.models import SyncOperation
15
- from app.postgres import get_postgres_connection, release_postgres_connection
16
 
17
  logger = get_logger(__name__)
18
 
@@ -312,12 +312,7 @@ class CatalogueSyncService:
312
  Raises:
313
  Exception: If sync fails
314
  """
315
- pg_conn = None
316
-
317
- try:
318
- # Acquire PostgreSQL connection
319
- pg_conn = await get_postgres_connection()
320
-
321
  # Perform sync
322
  success = await self.handler.sync(
323
  entity_id=catalogue_id,
@@ -326,11 +321,6 @@ class CatalogueSyncService:
326
  )
327
 
328
  return success
329
-
330
- finally:
331
- # Always release connection
332
- if pg_conn:
333
- await release_postgres_connection(pg_conn)
334
 
335
  def get_queue_size(self) -> int:
336
  """
 
12
  from app.sync.common.retry import RetryManager
13
  from app.sync.common.monitoring import SyncMonitoringService
14
  from app.sync.common.models import SyncOperation
15
+ from app.sql import async_engine
16
 
17
  logger = get_logger(__name__)
18
 
 
312
  Raises:
313
  Exception: If sync fails
314
  """
315
+ async with async_engine.begin() as pg_conn:
 
 
 
 
 
316
  # Perform sync
317
  success = await self.handler.sync(
318
  entity_id=catalogue_id,
 
321
  )
322
 
323
  return success
 
 
 
 
 
324
 
325
  def get_queue_size(self) -> int:
326
  """
app/sync/common/handler.py CHANGED
@@ -1,9 +1,7 @@
1
- """
2
- Base sync handler with shared sync logic.
3
- """
4
  from abc import ABC, abstractmethod
5
  from typing import Dict, Any, Optional
6
- import asyncpg
 
7
  from motor.motor_asyncio import AsyncIOMotorDatabase
8
  from datetime import datetime
9
  from insightfy_utils.logging import get_logger
@@ -86,7 +84,7 @@ class SyncHandler(ABC):
86
  async def upsert_to_postgres(
87
  self,
88
  entity: Dict[str, Any],
89
- pg_conn: asyncpg.Connection
90
  ) -> bool:
91
  """
92
  Upsert entity to PostgreSQL.
@@ -104,7 +102,7 @@ class SyncHandler(ABC):
104
  self,
105
  entity_id: str,
106
  mongo_db: AsyncIOMotorDatabase,
107
- pg_conn: asyncpg.Connection
108
  ) -> bool:
109
  """
110
  Sync entity from MongoDB to PostgreSQL.
@@ -203,7 +201,7 @@ class SyncHandler(ABC):
203
  self,
204
  entity_id: str,
205
  mongo_updated_at: datetime,
206
- pg_conn: asyncpg.Connection,
207
  table_name: str,
208
  id_column: str
209
  ) -> bool:
@@ -221,8 +219,10 @@ class SyncHandler(ABC):
221
  True if should update (MongoDB is newer or record doesn't exist), False otherwise
222
  """
223
  try:
224
- query = f"SELECT updated_at FROM {table_name} WHERE {id_column} = $1"
225
- pg_updated_at = await pg_conn.fetchval(query, entity_id)
 
 
226
 
227
  if pg_updated_at is None:
228
  # Record doesn't exist, should insert
 
 
 
 
1
  from abc import ABC, abstractmethod
2
  from typing import Dict, Any, Optional
3
+ from sqlalchemy.ext.asyncio import AsyncConnection
4
+ from sqlalchemy import text
5
  from motor.motor_asyncio import AsyncIOMotorDatabase
6
  from datetime import datetime
7
  from insightfy_utils.logging import get_logger
 
84
  async def upsert_to_postgres(
85
  self,
86
  entity: Dict[str, Any],
87
+ pg_conn: AsyncConnection
88
  ) -> bool:
89
  """
90
  Upsert entity to PostgreSQL.
 
102
  self,
103
  entity_id: str,
104
  mongo_db: AsyncIOMotorDatabase,
105
+ pg_conn: AsyncConnection
106
  ) -> bool:
107
  """
108
  Sync entity from MongoDB to PostgreSQL.
 
201
  self,
202
  entity_id: str,
203
  mongo_updated_at: datetime,
204
+ pg_conn: AsyncConnection,
205
  table_name: str,
206
  id_column: str
207
  ) -> bool:
 
219
  True if should update (MongoDB is newer or record doesn't exist), False otherwise
220
  """
221
  try:
222
+ # Replaced $1 with :id and fetchval with execute().scalar()
223
+ query = text(f"SELECT updated_at FROM {table_name} WHERE {id_column} = :id")
224
+ result = await pg_conn.execute(query, {"id": entity_id})
225
+ pg_updated_at = result.scalar()
226
 
227
  if pg_updated_at is None:
228
  # Record doesn't exist, should insert
app/sync/employees/handler.py CHANGED
@@ -2,7 +2,8 @@
2
  Employee sync handler for MongoDB to PostgreSQL synchronization.
3
  """
4
  from typing import Dict, Any, Optional
5
- import asyncpg
 
6
  from motor.motor_asyncio import AsyncIOMotorDatabase
7
  from datetime import datetime
8
  from insightfy_utils.logging import get_logger
@@ -176,7 +177,7 @@ class EmployeeSyncHandler(SyncHandler):
176
  async def upsert_to_postgres(
177
  self,
178
  entity: Dict[str, Any],
179
- pg_conn: asyncpg.Connection
180
  ) -> bool:
181
  """
182
  Upsert employee to PostgreSQL trans.employees_ref table.
@@ -232,21 +233,20 @@ class EmployeeSyncHandler(SyncHandler):
232
 
233
  # Build UPSERT query
234
  columns = list(mapped_entity.keys())
235
- placeholders = [f"${i+1}" for i in range(len(columns))]
236
- values = [mapped_entity[col] for col in columns]
237
 
238
  # Build UPDATE clause (exclude primary key)
239
  update_columns = [col for col in columns if col != "employee_id"]
240
  update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
241
 
242
- query = f"""
243
  INSERT INTO trans.employees_ref ({', '.join(columns)})
244
  VALUES ({', '.join(placeholders)})
245
  ON CONFLICT (employee_id)
246
  DO UPDATE SET {update_clause}
247
- """
248
 
249
- await pg_conn.execute(query, *values)
250
 
251
  logger.debug(
252
  "Employee upserted to PostgreSQL",
 
2
  Employee sync handler for MongoDB to PostgreSQL synchronization.
3
  """
4
  from typing import Dict, Any, Optional
5
+ from sqlalchemy.ext.asyncio import AsyncConnection
6
+ from sqlalchemy import text
7
  from motor.motor_asyncio import AsyncIOMotorDatabase
8
  from datetime import datetime
9
  from insightfy_utils.logging import get_logger
 
177
  async def upsert_to_postgres(
178
  self,
179
  entity: Dict[str, Any],
180
+ pg_conn: AsyncConnection
181
  ) -> bool:
182
  """
183
  Upsert employee to PostgreSQL trans.employees_ref table.
 
233
 
234
  # Build UPSERT query
235
  columns = list(mapped_entity.keys())
236
+ placeholders = [f":{col}" for col in columns]
 
237
 
238
  # Build UPDATE clause (exclude primary key)
239
  update_columns = [col for col in columns if col != "employee_id"]
240
  update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
241
 
242
+ query = text(f"""
243
  INSERT INTO trans.employees_ref ({', '.join(columns)})
244
  VALUES ({', '.join(placeholders)})
245
  ON CONFLICT (employee_id)
246
  DO UPDATE SET {update_clause}
247
+ """)
248
 
249
+ await pg_conn.execute(query, mapped_entity)
250
 
251
  logger.debug(
252
  "Employee upserted to PostgreSQL",
app/sync/employees/service.py CHANGED
@@ -12,7 +12,7 @@ from app.sync.employees.handler import EmployeeSyncHandler
12
  from app.sync.common.retry import RetryManager
13
  from app.sync.common.monitoring import SyncMonitoringService
14
  from app.sync.common.models import SyncOperation
15
- from app.postgres import get_postgres_connection, release_postgres_connection
16
 
17
  logger = get_logger(__name__)
18
 
@@ -312,12 +312,7 @@ class EmployeeSyncService:
312
  Raises:
313
  Exception: If sync fails
314
  """
315
- pg_conn = None
316
-
317
- try:
318
- # Acquire PostgreSQL connection
319
- pg_conn = await get_postgres_connection()
320
-
321
  # Perform sync
322
  success = await self.handler.sync(
323
  entity_id=employee_id,
@@ -326,11 +321,6 @@ class EmployeeSyncService:
326
  )
327
 
328
  return success
329
-
330
- finally:
331
- # Always release connection
332
- if pg_conn:
333
- await release_postgres_connection(pg_conn)
334
 
335
  def get_queue_size(self) -> int:
336
  """
 
12
  from app.sync.common.retry import RetryManager
13
  from app.sync.common.monitoring import SyncMonitoringService
14
  from app.sync.common.models import SyncOperation
15
+ from app.sql import async_engine
16
 
17
  logger = get_logger(__name__)
18
 
 
312
  Raises:
313
  Exception: If sync fails
314
  """
315
+ async with async_engine.begin() as pg_conn:
 
 
 
 
 
316
  # Perform sync
317
  success = await self.handler.sync(
318
  entity_id=employee_id,
 
321
  )
322
 
323
  return success
 
 
 
 
 
324
 
325
  def get_queue_size(self) -> int:
326
  """
app/sync/merchants/handler.py CHANGED
@@ -2,7 +2,8 @@
2
  Merchant sync handler for MongoDB to PostgreSQL synchronization.
3
  """
4
  from typing import Dict, Any, Optional
5
- import asyncpg
 
6
  from motor.motor_asyncio import AsyncIOMotorDatabase
7
  from datetime import datetime
8
  from insightfy_utils.logging import get_logger
@@ -159,7 +160,7 @@ class MerchantSyncHandler(SyncHandler):
159
  async def upsert_to_postgres(
160
  self,
161
  entity: Dict[str, Any],
162
- pg_conn: asyncpg.Connection
163
  ) -> bool:
164
  """
165
  Upsert merchant to PostgreSQL trans.merchants_ref table.
@@ -211,21 +212,20 @@ class MerchantSyncHandler(SyncHandler):
211
 
212
  # Build UPSERT query
213
  columns = list(mapped_entity.keys())
214
- placeholders = [f"${i+1}" for i in range(len(columns))]
215
- values = [mapped_entity[col] for col in columns]
216
 
217
  # Build UPDATE clause (exclude primary key)
218
  update_columns = [col for col in columns if col != "merchant_id"]
219
  update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
220
 
221
- query = f"""
222
  INSERT INTO trans.merchants_ref ({', '.join(columns)})
223
  VALUES ({', '.join(placeholders)})
224
  ON CONFLICT (merchant_id)
225
  DO UPDATE SET {update_clause}
226
- """
227
 
228
- await pg_conn.execute(query, *values)
229
 
230
  logger.debug(
231
  "Merchant upserted to PostgreSQL",
 
2
  Merchant sync handler for MongoDB to PostgreSQL synchronization.
3
  """
4
  from typing import Dict, Any, Optional
5
+ from sqlalchemy.ext.asyncio import AsyncConnection
6
+ from sqlalchemy import text
7
  from motor.motor_asyncio import AsyncIOMotorDatabase
8
  from datetime import datetime
9
  from insightfy_utils.logging import get_logger
 
160
  async def upsert_to_postgres(
161
  self,
162
  entity: Dict[str, Any],
163
+ pg_conn: AsyncConnection
164
  ) -> bool:
165
  """
166
  Upsert merchant to PostgreSQL trans.merchants_ref table.
 
212
 
213
  # Build UPSERT query
214
  columns = list(mapped_entity.keys())
215
+ placeholders = [f":{col}" for col in columns]
 
216
 
217
  # Build UPDATE clause (exclude primary key)
218
  update_columns = [col for col in columns if col != "merchant_id"]
219
  update_clause = ", ".join([f"{col} = EXCLUDED.{col}" for col in update_columns])
220
 
221
+ query = text(f"""
222
  INSERT INTO trans.merchants_ref ({', '.join(columns)})
223
  VALUES ({', '.join(placeholders)})
224
  ON CONFLICT (merchant_id)
225
  DO UPDATE SET {update_clause}
226
+ """)
227
 
228
+ await pg_conn.execute(query, mapped_entity)
229
 
230
  logger.debug(
231
  "Merchant upserted to PostgreSQL",
app/sync/merchants/service.py CHANGED
@@ -12,7 +12,7 @@ from app.sync.merchants.handler import MerchantSyncHandler
12
  from app.sync.common.retry import RetryManager
13
  from app.sync.common.monitoring import SyncMonitoringService
14
  from app.sync.common.models import SyncOperation
15
- from app.postgres import get_postgres_connection, release_postgres_connection
16
 
17
  logger = get_logger(__name__)
18
 
@@ -312,12 +312,7 @@ class MerchantSyncService:
312
  Raises:
313
  Exception: If sync fails
314
  """
315
- pg_conn = None
316
-
317
- try:
318
- # Acquire PostgreSQL connection
319
- pg_conn = await get_postgres_connection()
320
-
321
  # Perform sync
322
  success = await self.handler.sync(
323
  entity_id=merchant_id,
@@ -326,11 +321,6 @@ class MerchantSyncService:
326
  )
327
 
328
  return success
329
-
330
- finally:
331
- # Always release connection
332
- if pg_conn:
333
- await release_postgres_connection(pg_conn)
334
 
335
  def get_queue_size(self) -> int:
336
  """
 
12
  from app.sync.common.retry import RetryManager
13
  from app.sync.common.monitoring import SyncMonitoringService
14
  from app.sync.common.models import SyncOperation
15
+ from app.sql import async_engine
16
 
17
  logger = get_logger(__name__)
18
 
 
312
  Raises:
313
  Exception: If sync fails
314
  """
315
+ async with async_engine.begin() as pg_conn:
 
 
 
 
 
316
  # Perform sync
317
  success = await self.handler.sync(
318
  entity_id=merchant_id,
 
321
  )
322
 
323
  return success
 
 
 
 
 
324
 
325
  def get_queue_size(self) -> int:
326
  """