Spaces:
Paused
Paused
| """ | |
| This script contains utility functions for working with HubSpot data. | |
| """ | |
| import time | |
| import datetime | |
| import re | |
| import httpx | |
| import html | |
| import requests | |
| import logging | |
| from typing import Dict, List, Optional, Tuple | |
| from collections import defaultdict | |
| import pandas as pd | |
| from hubspot.crm.objects import ApiException as ObjectApiException | |
| from hubspot.crm.contacts.exceptions import ApiException as ContactsApiException | |
| from hubspot.crm.companies.exceptions import ApiException as CompaniesApiException | |
| from hubspot.crm.contacts import ( | |
| PublicObjectSearchRequest, Filter, FilterGroup, BatchInputSimplePublicObjectId | |
| ) | |
| from hubspot.crm.companies import ( | |
| PublicObjectSearchRequest as CompanySearchRequest, | |
| Filter as CompanyFilter, | |
| FilterGroup as CompanyFilterGroup | |
| ) | |
# Sentinel set of values treated as "missing" throughout this module
# (None and the empty string); used by _coalesce, fill_network_fields
# and enrich_audit_row_by_category.
# NOTE(review): membership tests against this set raise TypeError for
# unhashable values such as lists — confirm callers only pass scalars.
_MISSING = {None, ""}
def serialize(obj):
    """
    Recursively convert an arbitrary object tree into plain Python data.

    Handled in order of precedence: objects exposing ``to_dict``, lists,
    dicts, ``datetime.datetime`` instances (rendered as ISO 8601 strings),
    and objects with a ``__dict__`` (underscore-prefixed attributes are
    skipped). Anything else is rendered with ``str``.

    :param obj: The object to serialize.
    :return: A structure built only from dicts, lists, and strings.
    """
    if hasattr(obj, "to_dict"):
        return serialize(obj.to_dict())
    if isinstance(obj, list):
        return [serialize(element) for element in obj]
    if isinstance(obj, dict):
        return {name: serialize(member) for name, member in obj.items()}
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()
    if hasattr(obj, "__dict__"):
        public_attrs = (
            (name, member)
            for name, member in obj.__dict__.items()
            if not name.startswith("_")
        )
        return {name: serialize(member) for name, member in public_attrs}
    return str(obj)
def try_parse_int(value):
    """
    Best-effort conversion of *value* to an ``int``.

    :param value: Candidate value.
    :return: ``int(value)`` on success, otherwise ``None`` (also for
        ``None`` input).
    """
    if value is None:
        return None
    try:
        result = int(value)
    except (ValueError, TypeError):
        return None
    return result
def try_parse_float(value):
    """
    Best-effort conversion of *value* to a ``float``.

    :param value: Candidate value.
    :return: ``float(value)`` on success, otherwise ``None`` (also for
        ``None`` input).
    """
    if value is None:
        return None
    try:
        result = float(value)
    except (ValueError, TypeError):
        return None
    return result
def parse_ts(ts):
    """
    Normalize a HubSpot timestamp into an ISO 8601 string.

    Numeric values (or digit-only strings) are treated as epoch
    milliseconds and rendered in UTC; anything else is parsed as an
    ISO 8601 string (a trailing "Z" is accepted).

    :param ts: Epoch-milliseconds number/string, or ISO 8601 string.
    :return: ISO 8601 string, or ``None`` when *ts* is falsy or cannot
        be parsed (failures are logged).
    """
    if not ts:
        return None
    try:
        text = str(ts)
        if isinstance(ts, (int, float)) or text.isdigit():
            # Epoch milliseconds -> aware UTC datetime.
            parsed = datetime.datetime.fromtimestamp(
                int(ts) / 1000.0, tz=datetime.timezone.utc)
        else:
            parsed = datetime.datetime.fromisoformat(
                text.replace("Z", "+00:00"))
        return parsed.isoformat()
    except (ValueError, TypeError) as exc:
        logging.error("Failed to parse timestamp '%s': %s", ts, str(exc))
        return None
