rairo commited on
Commit
02e487d
·
verified ·
1 Parent(s): 931554c

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +97 -44
main.py CHANGED
@@ -113,62 +113,115 @@ logger.info(f"Chart export path set to: {user_defined_path}")
113
  # -----------------------------------------------------------------------------
114
  # Admin API client (client-supplied credentials; holistic admin scope)
115
  # -----------------------------------------------------------------------------
116
- SC_BASE = "https://delta-api.pricelyst.co.zw".rstrip("/")
117
- _TZ = "Africa/Harare"
 
 
 
 
118
 
119
  class SCAuth:
120
- """Caches tokens keyed by admin email."""
121
- _token_cache: Dict[str, Tuple[Optional[str], float]] = {} # email -> (token, expiry_ts)
122
 
123
  @classmethod
124
- def login(cls, email: str, password: str) -> str:
125
- url = f"{SC_BASE}/api/auth/admin/login"
126
  try:
127
- resp = requests.post(url, json={"email": email, "password": password}, timeout=20)
128
- if resp.status_code >= 400:
129
- raise RuntimeError(f"Login failed: {resp.status_code} {resp.text[:200]}")
130
- data = resp.json() or {}
131
- token = data.get("token")
132
- if not token:
133
- raise RuntimeError("Login did not return a token.")
134
- expiry = time.time() + 50 * 60 # ~50 minutes
135
- cls._token_cache[email] = (token, expiry)
136
- logger.info(f"Admin login OK for {email}.")
137
- return token
138
- except Exception as e:
139
- logger.error(f"Admin login error: {e}")
140
- raise
141
 
142
  @classmethod
143
- def ensure(cls, email: str, password: str) -> str:
144
- token, expiry = cls._token_cache.get(email, (None, 0))
145
- if not token or time.time() > expiry:
146
- token = cls.login(email, password)
147
- return token or ""
148
-
149
- def sc_request(method: str, path: str, email: str, password: str,
150
- params: dict | None = None, data: dict | None = None) -> dict:
151
- url = f"{SC_BASE}{path}"
152
- headers = {}
153
- try:
154
- token = SCAuth.ensure(email, password)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  if token:
156
- headers["Authorization"] = f"Bearer {token}"
157
- except Exception:
158
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  try:
160
- resp = requests.request(method, url, params=params, json=data, headers=headers, timeout=30)
161
- if resp.status_code == 401:
162
- # Retry once with fresh token
163
- token = SCAuth.login(email, password)
164
- headers["Authorization"] = f"Bearer {token}"
165
- resp = requests.request(method, url, params=params, json=data, headers=headers, timeout=30)
166
  resp.raise_for_status()
167
- return resp.json() if resp.content else {}
168
  except Exception as e:
169
- logger.error(f"SC request error {method} {path}: {e}")
170
- raise
 
171
 
 
 
 
 
 
172
  # -----------------------------------------------------------------------------
173
  # Temporal helpers
174
  # -----------------------------------------------------------------------------
 
113
  # -----------------------------------------------------------------------------
114
  # Admin API client (client-supplied credentials; holistic admin scope)
115
  # -----------------------------------------------------------------------------
116
+ # --- Auth + Request client (robust: bearer OR cookie session) ---
117
+
118
+ import requests
119
+ from typing import Dict, Optional
120
+
121
+ SC_BASE_URL = os.getenv("SC_BASE_URL", "https://delta-api.pricelyst.co.zw").rstrip("/")
122
 
123
  class SCAuth:
124
+ """Caches a requests.Session per admin email; supports bearer or cookie sessions."""
125
+ _cache: Dict[str, Dict[str, any]] = {}
126
 
127
  @classmethod
128
+ def invalidate(cls, email: str) -> None:
 
129
  try:
130
+ entry = cls._cache.pop(email, None)
131
+ if entry and isinstance(entry.get("session"), requests.Session):
132
+ entry["session"].close()
133
+ except Exception:
134
+ pass
 
 
 
 
 
 
 
 
 
