niwayandm commited on
Commit
82f122d
·
1 Parent(s): a2f1dfd

Fix region mapping and add line items

Browse files
python/hubspot_companies.py CHANGED
@@ -353,8 +353,7 @@ def map_company_data_for_db(companies: List[Dict]) -> List[Dict]:
353
  "industry": c.get("industry"),
354
  "macro_industry_grouping": c.get("macro_industry_grouping"),
355
  "closest_review_date": parse_ts(c.get("closest_review_date___clone")),
356
- "region": c.get(c.get("region")),
357
- "closest_review_date": parse_ts(c.get("closest_review_date___clone")),
358
  "source_1": c.get("source_1"),
359
  "source_2": c.get("source_2"),
360
  "record_source": c.get("hs_object_source_label"),
@@ -497,10 +496,12 @@ if __name__ == "__main__":
497
  import sys
498
  if len(sys.argv) > 1:
499
  try:
500
- main(since_ms=_parse_cli_arg_to_ms(sys.argv[1]))
501
  except Exception as e:
502
  print(
503
- f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}")
 
504
  sys.exit(1)
 
505
  else:
506
  main()
 
353
  "industry": c.get("industry"),
354
  "macro_industry_grouping": c.get("macro_industry_grouping"),
355
  "closest_review_date": parse_ts(c.get("closest_review_date___clone")),
356
+ "region": c.get("region"),
 
357
  "source_1": c.get("source_1"),
358
  "source_2": c.get("source_2"),
359
  "record_source": c.get("hs_object_source_label"),
 
496
  import sys
497
  if len(sys.argv) > 1:
498
  try:
499
+ since = _parse_cli_arg_to_ms(sys.argv[1])
500
  except Exception as e:
501
  print(
502
+ f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
503
+ )
504
  sys.exit(1)
505
+ main(since_ms=since)
506
  else:
507
  main()