def deduplicate_by_key(rows, key):
    """
    Remove duplicate dictionaries from *rows* based on one or more keys.

    :param rows: List of dictionaries to deduplicate.
    :param key: A key name, or a tuple of key names whose values are
        combined into a composite deduplication key. Missing keys are
        looked up with ``dict.get`` and contribute ``None`` (previously
        a tuple key raised ``KeyError`` on a missing entry).
    :return: New list preserving order, keeping the first occurrence of
        each key value. Rows whose single-key value is ``None`` are
        always kept (a missing key is not a duplicate of another
        missing key).
    """
    seen = set()
    unique_rows = []
    for row in rows:
        if isinstance(key, tuple):
            val = tuple(row.get(k) for k in key)
        else:
            val = row.get(key)
        # Only None marks a missing key. Falsy-but-valid values such as
        # 0 or "" must still deduplicate: the original `if val` test let
        # every row with a falsy key through as a "unique" row.
        if val is None:
            unique_rows.append(row)
        elif val not in seen:
            seen.add(val)
            unique_rows.append(row)
    return unique_rows
def fix_surrogates(text: str) -> str:
    """
    Repair UTF-16 surrogate code points embedded in *text*.

    Email payloads occasionally contain surrogates, which are not valid
    UTF-8 on their own; round-tripping through UTF-16 with the
    ``surrogatepass`` handler recombines them into proper characters.

    :param text: The string to repair.
    :return: The repaired string.
    """
    as_utf16 = text.encode('utf-16', 'surrogatepass')
    return as_utf16.decode('utf-16')
def strip_footer(text: str) -> str:
    """
    Strip common email footers (confidentiality notices, environmental
    disclaimers, company-registration boilerplate) from *text*.

    Patterns are checked in a fixed order; the text is truncated at the
    start of the first pattern that matches. If none match, *text* is
    returned unchanged.

    :param text: Email body to trim.
    :return: Text up to (and excluding) the first footer marker, with
        trailing whitespace stripped.
    """
    footer_cutoff_patterns = [
        # Fixed: was r"...\\d+..." (literal backslash + "d"), which could
        # never match an actual registration number.
        r"(?i)Registration Number \d+[A-Z]?",
        r"(?i)this e[- ]?mail message contains confidential information",
        r"(?i)this email and any attachments.*?confidential",
        r"(?i)if you are not the intended recipient",
        r"(?i)please notify us immediately by return e[- ]?mail",
        r"(?i)no liability is accepted for any damage",
        r"(?i)this message is intended only for the use of the individual",
        r"(?i)confidentiality notice",
        r"(?i)please consider the environment before printing",
        r"(?i)registered in England and Wales",
        r"(?i)the views expressed in this email are those of the sender",
        r"(?i)accepts no liability",
        r"(?i)has taken steps to ensure",
    ]
    for pattern in footer_cutoff_patterns:
        match = re.search(pattern, text)
        if match:
            return text[:match.start()].strip()
    return text
def clean_text(raw_text: str) -> str:
    """
    Clean an email body into plain text for analysis.

    Pipeline, in order: repair surrogates (best-effort), normalize line
    endings, turn ``<br>`` tags into newlines, strip remaining HTML tags,
    unescape HTML entities, truncate at the first quoted/forwarded-block
    marker, drop separator and footer lines, stop at a "-- " signature
    delimiter, collapse blank runs, then apply ``strip_footer`` when
    available.

    :param raw_text: Raw email text/HTML; ``None`` is treated as "".
    :return: Cleaned plain-text body.
    """
    # Defensive default: treat None like an empty message.
    if raw_text is None:
        raw_text = ""
    # Optional helper fallback: surrogate repair is best-effort only.
    try:
        fixed = fix_surrogates(raw_text)
    except NameError:
        # If fix_surrogates isn't available, just pass through.
        fixed = raw_text
    # Start from the corrected source.
    text = fixed
    # Normalize Windows/Mac line endings early (before HTML handling).
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Normalize common HTML line breaks to real newlines before removing tags.
    text = re.sub(r"(?i)<br\s*/?>", "\n", text)
    # Strip HTML tags.
    text = re.sub(r"<[^>]+>", "", text)
    # Unescape HTML entities (&amp; -> &, etc.).
    text = html.unescape(text)
    # --- Trim quoted / forwarded blocks heuristics ---
    # "On <date>, <name> wrote:" reply header.
    on_wrote_pat = re.compile(r"(?im)^[>\s]*on\s+.+?wrote:")
    # Forwarded-mail header lines (From:/Sent:/Date:/Subject:/To:).
    fwd_hdr_pat = re.compile(
        r"(?im)^[>\s]*(?:from|sent|date|subject|to)\s*:\s.+$")
    # Explicit "Original Message" / "Begin forwarded message" separators.
    sep_pat = re.compile(
        r"(?im)^(?:[>\s]*-----\s*original message\s*-----|[>\s]*begin forwarded message)")
    cut_idx = None
    # Cut at the EARLIEST of the three markers, if any matched.
    for m in (on_wrote_pat.search(text), fwd_hdr_pat.search(text), sep_pat.search(text)):
        if m:
            idx = m.start()
            cut_idx = idx if cut_idx is None or idx < cut_idx else cut_idx
    if cut_idx is not None:
        text = text[:cut_idx].rstrip("\n")
    # --- Line-by-line cleanup, footer & signature handling ---
    lines = text.split("\n")
    cleaned = []
    footer_regex = re.compile(
        r"(?i)\b(confidential|privacy policy|unsubscribe|follow us|visit our website|"
        r"please consider the environment|registered office|copyright|"
        r"this e-?mail and any attachments|do not print this email)\b"
    )
    sig_sep = re.compile(r"^--\s?$")  # standard "-- " signature delimiter
    for line in lines:
        stripped = line.strip()
        # Skip pure decorative separators (---, ===, ___, ***).
        if re.match(r"^[-=_*]{3,}\s*$", stripped):
            continue
        # Skip common footer/disclaimer lines entirely.
        if footer_regex.search(stripped):
            continue
        # Stop including anything after a signature separator.
        if sig_sep.match(stripped):
            break
        # Keep empty lines for now (runs are collapsed below).
        if stripped == "":
            cleaned.append("")
            continue
        cleaned.append(stripped)
    out = "\n".join(cleaned)
    # Collapse 3+ consecutive newlines down to a single blank line.
    out = re.sub(r"\n{3,}", "\n\n", out).strip()
    # Optional final footer stripping (best-effort, helper may be absent).
    try:
        out = strip_footer(out)
    except NameError:
        pass
    return out
