# NOTE: removed export-tool residue ("Spaces: / Paused / Paused") — UI artifact, not part of the module.
| """ | |
| HubSpot Companies → Supabase (incremental since a millisecond cursor) | |
| Usage from orchestrator: | |
| import hubspot_companies | |
| hubspot_companies.main(since_ms=<int milliseconds since epoch UTC>) | |
| Direct CLI: | |
| # epoch ms | |
| python hubspot_companies.py 1754025600000 | |
| # ISO-8601 | |
| python hubspot_companies.py 2025-08-01T09:30:00Z | |
| # Back-compat date (floors to 00:00Z) | |
| python hubspot_companies.py 2025-08-01 | |
| """ | |
| import os | |
| import re | |
| import time | |
| import logging | |
| import datetime | |
| from typing import List, Dict, Optional, Tuple, Union | |
| import httpx | |
| import hubspot | |
| from dotenv import load_dotenv | |
| from supabase import create_client | |
| from hubspot.crm.companies import ApiException as CompaniesApiException | |
| from hubspot_utils import ( | |
| try_parse_int, parse_ts, get_property_label_mapping, | |
| ) | |
| from supabase_utils import ( | |
| update_sync_metadata, enrich_supabase_row, | |
| batched_insert, fetch_supabase_table, | |
| ) | |
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# Ensure the log directory exists before basicConfig opens the file handler;
# without this, a fresh checkout dies with FileNotFoundError on import.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    # One log file per calendar day of process start.
    filename=f"logs/hubspot_company_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",  # append: multiple runs in one day share the file
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# -----------------------------------------------------------------------------
# Environment
# -----------------------------------------------------------------------------
load_dotenv()
# Required: HubSpot private-app token and Supabase service-role credentials.
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Optional bootstrap cursor if orchestrator doesn't provide one
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_COMPANIES_SINCE_MS")
# Fail fast at import time: every function below needs both clients.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError("Supabase env vars are not set")
# Module-level singletons shared by all functions in this file.
hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
# HubSpot internal property names requested for every company read. Each name
# is copied verbatim onto the enriched record, so column mapping downstream
# must stay in sync with this list.
COMPANY_PROPERTIES = [
    "name",
    "city",
    "company_email",
    "address",
    "address2",
    "domain",
    "number_of_active__cli_s",
    "number_of__mobile___cloned_",
    # Timestamp properties used as incremental-cursor candidates.
    "createdate",
    "hs_lastmodifieddate",
    "lastmodifieddate",
    "review_date_updated_by___clone",
    "industry",
    "macro_industry_grouping",
    "closest_review_date___clone",
    "region",
    "industry_grouping",
    "source_1",
    "source_2",
    # Record-source / analytics attribution properties.
    "hs_object_source_label",
    "hs_analytics_source",
    "hs_analytics_latest_source",
    "hs_object_source_detail_1",
]
| # ----------------------------------------------------------------------------- | |
| # Time helpers | |
| # ----------------------------------------------------------------------------- | |
| def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=datetime.timezone.utc) | |
| return dt.astimezone(datetime.timezone.utc) | |
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Convert *dt* to UTC (naive treated as UTC) and drop the time-of-day."""
    utc = datetime.timezone.utc
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=utc)
    return dt.astimezone(utc).replace(hour=0, minute=0, second=0, microsecond=0)
| def _parse_iso_like_to_dt(value: str) -> datetime.datetime: | |
| if isinstance(value, str) and value.endswith("Z"): | |
| value = value[:-1] + "+00:00" | |
| dt = datetime.datetime.fromisoformat(value) | |
| return _ensure_utc(dt) | |
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Epoch milliseconds (UTC) for an ISO-8601 string or a datetime.

    Naive datetimes and tz-less strings are interpreted as UTC.

    Raises:
        TypeError: for any other input type.
    """
    utc = datetime.timezone.utc
    if isinstance(dt_or_str, datetime.datetime):
        moment = dt_or_str
    elif isinstance(dt_or_str, str):
        # Accept a trailing 'Z' that fromisoformat does not understand.
        text = dt_or_str[:-1] + "+00:00" if dt_or_str.endswith("Z") else dt_or_str
        moment = datetime.datetime.fromisoformat(text)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=utc)
    return int(moment.astimezone(utc).timestamp() * 1000)
