# (removed spreadsheet-export artifact lines: "Spaces:" / "Paused" / "Paused")
| """ | |
| HubSpot Emails → Supabase (incremental since a millisecond cursor) | |
| Usage from orchestrator: | |
| import hubspot_emails | |
| hubspot_emails.main(since_ms=<int milliseconds since epoch UTC>) | |
| Direct CLI: | |
| # epoch ms | |
| python hubspot_emails.py 1754025600000 | |
| # ISO-8601 (Z or offset) | |
| python hubspot_emails.py 2025-08-01T09:30:00Z | |
| # Back-compat date (floors to 00:00:00Z) | |
| python hubspot_emails.py 2025-08-01 | |
| """ | |
| import os | |
| import time | |
| import logging | |
| import datetime | |
| import re | |
| from typing import List, Dict, Optional, Tuple, Union | |
| import httpx | |
| import hubspot | |
| from dotenv import load_dotenv | |
| from supabase import create_client | |
| from hubspot.crm.objects.emails import ApiException as EmailApiException | |
| from hubspot_utils import ( | |
| clean_text, | |
| ) | |
| from supabase_utils import ( | |
| batched_insert, update_sync_metadata, | |
| ) | |
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# BUGFIX: logging.basicConfig raises FileNotFoundError when the target
# directory is missing; create it up front so a fresh checkout can run.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_email_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# -----------------------------------------------------------------------------
# Environment
# -----------------------------------------------------------------------------
load_dotenv()

HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Optional bootstrap cursor if orchestrator doesn't provide one
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_EMAILS_SINCE_MS")

# Fail fast on missing configuration instead of erroring mid-sync.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError("Supabase env vars are not set")

hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
# CRM email properties requested from HubSpot for every message read.
EMAIL_PROPERTIES = [
    "hs_email_from_email",
    "hs_email_to_email",
    "hs_email_cc_email",
    "hs_email_subject",
    "hs_email_direction",
    "hs_email_text",
    "hs_timestamp",
]

# -----------------------------------------------------------------------------
# Email parsing
# -----------------------------------------------------------------------------
# Loose address matcher: word chars/dots/plus/hyphen around a single '@'.
EMAIL_RE = re.compile(r'[\w\.\+\-]+@[\w\.\-]+\.\w+')
# Leading reply/forward markers ("re:", "fw:", "fwd:"), case-insensitive.
SUBJECT_PREFIX_RE = re.compile(r'^(re:|fw:|fwd:)\s*', re.IGNORECASE)
def parse_emails(raw: Optional[object]) -> List[str]:
    """Extract e-mail addresses from a string, a list of strings, or any
    other object (stringified as a last resort).

    Returns a sorted, de-duplicated list of lowercase addresses; ';' and ','
    are treated as separators.
    """
    if raw is None:
        return []
    found: List[str] = []
    if isinstance(raw, str):
        # Split on common separators first, then regex-scan each chunk.
        for piece in re.split(r'[;,]', raw):
            found.extend(EMAIL_RE.findall(piece))
    elif isinstance(raw, list):
        # Only non-empty string entries are scanned; anything else is ignored.
        for entry in raw:
            if entry and isinstance(entry, str):
                for piece in re.split(r'[;,]', entry):
                    found.extend(EMAIL_RE.findall(piece))
    else:
        # Last resort: stringify whatever this is and scan it.
        found.extend(EMAIL_RE.findall(str(raw)))
    return sorted({addr.strip().lower() for addr in found if addr and addr.strip()})
def normalize_subject(raw: Optional[str]) -> Optional[str]:
    """Lowercase a subject line, strip stacked re:/fw:/fwd: prefixes, and
    collapse whitespace runs; returns None for empty/blank input."""
    if not raw:
        return None
    subject = raw.strip().lower()
    # Peel reply/forward markers one at a time until none remain.
    stripped = SUBJECT_PREFIX_RE.sub("", subject)
    while stripped != subject:
        subject = stripped.strip()
        stripped = SUBJECT_PREFIX_RE.sub("", subject)
    # Collapse internal whitespace; an all-prefix subject normalizes to None.
    subject = re.sub(r"\s+", " ", subject).strip()
    return subject or None
