# Source: n8n-duplicate/python/hubspot_emails.py (author: niwayandm)
# Commit 6c5bf90 — "Refactor timestamp handle in HubSpot scripts to improve
# error messaging on n8n"
"""
HubSpot Emails → Supabase (incremental since a millisecond cursor)
Usage from orchestrator:
import hubspot_emails
hubspot_emails.main(since_ms=<int milliseconds since epoch UTC>)
Direct CLI:
# epoch ms
python hubspot_emails.py 1754025600000
# ISO-8601 (Z or offset)
python hubspot_emails.py 2025-08-01T09:30:00Z
# Back-compat date (floors to 00:00:00Z)
python hubspot_emails.py 2025-08-01
"""
import os
import time
import logging
import datetime
import re
from typing import List, Dict, Optional, Tuple, Union
import httpx
import hubspot
from dotenv import load_dotenv
from supabase import create_client
from hubspot.crm.objects.emails import ApiException as EmailApiException
from hubspot_utils import (
clean_text,
)
from supabase_utils import (
batched_insert, update_sync_metadata,
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# One log file per day, appended across runs.
# Ensure the log directory exists first: logging.basicConfig(filename=...)
# raises FileNotFoundError at import time if logs/ is missing.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_email_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# -----------------------------------------------------------------------------
# Environment
# -----------------------------------------------------------------------------
load_dotenv()  # pull credentials from a local .env file, if present
HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")  # HubSpot private-app access token
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Optional bootstrap cursor if orchestrator doesn't provide one
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_EMAILS_SINCE_MS")
# Fail fast at import time when required credentials are missing, so the
# orchestrator surfaces a clear configuration error instead of a mid-run crash.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
    raise RuntimeError("Supabase env vars are not set")
# Module-level clients shared by all functions below.
hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
# CRM email properties requested from HubSpot for every email object read.
EMAIL_PROPERTIES = [
    "hs_email_from_email",
    "hs_email_to_email",
    "hs_email_cc_email",
    "hs_email_subject",
    "hs_email_direction",
    "hs_email_text",
    "hs_timestamp",
]
# -----------------------------------------------------------------------------
# Email parsing
# -----------------------------------------------------------------------------
# Loose address matcher used to pull emails out of free-form header strings.
EMAIL_RE = re.compile(r'[\w\.\+\-]+@[\w\.\-]+\.\w+')
# Leading reply/forward subject prefixes ("re:", "fw:", "fwd:"), any case.
SUBJECT_PREFIX_RE = re.compile(r'^(re:|fw:|fwd:)\s*', re.IGNORECASE)
def parse_emails(raw: Optional[object]) -> List[str]:
    """
    Extract unique email addresses from *raw*.

    Accepts a string (possibly ';'/',' separated), a list of such strings,
    or any other object (matched against its str() form). Returns the
    addresses lowercased, de-duplicated, and sorted.
    """
    if raw is None:
        return []
    address_re = re.compile(r'[\w\.\+\-]+@[\w\.\-]+\.\w+')

    def _scan(text: str) -> List[str]:
        # Split on common separators first so adjacent addresses stay apart.
        hits: List[str] = []
        for piece in re.split(r'[;,]', text):
            hits.extend(address_re.findall(piece))
        return hits

    found: List[str] = []
    if isinstance(raw, list):
        # Falsy and non-string list entries are skipped on purpose.
        for entry in raw:
            if entry and isinstance(entry, str):
                found.extend(_scan(entry))
    elif isinstance(raw, str):
        found.extend(_scan(raw))
    else:
        found.extend(address_re.findall(str(raw)))
    return sorted({hit.strip().lower() for hit in found if hit and hit.strip()})
def normalize_subject(raw: Optional[str]) -> Optional[str]:
    """
    Canonicalize an email subject for grouping/threading.

    Lowercases, strips any number of leading "re:"/"fw:"/"fwd:" prefixes,
    and collapses whitespace runs. Returns None when nothing remains.
    """
    if not raw:
        return None
    prefix_re = re.compile(r'^(re:|fw:|fwd:)\s*', re.IGNORECASE)
    subject = raw.strip().lower()
    # Peel reply/forward prefixes until a pass changes nothing.
    while True:
        peeled = prefix_re.sub("", subject)
        if peeled == subject:
            break
        subject = peeled.strip()
    # Collapse internal whitespace to single spaces.
    subject = re.sub(r"\s+", " ", subject).strip()
    return subject or None
# -----------------------------------------------------------------------------
# Time helpers
# -----------------------------------------------------------------------------
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
return dt.astimezone(datetime.timezone.utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* converted to UTC with the time-of-day zeroed out."""
    if dt.tzinfo is None:
        # Naive input is treated as already being UTC.
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
if value.endswith("Z"):
value = value[:-1] + "+00:00"
dt = datetime.datetime.fromisoformat(value)
return _ensure_utc(dt)
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """
    Convert an ISO-8601 string or a datetime to epoch milliseconds (UTC).

    Naive datetimes are assumed to be UTC. Raises TypeError for any other type.
    """
    utc = datetime.timezone.utc
    if isinstance(dt_or_str, datetime.datetime):
        moment = dt_or_str
    elif isinstance(dt_or_str, str):
        # Accept a trailing 'Z' by rewriting it as an explicit offset.
        text = dt_or_str[:-1] + "+00:00" if dt_or_str.endswith("Z") else dt_or_str
        moment = datetime.datetime.fromisoformat(text)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=utc)
    return int(moment.astimezone(utc).timestamp() * 1000)
def parse_hs_timestamp_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    Normalize a HubSpot ``hs_timestamp`` property to epoch milliseconds.

    HubSpot CRM datetime properties often come back as strings.
    Accepts:
      - ms epoch as str/int/float
      - seconds epoch (up-converted to ms)
      - ISO-8601 strings (rare for CRM objects, but robust to it)
    Returns ms since epoch, or None when the value cannot be parsed.

    Fix: the previous implementation ran every value through int(str(value)),
    so an actual float such as 1754025600000.0 raised ValueError ("...0.0" is
    not an int literal), failed the ISO fallback too, and came back None
    despite the documented float support. Numeric types are now converted
    directly (bool excluded — it is an int subclass but never a timestamp).
    """
    if value is None:
        return None
    ms: Optional[int] = None
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        ms = int(value)  # truncates fractional milliseconds
    else:
        try:
            ms = int(str(value))
        except ValueError:
            ms = None
    if ms is not None:
        # treat values too small as seconds and up-convert, just in case
        # (10**10 s ≈ year 2286; any ms epoch after 1970-04 exceeds it)
        if ms < 10_000_000_000:
            ms *= 1000
        return ms
    # Fallback: ISO
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse hs_timestamp=%r", value)
        return None
# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms)
# -----------------------------------------------------------------------------
def search_email_ids_after_ms(since_ms: int) -> List[str]:
    """
    Return the IDs of all email objects whose hs_timestamp is strictly
    greater than *since_ms*, paging through the CRM search endpoint in
    ascending timestamp order (so the maximum timestamp stays monotonic).
    """
    endpoint = "https://api.hubapi.com/crm/v3/objects/emails/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": "hs_timestamp",
                 "operator": "GT", "value": str(since_ms)}
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": "hs_timestamp", "direction": "ASCENDING"}],
        # Only IDs are needed here; full records are fetched later.
    }

    collected: List[str] = []
    cursor: Optional[str] = None
    with httpx.Client(timeout=30.0) as http:
        while True:
            request_body = dict(base_payload)
            if cursor:
                request_body["after"] = cursor
            response = http.post(endpoint, headers=headers, json=request_body)
            if response.status_code >= 400:
                # Prefer the structured error body; fall back to raw text.
                try:
                    logging.error("Email search error: %s", response.json())
                except Exception:
                    logging.error("Email search error: %s", response.text)
                response.raise_for_status()
            page = response.json()
            collected.extend(obj["id"] for obj in page.get("results", []) or [])
            cursor = (page.get("paging") or {}).get("next", {}).get("after")
            if not cursor:
                break
            # Gentle pacing between pages to stay under rate limits.
            time.sleep(0.1)
    logging.info("Found %d email IDs after %d.", len(collected), since_ms)
    return collected
