n8n-duplicate / python /supabase_utils.py
niwayandm
Enhance Supabase utility functions to include affected row IDs. Edited texts in load data.
03810ee
"""
This script contains utility functions for working with Supabase.
"""
import time
import datetime
import json
import logging
import httpx
from storage3.exceptions import StorageApiError
from typing import List, Dict, Any, Optional
try:
from postgrest import APIError # supabase-py v2
except Exception: # pragma: no cover
APIError = Exception
try:
# supabase-py v2
from postgrest.types import ReturnMethod # Returning.MINIMAL
_RETURN_MINIMAL = ReturnMethod.MINIMAL
except Exception:
# supabase-py v1 fallback
_RETURN_MINIMAL = "minimal"
def extract_row_ids(rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""
Extract a list of dictionaries containing the on_conflict keys from each row.
If no on_conflict keys are provided, it will attempt to find a suitable key
by looking for the presence of "id", "uuid", "hubspot_id", or "object_id" in the first row.
If no conflict keys are found, it will return an empty list.
:param rows: List of dictionaries to extract IDs from
:param on_conflict: List of column names to use as conflict target
:return: List of dictionaries containing the on_conflict keys from each row
"""
if not rows:
return []
keys: List[str] = list(on_conflict or [])
# Fallback if no explicit conflict keys were given
if not keys:
for candidate in ("id", "uuid", "hubspot_id", "object_id"):
if candidate in rows[0]:
keys = [candidate]
break
if not keys:
return []
id_list: List[Dict[str, Any]] = []
for r in rows:
id_list.append({k: r.get(k) for k in keys if k in r})
return id_list
def insert_into_supabase_table(
supabase_client: Any,
table_name: str,
rows: List[Dict[str, Any]],
on_conflict: Optional[List[str]] = None,
*,
max_retries: int = 5,
backoff_base_seconds: float = 1.0,
use_returning_minimal: bool = True,
) -> None:
"""
Insert a list of rows into a Supabase table with optional conflict handling.
:param supabase_client: Supabase client
:param table_name: Name of the table to upsert into
:param rows: List of dictionaries to insert; each dictionary becomes a row
:param on_conflict: List of column names to use as conflict target
:param max_retries: Maximum number of times to retry on failure
:param backoff_base_seconds: Base amount of time to sleep between retries
:param use_returning_minimal: Whether to use returning=MINIMAL (faster but no inserted IDs)
:return: None. If no rows are provided, returns None immediately without attempting an insert.
This function will retry up to `max_retries` times on failure with an exponential
backoff schedule. If `on_conflict` is provided, it will use the given columns as the
conflict target. If `use_returning_minimal`, it will use the `returning=MINIMAL` parameter
to reduce response size (but won't get inserted IDs back).
NOTE: This function does not support transactions; it is intended for use in simple
data pipelines where retrying on failure is sufficient. If you need stronger
consistency guarantees, use transactions or another approach.
"""
if not rows:
logging.info("No rows to insert for table %s.", table_name)
return None
q = supabase_client.table(table_name)
conflict_target = ",".join(on_conflict) if on_conflict else None
if conflict_target and use_returning_minimal:
query = q.upsert(rows, on_conflict=conflict_target,
returning=_RETURN_MINIMAL)
elif conflict_target:
query = q.upsert(rows, on_conflict=conflict_target)
elif use_returning_minimal:
query = q.upsert(rows, returning=_RETURN_MINIMAL)
else:
query = q.upsert(rows)
attempt = 0
while True:
try:
query.execute()
logging.info("Inserted %s records into %s.", len(rows), table_name)
return True
except APIError as e:
code = getattr(e, "code", None)
attempt += 1
if attempt > max_retries:
affected_ids = extract_row_ids(rows, on_conflict)
logging.error(
"Permanent failure upserting into %s after %d attempts: %s. Affected row ids: %s",
table_name,
attempt,
e,
affected_ids,
)
return False
sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
if code == "57014":
logging.warning(
"Timeout (57014) upserting into %s (attempt %d/%d). Retrying in %.1fs.",
table_name, attempt, max_retries, sleep_for
)
else:
logging.warning(
"APIError upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
table_name, attempt, max_retries, e, sleep_for
)
time.sleep(sleep_for)
except (httpx.RequestError, httpx.HTTPStatusError) as e:
attempt += 1
if attempt > max_retries:
affected_ids = extract_row_ids(rows, on_conflict)
logging.error(
"Permanent HTTP failure upserting into %s after %d attempts: %s. Affected row ids: %s",
table_name,
attempt,
e,
affected_ids,
)
return False
sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
logging.warning(
"HTTP error upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
table_name, attempt, max_retries, e, sleep_for
)
time.sleep(sleep_for)
except Exception as e:
attempt += 1
if attempt > max_retries:
affected_ids = extract_row_ids(rows, on_conflict)
logging.error(
"Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s",
table_name,
attempt,
e,
affected_ids,
)
return False
sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
logging.warning(
"Unexpected error upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
table_name, attempt, max_retries, e, sleep_for
)
time.sleep(sleep_for)
def batched_insert(
supabase_client,
table_name: str,
rows: List[Dict[str, Any]],
batch_size: int = 500,
on_conflict: Optional[List[str]] = None,
max_retries: int = 4,
backoff_base_seconds: float = 1.0,
min_batch_size: int = 50,
) -> list:
"""
Insert a list of rows into a Supabase table with optional conflict handling, batching, and retries.
:param supabase_client: Supabase client
:param table_name: Name of the table to upsert into
:param rows: List of dictionaries to insert; each dictionary becomes a row
:param batch_size: Maximum number of rows to send in a single upsert request
:param on_conflict: List of column names to use as conflict target
:param max_retries: Maximum number of times to retry on failure
:param backoff_base_seconds: Base amount of time to sleep between retries
:param min_batch_size: Minimum size of a batch; if a timeout occurs, the
batch will be shrunk to this size before retrying
:return: List of responses from Supabase (empty if no rows to insert)
This function will retry up to `max_retries` times on failure with an exponential
backoff schedule. If `on_conflict` is provided, it will use the given columns as the
conflict target. If a timeout occurs, it will shrink the batch size to `min_batch_size`
and retry the first half of the batch. If the batch is already at `min_batch_size` or
smaller, it will retry with backoff. If all retries fail, it will raise the last
exception. If the `rows` list is empty, it will return an empty list without sending
any requests to Supabase.
"""
if not rows:
return []
results = []
n = len(rows)
i = 0
while i < n:
current_batch_size = min(batch_size, n - i)
batch = rows[i: i + current_batch_size]
# Attempt to insert this batch with retries
attempt = 0
while True:
try:
q = supabase_client.table(table_name)
if on_conflict:
# `on_conflict` in supabase-py v2 expects a comma-separated string
conflict_target = ",".join(on_conflict)
resp = q.upsert(batch, on_conflict=conflict_target,
returning=_RETURN_MINIMAL).execute()
else:
resp = q.upsert(batch, returning=_RETURN_MINIMAL).execute()
results.append(resp)
logging.info(
f"Inserted batch rows {i}{i + len(batch)} into {table_name}")
i += len(batch) # advance window
break # batch succeeded, move to next
except APIError as e:
if getattr(e, "code", None) == "57014" and current_batch_size > min_batch_size:
# Halve the batch and retry the first half now;
# the second half will be handled subsequently
half = max(min_batch_size, current_batch_size // 2)
logging.warning(
f"Timeout on {table_name} rows {i}{i + current_batch_size}. "
f"Shrinking batch to {half} and retrying."
)
batch_size = half
time.sleep(backoff_base_seconds)
break
# Other errors or batch already at min size -> retry with backoff
attempt += 1
if attempt > max_retries:
affected_ids = extract_row_ids(rows, on_conflict)
logging.error(
"Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s",
table_name,
attempt,
e,
affected_ids,
)
raise
sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
logging.warning(
f"Error inserting batch rows {i}{i + current_batch_size} into {table_name} "
f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
)
time.sleep(sleep_for)
continue
except Exception as e:
# Non-API errors: retry with backoff
attempt += 1
if attempt > max_retries:
logging.error(
f"Failed inserting batch rows {i}{i + current_batch_size} into {table_name} "
f"after {max_retries} retries: {e}"
)
raise
sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
logging.warning(
f"Unexpected error inserting batch rows {i}{i + current_batch_size} into {table_name} "
f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
)
time.sleep(sleep_for)
continue
return results
def upload_raw_json_to_supabase(supabase_client, json_data, object_type="contacts"):
"""
Uploads raw JSON data to Supabase storage under a specified object type directory.
:param supabase_client: The Supabase client object.
:param json_data: The JSON data to be uploaded.
:param object_type: The type of object, used to determine the directory path for storage.
:return: The path where the JSON file is stored in Supabase.
"""
now_str = datetime.datetime.now(
datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
path = f"{object_type}/{now_str}.json"
file_bytes = json.dumps(json_data, indent=2, default=str).encode("utf-8")
supabase_client.storage.from_("hubspot-raw-data").remove([path])
try:
supabase_client.storage.from_("hubspot-raw-data").upload(
path, file=file_bytes, file_options={
"content-type": "application/json"}
)
except StorageApiError as e:
if e.status_code == 413:
logging.warning(
"Upload failed: payload too large for Supabase Storage.")
else:
logging.error("Storage API error during upload: %s", e)
except httpx.RequestError as e:
logging.error("Upload error: %s", e)
return path
def get_last_sync_time(supabase_client, object_type):
"""
Retrieves the last sync time for the given object type from the hubspot_sync_metadata table.
:param supabase_client: The Supabase client object.
:param object_type: The type of object to retrieve the last sync time for.
:return: The last sync time for the given object type as a datetime object.
If no previous sync was found, returns None.
"""
res = supabase_client.table("hubspot_sync_metadata").select(
"last_sync_time").eq("object_type", object_type).execute()
data = res.data
if data and data[0]["last_sync_time"]:
last_sync = datetime.datetime.fromisoformat(data[0]["last_sync_time"])
return last_sync
return None
def update_sync_metadata(supabase_client, object_type, sync_time):
"""
Updates the last sync time for the given object type in the hubspot_sync_metadata table.
:param supabase_client: The Supabase client object.
:param object_type: The type of object to update the last sync time for.
:param sync_time: The last sync time to update.
:return: The result of the Supabase upsert operation.
"""
payload = [{
"object_type": object_type,
"last_sync_time": sync_time,
"updated_at": datetime.datetime.now(datetime.timezone.utc).isoformat()
}]
return insert_into_supabase_table(supabase_client, "hubspot_sync_metadata", payload)
def get_existing_email_content_ids(supabase_client):
"""
Fetches existing email_ids from the hubspot_email_contents table.
:param supabase_client: Supabase client
:return: A set of email_id values already in the database
"""
existing_ids = set()
page_size = 1000
offset = 0
while True:
res = supabase_client.table("hubspot_email_contents")\
.select("email_id")\
.range(offset, offset + page_size - 1)\
.execute()
if not res.data:
break
batch = {row["email_id"] for row in res.data}
existing_ids.update(batch)
offset += page_size
return existing_ids
def fetch_supabase_table(
supabase_client, table_name="hubspot_contacts",
id_column="contact_id"
):
"""
Fetches all rows from a Supabase table and returns them as a dict.
:param supabase_client: Supabase client instance
:param table_name: Name of the table to fetch from
:param id_column: Unique ID column to use as the key for the returned dict
:return: A dict of rows with the ID column as the key and the full row as the value
"""
all_rows = {}
page = 0
page_size = 1000
while True:
res = supabase_client.table(table_name).select(
"*").range(page * page_size, (page + 1) * page_size - 1).execute()
if not res.data:
break
for row in res.data:
all_rows[str(row[id_column])] = row
if len(res.data) < page_size:
break
page += 1
return all_rows
def enrich_supabase_row(base_row: dict) -> dict:
"""
Enriches a Supabase row with additional columns needed for duplicate detection.
:param base_row: The base row to enrich.
:return: The enriched row.
"""
base_row.update({
"duplicate_id": None,
"duplicate_status": None,
"duplicate_action": None,
"is_primary": True,
"updated_at": datetime.datetime.now(datetime.timezone.utc).isoformat()
})
return base_row