# n8n-duplicate / python / hubspot_tickets.py
# niwayandm — "Refactor timestamp handle in HubSpot scripts to improve
# error messaging on n8n" (commit 6c5bf90)
"""
HubSpot Tickets → Supabase (incremental since a millisecond cursor)
Usage from orchestrator:
import hubspot_tickets
hubspot_tickets.main(since_ms=<int milliseconds since epoch UTC>)
Direct CLI:
# epoch ms
python hubspot_tickets.py 1754025600000
# ISO-8601
python hubspot_tickets.py 2025-08-01T09:30:00Z
# Back-compat date (floors to 00:00Z)
python hubspot_tickets.py 2025-08-01
"""
import os
import re
import time
import logging
import datetime
from typing import List, Dict, Optional, Tuple, Union
import httpx
import hubspot
from dotenv import load_dotenv
from supabase import create_client
from hubspot.crm.tickets import ApiException as TicketsApiException
from hubspot_utils import (
parse_ts, try_parse_int, deduplicate_by_key,
)
from supabase_utils import (
batched_insert, update_sync_metadata,
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# Ensure the log directory exists before basicConfig opens the file handler;
# otherwise the first run on a fresh checkout dies with FileNotFoundError.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_tickets_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# -----------------------------------------------------------------------------
# Environment
# -----------------------------------------------------------------------------
# Load .env, read required credentials, and build the API clients.
# Missing credentials fail fast at import time rather than mid-sync.
load_dotenv()

HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Optional bootstrap cursor if orchestrator doesn't provide one
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_TICKETS_SINCE_MS")

if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
    raise RuntimeError("Supabase env vars are not set")

hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
# Custom object association (CLI)
# Custom object association (CLI)
CLI_ASSOC_TYPE_ID = "2-25629083"  # numeric object-type key used in association requests
CLI_ASSOC_FALLBACK_KEY = "p8600202_cli_s"  # name-based key some API responses use instead
# Ticket properties requested on every read; keep in sync with the row
# mapping built in read_tickets_by_ids().
TICKET_PROPERTIES = [
    "closed_date",
    "content",
    "ticket_type",
    "createdate",
    "hs_created_by_user_id",
    "hs_lastmodifieddate",
    "hs_object_id",
    "hs_pipeline",
    "hs_pipeline_stage",
    "hs_ticket_priority",
    "subject",
    "hubspot_owner_id",
    "source_type",
    "hs_object_source_label",
    "hs_object_source_detail_1",
]
# -----------------------------------------------------------------------------
# Time helpers
# -----------------------------------------------------------------------------
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* as an aware UTC datetime; naive input is assumed UTC."""
    utc = datetime.timezone.utc
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=utc)
    return aware.astimezone(utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Truncate *dt* to 00:00:00 UTC on its (UTC) calendar day."""
    normalized = _ensure_utc(dt)
    return normalized.replace(hour=0, minute=0, second=0, microsecond=0)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string (trailing 'Z' accepted) into an aware UTC datetime."""
    normalized = value[:-1] + "+00:00" if value.endswith("Z") else value
    return _ensure_utc(datetime.datetime.fromisoformat(normalized))
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601 string or a datetime into integer milliseconds since epoch.

    Raises TypeError for any other input type.
    """
    if isinstance(dt_or_str, datetime.datetime):
        moment = _ensure_utc(dt_or_str)
    elif isinstance(dt_or_str, str):
        moment = _parse_iso_like_to_dt(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(moment.timestamp() * 1000)
def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    Parse a heterogeneous timestamp into epoch milliseconds.

    Accepts:
      - ms-epoch as str/int/float
      - seconds-epoch as str/int/float (auto *1000)
      - ISO-8601 string
    Returns ms since epoch or None (unparseable values are logged).
    """
    if value is None:
        return None
    # Numeric path first. Go through float() so float inputs and numeric
    # strings with a fractional part (e.g. "1754025600.5") work too —
    # the old int(str(value)) raised ValueError on those and the value
    # was silently dropped. Epoch-ms magnitudes (< 2**53) are exactly
    # representable as doubles, so no precision is lost.
    try:
        v = int(float(value))
    except (ValueError, OverflowError):  # non-numeric string, or inf
        pass
    else:
        if v < 10_000_000_000:  # heuristic cutoff: below ~1e10 it's seconds
            v *= 1000
        return v
    # Fall back to ISO-8601 parsing.
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse timestamp value=%r", value)
        return None
# -----------------------------------------------------------------------------
# Pipeline & stage mappings
# -----------------------------------------------------------------------------
def get_pipeline_and_stage_mappings() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Fetch ticket pipelines and return ({pipeline_id: label}, {stage_id: label}).

    Any failure is logged and degrades to a pair of empty dicts so callers
    can proceed (labels simply come out blank).
    """
    try:
        response = hubspot_client.crm.pipelines.pipelines_api.get_all(
            object_type="tickets")
        pipeline_labels: Dict[str, str] = {}
        stage_labels: Dict[str, str] = {}
        for pipeline in response.results:
            pipeline_labels[pipeline.id] = pipeline.label
            stage_labels.update({s.id: s.label for s in pipeline.stages})
        return pipeline_labels, stage_labels
    except Exception as e:
        logging.error("Failed to fetch pipeline/stage mappings: %s", e)
        return {}, {}
# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms) with property fallback
# -----------------------------------------------------------------------------
def _search_ticket_ids_from(since_ms: int, prop: str) -> List[str]:
    """
    Page through the CRM search API, collecting IDs of tickets whose
    {prop} is strictly greater than *since_ms*, sorted ascending on
    {prop} so the caller can advance its cursor monotonically.
    """
    url = "https://api.hubapi.com/crm/v3/objects/tickets/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT",
                 "value": str(since_ms)},
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }
    collected: List[str] = []
    cursor: Optional[str] = None
    with httpx.Client(timeout=30.0) as client:
        while True:
            request_body = dict(base_payload)
            if cursor:
                request_body["after"] = cursor
            resp = client.post(url, headers=headers, json=request_body)
            if resp.status_code >= 400:
                # Log the error body (as JSON when possible) before raising.
                try:
                    logging.error(
                        "Ticket search error for prop '%s': %s", prop, resp.json())
                except Exception:
                    logging.error(
                        "Ticket search error for prop '%s': %s", prop, resp.text)
                resp.raise_for_status()
            data = resp.json()
            collected.extend(obj["id"] for obj in data.get("results", []) or [])
            cursor = (data.get("paging") or {}).get("next", {}).get("after")
            if not cursor:
                break
            time.sleep(0.1)  # gentle pacing between pages
    return collected