| # ----------------------------------------------------------------------------- | |
| # Time helpers | |
| # ----------------------------------------------------------------------------- | |
| def _ensure_utc(dt: datetime.datetime) -> datetime.datetime: | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=datetime.timezone.utc) | |
| return dt.astimezone(datetime.timezone.utc) | |
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Normalize *dt* to UTC, then truncate it to 00:00:00.000000."""
    midnight_base = _ensure_utc(dt)
    return midnight_base.replace(hour=0, minute=0, second=0, microsecond=0)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string (trailing 'Z' allowed) to an aware UTC datetime."""
    # fromisoformat() on older Pythons rejects 'Z'; swap in an explicit offset.
    normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
    return _ensure_utc(datetime.datetime.fromisoformat(normalized))
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601 string or a datetime into epoch milliseconds (UTC).

    Raises TypeError for any other input type.
    """
    if isinstance(dt_or_str, datetime.datetime):
        normalized = _ensure_utc(dt_or_str)
    elif isinstance(dt_or_str, str):
        normalized = _parse_iso_like_to_dt(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(normalized.timestamp() * 1000)
def parse_hs_timestamp_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    Convert a HubSpot `hs_timestamp` property into epoch milliseconds.

    HubSpot CRM datetime properties often come back as strings. Accepts:
      - epoch seconds or milliseconds as str/int/float
      - ISO-8601 strings (rare for CRM objects, but robust to it)

    Returns ms since epoch, or None when the value is absent or unparsable
    (a warning is logged in the unparsable case).
    """
    if value is None:
        return None
    # Easy path: numeric epoch. BUGFIX: go through float() first so float
    # inputs such as 1754025600000.0 (or the string "1754025600000.0") are
    # accepted; int(str(value)) used to raise ValueError for those and the
    # value was silently dropped to None via the ISO fallback.
    try:
        v = int(float(value))
        # treat values too small as seconds and up-convert, just in case
        if v < 10_000_000_000:  # below ~year 2286 when read as seconds
            v = v * 1000
        return v
    except (ValueError, TypeError):
        pass
    # Fallback: ISO-8601
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse hs_timestamp=%r", value)
        return None
# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms)
# -----------------------------------------------------------------------------
def search_email_ids_after_ms(since_ms: int) -> List[str]:
    """
    Return IDs of emails whose hs_timestamp is strictly greater than
    *since_ms*, sorted ascending so the last record carries the max
    timestamp (keeps the cursor monotonic).
    """
    search_url = "https://api.hubapi.com/crm/v3/objects/emails/search"
    request_headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": "hs_timestamp",
                 "operator": "GT", "value": str(since_ms)}
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": "hs_timestamp", "direction": "ASCENDING"}],
        # We only need IDs here; we’ll read full records later
    }

    found_ids: List[str] = []
    cursor: Optional[str] = None
    with httpx.Client(timeout=30.0) as http:
        while True:
            request_body = {**base_payload}
            if cursor:
                request_body["after"] = cursor
            response = http.post(search_url, headers=request_headers, json=request_body)
            if response.status_code >= 400:
                # Log the API's error payload (as JSON when possible) before raising.
                try:
                    logging.error("Email search error: %s", response.json())
                except Exception:
                    logging.error("Email search error: %s", response.text)
                response.raise_for_status()
            page = response.json()
            for obj in page.get("results", []) or []:
                found_ids.append(obj["id"])
            cursor = (page.get("paging") or {}).get("next", {}).get("after")
            if not cursor:
                break
            time.sleep(0.1)  # gentle pacing between pages
    logging.info("Found %d email IDs after %d.", len(found_ids), since_ms)
    return found_ids
