""" Wildberries API Client with rate limiting and error handling Optimized for Hugging Face Spaces deployment """ import requests import time import json from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass import pandas as pd from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import logging from config import get_config # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def validate_wb_data(weekly_data: pd.DataFrame, monthly_data: pd.DataFrame) -> Dict[str, Any]: """Validate Wildberries data consistency""" validation_results = { "status": "valid", "warnings": [], "errors": [] } try: # Check record count consistency if len(weekly_data) > len(monthly_data): validation_results["warnings"].append( f"Data inconsistency: Weekly records ({len(weekly_data)}) exceed monthly records ({len(monthly_data)})" ) # Check revenue consistency if not weekly_data.empty and not monthly_data.empty: weekly_revenue = weekly_data['total_price'].sum() if 'total_price' in weekly_data.columns else 0 monthly_revenue = monthly_data['total_price'].sum() if 'total_price' in monthly_data.columns else 0 if weekly_revenue > monthly_revenue: validation_results["warnings"].append( f"Data inconsistency: Weekly revenue (₽{weekly_revenue:,.0f}) exceeds monthly revenue (₽{monthly_revenue:,.0f})" ) # Check for duplicate sales IDs for df_name, df in [("weekly", weekly_data), ("monthly", monthly_data)]: if not df.empty and 'srid' in df.columns: duplicate_count = df.duplicated('srid').sum() if duplicate_count > 0: validation_results["warnings"].append( f"{df_name.title()} data: {duplicate_count} duplicate sale IDs found" ) except Exception as e: validation_results["status"] = "error" validation_results["errors"].append(f"Validation error: {str(e)}") logger.error(f"Data validation error: {str(e)}") return validation_results def aggregate_wb_data(df: pd.DataFrame, period: str = 'daily') -> pd.DataFrame: """Aggregate Wildberries data by time period for better performance""" if df.empty: return df try: if 'sale_date' not in df.columns: logger.warning("No sale_date column found for aggregation") return df # Ensure sale_date is datetime df['sale_date'] = pd.to_datetime(df['sale_date'], errors='coerce') # Define aggregation functions agg_functions = { 'total_price': 'sum', 'quantity': 'sum' } # Add optional columns if they exist if 'amount_for_pay' in df.columns: agg_functions['amount_for_pay'] = 'sum' if 'payment_sale_amount' in df.columns: agg_functions['payment_sale_amount'] = 'sum' if period == 'daily': grouped = df.groupby(df['sale_date'].dt.date) elif period == 'weekly': grouped = df.groupby(df['sale_date'].dt.to_period('W')) elif period == 'monthly': grouped = df.groupby(df['sale_date'].dt.to_period('M')) else: return df # Return original if period not recognized aggregated = grouped.agg(agg_functions).reset_index() # Convert period back to datetime if needed if period in ['weekly', 'monthly']: aggregated['sale_date'] = aggregated['sale_date'].dt.start_time return aggregated except Exception as e: logger.error(f"Error aggregating data: {str(e)}") return df @dataclass class RateLimiter: """Simple rate limiter implementation""" requests_per_minute: int = 300 window_seconds: int = 60 def __post_init__(self): self.requests = [] def can_make_request(self) -> bool: """Check if we can make a request without hitting rate limits""" now = time.time() # Remove requests older than our window self.requests = [req_time for req_time in self.requests if now - req_time < self.window_seconds] return len(self.requests) < self.requests_per_minute def record_request(self): """Record that we made a request""" self.requests.append(time.time()) def wait_if_needed(self): """Wait if we're at the rate limit""" if not self.can_make_request(): # Calculate how long to wait oldest_request = min(self.requests) if self.requests else time.time() wait_time = self.window_seconds - (time.time() - oldest_request) + 1 logger.info(f"Rate limit reached, waiting {wait_time:.1f} seconds") time.sleep(wait_time) class WildberriesAPIError(Exception): """Custom exception for Wildberries API errors""" pass class WildberriesAPI: """Wildberries API client with rate limiting and retry logic""" def __init__(self, api_token: str = None): self.config = get_config() # Only use the explicitly provided token, no fallback to config self.api_token = api_token self.session = requests.Session() self.rate_limiter = RateLimiter( requests_per_minute=self.config.rate_limit_requests, window_seconds=self.config.rate_limit_window ) # Don't raise exception - allow graceful fallback self.is_configured = bool(self.api_token and self.api_token.strip()) # Set up session headers with the specific token self.session.headers.update(self._get_headers()) def _get_headers(self) -> Dict[str, str]: """Get headers with the instance's API token""" headers = { "Content-Type": "application/json", "User-Agent": "Wildberries-Analytics-Dashboard/1.0" } if self.api_token: headers["Authorization"] = f"Bearer {self.api_token}" return headers @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((requests.RequestException, WildberriesAPIError)) ) def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]: """Make an HTTP request with rate limiting and retry logic""" # Check if API is configured before making requests if not self.is_configured: logger.error("API token not configured - cannot make requests") logger.debug(f"Token present: {bool(self.api_token)}") raise WildberriesAPIError("API token not configured - cannot make requests") # Enhanced logging for debugging logger.info(f"Making {method} request to {url}") logger.debug(f"Headers: {self._get_headers()}") logger.debug(f"Params: {kwargs.get('params', {})}") # Check rate limits self.rate_limiter.wait_if_needed() try: # Record the request self.rate_limiter.record_request() # Make the request response = self.session.request( method=method, url=url, timeout=self.config.request_timeout, **kwargs ) # Handle rate limiting (429 status) if response.status_code == 429: retry_after = int(response.headers.get('X-Ratelimit-Retry', 60)) logger.warning(f"Rate limited by API, waiting {retry_after} seconds") time.sleep(retry_after) raise WildberriesAPIError(f"Rate limited: {response.status_code}") # Handle other HTTP errors if not response.ok: error_msg = f"HTTP {response.status_code}: {response.text}" logger.error(error_msg) raise WildberriesAPIError(error_msg) # Parse JSON response try: return response.json() except json.JSONDecodeError: return {"data": response.text} except requests.RequestException as e: logger.error(f"Request failed: {str(e)}") raise WildberriesAPIError(f"Request failed: {str(e)}") def get_sales(self, date_from: str, date_to: str = None, flag: int = 0) -> pd.DataFrame: """ Get sales data from Wildberries API with automatic pagination Args: date_from: Start date in YYYY-MM-DD format or lastChangeDate for pagination date_to: End date in YYYY-MM-DD format (optional, not used by sales API) flag: 0 for sales and returns, 1 for only sales (optional) Returns: pandas.DataFrame with sales data Note: API limit is 80,000 rows per request. This method handles pagination automatically. """ endpoint = self.config.get_endpoints()["sales"] all_sales_data = [] current_date_from = date_from max_iterations = 50 # Safety limit to prevent infinite loops iteration = 0 try: while iteration < max_iterations: # Build parameters for sales API params = {"dateFrom": current_date_from} if flag is not None: params["flag"] = flag logger.info(f"Fetching sales data from {current_date_from} (iteration {iteration + 1})") response = self._make_request("GET", endpoint, params=params) if not response: logger.warning("No sales data returned from API") break # Sales API returns direct array if isinstance(response, list): batch_data = response else: logger.warning("Unexpected API response format for sales") break if not batch_data: logger.info("Empty response received - all sales data retrieved") break logger.info(f"Retrieved {len(batch_data)} sales records") all_sales_data.extend(batch_data) # Check if we need pagination (response has 80,000 rows) if len(batch_data) < 80000: logger.info("Received less than 80,000 rows - all data retrieved") break # Get lastChangeDate from the last record for next request last_record = batch_data[-1] if 'lastChangeDate' in last_record: current_date_from = last_record['lastChangeDate'] logger.info(f"Next pagination starts from: {current_date_from}") else: logger.warning("No lastChangeDate found in response - stopping pagination") break iteration += 1 if iteration >= max_iterations: logger.warning(f"Maximum iterations ({max_iterations}) reached - there may be more data") if not all_sales_data: return pd.DataFrame() # Convert to DataFrame and process sales_data = pd.DataFrame(all_sales_data) logger.info(f"Total sales records retrieved: {len(sales_data)}") # Process and clean the data using the correct sales data processor sales_data = self._process_sales_data(sales_data) return sales_data except Exception as e: logger.error(f"Error fetching sales data: {str(e)}") raise WildberriesAPIError(f"Failed to fetch sales data: {str(e)}") def get_stocks(self, date_from: str = None, date_to: str = None) -> pd.DataFrame: """ Get current stock levels from Wildberries API with automatic pagination Args: date_from: Date to get stock levels for (optional, defaults to 2019-06-20 for total stock) date_to: Not used for stocks endpoint (stocks API uses only dateFrom) Returns: pandas.DataFrame with stock data Note: API limit is 60,000 rows per request. This method handles pagination automatically. """ endpoint = self.config.get_endpoints()["stocks"] # Use early date to get total stock if no date specified if not date_from: date_from = "2019-06-20" # Early date to get all stocks all_stock_data = [] current_date_from = date_from max_iterations = 30 # Safety limit for stocks (should be less than sales) iteration = 0 try: while iteration < max_iterations: # Stocks API uses only dateFrom parameter (RFC3339 format) params = {"dateFrom": current_date_from} logger.info(f"Fetching stock data from {current_date_from} (iteration {iteration + 1})") response = self._make_request("GET", endpoint, params=params) if not response: logger.warning("No stock data returned from API") break # Stocks API returns direct array response if isinstance(response, list): batch_data = response else: logger.warning("Unexpected API response format for stocks") break if not batch_data: logger.info("Empty response received - all stock data retrieved") break logger.info(f"Retrieved {len(batch_data)} stock records") all_stock_data.extend(batch_data) # Check if we need pagination (response has 60,000 rows) if len(batch_data) < 60000: logger.info("Received less than 60,000 rows - all data retrieved") break # Get lastChangeDate from the last record for next request last_record = batch_data[-1] if 'lastChangeDate' in last_record: current_date_from = last_record['lastChangeDate'] logger.info(f"Next pagination starts from: {current_date_from}") else: logger.warning("No lastChangeDate found in response - stopping pagination") break iteration += 1 if iteration >= max_iterations: logger.warning(f"Maximum iterations ({max_iterations}) reached - there may be more data") if not all_stock_data: return pd.DataFrame() # Convert to DataFrame and process stock_data = pd.DataFrame(all_stock_data) logger.info(f"Total stock records retrieved: {len(stock_data)}") # Process and clean the stock data using specific stock processing stock_data = self._process_stock_data(stock_data) return stock_data except Exception as e: logger.error(f"Error fetching stock data: {str(e)}") raise WildberriesAPIError(f"Failed to fetch stock data: {str(e)}") def get_orders(self, date_from: str, date_to: str = None) -> pd.DataFrame: """ Get orders data from Wildberries API Args: date_from: Start date in YYYY-MM-DD format date_to: End date in YYYY-MM-DD format (optional) Returns: pandas.DataFrame with orders data """ endpoint = self.config.get_endpoints()["orders"] # Add automatic dateTo defaulting to today's date when not provided if not date_to: date_to = datetime.now().strftime("%Y-%m-%d") params = {"dateFrom": date_from, "dateTo": date_to, "limit": 100} try: response = self._make_request("GET", endpoint, params=params) if not response: logger.warning("No orders data returned from API") return pd.DataFrame() # Handle direct array response (v5 API format) if isinstance(response, list): orders_data = pd.DataFrame(response) elif isinstance(response, dict) and "data" in response: orders_data = pd.DataFrame(response["data"]) else: logger.warning("Unexpected API response format") return pd.DataFrame() if orders_data.empty: return orders_data # Process and clean the data orders_data = self._process_reportdetail_data(orders_data) return orders_data except Exception as e: logger.error(f"Error fetching orders data: {str(e)}") raise WildberriesAPIError(f"Failed to fetch orders data: {str(e)}") def _process_sales_data(self, df: pd.DataFrame) -> pd.DataFrame: """ Process and clean sales data from API response (v1 sales endpoint) Handles all 28 fields from the official Wildberries Sales API: - Geographic: warehouseName, warehouseType, countryName, oblastOkrugName, regionName - Product: supplierArticle, nmId, barcode, category, subject, brand, techSize - Financial: priceWithDisc (→total_price), finishedPrice, totalPrice (→original_price), discountPercent, spp, paymentSaleAmount, forPay - Operational: incomeID, isSupply, isRealization, saleID, sticker, gNumber, srid - Temporal: date, lastChangeDate """ # Column mapping based on official Wildberries Sales API schema (v1) column_mapping = { 'date': 'sale_date', 'lastChangeDate': 'last_change_date', 'warehouseName': 'warehouse', 'warehouseType': 'warehouse_type', 'countryName': 'country', 'oblastOkrugName': 'region', 'regionName': 'city', 'supplierArticle': 'supplier_article', 'nmId': 'product_id', 'barcode': 'barcode', 'category': 'category', 'subject': 'subject', 'brand': 'brand', 'techSize': 'tech_size', 'incomeID': 'income_id', 'isSupply': 'is_supply', 'isRealization': 'is_realization', 'priceWithDisc': 'total_price', # Price with seller discount (for revenue calculation) 'finishedPrice': 'finished_price', # Final price with all discounts applied 'totalPrice': 'original_price', # Original price without discounts 'discountPercent': 'discount_percent', 'spp': 'spp_discount', 'paymentSaleAmount': 'payment_sale_amount', 'forPay': 'amount_for_pay', # What seller receives 'saleID': 'sale_id', 'sticker': 'sticker', 'gNumber': 'g_number', 'srid': 'unique_id' # Unique identifier for the sale } # Rename columns that exist for old_name, new_name in column_mapping.items(): if old_name in df.columns: df = df.rename(columns={old_name: new_name}) # Convert date columns date_columns = ['sale_date', 'last_change_date'] for col in date_columns: if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce') # Convert numeric columns numeric_columns = [ 'product_id', 'total_price', 'finished_price', 'original_price', 'discount_percent', 'spp_discount', 'payment_sale_amount', 'amount_for_pay', 'income_id' ] for col in numeric_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') # Add product name (use supplier_article as primary) if 'product_name' not in df.columns: if 'supplier_article' in df.columns: df['product_name'] = df['supplier_article'] elif 'category' in df.columns: df['product_name'] = df['category'] else: df['product_name'] = 'Unknown Product' # Identify returns using saleID prefix (R********* = return, S********* = sale) if 'sale_id' in df.columns: df['is_return'] = df['sale_id'].astype(str).str.startswith('R') return_count = df['is_return'].sum() sale_count = (~df['is_return']).sum() logger.info(f"Identified {return_count} returns and {sale_count} sales based on saleID") else: # Fallback to negative total_price detection if saleID not available if 'total_price' in df.columns: df['is_return'] = df['total_price'] < 0 logger.warning("saleID not available, using total_price < 0 for return detection") else: df['is_return'] = False # Add quantity with proper sign (negative for returns) df['quantity'] = 1 df.loc[df['is_return'], 'quantity'] = -1 # Returns have negative quantity # Apply return logic to financial fields (make returns negative for accounting) return_mask = df['is_return'] financial_fields = ['total_price', 'finished_price', 'original_price', 'amount_for_pay', 'payment_sale_amount'] for field in financial_fields: if field in df.columns: # Make returns negative if they're not already (some APIs might already send negative values) df.loc[return_mask & (df[field] > 0), field] = -df.loc[return_mask & (df[field] > 0), field] # Calculate commission (difference between total_price and amount_for_pay) # This will be negative for returns, which is correct for accounting if 'total_price' in df.columns and 'amount_for_pay' in df.columns: df['sales_commission'] = df['total_price'] - df['amount_for_pay'] df['sales_commission'] = df['sales_commission'].fillna(0) # Add sale_amount for compatibility (use amount_for_pay as seller's net amount) if 'sale_amount' not in df.columns: if 'amount_for_pay' in df.columns: df['sale_amount'] = df['amount_for_pay'] else: df['sale_amount'] = df['total_price'] # Add current_stock for inventory forecasting (default to 0) if 'current_stock' not in df.columns: df['current_stock'] = 0 # Add transaction type for clarity if 'transaction_type' not in df.columns: df['transaction_type'] = df['is_return'].map({True: 'Return', False: 'Sale'}) logger.info(f"Processed {len(df)} sales records") return df def _process_stock_data(self, df: pd.DataFrame) -> pd.DataFrame: """Process and clean stock data from API response (v1 stocks endpoint)""" # Column mapping based on actual stocks API response structure column_mapping = { 'lastChangeDate': 'last_change_date', 'warehouseName': 'warehouse', 'supplierArticle': 'supplier_article', 'nmId': 'product_id', 'barcode': 'barcode', 'quantity': 'current_stock', 'inWayToClient': 'in_way_to_client', 'inWayFromClient': 'in_way_from_client', 'quantityFull': 'quantity_full', 'category': 'category', 'subject': 'subject', 'brand': 'brand', 'techSize': 'tech_size', 'Price': 'price', 'Discount': 'discount_percent', 'isSupply': 'is_supply', 'isRealization': 'is_realization', 'SCCode': 'sc_code' } # Rename columns that exist for old_name, new_name in column_mapping.items(): if old_name in df.columns: df = df.rename(columns={old_name: new_name}) # Convert date columns if 'last_change_date' in df.columns: df['last_change_date'] = pd.to_datetime(df['last_change_date'], errors='coerce') # Convert numeric columns numeric_columns = [ 'current_stock', 'in_way_to_client', 'in_way_from_client', 'quantity_full', 'price', 'discount_percent', 'product_id' ] for col in numeric_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') # Add product name (use supplier_article as primary) if 'product_name' not in df.columns: if 'supplier_article' in df.columns: df['product_name'] = df['supplier_article'] elif 'subject' in df.columns: df['product_name'] = df['subject'] elif 'category' in df.columns: df['product_name'] = df['category'] else: df['product_name'] = 'Unknown Product' # Calculate discounted price if 'price_with_discount' not in df.columns and 'price' in df.columns: if 'discount_percent' in df.columns: df['price_with_discount'] = df['price'] * (1 - df['discount_percent'] / 100) else: df['price_with_discount'] = df['price'] # Add total_price for compatibility (use price_with_discount) if 'total_price' not in df.columns: if 'price_with_discount' in df.columns: df['total_price'] = df['price_with_discount'] elif 'price' in df.columns: df['total_price'] = df['price'] else: df['total_price'] = 0 # Add quantity for compatibility (use current_stock) if 'quantity' not in df.columns: if 'current_stock' in df.columns: df['quantity'] = df['current_stock'] else: df['quantity'] = 0 # Add sale_date for compatibility (use last_change_date) if 'sale_date' not in df.columns: if 'last_change_date' in df.columns: df['sale_date'] = df['last_change_date'] else: df['sale_date'] = pd.NaT # Add sale_amount for compatibility (use total_price * quantity) if 'sale_amount' not in df.columns: df['sale_amount'] = df['total_price'] * df['quantity'] # Calculate total inventory value if 'inventory_value' not in df.columns: df['inventory_value'] = df['total_price'] * df['current_stock'] # Add article field for backward compatibility if 'article' not in df.columns and 'supplier_article' in df.columns: df['article'] = df['supplier_article'] # Mark low stock items (less than 5) if 'is_low_stock' not in df.columns: if 'current_stock' in df.columns: df['is_low_stock'] = df['current_stock'] < 5 else: df['is_low_stock'] = True # Calculate pipeline stock (items in transit) if 'pipeline_stock' not in df.columns: in_way_to_client = df['in_way_to_client'] if 'in_way_to_client' in df.columns else 0 in_way_from_client = df['in_way_from_client'] if 'in_way_from_client' in df.columns else 0 df['pipeline_stock'] = in_way_to_client + in_way_from_client logger.info(f"Processed {len(df)} stock records") return df def _process_reportdetail_data(self, df: pd.DataFrame) -> pd.DataFrame: """Process and clean reportDetailByPeriod data from API response (v5)""" # Column mapping based on actual API response structure column_mapping = { 'realizationreport_id': 'report_id', 'date_from': 'date_from', 'date_to': 'date_to', 'create_dt': 'create_date', 'currency_name': 'currency', 'suppliercontract_code': 'contract_code', 'rrd_id': 'record_id', 'gi_id': 'goods_id', 'subject_name': 'category', 'nm_id': 'product_id', 'brand_name': 'brand', 'sa_name': 'supplier_article', 'ts_name': 'tech_size', 'barcode': 'barcode', 'doc_type_name': 'document_type', 'quantity': 'quantity', 'retail_price': 'retail_price', 'retail_amount': 'retail_amount', 'sale_percent': 'sale_percent', 'commission_percent': 'commission_percent', 'office_name': 'office_name', 'supplier_oper_name': 'operation_name', 'order_dt': 'order_date', 'sale_dt': 'sale_date', 'rr_dt': 'report_date', 'shk_id': 'warehouse_code', 'retail_price_withdisc_rub': 'discounted_price', 'delivery_amount': 'delivery_amount', 'return_amount': 'return_amount', 'delivery_rub': 'delivery_cost', 'gi_box_type_name': 'box_type', 'product_discount_for_report': 'product_discount', 'supplier_promo': 'supplier_promo', 'rid': 'rid', 'ppvz_spp_prc': 'spp_percent', 'ppvz_kvw_prc_base': 'kvw_percent_base', 'ppvz_kvw_prc': 'kvw_percent', 'sup_rating_prc_up': 'rating_bonus', 'is_kgvp_v2': 'is_kgvp', 'ppvz_sales_commission': 'sales_commission', 'ppvz_for_pay': 'amount_for_pay', 'ppvz_reward': 'reward', 'acquiring_fee': 'acquiring_fee', 'acquiring_percent': 'acquiring_percent', 'payment_processing': 'payment_processing', 'acquiring_bank': 'acquiring_bank', 'ppvz_vw': 'warehouse_cost', 'ppvz_vw_nds': 'warehouse_cost_vat', 'ppvz_office_name': 'pickup_office', 'ppvz_office_id': 'pickup_office_id', 'ppvz_supplier_id': 'supplier_id', 'ppvz_supplier_name': 'supplier_name', 'ppvz_inn': 'supplier_inn', 'declaration_number': 'declaration_number', 'sticker_id': 'sticker_id', 'site_country': 'site_country', 'penalty': 'penalty', 'additional_payment': 'additional_payment', 'rebill_logistic_cost': 'logistics_cost', 'rebill_logistic_org': 'logistics_company', 'storage_fee': 'storage_fee', 'deduction': 'deduction', 'acceptance': 'acceptance', 'assembly_id': 'assembly_id', 'srid': 'unique_id', 'report_type': 'report_type' } # Rename columns that exist for old_name, new_name in column_mapping.items(): if old_name in df.columns: df = df.rename(columns={old_name: new_name}) # Convert date columns date_columns = ['create_date', 'order_date', 'sale_date', 'report_date'] for col in date_columns: if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce') # Convert numeric columns numeric_columns = [ 'quantity', 'retail_price', 'retail_amount', 'sale_percent', 'commission_percent', 'discounted_price', 'delivery_amount', 'return_amount', 'delivery_cost', 'product_discount', 'supplier_promo', 'spp_percent', 'kvw_percent_base', 'kvw_percent', 'rating_bonus', 'sales_commission', 'amount_for_pay', 'reward', 'acquiring_fee', 'acquiring_percent', 'warehouse_cost', 'warehouse_cost_vat', 'penalty', 'additional_payment', 'logistics_cost', 'storage_fee', 'deduction', 'acceptance' ] for col in numeric_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') # Add product name (use supplier_article if available) if 'product_name' not in df.columns: if 'supplier_article' in df.columns: df['product_name'] = df['supplier_article'] elif 'category' in df.columns: df['product_name'] = df['category'] else: df['product_name'] = 'Unknown Product' # Ensure quantity exists and handle zero quantities properly if 'quantity' not in df.columns: df['quantity'] = 1 else: # Convert to numeric and fill NaN with 1 df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce').fillna(1) # For sales records, quantity of 0 means 1 transaction df.loc[df['quantity'] == 0, 'quantity'] = 1 # Add missing columns that dashboard expects # Create total_price as unit_price * quantity (total revenue per transaction) if 'total_price' not in df.columns: if 'discounted_price' in df.columns: df['unit_price'] = df['discounted_price'] elif 'retail_price' in df.columns: df['unit_price'] = df['retail_price'] elif 'amount_for_pay' in df.columns: df['unit_price'] = df['amount_for_pay'] else: df['unit_price'] = 0 # Calculate total_price as unit_price * quantity for correct revenue df['total_price'] = df['unit_price'] * df['quantity'] # Add sale_amount if missing (use amount_for_pay or total_price) if 'sale_amount' not in df.columns: if 'amount_for_pay' in df.columns: df['sale_amount'] = df['amount_for_pay'] else: df['sale_amount'] = df['total_price'] # Add current_stock for inventory data (default to 0 if not available) if 'current_stock' not in df.columns: df['current_stock'] = 0 # Filter out non-sales records (keep only actual sales) if 'document_type' in df.columns: # Filter to keep only sales transactions sales_mask = (df['document_type'] == 'Продажа') | (df['document_type'].notna() & (df['document_type'] != '')) df = df[sales_mask] logger.info(f"Filtered to {len(df)} sales records") if 'operation_name' in df.columns: # Keep records that are actual sales or returns df = df[~df['operation_name'].str.contains('Возмещение', na=False)] return df def _process_orders_data(self, df: pd.DataFrame) -> pd.DataFrame: """Process and clean orders data from API response (legacy method)""" # This is now handled by _process_reportdetail_data return self._process_reportdetail_data(df) def test_token_validity(self) -> bool: """Test if the current token is valid by making a simple API call""" if not self.is_configured: return False try: response = self.ping("statistics") return response.get("status") == "success" except Exception: return False def test_connection(self) -> Dict[str, Any]: """Test API connection and return status""" try: # First test token validity if not self.test_token_validity(): return { "status": "error", "message": "Invalid or missing API token", "records_count": 0, "rate_limit_remaining": 0 } # Try to get stocks for yesterday (minimal data request) yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") result = self.get_stocks(yesterday) return { "status": "success", "message": "API connection successful", "records_count": len(result), "rate_limit_remaining": self.config.rate_limit_requests - len(self.rate_limiter.requests) } except Exception as e: return { "status": "error", "message": f"API connection failed: {str(e)}", "records_count": 0, "rate_limit_remaining": 0 } def ping(self, service: str = "statistics") -> Dict[str, Any]: """ Test connection to specific WB API service Args: service: API service to ping (statistics, content, analytics, common, marketplace, supplies) Returns: Dict with ping result Note: Rate limit is 3 requests every 30 seconds per service """ endpoint_key = f"ping_{service}" if endpoint_key not in self.config.get_endpoints(): raise WildberriesAPIError(f"Unknown service: {service}") endpoint = self.config.get_endpoints()[endpoint_key] try: # Simple request without using main rate limiter (ping has separate limits) response = self.session.get(endpoint, timeout=10) if response.ok: data = response.json() return { "status": "success", "service": service, "timestamp": data.get("TS"), "api_status": data.get("Status"), "message": f"Connection to {service} API successful" } else: return { "status": "error", "service": service, "message": f"HTTP {response.status_code}: {response.text}" } except Exception as e: return { "status": "error", "service": service, "message": f"Connection failed: {str(e)}" } def get_seller_info(self) -> Dict[str, Any]: """ Get seller information including name and account ID Returns: Dict with seller information Note: Maximum 1 request per minute per seller account """ endpoint = self.config.get_endpoints()["seller_info"] try: response = self._make_request("GET", endpoint) return { "name": response.get("name"), "seller_id": response.get("sid"), "trade_mark": response.get("tradeMark"), "status": "success" } except Exception as e: logger.error(f"Error fetching seller info: {str(e)}") return { "status": "error", "message": f"Failed to fetch seller info: {str(e)}" } def get_news(self, from_date: str = None, from_id: int = None, limit: int = 500) -> Dict[str, Any]: """ Get seller portal news Args: from_date: Date from which to get news (YYYY-MM-DD format) from_id: News ID to start from (including it) limit: Maximum number of news items (up to 100) Returns: Dict with news data Note: Maximum 10 requests per 10 minutes per seller account """ endpoint = self.config.get_endpoints()["news"] params = {} if from_date: params["from"] = from_date if from_id: params["fromID"] = from_id if not from_date and not from_id: # Default to last 7 days if no parameters specified from_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") params["from"] = from_date try: response = self._make_request("GET", endpoint, params=params) news_items = response.get("data", []) return { "status": "success", "count": len(news_items), "news": news_items[:limit] if len(news_items) > limit else news_items } except Exception as e: logger.error(f"Error fetching news: {str(e)}") return { "status": "error", "message": f"Failed to fetch news: {str(e)}", "news": [] }