def get_search_config(hubspot_client, object_type):
    """
    Look up the filter/search classes and search API for a HubSpot
    object type.

    :param hubspot_client: The HubSpot client object.
    :param object_type: Either "contacts" or "companies".
    :return: Dict with keys ``FilterCls``, ``FilterGroupCls``,
        ``SearchRequestCls``, ``search_api`` and ``modified_prop``
        (always "createdate").
    :raises ValueError: For any other *object_type*.
    """
    if object_type == "contacts":
        search_api = hubspot_client.crm.contacts.search_api
        classes = (Filter, FilterGroup, PublicObjectSearchRequest)
    elif object_type == "companies":
        search_api = hubspot_client.crm.companies.search_api
        classes = (CompanyFilter, CompanyFilterGroup, CompanySearchRequest)
    else:
        raise ValueError(f"Unsupported object_type '{object_type}'")
    filter_cls, group_cls, request_cls = classes
    return {
        "FilterCls": filter_cls,
        "FilterGroupCls": group_cls,
        "SearchRequestCls": request_cls,
        "search_api": search_api,
        "modified_prop": "createdate",
    }
def get_property_label_mapping(hubspot_client, object_type: str, property_name: str) -> dict:
    """
    Fetch the value-to-label mapping for an enumerated HubSpot property.

    :param hubspot_client: The HubSpot client instance.
    :param object_type: "contacts" or "companies".
    :param property_name: Internal property name (e.g., "industry").
    :return: Mapping of internal option values to display labels; an
        empty dict on any API failure (logged as a warning).
    """
    try:
        definition = hubspot_client.crm.properties.core_api.get_by_name(
            object_type, property_name)
        return {option.value: option.label for option in definition.options}
    except Exception as exc:
        logging.warning("Failed to fetch mapping for %s: %s",
                        property_name, str(exc))
        return {}
def build_filter_request(since, after):
    """
    Build a HubSpot search request for CONTACTS created on/after a datetime.

    (The previous docstring said "companies", but this uses the contacts
    Filter/FilterGroup/PublicObjectSearchRequest classes and requests
    contact properties.)

    :param since: Datetime lower bound; compared against "createdate"
        as epoch milliseconds with a "GTE" operator.
    :param after: Optional paging cursor for result offset.
    :return: A contacts ``PublicObjectSearchRequest`` with a fixed
        property list and a page size of 100.
    """
    value = int(since.timestamp() * 1000)
    filter_obj = Filter(property_name="createdate",
                        operator="GTE", value=value)
    filter_group = FilterGroup(filters=[filter_obj])
    return PublicObjectSearchRequest(
        filter_groups=[filter_group],
        properties=[
            "full_name",
            "firstname",
            "lastname",
            "email",
            "phone",
            "createdate",
            "lastmodifieddate",
            "lastactivitydate",
            "associatedcompanyid",
        ],
        limit=100,
        after=after
    )