# -----------------------------------------------------------------------------
# Read-by-ID (with associations)
# -----------------------------------------------------------------------------
def read_emails_by_ids(
    email_ids: List[str]
) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], List[Dict], Optional[int]]:
    """
    Read emails by ID with properties and associations
    (companies/contacts/deals/tickets).

    Args:
        email_ids: HubSpot email object IDs to fetch one by one.

    Returns:
        (email_metadata, company_links, contact_links, deal_links,
        ticket_links, max_ts_ms) — max_ts_ms is the largest hs_timestamp
        seen (ms since epoch), or None if none parsed. The annotation now
        lists all six tuple elements (the old one was missing the ticket
        links list).

    Per-record errors are logged and skipped so one bad email does not
    abort the whole batch.
    """
    if not email_ids:
        return [], [], [], [], [], None

    email_metadata_data: List[Dict] = []
    # One bucket per association type: (link-row id field, accumulator).
    # Replaces four near-identical copy-pasted extraction blocks.
    link_buckets: Dict[str, Tuple[str, List[Dict]]] = {
        "companies": ("company_id", []),
        "contacts": ("contact_id", []),
        "deals": ("deal_id", []),
        "tickets": ("ticket_id", []),
    }
    assoc_types = list(link_buckets)
    max_ts_ms: Optional[int] = None

    for i, email_id in enumerate(email_ids, start=1):
        try:
            record = hubspot_client.crm.objects.emails.basic_api.get_by_id(
                email_id, properties=EMAIL_PROPERTIES, associations=assoc_types, archived=False
            )
            props = record.properties or {}
            cleaned = clean_text(props.get("hs_email_text") or "")

            # Robust timestamp handling: the property may arrive as an
            # ms-epoch string/number or (rarely) an ISO-8601 string.
            hs_ts_ms = parse_hs_timestamp_ms(props.get("hs_timestamp"))
            if hs_ts_ms is not None:
                if (max_ts_ms is None) or (hs_ts_ms > max_ts_ms):
                    max_ts_ms = hs_ts_ms
                sent_at_iso = datetime.datetime.fromtimestamp(
                    hs_ts_ms / 1000, tz=datetime.timezone.utc).isoformat()
            else:
                sent_at_iso = None

            email_metadata_data.append({
                "email_id": record.id,
                "subject": props.get("hs_email_subject"),
                "normalized_subject": normalize_subject(props.get("hs_email_subject")),
                "from_email": props.get("hs_email_from_email") or "",
                "to_emails": parse_emails(props.get("hs_email_to_email")),
                "cc_emails": parse_emails(props.get("hs_email_cc_email")),
                "sent_at": sent_at_iso,
                "direction": props.get("hs_email_direction"),
                "email_content": cleaned,
            })

            assoc = record.associations or {}
            for assoc_name, (id_field, bucket) in link_buckets.items():
                entry = assoc.get(assoc_name)
                results = getattr(entry, "results", None) if entry else None
                for a in results or []:
                    # isdigit() guard ensures the int() cast cannot raise.
                    if a.id and a.id.isdigit():
                        bucket.append({"email_id": record.id, id_field: int(a.id)})

            if i % 200 == 0:
                logging.info("Read %d emails...", i)
            time.sleep(0.05)  # gentle pacing between per-record API calls
        except httpx.HTTPStatusError as e:
            # Subclass of httpx.HTTPError, so it must be caught first.
            logging.error("HTTP error reading email %s: %s", email_id, e)
        except (EmailApiException, httpx.HTTPError) as e:
            logging.error("Error reading email %s: %s", email_id, e)

    return (
        email_metadata_data,
        link_buckets["companies"][1],
        link_buckets["contacts"][1],
        link_buckets["deals"][1],
        link_buckets["tickets"][1],
        max_ts_ms,
    )
