| | """ |
| | This script contains utility functions for working with HubSpot data. |
| | """ |
| | import time |
| | import datetime |
| | import re |
| | import httpx |
| | import html |
| | import requests |
| | import logging |
| | from typing import Dict, List, Optional, Tuple |
| | from collections import defaultdict |
| | import pandas as pd |
| | from hubspot.crm.objects import ApiException as ObjectApiException |
| | from hubspot.crm.contacts.exceptions import ApiException as ContactsApiException |
| | from hubspot.crm.companies.exceptions import ApiException as CompaniesApiException |
| | from hubspot.crm.contacts import ( |
| | PublicObjectSearchRequest, Filter, FilterGroup, BatchInputSimplePublicObjectId |
| | ) |
| | from hubspot.crm.companies import ( |
| | PublicObjectSearchRequest as CompanySearchRequest, |
| | Filter as CompanyFilter, |
| | FilterGroup as CompanyFilterGroup |
| | ) |
| |
|
# Values treated as "missing" throughout this module (used by _coalesce,
# fill_network_fields and enrich_audit_row_by_category): an absent field
# (None) or an empty string.
_MISSING = {None, ""}
| |
|
| |
|
def serialize(obj):
    """
    Recursively convert an arbitrary object into plain, JSON-friendly data.

    Useful for flattening HubSpot API responses before persisting them to a
    database or file.

    Resolution order:

    1. Anything exposing a ``to_dict`` method is expanded via that method.
    2. Lists are serialized element by element.
    3. Dictionaries are serialized value by value (keys kept as-is).
    4. ``datetime.datetime`` instances become ISO 8601 strings.
    5. Objects with a ``__dict__`` become dicts of their public attributes
       (names starting with ``_`` are skipped).
    6. Everything else falls back to ``str()``.

    :param obj: The object to serialize.
    :return: A plain structure of dicts, lists, and strings.
    """
    if hasattr(obj, "to_dict"):
        return serialize(obj.to_dict())
    if isinstance(obj, list):
        return [serialize(element) for element in obj]
    if isinstance(obj, dict):
        return {name: serialize(val) for name, val in obj.items()}
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()
    if hasattr(obj, "__dict__"):
        return {
            name: serialize(val)
            for name, val in vars(obj).items()
            if not name.startswith("_")
        }
    return str(obj)
| |
|
| |
|
def try_parse_int(value):
    """
    Convert *value* to ``int`` if possible.

    :param value: Any value, possibly ``None``.
    :return: ``int(value)`` on success; ``None`` when *value* is ``None``
        or cannot be converted.
    """
    if value is None:
        return None
    try:
        result = int(value)
    except (ValueError, TypeError):
        return None
    return result
| |
|
| |
|
def try_parse_float(value):
    """
    Convert *value* to ``float`` if possible.

    :param value: Any value, possibly ``None``.
    :return: ``float(value)`` on success; ``None`` when *value* is ``None``
        or cannot be converted.
    """
    if value is None:
        return None
    try:
        result = float(value)
    except (ValueError, TypeError):
        return None
    return result
| |
|
| |
|
def parse_ts(ts):
    """
    Parse a HubSpot timestamp into an ISO 8601 string.

    Accepts either an epoch value in *milliseconds* (int, float, or an
    all-digit string) or an ISO 8601 string (a trailing ``Z`` is treated as
    UTC). Epoch inputs are rendered in UTC; ISO inputs keep whatever offset
    they carry.

    :param ts: The timestamp to parse (epoch ms or ISO 8601 string).
    :return: The timestamp formatted via ``datetime.isoformat()``, or
        ``None`` when *ts* is falsy (None, empty string, 0) or cannot be
        parsed. (The previous docstring claimed the original value was
        returned on failure; the function has always returned ``None`` —
        the documentation is fixed here to match the behavior.)
    """
    if not ts:
        return None
    try:
        if isinstance(ts, (int, float)) or str(ts).isdigit():
            # Numeric input is epoch milliseconds.
            ts = int(ts)
            dt = datetime.datetime.fromtimestamp(
                ts / 1000.0, tz=datetime.timezone.utc)
        else:
            # fromisoformat() (pre-3.11) does not accept a literal "Z".
            dt = datetime.datetime.fromisoformat(
                str(ts).replace("Z", "+00:00"))
        return dt.isoformat()
    except (ValueError, TypeError) as e:
        logging.error(
            "Failed to parse timestamp '%s': %s", ts, str(e)
        )
        return None
