""" HubSpot Emails → Supabase (incremental since a millisecond cursor) Usage from orchestrator: import load_hubspot_emails load_hubspot_emails.main(since_ms=) Direct CLI: # epoch ms python load_hubspot_emails.py 1754025600000 # ISO-8601 (Z or offset) python load_hubspot_emails.py 2025-08-01T09:30:00Z # Back-compat date (floors to 00:00:00Z) python load_hubspot_emails.py 2025-08-01 """ import os import time import logging import datetime import re from typing import List, Dict, Optional, Tuple, Union import httpx import hubspot from dotenv import load_dotenv from supabase import create_client from hubspot.crm.objects.emails import ApiException as EmailApiException from hubspot_utils import ( clean_text, ) from supabase_utils import ( batched_insert, update_sync_metadata, ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logging.basicConfig( filename=f"logs/hubspot_email_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log", filemode="a", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) # ----------------------------------------------------------------------------- # Environment # ----------------------------------------------------------------------------- load_dotenv() HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN") SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Optional bootstrap cursor if orchestrator doesn't provide one BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_EMAILS_SINCE_MS") if not HUBSPOT_TOKEN: raise RuntimeError("HUBSPOT_TOKEN is not set") if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: raise RuntimeError("Supabase env vars are not set") hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN) supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) # 
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
EMAIL_PROPERTIES = [
    "hs_email_from_email",
    "hs_email_to_email",
    "hs_email_cc_email",
    "hs_email_subject",
    "hs_email_direction",
    "hs_email_text",
    "hs_timestamp",
]

# -----------------------------------------------------------------------------
# Email parsing
# -----------------------------------------------------------------------------
EMAIL_RE = re.compile(r'[\w\.\+\-]+@[\w\.\-]+\.\w+')
SUBJECT_PREFIX_RE = re.compile(r'^(re:|fw:|fwd:)\s*', re.IGNORECASE)


def parse_emails(raw: Optional[object]) -> List[str]:
    """Extract a sorted, de-duplicated list of lowercase e-mail addresses.

    Accepts None, a string (possibly ';'/',' separated), a list of such
    strings (non-string or falsy items are ignored), or any other object,
    which is stringified before matching.
    """
    if raw is None:
        return []

    def addresses_in(text: str) -> List[str]:
        # Split on common separators first, then regex-match each chunk.
        hits: List[str] = []
        for piece in re.split(r'[;,]', text):
            hits.extend(EMAIL_RE.findall(piece))
        return hits

    collected: List[str] = []
    if isinstance(raw, list):
        for entry in raw:
            if entry and isinstance(entry, str):
                collected.extend(addresses_in(entry))
    elif isinstance(raw, str):
        collected.extend(addresses_in(raw))
    else:
        collected.extend(EMAIL_RE.findall(str(raw)))

    return sorted({addr.strip().lower() for addr in collected if addr and addr.strip()})


def normalize_subject(raw: Optional[str]) -> Optional[str]:
    """Lowercase a subject, peel repeated re:/fw:/fwd: prefixes, and collapse
    internal whitespace. Returns None when nothing meaningful remains."""
    if not raw:
        return None
    subject = raw.strip().lower()
    # Peel reply/forward prefixes until a pass removes nothing.
    while True:
        remainder = SUBJECT_PREFIX_RE.sub("", subject)
        if remainder == subject:
            break
        subject = remainder.strip()
    subject = re.sub(r"\s+", " ", subject).strip()
    return subject or None


# -----------------------------------------------------------------------------
# Time helpers
# -----------------------------------------------------------------------------
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Attach UTC to naive datetimes, then normalize any zone to UTC."""
    utc = datetime.timezone.utc
    return (dt if dt.tzinfo is not None else dt.replace(tzinfo=utc)).astimezone(utc)


def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return the UTC midnight at the start of dt's UTC day."""
    return _ensure_utc(dt).replace(hour=0, minute=0, second=0, microsecond=0)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string (accepting a trailing 'Z') into an aware
    UTC datetime. Naive inputs are assumed to be UTC."""
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    dt = datetime.datetime.fromisoformat(value)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)


def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601 string or a datetime to epoch milliseconds (UTC).

    Raises:
        TypeError: for any other input type.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = dt_or_str
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=datetime.timezone.utc)
        dt = dt.astimezone(datetime.timezone.utc)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(dt.timestamp() * 1000)


def parse_hs_timestamp_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    HubSpot CRM datetime properties often come back as strings.
    Accepts:
      - ms epoch as str/int/float
      - ISO-8601 strings (rare for CRM objects, but robust to it)
    Returns ms since epoch or None.
    """
    if value is None:
        return None

    # Easy path: integer ms
    v: Optional[int] = None
    try:
        v = int(str(value))
    except ValueError:
        # Fix: float inputs (e.g. 1754006400000.0) used to fail here because
        # int("...0.0") raises ValueError, and the ISO fallback cannot parse
        # them either — so documented float support actually returned None.
        try:
            v = int(float(value))
        except (TypeError, ValueError):
            v = None

    if v is not None:
        # treat values too small as seconds and up-convert, just in case
        if v < 10_000_000_000:  # < ~2001-09 in seconds
            v *= 1000
        return v

    # Fallback: ISO
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse hs_timestamp=%r", value)
        return None


# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms)
# -----------------------------------------------------------------------------
""" url = "https://api.hubapi.com/crm/v3/objects/emails/search" headers = { "Authorization": f"Bearer {HUBSPOT_TOKEN}", "Content-Type": "application/json", "Accept": "application/json", } payload = { "filterGroups": [{ "filters": [ {"propertyName": "hs_timestamp", "operator": "GT", "value": str(since_ms)} ] }], "limit": 100, "sorts": [{"propertyName": "hs_timestamp", "direction": "ASCENDING"}], # We only need IDs here; we’ll read full records later } ids: List[str] = [] after: Optional[str] = None with httpx.Client(timeout=30.0) as client: while True: body = dict(payload) if after: body["after"] = after resp = client.post(url, headers=headers, json=body) if resp.status_code >= 400: try: logging.error("Email search error: %s", resp.json()) except Exception: logging.error("Email search error: %s", resp.text) resp.raise_for_status() data = resp.json() ids.extend([obj["id"] for obj in data.get("results", []) or []]) after = (data.get("paging") or {}).get("next", {}).get("after") if not after: break time.sleep(0.1) logging.info("Found %d email IDs after %d.", len(ids), since_ms) return ids # ----------------------------------------------------------------------------- # Read-by-ID (with associations) # ----------------------------------------------------------------------------- def read_emails_by_ids( email_ids: List[str] ) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], Optional[int]]: """ Read emails by ID with properties and associations (companies/contacts/deals). Returns data lists + max hs_timestamp (ms) seen. 
""" if not email_ids: return [], [], [], [], None email_metadata_data: List[Dict] = [] email_company_links: List[Dict] = [] email_contact_links: List[Dict] = [] email_deal_links: List[Dict] = [] email_ticket_links: List[Dict] = [] assoc_types = ["companies", "contacts", "deals", "tickets"] max_ts_ms: Optional[int] = None for i, email_id in enumerate(email_ids, start=1): try: record = hubspot_client.crm.objects.emails.basic_api.get_by_id( email_id, properties=EMAIL_PROPERTIES, associations=assoc_types, archived=False ) props = record.properties or {} cleaned = clean_text(props.get("hs_email_text") or "") # Robust timestamp handling hs_ts_ms = parse_hs_timestamp_ms(props.get("hs_timestamp")) if hs_ts_ms is not None: if (max_ts_ms is None) or (hs_ts_ms > max_ts_ms): max_ts_ms = hs_ts_ms sent_at_iso = datetime.datetime.fromtimestamp( hs_ts_ms / 1000, tz=datetime.timezone.utc).isoformat() else: sent_at_iso = None email_metadata_data.append({ "email_id": record.id, "subject": props.get("hs_email_subject"), "normalized_subject": normalize_subject(props.get("hs_email_subject")), "from_email": props.get("hs_email_from_email") or "", "to_emails": parse_emails(props.get("hs_email_to_email")), "cc_emails": parse_emails(props.get("hs_email_cc_email")), "sent_at": sent_at_iso, "direction": props.get("hs_email_direction"), "email_content": cleaned, }) assoc = record.associations or {} if assoc.get("companies") and getattr(assoc["companies"], "results", None): for a in assoc["companies"].results: if a.id and a.id.isdigit(): email_company_links.append( {"email_id": record.id, "company_id": int(a.id)}) if assoc.get("contacts") and getattr(assoc["contacts"], "results", None): for a in assoc["contacts"].results: if a.id and a.id.isdigit(): email_contact_links.append( {"email_id": record.id, "contact_id": int(a.id)}) if assoc.get("deals") and getattr(assoc["deals"], "results", None): for a in assoc["deals"].results: if a.id and a.id.isdigit(): email_deal_links.append( {"email_id": 
record.id, "deal_id": int(a.id)}) if assoc.get("tickets") and getattr(assoc["tickets"], "results", None): for a in assoc["tickets"].results: if a.id and a.id.isdigit(): email_ticket_links.append( {"email_id": record.id, "ticket_id": int(a.id)}) if i % 200 == 0: logging.info("Read %d emails...", i) time.sleep(0.05) except httpx.HTTPStatusError as e: logging.error("HTTP error reading email %s: %s", email_id, e) except (EmailApiException, httpx.HTTPError) as e: logging.error("Error reading email %s: %s", email_id, e) return ( email_metadata_data, email_company_links, email_contact_links, email_deal_links, email_ticket_links, max_ts_ms ) # ----------------------------------------------------------------------------- # Upsert # ----------------------------------------------------------------------------- def upsert_emails( email_metadata_data: List[Dict], email_company_links: List[Dict], email_contact_links: List[Dict], email_deal_links: List[Dict], email_ticket_links: List[Dict], ) -> None: if email_metadata_data: batched_insert( supabase_client, "hubspot_emails", email_metadata_data, batch_size=1000, # If your helper supports on_conflict: # on_conflict=["email_id"] ) print(f"Upserted {len(email_metadata_data)} email metadata rows.") if email_company_links: batched_insert( supabase_client, "hubspot_email_companies", email_company_links, batch_size=1000, on_conflict=["email_id", "company_id"], ) print(f"Upserted {len(email_company_links)} email-company links.") if email_contact_links: batched_insert( supabase_client, "hubspot_email_contacts", email_contact_links, batch_size=1000, on_conflict=["email_id", "contact_id"], ) print(f"Upserted {len(email_contact_links)} email-contact links.") if email_deal_links: batched_insert( supabase_client, "hubspot_email_deals", email_deal_links, batch_size=1000, on_conflict=["email_id", "deal_id"], ) print(f"Upserted {len(email_deal_links)} email-deal links.") if email_ticket_links: batched_insert( supabase_client, 
"hubspot_email_tickets", email_ticket_links, batch_size=1000, on_conflict=["email_id", "ticket_id"], ) print(f"Upserted {len(email_ticket_links)} email-ticket links.") # ----------------------------------------------------------------------------- # Main (timestamp cursor) # ----------------------------------------------------------------------------- def main(since_ms: Optional[int] = None): """ Orchestrates: 1) Search email IDs with hs_timestamp > since_ms 2) Read full emails with associations (track max timestamp) 3) Upsert into Supabase 4) Update sync metadata with both last_sync_metadata and last_sync_time """ # Resolve since_ms if since_ms is None and BOOTSTRAP_SINCE_MS_ENV: try: since_ms = int(BOOTSTRAP_SINCE_MS_ENV) except ValueError: raise RuntimeError( "HUBSPOT_EMAILS_SINCE_MS must be an integer (ms) if set.") if since_ms is None: # Default: today@00:00:00Z for first run today0 = floor_to_utc_midnight( datetime.datetime.now(datetime.timezone.utc)) since_ms = to_epoch_ms(today0) print(f"Searching emails with hs_timestamp > {since_ms} ...") ids = search_email_ids_after_ms(since_ms) print(f"Found {len(ids)} email IDs.") if not ids: print("No emails beyond the cursor. 
Updating sync metadata and exiting.") # Record only last_sync_time; helper sets updated_at now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat() update_sync_metadata(supabase_client, "emails", now_iso) return print("Reading emails (with associations)...") ( email_metadata_data, email_company_links, email_contact_links, email_deal_links, email_ticket_links, max_ts_ms ) = read_emails_by_ids(ids) print("Upserting into Supabase...") upsert_emails( email_metadata_data, email_company_links, email_contact_links, email_deal_links, email_ticket_links, ) # Advance cursor to max timestamp we actually ingested new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat() update_sync_metadata(supabase_client, "emails", now_iso) print(f"Emails sync complete. Advanced cursor to {new_cursor_ms}.") # ----------------------------------------------------------------------------- # CLI # ----------------------------------------------------------------------------- def _parse_cli_arg_to_ms(arg: str) -> int: """ Accept: - integer epoch ms - ISO-8601 (Z or offset) - YYYY-MM-DD (floors to 00:00Z for convenience/back-compat) """ # epoch ms if re.fullmatch(r"\d{10,13}", arg): v = int(arg) if v < 10_000_000_000_000: # seconds -> ms v *= 1000 return v # YYYY-MM-DD if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): d = datetime.datetime.strptime( arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) return to_epoch_ms(floor_to_utc_midnight(d)) # ISO-8601 return to_epoch_ms(arg) if __name__ == "__main__": import sys if len(sys.argv) > 1: try: main(since_ms=_parse_cli_arg_to_ms(sys.argv[1])) except Exception as e: print( f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}") sys.exit(1) else: main()