""" Unified orchestrator for HubSpot → Supabase pipelines with a timestamp cursor. CLI: # epoch ms python load_hubspot_data.py 1754025600000 # ISO-8601 python load_hubspot_data.py 2025-08-01T09:30:00Z # Back-compat date (floors to 00:00Z) python load_hubspot_data.py 2025-08-01 # No arg: defaults to today@00:00Z """ import sys import re import logging import datetime # Pipelines (each exposes main(since_ms: Optional[int]) and can fall back to env) import hubspot_deals import hubspot_emails import hubspot_tickets import hubspot_contacts import hubspot_companies import hubspot_billing import hubspot_audit import hubspot_invoices logging.basicConfig( filename=f"logs/hubspot_unified_orchestrator_{datetime.datetime.now().strftime('%Y-%m-%d')}.log", filemode="a", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) # --------------------------- # Time parsing helpers # --------------------------- def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: if dt.tzinfo is None: dt = dt.replace(tzinfo=datetime.timezone.utc) return dt.astimezone(datetime.timezone.utc) def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime: dt = _ensure_utc(dt) return dt.replace(hour=0, minute=0, second=0, microsecond=0) def _parse_iso_like_to_dt(value: str) -> datetime.datetime: if isinstance(value, str) and value.endswith("Z"): value = value[:-1] + "+00:00" dt = datetime.datetime.fromisoformat(value) return _ensure_utc(dt) def to_epoch_ms(dt_or_str) -> int: if isinstance(dt_or_str, str): dt = _parse_iso_like_to_dt(dt_or_str) elif isinstance(dt_or_str, datetime.datetime): dt = _ensure_utc(dt_or_str) else: raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}") return int(dt.timestamp() * 1000) def parse_since_arg_to_ms() -> int: """ Accepts: - integer epoch ms (or seconds; seconds auto *1000) - ISO-8601 (Z or offset) - YYYY-MM-DD (floors to 00:00Z) If not provided, defaults to today@00:00Z. """ if len(sys.argv) > 1: arg = sys.argv[1].strip() # epoch seconds or ms if re.fullmatch(r"\d{10,13}", arg): v = int(arg) if v < 10_000_000_000_000: # seconds -> ms v *= 1000 return v # YYYY-MM-DD if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): d = datetime.datetime.strptime( arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) return to_epoch_ms(floor_to_utc_midnight(d)) # ISO-8601 try: return to_epoch_ms(arg) except Exception: print("Invalid --since argument. Use epoch ms, ISO-8601, or YYYY-MM-DD.") sys.exit(1) # default: today@00:00Z today0 = floor_to_utc_midnight( datetime.datetime.now(datetime.timezone.utc)) return to_epoch_ms(today0) # --------------------------- # Main # --------------------------- def main(): """ Runs pipelines in order with a shared timestamp cursor. Each pipeline may internally advance its own stored cursor. """ since_ms = parse_since_arg_to_ms() print(f"=== Running HubSpot sync pipeline since_ms={since_ms} ===") try: print("\n[1/8] Companies") hubspot_companies.main(since_ms=since_ms) except Exception as e: logging.exception("Error running companies pipeline: %s", e) try: print("\n[2/8] Contacts") hubspot_contacts.main(since_ms=since_ms) except Exception as e: logging.exception("Error running contacts pipeline: %s", e) try: print("\n[3/8] Deals") hubspot_deals.main(since_ms=since_ms) except Exception as e: logging.exception("Error running deals pipeline: %s", e) try: print("\n[4/8] Tickets") hubspot_tickets.main(since_ms=since_ms) except Exception as e: logging.exception("Error running tickets pipeline: %s", e) try: print("\n[5/8] Emails") hubspot_emails.main(since_ms=since_ms) except Exception as e: logging.exception("Error running emails pipeline: %s", e) try: print("\n[6/8] Billing Services") hubspot_billing.main(since_ms=since_ms) except Exception as e: logging.exception("Error running billing services pipeline: %s", e) try: print("\n[7/8] Audit Logs") hubspot_audit.main(since_ms=since_ms) except Exception as e: logging.exception("Error running audit log pipeline: %s", e) try: print("\n[8/8] Invoices") hubspot_invoices.main(since_ms=since_ms) except Exception as e: logging.exception("Error running invoices pipeline: %s", e) print("\n=== HubSpot sync complete ===") if __name__ == "__main__": main()