| | """ |
| | HubSpot Emails → Supabase (incremental since a millisecond cursor) |
| | |
| | Usage from orchestrator: |
| | import hubspot_emails |
| | hubspot_emails.main(since_ms=<int milliseconds since epoch UTC>) |
| | |
| | Direct CLI: |
| | # epoch ms |
| | python hubspot_emails.py 1754025600000 |
| | |
| | # ISO-8601 (Z or offset) |
| | python hubspot_emails.py 2025-08-01T09:30:00Z |
| | |
| | # Back-compat date (floors to 00:00:00Z) |
| | python hubspot_emails.py 2025-08-01 |
| | """ |
| |
|
| | import os |
| | import time |
| | import logging |
| | import datetime |
| | import re |
| | from typing import List, Dict, Optional, Tuple, Union |
| |
|
| | import httpx |
| | import hubspot |
| | from dotenv import load_dotenv |
| | from supabase import create_client |
| | from hubspot.crm.objects.emails import ApiException as EmailApiException |
| |
|
| | from hubspot_utils import ( |
| | clean_text, |
| | ) |
| | from supabase_utils import ( |
| | batched_insert, update_sync_metadata, |
| | ) |
| |
|
| | |
| | |
| | |
# Log to a per-day file so successive runs append to the same daily log.
# basicConfig raises FileNotFoundError if the target directory is missing,
# so make sure logs/ exists before configuring the handler.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_email_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
| |
|
| | |
| | |
| | |
# Load configuration from a local .env file, then fail fast if any of the
# required secrets are absent — better to die at import time than midway
# through a sync run.
load_dotenv()
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

# Optional bootstrap cursor (epoch ms) consulted by main() when no since_ms
# argument is supplied by the orchestrator.
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_EMAILS_SINCE_MS")

if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError("Supabase env vars are not set")

# Module-level API clients shared by every function in this file.
hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
| |
|
| | |
| | |
| | |
# CRM properties requested for every email object read from HubSpot.
EMAIL_PROPERTIES = [
    "hs_email_from_email",
    "hs_email_to_email",
    "hs_email_cc_email",
    "hs_email_subject",
    "hs_email_direction",
    "hs_email_text",
    "hs_timestamp",
]

# Loose matcher for email addresses embedded in free-form header strings.
EMAIL_RE = re.compile(r'[\w\.\+\-]+@[\w\.\-]+\.\w+')
# Leading "re:"/"fw:"/"fwd:" reply/forward prefixes, stripped (repeatedly)
# by normalize_subject().
SUBJECT_PREFIX_RE = re.compile(r'^(re:|fw:|fwd:)\s*', re.IGNORECASE)
| |
|
def parse_emails(raw: Optional[object]) -> List[str]:
    """Extract a sorted list of unique, lower-cased email addresses.

    Accepts a list of strings, a single (possibly ;/, separated) string, or
    any other object (stringified before matching). Returns [] for None.
    """
    if raw is None:
        return []

    found: List[str] = []

    def _scan(text: str) -> None:
        # Split on common separators first, then regex-match each chunk.
        for chunk in re.split(r'[;,]', text):
            found.extend(EMAIL_RE.findall(chunk))

    if isinstance(raw, list):
        # Non-string or falsy list entries are skipped entirely.
        for entry in raw:
            if entry and isinstance(entry, str):
                _scan(entry)
    elif isinstance(raw, str):
        _scan(raw)
    else:
        found.extend(EMAIL_RE.findall(str(raw)))

    unique = {addr.strip().lower() for addr in found if addr and addr.strip()}
    return sorted(unique)