def fetch_custom_object(hubspot_client, object_name="tickets", properties=None,
                        associations=None):
    """
    Fetch every record of a HubSpot object type as a list of dictionaries.

    Pages through the CRM objects API 100 records at a time (sleeping
    100 ms between pages), exposing each record id as "billing_id",
    dropping "hs_object_id", casting known integer fields with
    try_parse_int, and normalizing known timestamp fields with parse_ts.
    On an API error the exception is logged and pagination stops,
    returning whatever was collected so far (nothing is raised).

    :param hubspot_client: The HubSpot client object.
    :param object_name: Object type to fetch. Defaults to "tickets".
    :param properties: Properties to request; None lets the API return
        its defaults.
    :param associations: Association types to include; None omits them.
    :return: List of dictionaries, one per record.
    """
    all_objects = []
    after = None  # paging cursor; None requests the first page
    while True:
        try:
            response = hubspot_client.crm.objects.basic_api.get_page(
                object_type=object_name,
                properties=properties,
                limit=100,
                after=after,
                archived=False,
                associations=associations
            )
            if not response.results:
                break
            for record in response.results:
                props = record.properties
                object_id = record.id
                # Exclude hs_object_id; when an explicit property list was
                # given, restrict the output to exactly those properties.
                if properties:
                    filtered_props = {k: props.get(
                        k) for k in properties if k != "hs_object_id"}
                else:
                    filtered_props = {
                        k: v for k, v in props.items() if k != "hs_object_id"}
                object_dict = {"billing_id": object_id, **filtered_props}
                # Cast known integer fields if present.
                for key in ["hs_created_by_user_id"]:
                    if key in object_dict:
                        object_dict[key] = try_parse_int(object_dict[key])
                # Normalize known timestamp fields if present.
                for key in ["hs_createdate", "hs_lastmodifieddate"]:
                    if key in object_dict:
                        object_dict[key] = parse_ts(object_dict[key])
                # Attach association ids, grouped by association type,
                # when the caller asked for associations.
                if associations and record.associations:
                    assoc_data = {}
                    for assoc_type, assoc_records in record.associations.items():
                        assoc_data[assoc_type] = [
                            ar.id for ar in assoc_records.results]
                    object_dict["associations"] = assoc_data
                all_objects.append(object_dict)
            if response.paging and response.paging.next:
                after = response.paging.next.after
            else:
                break
            # Light throttle between pages to stay under rate limits.
            time.sleep(0.1)
        except ObjectApiException as e:
            # Best-effort: log and return the partial result set.
            logging.error("Exception when fetching %s: %s", object_name, e)
            break
    return all_objects
def fetch_total_objects(
    hubspot_client,
    object_type: str,
    modified_after: datetime.datetime = None,
    archived: bool = False
) -> int:
    """
    Count HubSpot objects of the given type.

    Archived records are counted by paging the basic API (100 per page,
    100 ms pause between pages); live records use a single search
    request with limit=1 and read its "total" field. Handled errors
    (HubSpot API, HTTP, ValueError — including an unsupported
    object_type on the archived path) are logged and 0 is returned; any
    other exception is logged as critical and re-raised.

    :param hubspot_client: HubSpot client.
    :param object_type: "contacts" or "companies".
    :param modified_after: Lower bound on "createdate"; only applied on
        the non-archived search path.
    :param archived: Whether to count archived records instead.
    :return: Number of matching objects, or 0 on a handled error.
    """
    try:
        if archived:
            total = 0
            after = None  # paging cursor
            while True:
                if object_type == "contacts":
                    response = hubspot_client.crm.contacts.basic_api.get_page(
                        limit=100, archived=True, after=after
                    )
                elif object_type == "companies":
                    response = hubspot_client.crm.companies.basic_api.get_page(
                        limit=100, archived=True, after=after
                    )
                else:
                    # Caught by the ValueError handler below -> returns 0.
                    raise ValueError(
                        f"Unsupported object_type '{object_type}' for archived=True")
                total += len(response.results)
                if response.paging and response.paging.next:
                    after = response.paging.next.after
                else:
                    break
                # Throttle between pages to respect rate limits.
                time.sleep(0.1)
            logging.info("Total %s (archived): %d", object_type, total)
            print(f"Total {object_type} (archived): {total}")
            return total
        # Non-archived path via search API.
        config = get_search_config(hubspot_client, object_type)
        filters = []
        if modified_after:
            value = int(modified_after.timestamp() * 1000)
            filters.append(config["FilterCls"](
                property_name=config["modified_prop"], operator="GTE", value=value
            ))
        # limit=1: only the "total" field is needed, not the results.
        request_body = config["SearchRequestCls"](
            filter_groups=[config["FilterGroupCls"](
                filters=filters)] if filters else None,
            properties=["id"],
            limit=1
        )
        response = config["search_api"].do_search(request_body)
        total = response.total
        logging.info("Total %s in HubSpot: %d", object_type, total)
        print(f"Total {object_type} in HubSpot: {total}")
        return total
    except (ContactsApiException, CompaniesApiException) as api_err:
        logging.error("API error occurred while fetching %s: %s",
                      object_type, str(api_err))
    except httpx.HTTPError as http_err:
        logging.error("HTTP error occurred while fetching %s: %s",
                      object_type, str(http_err))
    except ValueError as val_err:
        logging.error("ValueError while fetching %s: %s",
                      object_type, str(val_err))
    except BaseException as critical_err:
        # NOTE(review): BaseException also catches KeyboardInterrupt and
        # SystemExit — they are logged and re-raised here, not swallowed.
        logging.critical("Unexpected error fetching %s: %s",
                         object_type, str(critical_err))
        raise
    return 0
