Spaces:
Paused
Paused
| """ | |
| HubSpot Audit → Supabase (incremental since a millisecond cursor) | |
| Usage from orchestrator: | |
| import hubspot_audit | |
| hubspot_audit.main(since_ms=<int milliseconds since epoch UTC>) | |
| Direct CLI: | |
| # epoch ms | |
| python hubspot_audit.py 1754025600000 | |
| # ISO-8601 | |
| python hubspot_audit.py 2025-08-01T09:30:00Z | |
| # Back-compat date (floors to 00:00Z) | |
| python hubspot_audit.py 2025-08-01 | |
| """ | |
| import os | |
| import logging | |
| import datetime | |
| from typing import Dict, List, Optional, Union | |
| import re | |
| from dotenv import load_dotenv | |
| from supabase import create_client | |
| from supabase_utils import batched_insert, update_sync_metadata | |
| from hubspot_utils import ( | |
| to_epoch_ms_from_utc_iso, | |
| page_account_activity, | |
| build_login_index, | |
| build_security_index, | |
| normalize_audit_event, | |
| enrich_audit_row_by_category, | |
| deduplicate_by_key, | |
| ) | |
# Logging
# One log file per calendar day (local clock), appended to across runs.
# logging.basicConfig opens its FileHandler immediately, so the target
# directory has to exist first; otherwise importing this module fails with
# FileNotFoundError on a fresh checkout.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_audit_logs_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Environment
# Credentials are read once at import time (after loading a local .env file);
# the module refuses to import when any of them is missing or empty.
load_dotenv()

HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is required")

if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
    raise RuntimeError("SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY are required")

# Shared client used by every Supabase call in this module.
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# Config
# Every value below can be overridden through the environment; the second
# argument to os.getenv is the default used when the variable is unset.

# Destination table for the upsert.
AUDITLOG_TABLE = os.getenv("HUBSPOT_AUDITLOG_TABLE", "hubspot_audits")

# Paging controls handed to page_account_activity.
HUBSPOT_LIMIT = int(os.getenv("HUBSPOT_AUDITLOG_LIMIT", "100"))
HUBSPOT_MAX_PAGES = int(os.getenv("HUBSPOT_AUDITLOG_MAX_PAGES", "10000"))

# Passed as match_window_seconds when enriching audit rows.
MATCH_WINDOW_SECONDS = int(os.getenv("HUBSPOT_MATCH_WINDOW_SECONDS", "300"))

# Backoff bounds, in seconds.
# NOTE(review): not referenced by the code visible in this file — confirm
# whether another module imports them.
INITIAL_BACKOFF_SECONDS = float(os.getenv("HUBSPOT_BACKOFF_START", "1.0"))
MAX_BACKOFF_SECONDS = float(os.getenv("HUBSPOT_BACKOFF_MAX", "16.0"))

# Optional bootstrap cursor, kept as the raw string (None when unset);
# main() parses it when no explicit since_ms is supplied.
BOOTSTRAP_SINCE_MS_ENV = os.getenv("BOOTSTRAP_SINCE_MS_ENV")
def _today_midnight_ms_utc() -> int:
    """Return today's 00:00:00 UTC converted by to_epoch_ms_from_utc_iso."""
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    midnight_iso = utc_now.replace(
        hour=0, minute=0, second=0, microsecond=0
    ).isoformat()
    # The hubspot_utils helper is fed a "Z"-suffixed ISO string rather than
    # the "+00:00" offset that isoformat() emits.
    return to_epoch_ms_from_utc_iso(midnight_iso.replace("+00:00", "Z"))
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* as a timezone-aware datetime expressed in UTC.

    A naive datetime is taken to already be in UTC and is only tagged with
    the UTC tzinfo; an aware datetime is converted to UTC.
    """
    utc = datetime.timezone.utc
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=utc)
    return aware.astimezone(utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return 00:00:00 UTC of the UTC calendar day that contains *dt*."""
    midnight_fields = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    # Normalise to UTC first so the day boundary is the UTC one, not the
    # boundary of whatever offset the caller's datetime carried.
    return _ensure_utc(dt).replace(**midnight_fields)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601-style string into a UTC-aware datetime.

    A trailing "Z" is rewritten to "+00:00" before the string is handed to
    datetime.fromisoformat; the result is then normalised by _ensure_utc.
    """
    normalized = f"{value[:-1]}+00:00" if value.endswith("Z") else value
    return _ensure_utc(datetime.datetime.fromisoformat(normalized))
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-like string or a datetime to integer epoch milliseconds.

    Strings go through _parse_iso_like_to_dt, datetimes through _ensure_utc.

    Raises:
        TypeError: if the argument is neither a str nor a datetime.
    """
    if not isinstance(dt_or_str, (str, datetime.datetime)):
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")

    if isinstance(dt_or_str, str):
        utc_dt = _parse_iso_like_to_dt(dt_or_str)
    else:
        utc_dt = _ensure_utc(dt_or_str)

    seconds = utc_dt.timestamp()
    # int() drops any sub-millisecond remainder.
    return int(seconds * 1000)