| def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]: | |
| if value is None: | |
| return None | |
| try: | |
| v = int(str(value)) | |
| if v < 10_000_000_000_000: # seconds → ms | |
| v *= 1000 | |
| return v | |
| except ValueError: | |
| pass | |
| try: | |
| return to_epoch_ms(str(value)) | |
| except Exception: | |
| logging.warning("Could not parse timestamp value=%r", value) | |
| return None | |
# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms) with property fallback
# -----------------------------------------------------------------------------
def _search_company_ids_from(since_ms: int, prop: str) -> List[str]:
    """
    Search companies where {prop} > since_ms (epoch-ms).
    Sort ascending so we can advance the cursor monotonically.

    Pages through the CRM search endpoint and returns all matching IDs.
    Raises httpx.HTTPStatusError on any 4xx/5xx response (body is logged
    first); the caller uses that to fall back to another cursor property.
    """
    url = "https://api.hubapi.com/crm/v3/objects/companies/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT",
                 "value": str(since_ms)},
            ]
        }],
        "limit": 100,  # page size for the search endpoint
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }
    ids: List[str] = []
    after: Optional[str] = None  # pagination cursor from the previous page
    with httpx.Client(timeout=30.0) as client:
        while True:
            # Shallow copy so the base payload is reused unchanged per page.
            body = dict(payload)
            if after:
                body["after"] = after
            resp = client.post(url, headers=headers, json=body)
            if resp.status_code >= 400:
                # Log the error body (JSON when possible) before raising.
                try:
                    logging.error(
                        "Company search error for prop '%s': %s", prop, resp.json())
                except Exception:
                    logging.error(
                        "Company search error for prop '%s': %s", prop, resp.text)
                resp.raise_for_status()
            data = resp.json()
            ids.extend([obj["id"] for obj in data.get("results", []) or []])
            # HubSpot only includes paging.next when more pages exist.
            after = (data.get("paging") or {}).get("next", {}).get("after")
            if not after:
                break
            time.sleep(0.1)  # gentle pacing between pages
    return ids
def search_company_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """
    Try these properties in order; return (ids, prop_used) for the first
    successful search:
      1) hs_lastmodifieddate
      2) lastmodifieddate
      3) createdate

    Re-raises the last httpx.HTTPStatusError if every property fails.
    """
    # Modified-date properties first, as documented above: cursoring on
    # 'createdate' alone only ever sees newly created companies and silently
    # misses edits to existing ones.
    props_to_try = ["hs_lastmodifieddate", "lastmodifieddate", "createdate"]
    last_err = None
    for prop in props_to_try:
        try:
            ids = _search_company_ids_from(since_ms, prop)
            logging.info(
                "Company search with '%s' returned %d IDs.", prop, len(ids))
            return ids, prop
        except httpx.HTTPStatusError as e:
            # Typically a 400 when the portal lacks the property — try next.
            last_err = e
            continue
    if last_err:
        raise last_err
    return [], "hs_lastmodifieddate"
# -----------------------------------------------------------------------------
# Read-by-ID (with associations) → enrich & track max cursor ts
# -----------------------------------------------------------------------------
def _enrich_company_data_from_record(
    record,
    industry_map: Dict[str, str],
    macro_industry_map: Dict[str, str],
    region_map: Dict[str, str],
) -> Dict:
    """Flatten a HubSpot company record into a plain dict.

    Copies every configured property, counts unique associated contact and
    deal IDs, and swaps the industry / macro-grouping / region codes for
    their labels (a code with no label becomes None).
    """
    props = record.properties or {}
    company_data: Dict[str, Optional[str]] = {"id": record.id}
    company_data.update({name: props.get(name) for name in COMPANY_PROPERTIES})

    def _count_unique(assoc_name: str) -> int:
        # Count distinct association IDs; 0 when the bucket is absent/empty.
        associations = getattr(record, "associations", None)
        if not associations:
            return 0
        bucket = associations.get(assoc_name)
        results = getattr(bucket, "results", None) if bucket else None
        if not results:
            return 0
        return len({a.id for a in results if getattr(a, "id", None)})

    company_data["number_of_associated_contacts"] = _count_unique("contacts")
    company_data["number_of_associated_deals"] = _count_unique("deals")

    # Translate internal codes to labels; unknown codes collapse to None.
    for field, mapping in (
        ("industry", industry_map),
        ("macro_industry_grouping", macro_industry_map),
        ("region", region_map),
    ):
        company_data[field] = mapping.get(company_data.get(field))
    return company_data
