Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from shiny import render, reactive, ui | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| import requests | |
| import base64 | |
| import xml.etree.ElementTree as ET | |
| from dotenv import load_dotenv | |
| import time | |
| import pandas as pd | |
| import uuid | |
| from typing import Optional | |
load_dotenv()

# Walmart Marketplace API credentials (client-credentials flow).
WALMART_CLIENT_ID = os.getenv('WALMART_CLIENT_ID')
WALMART_CLIENT_SECRET = os.getenv('WALMART_CLIENT_SECRET')
# The full service-account JSON document is stored in the env var itself
# (not a file path), so it is parsed here at import time.
# NOTE(review): json.loads(None) raises TypeError if the var is unset — the
# app fails fast at startup; confirm that is the intended behavior.
SERVICE_ACCOUNT_JSON = json.loads(os.getenv("SERVICE_ACCOUNT_JSON"))
GOOGLE_FOLDER_ID = os.getenv("GOOGLE_DRIVE_FOLDER_ID")

# Read-only scopes: this app only lists Drive files and reads sheet values.
SCOPES = [
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/spreadsheets.readonly"
]
credentials = service_account.Credentials.from_service_account_info(
    SERVICE_ACCOUNT_JSON, scopes=SCOPES
)
# Module-level Google API clients, shared by the helpers below.
drive_service = build("drive", "v3", credentials=credentials)
sheets_service = build("sheets", "v4", credentials=credentials)
| # ---------------- Walmart Auth ---------------- | |
def get_walmart_token(client_id: str, client_secret: str) -> str:
    """Obtain a Walmart Marketplace OAuth token (client-credentials flow).

    Args:
        client_id: Walmart API client id.
        client_secret: Walmart API client secret.

    Returns:
        The access-token string parsed from the XML response body.

    Raises:
        RuntimeError: if the HTTP call fails or the token element is missing.
            (Subclass of Exception, so existing broad handlers still work.)
    """
    auth_str = f"{client_id}:{client_secret}"
    auth_b64 = base64.b64encode(auth_str.encode()).decode()
    headers = {
        "Authorization": f"Basic {auth_b64}",
        "Content-Type": "application/x-www-form-urlencoded",
        "WM_SVC.NAME": "Walmart Marketplace"
    }
    payload = {"grant_type": "client_credentials"}
    url = "https://marketplace.walmartapis.com/v3/token"
    # Explicit timeout: requests has no default timeout and would otherwise
    # hang the Shiny session indefinitely on a stalled connection.
    response = requests.post(url, headers=headers, data=payload, timeout=30)
    if response.status_code != 200:
        raise RuntimeError(f"Token request failed: {response.status_code}\n{response.text}")
    # XML response. The <accessToken> element may be namespace-qualified, in
    # which case a plain find() misses it — fall back to a wildcard-namespace
    # search ("{*}" is supported since Python 3.8).
    root = ET.fromstring(response.text)
    token_elem = root.find("accessToken")
    if token_elem is None:
        token_elem = root.find("{*}accessToken")
    if token_elem is None:
        raise RuntimeError("accessToken not found in response.")
    return token_elem.text
| # ---------------- New ATS workflow ---------------- | |
def fetch_all_ats(access_token: str, limit: int = 50) -> pd.DataFrame:
    """
    Page through /v3/inventories via nextCursor and sum the available-to-sell
    (ATS) amount across all fulfillment nodes for every SKU.

    Returns: DataFrame with columns ['sku', 'ats'].
    """
    endpoint = "https://marketplace.walmartapis.com/v3/inventories"
    req_headers = {
        "Authorization": f"Bearer {access_token}",
        "WM_SEC.ACCESS_TOKEN": access_token,
        "WM_SVC.NAME": "Walmart Marketplace",
        "WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
        "Accept": "application/json",
    }
    records = []
    next_cursor: Optional[str] = None
    while True:
        query = {"limit": limit}
        if next_cursor:
            query["nextCursor"] = next_cursor
        resp = requests.get(endpoint, headers=req_headers, params=query)
        if resp.status_code != 200:
            raise RuntimeError(f"❌ /inventories {resp.status_code}: {resp.text}")
        body = resp.json()
        page_items = (body.get("elements") or {}).get("inventories", []) or []
        next_cursor = (body.get("meta") or {}).get("nextCursor")
        for entry in page_items:
            # Total ATS over every node; tolerate missing/None quantities.
            total_ats = 0
            for node in (entry.get("nodes") or []):
                total_ats += (node.get("availToSellQty", {}) or {}).get("amount", 0) or 0
            records.append({"sku": entry.get("sku"), "ats": total_ats})
        if not next_cursor:
            break
    return pd.DataFrame(records, columns=["sku", "ats"])
| def _extract_gtin_like(item: dict) -> Optional[str]: | |
| """ | |
| Prefer 'gtin', fallback to 'upc', then scan common identifier shapes. | |
| """ | |
| gtin = item.get("gtin") | |
| if gtin: | |
| return gtin | |
| upc = item.get("upc") | |
| if upc: | |
| return upc | |
| candidates = [] | |
| for key in ("productIdentifiers", "identifiers", "additionalProductAttributes", "attributes"): | |
| obj = item.get(key) | |
| if isinstance(obj, list): | |
| for e in obj: | |
| if not isinstance(e, dict): | |
| continue | |
| t = (e.get("productIdType") or e.get("type") or "").upper() | |
| v = e.get("productId") or e.get("id") or e.get("value") | |
| if v and t in {"GTIN", "UPC"}: | |
| candidates.append((t, v)) | |
| elif isinstance(obj, dict): | |
| for t, v in obj.items(): | |
| if isinstance(v, str) and t.upper() in {"GTIN", "UPC"}: | |
| candidates.append((t.upper(), v)) | |
| for t, v in candidates: | |
| if t == "GTIN": | |
| return v | |
| for t, v in candidates: | |
| if t == "UPC": | |
| return v | |
| return None | |
def _get_total_items(access_token: str) -> int:
    """
    Issue a minimal (limit=1) probe to /v3/items and read the total item
    count from the response: meta.totalCount first, then top-level
    totalItems, finally falling back to the length of the returned page.
    """
    endpoint = "https://marketplace.walmartapis.com/v3/items"
    req_headers = {
        "Authorization": f"Bearer {access_token}",
        "WM_SEC.ACCESS_TOKEN": access_token,
        "WM_SVC.NAME": "Walmart Marketplace",
        "WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
        "Accept": "application/json",
        "WM_GLOBAL_VERSION": "3.1",
        "WM_MARKET": "us",
    }
    resp = requests.get(endpoint, headers=req_headers, params={"limit": 1})
    if resp.status_code != 200:
        raise RuntimeError(f"❌ /items (probe) {resp.status_code}: {resp.text}")
    body = resp.json()
    total = (body.get("meta") or {}).get("totalCount")
    if total is None:
        total = body.get("totalItems")
    if total is None:
        total = len(body.get("ItemResponse", []) or [])
    return int(total)