# Endpoint URLs for the three streams fetched by this module.
# All three live on the api.hubapi.com host.
BASE_AUDIT_URL = "https://api.hubapi.com/account-info/v3/activity/audit-logs"
BASE_SECURITY_URL = "https://api.hubapi.com/account-info/v3/activity/security"
BASE_LOGIN_URL = "https://api.hubapi.com/account-info/v3/activity/login"
def fetch_streams(occurred_after_ms: int) -> Dict[str, List[dict]]:
    """Fetch the audit, login and security streams after a cursor.

    Each stream is paged with page_account_activity using the module-level
    token, page size and page cap. occurred_before_ms is always None, so no
    upper time bound is passed.

    Args:
        occurred_after_ms: lower-bound cursor, forwarded unchanged.

    Returns:
        A dict with keys "audit", "login" and "security", each holding the
        events returned for that stream.
    """
    # Order matters only in that it mirrors the request order: audit first,
    # then login, then security.
    endpoints = (
        ("audit", BASE_AUDIT_URL),
        ("login", BASE_LOGIN_URL),
        ("security", BASE_SECURITY_URL),
    )
    streams = {
        stream_name: page_account_activity(
            base_url=url,
            token=HUBSPOT_TOKEN,
            occurred_after_ms=occurred_after_ms,
            occurred_before_ms=None,
            limit=HUBSPOT_LIMIT,
            max_pages=HUBSPOT_MAX_PAGES,
        )
        for stream_name, url in endpoints
    }
    logging.info(
        "Fetched counts: audit=%d login=%d security=%d",
        len(streams["audit"]),
        len(streams["login"]),
        len(streams["security"]),
    )
    return streams
def build_indices(login_events: List[dict], security_events: List[dict]):
    """Build the login and security lookup indices.

    Returns:
        A (login_index, security_index) tuple as produced by
        build_login_index and build_security_index respectively.
    """
    return build_login_index(login_events), build_security_index(security_events)
def normalize_and_enrich(audit_events: List[dict], login_idx, security_idx) -> List[dict]:
    """Turn raw audit events into enriched rows, de-duplicated by audit_id.

    Events whose normalised row has no truthy "audit_id" are dropped before
    enrichment.
    """
    # Lazy generator: each event is normalised right before it is enriched,
    # keeping the per-event call order of a plain loop.
    normalized_rows = (normalize_audit_event(event) for event in audit_events)
    enriched_rows = [
        enrich_audit_row_by_category(
            candidate,
            login_idx,
            security_idx,
            match_window_seconds=MATCH_WINDOW_SECONDS,
        )
        for candidate in normalized_rows
        if candidate.get("audit_id")
    ]
    return deduplicate_by_key(enriched_rows, "audit_id")
