# Last change (niwayandm, 03810ee): Enhance Supabase utility functions to
# include affected row IDs. Edited text in the data-loading helpers.
"""
| This script contains utility functions for working with Supabase. | |
| """ | |
| import time | |
| import datetime | |
| import json | |
| import logging | |
| import httpx | |
| from storage3.exceptions import StorageApiError | |
| from typing import List, Dict, Any, Optional | |
# Compatibility shims: tolerate both supabase-py v1 and v2 dependency trees.
try:
    from postgrest import APIError  # supabase-py v2
except Exception:  # pragma: no cover
    # postgrest is not importable (older layout): degrade so that code
    # written as `except APIError` still catches the raised exception.
    APIError = Exception
try:
    # supabase-py v2
    from postgrest.types import ReturnMethod  # Returning.MINIMAL
    _RETURN_MINIMAL = ReturnMethod.MINIMAL
except Exception:
    # supabase-py v1 fallback: the header value is passed as a raw string.
    _RETURN_MINIMAL = "minimal"
def extract_row_ids(rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """
    Project each row down to its conflict-key columns.

    When `on_conflict` is empty or None, the key is guessed by checking the
    first row for "id", "uuid", "hubspot_id", or "object_id" (first match
    wins). Returns an empty list when there are no rows or no usable keys.

    :param rows: List of dictionaries to extract IDs from
    :param on_conflict: List of column names to use as conflict target
    :return: One dict per row containing only the chosen key columns
    """
    if not rows:
        return []
    key_columns = list(on_conflict or [])
    if not key_columns:
        # No explicit conflict target: fall back to well-known ID columns,
        # probing only the first row.
        first_row = rows[0]
        key_columns = next(
            ([name] for name in ("id", "uuid", "hubspot_id", "object_id")
             if name in first_row),
            [],
        )
    if not key_columns:
        return []
    return [
        {name: row.get(name) for name in key_columns if name in row}
        for row in rows
    ]
def _log_permanent_failure(
    message_template: str,
    table_name: str,
    attempt: int,
    error: BaseException,
    rows: List[Dict[str, Any]],
    on_conflict: Optional[List[str]],
) -> bool:
    """Log a final give-up error, including the affected row ids, and return False."""
    logging.error(
        message_template,
        table_name,
        attempt,
        error,
        extract_row_ids(rows, on_conflict),
    )
    return False


def insert_into_supabase_table(
    supabase_client: Any,
    table_name: str,
    rows: List[Dict[str, Any]],
    on_conflict: Optional[List[str]] = None,
    *,
    max_retries: int = 5,
    backoff_base_seconds: float = 1.0,
    use_returning_minimal: bool = True,
) -> Optional[bool]:
    """
    Upsert a list of rows into a Supabase table with retries and optional
    conflict handling.

    :param supabase_client: Supabase client
    :param table_name: Name of the table to upsert into
    :param rows: List of dictionaries to insert; each dictionary becomes a row
    :param on_conflict: List of column names to use as conflict target
    :param max_retries: Maximum number of times to retry on failure
    :param backoff_base_seconds: Base amount of time to sleep between retries
    :param use_returning_minimal: Whether to use returning=MINIMAL (faster but no inserted IDs)
    :return: True on success, False after all retries are exhausted, or None
        immediately (without attempting an insert) when `rows` is empty.

    Retries up to `max_retries` times on failure with an exponential backoff
    schedule. If `on_conflict` is provided, the given columns are used as the
    conflict target. If `use_returning_minimal`, the `returning=MINIMAL`
    option is used to reduce response size (but no inserted IDs come back).

    NOTE: This function does not support transactions; it is intended for use
    in simple data pipelines where retrying on failure is sufficient. If you
    need stronger consistency guarantees, use transactions or another approach.
    """
    if not rows:
        logging.info("No rows to insert for table %s.", table_name)
        return None

    # Build the upsert request once; only execute() is retried below.
    upsert_kwargs: Dict[str, Any] = {}
    if on_conflict:
        upsert_kwargs["on_conflict"] = ",".join(on_conflict)
    if use_returning_minimal:
        upsert_kwargs["returning"] = _RETURN_MINIMAL
    query = supabase_client.table(table_name).upsert(rows, **upsert_kwargs)

    attempt = 0
    while True:
        try:
            query.execute()
            logging.info("Inserted %s records into %s.", len(rows), table_name)
            return True
        except APIError as e:
            attempt += 1
            if attempt > max_retries:
                return _log_permanent_failure(
                    "Permanent failure upserting into %s after %d attempts: "
                    "%s. Affected row ids: %s",
                    table_name, attempt, e, rows, on_conflict,
                )
            sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
            # Postgres error 57014 = statement timeout; logged distinctly.
            if getattr(e, "code", None) == "57014":
                logging.warning(
                    "Timeout (57014) upserting into %s (attempt %d/%d). Retrying in %.1fs.",
                    table_name, attempt, max_retries, sleep_for,
                )
            else:
                logging.warning(
                    "APIError upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
                    table_name, attempt, max_retries, e, sleep_for,
                )
            time.sleep(sleep_for)
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            attempt += 1
            if attempt > max_retries:
                return _log_permanent_failure(
                    "Permanent HTTP failure upserting into %s after %d attempts: "
                    "%s. Affected row ids: %s",
                    table_name, attempt, e, rows, on_conflict,
                )
            sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
            logging.warning(
                "HTTP error upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
                table_name, attempt, max_retries, e, sleep_for,
            )
            time.sleep(sleep_for)
        except Exception as e:
            attempt += 1
            if attempt > max_retries:
                return _log_permanent_failure(
                    "Permanent unexpected failure upserting into %s after %d attempts: "
                    "%s. Affected row ids: %s",
                    table_name, attempt, e, rows, on_conflict,
                )
            sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
            logging.warning(
                "Unexpected error upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
                table_name, attempt, max_retries, e, sleep_for,
            )
            time.sleep(sleep_for)
def batched_insert(
    supabase_client: Any,
    table_name: str,
    rows: List[Dict[str, Any]],
    batch_size: int = 500,
    on_conflict: Optional[List[str]] = None,
    max_retries: int = 4,
    backoff_base_seconds: float = 1.0,
    min_batch_size: int = 50,
) -> List[Any]:
    """
    Upsert `rows` into a Supabase table in batches, with retries and adaptive
    batch shrinking on statement timeouts.

    :param supabase_client: Supabase client
    :param table_name: Name of the table to upsert into
    :param rows: List of dictionaries to insert; each dictionary becomes a row
    :param batch_size: Maximum number of rows to send in a single upsert request
    :param on_conflict: List of column names to use as conflict target
    :param max_retries: Maximum number of times to retry on failure
    :param backoff_base_seconds: Base amount of time to sleep between retries
    :param min_batch_size: Floor for the adaptive batch size; a timeout halves
        the batch size but never below this value
    :return: List of responses from Supabase (empty if no rows to insert)
    :raises Exception: re-raises the last error once `max_retries` is exhausted

    On a Postgres statement timeout (code 57014) the batch size is halved
    (down to `min_batch_size`) and the same window is retried; the shrunken
    size then applies to all subsequent batches too. Other errors are retried
    with exponential backoff. An empty `rows` list returns [] without sending
    any requests.
    """
    if not rows:
        return []
    results = []
    n = len(rows)
    i = 0
    while i < n:
        current_batch_size = min(batch_size, n - i)
        batch = rows[i: i + current_batch_size]
        # Attempt to insert this batch with retries.
        attempt = 0
        while True:
            try:
                q = supabase_client.table(table_name)
                if on_conflict:
                    # `on_conflict` in supabase-py v2 expects a comma-separated string
                    conflict_target = ",".join(on_conflict)
                    resp = q.upsert(batch, on_conflict=conflict_target,
                                    returning=_RETURN_MINIMAL).execute()
                else:
                    resp = q.upsert(batch, returning=_RETURN_MINIMAL).execute()
                results.append(resp)
                logging.info(
                    f"Inserted batch rows {i}–{i + len(batch)} into {table_name}")
                i += len(batch)  # advance window
                break  # batch succeeded, move to next
            except APIError as e:
                if getattr(e, "code", None) == "57014" and current_batch_size > min_batch_size:
                    # Statement timeout: halve the batch and retry the same
                    # window with the smaller size.
                    half = max(min_batch_size, current_batch_size // 2)
                    logging.warning(
                        f"Timeout on {table_name} rows {i}–{i + current_batch_size}. "
                        f"Shrinking batch to {half} and retrying."
                    )
                    batch_size = half  # stays shrunk for later batches
                    time.sleep(backoff_base_seconds)
                    break
                # Other errors or batch already at min size -> retry with backoff
                attempt += 1
                if attempt > max_retries:
                    # BUGFIX: report only the rows not yet inserted — everything
                    # before index `i` already succeeded in earlier batches.
                    affected_ids = extract_row_ids(rows[i:], on_conflict)
                    logging.error(
                        "Permanent unexpected failure upserting into %s after %d attempts: %s. Affected row ids: %s",
                        table_name,
                        attempt,
                        e,
                        affected_ids,
                    )
                    raise
                sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
                logging.warning(
                    f"Error inserting batch rows {i}–{i + current_batch_size} into {table_name} "
                    f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
                )
                time.sleep(sleep_for)
                continue
            except Exception as e:
                # Non-API errors: retry with backoff
                attempt += 1
                if attempt > max_retries:
                    logging.error(
                        f"Failed inserting batch rows {i}–{i + current_batch_size} into {table_name} "
                        f"after {max_retries} retries: {e}"
                    )
                    raise
                sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
                logging.warning(
                    f"Unexpected error inserting batch rows {i}–{i + current_batch_size} into {table_name} "
                    f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
                )
                time.sleep(sleep_for)
                continue
    return results
def upload_raw_json_to_supabase(supabase_client, json_data, object_type="contacts"):
    """
    Uploads raw JSON data to Supabase storage under a specified object type directory.

    Best effort: storage and network errors are logged rather than raised, and
    the intended path is returned either way — callers must not assume the
    object exists on return.

    :param supabase_client: The Supabase client object.
    :param json_data: The JSON data to be uploaded (non-serializable values
        are stringified via default=str, e.g. datetimes).
    :param object_type: The type of object, used to determine the directory path for storage.
    :return: The path where the JSON file is stored in Supabase.
    """
    now_str = datetime.datetime.now(
        datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
    path = f"{object_type}/{now_str}.json"
    file_bytes = json.dumps(json_data, indent=2, default=str).encode("utf-8")
    bucket = supabase_client.storage.from_("hubspot-raw-data")
    try:
        # BUGFIX: the pre-upload remove used to run outside the try, so a
        # storage/network error there crashed the pipeline while upload
        # errors were merely logged. Keep both calls best-effort.
        bucket.remove([path])
        bucket.upload(
            path, file=file_bytes, file_options={
                "content-type": "application/json"}
        )
    except StorageApiError as e:
        # getattr: not every storage3 version exposes `status_code`.
        if getattr(e, "status_code", None) == 413:
            logging.warning(
                "Upload failed: payload too large for Supabase Storage.")
        else:
            logging.error("Storage API error during upload: %s", e)
    except httpx.RequestError as e:
        logging.error("Upload error: %s", e)
    return path
def get_last_sync_time(supabase_client, object_type):
    """
    Look up the last recorded sync time for `object_type` in the
    hubspot_sync_metadata table.

    :param supabase_client: The Supabase client object.
    :param object_type: The type of object to retrieve the last sync time for.
    :return: The stored timestamp parsed into a datetime object, or None when
        no previous sync row exists (or the stored value is NULL/empty).
    """
    response = (
        supabase_client.table("hubspot_sync_metadata")
        .select("last_sync_time")
        .eq("object_type", object_type)
        .execute()
    )
    rows = response.data
    if not rows:
        return None
    raw_value = rows[0]["last_sync_time"]
    if not raw_value:
        return None
    return datetime.datetime.fromisoformat(raw_value)
def update_sync_metadata(supabase_client, object_type, sync_time):
    """
    Record `sync_time` as the last sync time for `object_type` in the
    hubspot_sync_metadata table.

    :param supabase_client: The Supabase client object.
    :param object_type: The type of object whose sync time is being recorded.
    :param sync_time: The last sync time to store.
    :return: The result of the Supabase upsert operation.
    """
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    row = {
        "object_type": object_type,
        "last_sync_time": sync_time,
        "updated_at": now_iso,
    }
    return insert_into_supabase_table(
        supabase_client, "hubspot_sync_metadata", [row]
    )
def get_existing_email_content_ids(supabase_client):
    """
    Fetches existing email_ids from the hubspot_email_contents table.

    Pages through the table 1000 rows at a time. Stops as soon as a page comes
    back short or empty — the short-page break matches fetch_supabase_table's
    pagination and avoids one extra round trip on the final page.

    :param supabase_client: Supabase client
    :return: A set of email_id values already in the database
    """
    existing_ids = set()
    page_size = 1000
    offset = 0
    while True:
        res = (
            supabase_client.table("hubspot_email_contents")
            .select("email_id")
            .range(offset, offset + page_size - 1)
            .execute()
        )
        if not res.data:
            break
        existing_ids.update(row["email_id"] for row in res.data)
        # A short page means we've reached the end of the table.
        if len(res.data) < page_size:
            break
        offset += page_size
    return existing_ids
def fetch_supabase_table(
    supabase_client, table_name="hubspot_contacts",
    id_column="contact_id"
):
    """
    Load an entire Supabase table into memory, keyed by an ID column.

    :param supabase_client: Supabase client instance
    :param table_name: Name of the table to fetch from
    :param id_column: Unique ID column to use as the key for the returned dict
    :return: Dict mapping str(row[id_column]) to the full row dict
    """
    rows_by_id = {}
    page_size = 1000
    page_index = 0
    while True:
        start = page_index * page_size
        res = (
            supabase_client.table(table_name)
            .select("*")
            .range(start, start + page_size - 1)
            .execute()
        )
        page_rows = res.data
        if not page_rows:
            break
        rows_by_id.update({str(r[id_column]): r for r in page_rows})
        # A short page means this was the last one.
        if len(page_rows) < page_size:
            break
        page_index += 1
    return rows_by_id
def enrich_supabase_row(base_row: dict) -> dict:
    """
    Add the duplicate-detection bookkeeping columns to a row, in place.

    :param base_row: The row to enrich (mutated in place).
    :return: The same dict, now carrying the duplicate-detection columns
        and a fresh UTC `updated_at` timestamp.
    """
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    base_row["duplicate_id"] = None
    base_row["duplicate_status"] = None
    base_row["duplicate_action"] = None
    base_row["is_primary"] = True
    base_row["updated_at"] = timestamp
    return base_row