| |
|
| |
|
def deduplicate_by_key(rows, key):
    """
    Remove duplicate rows from a list of dictionaries based on one or more keys.

    :param rows: The list of dictionaries to deduplicate.
    :param key: A string or tuple of strings naming the field(s) to
        deduplicate by. For a tuple, the combined tuple of values is the
        deduplication key. Missing fields are looked up with ``dict.get``
        (the previous implementation raised KeyError on a missing field in
        the tuple path) and contribute ``None``.
    :return: A new list preserving input order, keeping the first row seen
        for each key value. Rows whose key is missing (None) cannot be
        deduplicated and are always kept. Falsy-but-real key values such as
        ``0`` or ``""`` now deduplicate normally instead of being silently
        dropped (the previous ``if val and ...`` check discarded them).
    """
    seen = set()
    unique_rows = []
    for row in rows:
        if isinstance(key, tuple):
            # .get() keeps behavior consistent with the single-key path.
            val = tuple(row.get(k) for k in key)
        else:
            val = row.get(key)
        if val is None:
            # No usable key -- keep the row rather than dropping it.
            unique_rows.append(row)
            continue
        if val not in seen:
            seen.add(val)
            unique_rows.append(row)
    return unique_rows
| |
|
| |
|
def fix_surrogates(text: str) -> str:
    """
    Repair lone UTF-16 surrogate code points in *text*.

    Email bodies occasionally arrive with unpaired surrogates that are
    invalid in UTF-8. Round-tripping through UTF-16 with the
    ``surrogatepass`` error handler re-pairs them into valid characters.

    :param text: The string to repair.
    :return: The repaired string.
    """
    as_utf16 = text.encode('utf-16', 'surrogatepass')
    return as_utf16.decode('utf-16')
| |
|
| |
|
def strip_footer(text: str) -> str:
    """
    Strip common email footers (confidentiality notices, legal boilerplate,
    environmental taglines) from *text*.

    The text is cut at the start of the first pattern that matches, in
    list order; if no pattern matches, the text is returned unchanged.

    :param text: Plain-text email body.
    :return: The body with any recognized footer removed and surrounding
        whitespace stripped.
    """
    footer_cutoff_patterns = [
        # BUG FIX: was r"...\\d+..." -- in a raw string that is a literal
        # backslash followed by "d", which never matched actual digits.
        r"(?i)Registration Number \d+[A-Z]?",
        r"(?i)this e[- ]?mail message contains confidential information",
        r"(?i)this email and any attachments.*?confidential",
        r"(?i)if you are not the intended recipient",
        r"(?i)please notify us immediately by return e[- ]?mail",
        r"(?i)no liability is accepted for any damage",
        r"(?i)this message is intended only for the use of the individual",
        r"(?i)confidentiality notice",
        r"(?i)please consider the environment before printing",
        r"(?i)registered in England and Wales",
        r"(?i)the views expressed in this email are those of the sender",
        r"(?i)accepts no liability",
        r"(?i)has taken steps to ensure",
    ]

    for pattern in footer_cutoff_patterns:
        match = re.search(pattern, text)
        if match:
            # Cut where the footer marker begins.
            return text[:match.start()].strip()
    return text