def upsert(rows: List[dict]) -> None:
    """Write *rows* to the audit table and record the sync time.

    Non-empty input is sent through batched_insert in batches of 500 with
    "audit_id" as the conflict key. The sync metadata is updated with the
    current UTC time whether or not there were rows to write.
    """
    if rows:
        batched_insert(
            supabase_client,
            AUDITLOG_TABLE,
            rows,
            batch_size=500,
            on_conflict=["audit_id"],
        )
        logging.info("Upserted %d rows into %s", len(rows), AUDITLOG_TABLE)
    else:
        logging.info("No rows to upsert.")

    # The timestamp is taken after the write (if any) has finished.
    finished_at = datetime.datetime.now(datetime.timezone.utc)
    update_sync_metadata(supabase_client, AUDITLOG_TABLE, finished_at.isoformat())
def main(since_ms: Optional[int] = None):
    """Run one incremental HubSpot audit sync into Supabase.

    The cursor is resolved in this order:
      1. the explicit ``since_ms`` argument;
      2. the BOOTSTRAP_SINCE_MS_ENV environment value, accepted either as
         integer epoch milliseconds or as an ISO-8601 timestamp;
      3. today at 00:00:00 UTC.

    The three activity streams are then fetched, audit rows are normalised,
    enriched and de-duplicated, and the result is upserted.

    Args:
        since_ms: lower-bound cursor in epoch milliseconds (UTC), or None to
            fall back to the environment value / today's UTC midnight.

    Raises:
        RuntimeError: if BOOTSTRAP_SINCE_MS_ENV is set but is neither an
            integer nor a parseable ISO-8601 timestamp.
    """
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        raw_cursor = BOOTSTRAP_SINCE_MS_ENV.strip()
        try:
            since_ms = int(raw_cursor)
        except ValueError:
            # Not a plain integer: fall back to ISO-8601 parsing.
            try:
                since_ms = to_epoch_ms(raw_cursor)
            except ValueError as exc:
                # Name the variable that is actually read, so the operator
                # knows which setting to fix.
                raise RuntimeError(
                    "BOOTSTRAP_SINCE_MS_ENV must be an integer (ms) or an "
                    "ISO-8601 timestamp if set."
                ) from exc

    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)

    # Logged after resolution so the log shows the cursor actually used
    # rather than None on default/bootstrap runs.
    logging.info(
        "Starting HubSpot audit sync (occurredAfter_ms=%s, occurredBefore=SKIPPED).", since_ms)

    print(f"Fetching HubSpot audit logs occurredAfter > {since_ms} ...")
    streams = fetch_streams(occurred_after_ms=since_ms)
    login_idx, security_idx = build_indices(
        streams["login"], streams["security"])
    rows = normalize_and_enrich(streams["audit"], login_idx, security_idx)

    print("Upserting into Supabase...")
    upsert(rows)
    print(f"Synced {len(rows)} audit rows into '{AUDITLOG_TABLE}'.")
    print("Audit logs sync complete.")
def _parse_cli_arg_to_ms(arg: str) -> int:
    """Convert a command-line timestamp argument to epoch milliseconds.

    Accepted forms:
      - integer epoch milliseconds (e.g. ``1754025600000``)
      - integer epoch seconds (e.g. ``1754025600``), scaled to milliseconds
      - YYYY-MM-DD (floors to 00:00Z)
      - ISO-8601 with ``Z`` or an explicit offset

    Raises:
        ValueError: if the argument is not a valid date or ISO-8601 string.
    """
    # epoch ms or seconds
    if re.fullmatch(r"\d{10,13}", arg):
        v = int(arg)
        # Values below 1e11 are read as seconds: as seconds that range runs
        # past the year 5000, while as milliseconds it would end in early
        # 1973. The cutoff must stay below the 13-digit range, or genuine
        # millisecond values get multiplied by 1000 as well.
        if v < 100_000_000_000:  # seconds -> ms
            v *= 1000
        return v
    # YYYY-MM-DD
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
        d = datetime.datetime.strptime(
            arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
        return to_epoch_ms(floor_to_utc_midnight(d))
    # ISO-8601
    return to_epoch_ms(arg)
if __name__ == "__main__":
    import sys

    # Script entry: an optional first argument supplies the cursor; with no
    # argument main() chooses its own default.
    cli_args = sys.argv[1:]
    if not cli_args:
        main()
    else:
        try:
            since = _parse_cli_arg_to_ms(cli_args[0])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)