"""
HubSpot Audit → Supabase (incremental since a millisecond cursor)

Usage from orchestrator:
    import hubspot_audit
    hubspot_audit.main(since_ms=<int milliseconds since epoch UTC>)

Direct CLI:
    # epoch ms
    python hubspot_audit.py 1754025600000
    # ISO-8601
    python hubspot_audit.py 2025-08-01T09:30:00Z
    # Back-compat date (floors to 00:00Z)
    python hubspot_audit.py 2025-08-01
"""
import datetime
import logging
import os
import re
from typing import Dict, List, Optional, Union

from dotenv import load_dotenv
from supabase import create_client

from hubspot_utils import (
    to_epoch_ms_from_utc_iso,
    page_account_activity,
    build_login_index,
    build_security_index,
    normalize_audit_event,
    enrich_audit_row_by_category,
    deduplicate_by_key,
)
from supabase_utils import batched_insert, update_sync_metadata

# Logging: append to one file per (local) calendar day under ./logs.
# basicConfig(filename=...) opens the file right away, so the directory has
# to exist first; otherwise importing this module fails with FileNotFoundError.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_audit_logs_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# Credentials: load_dotenv() is called before reading the environment so that
# values can also be supplied through a .env file.
load_dotenv()
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

# Fail fast at import time: nothing below can work without these values.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is required")
if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
    raise RuntimeError(
        "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are required")

# Module-wide Supabase client shared by upsert().
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)

# Tunables, each overridable through the environment.
AUDITLOG_TABLE = os.getenv("HUBSPOT_AUDITLOG_TABLE", "hubspot_audits")
HUBSPOT_LIMIT = int(os.getenv("HUBSPOT_AUDITLOG_LIMIT", "100"))
HUBSPOT_MAX_PAGES = int(os.getenv("HUBSPOT_AUDITLOG_MAX_PAGES", "10000"))
MATCH_WINDOW_SECONDS = int(os.getenv("HUBSPOT_MATCH_WINDOW_SECONDS", "300"))

# Back-off bounds; not referenced in this module's visible code
# (presumably consumed by importers -- TODO confirm).
INITIAL_BACKOFF_SECONDS = float(os.getenv("HUBSPOT_BACKOFF_START", "1.0"))
MAX_BACKOFF_SECONDS = float(os.getenv("HUBSPOT_BACKOFF_MAX", "16.0"))

# Optional fallback cursor (epoch ms, as a string) used by main() when no
# since_ms is passed. NOTE(review): the environment variable really is named
# "BOOTSTRAP_SINCE_MS_ENV" -- confirm that is the intended name.
BOOTSTRAP_SINCE_MS_ENV = os.getenv("BOOTSTRAP_SINCE_MS_ENV")