def fetch_all_items_with_gtin_cursor(
    access_token: str,
    limit: Optional[int] = None
) -> pd.DataFrame:
    """
    Page through /v3/items with nextCursor collecting SKU, GTIN-like
    identifier and product name for every item.

    When limit is None the page size is taken from the total item count
    reported by the API metadata (falling back to 200 if the probe fails).
    If a large page size (>500) is rejected by the API, the request is
    retried with pagination at limit=200.

    Returns: DataFrame with columns ['sku', 'gtin', 'productName'].
    """
    endpoint = "https://marketplace.walmartapis.com/v3/items"
    req_headers = {
        "Authorization": f"Bearer {access_token}",
        "WM_SEC.ACCESS_TOKEN": access_token,
        "WM_SVC.NAME": "Walmart Marketplace",
        "WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
        "Accept": "application/json",
        "WM_GLOBAL_VERSION": "3.1",
        "WM_MARKET": "us",
    }
    if limit is None:
        try:
            limit = _get_total_items(access_token)
        except Exception as e:
            print(f"⚠️ Could not detect total items automatically: {e}. Falling back to 200.")
            limit = 200

    shared_params = {"limit": limit}
    collected = []
    next_cursor: Optional[str] = None
    while True:
        query = dict(shared_params)
        if next_cursor:
            query["nextCursor"] = next_cursor
        resp = requests.get(endpoint, headers=req_headers, params=query)
        if resp.status_code != 200:
            # A rejected oversized page is recoverable: shrink and paginate.
            if query.get("limit", 0) > 500:
                print(f"ℹ️ Large limit={query['limit']} not accepted. Backing off to 200 with pagination.")
                shared_params["limit"] = 200
                continue
            raise RuntimeError(f"❌ /items {resp.status_code}: {resp.text}")
        body = resp.json()
        next_cursor = (body.get("meta") or {}).get("nextCursor")
        for product in (body.get("ItemResponse", []) or []):
            collected.append({
                "sku": product.get("sku"),
                "gtin": _extract_gtin_like(product),
                "productName": product.get("productName"),
            })
        if not next_cursor:
            break
    return pd.DataFrame(collected, columns=["sku", "gtin", "productName"])
def fetch_inventory_with_gtin_cursor(access_token: str) -> pd.DataFrame:
    """
    Left-join per-SKU ATS totals (from /inventories) with the GTIN and
    product name pulled from /items, keyed on SKU.

    Returns: DataFrame with columns ['sku', 'gtin', 'productName', 'ats'].
    """
    inventory = fetch_all_ats(access_token)
    catalog = fetch_all_items_with_gtin_cursor(access_token, limit=None)
    joined = inventory.merge(catalog, on="sku", how="left")
    return joined[["sku", "gtin", "productName", "ats"]]
| # ---------------- Google Drive helpers ---------------- | |
def list_sheets_in_folder(folder_id):
    """Return [{'id', 'name'}] for every Google Sheet directly in a folder.

    Args:
        folder_id: Drive folder id to search (non-recursive, excludes trash).

    Returns:
        List of dicts with 'id' and 'name' keys, one per spreadsheet.

    The Drive files().list endpoint is paginated; the original single call
    silently truncated folders with more files than one page, so this
    follows nextPageToken until the listing is exhausted.
    """
    query = (
        f"'{folder_id}' in parents and "
        "mimeType = 'application/vnd.google-apps.spreadsheet' and trashed = false"
    )
    files = []
    page_token = None
    while True:
        results = drive_service.files().list(
            q=query,
            fields="nextPageToken, files(id, name)",
            pageToken=page_token,
        ).execute()
        files.extend(results.get("files", []))
        page_token = results.get("nextPageToken")
        if not page_token:
            break
    return files
def load_sheet_as_dataframe(sheet_id, range_name="Sheet1"):
    """Load a Google Sheet range into a DataFrame (first row = header).

    Args:
        sheet_id: spreadsheet id.
        range_name: A1-notation range or sheet name (default "Sheet1").

    Returns:
        DataFrame of the data rows, or an empty DataFrame if the range
        contains no values.

    The Sheets values API omits trailing empty cells, producing ragged rows
    that would make the DataFrame constructor raise on a column-count
    mismatch — each row is padded (or trimmed) to the header width.
    """
    result = sheets_service.spreadsheets().values().get(
        spreadsheetId=sheet_id,
        range=range_name
    ).execute()
    values = result.get("values", [])
    if not values:
        return pd.DataFrame()
    header = values[0]
    width = len(header)
    rows = [(row + [""] * (width - len(row)))[:width] for row in values[1:]]
    return pd.DataFrame(rows, columns=header)
