import os
import json
from shiny import render, reactive, ui
from google.oauth2 import service_account
from googleapiclient.discovery import build
import requests
import base64
import xml.etree.ElementTree as ET
from dotenv import load_dotenv
import time
import pandas as pd
import uuid
from typing import Optional

load_dotenv()

WALMART_CLIENT_ID = os.getenv('WALMART_CLIENT_ID')
WALMART_CLIENT_SECRET = os.getenv('WALMART_CLIENT_SECRET')
# NOTE(review): json.loads(None) raises TypeError if SERVICE_ACCOUNT_JSON is unset —
# presumably intentional fail-fast at import time; confirm.
SERVICE_ACCOUNT_JSON = json.loads(os.getenv("SERVICE_ACCOUNT_JSON"))
GOOGLE_FOLDER_ID = os.getenv("GOOGLE_DRIVE_FOLDER_ID")

# Read-only access is sufficient: we only list files and read sheet values.
SCOPES = [
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/spreadsheets.readonly"
]
credentials = service_account.Credentials.from_service_account_info(
    SERVICE_ACCOUNT_JSON, scopes=SCOPES
)
drive_service = build("drive", "v3", credentials=credentials)
sheets_service = build("sheets", "v4", credentials=credentials)


# ---------------- Walmart Auth ----------------
def get_walmart_token(client_id: str, client_secret: str) -> str:
    """Obtain a Walmart Marketplace OAuth token via client-credentials grant.

    Args:
        client_id: Walmart API client id.
        client_secret: Walmart API client secret.

    Returns:
        The access token string parsed from the XML response.

    Raises:
        Exception: If the HTTP request fails or the token is missing.
    """
    auth_str = f"{client_id}:{client_secret}"
    auth_b64 = base64.b64encode(auth_str.encode()).decode()
    headers = {
        "Authorization": f"Basic {auth_b64}",
        "Content-Type": "application/x-www-form-urlencoded",
        "WM_SVC.NAME": "Walmart Marketplace"
    }
    payload = {"grant_type": "client_credentials"}
    url = "https://marketplace.walmartapis.com/v3/token"
    response = requests.post(url, headers=headers, data=payload)
    if response.status_code != 200:
        raise Exception(f"Token request failed: {response.status_code}\n{response.text}")
    # The token endpoint answers in XML, not JSON.
    root = ET.fromstring(response.text)
    token_elem = root.find("accessToken")
    if token_elem is None:
        raise Exception("accessToken not found in response.")
    return token_elem.text


# ---------------- New ATS workflow ----------------
def fetch_all_ats(access_token: str, limit: int = 50) -> pd.DataFrame:
    """Pull all SKUs and total available-to-sell (ATS) across nodes
    using /v3/inventories (nextCursor).

    Returns:
        DataFrame with ['sku','ats']
    """
    base_url = "https://marketplace.walmartapis.com/v3/inventories"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "WM_SEC.ACCESS_TOKEN": access_token,
        "WM_SVC.NAME": "Walmart Marketplace",
        "WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
        "Accept": "application/json",
    }
    rows = []
    cursor: Optional[str] = None
    while True:
        params = {"limit": limit}
        if cursor:
            params["nextCursor"] = cursor
        r = requests.get(base_url, headers=headers, params=params)
        if r.status_code != 200:
            raise RuntimeError(f"❌ /inventories {r.status_code}: {r.text}")
        payload = r.json()
        # Defensive: any of these containers may be null/absent in the payload.
        inventories = (payload.get("elements") or {}).get("inventories", []) or []
        cursor = (payload.get("meta") or {}).get("nextCursor")
        for inv in inventories:
            sku = inv.get("sku")
            nodes = inv.get("nodes") or []
            # Sum amounts across fulfillment nodes; missing amounts count as 0.
            ats = sum((n.get("availToSellQty", {}) or {}).get("amount", 0) or 0 for n in nodes)
            rows.append({"sku": sku, "ats": ats})
        if not cursor:
            break
    return pd.DataFrame(rows, columns=["sku", "ats"])