| |
|
def normalize_subject(raw: Optional[str]) -> Optional[str]:
    """Canonicalize a subject line for thread matching.

    Lower-cases, repeatedly strips leading re:/fw:/fwd: prefixes, and
    collapses runs of whitespace. Returns None for empty/missing input.
    """
    if not raw:
        return None

    subject = raw.strip().lower()

    # Peel reply/forward prefixes one layer at a time ("re: fwd: x" -> "x").
    stripped = SUBJECT_PREFIX_RE.sub("", subject)
    while stripped != subject:
        subject = stripped.strip()
        stripped = SUBJECT_PREFIX_RE.sub("", subject)

    # Collapse internal whitespace to single spaces.
    subject = re.sub(r"\s+", " ", subject).strip()

    return subject or None
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: |
| | if dt.tzinfo is None: |
| | dt = dt.replace(tzinfo=datetime.timezone.utc) |
| | return dt.astimezone(datetime.timezone.utc) |
| |
|
| |
|
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Truncate *dt* to 00:00:00.000000 UTC on its (UTC) calendar date."""
    utc_dt = _ensure_utc(dt)
    return datetime.datetime.combine(
        utc_dt.date(), datetime.time(0, tzinfo=datetime.timezone.utc)
    )
| |
|
| |
|
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string ('Z' suffix or numeric offset) to aware UTC."""
    # fromisoformat() on older Pythons rejects a literal 'Z'; translate it.
    normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
    return _ensure_utc(datetime.datetime.fromisoformat(normalized))
| |
|
| |
|
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601 string or datetime to integer epoch milliseconds.

    Raises TypeError for any other input type.
    """
    if isinstance(dt_or_str, datetime.datetime):
        as_utc = _ensure_utc(dt_or_str)
    elif isinstance(dt_or_str, str):
        as_utc = _parse_iso_like_to_dt(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(as_utc.timestamp() * 1000)
| |
|
| |
|
def parse_hs_timestamp_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    Parse HubSpot's hs_timestamp property into milliseconds since epoch.

    HubSpot CRM datetime properties often come back as strings.
    Accepts:
      - ms (or seconds) epoch as str/int/float
      - ISO-8601 strings (rare for CRM objects, but robust to it)

    Returns:
        Milliseconds since epoch, or None when value is missing/unparseable.
    """
    if value is None:
        return None

    # Numeric path. Bug fix: float inputs (e.g. 1754025600000.0) previously
    # failed the int(str(...)) round-trip with ValueError, fell through to
    # the ISO parser, failed again, and returned None — despite floats being
    # a documented input. Convert numeric objects directly instead.
    try:
        if isinstance(value, (int, float)):
            v = int(value)
        else:
            v = int(str(value).strip())
        # Heuristic: anything below 1e10 is an epoch in seconds, not ms.
        if v < 10_000_000_000:
            v = v * 1000
        return v
    except ValueError:
        pass

    # Fallback: treat it as an ISO-8601 string.
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse hs_timestamp=%r", value)
        return None
| |
|
| | |
| | |
| | |
| |
|
| |
|
def search_email_ids_after_ms(since_ms: int) -> List[str]:
    """
    Return IDs of email objects whose hs_timestamp is strictly greater than
    *since_ms*, sorted ascending so the last ID carries the largest timestamp.

    Paginates the CRM search endpoint 100 records at a time; raises
    httpx.HTTPStatusError (after logging the response body) on HTTP errors.
    """
    url = "https://api.hubapi.com/crm/v3/objects/emails/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": "hs_timestamp",
                 "operator": "GT", "value": str(since_ms)}
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": "hs_timestamp", "direction": "ASCENDING"}],
    }

    found_ids: List[str] = []
    cursor: Optional[str] = None
    with httpx.Client(timeout=30.0) as client:
        while True:
            # Copy the base payload and attach the pagination cursor if any.
            request_body = {**base_payload}
            if cursor:
                request_body["after"] = cursor

            resp = client.post(url, headers=headers, json=request_body)
            if resp.status_code >= 400:
                # Log the structured error body when possible, raw text otherwise.
                try:
                    logging.error("Email search error: %s", resp.json())
                except Exception:
                    logging.error("Email search error: %s", resp.text)
                resp.raise_for_status()

            page = resp.json()
            for obj in page.get("results", []) or []:
                found_ids.append(obj["id"])

            cursor = (page.get("paging") or {}).get("next", {}).get("after")
            if not cursor:
                break
            time.sleep(0.1)  # gentle pacing between pages

    logging.info("Found %d email IDs after %d.", len(found_ids), since_ms)
    return found_ids
