n8n-duplicate / python /hubspot_audit.py
niwayandm
Refactor timestamp handle in HubSpot scripts to improve error messaging on n8n
6c5bf90
"""
HubSpot Audit → Supabase (incremental since a millisecond cursor)
Usage from orchestrator:
import hubspot_audit
hubspot_audit.main(since_ms=<int milliseconds since epoch UTC>)
Direct CLI:
# epoch ms
python hubspot_audit.py 1754025600000
# ISO-8601
python hubspot_audit.py 2025-08-01T09:30:00Z
# Back-compat date (floors to 00:00Z)
python hubspot_audit.py 2025-08-01
"""
import os
import logging
import datetime
from typing import Dict, List, Optional, Union
import re
from dotenv import load_dotenv
from supabase import create_client
from supabase_utils import batched_insert, update_sync_metadata
from hubspot_utils import (
to_epoch_ms_from_utc_iso,
page_account_activity,
build_login_index,
build_security_index,
normalize_audit_event,
enrich_audit_row_by_category,
deduplicate_by_key,
)
# Logging
# Records go to a per-day file under ./logs (relative to the current working
# directory). The file handler opens its target immediately and does not
# create missing parent directories, so make sure the directory exists first
# instead of failing at import time with FileNotFoundError.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_audit_logs_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",  # append, so several runs on the same day share one file
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Environment
# load_dotenv() runs before the lookups below (presumably so that values from
# a local .env file are visible in the process environment).
load_dotenv()

HUBSPOT_TOKEN = os.environ.get("HUBSPOT_TOKEN")
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY")

# Fail at import time when a required credential is missing or empty.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is required")
if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
    raise RuntimeError(
        "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are required")

# Module-wide Supabase client; upsert() passes it to the supabase_utils helpers.
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# Config
# Every setting can be overridden through the environment; the literal given
# here is the fallback used when the variable is unset.

# Destination table for the audit rows.
AUDITLOG_TABLE = os.environ.get("HUBSPOT_AUDITLOG_TABLE", "hubspot_audits")

# Paging controls forwarded to page_account_activity() by fetch_streams().
HUBSPOT_LIMIT = int(os.environ.get("HUBSPOT_AUDITLOG_LIMIT", "100"))
HUBSPOT_MAX_PAGES = int(os.environ.get("HUBSPOT_AUDITLOG_MAX_PAGES", "10000"))

# Forwarded to enrich_audit_row_by_category() as match_window_seconds.
MATCH_WINDOW_SECONDS = int(os.environ.get("HUBSPOT_MATCH_WINDOW_SECONDS", "300"))

# NOTE(review): presumably retry backoff bounds in seconds; nothing in this
# file's visible code references them -- confirm where they are consumed.
INITIAL_BACKOFF_SECONDS = float(os.environ.get("HUBSPOT_BACKOFF_START", "1.0"))
MAX_BACKOFF_SECONDS = float(os.environ.get("HUBSPOT_BACKOFF_MAX", "16.0"))

