""" This script contains utility functions for working with Supabase. """ import time import datetime import json import logging import httpx from storage3.exceptions import StorageApiError from typing import List, Dict, Any, Optional try: from postgrest import APIError # supabase-py v2 except Exception: # pragma: no cover APIError = Exception try: # supabase-py v2 from postgrest.types import ReturnMethod # Returning.MINIMAL _RETURN_MINIMAL = ReturnMethod.MINIMAL except Exception: # supabase-py v1 fallback _RETURN_MINIMAL = "minimal" def extract_row_ids(rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None) -> List[Dict[str, Any]]: """ Extract a list of dictionaries containing the on_conflict keys from each row. If no on_conflict keys are provided, it will attempt to find a suitable key by looking for the presence of "id", "uuid", "hubspot_id", or "object_id" in the first row. If no conflict keys are found, it will return an empty list. :param rows: List of dictionaries to extract IDs from :param on_conflict: List of column names to use as conflict target :return: List of dictionaries containing the on_conflict keys from each row """ if not rows: return [] keys: List[str] = list(on_conflict or []) # Fallback if no explicit conflict keys were given if not keys: for candidate in ("id", "uuid", "hubspot_id", "object_id"): if candidate in rows[0]: keys = [candidate] break if not keys: return [] id_list: List[Dict[str, Any]] = [] for r in rows: id_list.append({k: r.get(k) for k in keys if k in r}) return id_list def insert_into_supabase_table( supabase_client: Any, table_name: str, rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None, *, max_retries: int = 5, backoff_base_seconds: float = 1.0, use_returning_minimal: bool = True, ) -> None: """ Insert a list of rows into a Supabase table with optional conflict handling. :param supabase_client: Supabase client :param table_name: Name of the table to upsert into :param rows: List of dictionaries to insert; each dictionary becomes a row :param on_conflict: List of column names to use as conflict target :param max_retries: Maximum number of times to retry on failure :param backoff_base_seconds: Base amount of time to sleep between retries :param use_returning_minimal: Whether to use returning=MINIMAL (faster but no inserted IDs) :return: None. If no rows are provided, returns None immediately without attempting an insert. This function will retry up to `max_retries` times on failure with an exponential backoff schedule. If `on_conflict` is provided, it will use the given columns as the conflict target. If `use_returning_minimal`, it will use the `returning=MINIMAL` parameter to reduce response size (but won't get inserted IDs back). NOTE: This function does not support transactions; it is intended for use in simple data pipelines where retrying on failure is sufficient. If you need stronger consistency guarantees, use transactions or another approach. """ if not rows: logging.info("No rows to insert for table %s.", table_name) return None q = supabase_client.table(table_name) conflict_target = ",".join(on_conflict) if on_conflict else None if conflict_target and use_returning_minimal: query = q.upsert(rows, on_conflict=conflict_target, returning=_RETURN_MINIMAL) elif conflict_target: query = q.upsert(rows, on_conflict=conflict_target) elif use_returning_minimal: query = q.upsert(rows, returning=_RETURN_MINIMAL) else: query = q.upsert(rows) attempt = 0 while True: try: query.execute() logging.info("Inserted %s records into %s.", len(rows), table_name) return True except APIError as e: code = getattr(e, "code", None) attempt += 1 if attempt > max_retries: affected_ids = extract_row_ids(rows, on_conflict) logging.error( "Permanent failure upserting into %s after %d attempts: %s. Affected row ids: %s", table_name, attempt, e, affected_ids, ) return False sleep_for = backoff_base_seconds * (2 ** (attempt - 1)) if code == "57014": logging.warning( "Timeout (57014) upserting into %s (attempt %d/%d). Retrying in %.1fs.", table_name, attempt, max_retries, sleep_for ) else: logging.warning( "APIError upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.", table_name, attempt, max_retries, e, sleep_for ) time.sleep(sleep_for) except (httpx.RequestError, httpx.HTTPStatusError) as e: attempt += 1 if attempt > max_retries: affected_ids = extract_row_ids(rows, on_conflict) logging.error( "Permanent HTTP failure upserting into %s after %d attempts: %s. Affected row ids: %s", table_name, attempt, e, affected_ids, ) return False sleep_for = backoff_base_seconds * (2 ** (attempt - 1)) logging.warning( "HTTP error upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.", table_name, attempt, max_retries, e, sleep_for ) time.sleep(sleep_for) except Exception as e: attempt += 1 if attempt > max_retries: affected_ids = extract_row_ids(rows, on_conflict) logging.error( "Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s", table_name, attempt, e, affected_ids, ) return False sleep_for = backoff_base_seconds * (2 ** (attempt - 1)) logging.warning( "Unexpected error upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.", table_name, attempt, max_retries, e, sleep_for ) time.sleep(sleep_for) def batched_insert( supabase_client, table_name: str, rows: List[Dict[str, Any]], batch_size: int = 500, on_conflict: Optional[List[str]] = None, max_retries: int = 4, backoff_base_seconds: float = 1.0, min_batch_size: int = 50, ) -> list: """ Insert a list of rows into a Supabase table with optional conflict handling, batching, and retries. :param supabase_client: Supabase client :param table_name: Name of the table to upsert into :param rows: List of dictionaries to insert; each dictionary becomes a row :param batch_size: Maximum number of rows to send in a single upsert request :param on_conflict: List of column names to use as conflict target :param max_retries: Maximum number of times to retry on failure :param backoff_base_seconds: Base amount of time to sleep between retries :param min_batch_size: Minimum size of a batch; if a timeout occurs, the batch will be shrunk to this size before retrying :return: List of responses from Supabase (empty if no rows to insert) This function will retry up to `max_retries` times on failure with an exponential backoff schedule. If `on_conflict` is provided, it will use the given columns as the conflict target. If a timeout occurs, it will shrink the batch size to `min_batch_size` and retry the first half of the batch. If the batch is already at `min_batch_size` or smaller, it will retry with backoff. If all retries fail, it will raise the last exception. If the `rows` list is empty, it will return an empty list without sending any requests to Supabase. """ if not rows: return [] results = [] n = len(rows) i = 0 while i < n: current_batch_size = min(batch_size, n - i) batch = rows[i: i + current_batch_size] # Attempt to insert this batch with retries attempt = 0 while True: try: q = supabase_client.table(table_name) if on_conflict: # `on_conflict` in supabase-py v2 expects a comma-separated string conflict_target = ",".join(on_conflict) resp = q.upsert(batch, on_conflict=conflict_target, returning=_RETURN_MINIMAL).execute() else: resp = q.upsert(batch, returning=_RETURN_MINIMAL).execute() results.append(resp) logging.info( f"Inserted batch rows {i}–{i + len(batch)} into {table_name}") i += len(batch) # advance window break # batch succeeded, move to next except APIError as e: if getattr(e, "code", None) == "57014" and current_batch_size > min_batch_size: # Halve the batch and retry the first half now; # the second half will be handled subsequently half = max(min_batch_size, current_batch_size // 2) logging.warning( f"Timeout on {table_name} rows {i}–{i + current_batch_size}. " f"Shrinking batch to {half} and retrying." ) batch_size = half time.sleep(backoff_base_seconds) break # Other errors or batch already at min size -> retry with backoff attempt += 1 if attempt > max_retries: affected_ids = extract_row_ids(rows, on_conflict) logging.error( "Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s", table_name, attempt, e, affected_ids, ) raise sleep_for = backoff_base_seconds * (2 ** (attempt - 1)) logging.warning( f"Error inserting batch rows {i}–{i + current_batch_size} into {table_name} " f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s." ) time.sleep(sleep_for) continue except Exception as e: # Non-API errors: retry with backoff attempt += 1 if attempt > max_retries: logging.error( f"Failed inserting batch rows {i}–{i + current_batch_size} into {table_name} " f"after {max_retries} retries: {e}" ) raise sleep_for = backoff_base_seconds * (2 ** (attempt - 1)) logging.warning( f"Unexpected error inserting batch rows {i}–{i + current_batch_size} into {table_name} " f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s." ) time.sleep(sleep_for) continue return results def upload_raw_json_to_supabase(supabase_client, json_data, object_type="contacts"): """ Uploads raw JSON data to Supabase storage under a specified object type directory. :param supabase_client: The Supabase client object. :param json_data: The JSON data to be uploaded. :param object_type: The type of object, used to determine the directory path for storage. :return: The path where the JSON file is stored in Supabase. """ now_str = datetime.datetime.now( datetime.timezone.utc).strftime("%Y%m%d_%H%M%S") path = f"{object_type}/{now_str}.json" file_bytes = json.dumps(json_data, indent=2, default=str).encode("utf-8") supabase_client.storage.from_("hubspot-raw-data").remove([path]) try: supabase_client.storage.from_("hubspot-raw-data").upload( path, file=file_bytes, file_options={ "content-type": "application/json"} ) except StorageApiError as e: if e.status_code == 413: logging.warning( "Upload failed: payload too large for Supabase Storage.") else: logging.error("Storage API error during upload: %s", e) except httpx.RequestError as e: logging.error("Upload error: %s", e) return path def get_last_sync_time(supabase_client, object_type): """ Retrieves the last sync time for the given object type from the hubspot_sync_metadata table. :param supabase_client: The Supabase client object. :param object_type: The type of object to retrieve the last sync time for. :return: The last sync time for the given object type as a datetime object. If no previous sync was found, returns None. """ res = supabase_client.table("hubspot_sync_metadata").select( "last_sync_time").eq("object_type", object_type).execute() data = res.data if data and data[0]["last_sync_time"]: last_sync = datetime.datetime.fromisoformat(data[0]["last_sync_time"]) return last_sync return None def update_sync_metadata(supabase_client, object_type, sync_time): """ Updates the last sync time for the given object type in the hubspot_sync_metadata table. :param supabase_client: The Supabase client object. :param object_type: The type of object to update the last sync time for. :param sync_time: The last sync time to update. :return: The result of the Supabase upsert operation. """ payload = [{ "object_type": object_type, "last_sync_time": sync_time, "updated_at": datetime.datetime.now(datetime.timezone.utc).isoformat() }] return insert_into_supabase_table(supabase_client, "hubspot_sync_metadata", payload) def get_existing_email_content_ids(supabase_client): """ Fetches existing email_ids from the hubspot_email_contents table. :param supabase_client: Supabase client :return: A set of email_id values already in the database """ existing_ids = set() page_size = 1000 offset = 0 while True: res = supabase_client.table("hubspot_email_contents")\ .select("email_id")\ .range(offset, offset + page_size - 1)\ .execute() if not res.data: break batch = {row["email_id"] for row in res.data} existing_ids.update(batch) offset += page_size return existing_ids def fetch_supabase_table( supabase_client, table_name="hubspot_contacts", id_column="contact_id" ): """ Fetches all rows from a Supabase table and returns them as a dict. :param supabase_client: Supabase client instance :param table_name: Name of the table to fetch from :param id_column: Unique ID column to use as the key for the returned dict :return: A dict of rows with the ID column as the key and the full row as the value """ all_rows = {} page = 0 page_size = 1000 while True: res = supabase_client.table(table_name).select( "*").range(page * page_size, (page + 1) * page_size - 1).execute() if not res.data: break for row in res.data: all_rows[str(row[id_column])] = row if len(res.data) < page_size: break page += 1 return all_rows def enrich_supabase_row(base_row: dict) -> dict: """ Enriches a Supabase row with additional columns needed for duplicate detection. :param base_row: The base row to enrich. :return: The enriched row. """ base_row.update({ "duplicate_id": None, "duplicate_status": None, "duplicate_action": None, "is_primary": True, "updated_at": datetime.datetime.now(datetime.timezone.utc).isoformat() }) return base_row