def _extract_gtin_like(item: dict) -> Optional[str]:
    """
    Prefer 'gtin', fallback to 'upc', then scan common identifier shapes.
    """
    gtin = item.get("gtin")
    if gtin:
        return gtin
    upc = item.get("upc")
    if upc:
        return upc
    # Item payloads vary; probe the identifier containers we have observed.
    candidates = []
    for key in ("productIdentifiers", "identifiers", "additionalProductAttributes", "attributes"):
        obj = item.get(key)
        if isinstance(obj, list):
            for e in obj:
                if not isinstance(e, dict):
                    continue
                t = (e.get("productIdType") or e.get("type") or "").upper()
                v = e.get("productId") or e.get("id") or e.get("value")
                if v and t in {"GTIN", "UPC"}:
                    candidates.append((t, v))
        elif isinstance(obj, dict):
            for t, v in obj.items():
                if isinstance(v, str) and t.upper() in {"GTIN", "UPC"}:
                    candidates.append((t.upper(), v))
    # GTIN wins over UPC when both are present.
    for t, v in candidates:
        if t == "GTIN":
            return v
    for t, v in candidates:
        if t == "UPC":
            return v
    return None


def _get_total_items(access_token: str) -> int:
    """
    Probe /v3/items to read total count from meta (meta.totalCount or totalItems).
    """
    url = "https://marketplace.walmartapis.com/v3/items"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "WM_SEC.ACCESS_TOKEN": access_token,
        "WM_SVC.NAME": "Walmart Marketplace",
        "WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
        "Accept": "application/json",
        "WM_GLOBAL_VERSION": "3.1",
        "WM_MARKET": "us",
    }
    # limit=1 keeps the probe cheap; we only want the meta counters.
    params = {"limit": 1}
    r = requests.get(url, headers=headers, params=params)
    if r.status_code != 200:
        raise RuntimeError(f"❌ /items (probe) {r.status_code}: {r.text}")
    data = r.json()
    meta = data.get("meta") or {}
    total = meta.get("totalCount")
    if total is None:
        total = data.get("totalItems")
    if total is None:
        # Last resort: count whatever the probe page returned.
        total = len(data.get("ItemResponse", []) or [])
    return int(total)


def fetch_all_items_with_gtin_cursor(
    access_token: str,
    limit: Optional[int] = None
) -> pd.DataFrame:
    """
    Pull items via /v3/items with nextCursor.
    If limit is None, auto-sets to total items reported by meta.
    Returns: ['sku','gtin','productName']
    """
    url = "https://marketplace.walmartapis.com/v3/items"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "WM_SEC.ACCESS_TOKEN": access_token,
        "WM_SVC.NAME": "Walmart Marketplace",
        "WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
        "Accept": "application/json",
        "WM_GLOBAL_VERSION": "3.1",
        "WM_MARKET": "us",
    }
    if limit is None:
        try:
            limit = _get_total_items(access_token)
        except Exception as e:
            print(f"⚠️ Could not detect total items automatically: {e}. Falling back to 200.")
            limit = 200
    base_params = {"limit": limit}
    recs = []
    cursor: Optional[str] = None
    while True:
        q = dict(base_params)
        if cursor:
            q["nextCursor"] = cursor
        resp = requests.get(url, headers=headers, params=q)
        if resp.status_code != 200:
            # If large limit is rejected, back off and paginate.
            if q.get("limit", 0) > 500:
                print(f"ℹ️ Large limit={q['limit']} not accepted. Backing off to 200 with pagination.")
                base_params["limit"] = 200
                continue
            raise RuntimeError(f"❌ /items {resp.status_code}: {resp.text}")
        data = resp.json()
        items = data.get("ItemResponse", []) or []
        cursor = (data.get("meta") or {}).get("nextCursor")
        for it in items:
            recs.append({
                "sku": it.get("sku"),
                "gtin": _extract_gtin_like(it),
                "productName": it.get("productName"),
            })
        if not cursor:
            break
    return pd.DataFrame(recs, columns=["sku", "gtin", "productName"])


def fetch_inventory_with_gtin_cursor(access_token: str) -> pd.DataFrame:
    """
    Join ATS (from /inventories) with GTIN/productName (from /items) on SKU.
    Returns: ['sku','gtin','productName','ats']
    """
    ats_df = fetch_all_ats(access_token)
    items_df = fetch_all_items_with_gtin_cursor(access_token, limit=None)
    # Left join keeps every SKU that has inventory, even without item metadata.
    merged = ats_df.merge(items_df, on="sku", how="left")
    return merged[["sku", "gtin", "productName", "ats"]]


# ---------------- Google Drive helpers ----------------
def list_sheets_in_folder(folder_id):
    """Return [{'id', 'name'}, ...] for non-trashed Google Sheets in a Drive folder."""
    query = (
        f"'{folder_id}' in parents and "
        "mimeType = 'application/vnd.google-apps.spreadsheet' and trashed = false"
    )
    results = drive_service.files().list(
        q=query,
        fields="files(id, name)"
    ).execute()
    return results.get("files", [])


