""" HubSpot Companies → Supabase (incremental since a millisecond cursor) Usage from orchestrator: import hubspot_companies hubspot_companies.main(since_ms=) Direct CLI: # epoch ms python hubspot_companies.py 1754025600000 # ISO-8601 python hubspot_companies.py 2025-08-01T09:30:00Z # Back-compat date (floors to 00:00Z) python hubspot_companies.py 2025-08-01 """ import os import re import time import logging import datetime from typing import List, Dict, Optional, Tuple, Union import httpx import hubspot from dotenv import load_dotenv from supabase import create_client from hubspot.crm.companies import ApiException as CompaniesApiException from hubspot_utils import ( try_parse_int, parse_ts, get_property_label_mapping, ) from supabase_utils import ( update_sync_metadata, enrich_supabase_row, batched_insert, fetch_supabase_table, ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logging.basicConfig( filename=f"logs/hubspot_company_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log", filemode="a", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) # ----------------------------------------------------------------------------- # Environment # ----------------------------------------------------------------------------- load_dotenv() HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN") SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Optional bootstrap cursor if orchestrator doesn't provide one BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_COMPANIES_SINCE_MS") if not HUBSPOT_TOKEN: raise RuntimeError("HUBSPOT_TOKEN is not set") if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: raise RuntimeError("Supabase env vars are not set") hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN) supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) # 
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
# Company properties requested from HubSpot. The odd double-underscore names
# are HubSpot's internal names for custom/cloned portal properties.
COMPANY_PROPERTIES = [
    "name",
    "city",
    "company_email",
    "address",
    "address2",
    "domain",
    "number_of_active__cli_s",
    "number_of__mobile___cloned_",
    "createdate",
    "hs_lastmodifieddate",
    "lastmodifieddate",
    "review_date_updated_by___clone",
    "industry",
    "macro_industry_grouping",
    "closest_review_date___clone",
    "region",
    "industry_grouping",
    "source_1",
    "source_2",
    "hs_object_source_label",
    "hs_analytics_source",
    "hs_analytics_latest_source",
    "hs_object_source_detail_1",
]

# -----------------------------------------------------------------------------
# Time helpers
# -----------------------------------------------------------------------------

# Epoch values below this are assumed to be *seconds* and are scaled to ms.
# 1e11 ms ≈ 1973-03-03 while 1e11 s ≈ year 5138, so the two unit ranges do
# not overlap for any realistic timestamp.
_SECONDS_MS_THRESHOLD = 100_000_000_000


def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* as an aware UTC datetime (naive input is assumed UTC)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)


def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Floor *dt* to 00:00:00.000 UTC on the same UTC day."""
    dt = _ensure_utc(dt)
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string (accepting a trailing 'Z') to a UTC datetime."""
    if isinstance(value, str) and value.endswith("Z"):
        # datetime.fromisoformat rejects a literal 'Z' suffix before 3.11.
        value = value[:-1] + "+00:00"
    dt = datetime.datetime.fromisoformat(value)
    return _ensure_utc(dt)


def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601 string or datetime into epoch milliseconds (UTC).

    Raises:
        TypeError: for unsupported input types.
        ValueError: for strings fromisoformat cannot parse.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(dt.timestamp() * 1000)


def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """Best-effort parse of *value* (epoch seconds/ms or ISO-8601) to epoch ms.

    Returns None when the value cannot be interpreted (logged as a warning).
    """
    if value is None:
        return None
    try:
        # Accept plain floats too; int(str(1754006400000.0)) would raise.
        v = int(value) if isinstance(value, float) else int(str(value))
        # Fixed: the previous threshold (10_000_000_000_000, i.e. 1e13) also
        # "promoted" genuine millisecond timestamps (~1.7e12 today) by
        # another factor of 1000. 1e11 cleanly separates seconds from ms.
        if v < _SECONDS_MS_THRESHOLD:  # seconds → ms
            v *= 1000
        return v
    except ValueError:
        pass
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse timestamp value=%r", value)
        return None


# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms) with property fallback
# -----------------------------------------------------------------------------
def _search_company_ids_from(since_ms: int, prop: str) -> List[str]:
    """
    Search companies where {prop} > since_ms (epoch-ms).
    Sort ascending so we can advance the cursor monotonically.
    """
    url = "https://api.hubapi.com/crm/v3/objects/companies/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT", "value": str(since_ms)},
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }
    ids: List[str] = []
    after: Optional[str] = None
    # NOTE(review): the HubSpot search endpoint caps paging at 10,000 total
    # results — a very large backlog needs cursor-advancing re-searches.
    with httpx.Client(timeout=30.0) as client:
        while True:
            body = dict(payload)
            if after:
                body["after"] = after
            resp = client.post(url, headers=headers, json=body)
            if resp.status_code >= 400:
                # Log the response body (JSON if possible) before raising.
                try:
                    logging.error(
                        "Company search error for prop '%s': %s", prop, resp.json())
                except Exception:
                    logging.error(
                        "Company search error for prop '%s': %s", prop, resp.text)
                resp.raise_for_status()
            data = resp.json()
            ids.extend([obj["id"] for obj in data.get("results", []) or []])
            after = (data.get("paging") or {}).get("next", {}).get("after")
            if not after:
                break
            time.sleep(0.1)  # gentle client-side rate limiting between pages
    return ids


def search_company_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """
    Try these properties in order; return (ids, prop_used) for the first
    successful search:
        1) hs_lastmodifieddate
        2) lastmodifieddate
        3) createdate
    """
    # Fixed: only "createdate" was tried, contradicting the docstring — the
    # incremental sync cursored on creation date and missed updates to
    # existing companies. Restore the documented fallback chain.
    props_to_try = ["hs_lastmodifieddate", "lastmodifieddate", "createdate"]
    last_err = None
    for prop in props_to_try:
        try:
            ids = _search_company_ids_from(since_ms, prop)
            logging.info(
                "Company search with '%s' returned %d IDs.", prop, len(ids))
            return ids, prop
        except httpx.HTTPStatusError as e:
            last_err = e
            continue
    if last_err:
        raise last_err
    return [], "hs_lastmodifieddate"
----------------------------------------------------------------------------- # Read-by-ID (with associations) → enrich & track max cursor ts # ----------------------------------------------------------------------------- def _enrich_company_data_from_record( record, industry_map: Dict[str, str], macro_industry_map: Dict[str, str], region_map: Dict[str, str], ) -> Dict: props = record.properties or {} company_data: Dict[str, Optional[str]] = {"id": record.id} for p in COMPANY_PROPERTIES: company_data[p] = props.get(p) # Association counts num_contacts = 0 num_deals = 0 if getattr(record, "associations", None): if record.associations.get("contacts") and getattr(record.associations["contacts"], "results", None): num_contacts = len( {a.id for a in record.associations["contacts"].results if getattr(a, "id", None)}) if record.associations.get("deals") and getattr(record.associations["deals"], "results", None): num_deals = len( {a.id for a in record.associations["deals"].results if getattr(a, "id", None)}) company_data["number_of_associated_contacts"] = num_contacts company_data["number_of_associated_deals"] = num_deals # Map label properties code = company_data.get("industry") company_data["industry"] = industry_map.get( code) if code in industry_map else None macro = company_data.get("macro_industry_grouping") company_data["macro_industry_grouping"] = macro_industry_map.get( macro) if macro in macro_industry_map else None region = company_data.get("region") company_data["region"] = region_map.get( region) if region in region_map else None return company_data def read_companies_by_ids(company_ids: List[str], cursor_prop: str) -> Tuple[List[Dict], Optional[int]]: if not company_ids: return [], None companies: List[Dict] = [] assoc_types = ["contacts", "deals"] # Fetch label maps once try: industry_map = get_property_label_mapping( hubspot_client, "companies", "industry") except Exception as e: logging.warning("Failed to fetch industry map: %s", e) industry_map = {} try: 
macro_industry_map = get_property_label_mapping( hubspot_client, "companies", "macro_industry_grouping") except Exception as e: logging.warning("Failed to fetch macro_industry_grouping map: %s", e) macro_industry_map = {} try: region_map = get_property_label_mapping( hubspot_client, "companies", "region") except Exception as e: logging.warning("Failed to fetch region map: %s", e) region_map = {} max_ts_ms: Optional[int] = None for i, cid in enumerate(company_ids, start=1): try: record = hubspot_client.crm.companies.basic_api.get_by_id( company_id=cid, properties=COMPANY_PROPERTIES, associations=assoc_types, archived=False, ) # Track max timestamp for the chosen cursor property cursor_val = (record.properties or {}).get(cursor_prop) ts_ms = parse_any_ts_ms(cursor_val) if ts_ms is not None and (max_ts_ms is None or ts_ms > max_ts_ms): max_ts_ms = ts_ms companies.append(_enrich_company_data_from_record( record, industry_map, macro_industry_map, region_map)) if i % 200 == 0: logging.info("Read %d companies...", i) time.sleep(0.05) except httpx.HTTPStatusError as e: logging.error("HTTP error reading company %s: %s", cid, e) except (CompaniesApiException, httpx.HTTPError) as e: logging.error("Error reading company %s: %s", cid, e) return companies, max_ts_ms # ----------------------------------------------------------------------------- # Map → Supabase rows and diff # ----------------------------------------------------------------------------- def map_company_data_for_db(companies: List[Dict]) -> List[Dict]: mapped: List[Dict] = [] for c in companies: base_row = { "company_id": try_parse_int(c["id"]), "company_name": c.get("name"), "company_email": c.get("company_email"), "city": c.get("city"), "domain": c.get("domain"), "street_address": c.get("address"), "street_address2": c.get("address2"), "hubspot_create_date": parse_ts(c.get("createdate")), "hubspot_modified_date": parse_ts(c.get("hs_lastmodifieddate") or c.get("lastmodifieddate")), "review_date_updated_by": 
c.get("review_date_updated_by___clone") or None, "number_of_active_clis": try_parse_int(c.get("number_of_active__cli_s")), "number_of_clis": try_parse_int(c.get("number_of__mobile___cloned_")), "number_of_associated_contacts": c.get("number_of_associated_contacts", 0), "number_of_associated_deals": c.get("number_of_associated_deals", 0), "industry": c.get("industry"), "macro_industry_grouping": c.get("macro_industry_grouping"), "closest_review_date": parse_ts(c.get("closest_review_date___clone")), "region": c.get("region"), "source_1": c.get("source_1"), "source_2": c.get("source_2"), "record_source": c.get("hs_object_source_label"), "record_source_detail_1": c.get("hs_object_source_detail_1"), "original_traffic_source": c.get("hs_analytics_source"), "latest_traffic_source": c.get("hs_analytics_latest_source"), } mapped.append(enrich_supabase_row(base_row)) return mapped def companies_are_different(new_row: Dict, old_row: Dict) -> bool: compare_keys = [ "company_name", "company_email", "city", "domain", "street_address", "street_address2", "hubspot_modified_date", "review_date_updated_by", "number_of_active_clis", "number_of_clis", "number_of_associated_contacts", "number_of_associated_deals", "industry", "macro_industry_grouping", "closest_review_date", "region", "source_1", "source_2", "record_source", "record_source_detail_1", "original_traffic_source", "latest_traffic_source", ] for key in compare_keys: if str(new_row.get(key)) != str(old_row.get(key)): return True return False # ----------------------------------------------------------------------------- # Upsert # ----------------------------------------------------------------------------- def upsert_companies(companies: List[Dict]) -> None: if not companies: print("No companies to upsert.") return existing = fetch_supabase_table( supabase_client, "hubspot_companies", "company_id") mapped = map_company_data_for_db(companies) rows_to_upsert: List[Dict] = [] for row in mapped: cid = row.get("company_id") if 
not cid: continue old_row = existing.get(str(cid)) if not old_row or companies_are_different(row, old_row): rows_to_upsert.append(row) print(f"{len(rows_to_upsert)} companies to insert/update (out of {len(companies)} read).") if rows_to_upsert: # upraw_json_to_supabase(supabase_client, rows_to_upsert, object_type="companies") batched_insert(supabase_client, "hubspot_companies", rows_to_upsert, batch_size=1000) # ----------------------------------------------------------------------------- # Main (timestamp cursor) # ----------------------------------------------------------------------------- def main(since_ms: Optional[int] = None): """ Orchestrates: 1) Search company IDs with > since_ms (property fallback) 2) Read full companies (track max timestamp for ) 3) Upsert into Supabase 4) Update sync metadata with { last_sync_metadata, last_sync_time, cursor_prop } """ # Resolve since_ms if since_ms is None and BOOTSTRAP_SINCE_MS_ENV: try: since_ms = int(BOOTSTRAP_SINCE_MS_ENV) except ValueError: raise RuntimeError( "HUBSPOT_COMPANIES_SINCE_MS must be an integer (ms) if set.") if since_ms is None: # Default: today@00:00:00Z for first run today0 = floor_to_utc_midnight( datetime.datetime.now(datetime.timezone.utc)) since_ms = to_epoch_ms(today0) print(f"Searching companies with timestamp > {since_ms} ...") ids, cursor_prop = search_company_ids_after_ms(since_ms) print(f"Search property: {cursor_prop}. Found {len(ids)} company IDs.") now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat() if not ids: print("No companies beyond the cursor. 
Updating sync metadata and exiting.") update_sync_metadata(supabase_client, "companies", now_iso) return print("Reading companies (with associations)...") companies, max_ts_ms = read_companies_by_ids(ids, cursor_prop) print("Upserting into Supabase...") upsert_companies(companies) # Advance cursor to max timestamp we actually ingested for the chosen property new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms update_sync_metadata(supabase_client, "companies", now_iso) print( f"Companies sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.") # ----------------------------------------------------------------------------- # CLI # ----------------------------------------------------------------------------- def _parse_cli_arg_to_ms(arg: str) -> int: """ Accept: - integer epoch ms - ISO-8601 (Z or offset) - YYYY-MM-DD (floors to 00:00Z) """ # epoch ms or seconds if re.fullmatch(r"\d{10,13}", arg): v = int(arg) if v < 10_000_000_000_000: # seconds -> ms v *= 1000 return v # YYYY-MM-DD if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): d = datetime.datetime.strptime( arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) return to_epoch_ms(floor_to_utc_midnight(d)) # ISO-8601 return to_epoch_ms(arg) if __name__ == "__main__": import sys if len(sys.argv) > 1: try: since = _parse_cli_arg_to_ms(sys.argv[1]) except Exception as e: print( f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}" ) sys.exit(1) main(since_ms=since) else: main()