"""
This script contains utility functions for working with Supabase.
"""
import time
import datetime
import json
import logging
import httpx
from storage3.exceptions import StorageApiError
from typing import List, Dict, Any, Optional
try:
from postgrest import APIError # supabase-py v2
except Exception: # pragma: no cover
APIError = Exception
try:
# supabase-py v2
from postgrest.types import ReturnMethod # Returning.MINIMAL
_RETURN_MINIMAL = ReturnMethod.MINIMAL
except Exception:
# supabase-py v1 fallback
_RETURN_MINIMAL = "minimal"
def extract_row_ids(rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """
    Build a per-row dict of identifying key/value pairs.

    Uses the explicit `on_conflict` columns when given; otherwise falls back
    to the first of "id", "uuid", "hubspot_id" or "object_id" present in the
    first row. Returns an empty list when no usable key can be determined.

    :param rows: Rows to pull identifiers from
    :param on_conflict: Column names used as the conflict target, if any
    :return: One dict per row containing only the identifying keys present
    """
    if not rows:
        return []

    key_columns: List[str] = list(on_conflict or [])
    if not key_columns:
        # No explicit conflict target: guess a single key column by probing
        # the first row for common identifier names.
        first_row = rows[0]
        key_columns = next(
            ([c] for c in ("id", "uuid", "hubspot_id", "object_id") if c in first_row),
            [],
        )
    if not key_columns:
        return []

    return [{k: row.get(k) for k in key_columns if k in row} for row in rows]
def insert_into_supabase_table(
    supabase_client: Any,
    table_name: str,
    rows: List[Dict[str, Any]],
    on_conflict: Optional[List[str]] = None,
    *,
    max_retries: int = 5,
    backoff_base_seconds: float = 1.0,
    use_returning_minimal: bool = True,
) -> Optional[bool]:
    """
    Insert (upsert) a list of rows into a Supabase table with retries.

    :param supabase_client: Supabase client
    :param table_name: Name of the table to upsert into
    :param rows: List of dictionaries to insert; each dictionary becomes a row
    :param on_conflict: List of column names to use as conflict target
    :param max_retries: Maximum number of times to retry on failure
    :param backoff_base_seconds: Base amount of time to sleep between retries
    :param use_returning_minimal: Whether to use returning=MINIMAL (faster but no inserted IDs)
    :return: True on success, False on permanent failure, or None when `rows`
        is empty (no request is sent in that case).

    This function will retry up to `max_retries` times on failure with an
    exponential backoff schedule. If `on_conflict` is provided, it will use the
    given columns as the conflict target. If `use_returning_minimal`, it will
    use the `returning=MINIMAL` parameter to reduce response size (but won't
    get inserted IDs back).
    NOTE: This function does not support transactions; it is intended for use
    in simple data pipelines where retrying on failure is sufficient. If you
    need stronger consistency guarantees, use transactions or another approach.
    """
    if not rows:
        logging.info("No rows to insert for table %s.", table_name)
        return None

    # Build the upsert query once; it is re-executed on every retry attempt.
    upsert_kwargs: Dict[str, Any] = {}
    if on_conflict:
        upsert_kwargs["on_conflict"] = ",".join(on_conflict)
    if use_returning_minimal:
        upsert_kwargs["returning"] = _RETURN_MINIMAL
    query = supabase_client.table(table_name).upsert(rows, **upsert_kwargs)

    attempt = 0
    while True:
        try:
            query.execute()
            logging.info("Inserted %s records into %s.", len(rows), table_name)
            return True
        except Exception as e:
            # Classify the failure only to keep the original log wording;
            # the retry/backoff handling is identical for every error type.
            if isinstance(e, APIError):
                is_timeout = getattr(e, "code", None) == "57014"
                retry_label = "Timeout (57014)" if is_timeout else "APIError"
                permanent_label = "failure"
            elif isinstance(e, (httpx.RequestError, httpx.HTTPStatusError)):
                is_timeout = False
                retry_label = "HTTP error"
                permanent_label = "HTTP failure"
            else:
                is_timeout = False
                retry_label = "Unexpected error"
                permanent_label = "unexpected failure"

            attempt += 1
            if attempt > max_retries:
                logging.error(
                    "Permanent %s upserting into %s after %d attempts: %s. Affected row ids: %s",
                    permanent_label,
                    table_name,
                    attempt,
                    e,
                    extract_row_ids(rows, on_conflict),
                )
                return False

            sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
            if is_timeout:
                # The timeout message deliberately omits the exception text.
                logging.warning(
                    "Timeout (57014) upserting into %s (attempt %d/%d). Retrying in %.1fs.",
                    table_name, attempt, max_retries, sleep_for
                )
            else:
                logging.warning(
                    "%s upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
                    retry_label, table_name, attempt, max_retries, e, sleep_for
                )
            time.sleep(sleep_for)
def batched_insert(
    supabase_client,
    table_name: str,
    rows: List[Dict[str, Any]],
    batch_size: int = 500,
    on_conflict: Optional[List[str]] = None,
    max_retries: int = 4,
    backoff_base_seconds: float = 1.0,
    min_batch_size: int = 50,
) -> List[Any]:
    """
    Insert rows into a Supabase table in batches, with retries and adaptive
    batch shrinking on statement timeouts.

    :param supabase_client: Supabase client
    :param table_name: Name of the table to upsert into
    :param rows: List of dictionaries to insert; each dictionary becomes a row
    :param batch_size: Maximum number of rows to send in a single upsert request
    :param on_conflict: List of column names to use as conflict target
    :param max_retries: Maximum number of times to retry on failure
    :param backoff_base_seconds: Base amount of time to sleep between retries
    :param min_batch_size: Floor for the adaptive batch size; timeouts shrink
        the batch no further than this
    :return: List of responses from Supabase (empty if no rows to insert)

    On a Postgres statement timeout (error code 57014) the batch size is halved
    (down to `min_batch_size`) and the same window of rows is retried with the
    smaller size; subsequent batches also use the reduced size. Other errors —
    or timeouts when already at the minimum size — are retried up to
    `max_retries` times with exponential backoff, after which the last
    exception is re-raised. An empty `rows` list returns [] without sending
    any requests.
    """
    if not rows:
        return []

    results: List[Any] = []
    n = len(rows)
    i = 0
    while i < n:
        current_batch_size = min(batch_size, n - i)
        batch = rows[i: i + current_batch_size]
        # Attempt to insert this batch with retries
        attempt = 0
        while True:
            try:
                q = supabase_client.table(table_name)
                if on_conflict:
                    # `on_conflict` in supabase-py v2 expects a comma-separated string
                    conflict_target = ",".join(on_conflict)
                    resp = q.upsert(batch, on_conflict=conflict_target,
                                    returning=_RETURN_MINIMAL).execute()
                else:
                    resp = q.upsert(batch, returning=_RETURN_MINIMAL).execute()
                results.append(resp)
                logging.info(
                    f"Inserted batch rows {i}–{i + len(batch)} into {table_name}")
                i += len(batch)  # advance window
                break  # batch succeeded, move to next
            except APIError as e:
                if getattr(e, "code", None) == "57014" and current_batch_size > min_batch_size:
                    # Statement timeout: shrink the batch size and retry the
                    # same window with the smaller size (no advance of `i`).
                    half = max(min_batch_size, current_batch_size // 2)
                    logging.warning(
                        f"Timeout on {table_name} rows {i}–{i + current_batch_size}. "
                        f"Shrinking batch to {half} and retrying."
                    )
                    batch_size = half
                    time.sleep(backoff_base_seconds)
                    break
                # Other errors or batch already at min size -> retry with backoff
                attempt += 1
                if attempt > max_retries:
                    # Fix: report only the ids of the failed batch, not the
                    # ids of the entire input list.
                    affected_ids = extract_row_ids(batch, on_conflict)
                    logging.error(
                        "Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s",
                        table_name,
                        attempt,
                        e,
                        affected_ids,
                    )
                    raise
                sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
                logging.warning(
                    f"Error inserting batch rows {i}–{i + current_batch_size} into {table_name} "
                    f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
                )
                time.sleep(sleep_for)
                continue
            except Exception as e:
                # Non-API errors: retry with backoff
                attempt += 1
                if attempt > max_retries:
                    logging.error(
                        f"Failed inserting batch rows {i}–{i + current_batch_size} into {table_name} "
                        f"after {max_retries} retries: {e}"
                    )
                    raise
                sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
                logging.warning(
                    f"Unexpected error inserting batch rows {i}–{i + current_batch_size} into {table_name} "
                    f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
                )
                time.sleep(sleep_for)
                continue
    return results
def upload_raw_json_to_supabase(supabase_client, json_data, object_type="contacts"):
    """
    Upload raw JSON data to Supabase storage under a timestamped path.

    The object is stored in the "hubspot-raw-data" bucket at
    `<object_type>/<UTC timestamp>.json`. Upload failures are logged but
    swallowed; the path is returned either way.

    :param supabase_client: The Supabase client object.
    :param json_data: The JSON data to be uploaded.
    :param object_type: Directory prefix under which the file is stored.
    :return: The path where the JSON file is stored in Supabase.
    """
    timestamp = datetime.datetime.now(
        datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
    storage_path = f"{object_type}/{timestamp}.json"
    # default=str stringifies anything json can't serialize natively.
    payload = json.dumps(json_data, indent=2, default=str).encode("utf-8")

    # Clear any existing object at this path so the upload cannot conflict.
    supabase_client.storage.from_("hubspot-raw-data").remove([storage_path])
    try:
        supabase_client.storage.from_("hubspot-raw-data").upload(
            storage_path,
            file=payload,
            file_options={"content-type": "application/json"},
        )
    except StorageApiError as e:
        if e.status_code == 413:
            logging.warning(
                "Upload failed: payload too large for Supabase Storage.")
        else:
            logging.error("Storage API error during upload: %s", e)
    except httpx.RequestError as e:
        logging.error("Upload error: %s", e)
    return storage_path
def get_last_sync_time(supabase_client, object_type):
    """
    Look up the most recent sync timestamp for an object type.

    Reads the `last_sync_time` column of the hubspot_sync_metadata table for
    the given object type and parses it from ISO-8601.

    :param supabase_client: The Supabase client object.
    :param object_type: The object type whose last sync time is requested.
    :return: The last sync time as a datetime object, or None if no previous
        sync was recorded.
    """
    response = (
        supabase_client.table("hubspot_sync_metadata")
        .select("last_sync_time")
        .eq("object_type", object_type)
        .execute()
    )
    records = response.data
    if not records:
        return None
    raw_value = records[0]["last_sync_time"]
    if not raw_value:
        return None
    return datetime.datetime.fromisoformat(raw_value)
def update_sync_metadata(supabase_client, object_type, sync_time):
    """
    Record the last sync time for an object type in hubspot_sync_metadata.

    :param supabase_client: The Supabase client object.
    :param object_type: The object type whose sync time is being recorded.
    :param sync_time: The last sync time to store.
    :return: The status returned by insert_into_supabase_table (True on
        success, False on permanent failure).
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc).isoformat()
    row = {
        "object_type": object_type,
        "last_sync_time": sync_time,
        "updated_at": now_utc,
    }
    return insert_into_supabase_table(supabase_client, "hubspot_sync_metadata", [row])
def get_existing_email_content_ids(supabase_client):
    """
    Fetch the set of email_id values already present in hubspot_email_contents.

    Pages through the table 1000 rows at a time and stops as soon as a short
    (or empty) page is returned — avoiding the redundant trailing empty fetch,
    consistent with fetch_supabase_table.

    :param supabase_client: Supabase client
    :return: A set of email_id values already in the database
    """
    existing_ids = set()
    page_size = 1000
    offset = 0
    while True:
        res = (
            supabase_client.table("hubspot_email_contents")
            .select("email_id")
            .range(offset, offset + page_size - 1)
            .execute()
        )
        if not res.data:
            break
        existing_ids.update(row["email_id"] for row in res.data)
        if len(res.data) < page_size:
            # A short page means we reached the end of the table.
            break
        offset += page_size
    return existing_ids
def fetch_supabase_table(
    supabase_client, table_name="hubspot_contacts",
    id_column="contact_id"
):
    """
    Load an entire Supabase table into a dict keyed by its ID column.

    Pages through the table 1000 rows at a time, stopping at the first short
    or empty page.

    :param supabase_client: Supabase client instance
    :param table_name: Name of the table to fetch from
    :param id_column: Unique ID column whose stringified value keys the result
    :return: Mapping of str(row[id_column]) -> full row dict
    """
    rows_by_id = {}
    page_size = 1000
    page_index = 0
    while True:
        start = page_index * page_size
        response = (
            supabase_client.table(table_name)
            .select("*")
            .range(start, start + page_size - 1)
            .execute()
        )
        page_rows = response.data
        if not page_rows:
            break
        rows_by_id.update((str(row[id_column]), row) for row in page_rows)
        if len(page_rows) < page_size:
            # A short page means this was the last one.
            break
        page_index += 1
    return rows_by_id
def enrich_supabase_row(base_row: dict) -> dict:
    """
    Add duplicate-detection bookkeeping columns to a row, in place.

    Resets duplicate_id/duplicate_status/duplicate_action to None, marks the
    row as primary, and stamps updated_at with the current UTC time.

    :param base_row: The row to enrich; it is mutated and also returned.
    :return: The same dict with the extra columns set.
    """
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    base_row["duplicate_id"] = None
    base_row["duplicate_status"] = None
    base_row["duplicate_action"] = None
    base_row["is_primary"] = True
    base_row["updated_at"] = now_iso
    return base_row