def load_sheet_as_dataframe(sheet_id, range_name="Sheet1"):
    """Read a sheet range; first row becomes the DataFrame header.

    Returns an empty DataFrame when the range has no values.
    """
    result = sheets_service.spreadsheets().values().get(
        spreadsheetId=sheet_id,
        range=range_name
    ).execute()
    values = result.get("values", [])
    if not values:
        return pd.DataFrame()
    return pd.DataFrame(values[1:], columns=values[0])


# --- reactive state ---
sheet_index = reactive.Value({})            # Shiny tracks changes
walmart_df = reactive.Value(pd.DataFrame()) # <- Store ATS join here


def server(input, output, session):
    """Shiny server: wires Drive sheet listing, Walmart data load, and comparison table."""

    # Populate dropdown once (or whenever your Drive listing changes)
    @reactive.Effect
    def _init_dropdown_from_folder():
        try:
            sheets = list_sheets_in_folder(GOOGLE_FOLDER_ID)
            idx = {s['name']: s['id'] for s in sheets}
            sheet_index.set(idx)
            ui.update_select("sheet_dropdown_check", choices=list(idx.keys()))
        except Exception as e:
            print(f"[ERROR] Failed to list folder contents: {e}")

    # Button press -> fetch token -> fetch ATS+items -> set reactive value
    @reactive.Effect
    @reactive.event(input.load_walmart_data)
    def _load_walmart_data():
        try:
            print("[DEBUG] Getting Walmart token...")
            token = get_walmart_token(WALMART_CLIENT_ID, WALMART_CLIENT_SECRET)
            print("[DEBUG] Fetching Walmart ATS + GTIN ...")
            df = fetch_inventory_with_gtin_cursor(token)  # <- your new ATS workflow
            walmart_df.set(df)                            # <- invalidate dependents
            print(f"[DEBUG] Loaded {len(df)} Walmart items (ATS)")
        except Exception as e:
            print(f"[ERROR] Failed to load Walmart data: {e}")
            walmart_df.set(pd.DataFrame())

    @output
    @render.text
    def walmart_status():
        df = walmart_df()  # establish dependency
        if df.empty:
            return "Click 'Load Walmart Data' to fetch inventory data"
        return f"Walmart data loaded: {len(df)} items (ATS)"

    @output
    @render.table
    def results_check():
        # Re-run when either the dropdown changes or the walmart_df changes
        _ = walmart_df()  # establish dependency on data
        selected = input.sheet_dropdown_check()
        idx = sheet_index()
        if not selected or selected not in idx:
            return pd.DataFrame({"status": ["Select a sheet to compare"]})
        if walmart_df().empty:
            return pd.DataFrame({"status": ["Click 'Load Walmart Data' first"]})
        try:
            sheet_id = idx[selected]
            google_df = load_sheet_as_dataframe(sheet_id)
            if google_df.empty:
                return pd.DataFrame({"error": ["Google sheet is empty"]})
            wdf = walmart_df()[["sku", "gtin", "productName", "ats"]]
            if "walmart_gtin" in google_df.columns:
                # Inner join: only compare rows present in both sheet and Walmart.
                merged = google_df.merge(
                    wdf[["gtin", "productName", "ats"]],
                    left_on="walmart_gtin",
                    right_on="gtin",
                    how="inner",
                )
                compare_col = (
                    "walmart_ats"
                    if "walmart_ats" in merged.columns
                    else ("walmart_quantity" if "walmart_quantity" in merged.columns else None)
                )
                if compare_col:
                    # Sheet values arrive as strings; coerce both sides to int for comparison.
                    lhs = pd.to_numeric(merged[compare_col], errors="coerce").fillna(0).astype(int)
                    rhs = pd.to_numeric(merged["ats"], errors="coerce").fillna(0).astype(int)
                    discrepancies = merged.loc[lhs.ne(rhs), ["productName", compare_col, "ats"]]
                    if discrepancies.empty:
                        return pd.DataFrame({"status": ["No ATS discrepancies found"]})
                    return discrepancies
                else:
                    return merged[["productName", "gtin", "ats"]]
            else:
                # No GTIN in sheet: just show ATS snapshot
                return wdf
        except Exception as e:
            return pd.DataFrame({"error": [f"Error: {e}"]})