| |
|
| |
|
def clean_text(raw_text: str) -> str:
    """
    Clean an email text by removing HTML, unescaping entities, and trimming
    quoted sections, footers, and signatures.

    Processing order (order matters -- HTML must be stripped before the
    line-based footer/signature heuristics run):

    1. Repair invalid surrogate characters.
    2. Normalize CR/CRLF line endings to LF.
    3. Convert ``<br>`` tags to newlines, drop all other HTML tags, and
       unescape HTML entities.
    4. Cut the text at the earliest quoted-reply / forwarded-message marker
       ("On ... wrote:", "From:/Sent:/..." headers, "-----Original
       Message-----", "Begin forwarded message").
    5. Drop decorative separator lines and footer-ish lines; discard
       everything after a "--" signature separator.
    6. Collapse runs of 3+ newlines and apply ``strip_footer``.

    :param raw_text: Raw (possibly HTML) email body; ``None`` is treated as
        an empty string.
    :return: The cleaned plain-text body.
    """
    if raw_text is None:
        raw_text = ""

    # Repair lone surrogates; tolerate the helper being absent (e.g. when
    # this function is imported in isolation).
    try:
        fixed = fix_surrogates(raw_text)
    except NameError:
        fixed = raw_text

    text = fixed

    # Normalize CRLF / CR line endings to LF.
    text = text.replace("\r\n", "\n").replace("\r", "\n")

    # Preserve <br> line breaks before stripping the remaining tags.
    text = re.sub(r"(?i)<br\s*/?>", "\n", text)

    # Remove all remaining HTML tags.
    text = re.sub(r"<[^>]+>", "", text)

    # Decode HTML entities (&amp;, &nbsp;, ...).
    text = html.unescape(text)

    # Markers that introduce quoted replies or forwarded messages.
    on_wrote_pat = re.compile(r"(?im)^[>\s]*on\s+.+?wrote:")
    fwd_hdr_pat = re.compile(
        r"(?im)^[>\s]*(?:from|sent|date|subject|to)\s*:\s.+$")
    sep_pat = re.compile(
        r"(?im)^(?:[>\s]*-----\s*original message\s*-----|[>\s]*begin forwarded message)")

    # Cut at the earliest marker found, if any.
    cut_idx = None
    for m in (on_wrote_pat.search(text), fwd_hdr_pat.search(text), sep_pat.search(text)):
        if m:
            idx = m.start()
            cut_idx = idx if cut_idx is None or idx < cut_idx else cut_idx

    if cut_idx is not None:
        text = text[:cut_idx].rstrip("\n")

    # Line-by-line pass: drop decorative separators and footer-ish lines,
    # and stop entirely at a "--" signature separator.
    lines = text.split("\n")
    cleaned = []

    footer_regex = re.compile(
        r"(?i)\b(confidential|privacy policy|unsubscribe|follow us|visit our website|"
        r"please consider the environment|registered office|copyright|"
        r"this e-?mail and any attachments|do not print this email)\b"
    )
    sig_sep = re.compile(r"^--\s?$")

    for line in lines:
        stripped = line.strip()

        # Decorative separator lines such as "-----" or "====".
        if re.match(r"^[-=_*]{3,}\s*$", stripped):
            continue

        # Lines that look like footer boilerplate.
        if footer_regex.search(stripped):
            continue

        # "--" marks the start of a signature: discard everything after it.
        if sig_sep.match(stripped):
            break

        # Keep blank lines (runs are collapsed below).
        if stripped == "":
            cleaned.append("")
            continue

        cleaned.append(stripped)

    out = "\n".join(cleaned)
    out = re.sub(r"\n{3,}", "\n\n", out).strip()

    # Final footer pass; tolerate the helper being absent.
    try:
        out = strip_footer(out)
    except NameError:
        pass

    return out
| |
|
| |
|
def get_search_config(hubspot_client, object_type):
    """
    Look up the filter/search classes and search API for a HubSpot object type.

    :param hubspot_client: The HubSpot client object.
    :param object_type: Either ``"contacts"`` or ``"companies"``.
    :return: A dict with keys ``FilterCls``, ``FilterGroupCls``,
        ``SearchRequestCls``, ``search_api`` and ``modified_prop``.
    :raises ValueError: If *object_type* is not supported.
    """
    if object_type == "contacts":
        config = {
            "FilterCls": Filter,
            "FilterGroupCls": FilterGroup,
            "SearchRequestCls": PublicObjectSearchRequest,
            "search_api": hubspot_client.crm.contacts.search_api,
        }
    elif object_type == "companies":
        config = {
            "FilterCls": CompanyFilter,
            "FilterGroupCls": CompanyFilterGroup,
            "SearchRequestCls": CompanySearchRequest,
            "search_api": hubspot_client.crm.companies.search_api,
        }
    else:
        raise ValueError(f"Unsupported object_type '{object_type}'")

    # Both object types are filtered/paged on their creation date.
    config["modified_prop"] = "createdate"
    return config
| |
|
| |
|
def get_property_label_mapping(hubspot_client, object_type: str, property_name: str) -> dict:
    """
    Fetch the value -> label mapping for an enumerated HubSpot property.

    :param hubspot_client: The HubSpot client instance.
    :param object_type: ``"contacts"`` or ``"companies"``.
    :param property_name: Internal property name (e.g. ``"industry"``).
    :return: Dict mapping internal option values to human-readable labels;
        an empty dict if the lookup fails for any reason (best-effort: a
        missing mapping only degrades labels, so failures are logged, not
        raised).
    """
    try:
        definition = hubspot_client.crm.properties.core_api.get_by_name(
            object_type, property_name)
        return {option.value: option.label for option in definition.options}
    except Exception as e:
        logging.warning("Failed to fetch mapping for %s: %s",
                        property_name, str(e))
        return {}
