Legal-i commited on
Commit
33fe64e
·
verified ·
1 Parent(s): 66d6196

Stage 158 (α1): Stripe customer-health connector

Browse files
infra/ingestion/connectors.py CHANGED
@@ -177,10 +177,22 @@ def build_connector(connector_type: str, config: dict) -> Connector:
177
  api_version=config.get("api_version", "v60.0"),
178
  timeout=float(config.get("timeout", 30.0)),
179
  )
 
 
 
 
 
 
 
 
 
 
 
180
  raise ValueError(
181
  f"unknown / non-schedulable connector type {connector_type!r} "
182
  f"(schedulable: ['{CSVFolderConnector.type_name}', "
183
  f"'{HttpJsonConnector.type_name}', '{SFTPConnector.type_name}', "
184
  f"'{SQLConnector.type_name}', "
185
- f"'{SalesforceConnector.type_name}'])"
 
186
  )
 
177
  api_version=config.get("api_version", "v60.0"),
178
  timeout=float(config.get("timeout", 30.0)),
179
  )
180
+ # Stage 158 (α1) — Stripe customer-health connector.
181
+ from .stripe_connector import StripeConnector
182
+ if connector_type == StripeConnector.type_name:
183
+ # api_key OR api_key_env required; the constructor enforces.
184
+ return StripeConnector(
185
+ api_key=config.get("api_key"),
186
+ api_key_env=config.get("api_key_env"),
187
+ limit=int(config.get("limit", 100)),
188
+ timeout=float(config.get("timeout", 30.0)),
189
+ base_url=config.get("base_url", "https://api.stripe.com"),
190
+ )
191
  raise ValueError(
192
  f"unknown / non-schedulable connector type {connector_type!r} "
193
  f"(schedulable: ['{CSVFolderConnector.type_name}', "
194
  f"'{HttpJsonConnector.type_name}', '{SFTPConnector.type_name}', "
195
  f"'{SQLConnector.type_name}', "
196
+ f"'{SalesforceConnector.type_name}', "
197
+ f"'{StripeConnector.type_name}'])"
198
  )
