n8n-duplicate / python /load_hubspot_data.py
niwayandm
Enhance Supabase utility functions to include affected row IDs. Edited texts in load data.
03810ee
raw
history blame
4.72 kB
"""
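Unified orchestrator for HubSpot → Supabase pipelines with a timestamp cursor.

CLI usage (one optional positional "since" argument):
    # Epoch milliseconds (epoch seconds are also accepted and converted to ms)
    python hubspot_orchestrator.py 1754025600000
    # ISO-8601 timestamp (Z or explicit offset)
    python hubspot_orchestrator.py 2025-08-01T09:30:00Z
    # Back-compat date (floors to 00:00Z)
    python hubspot_orchestrator.py 2025-08-01
    # No argument: defaults to today at 00:00Z
    python hubspot_orchestrator.py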
"""
import sys
import re
import logging
import datetime
# Pipelines (each exposes main(since_ms: Optional[int]) and can fall back to env)
import hubspot_deals
import hubspot_emails
import hubspot_tickets
import hubspot_contacts
import hubspot_companies
import hubspot_billing
import hubspot_audit
# Root logger setup: INFO and above are appended to a per-day log file whose
# date stamp comes from the local clock at the moment this module is loaded.
# NOTE(review): the relative "logs/" directory is presumably expected to exist
# in the working directory — confirm, since nothing here creates it.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    filemode="a",
    filename=f"logs/hubspot_unified_orchestrator_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
)
# ---------------------------
# Time parsing helpers
# ---------------------------
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return ``dt`` as a timezone-aware datetime expressed in UTC.

    A naive datetime (``tzinfo`` is ``None``) is taken to already be in UTC
    and is only tagged with the UTC zone; an aware datetime is converted to
    UTC.
    """
    utc = datetime.timezone.utc
    aware = dt.replace(tzinfo=utc) if dt.tzinfo is None else dt
    return aware.astimezone(utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Truncate ``dt`` to the start (00:00:00.000000) of its UTC day.

    The input is first normalised with ``_ensure_utc``, so the day boundary
    used is the UTC one regardless of the offset ``dt`` carried.
    """
    midnight_fields = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return _ensure_utc(dt).replace(**midnight_fields)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601-style string into a UTC-aware datetime.

    A trailing ``"Z"`` is rewritten to ``"+00:00"`` before the string is
    handed to ``datetime.fromisoformat`` (which, before Python 3.11, does not
    accept the ``Z`` suffix). The parsed value is then passed through
    ``_ensure_utc``, so a string without an offset is treated as UTC.

    Raises:
        ValueError: if ``fromisoformat`` cannot parse the text.
    """
    has_zulu_suffix = isinstance(value, str) and value.endswith("Z")
    iso_text = f"{value[:-1]}+00:00" if has_zulu_suffix else value
    return _ensure_utc(datetime.datetime.fromisoformat(iso_text))
def to_epoch_ms(dt_or_str) -> int:
    """Convert a datetime or ISO-8601-style string to integer epoch milliseconds.

    Args:
        dt_or_str: either a ``str`` (parsed by ``_parse_iso_like_to_dt``) or a
            ``datetime.datetime`` (normalised by ``_ensure_utc``; naive values
            are treated as UTC).

    Returns:
        Whole milliseconds since 1970-01-01T00:00:00Z. Sub-millisecond
        precision is discarded by flooring.

    Raises:
        TypeError: if ``dt_or_str`` is neither a ``str`` nor a ``datetime``.
        ValueError: if a string argument cannot be parsed.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")

    # Use exact integer timedelta arithmetic instead of
    # ``int(dt.timestamp() * 1000)``: the float product can land just below
    # the true value (e.g. 1.001 s -> 1000.9999999999999) and lose a whole
    # millisecond when truncated.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (dt - epoch) // datetime.timedelta(milliseconds=1)
def parse_since_arg_to_ms() -> int:
    """Resolve the "since" cursor from the first CLI argument, in epoch ms.

    Accepted forms of ``sys.argv[1]`` (surrounding whitespace is ignored):
      - integer epoch ms of 11-13 digits, or 10-digit epoch seconds
        (seconds are multiplied by 1000)
      - ISO-8601 (Z or offset)
      - YYYY-MM-DD (floors to 00:00Z)
    If not provided, defaults to today@00:00Z.

    Returns:
        The cursor as integer milliseconds since the Unix epoch.

    Raises:
        SystemExit: with status 1, after printing a usage hint, when the
            argument matches none of the accepted forms or is not a valid
            date/timestamp.
    """
    if len(sys.argv) <= 1:
        # default: today@00:00Z
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        return to_epoch_ms(today0)

    arg = sys.argv[1].strip()

    # epoch seconds or ms
    if re.fullmatch(r"\d{10,13}", arg):
        v = int(arg)
        # Only 10-digit values (below 10**10) are epoch seconds; anything
        # longer is already in milliseconds and must be returned untouched.
        # The previous bound of 10**13 exceeded every 13-digit value, so
        # genuine epoch-ms input was multiplied by 1000 a second time.
        if v < 10_000_000_000:  # seconds -> ms
            v *= 1000
        return v

    try:
        # YYYY-MM-DD: kept inside the try so that a well-shaped but invalid
        # date (e.g. 2025-13-45) yields the usage hint, not a traceback.
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
            d = datetime.datetime.strptime(
                arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
            return to_epoch_ms(floor_to_utc_midnight(d))
        # ISO-8601
        return to_epoch_ms(arg)
    except Exception:
        # Deliberately broad: any parse failure ends in the same user-facing
        # message and a non-zero exit status.
        print("Invalid --since argument. Use epoch ms, ISO-8601, or YYYY-MM-DD.")
        sys.exit(1)
# ---------------------------
# Main
# ---------------------------
def main():
    """
    Run every HubSpot pipeline in a fixed order with one shared cursor.

    The cursor comes from ``parse_since_arg_to_ms`` and is passed to each
    pipeline's ``main`` as ``since_ms``. Each pipeline may internally advance
    its own stored cursor. A failure in one pipeline is logged with its
    traceback and does not prevent the remaining pipelines from running.
    """
    since_ms = parse_since_arg_to_ms()
    print(f"=== Running HubSpot sync pipeline since_ms={since_ms} ===")

    # (progress banner, pipeline module, log format used if the step fails)
    steps = (
        ("\n[1/7] Companies", hubspot_companies,
         "Error running companies pipeline: %s"),
        ("\n[2/7] Contacts", hubspot_contacts,
         "Error running contacts pipeline: %s"),
        ("\n[3/7] Deals", hubspot_deals,
         "Error running deals pipeline: %s"),
        ("\n[4/7] Tickets", hubspot_tickets,
         "Error running tickets pipeline: %s"),
        ("\n[5/7] Emails", hubspot_emails,
         "Error running emails pipeline: %s"),
        ("\n[6/7] Billing Services", hubspot_billing,
         "Error running billing services pipeline: %s"),
        ("\n[7/7] Audit Logs", hubspot_audit,
         "Error running audit log pipeline: %s"),
    )

    for banner, pipeline, failure_fmt in steps:
        try:
            print(banner)
            pipeline.main(since_ms=since_ms)
        except Exception as exc:
            # Best effort: record the failure and move on to the next step.
            logging.exception(failure_fmt, exc)

    print("\n=== HubSpot sync complete ===")
# Run the orchestrator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()