| |
|
| |
|
def build_filter_request(since, after):
    """
    Build a HubSpot search request for *contacts* created after a datetime.

    (The previous docstring said "companies", but this function uses the
    contacts ``Filter``/``FilterGroup``/``PublicObjectSearchRequest``
    classes and requests contact properties such as firstname/lastname.)

    :param since: Datetime lower bound; only records with ``createdate`` >=
        this instant are matched (sent as epoch milliseconds).
    :param after: Optional paging cursor from a previous search response.
    :return: A ``PublicObjectSearchRequest`` with a page size of 100.
    """
    # HubSpot's search API expects epoch milliseconds for datetime filters.
    value = int(since.timestamp() * 1000)
    filter_obj = Filter(property_name="createdate",
                        operator="GTE", value=value)
    filter_group = FilterGroup(filters=[filter_obj])
    return PublicObjectSearchRequest(
        filter_groups=[filter_group],
        properties=[
            "full_name",
            "firstname",
            "lastname",
            "email",
            "phone",
            "createdate",
            "lastmodifieddate",
            "lastactivitydate",
            "associatedcompanyid",
        ],
        limit=100,
        after=after
    )
| |
|
| |
|
def fetch_custom_object(hubspot_client, object_name="tickets", properties=None,
                        associations=None):
    """
    Fetch all records of a HubSpot CRM object, paging through the API.

    Each record becomes a dict keyed ``billing_id`` (the HubSpot object id)
    plus its properties (``hs_object_id`` is dropped). Known numeric and
    timestamp properties are normalized via ``try_parse_int``/``parse_ts``,
    and requested associations are reduced to lists of associated record ids.

    :param hubspot_client: The HubSpot client object.
    :param object_name: The object type to fetch. Defaults to "tickets".
    :param properties: Property names to request; None fetches the API's
        default property set.
    :param associations: Association types to request; None requests none.
    :return: A list of dictionaries, one per record. On an API error the
        records collected so far are returned (the error is logged, not
        raised).
    """
    all_objects = []
    after = None  # paging cursor; None on the first page

    while True:
        try:
            response = hubspot_client.crm.objects.basic_api.get_page(
                object_type=object_name,
                properties=properties,
                limit=100,
                after=after,
                archived=False,
                associations=associations
            )

            if not response.results:
                break

            for record in response.results:
                props = record.properties
                object_id = record.id

                # Drop hs_object_id: the id is exposed as billing_id below.
                if properties:
                    filtered_props = {k: props.get(
                        k) for k in properties if k != "hs_object_id"}
                else:
                    filtered_props = {
                        k: v for k, v in props.items() if k != "hs_object_id"}

                object_dict = {"billing_id": object_id, **filtered_props}

                # Normalize known integer-valued properties.
                for key in ["hs_created_by_user_id"]:
                    if key in object_dict:
                        object_dict[key] = try_parse_int(object_dict[key])

                # Normalize known timestamp properties to ISO 8601 strings.
                for key in ["hs_createdate", "hs_lastmodifieddate"]:
                    if key in object_dict:
                        object_dict[key] = parse_ts(object_dict[key])

                # Collapse association objects to lists of record ids.
                if associations and record.associations:
                    assoc_data = {}
                    for assoc_type, assoc_records in record.associations.items():
                        assoc_data[assoc_type] = [
                            ar.id for ar in assoc_records.results]
                    object_dict["associations"] = assoc_data

                all_objects.append(object_dict)

            if response.paging and response.paging.next:
                after = response.paging.next.after
            else:
                break

            # Light throttle between pages to respect API rate limits.
            time.sleep(0.1)

        except ObjectApiException as e:
            # Best-effort: log and return what was fetched so far.
            logging.error("Exception when fetching %s: %s", object_name, e)
            break

    return all_objects