def read_companies_by_ids(company_ids: List[str], cursor_prop: str) -> Tuple[List[Dict], Optional[int]]:
    """
    Fetch each company by ID (with contact/deal associations) and enrich it.

    Returns (companies, max_ts_ms) where max_ts_ms is the largest epoch-ms
    value of *cursor_prop* seen across fetched records, or None when nothing
    parseable was seen — the caller uses it to advance the sync cursor.
    Per-record HTTP/API errors are logged and the record skipped, not raised.
    """
    if not company_ids:
        return [], None
    companies: List[Dict] = []
    assoc_types = ["contacts", "deals"]
    # Fetch label maps once, not per record; a failed fetch degrades to an
    # empty map (the corresponding field then enriches to None).
    try:
        industry_map = get_property_label_mapping(
            hubspot_client, "companies", "industry")
    except Exception as e:
        logging.warning("Failed to fetch industry map: %s", e)
        industry_map = {}
    try:
        macro_industry_map = get_property_label_mapping(
            hubspot_client, "companies", "macro_industry_grouping")
    except Exception as e:
        logging.warning("Failed to fetch macro_industry_grouping map: %s", e)
        macro_industry_map = {}
    try:
        region_map = get_property_label_mapping(
            hubspot_client, "companies", "region")
    except Exception as e:
        logging.warning("Failed to fetch region map: %s", e)
        region_map = {}
    max_ts_ms: Optional[int] = None
    for i, cid in enumerate(company_ids, start=1):
        try:
            record = hubspot_client.crm.companies.basic_api.get_by_id(
                company_id=cid,
                properties=COMPANY_PROPERTIES,
                associations=assoc_types,
                archived=False,
            )
            # Track max timestamp for the chosen cursor property
            cursor_val = (record.properties or {}).get(cursor_prop)
            ts_ms = parse_any_ts_ms(cursor_val)
            if ts_ms is not None and (max_ts_ms is None or ts_ms > max_ts_ms):
                max_ts_ms = ts_ms
            companies.append(_enrich_company_data_from_record(
                record, industry_map, macro_industry_map, region_map))
            if i % 200 == 0:
                logging.info("Read %d companies...", i)
            time.sleep(0.05)  # pace the per-record GETs (rate-limit friendly)
        except httpx.HTTPStatusError as e:
            logging.error("HTTP error reading company %s: %s", cid, e)
        except (CompaniesApiException, httpx.HTTPError) as e:
            logging.error("Error reading company %s: %s", cid, e)
    return companies, max_ts_ms
# -----------------------------------------------------------------------------
# Map → Supabase rows and diff
# -----------------------------------------------------------------------------
def map_company_data_for_db(companies: List[Dict]) -> List[Dict]:
    """
    Translate enriched HubSpot company dicts into hubspot_companies rows.

    Renames HubSpot property names to table column names, parses ids/counts
    to ints and date strings to timestamps, and passes each row through
    enrich_supabase_row before returning the list.
    """
    mapped: List[Dict] = []
    for c in companies:
        base_row = {
            "company_id": try_parse_int(c["id"]),
            "company_name": c.get("name"),
            "company_email": c.get("company_email"),
            "city": c.get("city"),
            "domain": c.get("domain"),
            "street_address": c.get("address"),
            "street_address2": c.get("address2"),
            "hubspot_create_date": parse_ts(c.get("createdate")),
            # Prefer hs_lastmodifieddate; fall back to legacy lastmodifieddate.
            "hubspot_modified_date": parse_ts(c.get("hs_lastmodifieddate") or c.get("lastmodifieddate")),
            # Normalize empty string to None.
            "review_date_updated_by": c.get("review_date_updated_by___clone") or None,
            "number_of_active_clis": try_parse_int(c.get("number_of_active__cli_s")),
            "number_of_clis": try_parse_int(c.get("number_of__mobile___cloned_")),
            "number_of_associated_contacts": c.get("number_of_associated_contacts", 0),
            "number_of_associated_deals": c.get("number_of_associated_deals", 0),
            "industry": c.get("industry"),
            "macro_industry_grouping": c.get("macro_industry_grouping"),
            "closest_review_date": parse_ts(c.get("closest_review_date___clone")),
            "region": c.get("region"),
            "source_1": c.get("source_1"),
            "source_2": c.get("source_2"),
            "record_source": c.get("hs_object_source_label"),
            "record_source_detail_1": c.get("hs_object_source_detail_1"),
            "original_traffic_source": c.get("hs_analytics_source"),
            "latest_traffic_source": c.get("hs_analytics_latest_source"),
        }
        mapped.append(enrich_supabase_row(base_row))
    return mapped
def companies_are_different(new_row: Dict, old_row: Dict) -> bool:
    """True when any synced field differs between the new and stored rows.

    Values are compared as strings so that 5 vs "5" (or a missing key vs an
    explicit None) do not trigger spurious updates.
    """
    compare_keys = (
        "company_name", "company_email", "city", "domain", "street_address",
        "street_address2", "hubspot_modified_date", "review_date_updated_by",
        "number_of_active_clis", "number_of_clis",
        "number_of_associated_contacts", "number_of_associated_deals",
        "industry", "macro_industry_grouping", "closest_review_date",
        "region", "source_1", "source_2", "record_source",
        "record_source_detail_1", "original_traffic_source",
        "latest_traffic_source",
    )
    return any(str(new_row.get(k)) != str(old_row.get(k)) for k in compare_keys)
# -----------------------------------------------------------------------------
# Upsert
# -----------------------------------------------------------------------------
def upsert_companies(companies: List[Dict]) -> None:
    """
    Diff enriched companies against the hubspot_companies table and insert
    only the rows that are new or changed, in batches.
    """
    if not companies:
        print("No companies to upsert.")
        return
    # Snapshot of current rows keyed by company_id (keys are strings).
    existing = fetch_supabase_table(
        supabase_client, "hubspot_companies", "company_id")
    mapped = map_company_data_for_db(companies)
    rows_to_upsert: List[Dict] = []
    for row in mapped:
        cid = row.get("company_id")
        if not cid:
            continue  # skip rows whose HubSpot id failed to parse
        old_row = existing.get(str(cid))
        if not old_row or companies_are_different(row, old_row):
            rows_to_upsert.append(row)
    print(f"{len(rows_to_upsert)} companies to insert/update (out of {len(companies)} read).")
    if rows_to_upsert:
        # upraw_json_to_supabase(supabase_client, rows_to_upsert, object_type="companies")
        batched_insert(supabase_client, "hubspot_companies",
                       rows_to_upsert, batch_size=1000)
# -----------------------------------------------------------------------------
# Main (timestamp cursor)
# -----------------------------------------------------------------------------
def main(since_ms: Optional[int] = None):
    """
    Orchestrates:
      1) Search company IDs with <cursor_prop> > since_ms (property fallback)
      2) Read full companies (track max timestamp for <cursor_prop>)
      3) Upsert into Supabase
      4) Update sync metadata with { last_sync_metadata, last_sync_time, cursor_prop }
    """
    # Resolve since_ms: explicit arg > env bootstrap > today@00:00Z default.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_COMPANIES_SINCE_MS must be an integer (ms) if set.")
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)
    print(f"Searching companies with timestamp > {since_ms} ...")
    ids, cursor_prop = search_company_ids_after_ms(since_ms)
    print(f"Search property: {cursor_prop}. Found {len(ids)} company IDs.")
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    if not ids:
        print("No companies beyond the cursor. Updating sync metadata and exiting.")
        update_sync_metadata(supabase_client, "companies", now_iso)
        return
    print("Reading companies (with associations)...")
    companies, max_ts_ms = read_companies_by_ids(ids, cursor_prop)
    print("Upserting into Supabase...")
    upsert_companies(companies)
    # Advance cursor to max timestamp we actually ingested for the chosen property
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms
    # NOTE(review): new_cursor_ms/cursor_prop are computed (and printed below)
    # but only now_iso is passed to update_sync_metadata — confirm that helper
    # persists the ms cursor, otherwise the next run cannot resume from it.
    update_sync_metadata(supabase_client, "companies", now_iso)
    print(
        f"Companies sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.")
| # ----------------------------------------------------------------------------- | |
| # CLI | |
| # ----------------------------------------------------------------------------- | |
| def _parse_cli_arg_to_ms(arg: str) -> int: | |
| """ | |
| Accept: | |
| - integer epoch ms | |
| - ISO-8601 (Z or offset) | |
| - YYYY-MM-DD (floors to 00:00Z) | |
| """ | |
| # epoch ms or seconds | |
| if re.fullmatch(r"\d{10,13}", arg): | |
| v = int(arg) | |
| if v < 10_000_000_000_000: # seconds -> ms | |
| v *= 1000 | |
| return v | |
| # YYYY-MM-DD | |
| if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): | |
| d = datetime.datetime.strptime( | |
| arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) | |
| return to_epoch_ms(floor_to_utc_midnight(d)) | |
| # ISO-8601 | |
| return to_epoch_ms(arg) | |
if __name__ == "__main__":
    import sys
    # CLI entry point: the optional single argument is the starting cursor
    # (epoch ms/seconds, ISO-8601, or YYYY-MM-DD). With no argument, main()
    # falls back to its own defaults (env bootstrap, then today@00:00Z).
    if len(sys.argv) > 1:
        try:
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        main()