135
 
136
  @classmethod
137
+ def _extract_token(cls, js: dict) -> Optional[str]:
138
+ if not isinstance(js, dict):
139
+ return None
140
+ # Try common token shapes
141
+ candidates = [
142
+ js.get("token"),
143
+ js.get("access_token"),
144
+ (js.get("data") or {}).get("token"),
145
+ (js.get("data") or {}).get("access_token"),
146
+ (js.get("authorization") or {}).get("token"),
147
+ (js.get("auth") or {}).get("token"),
148
+ ]
149
+ for t in candidates:
150
+ if isinstance(t, str) and t.strip():
151
+ return t.strip()
152
+ return None
153
+
154
+ @classmethod
155
+ def login(cls, email: str, password: str) -> Dict[str, any]:
156
+ s = requests.Session()
157
+ s.headers.update({"Accept": "application/json"})
158
+ url = f"{SC_BASE_URL}/api/auth/admin/login"
159
+ # IMPORTANT: many APIs expect JSON, not form data
160
+ resp = s.post(url, json={"email": email, "password": password}, timeout=30)
161
+
162
+ # Try JSON parse (may fail if empty body)
163
+ body_text = ""
164
+ body_json = {}
165
+ try:
166
+ body_json = resp.json() or {}
167
+ except Exception:
168
+ body_text = (resp.text or "")[:800] # keep it short for logs
169
+
170
+ token = cls._extract_token(body_json)
171
+
172
  if token:
173
+ s.headers.update({"Authorization": f"Bearer {token}"})
174
+ entry = {"session": s, "auth": "bearer", "token": token}
175
+ cls._cache[email] = entry
176
+ return entry
177
+
178
+ # If no token but we got Set-Cookie, assume cookie session auth
179
+ if resp.cookies and (resp.status_code // 100) == 2:
180
+ entry = {"session": s, "auth": "cookie"}
181
+ cls._cache[email] = entry
182
+ return entry
183
+
184
+ # Neither token nor cookie: raise, but include short body for diagnosis
185
+ snippet = body_text or (str(body_json)[:800])
186
+ raise RuntimeError(f"Login did not return a token or cookie session. HTTP {resp.status_code}. Body≈ {snippet}")
187
+
188
+ def sc_request(method: str, path: str, email: str, password: str, *,
189
+ params: dict = None, json_body: dict = None, timeout: int = 30):
190
+ """Authenticated request with 401 auto-refresh (once)."""
191
+ if not path.startswith("/"):
192
+ path = "/" + path
193
+ url = f"{SC_BASE_URL}{path}"
194
+
195
+ def _do(s: requests.Session):
196
+ return s.request(method.upper(), url, params=params, json=json_body, timeout=timeout)
197
+
198
+ # get or create session
199
+ entry = SCAuth._cache.get(email)
200
+ if not entry:
201
+ entry = SCAuth.login(email, password)
202
+ s = entry["session"]
203
+
204
+ resp = _do(s)
205
+ if resp.status_code == 401:
206
+ # refresh & retry once
207
+ SCAuth.invalidate(email)
208
+ entry = SCAuth.login(email, password)
209
+ s = entry["session"]
210
+ resp = _do(s)
211
+
212
+ # Raise for other errors
213
  try:
 
 
 
 
 
 
214
  resp.raise_for_status()
 
215
  except Exception as e:
216
+ # include small snippet to aid debugging
217
+ snippet = (getattr(resp, "text", "") or "")[:800]
218
+ raise RuntimeError(f"SC request error {method.upper()} {path}: HTTP {resp.status_code} – {snippet}") from e
219
 
220
+ # Return JSON or text as appropriate
221
+ try:
222
+ return resp.json()
223
+ except Exception:
224
+ return resp.text
225
  # -----------------------------------------------------------------------------
226
  # Temporal helpers
227
  # -----------------------------------------------------------------------------