infra/ingestion/stripe_connector.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ infra.ingestion.stripe_connector — pull observations from a Stripe
3
+ account via the REST API. Aimed at customer-health drift detection:
4
+ who's delinquent, who has no default payment method, who's been
5
+ around long enough that loss would hurt MRR.
6
+
7
+ Customer flow:
8
+ 1. In Stripe Dashboard → Developers → API keys, mint a restricted
9
+ key with read access to Customers + Subscriptions.
10
+ 2. Store the key in an env var on this deployment (recommended)
11
+ OR paste inline into connector_config.
12
+ 3. Schedule a daily ingest with entity_type="customer".
13
+ 4. Engine baselines per-customer signals over time, drift triggers
14
+ on customers whose health metrics decay.
15
+
16
+ Configuration:
17
+
18
+ api_key (optional) the Stripe secret key, inline.
19
+ Use ONLY for demos — secret ends up in the
20
+ schedule's connector_config column.
21
+ api_key_env (optional, recommended) env-var name whose
22
+ value is the Stripe key. ONE of api_key /
23
+ api_key_env required.
24
+ limit max customers per fetch (default 100, max 100
25
+ per Stripe page). v1: single page only.
26
+ timeout HTTP timeout in seconds (default 30).
27
+ base_url override for testing (default
28
+ "https://api.stripe.com").
29
+
30
+ v1 emits 3 metrics per customer, all derivable from /v1/customers
31
+ without needing a second round-trip per row:
32
+
33
+ delinquent 1.0 if customer.delinquent else 0.0 — a
34
+ Stripe-managed flag set when invoices fail.
35
+ Rising → churn risk.
36
+ has_default_payment 1.0 if invoice_settings.default_payment_method
37
+ is set else 0.0. Dropping → customer removed
38
+ their card, often a churn precursor.
39
+ days_since_created customer.created → days. Doesn't drift by
40
+ itself, but the engine uses it as a baseline
41
+ feature to weight other signals (old customer
42
+ going delinquent is worse than a new one).
43
+
44
+ v2 (future): aggregate /v1/subscriptions per customer for MRR /
45
+ status / cancel_at_period_end. Out of v1 scope.
46
+
47
+ Stdlib only — ``urllib.request``. All failures surface as
48
+ :class:`ConnectorFetchError` matching the HTTP connector's shape.
49
+ """
50
+ from __future__ import annotations
51
+
52
+ import json
53
+ import os
54
+ import time
55
+ from datetime import datetime, timezone
56
+ from typing import List, Optional
57
+ from urllib.error import HTTPError, URLError
58
+ from urllib.request import Request, urlopen
59
+
60
+ from core.pipeline import Observation
61
+
62
+ from .connectors import Connector
63
+ from .http_connector import ConnectorFetchError
64
+
65
+
66
+ class StripeConnector(Connector):
67
+ """Fetches per-customer observations from a Stripe account."""
68
+
69
+ type_name = "stripe"
70
+
71
+ def __init__(
72
+ self, *,
73
+ api_key: Optional[str] = None,
74
+ api_key_env: Optional[str] = None,
75
+ limit: int = 100,
76
+ timeout: float = 30.0,
77
+ base_url: str = "https://api.stripe.com",
78
+ ):
79
+ if not (api_key or api_key_env):
80
+ raise ValueError(
81
+ "stripe connector needs api_key OR api_key_env "
82
+ "(recommended). Set one of them in connector_config."
83
+ )
84
+ if limit < 1 or limit > 100:
85
+ raise ValueError(
86
+ "stripe connector limit must be in [1, 100] — Stripe's "
87
+ "max page size for /v1/customers."
88
+ )
89
+ self.api_key = api_key
90
+ self.api_key_env = api_key_env
91
+ self.limit = limit
92
+ self.timeout = timeout
93
+ self.base_url = base_url.rstrip("/")
94
+
95
+ # ---- contract methods -----------------------------------------
96
+
97
+ def fetch(self, entity_type: str) -> List[Observation]:
98
+ if entity_type != "customer":
99
+ # v1 supports only "customer" — return empty rather than
100
+ # raise, so a schedule misconfigured for a different
101
+ # entity_type fails cleanly downstream (zero observations
102
+ # = pipeline notes nothing changed) instead of breaking.
103
+ return []
104
+ key = self._resolve_key()
105
+ url = f"{self.base_url}/v1/customers?limit={self.limit}"
106
+ req = Request(url, headers={
107
+ "Authorization": f"Bearer {key}",
108
+ "Accept": "application/json",
109
+ })
110
+ try:
111
+ with urlopen(req, timeout=self.timeout) as resp:
112
+ body = resp.read()
113
+ except HTTPError as e:
114
+ code, msg = _classify_http_error(e)
115
+ raise ConnectorFetchError(
116
+ code,
117
+ f"Stripe {e.code} {e.reason}: {msg}",
118
+ cause=e,
119
+ ) from e
120
+ except URLError as e:
121
+ raise ConnectorFetchError(
122
+ "network",
123
+ f"could not reach Stripe at {self.base_url}: {e.reason}",
124
+ cause=e,
125
+ ) from e
126
+ try:
127
+ payload = json.loads(body)
128
+ except json.JSONDecodeError as e:
129
+ raise ConnectorFetchError(
130
+ "bad_payload", "Stripe returned non-JSON body", cause=e,
131
+ ) from e
132
+ return self._customers_to_observations(payload)
133
+
134
+ def entity_types(self) -> List[str]:
135
+ return ["customer"]
136
+
137
+ # ---- internals ------------------------------------------------
138
+
139
+ def _resolve_key(self) -> str:
140
+ if self.api_key:
141
+ return self.api_key
142
+ key = os.environ.get(self.api_key_env or "")
143
+ if not key:
144
+ raise ConnectorFetchError(
145
+ "missing_secret",
146
+ f"env var {self.api_key_env!r} (api_key_env) is unset "
147
+ "or empty — set it on the deployment or pass api_key "
148
+ "inline in connector_config",
149
+ )
150
+ return key
151
+
152
+ def _customers_to_observations(self, payload: dict) -> List[Observation]:
153
+ if not isinstance(payload, dict) or "data" not in payload:
154
+ raise ConnectorFetchError(
155
+ "bad_payload",
156
+ "Stripe response missing 'data' — body keys: "
157
+ f"{list(payload.keys()) if isinstance(payload, dict) else type(payload).__name__}",
158
+ )
159
+ today_iso = datetime.now(timezone.utc).date().isoformat()
160
+ now_unix = time.time()
161
+ out: List[Observation] = []
162
+ for c in payload["data"]:
163
+ if not isinstance(c, dict):
164
+ continue
165
+ cid = c.get("id")
166
+ if not cid:
167
+ continue
168
+ created = c.get("created") or 0
169
+ days_since = max(0.0, (now_unix - float(created)) / 86400.0) if created else 0.0
170
+ default_pm = (
171
+ (c.get("invoice_settings") or {}).get("default_payment_method")
172
+ )
173
+ out.append(Observation(
174
+ entity_id=str(cid),
175
+ day=today_iso,
176
+ values={
177
+ "delinquent":
178
+ 1.0 if c.get("delinquent") else 0.0,
179
+ "has_default_payment":
180
+ 1.0 if default_pm else 0.0,
181
+ "days_since_created": days_since,
182
+ },
183
+ ))
184
+ if payload.get("has_more"):
185
+ import sys as _sys
186
+ print(
187
+ f"warning: Stripe customers list has more than {self.limit} "
188
+ f"records (has_more=true). v1 reads a single page; tighten "
189
+ "the customer filter or wait for v2 pagination.",
190
+ file=_sys.stderr,
191
+ )
192
+ return out
193
+
194
+
195
+ def _classify_http_error(e: HTTPError) -> tuple:
196
+ """Map a Stripe HTTP error to a stable (code, hint) tuple.
197
+
198
+ Stripe uses standard HTTP codes + an error object in the body
199
+ (``{"error": {"type": "...", "message": "..."}}``).
200
+ """
201
+ try:
202
+ body = e.read().decode("utf-8", errors="replace")
203
+ except Exception:
204
+ body = ""
205
+ snippet = body[:300] if body else ""
206
+ if e.code == 401:
207
+ return ("invalid_credentials",
208
+ f"Stripe rejected the API key — verify the key is "
209
+ f"correct and has read scope on Customers. Body: {snippet}")
210
+ if e.code == 403:
211
+ return ("forbidden",
212
+ f"Stripe key lacks scope for /v1/customers — restricted "
213
+ f"keys must include the customers:read permission: "
214
+ f"{snippet}")
215
+ if e.code == 429:
216
+ return ("rate_limit",
217
+ f"Stripe rate limit hit — schedule a less-frequent "
218
+ f"ingest or stagger calls. Body: {snippet}")
219
+ if 500 <= e.code < 600:
220
+ return ("upstream",
221
+ f"Stripe returned 5xx ({snippet})")
222
+ return ("http_error", snippet)