| |
|
| |
|
def fetch_total_objects(
    hubspot_client,
    object_type: str,
    modified_after: Optional[datetime.datetime] = None,
    archived: bool = False
) -> int:
    """
    Count HubSpot objects of the given type.

    Non-archived counts use the search API's ``total`` field (a single
    request); archived counts must page through every record with
    ``basic_api``, since the search API does not cover archived objects.

    :param hubspot_client: HubSpot client.
    :param object_type: "contacts" or "companies".
    :param modified_after: Optional ``createdate`` lower bound (only applied
        to the non-archived search path).
    :param archived: Whether to count archived records instead of live ones.
    :return: Total number of matching objects, or 0 when a handled error
        (API, HTTP, or ValueError) occurred.
    :raises BaseException: Truly unexpected errors are logged at CRITICAL
        level and re-raised.
    """
    try:
        if archived:
            # Archived objects: page through basic_api and count results.
            total = 0
            after = None
            while True:
                if object_type == "contacts":
                    response = hubspot_client.crm.contacts.basic_api.get_page(
                        limit=100, archived=True, after=after
                    )
                elif object_type == "companies":
                    response = hubspot_client.crm.companies.basic_api.get_page(
                        limit=100, archived=True, after=after
                    )
                else:
                    raise ValueError(
                        f"Unsupported object_type '{object_type}' for archived=True")

                total += len(response.results)

                if response.paging and response.paging.next:
                    after = response.paging.next.after
                else:
                    break
                # Light throttle between pages to respect API rate limits.
                time.sleep(0.1)

            logging.info("Total %s (archived): %d", object_type, total)
            print(f"Total {object_type} (archived): {total}")
            return total

        # Live objects: one search request; only the total is needed.
        config = get_search_config(hubspot_client, object_type)
        filters = []

        if modified_after:
            # HubSpot expects epoch milliseconds for datetime filters.
            value = int(modified_after.timestamp() * 1000)
            filters.append(config["FilterCls"](
                property_name=config["modified_prop"], operator="GTE", value=value
            ))

        request_body = config["SearchRequestCls"](
            filter_groups=[config["FilterGroupCls"](
                filters=filters)] if filters else None,
            properties=["id"],
            limit=1
        )
        response = config["search_api"].do_search(request_body)
        total = response.total

        logging.info("Total %s in HubSpot: %d", object_type, total)
        print(f"Total {object_type} in HubSpot: {total}")
        return total

    except (ContactsApiException, CompaniesApiException) as api_err:
        logging.error("API error occurred while fetching %s: %s",
                      object_type, str(api_err))
    except httpx.HTTPError as http_err:
        logging.error("HTTP error occurred while fetching %s: %s",
                      object_type, str(http_err))
    except ValueError as val_err:
        logging.error("ValueError while fetching %s: %s",
                      object_type, str(val_err))
    except BaseException as critical_err:
        # Deliberately broad: log anything unexpected, then re-raise so the
        # caller (or the interpreter) still sees the original error.
        logging.critical("Unexpected error fetching %s: %s",
                         object_type, str(critical_err))
        raise

    return 0
| |
|
| |
|
| | def _coalesce(*vals): |
| | """ |
| | Returns the first non-missing value from the given arguments. |
| | |
| | A value is considered "missing" if it is None or an empty string. |
| | |
| | :param vals: Variable number of arguments to coalesce. |
| | :return: The first non-missing value, or None if all values are missing. |
| | """ |
| | for v in vals: |
| | if v not in _MISSING: |
| | return v |
| | return None |
| |
|
| |
|
def parse_ts_dt(ts: Optional[str]) -> Optional[datetime.datetime]:
    """
    Parse an ISO 8601 timestamp string into a UTC-aware datetime object.

    A trailing ``Z`` is accepted as UTC; other offsets are converted to UTC.

    :param ts: The timestamp string (may be None or empty).
    :return: A timezone-aware UTC datetime, or ``None`` when *ts* is falsy
        or cannot be parsed.
    """
    if not ts:
        return None
    normalized = str(ts).replace("Z", "+00:00")
    try:
        parsed = datetime.datetime.fromisoformat(normalized)
        return parsed.astimezone(datetime.timezone.utc)
    except Exception:
        return None