def search_ticket_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """
    Find ticket IDs whose timestamp property is strictly greater than
    *since_ms*, trying candidate cursor properties in order (ASC-sorted
    so the max timestamp at the end is monotonic).

    Returns (ticket_ids, property_used); if every candidate fails with
    an HTTP error, the last error is re-raised.
    """
    candidate_props = ["createdate"]
    last_http_error: Optional[httpx.HTTPStatusError] = None
    for candidate in candidate_props:
        try:
            found = _search_ticket_ids_from(since_ms, candidate)
        except httpx.HTTPStatusError as e:
            last_http_error = e
            continue
        logging.info(
            "Ticket search with '%s' returned %d IDs.", candidate, len(found))
        return found, candidate
    if last_http_error:
        raise last_http_error
    return [], "createdate"
# -----------------------------------------------------------------------------
# Read-by-ID (with associations)
# -----------------------------------------------------------------------------
def read_tickets_by_ids(
    ticket_ids: List[str],
    cursor_prop: str,
) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], Optional[int]]:
    """
    Read tickets by ID with properties and associations (CLI/deals/contacts).

    ``cursor_prop`` is the timestamp property the search step filtered on;
    the maximum value observed for it across all read tickets is returned
    so the caller can advance the sync cursor.

    Returns: tickets, ticket_cli_links, ticket_deal_links, ticket_contact_links, max_ts_ms_for_cursor_prop
    """
    if not ticket_ids:
        return [], [], [], [], None
    tickets: List[Dict] = []
    ticket_cli_links: List[Dict] = []
    ticket_deal_links: List[Dict] = []
    ticket_contact_links: List[Dict] = []
    # Request the custom CLI object plus standard deal/contact associations.
    assoc_types = [CLI_ASSOC_TYPE_ID, "deals", "contacts"]
    pipeline_map, stage_map = get_pipeline_and_stage_mappings()
    max_ts_ms: Optional[int] = None
    for i, tid in enumerate(ticket_ids, start=1):
        try:
            record = hubspot_client.crm.tickets.basic_api.get_by_id(
                tid, properties=TICKET_PROPERTIES, associations=assoc_types, archived=False
            )
            p = record.properties or {}
            # Track max timestamp based on cursor_prop we searched on
            cursor_val = p.get(cursor_prop)
            ts_ms = parse_any_ts_ms(cursor_val)
            if ts_ms is not None and (max_ts_ms is None or ts_ms > max_ts_ms):
                max_ts_ms = ts_ms
            # Map the HubSpot property bag onto the Supabase row shape.
            tickets.append({
                "ticket_id": try_parse_int(record.id),
                "subject": p.get("subject"),
                "content": p.get("content"),
                "ticket_type": p.get("ticket_type"),
                "closed_date": parse_ts(p.get("closed_date")),
                "hubspot_modified_date": parse_ts(p.get("hs_lastmodifieddate")),
                "pipeline_id": try_parse_int(p.get("hs_pipeline")),
                "pipeline_label": pipeline_map.get(p.get("hs_pipeline"), ""),
                "ticket_status_id": try_parse_int(p.get("hs_pipeline_stage")),
                "ticket_status_label": stage_map.get(p.get("hs_pipeline_stage"), ""),
                "ticket_priority": p.get("hs_ticket_priority"),
                "source": p.get("source_type"),
                "record_source": p.get("hs_object_source_label"),
                "record_source_detail_1": p.get("hs_object_source_detail_1"),
                "hubspot_owner_id": try_parse_int(p.get("hubspot_owner_id")),
                "hubspot_created_at": parse_ts(p.get("createdate")),
                "hubspot_created_by": try_parse_int(p.get("hs_created_by_user_id")),
            })
            # Associations
            assoc = record.associations or {}
            # CLI (custom object) key may be numeric ID or the name key
            cli_bucket = None
            if assoc.get(CLI_ASSOC_TYPE_ID):
                cli_bucket = assoc[CLI_ASSOC_TYPE_ID]
            elif assoc.get(CLI_ASSOC_FALLBACK_KEY):
                cli_bucket = assoc[CLI_ASSOC_FALLBACK_KEY]
            if cli_bucket and getattr(cli_bucket, "results", None):
                for a in cli_bucket.results:
                    # Only keep numeric IDs; non-numeric entries are skipped.
                    if a.id and a.id.isdigit():
                        ticket_cli_links.append({
                            "ticket_id": try_parse_int(record.id),
                            "cli_id": try_parse_int(a.id),
                        })
            if assoc.get("deals") and getattr(assoc["deals"], "results", None):
                for a in assoc["deals"].results:
                    if a.id and a.id.isdigit():
                        ticket_deal_links.append({
                            "ticket_id": try_parse_int(record.id),
                            "deal_id": try_parse_int(a.id),
                        })
            if assoc.get("contacts") and getattr(assoc["contacts"], "results", None):
                for a in assoc["contacts"].results:
                    if a.id and a.id.isdigit():
                        ticket_contact_links.append({
                            "ticket_id": try_parse_int(record.id),
                            "contact_id": try_parse_int(a.id),
                        })
            # Progress log every 200 reads, plus a small pause to stay
            # under HubSpot rate limits.
            if i % 200 == 0:
                logging.info("Read %d tickets...", i)
            time.sleep(0.05)
        except httpx.HTTPStatusError as e:
            # Per-ticket failures are logged and skipped; the batch continues.
            logging.error("HTTP error reading ticket %s: %s", tid, e)
        except (TicketsApiException, httpx.HTTPError) as e:
            logging.error("Error reading ticket %s: %s", tid, e)
    return tickets, ticket_cli_links, ticket_deal_links, ticket_contact_links, max_ts_ms