| def _coalesce(*vals): | |
| """ | |
| Returns the first non-missing value from the given arguments. | |
| A value is considered "missing" if it is None or an empty string. | |
| :param vals: Variable number of arguments to coalesce. | |
| :return: The first non-missing value, or None if all values are missing. | |
| """ | |
| for v in vals: | |
| if v not in _MISSING: | |
| return v | |
| return None | |
def parse_ts_dt(ts: Optional[str]) -> Optional[datetime.datetime]:
    """
    Parse an ISO 8601 timestamp string into a UTC-aware datetime object.

    :param ts: Timestamp string (a trailing "Z" is accepted), or
        None/empty.
    :return: UTC ``datetime.datetime``, or ``None`` when *ts* is falsy
        or cannot be parsed.
    """
    if not ts:
        return None
    try:
        normalized = str(ts).replace("Z", "+00:00")
        parsed = datetime.datetime.fromisoformat(normalized)
        return parsed.astimezone(datetime.timezone.utc)
    except Exception:
        return None
def to_epoch_ms_from_utc_iso(date_str: str) -> int:
    """
    Convert an ISO 8601 datetime string or 'YYYY-MM-DD' date to epoch
    milliseconds.

    Naive inputs (no timezone offset) are interpreted as UTC. Previously
    a naive ISO string was passed to ``.timestamp()`` unadjusted, which
    applies the machine's LOCAL timezone and made the result depend on
    where the code ran.

    :param date_str: ISO 8601 string (optionally "Z"-suffixed) or a
        plain 'YYYY-MM-DD' date (treated as midnight UTC).
    :return: Milliseconds since the Unix epoch.
    :raises ValueError: If *date_str* is empty/None or unparseable.
    """
    if not date_str:
        raise ValueError("date_str is required")
    if len(date_str) == 10 and date_str[4] == "-" and date_str[7] == "-":
        # YYYY-MM-DD -> midnight UTC
        dt = datetime.datetime.strptime(
            date_str, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
    else:
        dt = datetime.datetime.fromisoformat(date_str.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            # Contract says UTC: pin naive timestamps to UTC so that
            # .timestamp() does not fall back to the local timezone.
            dt = dt.replace(tzinfo=datetime.timezone.utc)
    return int(dt.timestamp() * 1000)
def request_with_retry(method: str,
                       url: str,
                       headers: Dict[str, str],
                       params: Dict[str, str],
                       initial_backoff: float = 1.0,
                       max_backoff: float = 16.0,
                       retries: int = 8) -> Dict:
    """
    Send an HTTP request, retrying transient failures with exponential
    backoff.

    Retried: HTTP 429/500/502/503/504 responses and network
    connection/timeout errors — the backoff doubles each attempt, capped
    at *max_backoff*. Not retried: any other HTTP error status (raises
    ``requests.HTTPError`` via ``raise_for_status``; the old docstring
    incorrectly said RuntimeError), JSON decode failures on a success
    response, and unexpected exceptions (logged, then re-raised).

    :param method: HTTP method (e.g. GET, POST).
    :param url: Target URL.
    :param headers: HTTP headers to include.
    :param params: URL query parameters.
    :param initial_backoff: First sleep between retries, in seconds.
    :param max_backoff: Upper bound on the sleep, in seconds.
    :param retries: Maximum number of attempts before giving up.
    :return: Decoded JSON body of the first successful (<400) response.
    :raises RuntimeError: Only when all *retries* attempts were exhausted.
    """
    backoff = initial_backoff
    for attempt in range(1, retries + 1):
        try:
            resp = requests.request(
                method, url, headers=headers, params=params, timeout=60)
            if resp.status_code < 400:
                try:
                    return resp.json()
                except Exception as e:
                    # A 2xx/3xx with an unparseable body is not retried.
                    logging.error(
                        "Failed to decode JSON response from %s: %s", url, e)
                    raise
            # HubSpot rate limits or transient server error: retry.
            if resp.status_code in (429, 500, 502, 503, 504):
                logging.warning(
                    "HubSpot API %s (%d). Retrying in %.1fs (attempt %d/%d)...",
                    url, resp.status_code, backoff, attempt, retries
                )
                time.sleep(backoff)
                backoff = min(max_backoff, backoff * 2)
                continue
            # Permanent error: raise_for_status -> requests.HTTPError.
            logging.error("HubSpot API error %s: %s",
                          resp.status_code, resp.text)
            resp.raise_for_status()
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout) as e:
            # Transient network problems follow the same backoff schedule.
            logging.warning(
                "Network error on attempt %d/%d (%s). Retrying in %.1fs...",
                attempt, retries, str(e), backoff
            )
            time.sleep(backoff)
            backoff = min(max_backoff, backoff * 2)
            continue
        except Exception as e:
            # Catches the JSON-decode re-raise and the HTTPError from
            # raise_for_status: logged once more here, then propagated.
            logging.error("Unexpected error during HubSpot request: %s", e)
            raise
    # If we exhausted retries
    raise RuntimeError(f"Failed after {retries} attempts calling {url}")
def page_account_activity(base_url: str,
                          token: str,
                          occurred_after_ms: int,
                          occurred_before_ms: int,
                          limit: int = 200,
                          max_pages: int = 1000) -> List[Dict]:
    """
    Fetch all account activity events from *base_url* within a time range.

    The time-filter parameter names depend on the endpoint: audit-log
    URLs use occurredAfter/occurredBefore, security URLs use
    fromTimestamp/toTimestamp. Pagination follows the "after" cursor and
    stops on: an empty page, a missing cursor, a known terminal cursor
    value, a repeated cursor (loop guard), or *max_pages*.

    :param base_url: The base URL to fetch events from.
    :param token: The HubSpot API token used as a Bearer credential.
    :param occurred_after_ms: Epoch-ms lower bound for events.
    :param occurred_before_ms: Epoch-ms upper bound for events; may be
        None to leave the range open-ended.
    :param limit: Maximum number of events per page (default=200).
    :param max_pages: Safety cap on pages fetched (default=1000).
    :return: A list of all fetched events.
    """
    headers = {"Authorization": f"Bearer {token}",
               "Accept": "application/json"}
    # Endpoint flavor decides which time-filter parameter names to use.
    is_security = "activity/security" in base_url
    is_audit = "activity/audit-logs" in base_url
    params: Dict[str, str] = {"limit": str(limit)}
    if is_audit:
        params["occurredAfter"] = str(occurred_after_ms)
        if occurred_before_ms is not None:
            params["occurredBefore"] = str(occurred_before_ms)
    elif is_security:
        params["fromTimestamp"] = str(occurred_after_ms)
        if occurred_before_ms is not None:
            params["toTimestamp"] = str(occurred_before_ms)
    all_items: List[Dict] = []
    after: Optional[str] = None
    prev_after: Optional[str] = None
    pages = 0
    # Cursor values observed on /security when the feed is exhausted.
    TERMINAL_AFTER_VALUES = {"MC0w", "0-0", ""}
    while True:
        if after:
            params["after"] = after
        else:
            params.pop("after", None)
        data = request_with_retry("GET", base_url, headers, params)
        # Different endpoints name the payload "results" or "events".
        results = data.get("results") or data.get("events") or []
        if not isinstance(results, list):
            logging.warning("Unexpected results shape at %s: %s",
                            base_url, type(results))
            results = []
        all_items.extend(results)
        paging = data.get("paging") or {}
        next_obj = paging.get("next") or {}
        next_after = next_obj.get("after")
        pages += 1
        logging.info("Fetched page %d from %s: %d items (total=%d). after=%s",
                     pages, base_url, len(results), len(all_items), str(next_after))
        if len(results) == 0:
            logging.info(
                "No results returned; stopping pagination for %s.", base_url)
            break
        if not next_after:
            logging.info(
                "No next cursor; stopping pagination for %s.", base_url)
            break
        if next_after in TERMINAL_AFTER_VALUES:
            logging.info(
                "Terminal 'after' (%s); stopping pagination for %s.", next_after, base_url)
            break
        if prev_after is not None and next_after == prev_after:
            # Loop guard: a cursor that repeats would page forever.
            logging.info(
                "Repeated 'after' cursor (%s); stopping pagination for %s.", next_after, base_url)
            break
        if pages >= max_pages:
            logging.warning(
                "Reached max_pages=%d for %s; stopping pagination.", max_pages, base_url)
            break
        prev_after, after = after, next_after
        # Light throttle between pages.
        time.sleep(0.1)
    return all_items
def safe_get_actor(ev: Dict) -> Dict:
    """
    Extract actor identity from an audit event, tolerating a missing or
    malformed ``actingUser`` field.

    Top-level ``userId``/``userEmail`` take precedence over the nested
    actor's values; the nested actor's ``email`` is the final fallback.

    :param ev: The audit event dictionary.
    :return: Dict with ``userId`` and ``userEmail`` keys (either may be
        None).
    """
    nested = ev.get("actingUser")
    if not isinstance(nested, dict):
        nested = {}
    user_id = ev.get("userId") or nested.get("userId")
    user_email = (ev.get("userEmail")
                  or nested.get("userEmail")
                  or nested.get("email"))
    return {"userId": user_id, "userEmail": user_email}
def build_login_index(login_events: List[Dict]) -> Dict[Tuple[Optional[str], Optional[str]], List[Dict]]:
    """
    Index login events by (userId, email), sorted by parsed loginAt.

    Events whose ``loginAt`` cannot be parsed are skipped. The parsed
    UTC datetime is cached on each event under the ``"_ts"`` key — the
    input event dicts are mutated in place.

    :param login_events: Login events to index.
    :return: Mapping of (userId-as-str-or-None, email) to events sorted
        ascending by ``"_ts"``.
    """
    index = defaultdict(list)
    for event in login_events:
        stamp = parse_ts_dt(event.get("loginAt"))
        if stamp is None:
            continue
        event["_ts"] = stamp
        uid = event.get("userId")
        bucket = (None if uid is None else str(uid), event.get("email"))
        index[bucket].append(event)
    for events in index.values():
        events.sort(key=lambda rec: rec["_ts"])
    return index
def build_security_index(security_events: List[Dict]) -> Dict[Tuple[Optional[str], Optional[str]], List[Dict]]:
    """
    Index security events by (userId, email), sorted by parsed createdAt.

    Events whose ``createdAt`` cannot be parsed are skipped. The parsed
    UTC datetime is cached on each event under the ``"_ts"`` key — the
    input event dicts are mutated in place.

    :param security_events: Security events to index.
    :return: Mapping of (userId-as-str-or-None, email) to events sorted
        ascending by ``"_ts"``.
    """
    index = defaultdict(list)
    for event in security_events:
        actor = safe_get_actor(event)
        stamp = parse_ts_dt(event.get("createdAt"))
        if stamp is None:
            continue
        event["_ts"] = stamp
        # NOTE(review): falls back to the raw "actingUser" value for the
        # email slot (mirrors the original) — presumably a last-resort
        # identifier; verify against producers of these events.
        email = actor.get("userEmail") or event.get("actingUser")
        uid = actor.get("userId")
        index[(None if uid is None else str(uid), email)].append(event)
    for events in index.values():
        events.sort(key=lambda rec: rec["_ts"])
    return index
def find_best_time_match(ts: Optional[datetime.datetime],
                         key: Tuple[Optional[str], Optional[str]],
                         index: Dict[Tuple[Optional[str], Optional[str]], List[Dict]],
                         window_seconds: int) -> Optional[Dict]:
    """
    Return the indexed event closest in time to *ts* within a window.

    :param ts: Reference timestamp; None yields None.
    :param key: (userId, email) bucket to look up in *index*.
    :param index: Mapping of keys to events carrying a "_ts" datetime.
    :param window_seconds: Maximum allowed |ts - event["_ts"]| in seconds.
    :return: The closest event within the window (first one wins on a
        tie), or None when the bucket is empty or nothing is close enough.
    """
    if ts is None:
        return None
    candidates = index.get(key)
    if not candidates:
        return None
    closest = min(
        candidates,
        key=lambda ev: abs((ts - ev["_ts"]).total_seconds()),
    )
    gap = abs((ts - closest["_ts"]).total_seconds())
    return closest if gap <= window_seconds else None
def fill_network_fields(row: Dict, src_event: Dict) -> None:
    """
    Backfill missing network fields on *row* from *src_event*.

    For each of ip_address / country_code / region_code, the first
    non-missing candidate from the event (top level first, then its
    nested "context" dict) is copied in — but only when the row's
    current value is missing (None or ""). Non-dict events are ignored.

    :param row: The row to mutate in place.
    :param src_event: The source event to read network details from.
    :return: None
    """
    if not isinstance(src_event, dict):
        return
    context = src_event.get("context") or {}
    candidates = {
        "ip_address": _coalesce(
            src_event.get("ipAddress"),
            src_event.get("sourceIp"),
            src_event.get("ip"),
            context.get("ipAddress"),
        ),
        "country_code": _coalesce(
            src_event.get("countryCode"),
            context.get("countryCode"),
        ),
        "region_code": _coalesce(
            src_event.get("regionCode"),
            context.get("regionCode"),
        ),
    }
    for field, value in candidates.items():
        if row.get(field) in _MISSING and value not in _MISSING:
            row[field] = value
def normalize_audit_event(ev: Dict) -> Dict:
    """
    Flatten a raw audit event into a standard row dictionary.

    Output keys: audit_id, category, sub_category, action,
    target_object_id (falls back to "objectId"), user_id / user_email
    (resolved via safe_get_actor), hubspot_occured_at (sic — this
    spelling is read back by enrich_audit_row_by_category; falls back
    to "timestamp"), ip_address, country_code, region_code.

    :param ev: The audit event to normalize.
    :return: The normalized row dictionary.
    """
    actor = safe_get_actor(ev)
    target = ev.get("targetObjectId") or ev.get("objectId")
    occurred = ev.get("occurredAt") or ev.get("timestamp")
    row = {
        "audit_id": ev.get("id"),
        "category": ev.get("category"),
        "sub_category": ev.get("subCategory"),
        "action": ev.get("action"),
        "target_object_id": target,
        "user_id": actor.get("userId"),
        "user_email": actor.get("userEmail"),
        "hubspot_occured_at": occurred,
        "ip_address": ev.get("ipAddress"),
        "country_code": ev.get("countryCode"),
        "region_code": ev.get("regionCode"),
    }
    return row
def enrich_audit_row_by_category(row: Dict,
                                 login_idx: Dict[Tuple[Optional[str], Optional[str]], List[Dict]],
                                 security_idx: Dict[Tuple[Optional[str], Optional[str]], List[Dict]],
                                 match_window_seconds: int = 300) -> Dict:
    """
    Enrich an audit row with network details from a time-matched event.

    LOGIN rows are matched against *login_idx* and CRITICAL_ACTION rows
    against *security_idx*; any other category is returned unchanged.
    On a match, missing network fields are backfilled and missing
    user_email / user_id are optionally filled from the matched event.

    :param row: The audit event row to enrich (mutated in place).
    :param login_idx: (userId, email) -> sorted login events.
    :param security_idx: (userId, email) -> sorted critical-action events.
    :param match_window_seconds: Time window (seconds) for the match.
    :return: The (possibly enriched) row.
    """
    category = (row.get("category") or "").upper()
    source_idx = {"LOGIN": login_idx,
                  "CRITICAL_ACTION": security_idx}.get(category)
    if source_idx is None:
        return row
    raw_uid = row.get("user_id")
    bucket = (str(raw_uid) if raw_uid is not None else None,
              row.get("user_email"))
    occurred = parse_ts_dt(row.get("hubspot_occured_at"))
    picked = find_best_time_match(
        occurred, bucket, source_idx, match_window_seconds)
    if picked:
        fill_network_fields(row, picked)
        # Optional backfill of user identity from the matched event.
        if row.get("user_email") in _MISSING and picked.get("email") not in _MISSING:
            row["user_email"] = picked.get("email")
        if row.get("user_id") in _MISSING and picked.get("userId") not in _MISSING:
            row["user_id"] = picked.get("userId")
    return row