File size: 16,598 Bytes
a16378e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03810ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a16378e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03810ee
a16378e
03810ee
 
 
 
 
a16378e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03810ee
a16378e
03810ee
 
 
 
 
a16378e
 
 
 
 
 
 
 
 
 
 
 
 
03810ee
a16378e
03810ee
 
 
 
 
a16378e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03810ee
a16378e
03810ee
 
 
 
 
a16378e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
"""
This script contains utility functions for working with Supabase.
"""
import time
import datetime
import json
import logging
import httpx
from storage3.exceptions import StorageApiError
from typing import List, Dict, Any, Optional

try:
    from postgrest import APIError  # supabase-py v2
except Exception:  # pragma: no cover
    APIError = Exception
try:
    # supabase-py v2
    from postgrest.types import ReturnMethod  # Returning.MINIMAL
    _RETURN_MINIMAL = ReturnMethod.MINIMAL
except Exception:
    # supabase-py v1 fallback
    _RETURN_MINIMAL = "minimal"


def extract_row_ids(rows: List[Dict[str, Any]], on_conflict: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """
    Build one identifier dict per row, using the conflict-key columns.

    When `on_conflict` is None or empty, the first row is probed for a
    well-known identifier column ("id", "uuid", "hubspot_id", "object_id")
    and the first match becomes the sole key. Returns an empty list when
    `rows` is empty or no usable key column can be determined.

    :param rows: Rows to pull identifiers from
    :param on_conflict: Column names that identify a row (conflict target)
    :return: One dict per row containing only the identifier columns present in that row
    """
    if not rows:
        return []

    key_columns: List[str] = list(on_conflict or [])

    # No explicit conflict keys: fall back to the first known ID-like column.
    if not key_columns:
        first_row = rows[0]
        key_columns = next(
            ([candidate] for candidate in ("id", "uuid", "hubspot_id", "object_id")
             if candidate in first_row),
            [],
        )

    if not key_columns:
        return []

    return [
        {column: row.get(column) for column in key_columns if column in row}
        for row in rows
    ]


def insert_into_supabase_table(
    supabase_client: Any,
    table_name: str,
    rows: List[Dict[str, Any]],
    on_conflict: Optional[List[str]] = None,
    *,
    max_retries: int = 5,
    backoff_base_seconds: float = 1.0,
    use_returning_minimal: bool = True,
) -> Optional[bool]:
    """
    Upsert a list of rows into a Supabase table with retries and optional conflict handling.

    :param supabase_client: Supabase client
    :param table_name: Name of the table to upsert into
    :param rows: List of dictionaries to insert; each dictionary becomes a row
    :param on_conflict: List of column names to use as conflict target
    :param max_retries: Maximum number of times to retry on failure
    :param backoff_base_seconds: Base amount of time to sleep between retries
    :param use_returning_minimal: Whether to use returning=MINIMAL (faster, but
        the response carries no inserted rows/IDs back)
    :return: True on success, False after exhausting all retries, or None when
        `rows` is empty (nothing is sent to Supabase in that case).

    Retries up to `max_retries` times with exponential backoff
    (backoff_base_seconds * 2**(attempt - 1)). Postgres statement timeouts
    (APIError code 57014), HTTP transport errors, and unexpected errors are
    logged distinctly but all follow the same retry schedule.

    NOTE: This function does not support transactions; it is intended for use
    in simple data pipelines where retrying on failure is sufficient. If you
    need stronger consistency guarantees, use transactions or another approach.
    """

    if not rows:
        logging.info("No rows to insert for table %s.", table_name)
        return None

    # Build the upsert request once; only execute() is retried below.
    upsert_kwargs: Dict[str, Any] = {}
    if on_conflict:
        upsert_kwargs["on_conflict"] = ",".join(on_conflict)
    if use_returning_minimal:
        upsert_kwargs["returning"] = _RETURN_MINIMAL
    query = supabase_client.table(table_name).upsert(rows, **upsert_kwargs)

    attempt = 0
    while True:
        try:
            query.execute()
            logging.info("Inserted %s records into %s.", len(rows), table_name)
            return True

        except Exception as e:  # deliberate: API + transport errors all retried the same way
            attempt += 1

            # Classify the error only for log wording; the retry policy is identical.
            # NOTE: APIError may fall back to Exception on supabase-py v1, in
            # which case everything is reported as an APIError (matches the
            # previous behavior, where the first except clause caught all).
            if isinstance(e, APIError):
                is_timeout = getattr(e, "code", None) == "57014"
                kind = "APIError"
            else:
                is_timeout = False
                kind = ("HTTP error"
                        if isinstance(e, (httpx.RequestError, httpx.HTTPStatusError))
                        else "Unexpected error")

            if attempt > max_retries:
                affected_ids = extract_row_ids(rows, on_conflict)
                logging.error(
                    "Permanent failure (%s) upserting into %s after %d attempts: %s. Affected row ids: %s",
                    kind,
                    table_name,
                    attempt,
                    e,
                    affected_ids,
                )
                return False

            sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
            if is_timeout:
                logging.warning(
                    "Timeout (57014) upserting into %s (attempt %d/%d). Retrying in %.1fs.",
                    table_name, attempt, max_retries, sleep_for
                )
            else:
                logging.warning(
                    "%s upserting into %s (attempt %d/%d): %s. Retrying in %.1fs.",
                    kind, table_name, attempt, max_retries, e, sleep_for
                )
            time.sleep(sleep_for)


def batched_insert(
    supabase_client,
    table_name: str,
    rows: List[Dict[str, Any]],
    batch_size: int = 500,
    on_conflict: Optional[List[str]] = None,
    max_retries: int = 4,
    backoff_base_seconds: float = 1.0,
    min_batch_size: int = 50,
) -> list:
    """
    Upsert rows into a Supabase table in batches, with retries and adaptive batch sizing.

    :param supabase_client: Supabase client
    :param table_name: Name of the table to upsert into
    :param rows: List of dictionaries to insert; each dictionary becomes a row
    :param batch_size: Maximum number of rows to send in a single upsert request
    :param on_conflict: List of column names to use as conflict target
    :param max_retries: Maximum number of times to retry a batch on failure
    :param backoff_base_seconds: Base amount of time to sleep between retries
    :param min_batch_size: Lower bound on the batch size when shrinking after
        a statement timeout
    :return: List of responses from Supabase (empty if no rows to insert)

    On a Postgres statement timeout (APIError code 57014) the batch size is
    halved — never below `min_batch_size` — and the same window is retried with
    the smaller size. Other errors are retried up to `max_retries` times with
    exponential backoff, and the last exception is re-raised once retries are
    exhausted. If `rows` is empty, returns an empty list without sending any
    requests to Supabase.
    """

    if not rows:
        return []

    results = []
    n = len(rows)
    i = 0

    while i < n:
        current_batch_size = min(batch_size, n - i)
        batch = rows[i: i + current_batch_size]

        # Attempt to insert this batch with retries
        attempt = 0
        while True:
            try:
                q = supabase_client.table(table_name)
                if on_conflict:
                    # `on_conflict` in supabase-py v2 expects a comma-separated string
                    conflict_target = ",".join(on_conflict)
                    resp = q.upsert(batch, on_conflict=conflict_target,
                                    returning=_RETURN_MINIMAL).execute()
                else:
                    resp = q.upsert(batch, returning=_RETURN_MINIMAL).execute()

                results.append(resp)
                logging.info(
                    f"Inserted batch rows {i}-{i + len(batch)} into {table_name}")
                i += len(batch)  # advance window
                break  # batch succeeded, move to next

            except APIError as e:
                if getattr(e, "code", None) == "57014" and current_batch_size > min_batch_size:
                    # Statement timeout: halve the batch (bounded below by
                    # min_batch_size) and retry this window with the smaller
                    # size; subsequent windows inherit the shrunken batch_size.
                    half = max(min_batch_size, current_batch_size // 2)
                    logging.warning(
                        f"Timeout on {table_name} rows {i}-{i + current_batch_size}. "
                        f"Shrinking batch to {half} and retrying."
                    )
                    batch_size = half
                    time.sleep(backoff_base_seconds)
                    break

                # Other API errors, or batch already at min size -> retry with backoff
                attempt += 1
                if attempt > max_retries:
                    # Log only the ids of the batch that failed, not the whole input.
                    affected_ids = extract_row_ids(batch, on_conflict)
                    logging.error(
                        "Permanent failure upserting into %s after %d attempts: %s. Affected row ids: %s",
                        table_name,
                        attempt,
                        e,
                        affected_ids,
                    )
                    raise

                sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
                logging.warning(
                    f"Error inserting batch rows {i}-{i + current_batch_size} into {table_name} "
                    f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
                )
                time.sleep(sleep_for)
                continue

            except Exception as e:
                # Non-API errors (e.g. transport hiccups): retry with backoff
                attempt += 1
                if attempt > max_retries:
                    logging.error(
                        f"Failed inserting batch rows {i}-{i + current_batch_size} into {table_name} "
                        f"after {max_retries} retries: {e}"
                    )
                    raise

                sleep_for = backoff_base_seconds * (2 ** (attempt - 1))
                logging.warning(
                    f"Unexpected error inserting batch rows {i}-{i + current_batch_size} into {table_name} "
                    f"(attempt {attempt}/{max_retries}): {e}. Retrying in {sleep_for:.1f}s."
                )
                time.sleep(sleep_for)
                continue

    return results


def upload_raw_json_to_supabase(supabase_client, json_data, object_type="contacts"):
    """
    Uploads raw JSON data to Supabase storage under a specified object type directory.

    The file is written to the "hubspot-raw-data" bucket at
    "<object_type>/<UTC timestamp>.json". Any existing object at that path is
    removed first (best effort). Upload failures are logged, not raised.

    :param supabase_client: The Supabase client object.
    :param json_data: The JSON-serializable data to be uploaded.
    :param object_type: The type of object, used to determine the directory path for storage.
    :return: The path where the JSON file is stored in Supabase. The path is
        returned even when the upload failed -- callers should watch the logs.
    """

    now_str = datetime.datetime.now(
        datetime.timezone.utc).strftime("%Y%m%d_%H%M%S")
    path = f"{object_type}/{now_str}.json"
    # default=str deliberately stringifies non-JSON types (e.g. datetimes).
    file_bytes = json.dumps(json_data, indent=2, default=str).encode("utf-8")

    bucket = supabase_client.storage.from_("hubspot-raw-data")

    # Best-effort delete of any stale object at the same path; a failure here
    # must not prevent the upload attempt below.
    try:
        bucket.remove([path])
    except (StorageApiError, httpx.RequestError) as e:
        logging.warning("Pre-upload remove failed for %s: %s", path, e)

    try:
        bucket.upload(
            path, file=file_bytes, file_options={
                "content-type": "application/json"}
        )
    except StorageApiError as e:
        if e.status_code == 413:
            logging.warning(
                "Upload failed: payload too large for Supabase Storage.")
        else:
            logging.error("Storage API error during upload: %s", e)
    except httpx.RequestError as e:
        logging.error("Upload error: %s", e)
    return path


def get_last_sync_time(supabase_client, object_type):
    """
    Look up the most recent sync timestamp recorded for an object type.

    Reads the "last_sync_time" column of hubspot_sync_metadata for the row
    matching `object_type` and parses it with datetime.fromisoformat.

    :param supabase_client: The Supabase client object.
    :param object_type: Object type whose last sync time should be fetched.
    :return: The last sync time as a datetime object, or None when no
             previous sync was recorded (no row, or a null value).
    """

    response = (
        supabase_client.table("hubspot_sync_metadata")
        .select("last_sync_time")
        .eq("object_type", object_type)
        .execute()
    )
    records = response.data
    if not records:
        return None

    raw_value = records[0]["last_sync_time"]
    if not raw_value:
        return None

    return datetime.datetime.fromisoformat(raw_value)


def update_sync_metadata(supabase_client, object_type, sync_time):
    """
    Record the latest sync time for an object type in hubspot_sync_metadata.

    Builds a single metadata row (stamping "updated_at" with the current UTC
    time) and delegates the upsert to insert_into_supabase_table.

    :param supabase_client: The Supabase client object.
    :param object_type: Object type whose last sync time is being recorded.
    :param sync_time: The last sync time value to store.
    :return: The result of the Supabase upsert operation.
    """

    row = {
        "object_type": object_type,
        "last_sync_time": sync_time,
        "updated_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    return insert_into_supabase_table(supabase_client, "hubspot_sync_metadata", [row])


def get_existing_email_content_ids(supabase_client):
    """
    Fetches existing email_ids from the hubspot_email_contents table.

    Pages through the table 1000 rows at a time and collects the "email_id"
    column values into a set.

    :param supabase_client: Supabase client
    :return: A set of email_id values already in the database
    """
    existing_ids = set()
    page_size = 1000
    offset = 0

    while True:
        res = supabase_client.table("hubspot_email_contents")\
            .select("email_id")\
            .range(offset, offset + page_size - 1)\
            .execute()
        if not res.data:
            break
        existing_ids.update(row["email_id"] for row in res.data)
        # A partial page means we've hit the end of the table; stop here
        # instead of issuing one more request that would come back empty.
        if len(res.data) < page_size:
            break
        offset += page_size

    return existing_ids


def fetch_supabase_table(
    supabase_client, table_name="hubspot_contacts",
    id_column="contact_id"
):
    """
    Load every row of a Supabase table into a dict keyed by an ID column.

    Pages through the table 1000 rows at a time; result keys are str() of the
    value found in `id_column`, values are the full row dicts.

    :param supabase_client: Supabase client instance
    :param table_name: Name of the table to fetch from
    :param id_column: Unique ID column to use as the key for the returned dict
    :return: A dict mapping str(row[id_column]) to the full row
    """

    rows_by_id = {}
    page_size = 1000
    page = 0

    while True:
        start = page * page_size
        res = (
            supabase_client.table(table_name)
            .select("*")
            .range(start, start + page_size - 1)
            .execute()
        )
        data = res.data
        if not data:
            break
        rows_by_id.update((str(row[id_column]), row) for row in data)
        # A partial page is the last page.
        if len(data) < page_size:
            break
        page += 1

    return rows_by_id


def enrich_supabase_row(base_row: dict) -> dict:
    """
    Add duplicate-detection bookkeeping columns to a row, in place.

    Resets duplicate_id / duplicate_status / duplicate_action to None, marks
    the row as primary, and stamps updated_at with the current UTC time.

    :param base_row: The row to enrich; it is mutated in place.
    :return: The same (mutated) row, for convenient call chaining.
    """

    base_row["duplicate_id"] = None
    base_row["duplicate_status"] = None
    base_row["duplicate_action"] = None
    base_row["is_primary"] = True
    base_row["updated_at"] = datetime.datetime.now(
        datetime.timezone.utc).isoformat()
    return base_row