| | """ |
| | Unified orchestrator for HubSpot → Supabase pipelines with a timestamp cursor. |
| | |
| | CLI: |
| | # epoch ms |
| | python load_hubspot_data.py 1754025600000 |
| | # ISO-8601 |
| | python load_hubspot_data.py 2025-08-01T09:30:00Z |
| | # Back-compat date (floors to 00:00Z) |
| | python load_hubspot_data.py 2025-08-01 |
| | # No arg: defaults to today@00:00Z |
| | """ |
| |
|
import datetime
import logging
import os
import re
import sys

import hubspot_deals
import hubspot_emails
import hubspot_tickets
import hubspot_contacts
import hubspot_companies
import hubspot_billing
import hubspot_audit
import hubspot_invoices
| |
|
# Create the log directory up front: logging.FileHandler (used by basicConfig
# when ``filename`` is given) does not create missing parent directories and
# would raise FileNotFoundError at import time on a fresh checkout/host.
# NOTE(review): "logs" is resolved relative to the current working directory,
# exactly as the filename below is.
os.makedirs("logs", exist_ok=True)

# One append-mode log file per local calendar day.
logging.basicConfig(
    filename=f"logs/hubspot_unified_orchestrator_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: |
| | if dt.tzinfo is None: |
| | dt = dt.replace(tzinfo=datetime.timezone.utc) |
| | return dt.astimezone(datetime.timezone.utc) |
| |
|
| |
|
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return 00:00:00.000000 UTC of the UTC calendar day containing *dt*.

    *dt* is normalised with ``_ensure_utc`` first, so naive input is read as
    UTC and aware input is converted to UTC before truncation.
    """
    in_utc = _ensure_utc(dt)
    # Zero every field below the day; the UTC tzinfo is carried over as-is.
    return in_utc.replace(microsecond=0, second=0, minute=0, hour=0)
| |
|
| |
|
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string into an aware UTC datetime.

    A trailing "Z" designator is rewritten to "+00:00" before parsing because
    ``datetime.fromisoformat`` rejects "Z" on Python versions before 3.11.
    A result without an offset is treated as UTC.
    """
    text = value
    if isinstance(text, str) and text.endswith("Z"):
        text = f"{text[:-1]}+00:00"
    return _ensure_utc(datetime.datetime.fromisoformat(text))
| |
|
| |
|
def to_epoch_ms(dt_or_str) -> int:
    """Convert a datetime or ISO-8601 string to integer epoch milliseconds (UTC).

    Args:
        dt_or_str: a ``datetime.datetime`` (naive values are taken as UTC) or
            an ISO-8601 string (``Z`` or numeric offset; no offset means UTC).

    Returns:
        Milliseconds since 1970-01-01T00:00:00Z, rounded down to the whole
        millisecond (sub-millisecond precision is dropped).

    Raises:
        TypeError: if *dt_or_str* is neither a ``str`` nor a ``datetime``.
        ValueError: if a string argument is not parseable ISO-8601.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    # Use exact integer timedelta arithmetic. ``int(dt.timestamp() * 1000)``
    # round-trips through a float and can land one millisecond low
    # (e.g. 1970-01-01T00:00:01.001Z -> 1000 instead of 1001).
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (dt - epoch) // datetime.timedelta(milliseconds=1)
| |
|
| |
|
def parse_since_arg_to_ms(argv=None) -> int:
    """
    Resolve the shared sync cursor, in UTC epoch milliseconds, from the CLI.

    Accepts (as the first positional argument):
      - integer epoch: exactly 10 digits is read as seconds and scaled *1000;
        11-13 digits is read as milliseconds unchanged
      - ISO-8601 (Z or offset; a value without offset is treated as UTC)
      - YYYY-MM-DD (floors to 00:00Z)
    If not provided, defaults to today@00:00Z.

    Args:
        argv: argument vector to inspect; defaults to ``sys.argv`` (element 0
            is the program name, element 1 the cursor value).

    Returns:
        The cursor as integer epoch milliseconds.

    Raises:
        SystemExit: with status 1 when the argument cannot be parsed.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) <= 1:
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        return to_epoch_ms(today0)

    arg = argv[1].strip()

    if re.fullmatch(r"\d{10,13}", arg):
        v = int(arg)
        # 10 digits => epoch seconds (valid through the year 2286); anything
        # longer is already milliseconds. The previous threshold of
        # 10_000_000_000_000 exceeded every 13-digit value, so genuine
        # millisecond input was wrongly multiplied by 1000.
        if v < 10_000_000_000:
            v *= 1000
        return v

    try:
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
            d = datetime.datetime.strptime(
                arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
            return to_epoch_ms(floor_to_utc_midnight(d))
        return to_epoch_ms(arg)
    except (ValueError, OverflowError):
        # Malformed ISO strings and impossible dates (e.g. 2025-13-45) both
        # land here instead of escaping as a raw traceback.
        print("Invalid --since argument. Use epoch ms, ISO-8601, or YYYY-MM-DD.",
              file=sys.stderr)
        sys.exit(1)
| |
|
| | |
| | |
| | |
| |
|
| |
|
def main():
    """
    Run every HubSpot pipeline in order with a shared timestamp cursor.

    The cursor comes from ``parse_since_arg_to_ms()`` and is passed to each
    pipeline module's ``main(since_ms=...)``. Each pipeline may internally
    advance its own stored cursor. A pipeline that raises is logged (with
    traceback) and the remaining pipelines still run.

    Returns:
        The console labels of the pipelines that raised, in run order
        (an empty list when everything succeeded).
    """
    since_ms = parse_since_arg_to_ms()
    print(f"=== Running HubSpot sync pipeline since_ms={since_ms} ===")

    # (console label, pipeline module, noun used in the error log message)
    pipelines = [
        ("Companies", hubspot_companies, "companies"),
        ("Contacts", hubspot_contacts, "contacts"),
        ("Deals", hubspot_deals, "deals"),
        ("Tickets", hubspot_tickets, "tickets"),
        ("Emails", hubspot_emails, "emails"),
        ("Billing Services", hubspot_billing, "billing services"),
        ("Audit Logs", hubspot_audit, "audit log"),
        ("Invoices", hubspot_invoices, "invoices"),
    ]

    failed = []
    total = len(pipelines)
    for position, (label, module, log_name) in enumerate(pipelines, start=1):
        print(f"\n[{position}/{total}] {label}")
        try:
            module.main(since_ms=since_ms)
        except Exception as e:
            # Best-effort by design: record the failure and keep going so one
            # broken pipeline does not block the others.
            logging.exception("Error running %s pipeline: %s", log_name, e)
            failed.append(label)

    if failed:
        # Failures were previously visible only in the log file.
        print(f"\n!!! {len(failed)} pipeline(s) failed: {', '.join(failed)} (see log)")
    print("\n=== HubSpot sync complete ===")
    return failed


if __name__ == "__main__":
    # Exit non-zero when any pipeline failed so cron/CI can detect it;
    # previously the script always exited 0.
    sys.exit(1 if main() else 0)
| |
|