| |
|
| |
|
def to_epoch_ms_from_utc_iso(date_str: str) -> int:
    """
    Convert an ISO 8601 date/datetime string to epoch milliseconds.

    Accepted forms:

    - ``YYYY-MM-DD`` (interpreted as midnight UTC),
    - full ISO 8601 datetimes, with ``Z`` or a numeric offset,
    - ISO 8601 datetimes *without* an offset, which are treated as UTC.
      (Previously a naive datetime was fed to ``.timestamp()`` directly and
      thus interpreted in the machine's local timezone, making the result
      environment-dependent and contradicting this function's UTC contract.)

    :param date_str: The date string to convert.
    :return: Milliseconds since the Unix epoch (1970-01-01T00:00:00 UTC).
    :raises ValueError: If *date_str* is empty or unparsable.
    """
    if not date_str:
        raise ValueError("date_str is required")
    if len(date_str) == 10 and date_str[4] == "-" and date_str[7] == "-":
        # Bare date: midnight UTC.
        dt = datetime.datetime.strptime(
            date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
    else:
        dt = datetime.datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            # Naive datetimes are declared UTC per this function's contract.
            dt = dt.replace(tzinfo=datetime.timezone.utc)
    return int(dt.timestamp() * 1000)
| |
|
| |
|
def request_with_retry(method: str,
                       url: str,
                       headers: Dict[str, str],
                       params: Dict[str, str],
                       initial_backoff: float = 1.0,
                       max_backoff: float = 16.0,
                       retries: int = 8) -> Dict:
    """
    Perform an HTTP request with exponential backoff on transient failures.

    Retryable conditions: HTTP 429/500/502/503/504 responses and
    network-level connection/timeout errors. Backoff doubles per retry up
    to *max_backoff*. Any other HTTP error status is logged and raised via
    ``raise_for_status()``; a success status whose body is not valid JSON
    also raises. If every attempt hits a retryable condition, RuntimeError
    is raised.

    :param method: The HTTP method to use (e.g. GET, POST).
    :param url: The URL to send the request to.
    :param headers: A dictionary of HTTP headers to include in the request.
    :param params: A dictionary of URL parameters to include in the request.
    :param initial_backoff: The initial backoff time in seconds.
    :param max_backoff: The maximum backoff time in seconds.
    :param retries: The number of attempts before giving up.
    :return: The JSON response from the request, decoded as a dictionary.
    :raises RuntimeError: If all retry attempts fail.
    """
    backoff = initial_backoff
    for attempt in range(1, retries + 1):
        try:
            resp = requests.request(
                method, url, headers=headers, params=params, timeout=60)
            if resp.status_code < 400:
                try:
                    return resp.json()
                except Exception as e:
                    # Success status but unparsable body: surface it.
                    logging.error(
                        "Failed to decode JSON response from %s: %s", url, e)
                    raise

            # Transient server-side conditions: back off and retry.
            if resp.status_code in (429, 500, 502, 503, 504):
                logging.warning(
                    "HubSpot API %s (%d). Retrying in %.1fs (attempt %d/%d)...",
                    url, resp.status_code, backoff, attempt, retries
                )
                time.sleep(backoff)
                backoff = min(max_backoff, backoff * 2)
                continue

            # Non-retryable HTTP error (e.g. 4xx other than 429): raise.
            # The HTTPError propagates through the broad handler below.
            logging.error("HubSpot API error %s: %s",
                          resp.status_code, resp.text)
            resp.raise_for_status()

        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout) as e:
            # Network blip: retry on the same backoff schedule.
            logging.warning(
                "Network error on attempt %d/%d (%s). Retrying in %.1fs...",
                attempt, retries, str(e), backoff
            )
            time.sleep(backoff)
            backoff = min(max_backoff, backoff * 2)
            continue

        except Exception as e:
            # Includes the HTTPError raised by raise_for_status() above.
            logging.error("Unexpected error during HubSpot request: %s", e)
            raise

    # Every attempt hit a retryable condition.
    raise RuntimeError(f"Failed after {retries} attempts calling {url}")
| |
|
| |
|
def page_account_activity(base_url: str,
                          token: str,
                          occurred_after_ms: int,
                          occurred_before_ms: Optional[int],
                          limit: int = 200,
                          max_pages: int = 1000) -> List[Dict]:
    """
    Fetch all account-activity events from a HubSpot activity endpoint
    within the given time window, following cursor-based pagination.

    The time-window parameter names differ per endpoint: audit-log URLs use
    ``occurredAfter``/``occurredBefore`` while login/security URLs use
    ``fromTimestamp``/``toTimestamp``.

    :param base_url: The endpoint URL (containing "activity/audit-logs" or
        "activity/security").
    :param token: The HubSpot API token used for Bearer authentication.
    :param occurred_after_ms: Window start, epoch milliseconds.
    :param occurred_before_ms: Window end, epoch milliseconds; may be None
        for an open-ended window.
    :param limit: The maximum number of events per page (default=200).
    :param max_pages: Safety cap on pages fetched before stopping pagination
        (default=1000).
    :return: A list of all fetched events, in API order.
    """
    headers = {"Authorization": f"Bearer {token}",
               "Accept": "application/json"}

    # Endpoint flavor determines which window parameter names to send.
    is_security = "activity/security" in base_url
    is_audit = "activity/audit-logs" in base_url

    params: Dict[str, str] = {"limit": str(limit)}
    if is_audit:
        params["occurredAfter"] = str(occurred_after_ms)
        if occurred_before_ms is not None:
            params["occurredBefore"] = str(occurred_before_ms)
    elif is_security:
        params["fromTimestamp"] = str(occurred_after_ms)
        if occurred_before_ms is not None:
            params["toTimestamp"] = str(occurred_before_ms)

    all_items: List[Dict] = []
    after: Optional[str] = None
    prev_after: Optional[str] = None
    pages = 0

    # Cursor values observed to mean "no more data" even though a cursor is
    # still present in the response.
    TERMINAL_AFTER_VALUES = {"MC0w", "0-0", ""}

    while True:
        if after:
            params["after"] = after
        else:
            params.pop("after", None)

        data = request_with_retry("GET", base_url, headers, params)
        # Different endpoints nest the payload under "results" or "events".
        results = data.get("results") or data.get("events") or []
        if not isinstance(results, list):
            logging.warning("Unexpected results shape at %s: %s",
                            base_url, type(results))
            results = []

        all_items.extend(results)

        paging = data.get("paging") or {}
        next_obj = paging.get("next") or {}
        next_after = next_obj.get("after")

        pages += 1
        logging.info("Fetched page %d from %s: %d items (total=%d). after=%s",
                     pages, base_url, len(results), len(all_items), str(next_after))

        # Stop conditions, checked in order: empty page, missing cursor,
        # known terminal cursor, repeated cursor (loop guard), page cap.
        if len(results) == 0:
            logging.info(
                "No results returned; stopping pagination for %s.", base_url)
            break

        if not next_after:
            logging.info(
                "No next cursor; stopping pagination for %s.", base_url)
            break

        if next_after in TERMINAL_AFTER_VALUES:
            logging.info(
                "Terminal 'after' (%s); stopping pagination for %s.", next_after, base_url)
            break

        if prev_after is not None and next_after == prev_after:
            logging.info(
                "Repeated 'after' cursor (%s); stopping pagination for %s.", next_after, base_url)
            break

        if pages >= max_pages:
            logging.warning(
                "Reached max_pages=%d for %s; stopping pagination.", max_pages, base_url)
            break

        prev_after, after = after, next_after

        # Light throttle between pages to respect API rate limits.
        time.sleep(0.1)

    return all_items