# Optional bootstrap cursor, kept as the raw string (None when unset); main()
# parses it when it is called without an explicit since_ms.
BOOTSTRAP_SINCE_MS_ENV = os.environ.get("BOOTSTRAP_SINCE_MS_ENV")
def _today_midnight_ms_utc() -> int:
    """Convert today's 00:00:00 UTC with ``to_epoch_ms_from_utc_iso``.

    Returns:
        Whatever ``hubspot_utils.to_epoch_ms_from_utc_iso`` yields for the
        ``YYYY-MM-DDT00:00:00Z`` string of the current UTC date (annotated as
        ``int``; presumably epoch milliseconds, judging by the helper's name).
    """
    utc = datetime.timezone.utc
    today = datetime.datetime.now(utc).date()
    midnight = datetime.datetime.combine(today, datetime.time.min, tzinfo=utc)
    # The helper is fed an ISO string; swap the "+00:00" suffix for "Z".
    iso_z = midnight.isoformat().replace("+00:00", "Z")
    return to_epoch_ms_from_utc_iso(iso_z)
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
return dt.astimezone(datetime.timezone.utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return 00:00:00 UTC of the UTC calendar day that contains *dt*.

    Args:
        dt: Naive or aware datetime; it is first normalised by ``_ensure_utc``.

    Returns:
        The normalised datetime with hour, minute, second and microsecond
        all reset to zero.
    """
    zeroed_time = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    as_utc = _ensure_utc(dt)
    return as_utc.replace(**zeroed_time)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601-like string into a UTC datetime.

    Args:
        value: Text accepted by ``datetime.fromisoformat``, optionally ending
            in ``Z`` to denote UTC.

    Returns:
        The parsed datetime after normalisation by ``_ensure_utc``.

    Raises:
        ValueError: If ``datetime.fromisoformat`` rejects the text.
    """
    # Spell a trailing "Z" as an explicit offset before handing the text to
    # fromisoformat (Python versions before 3.11 do not accept the "Z" form).
    normalized = f"{value[:-1]}+00:00" if value.endswith("Z") else value
    parsed = datetime.datetime.fromisoformat(normalized)
    return _ensure_utc(parsed)
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert a datetime or ISO-8601-like string to epoch milliseconds (UTC).

    Args:
        dt_or_str: Either a string handled by ``_parse_iso_like_to_dt`` or a
            ``datetime`` (naive values are treated as UTC by ``_ensure_utc``).

    Returns:
        Whole milliseconds elapsed since 1970-01-01T00:00:00Z; any
        sub-millisecond remainder is floored.

    Raises:
        TypeError: If the argument is neither ``str`` nor ``datetime``.
        ValueError: If a string argument cannot be parsed.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    # Use exact timedelta arithmetic rather than int(dt.timestamp() * 1000):
    # the float round-trip can land just below the true value and truncate one
    # millisecond low (e.g. 1.001 s * 1000 == 1000.9999999999999 -> 1000).
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (dt - epoch) // datetime.timedelta(milliseconds=1)
# HubSpot account-info v3 activity endpoints, all on the api.hubapi.com host.
# The security URL is written out literally (it used to be an api.hubspot.com
# string patched with .replace() at import time) so the value that is actually
# requested can be read and searched for directly.
BASE_AUDIT_URL = "https://api.hubapi.com/account-info/v3/activity/audit-logs"
BASE_SECURITY_URL = "https://api.hubapi.com/account-info/v3/activity/security"
BASE_LOGIN_URL = "https://api.hubapi.com/account-info/v3/activity/login"
def fetch_streams(occurred_after_ms: int) -> Dict[str, List[dict]]:
    """Page through the three HubSpot account-info activity endpoints.

    The audit, login and security endpoints are queried in that order through
    ``page_account_activity``, each with the same token, page size and page
    cap. Only the lower bound is supplied; ``occurred_before_ms`` is always
    ``None``.

    Args:
        occurred_after_ms: Lower-bound cursor forwarded to every request.

    Returns:
        A dict with the keys ``"audit"``, ``"login"`` and ``"security"``, each
        holding what ``page_account_activity`` returned for that endpoint.
    """
    endpoints = (
        ("audit", BASE_AUDIT_URL),
        ("login", BASE_LOGIN_URL),
        ("security", BASE_SECURITY_URL),
    )
    streams: Dict[str, List[dict]] = {}
    for label, url in endpoints:
        streams[label] = page_account_activity(
            base_url=url,
            token=HUBSPOT_TOKEN,
            occurred_after_ms=occurred_after_ms,
            occurred_before_ms=None,
            limit=HUBSPOT_LIMIT,
            max_pages=HUBSPOT_MAX_PAGES,
        )
    logging.info(
        "Fetched counts: audit=%d login=%d security=%d",
        len(streams["audit"]),
        len(streams["login"]),
        len(streams["security"]),
    )
    return streams
def build_indices(login_events: List[dict], security_events: List[dict]):
    """Build the lookup indices used when enriching audit rows.

    Args:
        login_events: Events passed to ``build_login_index``.
        security_events: Events passed to ``build_security_index``.

    Returns:
        A ``(login_index, security_index)`` tuple holding the two helpers'
        results, in that order.
    """
    return build_login_index(login_events), build_security_index(security_events)
def normalize_and_enrich(audit_events: List[dict], login_idx, security_idx) -> List[dict]:
    """Turn raw audit events into enriched rows, de-duplicated on ``audit_id``.

    Args:
        audit_events: Raw events, each passed to ``normalize_audit_event``.
        login_idx: Login index forwarded to ``enrich_audit_row_by_category``.
        security_idx: Security index forwarded to the same helper.

    Returns:
        The enriched rows after ``deduplicate_by_key(..., "audit_id")``.
        Events whose normalised row has no truthy ``audit_id`` are dropped.
    """
    enriched: List[dict] = []
    for event in audit_events:
        normalized = normalize_audit_event(event)
        # Rows without an id cannot be keyed for the upsert, so keep only
        # those that carry one.
        if normalized.get("audit_id"):
            enriched.append(
                enrich_audit_row_by_category(
                    normalized,
                    login_idx,
                    security_idx,
                    match_window_seconds=MATCH_WINDOW_SECONDS,
                )
            )
    return deduplicate_by_key(enriched, "audit_id")
def upsert(rows: List[dict]) -> None:
    """Write *rows* to the audit table and record the sync time.

    Non-empty input goes through ``batched_insert`` in batches of 500 with
    ``audit_id`` as the conflict key. With or without rows, the sync metadata
    for the table is then updated with the current UTC time in ISO format.

    Args:
        rows: Rows to write; may be empty.
    """
    if rows:
        batched_insert(
            supabase_client,
            AUDITLOG_TABLE,
            rows,
            batch_size=500,
            on_conflict=["audit_id"],
        )
        logging.info("Upserted %d rows into %s", len(rows), AUDITLOG_TABLE)
    else:
        logging.info("No rows to upsert.")

    # The sync marker is refreshed even when there was nothing to write.
    stamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    update_sync_metadata(supabase_client, AUDITLOG_TABLE, stamp)
def main(since_ms: Optional[int] = None):
    """Run one incremental HubSpot audit sync into Supabase.

    The cursor is resolved in this order:
      1. the ``since_ms`` argument, when given;
      2. the ``BOOTSTRAP_SINCE_MS_ENV`` environment value, when set, read as
         integer epoch milliseconds or else as an ISO-8601 timestamp;
      3. today at 00:00:00 UTC.

    The audit, login and security streams are then fetched, the audit events
    are normalised and enriched, and the rows are upserted.

    Args:
        since_ms: Epoch milliseconds (UTC) used as the occurredAfter cursor,
            or ``None`` to fall back as described above.

    Raises:
        RuntimeError: If ``BOOTSTRAP_SINCE_MS_ENV`` is set but is neither an
            integer nor a parseable ISO-8601 timestamp.
    """
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        raw_cursor = BOOTSTRAP_SINCE_MS_ENV.strip()
        try:
            since_ms = int(raw_cursor)
        except ValueError:
            # Not an integer: try the ISO-8601 form before giving up.
            try:
                since_ms = to_epoch_ms(raw_cursor)
            except ValueError as exc:
                # Name the variable that is actually read so the operator
                # knows which setting to fix.
                raise RuntimeError(
                    "BOOTSTRAP_SINCE_MS_ENV must be epoch milliseconds or an "
                    "ISO-8601 timestamp if set."
                ) from exc
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)

    # Logged after the cursor is resolved so the record shows the value that
    # is really used rather than None.
    logging.info(
        "Starting HubSpot audit sync (occurredAfter_ms=%s, occurredBefore=SKIPPED).", since_ms)
    print(f"Fetching HubSpot audit logs occurredAfter > {since_ms} ...")
    streams = fetch_streams(occurred_after_ms=since_ms)
    login_idx, security_idx = build_indices(
        streams["login"], streams["security"])
    rows = normalize_and_enrich(streams["audit"], login_idx, security_idx)
    print("Upserting into Supabase...")
    upsert(rows)
    print(f"Synced {len(rows)} audit rows into '{AUDITLOG_TABLE}'.")
    print("Audit logs sync complete.")
def _parse_cli_arg_to_ms(arg: str) -> int:
"""
Accept:
- integer epoch ms
- ISO-8601 (Z or offset)
- YYYY-MM-DD (floors to 00:00Z)
"""
# epoch ms or seconds
if re.fullmatch(r"\d{10,13}", arg):
v = int(arg)
if v < 10_000_000_000_000: # seconds -> ms
v *= 1000
return v
# YYYY-MM-DD
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
d = datetime.datetime.strptime(
arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
return to_epoch_ms(floor_to_utc_midnight(d))
# ISO-8601
return to_epoch_ms(arg)
if __name__ == "__main__":
    import sys

    # An optional first argument supplies the cursor; with no argument main()
    # chooses its own default.
    cli_args = sys.argv[1:]
    if not cli_args:
        main()
    else:
        try:
            since = _parse_cli_arg_to_ms(cli_args[0])
        except Exception as e:
            # Report the problem and exit non-zero without a traceback.
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)