ishaq101 commited on
Commit
1228c41
·
1 Parent(s): 1ee7e5c

[NOTICKET] Feat: retry_db decorator

Browse files
externals/databases/pg_crud.py CHANGED
@@ -1,7 +1,7 @@
1
  from typing import Optional, List
2
  from uuid import UUID
3
 
4
- from sqlalchemy import select, and_, update, delete, or_, func
5
  from sqlalchemy.ext.asyncio import AsyncSession
6
  from externals.databases.pg_models import (
7
  CVUser,
@@ -13,6 +13,10 @@ from externals.databases.pg_models import (
13
  CVMatching,
14
  CVScore,
15
  )
 
 
 
 
16
 
17
  # =========================
18
  # USER
@@ -20,6 +24,8 @@ from externals.databases.pg_models import (
20
  from externals.databases.schemas.user import UserCreate, UserResponse
21
  from utils.security import hash_password
22
 
 
 
23
  async def get_user_by_username(
24
  db: AsyncSession,
25
  username: str,
@@ -30,6 +36,7 @@ async def get_user_by_username(
30
  return result.scalar_one_or_none()
31
 
32
 
 
33
  async def create_user(
34
  db: AsyncSession,
35
  user_in: UserCreate,
@@ -52,6 +59,8 @@ async def create_user(
52
 
53
  return user
54
 
 
 
55
  async def deactivate_user(
56
  db: AsyncSession,
57
  username: str,
@@ -71,6 +80,7 @@ async def deactivate_user(
71
  return user
72
 
73
 
 
74
  async def get_user_by_email(db, email: str):
75
  result = await db.execute(
76
  select(CVUser).where(CVUser.email == email)
@@ -78,6 +88,7 @@ async def get_user_by_email(db, email: str):
78
  return result.scalar_one_or_none()
79
 
80
 
 
81
  async def get_user_by_id(db, user_id: str):
82
  result = await db.execute(
83
  select(CVUser).where(CVUser.user_id == user_id)
@@ -90,6 +101,8 @@ async def get_user_by_id(db, user_id: str):
90
  # =========================
91
  from externals.databases.schemas.tenant import TenantCreate
92
 
 
 
93
  async def get_tenant_by_name(
94
  db: AsyncSession,
95
  tenant_name: str,
@@ -100,6 +113,7 @@ async def get_tenant_by_name(
100
  return result.scalar_one_or_none()
101
 
102
 
 
103
  async def create_tenant(
104
  db: AsyncSession,
105
  tenant_in: TenantCreate,
@@ -121,15 +135,7 @@ async def create_tenant(
121
  # FILE
122
  # =========================
123
 
124
- # async def get_file_by_filename(
125
- # db: AsyncSession,
126
- # filename: str,
127
- # ) -> Optional[CVFile]:
128
- # stmt = select(CVFile).where(CVFile.filename == filename)
129
- # result = await db.execute(stmt)
130
- # return result.scalar_one_or_none()
131
-
132
-
133
  async def mark_file_extracted(
134
  db: AsyncSession,
135
  file_id: UUID,
@@ -166,6 +172,7 @@ async def create_cv_file(
166
  return cv_file
167
 
168
 
 
169
  async def delete_file_by_filename(
170
  db: AsyncSession,
171
  filename: str,
 
1
  from typing import Optional, List
2
  from uuid import UUID
3
 
4
+ from sqlalchemy import select, and_, update, delete, func
5
  from sqlalchemy.ext.asyncio import AsyncSession
6
  from externals.databases.pg_models import (
7
  CVUser,
 
13
  CVMatching,
14
  CVScore,
15
  )
16
+ from utils.decorator import retry_db
17
+
18
+
19
+
20
 
21
  # =========================
22
  # USER
 
24
  from externals.databases.schemas.user import UserCreate, UserResponse
25
  from utils.security import hash_password
26
 
27
+
28
+ @retry_db(retries=1, delay=2)
29
  async def get_user_by_username(
30
  db: AsyncSession,
31
  username: str,
 
36
  return result.scalar_one_or_none()
37
 
38
 
39
+ @retry_db(retries=1, delay=2)
40
  async def create_user(
41
  db: AsyncSession,
42
  user_in: UserCreate,
 
59
 
60
  return user
61
 
62
+
63
+ @retry_db(retries=1, delay=2)
64
  async def deactivate_user(
65
  db: AsyncSession,
66
  username: str,
 
80
  return user
81
 
82
 
83
+ @retry_db(retries=1, delay=2)
84
  async def get_user_by_email(db, email: str):
85
  result = await db.execute(
86
  select(CVUser).where(CVUser.email == email)
 
88
  return result.scalar_one_or_none()
89
 
90
 
91
+ @retry_db(retries=1, delay=2)
92
  async def get_user_by_id(db, user_id: str):
93
  result = await db.execute(
94
  select(CVUser).where(CVUser.user_id == user_id)
 
101
  # =========================
102
  from externals.databases.schemas.tenant import TenantCreate
103
 
104
+
105
+ @retry_db(retries=1, delay=2)
106
  async def get_tenant_by_name(
107
  db: AsyncSession,
108
  tenant_name: str,
 
113
  return result.scalar_one_or_none()
114
 
115
 
116
+ @retry_db(retries=1, delay=2)
117
  async def create_tenant(
118
  db: AsyncSession,
119
  tenant_in: TenantCreate,
 
135
  # FILE
136
  # =========================
137
 
138
+ @retry_db(retries=1, delay=2)
 
 
 
 
 
 
 
 
139
  async def mark_file_extracted(
140
  db: AsyncSession,
141
  file_id: UUID,
 
172
  return cv_file
173
 
174
 
175
+ @retry_db(retries=1, delay=2)
176
  async def delete_file_by_filename(
177
  db: AsyncSession,
178
  filename: str,
externals/databases/pg_models.py CHANGED
@@ -3,6 +3,19 @@ from config.get_config import master_config
3
  from sqlalchemy import Column, String, Integer, TIMESTAMP, func
4
  from sqlalchemy.dialects.postgresql import UUID, ARRAY, DOUBLE_PRECISION, BOOLEAN
5
  from sqlalchemy.orm import declarative_base
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
 
8
  Base = declarative_base()
 
3
  from sqlalchemy import Column, String, Integer, TIMESTAMP, func
4
  from sqlalchemy.dialects.postgresql import UUID, ARRAY, DOUBLE_PRECISION, BOOLEAN
5
  from sqlalchemy.orm import declarative_base
6
+ # from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
7
+ # from config.constant import EnvPostgresConstants
8
+
9
+ # engine = create_async_engine(
10
+ # EnvPostgresConstants.CONSTRING.replace('psycopg2', 'asyncpg'),
11
+ # pool_pre_ping=True,
12
+ # pool_recycle=300,
13
+ # )
14
+
15
+ # AsyncSessionLocal = async_sessionmaker(
16
+ # engine,
17
+ # expire_on_commit=False,
18
+ # )
19
 
20
 
21
  Base = declarative_base()
utils/decorator.py CHANGED
@@ -1,10 +1,11 @@
1
  import asyncio
 
2
  import inspect
3
- import functools
4
  import time
5
- import warnings
6
  from functools import wraps
7
  from typing import Callable, Any
 
8
 
9
  def trace_runtime(func: Callable) -> Callable:
10
  @wraps(func)
@@ -37,18 +38,43 @@ def trace_runtime(func: Callable) -> Callable:
37
  return sync_wrapper
38
 
39
 
40
- def deprecated(reason):
41
- """
42
- This is a decorator which can be used to mark functions
43
- and classes as deprecated.
44
- """
45
- def decorator(func_or_class):
46
- @functools.wraps(func_or_class)
47
- def new_func(*args, **kwargs):
48
- warnings.warn(f"Call to deprecated {func_or_class.__name__}."
49
- f" Reason: {reason}",
50
- category=FutureWarning,
51
- stacklevel=2)
52
- return func_or_class(*args, **kwargs)
53
- return new_func
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  return decorator
 
1
  import asyncio
2
+ import asyncpg
3
  import inspect
 
4
  import time
5
+
6
  from functools import wraps
7
  from typing import Callable, Any
8
+ from sqlalchemy.exc import OperationalError, InterfaceError
9
 
10
  def trace_runtime(func: Callable) -> Callable:
11
  @wraps(func)
 
38
  return sync_wrapper
39
 
40
 
41
+
42
+ def retry_db(
43
+ retries: int = 3,
44
+ delay: float = 2.0,
45
+ backoff: float = 2.0,
46
+ ) -> Callable:
47
+
48
+ def decorator(func: Callable) -> Callable:
49
+
50
+ @wraps(func)
51
+ async def async_wrapper(*args, **kwargs) -> Any:
52
+ current_delay = delay
53
+
54
+ for attempt in range(1, retries + 1):
55
+ try:
56
+ return await func(*args, **kwargs)
57
+
58
+ except (
59
+ OperationalError,
60
+ InterfaceError,
61
+ asyncpg.exceptions.PostgresConnectionError,
62
+ asyncpg.exceptions.CannotConnectNowError,
63
+ ConnectionError,
64
+ TimeoutError,
65
+ ) as e:
66
+
67
+ if attempt == retries:
68
+ raise
69
+
70
+ print(
71
+ f"🔁 Retry {attempt}/{retries} for '{func.__name__}' "
72
+ f"after {current_delay:.2f}s due to: {type(e).__name__}"
73
+ )
74
+
75
+ await asyncio.sleep(current_delay)
76
+ current_delay *= backoff
77
+
78
+ return async_wrapper
79
+
80
  return decorator