# WB_Analyzer / wildberries_client.py
"""
Wildberries API Client with rate limiting and error handling
Optimized for Hugging Face Spaces deployment
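
Example (minimal sketch; the token value is a placeholder):
    client = WildberriesAPI(api_token="<WB-statistics-token>")
    sales_df = client.get_sales(date_from="2024-01-01")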
"""
import requests
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import logging
from config import get_config
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def validate_wb_data(weekly_data: pd.DataFrame, monthly_data: pd.DataFrame) -> Dict[str, Any]:
"""Validate Wildberries data consistency"""
validation_results = {
"status": "valid",
"warnings": [],
"errors": []
}
try:
# Check record count consistency
if len(weekly_data) > len(monthly_data):
validation_results["warnings"].append(
f"Data inconsistency: Weekly records ({len(weekly_data)}) exceed monthly records ({len(monthly_data)})"
)
# Check revenue consistency
if not weekly_data.empty and not monthly_data.empty:
weekly_revenue = weekly_data['total_price'].sum() if 'total_price' in weekly_data.columns else 0
monthly_revenue = monthly_data['total_price'].sum() if 'total_price' in monthly_data.columns else 0
if weekly_revenue > monthly_revenue:
validation_results["warnings"].append(
f"Data inconsistency: Weekly revenue (₽{weekly_revenue:,.0f}) exceeds monthly revenue (₽{monthly_revenue:,.0f})"
)
# Check for duplicate sales IDs
for df_name, df in [("weekly", weekly_data), ("monthly", monthly_data)]:
if not df.empty and 'srid' in df.columns:
duplicate_count = df.duplicated('srid').sum()
if duplicate_count > 0:
validation_results["warnings"].append(
f"{df_name.title()} data: {duplicate_count} duplicate sale IDs found"
)
except Exception as e:
validation_results["status"] = "error"
validation_results["errors"].append(f"Validation error: {str(e)}")
logger.error(f"Data validation error: {str(e)}")
return validation_results
def aggregate_wb_data(df: pd.DataFrame, period: str = 'daily') -> pd.DataFrame:
"""Aggregate Wildberries data by time period for better performance"""
if df.empty:
return df
try:
if 'sale_date' not in df.columns:
logger.warning("No sale_date column found for aggregation")
return df
        # Work on a copy so the caller's DataFrame is not mutated, and ensure sale_date is datetime
        df = df.copy()
        df['sale_date'] = pd.to_datetime(df['sale_date'], errors='coerce')
# Define aggregation functions
agg_functions = {
'total_price': 'sum',
'quantity': 'sum'
}
# Add optional columns if they exist
if 'amount_for_pay' in df.columns:
agg_functions['amount_for_pay'] = 'sum'
if 'payment_sale_amount' in df.columns:
agg_functions['payment_sale_amount'] = 'sum'
if period == 'daily':
grouped = df.groupby(df['sale_date'].dt.date)
elif period == 'weekly':
grouped = df.groupby(df['sale_date'].dt.to_period('W'))
elif period == 'monthly':
grouped = df.groupby(df['sale_date'].dt.to_period('M'))
else:
return df # Return original if period not recognized
aggregated = grouped.agg(agg_functions).reset_index()
# Convert period back to datetime if needed
if period in ['weekly', 'monthly']:
aggregated['sale_date'] = aggregated['sale_date'].dt.start_time
return aggregated
except Exception as e:
logger.error(f"Error aggregating data: {str(e)}")
return df
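
# Usage sketch for aggregate_wb_data (assumes 'sale_date', 'total_price' and
# 'quantity' columns, as produced by the processors below):
#   daily_df = aggregate_wb_data(sales_df, period='daily')
#   monthly_df = aggregate_wb_data(sales_df, period='monthly')  # 'sale_date' becomes the period start
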
@dataclass
class RateLimiter:
"""Simple rate limiter implementation"""
requests_per_minute: int = 300
window_seconds: int = 60
def __post_init__(self):
self.requests = []
def can_make_request(self) -> bool:
"""Check if we can make a request without hitting rate limits"""
now = time.time()
# Remove requests older than our window
self.requests = [req_time for req_time in self.requests if now - req_time < self.window_seconds]
return len(self.requests) < self.requests_per_minute
def record_request(self):
"""Record that we made a request"""
self.requests.append(time.time())
def wait_if_needed(self):
"""Wait if we're at the rate limit"""
if not self.can_make_request():
# Calculate how long to wait
oldest_request = min(self.requests) if self.requests else time.time()
wait_time = self.window_seconds - (time.time() - oldest_request) + 1
logger.info(f"Rate limit reached, waiting {wait_time:.1f} seconds")
time.sleep(wait_time)
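
# RateLimiter sketch (hypothetical values for illustration):
#   limiter = RateLimiter(requests_per_minute=2, window_seconds=60)
#   limiter.record_request(); limiter.record_request()
#   limiter.can_make_request()  # False until the oldest request ages out of the window
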
class WildberriesAPIError(Exception):
"""Custom exception for Wildberries API errors"""
pass
class WildberriesAPI:
"""Wildberries API client with rate limiting and retry logic"""
    def __init__(self, api_token: Optional[str] = None):
self.config = get_config()
# Only use the explicitly provided token, no fallback to config
self.api_token = api_token
self.session = requests.Session()
self.rate_limiter = RateLimiter(
requests_per_minute=self.config.rate_limit_requests,
window_seconds=self.config.rate_limit_window
)
# Don't raise exception - allow graceful fallback
self.is_configured = bool(self.api_token and self.api_token.strip())
# Set up session headers with the specific token
self.session.headers.update(self._get_headers())
def _get_headers(self) -> Dict[str, str]:
"""Get headers with the instance's API token"""
headers = {
"Content-Type": "application/json",
"User-Agent": "Wildberries-Analytics-Dashboard/1.0"
}
if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"
return headers
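
    # Retry policy for _make_request below: tenacity makes up to 3 attempts on
    # requests.RequestException or WildberriesAPIError, backing off exponentially
    # between 4 and 10 seconds; if all attempts fail, a RetryError wrapping the
    # last exception is raised.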
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((requests.RequestException, WildberriesAPIError))
)
    def _make_request(self, method: str, url: str, **kwargs) -> Any:
"""Make an HTTP request with rate limiting and retry logic"""
# Check if API is configured before making requests
if not self.is_configured:
logger.error("API token not configured - cannot make requests")
logger.debug(f"Token present: {bool(self.api_token)}")
raise WildberriesAPIError("API token not configured - cannot make requests")
# Enhanced logging for debugging
logger.info(f"Making {method} request to {url}")
logger.debug(f"Headers: {self._get_headers()}")
logger.debug(f"Params: {kwargs.get('params', {})}")
# Check rate limits
self.rate_limiter.wait_if_needed()
try:
# Record the request
self.rate_limiter.record_request()
# Make the request
response = self.session.request(
method=method,
url=url,
timeout=self.config.request_timeout,
**kwargs
)
# Handle rate limiting (429 status)
if response.status_code == 429:
retry_after = int(response.headers.get('X-Ratelimit-Retry', 60))
logger.warning(f"Rate limited by API, waiting {retry_after} seconds")
time.sleep(retry_after)
raise WildberriesAPIError(f"Rate limited: {response.status_code}")
# Handle other HTTP errors
if not response.ok:
error_msg = f"HTTP {response.status_code}: {response.text}"
logger.error(error_msg)
raise WildberriesAPIError(error_msg)
# Parse JSON response
try:
return response.json()
except json.JSONDecodeError:
return {"data": response.text}
except requests.RequestException as e:
logger.error(f"Request failed: {str(e)}")
raise WildberriesAPIError(f"Request failed: {str(e)}")
    def get_sales(self, date_from: str, date_to: Optional[str] = None, flag: int = 0) -> pd.DataFrame:
"""
Get sales data from Wildberries API with automatic pagination
Args:
date_from: Start date in YYYY-MM-DD format or lastChangeDate for pagination
date_to: End date in YYYY-MM-DD format (optional, not used by sales API)
            flag: 0 (default) returns records with lastChangeDate >= dateFrom (used for pagination); 1 returns all records whose date equals dateFrom exactly
Returns:
pandas.DataFrame with sales data
Note: API limit is 80,000 rows per request. This method handles pagination automatically.
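
        Example (sketch):
            df = client.get_sales(date_from="2024-01-01", flag=0)
            # columns are normalized downstream, e.g. 'sale_date', 'total_price', 'is_return'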
"""
endpoint = self.config.get_endpoints()["sales"]
all_sales_data = []
current_date_from = date_from
max_iterations = 50 # Safety limit to prevent infinite loops
iteration = 0
try:
while iteration < max_iterations:
# Build parameters for sales API
params = {"dateFrom": current_date_from}
if flag is not None:
params["flag"] = flag
logger.info(f"Fetching sales data from {current_date_from} (iteration {iteration + 1})")
response = self._make_request("GET", endpoint, params=params)
if not response:
logger.warning("No sales data returned from API")
break
# Sales API returns direct array
if isinstance(response, list):
batch_data = response
else:
logger.warning("Unexpected API response format for sales")
break
if not batch_data:
logger.info("Empty response received - all sales data retrieved")
break
logger.info(f"Retrieved {len(batch_data)} sales records")
all_sales_data.extend(batch_data)
# Check if we need pagination (response has 80,000 rows)
if len(batch_data) < 80000:
logger.info("Received less than 80,000 rows - all data retrieved")
break
# Get lastChangeDate from the last record for next request
last_record = batch_data[-1]
if 'lastChangeDate' in last_record:
current_date_from = last_record['lastChangeDate']
logger.info(f"Next pagination starts from: {current_date_from}")
else:
logger.warning("No lastChangeDate found in response - stopping pagination")
break
iteration += 1
if iteration >= max_iterations:
logger.warning(f"Maximum iterations ({max_iterations}) reached - there may be more data")
if not all_sales_data:
return pd.DataFrame()
            # Convert to DataFrame; pagination by lastChangeDate can re-fetch the
            # boundary record, so drop exact duplicate rows before processing
            sales_data = pd.DataFrame(all_sales_data)
            sales_data = sales_data.drop_duplicates()
            logger.info(f"Total sales records retrieved: {len(sales_data)}")
# Process and clean the data using the correct sales data processor
sales_data = self._process_sales_data(sales_data)
return sales_data
except Exception as e:
logger.error(f"Error fetching sales data: {str(e)}")
raise WildberriesAPIError(f"Failed to fetch sales data: {str(e)}")
    def get_stocks(self, date_from: Optional[str] = None, date_to: Optional[str] = None) -> pd.DataFrame:
"""
Get current stock levels from Wildberries API with automatic pagination
Args:
date_from: Date to get stock levels for (optional, defaults to 2019-06-20 for total stock)
date_to: Not used for stocks endpoint (stocks API uses only dateFrom)
Returns:
pandas.DataFrame with stock data
Note: API limit is 60,000 rows per request. This method handles pagination automatically.
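
        Example (sketch):
            stocks_df = client.get_stocks()  # dateFrom defaults to 2019-06-20 for a full snapshot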
"""
endpoint = self.config.get_endpoints()["stocks"]
# Use early date to get total stock if no date specified
if not date_from:
date_from = "2019-06-20" # Early date to get all stocks
all_stock_data = []
current_date_from = date_from
        max_iterations = 30  # Safety limit for stocks (stock snapshots are typically smaller than sales history)
iteration = 0
try:
while iteration < max_iterations:
# Stocks API uses only dateFrom parameter (RFC3339 format)
params = {"dateFrom": current_date_from}
logger.info(f"Fetching stock data from {current_date_from} (iteration {iteration + 1})")
response = self._make_request("GET", endpoint, params=params)
if not response:
logger.warning("No stock data returned from API")
break
# Stocks API returns direct array response
if isinstance(response, list):
batch_data = response
else:
logger.warning("Unexpected API response format for stocks")
break
if not batch_data:
logger.info("Empty response received - all stock data retrieved")
break
logger.info(f"Retrieved {len(batch_data)} stock records")
all_stock_data.extend(batch_data)
# Check if we need pagination (response has 60,000 rows)
if len(batch_data) < 60000:
logger.info("Received less than 60,000 rows - all data retrieved")
break
# Get lastChangeDate from the last record for next request
last_record = batch_data[-1]
if 'lastChangeDate' in last_record:
current_date_from = last_record['lastChangeDate']
logger.info(f"Next pagination starts from: {current_date_from}")
else:
logger.warning("No lastChangeDate found in response - stopping pagination")
break
iteration += 1
if iteration >= max_iterations:
logger.warning(f"Maximum iterations ({max_iterations}) reached - there may be more data")
if not all_stock_data:
return pd.DataFrame()
            # Convert to DataFrame; as with sales, pagination can re-fetch the
            # boundary record, so drop exact duplicate rows before processing
            stock_data = pd.DataFrame(all_stock_data)
            stock_data = stock_data.drop_duplicates()
            logger.info(f"Total stock records retrieved: {len(stock_data)}")
# Process and clean the stock data using specific stock processing
stock_data = self._process_stock_data(stock_data)
return stock_data
except Exception as e:
logger.error(f"Error fetching stock data: {str(e)}")
raise WildberriesAPIError(f"Failed to fetch stock data: {str(e)}")
    def get_orders(self, date_from: str, date_to: Optional[str] = None) -> pd.DataFrame:
"""
Get orders data from Wildberries API
Args:
date_from: Start date in YYYY-MM-DD format
date_to: End date in YYYY-MM-DD format (optional)
Returns:
pandas.DataFrame with orders data
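
        Example (sketch):
            orders_df = client.get_orders(date_from="2024-01-01")  # dateTo defaults to today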
"""
endpoint = self.config.get_endpoints()["orders"]
        # Default dateTo to today's date when not provided
if not date_to:
date_to = datetime.now().strftime("%Y-%m-%d")
params = {"dateFrom": date_from, "dateTo": date_to, "limit": 100}
try:
response = self._make_request("GET", endpoint, params=params)
if not response:
logger.warning("No orders data returned from API")
return pd.DataFrame()
# Handle direct array response (v5 API format)
if isinstance(response, list):
orders_data = pd.DataFrame(response)
elif isinstance(response, dict) and "data" in response:
orders_data = pd.DataFrame(response["data"])
else:
logger.warning("Unexpected API response format")
return pd.DataFrame()
if orders_data.empty:
return orders_data
# Process and clean the data
orders_data = self._process_reportdetail_data(orders_data)
return orders_data
except Exception as e:
logger.error(f"Error fetching orders data: {str(e)}")
raise WildberriesAPIError(f"Failed to fetch orders data: {str(e)}")
def _process_sales_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Process and clean sales data from API response (v1 sales endpoint)
Handles all 28 fields from the official Wildberries Sales API:
- Geographic: warehouseName, warehouseType, countryName, oblastOkrugName, regionName
- Product: supplierArticle, nmId, barcode, category, subject, brand, techSize
- Financial: priceWithDisc (→total_price), finishedPrice, totalPrice (→original_price), discountPercent, spp, paymentSaleAmount, forPay
- Operational: incomeID, isSupply, isRealization, saleID, sticker, gNumber, srid
- Temporal: date, lastChangeDate
"""
# Column mapping based on official Wildberries Sales API schema (v1)
column_mapping = {
'date': 'sale_date',
'lastChangeDate': 'last_change_date',
'warehouseName': 'warehouse',
'warehouseType': 'warehouse_type',
'countryName': 'country',
'oblastOkrugName': 'region',
'regionName': 'city',
'supplierArticle': 'supplier_article',
'nmId': 'product_id',
'barcode': 'barcode',
'category': 'category',
'subject': 'subject',
'brand': 'brand',
'techSize': 'tech_size',
'incomeID': 'income_id',
'isSupply': 'is_supply',
'isRealization': 'is_realization',
'priceWithDisc': 'total_price', # Price with seller discount (for revenue calculation)
'finishedPrice': 'finished_price', # Final price with all discounts applied
'totalPrice': 'original_price', # Original price without discounts
'discountPercent': 'discount_percent',
'spp': 'spp_discount',
'paymentSaleAmount': 'payment_sale_amount',
'forPay': 'amount_for_pay', # What seller receives
'saleID': 'sale_id',
'sticker': 'sticker',
'gNumber': 'g_number',
'srid': 'unique_id' # Unique identifier for the sale
}
# Rename columns that exist
for old_name, new_name in column_mapping.items():
if old_name in df.columns:
df = df.rename(columns={old_name: new_name})
# Convert date columns
date_columns = ['sale_date', 'last_change_date']
for col in date_columns:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
# Convert numeric columns
numeric_columns = [
'product_id', 'total_price', 'finished_price', 'original_price', 'discount_percent', 'spp_discount',
'payment_sale_amount', 'amount_for_pay', 'income_id'
]
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Add product name (use supplier_article as primary)
if 'product_name' not in df.columns:
if 'supplier_article' in df.columns:
df['product_name'] = df['supplier_article']
elif 'category' in df.columns:
df['product_name'] = df['category']
else:
df['product_name'] = 'Unknown Product'
# Identify returns using saleID prefix (R********* = return, S********* = sale)
if 'sale_id' in df.columns:
df['is_return'] = df['sale_id'].astype(str).str.startswith('R')
return_count = df['is_return'].sum()
sale_count = (~df['is_return']).sum()
logger.info(f"Identified {return_count} returns and {sale_count} sales based on saleID")
else:
# Fallback to negative total_price detection if saleID not available
if 'total_price' in df.columns:
df['is_return'] = df['total_price'] < 0
logger.warning("saleID not available, using total_price < 0 for return detection")
else:
df['is_return'] = False
# Add quantity with proper sign (negative for returns)
df['quantity'] = 1
df.loc[df['is_return'], 'quantity'] = -1 # Returns have negative quantity
# Apply return logic to financial fields (make returns negative for accounting)
return_mask = df['is_return']
financial_fields = ['total_price', 'finished_price', 'original_price', 'amount_for_pay', 'payment_sale_amount']
for field in financial_fields:
if field in df.columns:
# Make returns negative if they're not already (some APIs might already send negative values)
df.loc[return_mask & (df[field] > 0), field] = -df.loc[return_mask & (df[field] > 0), field]
# Calculate commission (difference between total_price and amount_for_pay)
# This will be negative for returns, which is correct for accounting
if 'total_price' in df.columns and 'amount_for_pay' in df.columns:
df['sales_commission'] = df['total_price'] - df['amount_for_pay']
df['sales_commission'] = df['sales_commission'].fillna(0)
# Add sale_amount for compatibility (use amount_for_pay as seller's net amount)
if 'sale_amount' not in df.columns:
if 'amount_for_pay' in df.columns:
df['sale_amount'] = df['amount_for_pay']
else:
df['sale_amount'] = df['total_price']
# Add current_stock for inventory forecasting (default to 0)
if 'current_stock' not in df.columns:
df['current_stock'] = 0
# Add transaction type for clarity
if 'transaction_type' not in df.columns:
df['transaction_type'] = df['is_return'].map({True: 'Return', False: 'Sale'})
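        # Illustrative outcome for a single return row (saleID starting with "R"):
        #   is_return=True, quantity=-1, financial fields flipped negative, and
        #   sales_commission = total_price - amount_for_pay (negative for returns)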
logger.info(f"Processed {len(df)} sales records")
return df
def _process_stock_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Process and clean stock data from API response (v1 stocks endpoint)"""
# Column mapping based on actual stocks API response structure
column_mapping = {
'lastChangeDate': 'last_change_date',
'warehouseName': 'warehouse',
'supplierArticle': 'supplier_article',
'nmId': 'product_id',
'barcode': 'barcode',
'quantity': 'current_stock',
'inWayToClient': 'in_way_to_client',
'inWayFromClient': 'in_way_from_client',
'quantityFull': 'quantity_full',
'category': 'category',
'subject': 'subject',
'brand': 'brand',
'techSize': 'tech_size',
'Price': 'price',
'Discount': 'discount_percent',
'isSupply': 'is_supply',
'isRealization': 'is_realization',
'SCCode': 'sc_code'
}
# Rename columns that exist
for old_name, new_name in column_mapping.items():
if old_name in df.columns:
df = df.rename(columns={old_name: new_name})
# Convert date columns
if 'last_change_date' in df.columns:
df['last_change_date'] = pd.to_datetime(df['last_change_date'], errors='coerce')
# Convert numeric columns
numeric_columns = [
'current_stock', 'in_way_to_client', 'in_way_from_client',
'quantity_full', 'price', 'discount_percent', 'product_id'
]
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Add product name (use supplier_article as primary)
if 'product_name' not in df.columns:
if 'supplier_article' in df.columns:
df['product_name'] = df['supplier_article']
elif 'subject' in df.columns:
df['product_name'] = df['subject']
elif 'category' in df.columns:
df['product_name'] = df['category']
else:
df['product_name'] = 'Unknown Product'
# Calculate discounted price
if 'price_with_discount' not in df.columns and 'price' in df.columns:
            if 'discount_percent' in df.columns:
                # fillna(0) keeps rows without a discount from turning into NaN prices
                df['price_with_discount'] = df['price'] * (1 - df['discount_percent'].fillna(0) / 100)
else:
df['price_with_discount'] = df['price']
# Add total_price for compatibility (use price_with_discount)
if 'total_price' not in df.columns:
if 'price_with_discount' in df.columns:
df['total_price'] = df['price_with_discount']
elif 'price' in df.columns:
df['total_price'] = df['price']
else:
df['total_price'] = 0
# Add quantity for compatibility (use current_stock)
if 'quantity' not in df.columns:
if 'current_stock' in df.columns:
df['quantity'] = df['current_stock']
else:
df['quantity'] = 0
# Add sale_date for compatibility (use last_change_date)
if 'sale_date' not in df.columns:
if 'last_change_date' in df.columns:
df['sale_date'] = df['last_change_date']
else:
df['sale_date'] = pd.NaT
# Add sale_amount for compatibility (use total_price * quantity)
if 'sale_amount' not in df.columns:
df['sale_amount'] = df['total_price'] * df['quantity']
# Calculate total inventory value
if 'inventory_value' not in df.columns:
df['inventory_value'] = df['total_price'] * df['current_stock']
# Add article field for backward compatibility
if 'article' not in df.columns and 'supplier_article' in df.columns:
df['article'] = df['supplier_article']
# Mark low stock items (less than 5)
if 'is_low_stock' not in df.columns:
if 'current_stock' in df.columns:
df['is_low_stock'] = df['current_stock'] < 5
else:
df['is_low_stock'] = True
# Calculate pipeline stock (items in transit)
if 'pipeline_stock' not in df.columns:
in_way_to_client = df['in_way_to_client'] if 'in_way_to_client' in df.columns else 0
in_way_from_client = df['in_way_from_client'] if 'in_way_from_client' in df.columns else 0
df['pipeline_stock'] = in_way_to_client + in_way_from_client
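        # Illustrative derived values (not real data): price=1000, discount_percent=20
        # -> price_with_discount=800 (= total_price); current_stock=3 ->
        #    is_low_stock=True, inventory_value=800 * 3 = 2400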
logger.info(f"Processed {len(df)} stock records")
return df
def _process_reportdetail_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Process and clean reportDetailByPeriod data from API response (v5)"""
# Column mapping based on actual API response structure
column_mapping = {
'realizationreport_id': 'report_id',
'date_from': 'date_from',
'date_to': 'date_to',
'create_dt': 'create_date',
'currency_name': 'currency',
'suppliercontract_code': 'contract_code',
'rrd_id': 'record_id',
'gi_id': 'goods_id',
'subject_name': 'category',
'nm_id': 'product_id',
'brand_name': 'brand',
'sa_name': 'supplier_article',
'ts_name': 'tech_size',
'barcode': 'barcode',
'doc_type_name': 'document_type',
'quantity': 'quantity',
'retail_price': 'retail_price',
'retail_amount': 'retail_amount',
'sale_percent': 'sale_percent',
'commission_percent': 'commission_percent',
'office_name': 'office_name',
'supplier_oper_name': 'operation_name',
'order_dt': 'order_date',
'sale_dt': 'sale_date',
'rr_dt': 'report_date',
'shk_id': 'warehouse_code',
'retail_price_withdisc_rub': 'discounted_price',
'delivery_amount': 'delivery_amount',
'return_amount': 'return_amount',
'delivery_rub': 'delivery_cost',
'gi_box_type_name': 'box_type',
'product_discount_for_report': 'product_discount',
'supplier_promo': 'supplier_promo',
'rid': 'rid',
'ppvz_spp_prc': 'spp_percent',
'ppvz_kvw_prc_base': 'kvw_percent_base',
'ppvz_kvw_prc': 'kvw_percent',
'sup_rating_prc_up': 'rating_bonus',
'is_kgvp_v2': 'is_kgvp',
'ppvz_sales_commission': 'sales_commission',
'ppvz_for_pay': 'amount_for_pay',
'ppvz_reward': 'reward',
'acquiring_fee': 'acquiring_fee',
'acquiring_percent': 'acquiring_percent',
'payment_processing': 'payment_processing',
'acquiring_bank': 'acquiring_bank',
'ppvz_vw': 'warehouse_cost',
'ppvz_vw_nds': 'warehouse_cost_vat',
'ppvz_office_name': 'pickup_office',
'ppvz_office_id': 'pickup_office_id',
'ppvz_supplier_id': 'supplier_id',
'ppvz_supplier_name': 'supplier_name',
'ppvz_inn': 'supplier_inn',
'declaration_number': 'declaration_number',
'sticker_id': 'sticker_id',
'site_country': 'site_country',
'penalty': 'penalty',
'additional_payment': 'additional_payment',
'rebill_logistic_cost': 'logistics_cost',
'rebill_logistic_org': 'logistics_company',
'storage_fee': 'storage_fee',
'deduction': 'deduction',
'acceptance': 'acceptance',
'assembly_id': 'assembly_id',
'srid': 'unique_id',
'report_type': 'report_type'
}
# Rename columns that exist
for old_name, new_name in column_mapping.items():
if old_name in df.columns:
df = df.rename(columns={old_name: new_name})
# Convert date columns
date_columns = ['create_date', 'order_date', 'sale_date', 'report_date']
for col in date_columns:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
# Convert numeric columns
numeric_columns = [
'quantity', 'retail_price', 'retail_amount', 'sale_percent', 'commission_percent',
'discounted_price', 'delivery_amount', 'return_amount', 'delivery_cost',
'product_discount', 'supplier_promo', 'spp_percent', 'kvw_percent_base',
'kvw_percent', 'rating_bonus', 'sales_commission', 'amount_for_pay',
'reward', 'acquiring_fee', 'acquiring_percent', 'warehouse_cost',
'warehouse_cost_vat', 'penalty', 'additional_payment', 'logistics_cost',
'storage_fee', 'deduction', 'acceptance'
]
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Add product name (use supplier_article if available)
if 'product_name' not in df.columns:
if 'supplier_article' in df.columns:
df['product_name'] = df['supplier_article']
elif 'category' in df.columns:
df['product_name'] = df['category']
else:
df['product_name'] = 'Unknown Product'
# Ensure quantity exists and handle zero quantities properly
if 'quantity' not in df.columns:
df['quantity'] = 1
else:
# Convert to numeric and fill NaN with 1
df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce').fillna(1)
# For sales records, quantity of 0 means 1 transaction
df.loc[df['quantity'] == 0, 'quantity'] = 1
# Add missing columns that dashboard expects
# Create total_price as unit_price * quantity (total revenue per transaction)
if 'total_price' not in df.columns:
if 'discounted_price' in df.columns:
df['unit_price'] = df['discounted_price']
elif 'retail_price' in df.columns:
df['unit_price'] = df['retail_price']
elif 'amount_for_pay' in df.columns:
df['unit_price'] = df['amount_for_pay']
else:
df['unit_price'] = 0
# Calculate total_price as unit_price * quantity for correct revenue
df['total_price'] = df['unit_price'] * df['quantity']
# Add sale_amount if missing (use amount_for_pay or total_price)
if 'sale_amount' not in df.columns:
if 'amount_for_pay' in df.columns:
df['sale_amount'] = df['amount_for_pay']
else:
df['sale_amount'] = df['total_price']
# Add current_stock for inventory data (default to 0 if not available)
if 'current_stock' not in df.columns:
df['current_stock'] = 0
        # Filter out service records, keeping only actual sales and returns
        if 'document_type' in df.columns:
            # 'Продажа' = sale, 'Возврат' = return; other document types are service rows
            sales_mask = df['document_type'].isin(['Продажа', 'Возврат'])
            df = df[sales_mask]
            logger.info(f"Filtered to {len(df)} sales and return records")
        if 'operation_name' in df.columns:
            # Drop reimbursement operations ('Возмещение' = reimbursement); sales and returns remain
df = df[~df['operation_name'].str.contains('Возмещение', na=False)]
return df
def _process_orders_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Process and clean orders data from API response (legacy method)"""
# This is now handled by _process_reportdetail_data
return self._process_reportdetail_data(df)
def test_token_validity(self) -> bool:
"""Test if the current token is valid by making a simple API call"""
if not self.is_configured:
return False
try:
response = self.ping("statistics")
return response.get("status") == "success"
except Exception:
return False
def test_connection(self) -> Dict[str, Any]:
"""Test API connection and return status"""
try:
# First test token validity
if not self.test_token_validity():
return {
"status": "error",
"message": "Invalid or missing API token",
"records_count": 0,
"rate_limit_remaining": 0
}
# Try to get stocks for yesterday (minimal data request)
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
result = self.get_stocks(yesterday)
return {
"status": "success",
"message": "API connection successful",
"records_count": len(result),
"rate_limit_remaining": self.config.rate_limit_requests - len(self.rate_limiter.requests)
}
except Exception as e:
return {
"status": "error",
"message": f"API connection failed: {str(e)}",
"records_count": 0,
"rate_limit_remaining": 0
}
def ping(self, service: str = "statistics") -> Dict[str, Any]:
"""
Test connection to specific WB API service
Args:
service: API service to ping (statistics, content, analytics, common, marketplace, supplies)
Returns:
Dict with ping result
Note: Rate limit is 3 requests every 30 seconds per service
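
        Example (sketch):
            result = client.ping("statistics")
            # -> {"status": "success", "service": "statistics", ...}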
"""
endpoint_key = f"ping_{service}"
if endpoint_key not in self.config.get_endpoints():
raise WildberriesAPIError(f"Unknown service: {service}")
endpoint = self.config.get_endpoints()[endpoint_key]
try:
# Simple request without using main rate limiter (ping has separate limits)
response = self.session.get(endpoint, timeout=10)
if response.ok:
data = response.json()
return {
"status": "success",
"service": service,
"timestamp": data.get("TS"),
"api_status": data.get("Status"),
"message": f"Connection to {service} API successful"
}
else:
return {
"status": "error",
"service": service,
"message": f"HTTP {response.status_code}: {response.text}"
}
except Exception as e:
return {
"status": "error",
"service": service,
"message": f"Connection failed: {str(e)}"
}
def get_seller_info(self) -> Dict[str, Any]:
"""
Get seller information including name and account ID
Returns:
Dict with seller information
Note: Maximum 1 request per minute per seller account
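
        Example (illustrative shape):
            info = client.get_seller_info()
            # -> {"name": ..., "seller_id": ..., "trade_mark": ..., "status": "success"}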
"""
endpoint = self.config.get_endpoints()["seller_info"]
try:
response = self._make_request("GET", endpoint)
return {
"name": response.get("name"),
"seller_id": response.get("sid"),
"trade_mark": response.get("tradeMark"),
"status": "success"
}
except Exception as e:
logger.error(f"Error fetching seller info: {str(e)}")
return {
"status": "error",
"message": f"Failed to fetch seller info: {str(e)}"
}
    def get_news(self, from_date: Optional[str] = None, from_id: Optional[int] = None, limit: int = 500) -> Dict[str, Any]:
"""
Get seller portal news
Args:
from_date: Date from which to get news (YYYY-MM-DD format)
from_id: News ID to start from (including it)
            limit: Maximum number of news items to return (applied client-side when slicing the response)
Returns:
Dict with news data
Note: Maximum 10 requests per 10 minutes per seller account
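
        Example (sketch):
            news = client.get_news(from_date="2024-01-01", limit=50)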
"""
endpoint = self.config.get_endpoints()["news"]
params = {}
if from_date:
params["from"] = from_date
        if from_id is not None:
            params["fromID"] = from_id
if not from_date and not from_id:
# Default to last 7 days if no parameters specified
from_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
params["from"] = from_date
try:
response = self._make_request("GET", endpoint, params=params)
news_items = response.get("data", [])
return {
"status": "success",
"count": len(news_items),
"news": news_items[:limit] if len(news_items) > limit else news_items
}
except Exception as e:
logger.error(f"Error fetching news: {str(e)}")
return {
"status": "error",
"message": f"Failed to fetch news: {str(e)}",
"news": []
}
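

if __name__ == "__main__":
    # Minimal smoke test (sketch): the WB_API_TOKEN environment variable name is an
    # assumption of this example, not something config.py is known to define.
    import os

    client = WildberriesAPI(api_token=os.environ.get("WB_API_TOKEN"))
    print(client.test_connection())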