"""
HubSpot Audit → Supabase (incremental since a millisecond cursor)

Usage from orchestrator:
    import hubspot_audit
    hubspot_audit.main(since_ms=<epoch_ms>)

Direct CLI:
    # epoch ms
    python hubspot_audit.py 1754025600000
    # ISO-8601
    python hubspot_audit.py 2025-08-01T09:30:00Z
    # Back-compat date (floors to 00:00Z)
    python hubspot_audit.py 2025-08-01
"""
import os
import logging
import datetime
from typing import Dict, List, Optional, Union
import re

from dotenv import load_dotenv
from supabase import create_client

from supabase_utils import batched_insert, update_sync_metadata
from hubspot_utils import (
    to_epoch_ms_from_utc_iso,
    page_account_activity,
    build_login_index,
    build_security_index,
    normalize_audit_event,
    enrich_audit_row_by_category,
    deduplicate_by_key,
)

# Logging
# logging.basicConfig does not create parent directories; without this the
# module fails at import time with FileNotFoundError when ./logs is absent.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_audit_logs_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# Environment
load_dotenv()
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is required")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError(
        "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are required")

supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)

# Config
AUDITLOG_TABLE = os.getenv("HUBSPOT_AUDITLOG_TABLE", "hubspot_audits")
HUBSPOT_LIMIT = int(os.getenv("HUBSPOT_AUDITLOG_LIMIT", "100"))
HUBSPOT_MAX_PAGES = int(os.getenv("HUBSPOT_AUDITLOG_MAX_PAGES", "10000"))
MATCH_WINDOW_SECONDS = int(os.getenv("HUBSPOT_MATCH_WINDOW_SECONDS", "300"))
INITIAL_BACKOFF_SECONDS = float(os.getenv("HUBSPOT_BACKOFF_START", "1.0"))
MAX_BACKOFF_SECONDS = float(os.getenv("HUBSPOT_BACKOFF_MAX", "16.0"))
# Optional bootstrap cursor used when main() gets no since_ms.
# Accepts an integer (taken as epoch ms), ISO-8601, or YYYY-MM-DD.
BOOTSTRAP_SINCE_MS_ENV = os.getenv("BOOTSTRAP_SINCE_MS_ENV")

# Integer-exact epoch arithmetic (avoids float rounding of dt.timestamp()).
_EPOCH_UTC = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
_ONE_MS = datetime.timedelta(milliseconds=1)

# Integers below this (i.e. 10 digits) are epoch *seconds*; 10**10 s is
# 2286-11-20, so any 11-13 digit value is treated as epoch milliseconds.
_SECONDS_EPOCH_LIMIT = 10_000_000_000


def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* as an aware UTC datetime; naive values are taken as UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)


def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return 00:00:00.000 UTC of the UTC calendar day containing *dt*."""
    dt = _ensure_utc(dt)
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string into an aware UTC datetime.

    A trailing ``Z``/``z`` is rewritten to ``+00:00`` so that
    ``datetime.fromisoformat`` accepts it on older Python versions.

    Raises:
        ValueError: if *value* is not a format ``fromisoformat`` understands.
    """
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    return _ensure_utc(datetime.datetime.fromisoformat(value))


def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert a datetime or ISO-8601 string to integer epoch milliseconds.

    Naive datetimes (and offset-less strings) are interpreted as UTC.

    Raises:
        TypeError: for unsupported input types.
        ValueError: for unparseable strings.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    # timedelta // timedelta is exact integer arithmetic; the previous
    # int(dt.timestamp() * 1000) could lose a millisecond to float rounding.
    return (dt - _EPOCH_UTC) // _ONE_MS


def _today_midnight_ms_utc() -> int:
    """Return today's 00:00:00Z (UTC calendar day) as epoch milliseconds."""
    return to_epoch_ms(
        floor_to_utc_midnight(datetime.datetime.now(datetime.timezone.utc)))


BASE_AUDIT_URL = "https://api.hubapi.com/account-info/v3/activity/audit-logs"
BASE_SECURITY_URL = "https://api.hubapi.com/account-info/v3/activity/security"
BASE_LOGIN_URL = "https://api.hubapi.com/account-info/v3/activity/login"


def fetch_streams(occurred_after_ms: int) -> Dict[str, List[dict]]:
    """
    Fetch the three streams from HubSpot account-info API:
      - Audit logs
      - Login activity
      - Security activity

    We pass occurred_after_ms and omit occurred_before_ms (None).

    Returns:
        Dict with keys "audit", "login", "security", each holding the list
        returned by hubspot_utils.page_account_activity for that endpoint.
    """
    endpoints = (
        ("audit", BASE_AUDIT_URL),
        ("login", BASE_LOGIN_URL),
        ("security", BASE_SECURITY_URL),
    )
    streams: Dict[str, List[dict]] = {
        name: page_account_activity(
            base_url=url,
            token=HUBSPOT_TOKEN,
            occurred_after_ms=occurred_after_ms,
            occurred_before_ms=None,
            limit=HUBSPOT_LIMIT,
            max_pages=HUBSPOT_MAX_PAGES,
        )
        for name, url in endpoints
    }
    logging.info("Fetched counts: audit=%d login=%d security=%d",
                 len(streams["audit"]), len(streams["login"]),
                 len(streams["security"]))
    return streams