| |
|
| | |
| | |
| | |
| |
|
| |
|
def read_emails_by_ids(
    email_ids: List[str]
) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], List[Dict], Optional[int]]:
    """
    Read emails by ID with properties and associations
    (companies/contacts/deals/tickets).

    Args:
        email_ids: HubSpot email object IDs to fetch one at a time.

    Returns:
        A 6-tuple: (email metadata rows, company links, contact links,
        deal links, ticket links, max hs_timestamp in ms seen or None).
        Per-email read failures are logged and skipped, not raised.
    """
    # Bug fix: the empty-input early return previously yielded only FIVE
    # elements while the normal path returns six (the tickets list was added
    # without updating this return), so unpacking would raise ValueError.
    # The return annotation was likewise missing one List[Dict].
    if not email_ids:
        return [], [], [], [], [], None

    email_metadata_data: List[Dict] = []
    email_company_links: List[Dict] = []
    email_contact_links: List[Dict] = []
    email_deal_links: List[Dict] = []
    email_ticket_links: List[Dict] = []

    assoc_types = ["companies", "contacts", "deals", "tickets"]
    max_ts_ms: Optional[int] = None

    # (association key, destination list, link-row column) for each link table;
    # replaces four near-identical copy/paste branches.
    link_specs = [
        ("companies", email_company_links, "company_id"),
        ("contacts", email_contact_links, "contact_id"),
        ("deals", email_deal_links, "deal_id"),
        ("tickets", email_ticket_links, "ticket_id"),
    ]

    for i, email_id in enumerate(email_ids, start=1):
        try:
            record = hubspot_client.crm.objects.emails.basic_api.get_by_id(
                email_id, properties=EMAIL_PROPERTIES, associations=assoc_types, archived=False
            )
            props = record.properties or {}

            cleaned = clean_text(props.get("hs_email_text") or "")

            # Track the max timestamp so the caller can advance its cursor.
            hs_ts_ms = parse_hs_timestamp_ms(props.get("hs_timestamp"))
            if hs_ts_ms is not None:
                if (max_ts_ms is None) or (hs_ts_ms > max_ts_ms):
                    max_ts_ms = hs_ts_ms
                sent_at_iso = datetime.datetime.fromtimestamp(
                    hs_ts_ms / 1000, tz=datetime.timezone.utc).isoformat()
            else:
                sent_at_iso = None

            email_metadata_data.append({
                "email_id": record.id,
                "subject": props.get("hs_email_subject"),
                "normalized_subject": normalize_subject(props.get("hs_email_subject")),
                "from_email": props.get("hs_email_from_email") or "",
                "to_emails": parse_emails(props.get("hs_email_to_email")),
                "cc_emails": parse_emails(props.get("hs_email_cc_email")),
                "sent_at": sent_at_iso,
                "direction": props.get("hs_email_direction"),
                "email_content": cleaned,
            })

            # Collect association link rows; only numeric IDs are kept since
            # the link tables store integer foreign keys.
            assoc = record.associations or {}
            for key, bucket, id_column in link_specs:
                assoc_obj = assoc.get(key)
                if assoc_obj and getattr(assoc_obj, "results", None):
                    for a in assoc_obj.results:
                        if a.id and a.id.isdigit():
                            bucket.append({"email_id": record.id, id_column: int(a.id)})

            # Periodic progress log plus a small pause to stay under rate limits.
            if i % 200 == 0:
                logging.info("Read %d emails...", i)
            time.sleep(0.05)

        except httpx.HTTPStatusError as e:
            logging.error("HTTP error reading email %s: %s", email_id, e)
        except (EmailApiException, httpx.HTTPError) as e:
            logging.error("Error reading email %s: %s", email_id, e)

    return (
        email_metadata_data, email_company_links, email_contact_links,
        email_deal_links, email_ticket_links, max_ts_ms
    )
