sharktide commited on
Commit
1a31c75
·
verified ·
1 Parent(s): 61b4d52

Update helper/subscriptions.py

Browse files
Files changed (1) hide show
  1. helper/subscriptions.py +29 -20
helper/subscriptions.py CHANGED
@@ -129,37 +129,46 @@ async def get_conn():
129
  return conn
130
 
131
 
 
 
 
132
  async def execute_query(query: str, params=(), *, fetchone: bool = False, commit: bool = False):
133
  async with conn_lock:
134
  connection = await get_conn()
135
 
136
  try:
137
- with connection.cursor() as cur:
138
- cur.execute(query, params)
139
- if cur.description is None:
140
- result = None if fetchone else []
141
- else:
142
- result = cur.fetchone() if fetchone else cur.fetchall()
143
- if commit:
144
- connection.commit()
145
- return result
146
- except psycopg.OperationalError:
147
- connection = psycopg.connect(
 
 
148
  POSTGRE_SECRET,
149
  row_factory=dict_row,
150
  sslmode="verify-full",
151
  sslrootcert="prod-ca-2021.crt",
152
  )
153
  globals()["conn"] = connection
154
- with connection.cursor() as cur:
155
- cur.execute(query, params)
156
- if cur.description is None:
157
- result = None if fetchone else []
158
- else:
159
- result = cur.fetchone() if fetchone else cur.fetchall()
160
- if commit:
161
- connection.commit()
162
- return result
 
 
 
 
163
 
164
 
165
  def normalize_plan_key(plan_name: str | None) -> str:
 
129
  return conn
130
 
131
 
132
+ import psycopg
133
+ from psycopg.rows import dict_row
134
+
135
  async def execute_query(query: str, params=(), *, fetchone: bool = False, commit: bool = False):
136
  async with conn_lock:
137
  connection = await get_conn()
138
 
139
  try:
140
+ # Use connection.transaction() to safely manage rollback/commit automatically
141
+ async with connection.transaction():
142
+ async with connection.cursor() as cur:
143
+ await cur.execute(query, params)
144
+
145
+ if cur.description is None:
146
+ return None if fetchone else []
147
+
148
+ return await cur.fetchone() if fetchone else await cur.fetchall()
149
+
150
+ except (psycopg.OperationalError, psycopg.InterfaceError):
151
+ # 1. Handle dropped connections by reconnecting asynchronously
152
+ connection = await psycopg.AsyncConnection.connect(
153
  POSTGRE_SECRET,
154
  row_factory=dict_row,
155
  sslmode="verify-full",
156
  sslrootcert="prod-ca-2021.crt",
157
  )
158
  globals()["conn"] = connection
159
+
160
+ # 2. Retry the query safely on the new connection
161
+ async with connection.transaction():
162
+ async with connection.cursor() as cur:
163
+ await cur.execute(query, params)
164
+ if cur.description is None:
165
+ return None if fetchone else []
166
+ return await cur.fetchone() if fetchone else await cur.fetchall()
167
+
168
+ except psycopg.Error:
169
+ # 3. Handle bad SQL/transactions: the transaction context manager
170
+ # automatically issued a ROLLBACK before reaching this point.
171
+ raise
172
 
173
 
174
  def normalize_plan_key(plan_name: str | None) -> str: