| | """ |
| | HubSpot Companies → Supabase (incremental since a millisecond cursor) |
| | |
| | Usage from orchestrator: |
| | import load_hubspot_companies |
| | load_hubspot_companies.main(since_ms=<int milliseconds since epoch UTC>) |
| | |
| | Direct CLI: |
| | # epoch ms |
| | python load_hubspot_companies.py 1754025600000 |
| | # ISO-8601 |
| | python load_hubspot_companies.py 2025-08-01T09:30:00Z |
| | # Back-compat date (floors to 00:00Z) |
| | python load_hubspot_companies.py 2025-08-01 |
| | """ |
| |
|
| | import os |
| | import re |
| | import time |
| | import logging |
| | import datetime |
| | from typing import List, Dict, Optional, Tuple, Union |
| |
|
| | import httpx |
| | import hubspot |
| | from dotenv import load_dotenv |
| | from supabase import create_client |
| | from hubspot.crm.companies import ApiException as CompaniesApiException |
| |
|
| | from hubspot_utils import ( |
| | try_parse_int, parse_ts, get_property_label_mapping, |
| | ) |
| | from supabase_utils import ( |
| | update_sync_metadata, enrich_supabase_row, upload_raw_json_to_supabase, |
| | batched_insert, fetch_supabase_table, |
| | ) |
| |
|
| | |
| | |
| | |
# Daily log file under logs/; create the directory first so basicConfig
# does not fail with FileNotFoundError on a fresh checkout.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_company_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
| |
|
| | |
| | |
| | |
# --- Environment / client setup -------------------------------------------
load_dotenv()
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

# Optional bootstrap cursor (epoch ms) consumed by main() when the caller
# does not pass since_ms explicitly.
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_COMPANIES_SINCE_MS")

# Fail fast at import time: the pipeline cannot do anything without creds.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError("Supabase env vars are not set")

# Module-level API clients shared by every function below.
hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
| |
|
| | |
| | |
| | |
| | COMPANY_PROPERTIES = [ |
| | "name", |
| | "city", |
| | "company_email", |
| | "address", |
| | "address2", |
| | "domain", |
| | "number_of_active__cli_s", |
| | "number_of__mobile___cloned_", |
| | "createdate", |
| | "hs_lastmodifieddate", |
| | "lastmodifieddate", |
| | "review_date_updated_by___clone", |
| | "industry", |
| | "macro_industry_grouping", |
| | "closest_review_date___clone", |
| | "region", |
| | "industry_grouping", |
| | "source_1", |
| | "source_2", |
| | "hs_object_source_label", |
| | "hs_analytics_source", |
| | "hs_analytics_latest_source", |
| | "hs_object_source_detail_1", |
| | ] |
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: |
| | if dt.tzinfo is None: |
| | dt = dt.replace(tzinfo=datetime.timezone.utc) |
| | return dt.astimezone(datetime.timezone.utc) |
| |
|
| |
|
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Normalize *dt* to UTC (naive inputs assumed UTC), then truncate to 00:00:00 that day."""
    utc = datetime.timezone.utc
    normalized = (dt if dt.tzinfo is not None else dt.replace(tzinfo=utc)).astimezone(utc)
    return normalized.replace(hour=0, minute=0, second=0, microsecond=0)
| |
|
| |
|
| | def _parse_iso_like_to_dt(value: str) -> datetime.datetime: |
| | if isinstance(value, str) and value.endswith("Z"): |
| | value = value[:-1] + "+00:00" |
| | dt = datetime.datetime.fromisoformat(value) |
| | return _ensure_utc(dt) |
| |
|
| |
|
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """
    Convert an ISO-8601 string or a datetime to epoch milliseconds (UTC).
    Naive datetimes (and naive parsed strings) are assumed to be UTC.

    Raises:
        TypeError: for any other input type.
        ValueError: for strings fromisoformat cannot parse.
    """
    utc = datetime.timezone.utc
    if isinstance(dt_or_str, str):
        text = dt_or_str
        if text.endswith("Z"):
            # fromisoformat does not accept the 'Z' suffix on older Pythons.
            text = text[:-1] + "+00:00"
        moment = datetime.datetime.fromisoformat(text)
    elif isinstance(dt_or_str, datetime.datetime):
        moment = dt_or_str
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=utc)
    return int(moment.astimezone(utc).timestamp() * 1000)
| |
|
| |
|
def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    Normalize a timestamp of unknown format to epoch milliseconds (UTC).

    Accepts epoch seconds or milliseconds (int, float, or numeric string)
    and ISO-8601 strings (delegated to to_epoch_ms).

    Numeric values below 100_000_000_000 are interpreted as epoch *seconds*
    and scaled by 1000; larger values are assumed to already be in
    milliseconds. (The previous cutoff of 1e13 wrongly scaled every modern
    epoch-ms value — e.g. 1_754_006_400_000 — by another factor of 1000.)

    Returns:
        Epoch milliseconds, or None (with a warning logged) when the value
        cannot be parsed.
    """
    if value is None:
        return None
    try:
        # int(float(...)) also accepts fractional epoch values such as
        # "1754006400.5", which int(str(...)) used to reject.
        v = int(float(value))
        if v < 100_000_000_000:  # heuristic: epoch seconds, not milliseconds
            v *= 1000
        return v
    except (ValueError, TypeError, OverflowError):
        pass
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse timestamp value=%r", value)
        return None
| |
|
| | |
| | |
| | |
| |
|
| |
|
def _search_company_ids_from(since_ms: int, prop: str) -> List[str]:
    """
    Page through the HubSpot CRM v3 search endpoint and collect IDs of
    companies whose `prop` (an epoch-ms property) is strictly greater than
    `since_ms`. Results are sorted ascending on `prop` so the caller can
    advance its cursor monotonically.
    """
    search_url = "https://api.hubapi.com/crm/v3/objects/companies/search"
    request_headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT",
                 "value": str(since_ms)},
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }

    collected: List[str] = []
    page_cursor: Optional[str] = None
    with httpx.Client(timeout=30.0) as client:
        while True:
            # Shallow copy is enough: we only add the top-level "after" key.
            request_body = dict(base_payload)
            if page_cursor:
                request_body["after"] = page_cursor

            response = client.post(
                search_url, headers=request_headers, json=request_body)
            if response.status_code >= 400:
                # Log the error body (JSON when possible) before raising.
                try:
                    logging.error(
                        "Company search error for prop '%s': %s", prop, response.json())
                except Exception:
                    logging.error(
                        "Company search error for prop '%s': %s", prop, response.text)
                response.raise_for_status()

            page = response.json()
            for obj in page.get("results", []) or []:
                collected.append(obj["id"])

            page_cursor = (page.get("paging") or {}).get("next", {}).get("after")
            if not page_cursor:
                break
            time.sleep(0.1)  # gentle pacing between pages

    return collected
| |
|
| |
|
def search_company_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """
    Search for company IDs whose cursor property is > since_ms.

    Tries cursor properties in order and returns (ids, prop_used) for the
    first search that succeeds:
      1) hs_lastmodifieddate
      2) lastmodifieddate
      3) createdate

    Raises:
        httpx.HTTPStatusError: the last error, if every property fails.
    """
    # FIX: the list previously contained only "createdate", contradicting
    # the documented fallback order and silently missing companies that were
    # modified (but not created) after the cursor.
    props_to_try = ["hs_lastmodifieddate", "lastmodifieddate", "createdate"]
    last_err: Optional[httpx.HTTPStatusError] = None

    for prop in props_to_try:
        try:
            ids = _search_company_ids_from(since_ms, prop)
            logging.info(
                "Company search with '%s' returned %d IDs.", prop, len(ids))
            return ids, prop
        except httpx.HTTPStatusError as e:
            last_err = e
            continue

    if last_err:
        raise last_err
    return [], "hs_lastmodifieddate"
| |
|
| | |
| | |
| | |
| |
|
| |
|
def _enrich_company_data_from_record(
    record,
    industry_map: Dict[str, str],
    macro_industry_map: Dict[str, str],
    region_map: Dict[str, str],
) -> Dict:
    """
    Flatten a HubSpot company record into a plain dict: copy the tracked
    properties, count distinct associated contacts/deals, and translate the
    industry / macro-industry / region codes into display labels (codes
    absent from their map become None).
    """
    raw_props = record.properties or {}
    data: Dict[str, Optional[str]] = {"id": record.id}
    data.update({prop: raw_props.get(prop) for prop in COMPANY_PROPERTIES})

    def _count_assoc(kind: str) -> int:
        # Count unique association IDs; any missing layer counts as 0.
        assoc = getattr(record, "associations", None)
        if not assoc:
            return 0
        entry = assoc.get(kind)
        if not (entry and getattr(entry, "results", None)):
            return 0
        return len({a.id for a in entry.results if getattr(a, "id", None)})

    data["number_of_associated_contacts"] = _count_assoc("contacts")
    data["number_of_associated_deals"] = _count_assoc("deals")

    # Replace raw option codes with their labels; unknown codes -> None.
    for field, mapping in (
        ("industry", industry_map),
        ("macro_industry_grouping", macro_industry_map),
        ("region", region_map),
    ):
        code = data.get(field)
        data[field] = mapping.get(code) if code in mapping else None

    return data
| |
|
| |
|
def read_companies_by_ids(company_ids: List[str], cursor_prop: str) -> Tuple[List[Dict], Optional[int]]:
    """
    Fetch full company records (with contact/deal associations) one by one,
    enrich them, and track the maximum `cursor_prop` timestamp seen so the
    caller can advance its incremental cursor.

    Returns:
        (enriched_companies, max_cursor_ms) — max_cursor_ms is None when no
        record carried a parseable cursor value.
    """
    if not company_ids:
        return [], None

    enriched: List[Dict] = []
    association_types = ["contacts", "deals"]

    # Best-effort label maps; an empty map just means codes resolve to None.
    label_maps: Dict[str, Dict[str, str]] = {}
    for prop_name in ("industry", "macro_industry_grouping", "region"):
        try:
            label_maps[prop_name] = get_property_label_mapping(
                hubspot_client, "companies", prop_name)
        except Exception as exc:
            logging.warning("Failed to fetch %s map: %s", prop_name, exc)
            label_maps[prop_name] = {}

    newest_cursor_ms: Optional[int] = None

    for idx, company_id in enumerate(company_ids, start=1):
        try:
            record = hubspot_client.crm.companies.basic_api.get_by_id(
                company_id=company_id,
                properties=COMPANY_PROPERTIES,
                associations=association_types,
                archived=False,
            )

            # Advance the candidate cursor with this record's timestamp.
            cursor_ms = parse_any_ts_ms(
                (record.properties or {}).get(cursor_prop))
            if cursor_ms is not None:
                if newest_cursor_ms is None or cursor_ms > newest_cursor_ms:
                    newest_cursor_ms = cursor_ms

            enriched.append(_enrich_company_data_from_record(
                record,
                label_maps["industry"],
                label_maps["macro_industry_grouping"],
                label_maps["region"],
            ))

            if idx % 200 == 0:
                logging.info("Read %d companies...", idx)

            time.sleep(0.05)  # light throttle between single-record reads

        except httpx.HTTPStatusError as exc:
            logging.error("HTTP error reading company %s: %s", company_id, exc)
        except (CompaniesApiException, httpx.HTTPError) as exc:
            logging.error("Error reading company %s: %s", company_id, exc)

    return enriched, newest_cursor_ms
| |
|
| | |
| | |
| | |
| |
|
| |
|
def map_company_data_for_db(companies: List[Dict]) -> List[Dict]:
    """
    Map enriched HubSpot company dicts to `hubspot_companies` table rows.

    Each row is passed through enrich_supabase_row() before being returned.
    """
    mapped: List[Dict] = []
    for c in companies:
        base_row = {
            "company_id": try_parse_int(c["id"]),
            "company_name": c.get("name"),
            "company_email": c.get("company_email"),
            "city": c.get("city"),
            "domain": c.get("domain"),
            "street_address": c.get("address"),
            "street_address2": c.get("address2"),
            "hubspot_create_date": parse_ts(c.get("createdate")),
            # Prefer hs_lastmodifieddate; fall back to the legacy property.
            "hubspot_modified_date": parse_ts(c.get("hs_lastmodifieddate") or c.get("lastmodifieddate")),
            "review_date_updated_by": c.get("review_date_updated_by___clone") or None,
            "number_of_active_clis": try_parse_int(c.get("number_of_active__cli_s")),
            "number_of_clis": try_parse_int(c.get("number_of__mobile___cloned_")),
            "number_of_associated_contacts": c.get("number_of_associated_contacts", 0),
            "number_of_associated_deals": c.get("number_of_associated_deals", 0),
            "industry": c.get("industry"),
            "macro_industry_grouping": c.get("macro_industry_grouping"),
            # NOTE: duplicate "closest_review_date" key removed (it appeared
            # twice in the literal; the second silently won).
            "closest_review_date": parse_ts(c.get("closest_review_date___clone")),
            # BUG FIX: was c.get(c.get("region")), which looked the region
            # *value* up as a key and therefore always produced None.
            "region": c.get("region"),
            "source_1": c.get("source_1"),
            "source_2": c.get("source_2"),
            "record_source": c.get("hs_object_source_label"),
            "record_source_detail_1": c.get("hs_object_source_detail_1"),
            "original_traffic_source": c.get("hs_analytics_source"),
            "latest_traffic_source": c.get("hs_analytics_latest_source"),
        }
        mapped.append(enrich_supabase_row(base_row))
    return mapped
| |
|
| |
|
def companies_are_different(new_row: Dict, old_row: Dict) -> bool:
    """
    Return True when any tracked field differs between the candidate row and
    the stored row. Values are compared as strings, so int vs numeric-string
    and None vs missing-key are treated consistently.
    """
    tracked_fields = (
        "company_name", "company_email", "city", "domain", "street_address",
        "street_address2", "hubspot_modified_date", "review_date_updated_by",
        "number_of_active_clis", "number_of_clis",
        "number_of_associated_contacts", "number_of_associated_deals",
        "industry", "macro_industry_grouping", "closest_review_date",
        "region", "source_1", "source_2", "record_source",
        "record_source_detail_1", "original_traffic_source",
        "latest_traffic_source",
    )
    return any(
        str(new_row.get(field)) != str(old_row.get(field))
        for field in tracked_fields
    )
| |
|
| | |
| | |
| | |
| |
|
| |
|
def upsert_companies(companies: List[Dict]) -> None:
    """
    Insert/update changed company rows in the `hubspot_companies` table.

    Each mapped row is compared against the existing Supabase row (keyed by
    company_id as a string); only rows that are new or materially different
    are written, in batches of 1000.
    """
    if not companies:
        print("No companies to upsert.")
        return

    current_rows = fetch_supabase_table(
        supabase_client, "hubspot_companies", "company_id")

    changed: List[Dict] = []
    for candidate in map_company_data_for_db(companies):
        company_id = candidate.get("company_id")
        if not company_id:
            continue  # rows without a parseable ID cannot be keyed
        stored = current_rows.get(str(company_id))
        if not stored or companies_are_different(candidate, stored):
            changed.append(candidate)

    print(f"{len(changed)} companies to insert/update (out of {len(companies)} read).")

    if changed:
        batched_insert(supabase_client, "hubspot_companies",
                       changed, batch_size=1000)
| |
|
| | |
| | |
| | |
| |
|
| |
|
def main(since_ms: Optional[int] = None):
    """
    Run one incremental companies sync.

    Steps:
      1) Resolve the starting cursor: `since_ms` argument, else the
         HUBSPOT_COMPANIES_SINCE_MS env var, else today's UTC midnight.
      2) Search company IDs with <cursor_prop> > since_ms (property
         fallback handled inside search_company_ids_after_ms).
      3) Read the full company records, tracking the max <cursor_prop> seen.
      4) Upsert changed rows into Supabase.
      5) Record the sync time via update_sync_metadata.

    NOTE(review): the advanced cursor (new_cursor_ms) is computed and
    printed but NOT passed to update_sync_metadata — only the wall-clock
    sync time is persisted. Presumably the next run's cursor comes from the
    orchestrator or the env var; confirm this is intended.

    Args:
        since_ms: Epoch-milliseconds cursor (UTC), exclusive lower bound.

    Raises:
        RuntimeError: if HUBSPOT_COMPANIES_SINCE_MS is set but not an integer.
    """
    # Bootstrap the cursor from the environment when no argument was given.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_COMPANIES_SINCE_MS must be an integer (ms) if set.")

    # Final fallback: everything changed since today's UTC midnight.
    if since_ms is None:
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)

    print(f"Searching companies with timestamp > {since_ms} ...")
    ids, cursor_prop = search_company_ids_after_ms(since_ms)
    print(f"Search property: {cursor_prop}. Found {len(ids)} company IDs.")

    # Capture the sync time before the (potentially long) read phase.
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()

    if not ids:
        print("No companies beyond the cursor. Updating sync metadata and exiting.")
        update_sync_metadata(supabase_client, "companies", now_iso)
        return

    print("Reading companies (with associations)...")
    companies, max_ts_ms = read_companies_by_ids(ids, cursor_prop)

    print("Upserting into Supabase...")
    upsert_companies(companies)

    # Fall back to the starting cursor when no record carried a timestamp.
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms

    update_sync_metadata(supabase_client, "companies", now_iso)

    print(
        f"Companies sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.")
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _parse_cli_arg_to_ms(arg: str) -> int: |
| | """ |
| | Accept: |
| | - integer epoch ms |
| | - ISO-8601 (Z or offset) |
| | - YYYY-MM-DD (floors to 00:00Z) |
| | """ |
| | |
| | if re.fullmatch(r"\d{10,13}", arg): |
| | v = int(arg) |
| | if v < 10_000_000_000_000: |
| | v *= 1000 |
| | return v |
| |
|
| | |
| | if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): |
| | d = datetime.datetime.strptime( |
| | arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) |
| | return to_epoch_ms(floor_to_utc_midnight(d)) |
| |
|
| | |
| | return to_epoch_ms(arg) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | import sys |
| | if len(sys.argv) > 1: |
| | try: |
| | main(since_ms=_parse_cli_arg_to_ms(sys.argv[1])) |
| | except Exception as e: |
| | print( |
| | f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}") |
| | sys.exit(1) |
| | else: |
| | main() |
| |
|