# ultima_seo/server/listing_checks.py
# (GitHub page residue removed: repo user "rsm-roguchi", branch "updater", commit d4463cf)
import os
import json
from shiny import render, reactive, ui
from google.oauth2 import service_account
from googleapiclient.discovery import build
import requests
import base64
import xml.etree.ElementTree as ET
from dotenv import load_dotenv
import time
import pandas as pd
import uuid
from typing import Optional
load_dotenv()
WALMART_CLIENT_ID = os.getenv('WALMART_CLIENT_ID')
WALMART_CLIENT_SECRET = os.getenv('WALMART_CLIENT_SECRET')
SERVICE_ACCOUNT_JSON = json.loads(os.getenv("SERVICE_ACCOUNT_JSON"))
GOOGLE_FOLDER_ID = os.getenv("GOOGLE_DRIVE_FOLDER_ID")
SCOPES = [
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/spreadsheets.readonly"
]
credentials = service_account.Credentials.from_service_account_info(
SERVICE_ACCOUNT_JSON, scopes=SCOPES
)
drive_service = build("drive", "v3", credentials=credentials)
sheets_service = build("sheets", "v4", credentials=credentials)
# ---------------- Walmart Auth ----------------
def get_walmart_token(client_id: str, client_secret: str) -> str:
auth_str = f"{client_id}:{client_secret}"
auth_b64 = base64.b64encode(auth_str.encode()).decode()
headers = {
"Authorization": f"Basic {auth_b64}",
"Content-Type": "application/x-www-form-urlencoded",
"WM_SVC.NAME": "Walmart Marketplace"
}
payload = {"grant_type": "client_credentials"}
url = "https://marketplace.walmartapis.com/v3/token"
response = requests.post(url, headers=headers, data=payload)
if response.status_code != 200:
raise Exception(f"Token request failed: {response.status_code}\n{response.text}")
# XML response
root = ET.fromstring(response.text)
token_elem = root.find("accessToken")
if token_elem is None:
raise Exception("accessToken not found in response.")
return token_elem.text
# ---------------- New ATS workflow ----------------
def fetch_all_ats(access_token: str, limit: int = 50) -> pd.DataFrame:
"""
Pull all SKUs and total available-to-sell (ATS) across nodes using /v3/inventories (nextCursor).
Returns: DataFrame with ['sku','ats']
"""
base_url = "https://marketplace.walmartapis.com/v3/inventories"
headers = {
"Authorization": f"Bearer {access_token}",
"WM_SEC.ACCESS_TOKEN": access_token,
"WM_SVC.NAME": "Walmart Marketplace",
"WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
"Accept": "application/json",
}
rows = []
cursor: Optional[str] = None
while True:
params = {"limit": limit}
if cursor:
params["nextCursor"] = cursor
r = requests.get(base_url, headers=headers, params=params)
if r.status_code != 200:
raise RuntimeError(f"❌ /inventories {r.status_code}: {r.text}")
payload = r.json()
inventories = (payload.get("elements") or {}).get("inventories", []) or []
cursor = (payload.get("meta") or {}).get("nextCursor")
for inv in inventories:
sku = inv.get("sku")
nodes = inv.get("nodes") or []
ats = sum((n.get("availToSellQty", {}) or {}).get("amount", 0) or 0 for n in nodes)
rows.append({"sku": sku, "ats": ats})
if not cursor:
break
return pd.DataFrame(rows, columns=["sku", "ats"])
def _extract_gtin_like(item: dict) -> Optional[str]:
"""
Prefer 'gtin', fallback to 'upc', then scan common identifier shapes.
"""
gtin = item.get("gtin")
if gtin:
return gtin
upc = item.get("upc")
if upc:
return upc
candidates = []
for key in ("productIdentifiers", "identifiers", "additionalProductAttributes", "attributes"):
obj = item.get(key)
if isinstance(obj, list):
for e in obj:
if not isinstance(e, dict):
continue
t = (e.get("productIdType") or e.get("type") or "").upper()
v = e.get("productId") or e.get("id") or e.get("value")
if v and t in {"GTIN", "UPC"}:
candidates.append((t, v))
elif isinstance(obj, dict):
for t, v in obj.items():
if isinstance(v, str) and t.upper() in {"GTIN", "UPC"}:
candidates.append((t.upper(), v))
for t, v in candidates:
if t == "GTIN":
return v
for t, v in candidates:
if t == "UPC":
return v
return None
def _get_total_items(access_token: str) -> int:
"""
Probe /v3/items to read total count from meta (meta.totalCount or totalItems).
"""
url = "https://marketplace.walmartapis.com/v3/items"
headers = {
"Authorization": f"Bearer {access_token}",
"WM_SEC.ACCESS_TOKEN": access_token,
"WM_SVC.NAME": "Walmart Marketplace",
"WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
"Accept": "application/json",
"WM_GLOBAL_VERSION": "3.1",
"WM_MARKET": "us",
}
params = {"limit": 1}
r = requests.get(url, headers=headers, params=params)
if r.status_code != 200:
raise RuntimeError(f"❌ /items (probe) {r.status_code}: {r.text}")
data = r.json()
meta = data.get("meta") or {}
total = meta.get("totalCount")
if total is None:
total = data.get("totalItems")
if total is None:
total = len(data.get("ItemResponse", []) or [])
return int(total)
def fetch_all_items_with_gtin_cursor(
access_token: str,
limit: Optional[int] = None
) -> pd.DataFrame:
"""
Pull items via /v3/items with nextCursor.
If limit is None, auto-sets to total items reported by meta.
Returns: ['sku','gtin','productName']
"""
url = "https://marketplace.walmartapis.com/v3/items"
headers = {
"Authorization": f"Bearer {access_token}",
"WM_SEC.ACCESS_TOKEN": access_token,
"WM_SVC.NAME": "Walmart Marketplace",
"WM_QOS.CORRELATION_ID": str(uuid.uuid4()),
"Accept": "application/json",
"WM_GLOBAL_VERSION": "3.1",
"WM_MARKET": "us",
}
if limit is None:
try:
limit = _get_total_items(access_token)
except Exception as e:
print(f"⚠️ Could not detect total items automatically: {e}. Falling back to 200.")
limit = 200
base_params = {"limit": limit}
recs = []
cursor: Optional[str] = None
while True:
q = dict(base_params)
if cursor:
q["nextCursor"] = cursor
resp = requests.get(url, headers=headers, params=q)
if resp.status_code != 200:
# If large limit is rejected, back off and paginate.
if q.get("limit", 0) > 500:
print(f"ℹ️ Large limit={q['limit']} not accepted. Backing off to 200 with pagination.")
base_params["limit"] = 200
continue
raise RuntimeError(f"❌ /items {resp.status_code}: {resp.text}")
data = resp.json()
items = data.get("ItemResponse", []) or []
cursor = (data.get("meta") or {}).get("nextCursor")
for it in items:
recs.append({
"sku": it.get("sku"),
"gtin": _extract_gtin_like(it),
"productName": it.get("productName"),
})
if not cursor:
break
return pd.DataFrame(recs, columns=["sku", "gtin", "productName"])
def fetch_inventory_with_gtin_cursor(access_token: str) -> pd.DataFrame:
"""
Join ATS (from /inventories) with GTIN/productName (from /items) on SKU.
Returns: ['sku','gtin','productName','ats']
"""
ats_df = fetch_all_ats(access_token)
items_df = fetch_all_items_with_gtin_cursor(access_token, limit=None)
merged = ats_df.merge(items_df, on="sku", how="left")
return merged[["sku", "gtin", "productName", "ats"]]
# ---------------- Google Drive helpers ----------------
def list_sheets_in_folder(folder_id):
query = (
f"'{folder_id}' in parents and "
"mimeType = 'application/vnd.google-apps.spreadsheet' and trashed = false"
)
results = drive_service.files().list(
q=query, fields="files(id, name)"
).execute()
return results.get("files", [])
def load_sheet_as_dataframe(sheet_id, range_name="Sheet1"):
result = sheets_service.spreadsheets().values().get(
spreadsheetId=sheet_id,
range=range_name
).execute()
values = result.get("values", [])
if not values:
return pd.DataFrame()
return pd.DataFrame(values[1:], columns=values[0])
# --- reactive state ---
sheet_index = reactive.Value({}) # Shiny tracks changes
walmart_df = reactive.Value(pd.DataFrame()) # <- Store ATS join here
def server(input, output, session):
# Populate dropdown once (or whenever your Drive listing changes)
@reactive.Effect
def _init_dropdown_from_folder():
try:
sheets = list_sheets_in_folder(GOOGLE_FOLDER_ID)
idx = {s['name']: s['id'] for s in sheets}
sheet_index.set(idx)
ui.update_select("sheet_dropdown_check", choices=list(idx.keys()))
except Exception as e:
print(f"[ERROR] Failed to list folder contents: {e}")
# Button press -> fetch token -> fetch ATS+items -> set reactive value
@reactive.Effect
@reactive.event(input.load_walmart_data)
def _load_walmart_data():
try:
print("[DEBUG] Getting Walmart token...")
token = get_walmart_token(WALMART_CLIENT_ID, WALMART_CLIENT_SECRET)
print("[DEBUG] Fetching Walmart ATS + GTIN ...")
df = fetch_inventory_with_gtin_cursor(token) # <- your new ATS workflow
walmart_df.set(df) # <- invalidate dependents
print(f"[DEBUG] Loaded {len(df)} Walmart items (ATS)")
except Exception as e:
print(f"[ERROR] Failed to load Walmart data: {e}")
walmart_df.set(pd.DataFrame())
@output
@render.text
def walmart_status():
df = walmart_df() # establish dependency
if df.empty:
return "Click 'Load Walmart Data' to fetch inventory data"
return f"Walmart data loaded: {len(df)} items (ATS)"
@output
@render.table
def results_check():
# Re-run when either the dropdown changes or the walmart_df changes
_ = walmart_df() # establish dependency on data
selected = input.sheet_dropdown_check()
idx = sheet_index()
if not selected or selected not in idx:
return pd.DataFrame({"status": ["Select a sheet to compare"]})
if walmart_df().empty:
return pd.DataFrame({"status": ["Click 'Load Walmart Data' first"]})
try:
sheet_id = idx[selected]
google_df = load_sheet_as_dataframe(sheet_id)
if google_df.empty:
return pd.DataFrame({"error": ["Google sheet is empty"]})
wdf = walmart_df()[["sku", "gtin", "productName", "ats"]]
if "walmart_gtin" in google_df.columns:
merged = google_df.merge(
wdf[["gtin", "productName", "ats"]],
left_on="walmart_gtin",
right_on="gtin",
how="inner",
)
compare_col = (
"walmart_ats"
if "walmart_ats" in merged.columns
else ("walmart_quantity" if "walmart_quantity" in merged.columns else None)
)
if compare_col:
lhs = pd.to_numeric(merged[compare_col], errors="coerce").fillna(0).astype(int)
rhs = pd.to_numeric(merged["ats"], errors="coerce").fillna(0).astype(int)
discrepancies = merged.loc[lhs.ne(rhs), ["productName", compare_col, "ats"]]
if discrepancies.empty:
return pd.DataFrame({"status": ["No ATS discrepancies found"]})
return discrepancies
else:
return merged[["productName", "gtin", "ats"]]
else:
# No GTIN in sheet: just show ATS snapshot
return wdf
except Exception as e:
return pd.DataFrame({"error": [f"Error: {e}"]})