# -----------------------------------------------------------------------------
# Upsert
# -----------------------------------------------------------------------------
def upsert_tickets(
    tickets: List[Dict],
    ticket_cli_links: List[Dict],
    ticket_deal_links: List[Dict],
    ticket_contact_links: List[Dict],
) -> None:
    """Deduplicate each row set and upsert it into its Supabase table.

    Empty inputs are skipped entirely (no query, no console output).
    """
    if tickets:
        unique_tickets = deduplicate_by_key(tickets, key="ticket_id")
        batched_insert(
            supabase_client, "hubspot_tickets", unique_tickets,
            batch_size=1000, on_conflict=["ticket_id"]
        )
        print(f"Upserted {len(unique_tickets)} tickets.")
    if ticket_cli_links:
        unique_cli_links = deduplicate_by_key(
            ticket_cli_links, key=("ticket_id", "cli_id"))
        batched_insert(
            supabase_client, "hubspot_ticket_clis", unique_cli_links,
            batch_size=1000, on_conflict=["ticket_id", "cli_id"]
        )
        print(f"Upserted {len(unique_cli_links)} ticket-cli associations.")
    if ticket_deal_links:
        unique_deal_links = deduplicate_by_key(
            ticket_deal_links, key=("ticket_id", "deal_id"))
        batched_insert(
            supabase_client, "hubspot_ticket_deals", unique_deal_links,
            batch_size=1000, on_conflict=["ticket_id", "deal_id"]
        )
        print(f"Upserted {len(unique_deal_links)} ticket-deal associations.")
    if ticket_contact_links:
        unique_contact_links = deduplicate_by_key(
            ticket_contact_links, key=("ticket_id", "contact_id"))
        batched_insert(
            supabase_client, "hubspot_ticket_contacts", unique_contact_links,
            batch_size=1000, on_conflict=["ticket_id", "contact_id"]
        )
        print(
            f"Upserted {len(unique_contact_links)} ticket-contact associations.")