| |
|
| | |
| | |
| | |
| |
|
| |
|
def upsert_emails(
    email_metadata_data: List[Dict],
    email_company_links: List[Dict],
    email_contact_links: List[Dict],
    email_deal_links: List[Dict],
    email_ticket_links: List[Dict],
) -> None:
    """Write email rows and their association link rows to Supabase.

    Each non-empty list is upserted in batches of 1000; empty lists are
    skipped. Link tables conflict on their (email_id, <other>_id) pair.
    """
    if email_metadata_data:
        batched_insert(
            supabase_client,
            "hubspot_emails",
            email_metadata_data,
            batch_size=1000,
        )
        print(f"Upserted {len(email_metadata_data)} email metadata rows.")

    # (table name, rows, conflict columns, human-readable label) per link table.
    link_specs = [
        ("hubspot_email_companies", email_company_links,
         ["email_id", "company_id"], "email-company"),
        ("hubspot_email_contacts", email_contact_links,
         ["email_id", "contact_id"], "email-contact"),
        ("hubspot_email_deals", email_deal_links,
         ["email_id", "deal_id"], "email-deal"),
        ("hubspot_email_tickets", email_ticket_links,
         ["email_id", "ticket_id"], "email-ticket"),
    ]
    for table, rows, conflict_cols, label in link_specs:
        if not rows:
            continue
        batched_insert(
            supabase_client,
            table,
            rows,
            batch_size=1000,
            on_conflict=conflict_cols,
        )
        print(f"Upserted {len(rows)} {label} links.")
| |
|
| | |
| | |
| | |
| |
|
| |
|
def main(since_ms: Optional[int] = None):
    """
    Orchestrates:
      1) Search email IDs with hs_timestamp > since_ms
      2) Read full emails with associations (track max timestamp)
      3) Upsert into Supabase
      4) Record the sync wall-clock time via update_sync_metadata

    Args:
        since_ms: Millisecond epoch cursor (exclusive). When None, falls back
            to the HUBSPOT_EMAILS_SINCE_MS env var, then to today's UTC
            midnight.

    Raises:
        RuntimeError: if HUBSPOT_EMAILS_SINCE_MS is set but not an integer.
    """
    # Resolve the cursor: explicit argument > env bootstrap > today 00:00Z.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_EMAILS_SINCE_MS must be an integer (ms) if set.")

    if since_ms is None:
        # Default: start of the current UTC day.
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)

    print(f"Searching emails with hs_timestamp > {since_ms} ...")
    ids = search_email_ids_after_ms(since_ms)
    print(f"Found {len(ids)} email IDs.")

    if not ids:
        print("No emails beyond the cursor. Updating sync metadata and exiting.")
        # Even with nothing to sync, record the wall-clock sync time.
        now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
        update_sync_metadata(supabase_client, "emails", now_iso)
        return

    print("Reading emails (with associations)...")
    (
        email_metadata_data, email_company_links, email_contact_links,
        email_deal_links, email_ticket_links, max_ts_ms
    ) = read_emails_by_ids(ids)

    print("Upserting into Supabase...")
    upsert_emails(
        email_metadata_data,
        email_company_links,
        email_contact_links,
        email_deal_links,
        email_ticket_links,
    )

    # NOTE(review): new_cursor_ms is computed but only printed below —
    # update_sync_metadata receives the wall-clock time, not this cursor.
    # Confirm the orchestrator derives the next since_ms elsewhere; otherwise
    # the advanced hs_timestamp cursor is lost between runs.
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()

    update_sync_metadata(supabase_client, "emails", now_iso)

    print(f"Emails sync complete. Advanced cursor to {new_cursor_ms}.")
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _parse_cli_arg_to_ms(arg: str) -> int: |
| | """ |
| | Accept: |
| | - integer epoch ms |
| | - ISO-8601 (Z or offset) |
| | - YYYY-MM-DD (floors to 00:00Z for convenience/back-compat) |
| | """ |
| | |
| | if re.fullmatch(r"\d{10,13}", arg): |
| | v = int(arg) |
| | if v < 10_000_000_000_000: |
| | v *= 1000 |
| | return v |
| |
|
| | |
| | if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): |
| | d = datetime.datetime.strptime( |
| | arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) |
| | return to_epoch_ms(floor_to_utc_midnight(d)) |
| |
|
| | |
| | return to_epoch_ms(arg) |
| |
|
| |
|
if __name__ == "__main__":
    import sys
    # CLI entry point: an optional single argument sets the cursor
    # (epoch ms, ISO-8601, or YYYY-MM-DD). With no argument, main() applies
    # its own fallback (env var, then today's UTC midnight).
    if len(sys.argv) > 1:
        try:
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        main()
| |
|