# -----------------------------------------------------------------------------
# Upsert
# -----------------------------------------------------------------------------
def upsert_emails(
    email_metadata_data: List[Dict],
    email_company_links: List[Dict],
    email_contact_links: List[Dict],
    email_deal_links: List[Dict],
    email_ticket_links: List[Dict],
) -> None:
    """
    Write email metadata and association link rows to Supabase.

    Empty lists are skipped; each non-empty list is inserted in batches
    of 1000 rows, link tables with a composite on_conflict key.
    """
    if email_metadata_data:
        batched_insert(
            supabase_client,
            "hubspot_emails",
            email_metadata_data,
            batch_size=1000,
            # If your helper supports on_conflict:
            # on_conflict=["email_id"]
        )
        print(f"Upserted {len(email_metadata_data)} email metadata rows.")

    # (table name, rows, conflict id column, label for the progress message)
    link_specs = (
        ("hubspot_email_companies", email_company_links, "company_id", "email-company"),
        ("hubspot_email_contacts", email_contact_links, "contact_id", "email-contact"),
        ("hubspot_email_deals", email_deal_links, "deal_id", "email-deal"),
        ("hubspot_email_tickets", email_ticket_links, "ticket_id", "email-ticket"),
    )
    for table, rows, id_column, label in link_specs:
        if not rows:
            continue
        batched_insert(
            supabase_client,
            table,
            rows,
            batch_size=1000,
            on_conflict=["email_id", id_column],
        )
        print(f"Upserted {len(rows)} {label} links.")