# --- reactive state ---
# Shared across server sessions; setting either value invalidates dependents.
sheet_index = reactive.Value({})  # sheet name -> spreadsheet id; Shiny tracks changes
walmart_df = reactive.Value(pd.DataFrame())  # <- Store ATS join here
def server(input, output, session):
    """Shiny server: wires the Drive sheet dropdown and the Walmart ATS data.

    NOTE(review): no @render/@reactive decorators are visible on the inner
    functions in this view of the file, and the function may continue beyond
    it — confirm how these callbacks are registered with Shiny.
    """
    # Populate dropdown once (or whenever your Drive listing changes)
    def _init_dropdown_from_folder():
        try:
            sheets = list_sheets_in_folder(GOOGLE_FOLDER_ID)
            idx = {s['name']: s['id'] for s in sheets}
            sheet_index.set(idx)
            ui.update_select("sheet_dropdown_check", choices=list(idx.keys()))
        except Exception as e:
            # Best-effort: a Drive failure leaves the dropdown empty but
            # keeps the app alive.
            print(f"[ERROR] Failed to list folder contents: {e}")

    # Button press -> fetch token -> fetch ATS+items -> set reactive value
    def _load_walmart_data():
        try:
            print("[DEBUG] Getting Walmart token...")
            token = get_walmart_token(WALMART_CLIENT_ID, WALMART_CLIENT_SECRET)
            print("[DEBUG] Fetching Walmart ATS + GTIN ...")
            df = fetch_inventory_with_gtin_cursor(token)  # <- your new ATS workflow
            walmart_df.set(df)  # <- invalidate dependents
            print(f"[DEBUG] Loaded {len(df)} Walmart items (ATS)")
        except Exception as e:
            # On failure, reset to empty so dependents show the unloaded state.
            print(f"[ERROR] Failed to load Walmart data: {e}")
            walmart_df.set(pd.DataFrame())

    def walmart_status():
        # One-line status string for the UI, reflecting current load state.
        df = walmart_df()  # establish dependency
        if df.empty:
            return "Click 'Load Walmart Data' to fetch inventory data"
        return f"Walmart data loaded: {len(df)} items (ATS)"

    def results_check():
        # Compare the selected Google sheet against Walmart ATS; always
        # returns a DataFrame (status/error frames double as UI messages).
        # Re-run when either the dropdown changes or the walmart_df changes
        _ = walmart_df()  # establish dependency on data
        selected = input.sheet_dropdown_check()
        idx = sheet_index()
        if not selected or selected not in idx:
            return pd.DataFrame({"status": ["Select a sheet to compare"]})
        if walmart_df().empty:
            return pd.DataFrame({"status": ["Click 'Load Walmart Data' first"]})
        try:
            sheet_id = idx[selected]
            google_df = load_sheet_as_dataframe(sheet_id)
            if google_df.empty:
                return pd.DataFrame({"error": ["Google sheet is empty"]})
            wdf = walmart_df()[["sku", "gtin", "productName", "ats"]]
            if "walmart_gtin" in google_df.columns:
                # Inner join: only rows whose GTIN appears in both sources.
                merged = google_df.merge(
                    wdf[["gtin", "productName", "ats"]],
                    left_on="walmart_gtin",
                    right_on="gtin",
                    how="inner",
                )
                # Sheet quantity column: prefer 'walmart_ats', fall back to
                # 'walmart_quantity'; None means the sheet has no quantity.
                compare_col = (
                    "walmart_ats"
                    if "walmart_ats" in merged.columns
                    else ("walmart_quantity" if "walmart_quantity" in merged.columns else None)
                )
                if compare_col:
                    # Coerce both sides to int (non-numeric -> 0) before comparing.
                    lhs = pd.to_numeric(merged[compare_col], errors="coerce").fillna(0).astype(int)
                    rhs = pd.to_numeric(merged["ats"], errors="coerce").fillna(0).astype(int)
                    discrepancies = merged.loc[lhs.ne(rhs), ["productName", compare_col, "ats"]]
                    if discrepancies.empty:
                        return pd.DataFrame({"status": ["No ATS discrepancies found"]})
                    return discrepancies
                else:
                    return merged[["productName", "gtin", "ats"]]
            else:
                # No GTIN in sheet: just show ATS snapshot
                return wdf
        except Exception as e:
            return pd.DataFrame({"error": [f"Error: {e}"]})