| |
|
| |
|
def safe_get_actor(ev: Dict) -> Dict:
    """
    Extract actor identity from an audit event, tolerating malformed input.

    Top-level ``userId``/``userEmail`` fields win; otherwise the nested
    ``actingUser`` object is consulted (it may be absent, None, or not a
    dict at all).

    :param ev: The audit event to extract the actor from.
    :return: ``{"userId": ..., "userEmail": ...}`` (values may be None).
    """
    acting = ev.get("actingUser")
    nested = acting if isinstance(acting, dict) else {}
    user_id = ev.get("userId") or nested.get("userId")
    user_email = (ev.get("userEmail")
                  or nested.get("userEmail")
                  or nested.get("email"))
    return {"userId": user_id, "userEmail": user_email}
| |
|
| |
|
def build_login_index(login_events: List[Dict]) -> Dict[Tuple[Optional[str], Optional[str]], List[Dict]]:
    """
    Index login events by ``(userId, email)``, sorted by login time (UTC).

    Events whose ``loginAt`` cannot be parsed are skipped. Each indexed
    event gains a ``_ts`` key holding its parsed UTC datetime (used by
    ``find_best_time_match``).

    :param login_events: Raw login events to index.
    :return: Mapping of ``(userId-as-str-or-None, email)`` to events sorted
        ascending by ``_ts``.
    """
    index = defaultdict(list)
    for event in login_events:
        login_at = parse_ts_dt(event.get("loginAt"))
        if login_at is None:
            continue
        event["_ts"] = login_at
        uid = event.get("userId")
        key = (None if uid is None else str(uid), event.get("email"))
        index[key].append(event)
    for bucket in index.values():
        bucket.sort(key=lambda rec: rec["_ts"])
    return index
| |
|
| |
|
def build_security_index(security_events: List[Dict]) -> Dict[Tuple[Optional[str], Optional[str]], List[Dict]]:
    """
    Index security events by ``(userId, email)``, sorted by ``createdAt`` (UTC).

    Events without a parsable ``createdAt`` are skipped. Each indexed event
    gains a ``_ts`` key holding its parsed UTC datetime (used by
    ``find_best_time_match``).

    :param security_events: Raw security events to index.
    :return: Mapping of ``(userId-as-str-or-None, email)`` to events sorted
        ascending by ``_ts``.
    """
    index = defaultdict(list)
    for event in security_events:
        created = parse_ts_dt(event.get("createdAt"))
        if created is None:
            continue
        event["_ts"] = created
        actor = safe_get_actor(event)
        uid = actor.get("userId")
        # NOTE(review): falling back to the raw "actingUser" value as an
        # email looks suspicious -- it may be a dict rather than a string.
        # Preserved as-is; confirm whether actingUser can be a plain email.
        email = actor.get("userEmail") or event.get("actingUser")
        key = (None if uid is None else str(uid), email)
        index[key].append(event)
    for bucket in index.values():
        bucket.sort(key=lambda rec: rec["_ts"])
    return index