python/hubspot_lineitems.py ADDED
@@ -0,0 +1,394 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ HubSpot Line Items → Supabase (incremental since a millisecond cursor)
3
+
4
+ Usage from orchestrator:
5
+ import hubspot_lineitems
6
+ hubspot_lineitems.main(since_ms=<int milliseconds since epoch UTC>)
7
+
8
+ Direct CLI:
9
+ # epoch ms
10
+ python hubspot_lineitems.py 1754025600000
11
+ # ISO-8601
12
+ python hubspot_lineitems.py 2025-08-01T09:30:00Z
13
+ # Back-compat date (floors to 00:00Z)
14
+ python hubspot_lineitems.py 2025-08-01
15
+ """
16
+ import os
17
+ import re
18
+ import time
19
+ import logging
20
+ import datetime
21
+ from typing import List, Dict, Optional, Tuple, Union
22
+
23
+ import httpx
24
+ import hubspot
25
+ from dotenv import load_dotenv
26
+ from supabase import create_client
27
+ from hubspot.crm.line_items import ApiException as LineItemsApiException
28
+
29
+ from hubspot_utils import (
30
+ parse_ts, try_parse_int, try_parse_float, deduplicate_by_key
31
+ )
32
+ from supabase_utils import (
33
+ insert_into_supabase_table, update_sync_metadata
34
+ )
35
+
36
+ # -----------------------------------------------------------------------------
37
+ # Constants
38
+ # -----------------------------------------------------------------------------
39
# HubSpot line item properties requested on every read in this module.
# Entries are grouped by purpose; the sequence itself is unchanged.
LINEITEM_PROPERTIES = [
    # Identity / description
    "hs_product_id", "hs_object_id", "name", "description",
    # Quantity and billing terms
    "quantity", "hs_term_in_months", "recurringbillingfrequency",
    # Pricing, cost and margin
    "price", "hs_cost_of_goods_sold", "discount",
    "hs_margin", "hs_margin_tcv", "amount",
    # Classification
    "network_carrier", "supplier", "pipeline",
    "hs_product_type", "deal_type", "cnx_type",
    # Timestamps
    "createdate", "hs_lastmodifieddate",
]
62
+
63
+ # -----------------------------------------------------------------------------
64
+ # Logging
65
+ # -----------------------------------------------------------------------------
66
# Log to a per-day file under ./logs (relative to the working directory).
# logging.basicConfig opens the file immediately and does not create missing
# parent directories, so make sure the directory exists first; otherwise the
# module fails at import time on a fresh checkout.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/hubspot_lineitems_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
72
+
73
+ # -----------------------------------------------------------------------------
74
+ # Environment
75
+ # -----------------------------------------------------------------------------
76
# Pull variables from a local .env file (if any) into the process environment
# before reading them below.
load_dotenv()

HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Optional bootstrap cursor, used by main() when the caller passes no since_ms.
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_LINEITEMS_SINCE_MS")

# Fail fast at import time when required credentials are missing or empty.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
    raise RuntimeError("Supabase env vars are not set")

# Module-level clients shared by every function in this file.
hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
90
+
91
+ # -----------------------------------------------------------------------------
92
+ # Time helpers
93
+ # -----------------------------------------------------------------------------
94
+ def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
95
+ if dt.tzinfo is None:
96
+ dt = dt.replace(tzinfo=datetime.timezone.utc)
97
+ return dt.astimezone(datetime.timezone.utc)
98
+
99
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """
    Return 00:00:00 UTC on the UTC calendar day that contains dt.

    dt is first normalised with _ensure_utc, so a naive value is treated as UTC.
    """
    start_of_day = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return _ensure_utc(dt).replace(**start_of_day)
102
+
103
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """
    Parse an ISO-8601-style string into a timezone-aware UTC datetime.

    A trailing "Z" is rewritten as an explicit "+00:00" offset before the
    string is handed to datetime.fromisoformat. Strings without an offset are
    treated as UTC by _ensure_utc. Parsing errors from fromisoformat propagate.
    """
    text = value
    if isinstance(text, str) and text.endswith("Z"):
        text = text[:-1] + "+00:00"
    return _ensure_utc(datetime.datetime.fromisoformat(text))
108
+
109
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """
    Convert a datetime or an ISO-8601-style string to integer milliseconds
    since the Unix epoch.

    Raises:
        TypeError: if the argument is neither a str nor a datetime.
    """
    if isinstance(dt_or_str, datetime.datetime):
        moment = _ensure_utc(dt_or_str)
    elif isinstance(dt_or_str, str):
        moment = _parse_iso_like_to_dt(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    # Sub-millisecond precision is truncated by int().
    return int(moment.timestamp() * 1000)
117
+
118
def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
    """
    Accepts ms-epoch / sec-epoch / ISO-8601; returns ms since epoch or None.

    Integer-like input is classified by magnitude: values below 10**11 are
    taken as epoch seconds and scaled to milliseconds, anything larger is
    assumed to already be milliseconds. Anything that is not integer-like is
    passed to to_epoch_ms as a string; if that fails too, a warning is logged
    and None is returned.
    """
    if value is None:
        return None

    # 10**11 seconds is roughly year 5138, while 10**11 ms is March 1973, so
    # this cutoff separates realistic second and millisecond values. The
    # previous cutoff of 10**13 sat above present-day millisecond timestamps
    # (~1.7 * 10**12), so they were multiplied by 1000 a second time.
    seconds_cutoff = 100_000_000_000

    try:
        v = int(str(value))
    except ValueError:
        pass
    else:
        if v < seconds_cutoff:  # seconds → ms
            v *= 1000
        return v

    try:
        return to_epoch_ms(str(value))
    except Exception:
        logging.warning("Could not parse timestamp value=%r", value)
        return None
136
+
137
+ # -----------------------------------------------------------------------------
138
+ # Search IDs (ts > since_ms) with property fallback
139
+ # -----------------------------------------------------------------------------
140
def _search_lineitem_ids_from(since_ms: int, prop: str) -> List[str]:
    """
    Search line items where {prop} > since_ms (epoch-ms) and return their IDs.

    Results are requested sorted ascending by {prop}, 100 per page, and all
    pages are followed via the "after" paging token.

    Raises:
        httpx.HTTPStatusError: when the search endpoint answers with a
            status >= 400 (the response body is logged first).
    """
    endpoint = "https://api.hubapi.com/crm/v3/objects/line_items/search"
    request_headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_query = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT", "value": str(since_ms)},
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }

    found: List[str] = []
    page_token: Optional[str] = None
    with httpx.Client(timeout=30.0) as session:
        while True:
            # Only send "after" once a previous page has handed us a token.
            query = dict(base_query)
            if page_token:
                query["after"] = page_token

            response = session.post(endpoint, headers=request_headers, json=query)
            if response.status_code >= 400:
                # Prefer the decoded JSON error; fall back to the raw text.
                try:
                    detail = response.json()
                except Exception:
                    detail = response.text
                logging.error("Line item search error for prop '%s': %s", prop, detail)
                response.raise_for_status()

            page = response.json()
            for item in page.get("results", []) or []:
                found.append(item["id"])

            page_token = (page.get("paging") or {}).get("next", {}).get("after")
            if not page_token:
                break
            # Small pause between pages.
            time.sleep(0.1)

    return found
186
+
187
def search_lineitem_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """
    Try these properties in order; return (ids, prop_used) for the first successful search:
      1) createdate
      2) hs_createdate

    A property is skipped when its search raises httpx.HTTPStatusError. If
    every property fails, the last such error is re-raised.

    NOTE(review): only creation timestamps are searched here, so line items
    that were merely modified after the cursor are not returned — confirm
    this is intended.
    """
    candidates = ["createdate", "hs_createdate"]
    failure = None

    for candidate in candidates:
        try:
            found = _search_lineitem_ids_from(since_ms, candidate)
        except httpx.HTTPStatusError as exc:
            failure = exc
        else:
            logging.info("Line itemsearch with '%s' returned %d IDs.", candidate, len(found))
            return found, candidate

    if failure:
        raise failure
    # Only reachable if the candidate list were empty.
    return [], "hs_lastmodifieddate"
210
+
211
def read_deals_by_ids(
    lineitem_ids: List[str],
    cursor_prop: str,
) -> Tuple[List[Dict], List[Dict], Optional[int]]:
    """
    Read line items by ID including deal associations.

    Despite its name, this function reads line items, not deals; the name is
    kept for callers.

    Args:
        lineitem_ids: HubSpot line item IDs to read, one request per ID.
        cursor_prop: property whose largest parsed value is reported back so
            the caller can advance its sync cursor.

    Returns: (all_lineitems, deal_lineitem_links, max_ts_ms_for_cursor_prop)
        - all_lineitems: one row dict per line item that was read.
        - deal_lineitem_links: {"deal_id", "lineitem_id"} dicts, one per
          associated deal with a numeric ID.
        - max_ts_ms_for_cursor_prop: None when no timestamp could be parsed.

    Read failures for individual IDs are logged and that ID is skipped.
    """
    if not lineitem_ids:
        return [], [], None

    all_lineitems: List[Dict] = []
    deal_lineitem_links: List[Dict] = []

    assoc_types = ["deals"]

    max_ts_ms: Optional[int] = None

    for i, lineitem_id in enumerate(lineitem_ids, start=1):
        try:
            record = hubspot_client.crm.line_items.basic_api.get_by_id(
                line_item_id=lineitem_id,
                properties=LINEITEM_PROPERTIES,
                associations=assoc_types,
                archived=False,
            )
            p = record.properties or {}

            # Track max timestamp for the cursor property we used in the search
            ts_ms = parse_any_ts_ms(p.get(cursor_prop))
            if ts_ms is not None and (max_ts_ms is None or ts_ms > max_ts_ms):
                max_ts_ms = ts_ms

            # Deal associations. `record` is the line item and each
            # association entry is a deal, so the deal ID comes from the
            # association and the line item ID from the record (these two
            # were previously swapped).
            associations = getattr(record, "associations", None)
            if associations and associations.get("deals"):
                bucket = associations["deals"]
                if getattr(bucket, "results", None):
                    for a in bucket.results:
                        if a.id and a.id.isdigit():
                            deal_lineitem_links.append({
                                "deal_id": try_parse_int(a.id),
                                "lineitem_id": try_parse_int(record.id),
                            })

            # Created date: accept either createdate or hs_createdate
            created_iso = p.get("createdate") or p.get("hs_createdate")

            all_lineitems.append({
                "product_id": try_parse_int(p.get("hs_product_id")),
                "lineitem_id": try_parse_int(record.id),
                "name": p.get("name"),
                "description": p.get("description"),
                "quantity": try_parse_int(p.get("quantity")),
                "term_in_months": try_parse_int(p.get("hs_term_in_months")),
                "billing_frequency": p.get("recurringbillingfrequency"),
                "price": try_parse_float(p.get("price")),
                "unit_cost": try_parse_float(p.get("hs_cost_of_goods_sold")),
                "discount": try_parse_float(p.get("discount")),
                "margin": try_parse_float(p.get("hs_margin")),
                "total_contract_value_margin": try_parse_float(
                    p.get("hs_margin_tcv")
                ),
                "net_price": try_parse_float(p.get("amount")),
                "network_carrier": p.get("network_carrier"),
                "supplier": p.get("supplier"),
                "pipeline": p.get("pipeline"),
                "product_type": p.get("hs_product_type"),
                "deal_type": p.get("deal_type"),
                "cnx_type": p.get("cnx_type"),
                "hubspot_create_date": parse_ts(created_iso),
                "hubspot_modified_date": parse_ts(
                    p.get("hs_lastmodifieddate")
                ),
            })

            if i % 200 == 0:
                logging.info("Read %d line items...", i)
            # Brief pause after each read to pace the per-ID requests.
            time.sleep(0.05)

        except httpx.HTTPStatusError as e:
            logging.error("HTTP error reading line item %s: %s", lineitem_id, e)
        except (LineItemsApiException, httpx.HTTPError) as e:
            logging.error("Error reading line item %s: %s", lineitem_id, e)

    return all_lineitems, deal_lineitem_links, max_ts_ms
294
+
295
+ # -----------------------------------------------------------------------------
296
+ # Upsert
297
+ # -----------------------------------------------------------------------------
298
def upsert_lineitems(lineitems: List[Dict], deal_lineitem_links: List[Dict]) -> None:
    """
    Write line item rows and deal/line-item link rows to Supabase.

    Line items go to "hubspot_lineitems". Links are de-duplicated on
    (deal_id, lineitem_id) and written to "hubspot_deal_lineitems" with that
    pair as the conflict target. Empty inputs are skipped without any call.
    """
    if lineitems:
        insert_into_supabase_table(supabase_client, "hubspot_lineitems", lineitems)
        print(f"Upserted {len(lineitems)} line items.")

    if not deal_lineitem_links:
        return

    unique_links = deduplicate_by_key(deal_lineitem_links, key=("deal_id", "lineitem_id"))
    insert_into_supabase_table(
        supabase_client,
        "hubspot_deal_lineitems",
        unique_links,
        on_conflict=["deal_id", "lineitem_id"],
    )
    print(f"Upserted {len(unique_links)} deal-lineitem associations.")
312
+
313
+ # -----------------------------------------------------------------------------
314
+ # Main (timestamp cursor)
315
+ # -----------------------------------------------------------------------------
316
def main(since_ms: Optional[int] = None):
    """
    Orchestrates:
      1) Search line item IDs with <cursor_prop> > since_ms (property fallback)
      2) Read full line items with associations (track max timestamp for <cursor_prop>)
      3) Upsert into Supabase
      4) Update sync metadata for "hubspot_lineitems" with the run's UTC time

    When since_ms is None it falls back to HUBSPOT_LINEITEMS_SINCE_MS, and
    then to today at 00:00:00 UTC.

    Raises:
        RuntimeError: if HUBSPOT_LINEITEMS_SINCE_MS is set but not an integer.
    """
    # Resolve the starting cursor: argument → env bootstrap → today 00:00Z.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError("HUBSPOT_LINEITEMS_SINCE_MS must be an integer (ms) if set.")

    if since_ms is None:
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        since_ms = to_epoch_ms(floor_to_utc_midnight(utc_now))

    print(f"Searching line items with timestamp > {since_ms} ...")
    lineitem_ids, cursor_prop = search_lineitem_ids_after_ms(since_ms)
    print(f"Search property: {cursor_prop}. Found {len(lineitem_ids)} line item IDs.")

    # Timestamp recorded as the sync time; taken right after the search.
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()

    if not lineitem_ids:
        print("No line items beyond the cursor. Updating sync metadata and exiting.")
        update_sync_metadata(supabase_client, "hubspot_lineitems", now_iso)
        return

    print("Reading line items (with associations)...")
    lineitems, deal_lineitem_links, max_ts_ms = read_deals_by_ids(lineitem_ids, cursor_prop)

    print("Upserting into Supabase...")
    upsert_lineitems(lineitems, deal_lineitem_links)

    # Advance cursor to max timestamp we actually ingested for the chosen property.
    # NOTE(review): new_cursor_ms is only printed below; it is not passed to
    # update_sync_metadata — confirm whether the cursor should be persisted.
    new_cursor_ms = since_ms if max_ts_ms is None else max_ts_ms

    update_sync_metadata(supabase_client, "hubspot_lineitems", now_iso)

    print(f"Line items sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.")
359
+
360
+ # -----------------------------------------------------------------------------
361
+ # CLI
362
+ # -----------------------------------------------------------------------------
363
+ def _parse_cli_arg_to_ms(arg: str) -> int:
364
+ """
365
+ Accept:
366
+ - integer epoch ms
367
+ - ISO-8601 (Z or offset)
368
+ - YYYY-MM-DD (floors to 00:00Z)
369
+ """
370
+ # epoch ms or seconds
371
+ if re.fullmatch(r"\d{10,13}", arg):
372
+ v = int(arg)
373
+ if v < 10_000_000_000_000: # seconds -> ms
374
+ v *= 1000
375
+ return v
376
+
377
+ # YYYY-MM-DD
378
+ if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
379
+ d = datetime.datetime.strptime(arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
380
+ return to_epoch_ms(floor_to_utc_midnight(d))
381
+
382
+ # ISO-8601
383
+ return to_epoch_ms(arg)
384
+
385
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        # Only argument parsing is guarded: a failure inside main() is a sync
        # error, not a bad timestamp, and must not be reported as one.
        try:
            since = _parse_cli_arg_to_ms(sys.argv[1])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)
    else:
        main()