niwayandm commited on
Commit
03810ee
·
1 Parent(s): 9625e2f

Enhance Supabase utility functions to log the affected row IDs on permanent upsert failures. Fix pipeline step labels and log messages in the load-data script.

Browse files
python/load_hubspot_data.py CHANGED
@@ -145,16 +145,16 @@ def main():
145
  logging.exception("Error running emails pipeline: %s", e)
146
 
147
  try:
148
- print("\n[6/7] Billing")
149
  hubspot_billing.main(since_ms=since_ms)
150
  except Exception as e:
151
- logging.exception("Error running billing pipeline: %s", e)
152
 
153
  try:
154
- print("\n[7/7] Billing")
155
  hubspot_audit.main(since_ms=since_ms)
156
  except Exception as e:
157
- logging.exception("Error running billing pipeline: %s", e)
158
 
159
  print("\n=== HubSpot sync complete ===")
160
 
 
145
  logging.exception("Error running emails pipeline: %s", e)
146
 
147
  try:
148
+ print("\n[6/7] Billing Services")
149
  hubspot_billing.main(since_ms=since_ms)
150
  except Exception as e:
151
+ logging.exception("Error running billing services pipeline: %s", e)
152
 
153
  try:
154
+ print("\n[7/7] Audit Logs")
155
  hubspot_audit.main(since_ms=since_ms)
156
  except Exception as e:
157
+ logging.exception("Error running audit log pipeline: %s", e)
158
 
159
  print("\n=== HubSpot sync complete ===")
160
 
python/supabase_utils.py CHANGED
@@ -22,6 +22,41 @@ except Exception:
22
  _RETURN_MINIMAL = "minimal"
23
 
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def insert_into_supabase_table(
26
  supabase_client: Any,
27
  table_name: str,
@@ -83,9 +118,13 @@ def insert_into_supabase_table(
83
  attempt += 1
84
 
85
  if attempt > max_retries:
 
86
  logging.error(
87
- "Permanent failure upserting into %s after %d attempts: %s",
88
- table_name, attempt, e
 
 
 
89
  )
90
  return False
91
 
@@ -105,9 +144,13 @@ def insert_into_supabase_table(
105
  except (httpx.RequestError, httpx.HTTPStatusError) as e:
106
  attempt += 1
107
  if attempt > max_retries:
 
108
  logging.error(
109
- "Permanent HTTP failure upserting into %s after %d attempts: %s",
110
- table_name, attempt, e
 
 
 
111
  )
112
  return False
113
 
@@ -121,9 +164,13 @@ def insert_into_supabase_table(
121
  except Exception as e:
122
  attempt += 1
123
  if attempt > max_retries:
 
124
  logging.error(
125
- "Permanent unexpected failure upserting into %s after %d attempts: %s",
126
- table_name, attempt, e
 
 
 
127
  )
128
  return False
129
 
@@ -214,9 +261,13 @@ def batched_insert(
214
  # Other errors or batch already at min size -> retry with backoff
215
  attempt += 1
216
  if attempt > max_retries:
 
217
  logging.error(
218
- f"Failed inserting batch rows {i}–{i + current_batch_size} into {table_name} "
219
- f"after {max_retries} retries: {e}"
 
 
 
220
  )
221
  raise
222
 
 
22
  _RETURN_MINIMAL = "minimal"
23
 
24
 
25
def extract_row_ids(rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """
    Project each row down to just its conflict-key columns.

    When ``on_conflict`` is omitted or empty, fall back to probing the first
    row for a common identifier column ("id", "uuid", "hubspot_id", or
    "object_id") and use the first one present.

    Returns an empty list when ``rows`` is empty or no usable key can be
    determined.

    :param rows: List of row dictionaries to project.
    :param on_conflict: Column names that form the conflict target.
    :return: One dictionary per row, containing only the conflict-key columns.
    """
    if not rows:
        return []

    # Explicit conflict keys win; otherwise probe the first row for a
    # conventional identifier column.
    conflict_keys: List[str] = list(on_conflict) if on_conflict else []
    if not conflict_keys:
        first_row = rows[0]
        conflict_keys = next(
            ([candidate] for candidate in ("id", "uuid", "hubspot_id", "object_id") if candidate in first_row),
            [],
        )

    if not conflict_keys:
        return []

    # Keep only the keys each row actually carries.
    return [{key: row.get(key) for key in conflict_keys if key in row} for row in rows]
58
+
59
+
60
  def insert_into_supabase_table(
61
  supabase_client: Any,
62
  table_name: str,
 
118
  attempt += 1
119
 
120
  if attempt > max_retries:
121
+ affected_ids = extract_row_ids(rows, on_conflict)
122
  logging.error(
123
+ "Permanent failure upserting into %s after %d attempts: %s. Affected row ids: %s",
124
+ table_name,
125
+ attempt,
126
+ e,
127
+ affected_ids,
128
  )
129
  return False
130
 
 
144
  except (httpx.RequestError, httpx.HTTPStatusError) as e:
145
  attempt += 1
146
  if attempt > max_retries:
147
+ affected_ids = extract_row_ids(rows, on_conflict)
148
  logging.error(
149
+ "Permanent HTTP failure upserting into %s after %d attempts: %s. Affected row ids: %s",
150
+ table_name,
151
+ attempt,
152
+ e,
153
+ affected_ids,
154
  )
155
  return False
156
 
 
164
  except Exception as e:
165
  attempt += 1
166
  if attempt > max_retries:
167
+ affected_ids = extract_row_ids(rows, on_conflict)
168
  logging.error(
169
+ "Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s",
170
+ table_name,
171
+ attempt,
172
+ e,
173
+ affected_ids,
174
  )
175
  return False
176
 
 
261
  # Other errors or batch already at min size -> retry with backoff
262
  attempt += 1
263
  if attempt > max_retries:
264
+ affected_ids = extract_row_ids(rows, on_conflict)
265
  logging.error(
266
+ "Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s",
267
+ table_name,
268
+ attempt,
269
+ e,
270
+ affected_ids,
271
  )
272
  raise
273