# -----------------------------------------------------------------------------
# Read-by-ID (with associations)
# -----------------------------------------------------------------------------
def read_emails_by_ids(
    email_ids: List[str]
) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], List[Dict], Optional[int]]:
    """
    Read emails by ID with properties and associations
    (companies/contacts/deals/tickets).

    Returns a 6-tuple:
        (metadata_rows, company_links, contact_links, deal_links,
         ticket_links, max_hs_timestamp_ms)
    where max_hs_timestamp_ms is None if no email had a parsable timestamp.
    Per-email API errors are logged and that email is skipped, so a partial
    result may be returned.
    """
    if not email_ids:
        # BUGFIX: this early return previously yielded only 5 values
        # ([], [], [], [], None) while the normal path returns 6 (ticket
        # links were missing), which would break 6-way tuple unpacking at
        # the call site for empty input. The return annotation also listed
        # only 5 tuple members; both are fixed.
        return [], [], [], [], [], None
    email_metadata_data: List[Dict] = []
    email_company_links: List[Dict] = []
    email_contact_links: List[Dict] = []
    email_deal_links: List[Dict] = []
    email_ticket_links: List[Dict] = []
    assoc_types = ["companies", "contacts", "deals", "tickets"]
    max_ts_ms: Optional[int] = None
    for i, email_id in enumerate(email_ids, start=1):
        try:
            record = hubspot_client.crm.objects.emails.basic_api.get_by_id(
                email_id, properties=EMAIL_PROPERTIES, associations=assoc_types, archived=False
            )
            props = record.properties or {}
            cleaned = clean_text(props.get("hs_email_text") or "")
            # Robust timestamp handling (HubSpot may return str/int/ISO).
            hs_ts_ms = parse_hs_timestamp_ms(props.get("hs_timestamp"))
            if hs_ts_ms is not None:
                if (max_ts_ms is None) or (hs_ts_ms > max_ts_ms):
                    max_ts_ms = hs_ts_ms
                sent_at_iso = datetime.datetime.fromtimestamp(
                    hs_ts_ms / 1000, tz=datetime.timezone.utc).isoformat()
            else:
                sent_at_iso = None
            email_metadata_data.append({
                "email_id": record.id,
                "subject": props.get("hs_email_subject"),
                "normalized_subject": normalize_subject(props.get("hs_email_subject")),
                "from_email": props.get("hs_email_from_email") or "",
                "to_emails": parse_emails(props.get("hs_email_to_email")),
                "cc_emails": parse_emails(props.get("hs_email_cc_email")),
                "sent_at": sent_at_iso,
                "direction": props.get("hs_email_direction"),
                "email_content": cleaned,
            })
            # Collect association links; only all-digit IDs are kept because
            # the link rows store them as integers.
            assoc = record.associations or {}
            if assoc.get("companies") and getattr(assoc["companies"], "results", None):
                for a in assoc["companies"].results:
                    if a.id and a.id.isdigit():
                        email_company_links.append(
                            {"email_id": record.id, "company_id": int(a.id)})
            if assoc.get("contacts") and getattr(assoc["contacts"], "results", None):
                for a in assoc["contacts"].results:
                    if a.id and a.id.isdigit():
                        email_contact_links.append(
                            {"email_id": record.id, "contact_id": int(a.id)})
            if assoc.get("deals") and getattr(assoc["deals"], "results", None):
                for a in assoc["deals"].results:
                    if a.id and a.id.isdigit():
                        email_deal_links.append(
                            {"email_id": record.id, "deal_id": int(a.id)})
            if assoc.get("tickets") and getattr(assoc["tickets"], "results", None):
                for a in assoc["tickets"].results:
                    if a.id and a.id.isdigit():
                        email_ticket_links.append(
                            {"email_id": record.id, "ticket_id": int(a.id)})
            if i % 200 == 0:
                logging.info("Read %d emails...", i)
            time.sleep(0.05)  # pace API calls to stay under rate limits
        except httpx.HTTPStatusError as e:
            logging.error("HTTP error reading email %s: %s", email_id, e)
        except (EmailApiException, httpx.HTTPError) as e:
            logging.error("Error reading email %s: %s", email_id, e)
    return (
        email_metadata_data, email_company_links, email_contact_links,
        email_deal_links, email_ticket_links, max_ts_ms
    )
