niwayandm
Enhance Supabase utility functions to include affected row IDs. Edited texts in load data.
03810ee | """ | |
| Unified orchestrator for HubSpot → Supabase pipelines with a timestamp cursor. | |
| CLI: | |
| # epoch ms | |
| python hubspot_orchestrator.py 1754025600000 | |
| # ISO-8601 | |
| python hubspot_orchestrator.py 2025-08-01T09:30:00Z | |
| # Back-compat date (floors to 00:00Z) | |
| python hubspot_orchestrator.py 2025-08-01 | |
| # No arg: defaults to today@00:00Z | |
| """ | |
import datetime
import logging
import os
import re
import sys

# Pipelines (each exposes main(since_ms: Optional[int]) and can fall back to env)
import hubspot_deals
import hubspot_emails
import hubspot_tickets
import hubspot_contacts
import hubspot_companies
import hubspot_billing
import hubspot_audit
# File logging for the orchestrator: one file per day under ./logs (relative
# to the current working directory). The date in the file name comes from
# the naive datetime.now(), i.e. the host's local date, not UTC.
# logging.FileHandler opens the file immediately, so a missing "logs"
# directory would raise FileNotFoundError at import time and abort every
# pipeline; create it up front.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_unified_orchestrator_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
| # --------------------------- | |
| # Time parsing helpers | |
| # --------------------------- | |
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* as a timezone-aware datetime expressed in UTC.

    A naive value is assumed to already be UTC (it is tagged, not shifted);
    an aware value is converted to UTC.
    """
    utc = datetime.timezone.utc
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=utc)
    return aware.astimezone(utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Truncate *dt* to 00:00:00.000000 UTC of its UTC calendar day.

    The value is normalised with _ensure_utc first, so naive inputs are
    treated as UTC and aware inputs are converted before truncation.
    """
    return _ensure_utc(dt).replace(hour=0, minute=0, second=0, microsecond=0)
| def _parse_iso_like_to_dt(value: str) -> datetime.datetime: | |
| if isinstance(value, str) and value.endswith("Z"): | |
| value = value[:-1] + "+00:00" | |
| dt = datetime.datetime.fromisoformat(value) | |
| return _ensure_utc(dt) | |
def to_epoch_ms(dt_or_str) -> int:
    """Convert a datetime or an ISO-8601 string to integer epoch milliseconds.

    Naive datetimes (and strings without an offset) are treated as UTC.
    The fractional part below one millisecond is dropped by int().

    Raises:
        TypeError: for any input that is neither a str nor a datetime.
        ValueError: for a string that is not valid ISO-8601.
    """
    if isinstance(dt_or_str, datetime.datetime):
        moment = _ensure_utc(dt_or_str)
    elif isinstance(dt_or_str, str):
        moment = _parse_iso_like_to_dt(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(moment.timestamp() * 1000)
def parse_since_arg_to_ms(argv=None) -> int:
    """
    Resolve the sync cursor, in epoch milliseconds, from the first CLI argument.

    Accepts:
      - integer epoch seconds (10 digits; multiplied by 1000)
      - integer epoch milliseconds (11-13 digits; returned as-is)
      - ISO-8601 (Z or offset; text without an offset is treated as UTC)
      - YYYY-MM-DD (floors to 00:00Z)
    If not provided, defaults to today@00:00Z.

    Args:
        argv: Argument vector to read; argv[0] (the program name) is ignored.
            Defaults to ``sys.argv``.

    Returns:
        The cursor as integer milliseconds since the Unix epoch.

    Exits the process with status 1 (message on stderr) if the argument
    cannot be parsed.
    """
    args = sys.argv if argv is None else argv

    if len(args) <= 1:
        # default: today@00:00Z
        today0 = floor_to_utc_midnight(datetime.datetime.now(datetime.timezone.utc))
        return to_epoch_ms(today0)

    arg = args[1].strip()

    # epoch seconds or ms. Only values below 1e10 (at most 10 digits, i.e.
    # seconds up to the year 2286) are seconds; 11-13 digit values are
    # already milliseconds and must not be scaled again.
    if re.fullmatch(r"\d{10,13}", arg):
        value = int(arg)
        if value < 10_000_000_000:  # seconds -> ms
            value *= 1000
        return value

    try:
        # YYYY-MM-DD
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
            day = datetime.datetime.strptime(arg, "%Y-%m-%d").replace(
                tzinfo=datetime.timezone.utc
            )
            return to_epoch_ms(floor_to_utc_midnight(day))
        # ISO-8601
        return to_epoch_ms(arg)
    except ValueError:
        # Covers malformed ISO text and impossible dates such as 2025-02-30,
        # which strptime rejects with ValueError.
        print(
            f"Invalid since argument {arg!r}. "
            "Use epoch ms/seconds, ISO-8601, or YYYY-MM-DD.",
            file=sys.stderr,
        )
        sys.exit(1)
| # --------------------------- | |
| # Main | |
| # --------------------------- | |
def main():
    """
    Run the HubSpot pipelines in order with a shared timestamp cursor.

    Each pipeline may internally advance its own stored cursor. A pipeline
    that raises is logged with its traceback and reported on stderr; the
    remaining pipelines still run.

    Returns:
        Console labels of the pipelines that raised, in run order
        (an empty list when every pipeline succeeded).
    """
    since_ms = parse_since_arg_to_ms()
    print(f"=== Running HubSpot sync pipeline since_ms={since_ms} ===")

    # (console label, name used in the error log, pipeline module)
    pipelines = (
        ("Companies", "companies", hubspot_companies),
        ("Contacts", "contacts", hubspot_contacts),
        ("Deals", "deals", hubspot_deals),
        ("Tickets", "tickets", hubspot_tickets),
        ("Emails", "emails", hubspot_emails),
        ("Billing Services", "billing services", hubspot_billing),
        ("Audit Logs", "audit log", hubspot_audit),
    )
    total = len(pipelines)
    failed = []

    for index, (label, log_name, pipeline) in enumerate(pipelines, start=1):
        print(f"\n[{index}/{total}] {label}")
        try:
            pipeline.main(since_ms=since_ms)
        except Exception as e:
            # Top-level boundary: record the failure and keep going so one
            # broken pipeline does not block the others.
            logging.exception("Error running %s pipeline: %s", log_name, e)
            print(f"    !! {label} failed: {e}", file=sys.stderr)
            failed.append(label)

    if failed:
        print(f"\n=== HubSpot sync complete with failures: {', '.join(failed)} ===")
    else:
        print("\n=== HubSpot sync complete ===")
    return failed


if __name__ == "__main__":
    # Non-zero exit status lets schedulers notice partial failures.
    sys.exit(1 if main() else 0)