def build_indices(login_events: List[dict], security_events: List[dict]):
    """Build the (login_index, security_index) pair used for enrichment."""
    login_idx = build_login_index(login_events)
    security_idx = build_security_index(security_events)
    return login_idx, security_idx


def normalize_and_enrich(audit_events: List[dict], login_idx, security_idx) -> List[dict]:
    """Normalize raw audit events, enrich them, and dedupe by ``audit_id``.

    Events whose normalized row has no truthy ``audit_id`` are dropped, since
    that key is the upsert conflict target.
    """
    rows: List[dict] = []
    for ev in audit_events:
        row = normalize_audit_event(ev)
        if not row.get("audit_id"):
            continue
        rows.append(enrich_audit_row_by_category(
            row, login_idx, security_idx,
            match_window_seconds=MATCH_WINDOW_SECONDS))
    return deduplicate_by_key(rows, "audit_id")


def upsert(rows: List[dict],
           sync_time: Optional[datetime.datetime] = None) -> None:
    """Write *rows* to AUDITLOG_TABLE, then record the sync time.

    Args:
        rows: Normalized audit rows; written via supabase_utils.batched_insert
            with ``on_conflict=["audit_id"]`` (presumably an upsert — see
            supabase_utils).
        sync_time: Timestamp to store in sync metadata. Defaults to "now"
            (the previous behaviour). Naive values are treated as UTC.

    Sync metadata is updated even when there are no rows, but not when the
    insert raises.
    """
    if rows:
        batched_insert(
            supabase_client,
            AUDITLOG_TABLE,
            rows,
            batch_size=500,
            on_conflict=["audit_id"],
        )
        logging.info("Upserted %d rows into %s", len(rows), AUDITLOG_TABLE)
    else:
        logging.info("No rows to upsert.")

    if sync_time is None:
        sync_time = datetime.datetime.now(datetime.timezone.utc)
    update_sync_metadata(
        supabase_client, AUDITLOG_TABLE, _ensure_utc(sync_time).isoformat())


def _parse_bootstrap_since(raw: str) -> int:
    """Parse BOOTSTRAP_SINCE_MS_ENV into epoch ms.

    Any plain integer is taken as epoch ms exactly as before; otherwise the
    value may be ISO-8601 or YYYY-MM-DD (as the variable's comment promises).

    Raises:
        RuntimeError: if the value cannot be parsed.
    """
    raw = raw.strip()
    try:
        return int(raw)
    except ValueError:
        pass
    try:
        return _parse_cli_arg_to_ms(raw)
    except ValueError as err:
        raise RuntimeError(
            "BOOTSTRAP_SINCE_MS_ENV must be epoch ms, ISO-8601, or YYYY-MM-DD if set."
        ) from err


def _resolve_since_ms(since_ms: Optional[int]) -> int:
    """Pick the cursor: explicit arg, else bootstrap env var, else today 00:00Z."""
    if since_ms is not None:
        return since_ms
    if BOOTSTRAP_SINCE_MS_ENV:
        return _parse_bootstrap_since(BOOTSTRAP_SINCE_MS_ENV)
    # Default: today@00:00:00Z for first run
    return _today_midnight_ms_utc()


def main(since_ms: Optional[int] = None):
    """Run one incremental sync of audit events occurring after *since_ms*."""
    since_ms = _resolve_since_ms(since_ms)
    logging.info(
        "Starting HubSpot audit sync (occurredAfter_ms=%s, occurredBefore=SKIPPED).",
        since_ms)

    # Captured *before* fetching so that events arriving while this run is in
    # progress are not skipped if the next cursor is derived from the sync
    # metadata (presumably how the orchestrator works — TODO confirm). Any
    # overlap is harmless because rows conflict on audit_id.
    sync_started = datetime.datetime.now(datetime.timezone.utc)

    print(f"Fetching HubSpot audit logs occurredAfter > {since_ms} ...")
    streams = fetch_streams(occurred_after_ms=since_ms)

    login_idx, security_idx = build_indices(
        streams["login"], streams["security"])
    rows = normalize_and_enrich(streams["audit"], login_idx, security_idx)

    print("Upserting into Supabase...")
    upsert(rows, sync_time=sync_started)

    print(f"Synced {len(rows)} audit rows into '{AUDITLOG_TABLE}'.")
    print("Audit logs sync complete.")


def _parse_cli_arg_to_ms(arg: str) -> int:
    """
    Accept:
      - 13-digit epoch ms (11-12 digits are also taken as ms)
      - 10-digit epoch seconds (converted to ms)
      - ISO-8601 (Z or offset; no offset means UTC)
      - YYYY-MM-DD (floors to 00:00Z)

    Raises:
        ValueError: if *arg* matches none of the above.
    """
    # epoch ms or seconds
    if re.fullmatch(r"\d{10,13}", arg):
        value = int(arg)
        # The former threshold (10**13) exceeded every 13-digit value, so real
        # epoch-ms input was wrongly multiplied by 1000.
        return value * 1000 if value < _SECONDS_EPOCH_LIMIT else value
    # YYYY-MM-DD
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
        d = datetime.datetime.strptime(
            arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
        return to_epoch_ms(floor_to_utc_midnight(d))
    # ISO-8601
    return to_epoch_ms(arg)


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        try:
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except ValueError as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        main()