# -----------------------------------------------------------------------------
# Upsert
# -----------------------------------------------------------------------------
def upsert_emails(
    email_metadata_data: List[Dict],
    email_company_links: List[Dict],
    email_contact_links: List[Dict],
    email_deal_links: List[Dict],
    email_ticket_links: List[Dict],
) -> None:
    """Upsert email metadata and association link rows into Supabase.

    Empty lists are skipped, so reruns with no new data are no-ops.
    The four association tables were previously written by copy-pasted
    blocks; they are now driven by a single (table, rows, conflict, label)
    spec loop with identical calls and messages.
    """
    if email_metadata_data:
        batched_insert(
            supabase_client,
            "hubspot_emails",
            email_metadata_data,
            batch_size=1000,
            # If your helper supports on_conflict:
            # on_conflict=["email_id"]
        )
        print(f"Upserted {len(email_metadata_data)} email metadata rows.")

    # One entry per association type: target table, rows, conflict key, label.
    link_specs = [
        ("hubspot_email_companies", email_company_links,
         ["email_id", "company_id"], "email-company"),
        ("hubspot_email_contacts", email_contact_links,
         ["email_id", "contact_id"], "email-contact"),
        ("hubspot_email_deals", email_deal_links,
         ["email_id", "deal_id"], "email-deal"),
        ("hubspot_email_tickets", email_ticket_links,
         ["email_id", "ticket_id"], "email-ticket"),
    ]
    for table, rows, conflict_cols, label in link_specs:
        if not rows:
            continue
        batched_insert(
            supabase_client,
            table,
            rows,
            batch_size=1000,
            on_conflict=conflict_cols,
        )
        print(f"Upserted {len(rows)} {label} links.")
# -----------------------------------------------------------------------------
# Main (timestamp cursor)
# -----------------------------------------------------------------------------
def main(since_ms: Optional[int] = None) -> None:
    """
    Run one incremental email sync.

    Orchestrates:
      1) Search email IDs with hs_timestamp > since_ms
      2) Read full emails with associations (track max timestamp)
      3) Upsert into Supabase
      4) Update sync metadata

    Args:
        since_ms: cursor in epoch milliseconds (UTC). Falls back to the
            HUBSPOT_EMAILS_SINCE_MS env var, then to today@00:00:00Z.

    Raises:
        RuntimeError: if HUBSPOT_EMAILS_SINCE_MS is set but not an integer.
    """
    # Resolve since_ms: explicit argument > env bootstrap > today's UTC midnight.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_EMAILS_SINCE_MS must be an integer (ms) if set.")
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)
    print(f"Searching emails with hs_timestamp > {since_ms} ...")
    ids = search_email_ids_after_ms(since_ms)
    print(f"Found {len(ids)} email IDs.")
    if not ids:
        print("No emails beyond the cursor. Updating sync metadata and exiting.")
        # Record only last_sync_time; helper sets updated_at
        now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
        update_sync_metadata(supabase_client, "emails", now_iso)
        return
    print("Reading emails (with associations)...")
    (
        email_metadata_data, email_company_links, email_contact_links,
        email_deal_links, email_ticket_links, max_ts_ms
    ) = read_emails_by_ids(ids)
    print("Upserting into Supabase...")
    upsert_emails(
        email_metadata_data,
        email_company_links,
        email_contact_links,
        email_deal_links,
        email_ticket_links,
    )
    # Advance cursor to max timestamp we actually ingested
    # NOTE(review): new_cursor_ms is computed and printed but never persisted —
    # update_sync_metadata only receives the wall-clock time. Confirm whether
    # the helper (or the orchestrator) stores the ms cursor elsewhere;
    # otherwise the next run cannot resume from max_ts_ms.
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    update_sync_metadata(supabase_client, "emails", now_iso)
    print(f"Emails sync complete. Advanced cursor to {new_cursor_ms}.")
| # ----------------------------------------------------------------------------- | |
| # CLI | |
| # ----------------------------------------------------------------------------- | |
| def _parse_cli_arg_to_ms(arg: str) -> int: | |
| """ | |
| Accept: | |
| - integer epoch ms | |
| - ISO-8601 (Z or offset) | |
| - YYYY-MM-DD (floors to 00:00Z for convenience/back-compat) | |
| """ | |
| # epoch ms | |
| if re.fullmatch(r"\d{10,13}", arg): | |
| v = int(arg) | |
| if v < 10_000_000_000_000: # seconds -> ms | |
| v *= 1000 | |
| return v | |
| # YYYY-MM-DD | |
| if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg): | |
| d = datetime.datetime.strptime( | |
| arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc) | |
| return to_epoch_ms(floor_to_utc_midnight(d)) | |
| # ISO-8601 | |
| return to_epoch_ms(arg) | |
| if __name__ == "__main__": | |
| import sys | |
| if len(sys.argv) > 1: | |
| try: | |
| since = _parse_cli_arg_to_ms(sys.argv[1]) | |
| except Exception as e: | |
| print( | |
| f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}" | |
| ) | |
| sys.exit(1) | |
| main(since_ms=since) | |
| else: | |
| main() | |