# -----------------------------------------------------------------------------
# Main (timestamp cursor)
# -----------------------------------------------------------------------------
def main(since_ms: Optional[int] = None):
    """
    Run one incremental tickets sync.

    1) Resolve since_ms (argument → HUBSPOT_TICKETS_SINCE_MS env → today@00:00Z)
    2) Search ticket IDs with <cursor_prop> > since_ms (property fallback)
    3) Read full tickets with associations (track max timestamp for <cursor_prop>)
    4) Upsert into Supabase
    5) Record last_sync_time via update_sync_metadata

    NOTE(review): the advanced cursor (new_cursor_ms) is only printed, not
    persisted — update_sync_metadata is called with last_sync_time alone.
    Confirm the orchestrator derives the next since_ms from last_sync_time.
    """
    # Resolve since_ms
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_TICKETS_SINCE_MS must be an integer (ms) if set.")
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)
    print(f"Searching tickets with timestamp > {since_ms} ...")
    ids, cursor_prop = search_ticket_ids_after_ms(since_ms)
    print(f"Search property: {cursor_prop}. Found {len(ids)} ticket IDs.")
    # Timestamp recorded as last_sync_time, taken before the (long) read phase.
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    if not ids:
        print("No tickets beyond the cursor. Updating sync metadata and exiting.")
        # Only record last_sync_time (updated_at handled by helper)
        update_sync_metadata(supabase_client, "hubspot_tickets", now_iso)
        return
    print("Reading tickets (with associations)...")
    tickets, ticket_cli_links, ticket_deal_links, ticket_contact_links, max_ts_ms = read_tickets_by_ids(
        ids, cursor_prop)
    print("Upserting into Supabase...")
    upsert_tickets(tickets, ticket_cli_links,
                   ticket_deal_links, ticket_contact_links)
    # Advance cursor to max timestamp we actually ingested for the chosen property
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms
    # Record last_sync_time only (updated_at handled by helper)
    update_sync_metadata(supabase_client, "hubspot_tickets", now_iso)
    print(
        f"Tickets sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.")
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def _parse_cli_arg_to_ms(arg: str) -> int:
    """
    Parse a CLI timestamp argument into epoch milliseconds.

    Accept:
      - integer epoch in seconds (10 digits) or ms (13 digits)
      - ISO-8601 (Z or offset)
      - YYYY-MM-DD (floors to 00:00Z)

    Raises ValueError (via int/fromisoformat) on unparseable input.
    """
    # Bare digits: epoch seconds (~1e9, 10 digits) or epoch ms (~1e12, 13 digits)
    if re.fullmatch(r"\d{10,13}", arg):
        v = int(arg)
        # The seconds-vs-ms cutoff must sit between ~1e9 and ~1e12.
        # The previous cutoff of 10_000_000_000_000 (1e13) wrongly
        # multiplied already-ms 13-digit inputs by 1000, breaking the
        # documented `python hubspot_tickets.py 1754025600000` usage.
        if v < 10_000_000_000:  # seconds -> ms
            v *= 1000
        return v
    # YYYY-MM-DD
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
        d = datetime.datetime.strptime(
            arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
        return to_epoch_ms(floor_to_utc_midnight(d))
    # ISO-8601
    return to_epoch_ms(arg)
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        # No argument: main() falls back to env bootstrap / today@00:00Z.
        main()
    else:
        try:
            since = _parse_cli_arg_to_ms(cli_args[0])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)