""" HubSpot Tickets → Supabase (incremental since a millisecond cursor) Usage from orchestrator: import hubspot_tickets hubspot_tickets.main(since_ms=) Direct CLI: # epoch ms python hubspot_tickets.py 1754025600000 # ISO-8601 python hubspot_tickets.py 2025-08-01T09:30:00Z # Back-compat date (floors to 00:00Z) python hubspot_tickets.py 2025-08-01 """ import os import re import time import logging import datetime from typing import List, Dict, Optional, Tuple, Union import httpx import hubspot from dotenv import load_dotenv from supabase import create_client from hubspot.crm.tickets import ApiException as TicketsApiException from hubspot_utils import ( parse_ts, try_parse_int, deduplicate_by_key, ) from supabase_utils import ( batched_insert, update_sync_metadata, ) # ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logging.basicConfig( filename=f"logs/hubspot_tickets_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log", filemode="a", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) # ----------------------------------------------------------------------------- # Environment # ----------------------------------------------------------------------------- load_dotenv() HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN") SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Optional bootstrap cursor if orchestrator doesn't provide one BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_TICKETS_SINCE_MS") if not HUBSPOT_TOKEN: raise RuntimeError("HUBSPOT_TOKEN is not set") if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY: raise RuntimeError("Supabase env vars are not set") hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN) supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) # 
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------

# Custom object association (CLI): numeric object-type ID, plus the fallback
# name key under which the HubSpot SDK sometimes buckets the same association.
CLI_ASSOC_TYPE_ID = "2-25629083"
CLI_ASSOC_FALLBACK_KEY = "p8600202_cli_s"

# Ticket properties requested from the HubSpot API on every read.
TICKET_PROPERTIES = [
    "closed_date",
    "content",
    "ticket_type",
    "createdate",
    "hs_created_by_user_id",
    "hs_lastmodifieddate",
    "hs_object_id",
    "hs_pipeline",
    "hs_pipeline_stage",
    "hs_ticket_priority",
    "subject",
    "hubspot_owner_id",
    "source_type",
    "hs_object_source_label",
    "hs_object_source_detail_1",
]

# -----------------------------------------------------------------------------
# Time helpers
# -----------------------------------------------------------------------------


def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
    """Return *dt* as an aware UTC datetime. Naive input is assumed to be UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt.astimezone(datetime.timezone.utc)


def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Floor *dt* to 00:00:00.000000 UTC of the same calendar day."""
    dt = _ensure_utc(dt)
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string (accepting a trailing 'Z') into an aware UTC datetime."""
    # datetime.fromisoformat() does not accept 'Z' before Python 3.11;
    # normalize it to an explicit +00:00 offset.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    dt = datetime.datetime.fromisoformat(value)
    return _ensure_utc(dt)


def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert an ISO-8601 string or datetime to integer milliseconds since epoch (UTC).

    Raises:
        TypeError: if *dt_or_str* is neither a str nor a datetime.
        ValueError: if a string input is not valid ISO-8601.
    """
    if isinstance(dt_or_str, str):
        dt = _parse_iso_like_to_dt(dt_or_str)
    elif isinstance(dt_or_str, datetime.datetime):
        dt = _ensure_utc(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(dt.timestamp() * 1000)


def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """Best-effort conversion of a timestamp-ish value to ms since epoch.

    Accepts:
        - ms-epoch as str/int/float
        - seconds-epoch as str/int/float (auto *1000)
        - ISO-8601 string
    Returns ms since epoch, or None if the value cannot be parsed.
    """
    if value is None:
        return None
    # Numeric path: real numbers are converted directly (int(str(1754025600.0))
    # would raise ValueError, so floats must not go through the string route).
    v: Optional[int] = None
    if isinstance(value, (int, float)):
        v = int(value)
    else:
        try:
            v = int(str(value))
        except ValueError:
            v = None
    if v is not None:
        if v < 10_000_000_000:  # 10-digit-or-fewer epoch is seconds → ms
            v *= 1000
        return v
    # ISO path
    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse timestamp value=%r", value)
        return None


# -----------------------------------------------------------------------------
# Pipeline & stage mappings
# -----------------------------------------------------------------------------


def get_pipeline_and_stage_mappings() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Fetch ticket pipeline/stage ID→label mappings from HubSpot.

    Returns:
        (pipeline_mapping, stage_mapping); both empty on any API failure so
        callers can proceed with blank labels rather than aborting the sync.
    """
    try:
        resp = hubspot_client.crm.pipelines.pipelines_api.get_all(
            object_type="tickets")
        pipeline_mapping: Dict[str, str] = {}
        stage_mapping: Dict[str, str] = {}
        for pipeline in resp.results:
            pipeline_mapping[pipeline.id] = pipeline.label
            for stage in pipeline.stages:
                stage_mapping[stage.id] = stage.label
        return pipeline_mapping, stage_mapping
    except Exception as e:
        logging.error("Failed to fetch pipeline/stage mappings: %s", e)
        return {}, {}


# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms) with property fallback
# -----------------------------------------------------------------------------


def _search_ticket_ids_from(since_ms: int, prop: str) -> List[str]:
    """Search tickets where {prop} > since_ms via the CRM search endpoint.

    Sort ascending so we can advance cursor monotonically. Pages through all
    results (100 per request) with a small delay between pages.

    Raises:
        httpx.HTTPStatusError: on any 4xx/5xx response (logged first).
    """
    url = "https://api.hubapi.com/crm/v3/objects/tickets/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT", "value": str(since_ms)},
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }

    ids: List[str] = []
    after: Optional[str] = None
    with httpx.Client(timeout=30.0) as client:
        while True:
            body = dict(payload)
            if after:
                body["after"] = after
            resp = client.post(url, headers=headers, json=body)
            if resp.status_code >= 400:
                # Log the structured error body when available, raw text otherwise.
                try:
                    logging.error(
                        "Ticket search error for prop '%s': %s", prop, resp.json())
                except Exception:
                    logging.error(
                        "Ticket search error for prop '%s': %s", prop, resp.text)
                resp.raise_for_status()
            data = resp.json()
            ids.extend([obj["id"] for obj in data.get("results", []) or []])
            # Guard every level: "paging" may be absent, and "next" may be null.
            after = ((data.get("paging") or {}).get("next") or {}).get("after")
            if not after:
                break
            time.sleep(0.1)  # be gentle with the search API's rate limits
    return ids


def search_ticket_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """Search ticket IDs where {prop} > since_ms (strictly greater), sorted ASC
    so the max timestamp at the end is monotonic.

    Tries each candidate cursor property in order and returns on the first
    success (the fallback list currently holds only "createdate").

    Returns a tuple of (ticket_ids, prop_used).
    """
    props_to_try = ["createdate"]
    last_err = None
    for prop in props_to_try:
        try:
            ids = _search_ticket_ids_from(since_ms, prop)
            logging.info(
                "Ticket search with '%s' returned %d IDs.", prop, len(ids))
            return ids, prop
        except httpx.HTTPStatusError as e:
            last_err = e
            continue
    if last_err:
        raise last_err
    return [], "createdate"


# -----------------------------------------------------------------------------
# Read-by-ID (with associations)
# -----------------------------------------------------------------------------


def read_tickets_by_ids(
    ticket_ids: List[str],
    cursor_prop: str,
) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], Optional[int]]:
    """Read tickets by ID with properties and associations (CLI/deals/contacts).

    Per-ticket read failures are logged and skipped so one bad record does not
    abort the whole batch.

    Returns:
        tickets, ticket_cli_links, ticket_deal_links, ticket_contact_links,
        max_ts_ms_for_cursor_prop (None when no ticket had a parseable value
        for *cursor_prop*).
    """
    if not ticket_ids:
        return [], [], [], [], None

    tickets: List[Dict] = []
    ticket_cli_links: List[Dict] = []
    ticket_deal_links: List[Dict] = []
    ticket_contact_links: List[Dict] = []

    assoc_types = [CLI_ASSOC_TYPE_ID, "deals", "contacts"]
    pipeline_map, stage_map = get_pipeline_and_stage_mappings()
    max_ts_ms: Optional[int] = None

    for i, tid in enumerate(ticket_ids, start=1):
        try:
            record = hubspot_client.crm.tickets.basic_api.get_by_id(
                tid,
                properties=TICKET_PROPERTIES,
                associations=assoc_types,
                archived=False
            )
            p = record.properties or {}

            # Track max timestamp based on cursor_prop we searched on
            cursor_val = p.get(cursor_prop)
            ts_ms = parse_any_ts_ms(cursor_val)
            if ts_ms is not None and (max_ts_ms is None or ts_ms > max_ts_ms):
                max_ts_ms = ts_ms

            tickets.append({
                "ticket_id": try_parse_int(record.id),
                "subject": p.get("subject"),
                "content": p.get("content"),
                "ticket_type": p.get("ticket_type"),
                "closed_date": parse_ts(p.get("closed_date")),
                "hubspot_modified_date": parse_ts(p.get("hs_lastmodifieddate")),
                "pipeline_id": try_parse_int(p.get("hs_pipeline")),
                "pipeline_label": pipeline_map.get(p.get("hs_pipeline"), ""),
                "ticket_status_id": try_parse_int(p.get("hs_pipeline_stage")),
                "ticket_status_label": stage_map.get(p.get("hs_pipeline_stage"), ""),
                "ticket_priority": p.get("hs_ticket_priority"),
                "source": p.get("source_type"),
                "record_source": p.get("hs_object_source_label"),
                "record_source_detail_1": p.get("hs_object_source_detail_1"),
                "hubspot_owner_id": try_parse_int(p.get("hubspot_owner_id")),
                "hubspot_created_at": parse_ts(p.get("createdate")),
                "hubspot_created_by": try_parse_int(p.get("hs_created_by_user_id")),
            })

            # Associations
            assoc = record.associations or {}

            # CLI (custom object) key may be numeric ID or the name key
            cli_bucket = None
            if assoc.get(CLI_ASSOC_TYPE_ID):
                cli_bucket = assoc[CLI_ASSOC_TYPE_ID]
            elif assoc.get(CLI_ASSOC_FALLBACK_KEY):
                cli_bucket = assoc[CLI_ASSOC_FALLBACK_KEY]

            if cli_bucket and getattr(cli_bucket, "results", None):
                for a in cli_bucket.results:
                    if a.id and a.id.isdigit():
                        ticket_cli_links.append({
                            "ticket_id": try_parse_int(record.id),
                            "cli_id": try_parse_int(a.id),
                        })

            if assoc.get("deals") and getattr(assoc["deals"], "results", None):
                for a in assoc["deals"].results:
                    if a.id and a.id.isdigit():
                        ticket_deal_links.append({
                            "ticket_id": try_parse_int(record.id),
                            "deal_id": try_parse_int(a.id),
                        })

            if assoc.get("contacts") and getattr(assoc["contacts"], "results", None):
                for a in assoc["contacts"].results:
                    if a.id and a.id.isdigit():
                        ticket_contact_links.append({
                            "ticket_id": try_parse_int(record.id),
                            "contact_id": try_parse_int(a.id),
                        })

            if i % 200 == 0:
                logging.info("Read %d tickets...", i)
            time.sleep(0.05)  # throttle per-record reads

        except httpx.HTTPStatusError as e:
            logging.error("HTTP error reading ticket %s: %s", tid, e)
        except (TicketsApiException, httpx.HTTPError) as e:
            logging.error("Error reading ticket %s: %s", tid, e)

    return tickets, ticket_cli_links, ticket_deal_links, ticket_contact_links, max_ts_ms


# -----------------------------------------------------------------------------
# Upsert
# -----------------------------------------------------------------------------


def upsert_tickets(
    tickets: List[Dict],
    ticket_cli_links: List[Dict],
    ticket_deal_links: List[Dict],
    ticket_contact_links: List[Dict],
) -> None:
    """Deduplicate and upsert tickets plus their association link rows.

    Each list is deduplicated on its conflict key(s) and written in batches of
    1000; empty lists are skipped entirely.
    """
    if tickets:
        tickets = deduplicate_by_key(tickets, key="ticket_id")
        batched_insert(
            supabase_client, "hubspot_tickets", tickets,
            batch_size=1000, on_conflict=["ticket_id"]
        )
        print(f"Upserted {len(tickets)} tickets.")

    if ticket_cli_links:
        ticket_cli_links = deduplicate_by_key(
            ticket_cli_links, key=("ticket_id", "cli_id"))
        batched_insert(
            supabase_client, "hubspot_ticket_clis", ticket_cli_links,
            batch_size=1000, on_conflict=["ticket_id", "cli_id"]
        )
        print(f"Upserted {len(ticket_cli_links)} ticket-cli associations.")

    if ticket_deal_links:
        ticket_deal_links = deduplicate_by_key(
            ticket_deal_links, key=("ticket_id", "deal_id"))
        batched_insert(
            supabase_client, "hubspot_ticket_deals", ticket_deal_links,
            batch_size=1000, on_conflict=["ticket_id", "deal_id"]
        )
        print(f"Upserted {len(ticket_deal_links)} ticket-deal associations.")

    if ticket_contact_links:
        ticket_contact_links = deduplicate_by_key(
            ticket_contact_links, key=("ticket_id", "contact_id"))
        batched_insert(
            supabase_client, "hubspot_ticket_contacts", ticket_contact_links,
            batch_size=1000, on_conflict=["ticket_id", "contact_id"]
        )
        print(
            f"Upserted {len(ticket_contact_links)} ticket-contact associations.")


# -----------------------------------------------------------------------------
# Main (timestamp cursor)
# -----------------------------------------------------------------------------


def main(since_ms: Optional[int] = None):
    """
    Orchestrates:
        1) Search ticket IDs with cursor_prop > since_ms (property fallback)
        2) Read full tickets with associations (track max timestamp for cursor_prop)
        3) Upsert into Supabase
        4) Update sync metadata with last_sync_time

    Cursor resolution order: explicit *since_ms* argument, then the
    HUBSPOT_TICKETS_SINCE_MS env var, then today@00:00:00Z.
    """
    # Resolve since_ms
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_TICKETS_SINCE_MS must be an integer (ms) if set.")
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)

    print(f"Searching tickets with timestamp > {since_ms} ...")
    ids, cursor_prop = search_ticket_ids_after_ms(since_ms)
    print(f"Search property: {cursor_prop}. Found {len(ids)} ticket IDs.")

    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()

    if not ids:
        print("No tickets beyond the cursor. Updating sync metadata and exiting.")
        # Only record last_sync_time (updated_at handled by helper)
        update_sync_metadata(supabase_client, "hubspot_tickets", now_iso)
        return

    print("Reading tickets (with associations)...")
    tickets, ticket_cli_links, ticket_deal_links, ticket_contact_links, max_ts_ms = read_tickets_by_ids(
        ids, cursor_prop)

    print("Upserting into Supabase...")
    upsert_tickets(tickets, ticket_cli_links,
                   ticket_deal_links, ticket_contact_links)

    # Advance cursor to max timestamp we actually ingested for the chosen property
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms

    # Record last_sync_time only (updated_at handled by helper)
    # NOTE(review): new_cursor_ms is computed and printed but never persisted
    # here — the orchestrator appears to own cursor storage; confirm.
    update_sync_metadata(supabase_client, "hubspot_tickets", now_iso)
    print(
        f"Tickets sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.")


# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------


def _parse_cli_arg_to_ms(arg: str) -> int:
    """Parse a CLI timestamp argument into epoch milliseconds.

    Accept:
        - integer epoch ms or seconds (10-13 digits)
        - ISO-8601 (Z or offset)
        - YYYY-MM-DD (floors to 00:00Z)

    Raises:
        ValueError: if *arg* matches none of the accepted formats.
    """
    # epoch ms or seconds
    if re.fullmatch(r"\d{10,13}", arg):
        v = int(arg)
        # Same heuristic as parse_any_ts_ms: a value below 1e10 is a 10-digit
        # seconds epoch. (The previous 1e13 threshold wrongly multiplied
        # 13-digit ms epochs by 1000.)
        if v < 10_000_000_000:  # seconds -> ms
            v *= 1000
        return v
    # YYYY-MM-DD
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
        d = datetime.datetime.strptime(
            arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
        return to_epoch_ms(floor_to_utc_midnight(d))
    # ISO-8601
    return to_epoch_ms(arg)


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        try:
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        main()