| |
|
| |
|
def find_best_time_match(ts: Optional[datetime.datetime],
                         key: Tuple[Optional[str], Optional[str]],
                         index: Dict[Tuple[Optional[str], Optional[str]], List[Dict]],
                         window_seconds: int) -> Optional[Dict]:
    """
    Pick the event in ``index[key]`` whose ``_ts`` is closest to *ts*.

    Only events within *window_seconds* of *ts* (in either direction) are
    considered.

    :param ts: Reference timestamp; None yields None.
    :param key: ``(userId, email)`` lookup key.
    :param index: Mapping from build_login_index / build_security_index.
    :param window_seconds: Maximum allowed distance in seconds.
    :return: The closest matching event, or None if no match is found.
    """
    if ts is None:
        return None
    best_match = None
    smallest_gap = float("inf")
    for event in index.get(key, []):
        gap = abs((ts - event["_ts"]).total_seconds())
        if gap <= window_seconds and gap < smallest_gap:
            smallest_gap = gap
            best_match = event
    return best_match
| |
|
| |
|
def fill_network_fields(row: Dict, src_event: Dict) -> None:
    """
    Copy missing network fields (ip_address, country_code, region_code)
    into *row* from *src_event*.

    Existing non-empty values in *row* are never overwritten; non-dict
    source events are ignored.

    :param row: The row to enrich (mutated in place).
    :param src_event: The source event to draw network fields from.
    :return: None
    """
    if not isinstance(src_event, dict):
        return

    context = src_event.get("context") or {}
    candidates = {
        "ip_address": _coalesce(
            src_event.get("ipAddress"),
            src_event.get("sourceIp"),
            src_event.get("ip"),
            context.get("ipAddress"),
        ),
        "country_code": _coalesce(
            src_event.get("countryCode"),
            context.get("countryCode"),
        ),
        "region_code": _coalesce(
            src_event.get("regionCode"),
            context.get("regionCode"),
        ),
    }

    for field, value in candidates.items():
        if row.get(field) in _MISSING and value not in _MISSING:
            row[field] = value
| |
|
| |
|
def normalize_audit_event(ev: Dict) -> Dict:
    """
    Flatten a raw audit event into the standard row format.

    Fields produced: audit_id, category, sub_category, action,
    target_object_id, user_id, user_email, hubspot_occured_at,
    ip_address, country_code, region_code.

    :param ev: The raw audit event to normalize.
    :return: A dictionary containing the normalized audit event fields.
    """
    actor = safe_get_actor(ev)
    row = {
        "audit_id": ev.get("id"),
        "category": ev.get("category"),
        "sub_category": ev.get("subCategory"),
        "action": ev.get("action"),
        # Some payloads carry targetObjectId, others objectId.
        "target_object_id": ev.get("targetObjectId") or ev.get("objectId"),
        "user_id": actor.get("userId"),
        "user_email": actor.get("userEmail"),
        # Key intentionally kept as "occured" (single r): downstream
        # consumers (see enrich_audit_row_by_category) read this exact name.
        "hubspot_occured_at": ev.get("occurredAt") or ev.get("timestamp"),
        "ip_address": ev.get("ipAddress"),
        "country_code": ev.get("countryCode"),
        "region_code": ev.get("regionCode"),
    }
    return row
| |
|
| |
|
def enrich_audit_row_by_category(row: Dict,
                                 login_idx: Dict[Tuple[Optional[str], Optional[str]], List[Dict]],
                                 security_idx: Dict[Tuple[Optional[str], Optional[str]], List[Dict]],
                                 match_window_seconds: int = 300) -> Dict:
    """
    Enrich a LOGIN or CRITICAL_ACTION audit row with network/identity data
    from the closest matching login or security event.

    Rows of any other category are returned untouched. When a match is
    found within *match_window_seconds*, missing network fields and missing
    user id/email are filled in from it.

    :param row: The normalized audit row (mutated in place and returned).
    :param login_idx: Output of ``build_login_index``.
    :param security_idx: Output of ``build_security_index``.
    :param match_window_seconds: Matching window in seconds (default 300).
    :return: The (possibly enriched) audit row.
    """
    category = (row.get("category") or "").upper()
    if category not in ("LOGIN", "CRITICAL_ACTION"):
        return row

    raw_uid = row.get("user_id")
    key = (None if raw_uid is None else str(raw_uid), row.get("user_email"))
    occurred = parse_ts_dt(row.get("hubspot_occured_at"))

    # LOGIN rows match against login events, CRITICAL_ACTION rows against
    # security events.
    source = login_idx if category == "LOGIN" else security_idx
    match = find_best_time_match(occurred, key, source, match_window_seconds)

    if match:
        fill_network_fields(row, match)
        # Backfill identity fields from the matched event when absent.
        if row.get("user_email") in _MISSING and match.get("email") not in _MISSING:
            row["user_email"] = match.get("email")
        if row.get("user_id") in _MISSING and match.get("userId") not in _MISSING:
            row["user_id"] = match.get("userId")

    return row
| |
|