File size: 4,918 Bytes
a16378e ebf1ef6 a16378e ebf1ef6 a16378e ebf1ef6 a16378e 1319106 a16378e 1319106 a16378e 1319106 a16378e 1319106 a16378e 1319106 a16378e 1319106 a16378e 1319106 a16378e 03810ee a16378e 1319106 a16378e 03810ee a16378e 1319106 a16378e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | """
Unified orchestrator for HubSpot → Supabase pipelines with a timestamp cursor.
CLI:
# epoch ms
python load_hubspot_data.py 1754025600000
# ISO-8601
python load_hubspot_data.py 2025-08-01T09:30:00Z
# Back-compat date (floors to 00:00Z)
python load_hubspot_data.py 2025-08-01
# No arg: defaults to today@00:00Z
"""
import datetime
import logging
import os
import re
import sys

# Pipelines (each exposes main(since_ms: Optional[int]) and can fall back to env)
import hubspot_deals
import hubspot_emails
import hubspot_tickets
import hubspot_contacts
import hubspot_companies
import hubspot_billing
import hubspot_audit
import hubspot_invoices
# Append log records to a per-day file under logs/ (the date in the file name
# comes from datetime.now() without a tz, i.e. local time). basicConfig opens
# the file immediately, so the directory must exist first; otherwise importing
# this module fails with FileNotFoundError.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_unified_orchestrator_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# ---------------------------
# Time parsing helpers
# ---------------------------
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* expressed in UTC.

    A naive datetime (``tzinfo is None``) is assumed to already be UTC and is
    simply tagged as such; an aware datetime is converted to UTC.
    """
    tz_utc = datetime.timezone.utc
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=tz_utc)
    return aware.astimezone(tz_utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Truncate *dt* to 00:00:00.000000 of its UTC calendar day.

    The value is normalised with ``_ensure_utc`` first (naive input counts as
    UTC), so the day boundary used is always the UTC one.
    """
    start_of_day = dict(hour=0, minute=0, second=0, microsecond=0)
    return _ensure_utc(dt).replace(**start_of_day)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 timestamp string into a UTC-aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` on Python versions before 3.11 does not accept
    the ``Z`` designator. Strings without an offset are treated as UTC.
    Malformed input makes ``fromisoformat`` raise ``ValueError``.
    """
    text = value
    if isinstance(text, str) and text.endswith("Z"):
        text = f"{text[:-1]}+00:00"
    return _ensure_utc(datetime.datetime.fromisoformat(text))
def to_epoch_ms(dt_or_str) -> int:
    """Convert a datetime or ISO-8601 string to integer epoch milliseconds.

    Args:
        dt_or_str: a ``datetime.datetime`` (naive values are treated as UTC)
            or an ISO-8601 string accepted by ``_parse_iso_like_to_dt``.

    Returns:
        Whole milliseconds since 1970-01-01T00:00:00Z (sub-millisecond
        precision is floored away).

    Raises:
        TypeError: if *dt_or_str* is neither a ``str`` nor a ``datetime``.
        ValueError: if a string cannot be parsed.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    # Use exact integer arithmetic. ``int(dt.timestamp() * 1000)`` goes through
    # a float, which can land just below a whole millisecond and then be
    # truncated to the previous one.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (dt - epoch) // datetime.timedelta(milliseconds=1)
def parse_since_arg_to_ms(argv=None) -> int:
    """Resolve the run's ``since`` cursor, in epoch milliseconds, from the CLI.

    Accepted forms for the first positional argument:
      - integer epoch: a 10-digit value is epoch seconds (multiplied by 1000);
        11-13 digit values are taken as epoch milliseconds
      - ISO-8601 timestamp (``Z`` or explicit offset; no offset means UTC)
      - ``YYYY-MM-DD`` (floored to 00:00Z of that day)
    If no argument is provided, defaults to today@00:00Z.

    Args:
        argv: argument vector to read instead of ``sys.argv``; element 0 is
            the program name, as in ``sys.argv``. Defaults to ``sys.argv``.

    Returns:
        The cursor as integer epoch milliseconds.

    On an unparseable argument, prints a usage hint to stderr and calls
    ``sys.exit(1)``.
    """
    args = sys.argv if argv is None else argv
    if len(args) <= 1:
        # default: today@00:00Z
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        return to_epoch_ms(today0)

    arg = args[1].strip()

    # Epoch seconds or ms. A 10-digit value covers every epoch-seconds
    # timestamp up to the year 2286, while epoch-ms values have had 13 digits
    # since 2001. So anything >= 10**10 is already milliseconds.
    # (The previous 10**13 threshold multiplied real ms values by 1000 again.)
    if re.fullmatch(r"\d{10,13}", arg):
        v = int(arg)
        if v < 10_000_000_000:  # seconds -> ms
            v *= 1000
        return v

    try:
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
            # YYYY-MM-DD: interpret as a UTC calendar date, floor to 00:00Z.
            # Inside the try so impossible dates (e.g. 2025-02-30) get the
            # usage message instead of a traceback.
            d = datetime.datetime.strptime(
                arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
            return to_epoch_ms(floor_to_utc_midnight(d))
        # ISO-8601
        return to_epoch_ms(arg)
    except (ValueError, OverflowError):
        print("Invalid --since argument. Use epoch ms, ISO-8601, or YYYY-MM-DD.",
              file=sys.stderr)
        sys.exit(1)
# ---------------------------
# Main
# ---------------------------
def main():
    """Run every HubSpot pipeline in order with a shared timestamp cursor.

    The cursor comes from ``parse_since_arg_to_ms()`` and is passed to each
    pipeline as ``main(since_ms=...)``. Each pipeline may internally advance
    its own stored cursor. If a pipeline raises, the exception is logged with
    its traceback and the remaining pipelines still run.

    Returns:
        A list with the log names of the pipelines that raised, in run order
        (empty when every pipeline succeeded).
    """
    since_ms = parse_since_arg_to_ms()
    print(f"=== Running HubSpot sync pipeline since_ms={since_ms} ===")

    # (console label, pipeline module, name used in the error log message)
    pipelines = (
        ("Companies", hubspot_companies, "companies"),
        ("Contacts", hubspot_contacts, "contacts"),
        ("Deals", hubspot_deals, "deals"),
        ("Tickets", hubspot_tickets, "tickets"),
        ("Emails", hubspot_emails, "emails"),
        ("Billing Services", hubspot_billing, "billing services"),
        ("Audit Logs", hubspot_audit, "audit log"),
        ("Invoices", hubspot_invoices, "invoices"),
    )
    total = len(pipelines)
    failed = []
    for position, (label, module, log_name) in enumerate(pipelines, start=1):
        print(f"\n[{position}/{total}] {label}")
        try:
            module.main(since_ms=since_ms)
        except Exception as e:
            # Best-effort: one pipeline's failure must not stop the others.
            logging.exception("Error running %s pipeline: %s", log_name, e)
            failed.append(log_name)
            print(f"    FAILED: {label} (see log for traceback)")

    print("\n=== HubSpot sync complete ===")
    if failed:
        # Surface failures on the console too; previously they only reached
        # the log file and the run looked successful.
        summary = ", ".join(failed)
        print(f"=== {len(failed)} pipeline(s) failed: {summary} ===")
        logging.error("%d pipeline(s) failed: %s", len(failed), summary)
    return failed


if __name__ == "__main__":
    # Exit non-zero when any pipeline failed so cron/CI can detect a partial run.
    sys.exit(1 if main() else 0)
|