# -----------------------------------------------------------------------------
# Main (timestamp cursor)
# -----------------------------------------------------------------------------
def main(since_ms: Optional[int] = None) -> None:
    """
    Run one incremental email sync.

    Orchestrates:
    1) Search email IDs with hs_timestamp > since_ms
    2) Read full emails with associations (track max timestamp)
    3) Upsert into Supabase
    4) Update sync metadata with both last_sync_metadata and last_sync_time

    Args:
        since_ms: Millisecond-epoch cursor; only emails with a strictly
            greater hs_timestamp are synced. Falls back to the
            HUBSPOT_EMAILS_SINCE_MS env var, then to today@00:00:00Z.

    Raises:
        RuntimeError: if HUBSPOT_EMAILS_SINCE_MS is set but not an integer.
    """
    # Resolve since_ms: explicit argument > env bootstrap > UTC midnight today.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_EMAILS_SINCE_MS must be an integer (ms) if set.")
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)
    print(f"Searching emails with hs_timestamp > {since_ms} ...")
    ids = search_email_ids_after_ms(since_ms)
    print(f"Found {len(ids)} email IDs.")
    if not ids:
        print("No emails beyond the cursor. Updating sync metadata and exiting.")
        # Record only last_sync_time; helper sets updated_at
        now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
        update_sync_metadata(supabase_client, "emails", now_iso)
        return
    print("Reading emails (with associations)...")
    (
        email_metadata_data, email_company_links, email_contact_links,
        email_deal_links, email_ticket_links, max_ts_ms
    ) = read_emails_by_ids(ids)
    print("Upserting into Supabase...")
    upsert_emails(
        email_metadata_data,
        email_company_links,
        email_contact_links,
        email_deal_links,
        email_ticket_links,
    )
    # Advance cursor to max timestamp we actually ingested
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    # NOTE(review): new_cursor_ms is computed and printed below but never
    # passed to update_sync_metadata — confirm whether the helper should
    # persist it so the next run actually resumes from the ingested maximum.
    update_sync_metadata(supabase_client, "emails", now_iso)
    print(f"Emails sync complete. Advanced cursor to {new_cursor_ms}.")
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def _parse_cli_arg_to_ms(arg: str) -> int:
"""
Accept:
- integer epoch ms
- ISO-8601 (Z or offset)
- YYYY-MM-DD (floors to 00:00Z for convenience/back-compat)
"""
# epoch ms
if re.fullmatch(r"\d{10,13}", arg):
v = int(arg)
if v < 10_000_000_000_000: # seconds -> ms
v *= 1000
return v
# YYYY-MM-DD
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
d = datetime.datetime.strptime(
arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
return to_epoch_ms(floor_to_utc_midnight(d))
# ISO-8601
return to_epoch_ms(arg)
if __name__ == "__main__":
    # Local import: sys is only needed for CLI argument handling.
    import sys
    if len(sys.argv) > 1:
        try:
            # Accept epoch seconds/ms, ISO-8601, or YYYY-MM-DD.
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        # No argument: main() falls back to env bootstrap or today@00:00Z.
        main()