def _today_midnight_ms_utc() -> int:
    """Return today's 00:00:00 UTC converted by ``to_epoch_ms_from_utc_iso``."""
    midnight = datetime.datetime.now(datetime.timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    # isoformat() renders the UTC offset as "+00:00"; the helper is handed the
    # "Z"-suffixed spelling instead.
    return to_epoch_ms_from_utc_iso(
        midnight.isoformat().replace("+00:00", "Z"))

| | def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: |
| | if dt.tzinfo is None: |
| | dt = dt.replace(tzinfo=datetime.timezone.utc) |
| | return dt.astimezone(datetime.timezone.utc) |
| |
|
| |
|
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return 00:00:00 UTC of the UTC calendar day that contains *dt*.

    *dt* is first normalised with ``_ensure_utc`` (naive values are treated
    as UTC), then its time-of-day fields are zeroed.
    """
    as_utc = _ensure_utc(dt)
    return as_utc.replace(hour=0, minute=0, second=0, microsecond=0)

def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601-style string into a UTC-aware datetime.

    Raises:
        ValueError: if ``datetime.fromisoformat`` cannot parse the string.
    """
    # fromisoformat() only understands a trailing "Z" from Python 3.11 on;
    # rewrite it as an explicit offset so older interpreters parse it too.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.datetime.fromisoformat(value)
    return _ensure_utc(parsed)

def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601-style string or a datetime to epoch milliseconds.

    Strings go through ``_parse_iso_like_to_dt``; datetimes go through
    ``_ensure_utc`` (naive values are treated as UTC).

    Raises:
        TypeError: if the argument is neither ``str`` nor ``datetime``.
        ValueError: if a string argument cannot be parsed.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")

    # Exact integer arithmetic: int(dt.timestamp() * 1000) goes through a
    # float and can land just below the true value, truncating to the
    # previous millisecond.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (dt - epoch) // datetime.timedelta(milliseconds=1)

# HubSpot account-info v3 activity endpoints, all on the api.hubapi.com host.
BASE_AUDIT_URL = "https://api.hubapi.com/account-info/v3/activity/audit-logs"
BASE_SECURITY_URL = "https://api.hubapi.com/account-info/v3/activity/security"
BASE_LOGIN_URL = "https://api.hubapi.com/account-info/v3/activity/login"

def fetch_streams(occurred_after_ms: int) -> Dict[str, List[dict]]:
    """
    Fetch the three streams from HubSpot account-info API:
    - Audit logs
    - Login activity
    - Security activity

    We pass occurred_after_ms and omit occurred_before_ms (None).

    Returns a dict with the keys "audit", "login" and "security", each
    holding whatever ``page_account_activity`` returned for that endpoint.
    """
    # Insertion order is the request order: audit, then login, then security.
    endpoints = {
        "audit": BASE_AUDIT_URL,
        "login": BASE_LOGIN_URL,
        "security": BASE_SECURITY_URL,
    }
    streams = {
        name: page_account_activity(
            base_url=url,
            token=HUBSPOT_TOKEN,
            occurred_after_ms=occurred_after_ms,
            occurred_before_ms=None,
            limit=HUBSPOT_LIMIT,
            max_pages=HUBSPOT_MAX_PAGES,
        )
        for name, url in endpoints.items()
    }
    logging.info("Fetched counts: audit=%d login=%d security=%d",
                 len(streams["audit"]), len(streams["login"]),
                 len(streams["security"]))
    return streams

def build_indices(login_events: List[dict], security_events: List[dict]):
    """Build the login and security lookup indices used for enrichment.

    Returns a ``(login_index, security_index)`` tuple produced by
    ``build_login_index`` and ``build_security_index`` respectively.
    """
    return (
        build_login_index(login_events),
        build_security_index(security_events),
    )

def normalize_and_enrich(audit_events: List[dict], login_idx, security_idx) -> List[dict]:
    """Turn raw audit events into enriched rows, one per ``audit_id``.

    Each event is normalised with ``normalize_audit_event``; rows without a
    truthy ``audit_id`` are dropped. The remaining rows are enriched via
    ``enrich_audit_row_by_category`` (using MATCH_WINDOW_SECONDS) and finally
    passed through ``deduplicate_by_key`` on ``"audit_id"``.
    """
    enriched: List[dict] = []
    for event in audit_events:
        row = normalize_audit_event(event)
        if not row.get("audit_id"):
            # No usable conflict key for the upsert -- skip the event.
            continue
        enriched.append(
            enrich_audit_row_by_category(
                row, login_idx, security_idx,
                match_window_seconds=MATCH_WINDOW_SECONDS))
    return deduplicate_by_key(enriched, "audit_id")

def upsert(rows: List[dict]) -> None:
    """Write *rows* to AUDITLOG_TABLE and record the sync time.

    Rows are sent through ``batched_insert`` in batches of 500 with
    ``on_conflict=["audit_id"]``. The sync metadata is updated with the
    current UTC time whether or not there were rows to write.
    """
    if rows:
        batched_insert(
            supabase_client,
            AUDITLOG_TABLE,
            rows,
            batch_size=500,
            on_conflict=["audit_id"],
        )
        logging.info("Upserted %d rows into %s", len(rows), AUDITLOG_TABLE)
    else:
        logging.info("No rows to upsert.")

    # An empty run still counts as a completed sync.
    sync_time = datetime.datetime.now(datetime.timezone.utc)
    update_sync_metadata(
        supabase_client, AUDITLOG_TABLE, sync_time.isoformat())

def main(since_ms: Optional[int] = None):
    """Run one incremental HubSpot audit sync into Supabase.

    The cursor is resolved in this order:
    1. the ``since_ms`` argument (epoch milliseconds),
    2. the BOOTSTRAP_SINCE_MS_ENV environment value, if set,
    3. today's 00:00:00 UTC.

    Raises:
        RuntimeError: if BOOTSTRAP_SINCE_MS_ENV is set but is not an integer.
    """
    logging.info(
        "Starting HubSpot audit sync (occurredAfter_ms=%s, occurredBefore=SKIPPED).", since_ms)

    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError as exc:
            # Name the variable that is actually read, so the operator knows
            # which setting to fix.
            raise RuntimeError(
                "BOOTSTRAP_SINCE_MS_ENV must be an integer (ms) if set.") from exc

    if since_ms is None:
        # No cursor from the caller or the environment: start at today 00:00Z.
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)

    print(f"Fetching HubSpot audit logs occurredAfter > {since_ms} ...")
    streams = fetch_streams(occurred_after_ms=since_ms)
    login_idx, security_idx = build_indices(
        streams["login"], streams["security"])
    rows = normalize_and_enrich(streams["audit"], login_idx, security_idx)

    print("Upserting into Supabase...")
    upsert(rows)
    print(f"Synced {len(rows)} audit rows into '{AUDITLOG_TABLE}'.")
    print("Audit logs sync complete.")

| | def _parse_cli_arg_to_ms(arg: str) -> int: |
| | """ |
| | Accept: |
| | - integer epoch ms |
| | - ISO-8601 (Z or offset) |
| | - YYYY-MM-DD (floors to 00:00Z) |
| | """ |
| | |
| | if re.fullmatch(r"\d{10,13}", arg): |
| | v = int(arg) |
| | if v < 10_000_000_000_000: |
| | v *= 1000 |
| | return v |
| |
|
| | |
| | if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): |
| | d = datetime.datetime.strptime( |
| | arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) |
| | return to_epoch_ms(floor_to_utc_midnight(d)) |
| |
|
| | |
| | return to_epoch_ms(arg) |
| |
|
| |
|
# CLI entry point: an optional first argument supplies the starting cursor.
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        try:
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except Exception as e:
            # Top-level boundary: report the bad argument on stderr and exit
            # non-zero instead of dumping a traceback.
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}",
                file=sys.stderr,
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        main()