Spaces:
Running
Running
| """ | |
| Wildberries API Client with rate limiting and error handling | |
| Optimized for Hugging Face Spaces deployment | |
| """ | |
| import requests | |
| import time | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass | |
| import pandas as pd | |
| from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
| import logging | |
| from config import get_config | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
def validate_wb_data(weekly_data: pd.DataFrame, monthly_data: pd.DataFrame) -> Dict[str, Any]:
    """Validate Wildberries data consistency between weekly and monthly datasets.

    Checks performed:
      * weekly record count must not exceed the monthly record count
      * weekly revenue must not exceed monthly revenue
      * no duplicate sale identifiers within each dataset

    Args:
        weekly_data: Sales records covering the last week.
        monthly_data: Sales records covering the last month.

    Returns:
        Dict with keys "status" ("valid" or "error"), "warnings" and "errors".
    """
    validation_results = {
        "status": "valid",
        "warnings": [],
        "errors": []
    }
    try:
        # Weekly data should be a subset of monthly data, so it can never have more rows
        if len(weekly_data) > len(monthly_data):
            validation_results["warnings"].append(
                f"Data inconsistency: Weekly records ({len(weekly_data)}) exceed monthly records ({len(monthly_data)})"
            )
        # The same subset argument applies to total revenue
        if not weekly_data.empty and not monthly_data.empty:
            weekly_revenue = weekly_data['total_price'].sum() if 'total_price' in weekly_data.columns else 0
            monthly_revenue = monthly_data['total_price'].sum() if 'total_price' in monthly_data.columns else 0
            if weekly_revenue > monthly_revenue:
                validation_results["warnings"].append(
                    f"Data inconsistency: Weekly revenue (₽{weekly_revenue:,.0f}) exceeds monthly revenue (₽{monthly_revenue:,.0f})"
                )
        # Duplicate sale-ID check. Processed data renames the raw 'srid' column
        # to 'unique_id' (see _process_sales_data), so prefer that name and
        # fall back to the raw API name for unprocessed frames.
        for df_name, df in [("weekly", weekly_data), ("monthly", monthly_data)]:
            id_column = next((c for c in ('unique_id', 'srid') if c in df.columns), None)
            if not df.empty and id_column is not None:
                duplicate_count = df.duplicated(id_column).sum()
                if duplicate_count > 0:
                    validation_results["warnings"].append(
                        f"{df_name.title()} data: {duplicate_count} duplicate sale IDs found"
                    )
    except Exception as e:
        validation_results["status"] = "error"
        validation_results["errors"].append(f"Validation error: {str(e)}")
        logger.error(f"Data validation error: {str(e)}")
    return validation_results
def aggregate_wb_data(df: pd.DataFrame, period: str = 'daily') -> pd.DataFrame:
    """Aggregate Wildberries data by time period for better performance.

    Args:
        df: Sales records; must contain a 'sale_date' column to be aggregated.
        period: One of 'daily', 'weekly' or 'monthly'. Any other value
            returns the input unchanged.

    Returns:
        A new DataFrame with one row per period and summed metric columns,
        or the original DataFrame when aggregation is not possible.
    """
    if df.empty:
        return df
    try:
        if 'sale_date' not in df.columns:
            logger.warning("No sale_date column found for aggregation")
            return df
        # Work on a copy so the caller's DataFrame is not mutated
        # (the original implementation converted 'sale_date' in place).
        df = df.copy()
        df['sale_date'] = pd.to_datetime(df['sale_date'], errors='coerce')
        # Sum every metric column that is actually present; aggregating a
        # missing column would raise a KeyError and silently return raw data.
        candidate_columns = ['total_price', 'quantity', 'amount_for_pay', 'payment_sale_amount']
        agg_functions = {col: 'sum' for col in candidate_columns if col in df.columns}
        if not agg_functions:
            return df
        if period == 'daily':
            grouped = df.groupby(df['sale_date'].dt.date)
        elif period == 'weekly':
            grouped = df.groupby(df['sale_date'].dt.to_period('W'))
        elif period == 'monthly':
            grouped = df.groupby(df['sale_date'].dt.to_period('M'))
        else:
            return df  # Return original if period not recognized
        aggregated = grouped.agg(agg_functions).reset_index()
        # Period objects are awkward downstream; convert back to timestamps
        if period in ['weekly', 'monthly']:
            aggregated['sale_date'] = aggregated['sale_date'].dt.start_time
        return aggregated
    except Exception as e:
        logger.error(f"Error aggregating data: {str(e)}")
        return df
@dataclass
class RateLimiter:
    """Sliding-window rate limiter.

    The class uses dataclass-style field defaults plus __post_init__, but the
    @dataclass decorator was missing, so ``RateLimiter(requests_per_minute=...)``
    raised a TypeError and ``self.requests`` was never initialized. The
    decorator (imported at module level) restores the intended behavior.

    Attributes:
        requests_per_minute: Maximum number of requests allowed per window.
        window_seconds: Length of the sliding window in seconds.
    """
    requests_per_minute: int = 300
    window_seconds: int = 60

    def __post_init__(self):
        # Timestamps (time.time()) of requests made inside the current window
        self.requests = []

    def can_make_request(self) -> bool:
        """Check if we can make a request without hitting rate limits"""
        now = time.time()
        # Remove requests older than our window
        self.requests = [req_time for req_time in self.requests if now - req_time < self.window_seconds]
        return len(self.requests) < self.requests_per_minute

    def record_request(self):
        """Record that we made a request"""
        self.requests.append(time.time())

    def wait_if_needed(self):
        """Block until the rate limit allows another request."""
        if not self.can_make_request():
            # Sleep until the oldest recorded request ages out (+1s of slack)
            oldest_request = min(self.requests) if self.requests else time.time()
            wait_time = self.window_seconds - (time.time() - oldest_request) + 1
            logger.info(f"Rate limit reached, waiting {wait_time:.1f} seconds")
            time.sleep(wait_time)
class WildberriesAPIError(Exception):
    """Raised for any Wildberries API failure.

    Covers the cases visible in this module: an unconfigured token,
    rate limiting (429), other non-2xx HTTP responses, transport-level
    request failures, and wrapped errors from the fetch helpers.
    """
    pass
| class WildberriesAPI: | |
| """Wildberries API client with rate limiting and retry logic""" | |
    def __init__(self, api_token: Optional[str] = None) -> None:
        """Create a client bound to a single API token.

        Args:
            api_token: Wildberries API token. A missing or blank token does
                not raise; the client is simply marked unconfigured so the
                caller can fall back gracefully (see is_configured).
        """
        self.config = get_config()
        # Only use the explicitly provided token, no fallback to config
        self.api_token = api_token
        self.session = requests.Session()
        # Client-side limiter sized from the app configuration
        self.rate_limiter = RateLimiter(
            requests_per_minute=self.config.rate_limit_requests,
            window_seconds=self.config.rate_limit_window
        )
        # Don't raise exception - allow graceful fallback
        self.is_configured = bool(self.api_token and self.api_token.strip())
        # Set up session headers with the specific token
        self.session.headers.update(self._get_headers())
| def _get_headers(self) -> Dict[str, str]: | |
| """Get headers with the instance's API token""" | |
| headers = { | |
| "Content-Type": "application/json", | |
| "User-Agent": "Wildberries-Analytics-Dashboard/1.0" | |
| } | |
| if self.api_token: | |
| headers["Authorization"] = f"Bearer {self.api_token}" | |
| return headers | |
    def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        """Make an HTTP request with client-side rate limiting.

        Args:
            method: HTTP verb, e.g. "GET".
            url: Full endpoint URL.
            **kwargs: Passed through to requests.Session.request (params, ...).

        Returns:
            Parsed JSON body; non-JSON bodies are wrapped as {"data": <text>}.

        Raises:
            WildberriesAPIError: when the client is unconfigured, the server
                answers 429 or another non-2xx status, or the request fails
                at transport level.
        """
        # NOTE(review): tenacity is imported at module level but no retry
        # decorator is applied here — retries are left to the caller.
        # Check if API is configured before making requests
        if not self.is_configured:
            logger.error("API token not configured - cannot make requests")
            logger.debug(f"Token present: {bool(self.api_token)}")
            raise WildberriesAPIError("API token not configured - cannot make requests")
        # Enhanced logging for debugging
        logger.info(f"Making {method} request to {url}")
        logger.debug(f"Headers: {self._get_headers()}")
        logger.debug(f"Params: {kwargs.get('params', {})}")
        # Block until the sliding window allows another call
        self.rate_limiter.wait_if_needed()
        try:
            # Record the request
            self.rate_limiter.record_request()
            # Make the request
            response = self.session.request(
                method=method,
                url=url,
                timeout=self.config.request_timeout,
                **kwargs
            )
            # Handle rate limiting (429 status): sleep for the advertised
            # retry window, then raise so the caller can decide to retry
            if response.status_code == 429:
                retry_after = int(response.headers.get('X-Ratelimit-Retry', 60))
                logger.warning(f"Rate limited by API, waiting {retry_after} seconds")
                time.sleep(retry_after)
                raise WildberriesAPIError(f"Rate limited: {response.status_code}")
            # Handle other HTTP errors
            if not response.ok:
                error_msg = f"HTTP {response.status_code}: {response.text}"
                logger.error(error_msg)
                raise WildberriesAPIError(error_msg)
            # Parse JSON response; fall back to raw text for non-JSON bodies
            try:
                return response.json()
            except json.JSONDecodeError:
                return {"data": response.text}
        except requests.RequestException as e:
            logger.error(f"Request failed: {str(e)}")
            raise WildberriesAPIError(f"Request failed: {str(e)}")
    def get_sales(self, date_from: str, date_to: str = None, flag: int = 0) -> pd.DataFrame:
        """
        Get sales data from Wildberries API with automatic pagination
        Args:
            date_from: Start date in YYYY-MM-DD format or lastChangeDate for pagination
            date_to: End date in YYYY-MM-DD format (optional, not used by sales API)
            flag: 0 for sales and returns, 1 for only sales (optional)
        Returns:
            pandas.DataFrame with sales data
        Raises:
            WildberriesAPIError: on any request or processing failure.
        Note: API limit is 80,000 rows per request. This method handles pagination automatically.
        """
        endpoint = self.config.get_endpoints()["sales"]
        all_sales_data = []
        current_date_from = date_from
        max_iterations = 50  # Safety limit to prevent infinite loops
        iteration = 0
        try:
            while iteration < max_iterations:
                # Build parameters for sales API
                params = {"dateFrom": current_date_from}
                if flag is not None:
                    params["flag"] = flag
                logger.info(f"Fetching sales data from {current_date_from} (iteration {iteration + 1})")
                response = self._make_request("GET", endpoint, params=params)
                if not response:
                    logger.warning("No sales data returned from API")
                    break
                # Sales API returns direct array
                if isinstance(response, list):
                    batch_data = response
                else:
                    logger.warning("Unexpected API response format for sales")
                    break
                if not batch_data:
                    logger.info("Empty response received - all sales data retrieved")
                    break
                logger.info(f"Retrieved {len(batch_data)} sales records")
                all_sales_data.extend(batch_data)
                # Check if we need pagination (response has 80,000 rows)
                if len(batch_data) < 80000:
                    logger.info("Received less than 80,000 rows - all data retrieved")
                    break
                # Get lastChangeDate from the last record for next request
                # NOTE(review): resuming from the last record's lastChangeDate
                # may re-fetch that record — confirm whether downstream dedupe
                # on unique_id is expected.
                last_record = batch_data[-1]
                if 'lastChangeDate' in last_record:
                    current_date_from = last_record['lastChangeDate']
                    logger.info(f"Next pagination starts from: {current_date_from}")
                else:
                    logger.warning("No lastChangeDate found in response - stopping pagination")
                    break
                iteration += 1
            if iteration >= max_iterations:
                logger.warning(f"Maximum iterations ({max_iterations}) reached - there may be more data")
            if not all_sales_data:
                return pd.DataFrame()
            # Convert to DataFrame and process
            sales_data = pd.DataFrame(all_sales_data)
            logger.info(f"Total sales records retrieved: {len(sales_data)}")
            # Process and clean the data using the correct sales data processor
            sales_data = self._process_sales_data(sales_data)
            return sales_data
        except Exception as e:
            logger.error(f"Error fetching sales data: {str(e)}")
            raise WildberriesAPIError(f"Failed to fetch sales data: {str(e)}")
    def get_stocks(self, date_from: str = None, date_to: str = None) -> pd.DataFrame:
        """
        Get current stock levels from Wildberries API with automatic pagination
        Args:
            date_from: Date to get stock levels for (optional, defaults to 2019-06-20 for total stock)
            date_to: Not used for stocks endpoint (stocks API uses only dateFrom)
        Returns:
            pandas.DataFrame with stock data
        Raises:
            WildberriesAPIError: on any request or processing failure.
        Note: API limit is 60,000 rows per request. This method handles pagination automatically.
        """
        endpoint = self.config.get_endpoints()["stocks"]
        # Use early date to get total stock if no date specified
        if not date_from:
            date_from = "2019-06-20"  # Early date to get all stocks
        all_stock_data = []
        current_date_from = date_from
        max_iterations = 30  # Safety limit for stocks (should be less than sales)
        iteration = 0
        try:
            while iteration < max_iterations:
                # Stocks API uses only dateFrom parameter (RFC3339 format)
                params = {"dateFrom": current_date_from}
                logger.info(f"Fetching stock data from {current_date_from} (iteration {iteration + 1})")
                response = self._make_request("GET", endpoint, params=params)
                if not response:
                    logger.warning("No stock data returned from API")
                    break
                # Stocks API returns direct array response
                if isinstance(response, list):
                    batch_data = response
                else:
                    logger.warning("Unexpected API response format for stocks")
                    break
                if not batch_data:
                    logger.info("Empty response received - all stock data retrieved")
                    break
                logger.info(f"Retrieved {len(batch_data)} stock records")
                all_stock_data.extend(batch_data)
                # Check if we need pagination (response has 60,000 rows)
                if len(batch_data) < 60000:
                    logger.info("Received less than 60,000 rows - all data retrieved")
                    break
                # Get lastChangeDate from the last record for next request
                last_record = batch_data[-1]
                if 'lastChangeDate' in last_record:
                    current_date_from = last_record['lastChangeDate']
                    logger.info(f"Next pagination starts from: {current_date_from}")
                else:
                    logger.warning("No lastChangeDate found in response - stopping pagination")
                    break
                iteration += 1
            if iteration >= max_iterations:
                logger.warning(f"Maximum iterations ({max_iterations}) reached - there may be more data")
            if not all_stock_data:
                return pd.DataFrame()
            # Convert to DataFrame and process
            stock_data = pd.DataFrame(all_stock_data)
            logger.info(f"Total stock records retrieved: {len(stock_data)}")
            # Process and clean the stock data using specific stock processing
            stock_data = self._process_stock_data(stock_data)
            return stock_data
        except Exception as e:
            logger.error(f"Error fetching stock data: {str(e)}")
            raise WildberriesAPIError(f"Failed to fetch stock data: {str(e)}")
| def get_orders(self, date_from: str, date_to: str = None) -> pd.DataFrame: | |
| """ | |
| Get orders data from Wildberries API | |
| Args: | |
| date_from: Start date in YYYY-MM-DD format | |
| date_to: End date in YYYY-MM-DD format (optional) | |
| Returns: | |
| pandas.DataFrame with orders data | |
| """ | |
| endpoint = self.config.get_endpoints()["orders"] | |
| # Add automatic dateTo defaulting to today's date when not provided | |
| if not date_to: | |
| date_to = datetime.now().strftime("%Y-%m-%d") | |
| params = {"dateFrom": date_from, "dateTo": date_to, "limit": 100} | |
| try: | |
| response = self._make_request("GET", endpoint, params=params) | |
| if not response: | |
| logger.warning("No orders data returned from API") | |
| return pd.DataFrame() | |
| # Handle direct array response (v5 API format) | |
| if isinstance(response, list): | |
| orders_data = pd.DataFrame(response) | |
| elif isinstance(response, dict) and "data" in response: | |
| orders_data = pd.DataFrame(response["data"]) | |
| else: | |
| logger.warning("Unexpected API response format") | |
| return pd.DataFrame() | |
| if orders_data.empty: | |
| return orders_data | |
| # Process and clean the data | |
| orders_data = self._process_reportdetail_data(orders_data) | |
| return orders_data | |
| except Exception as e: | |
| logger.error(f"Error fetching orders data: {str(e)}") | |
| raise WildberriesAPIError(f"Failed to fetch orders data: {str(e)}") | |
    def _process_sales_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Process and clean sales data from API response (v1 sales endpoint)

        Args:
            df: Raw DataFrame built from the sales API JSON array.
        Returns:
            The same DataFrame, renamed/typed in place with derived columns
            (product_name, is_return, quantity, sales_commission, sale_amount,
            current_stock, transaction_type) added.

        Handles all 28 fields from the official Wildberries Sales API:
        - Geographic: warehouseName, warehouseType, countryName, oblastOkrugName, regionName
        - Product: supplierArticle, nmId, barcode, category, subject, brand, techSize
        - Financial: priceWithDisc (→total_price), finishedPrice, totalPrice (→original_price), discountPercent, spp, paymentSaleAmount, forPay
        - Operational: incomeID, isSupply, isRealization, saleID, sticker, gNumber, srid
        - Temporal: date, lastChangeDate
        """
        # Column mapping based on official Wildberries Sales API schema (v1)
        column_mapping = {
            'date': 'sale_date',
            'lastChangeDate': 'last_change_date',
            'warehouseName': 'warehouse',
            'warehouseType': 'warehouse_type',
            'countryName': 'country',
            'oblastOkrugName': 'region',
            'regionName': 'city',
            'supplierArticle': 'supplier_article',
            'nmId': 'product_id',
            'barcode': 'barcode',
            'category': 'category',
            'subject': 'subject',
            'brand': 'brand',
            'techSize': 'tech_size',
            'incomeID': 'income_id',
            'isSupply': 'is_supply',
            'isRealization': 'is_realization',
            'priceWithDisc': 'total_price',  # Price with seller discount (for revenue calculation)
            'finishedPrice': 'finished_price',  # Final price with all discounts applied
            'totalPrice': 'original_price',  # Original price without discounts
            'discountPercent': 'discount_percent',
            'spp': 'spp_discount',
            'paymentSaleAmount': 'payment_sale_amount',
            'forPay': 'amount_for_pay',  # What seller receives
            'saleID': 'sale_id',
            'sticker': 'sticker',
            'gNumber': 'g_number',
            'srid': 'unique_id'  # Unique identifier for the sale
        }
        # Rename columns that exist
        for old_name, new_name in column_mapping.items():
            if old_name in df.columns:
                df = df.rename(columns={old_name: new_name})
        # Convert date columns (invalid values become NaT)
        date_columns = ['sale_date', 'last_change_date']
        for col in date_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
        # Convert numeric columns (invalid values become NaN)
        numeric_columns = [
            'product_id', 'total_price', 'finished_price', 'original_price', 'discount_percent', 'spp_discount',
            'payment_sale_amount', 'amount_for_pay', 'income_id'
        ]
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        # Add product name (use supplier_article as primary)
        if 'product_name' not in df.columns:
            if 'supplier_article' in df.columns:
                df['product_name'] = df['supplier_article']
            elif 'category' in df.columns:
                df['product_name'] = df['category']
            else:
                df['product_name'] = 'Unknown Product'
        # Identify returns using saleID prefix (R********* = return, S********* = sale)
        if 'sale_id' in df.columns:
            df['is_return'] = df['sale_id'].astype(str).str.startswith('R')
            return_count = df['is_return'].sum()
            sale_count = (~df['is_return']).sum()
            logger.info(f"Identified {return_count} returns and {sale_count} sales based on saleID")
        else:
            # Fallback to negative total_price detection if saleID not available
            if 'total_price' in df.columns:
                df['is_return'] = df['total_price'] < 0
                logger.warning("saleID not available, using total_price < 0 for return detection")
            else:
                df['is_return'] = False
        # Add quantity with proper sign (negative for returns); each API row is one unit
        df['quantity'] = 1
        df.loc[df['is_return'], 'quantity'] = -1  # Returns have negative quantity
        # Apply return logic to financial fields (make returns negative for accounting)
        return_mask = df['is_return']
        financial_fields = ['total_price', 'finished_price', 'original_price', 'amount_for_pay', 'payment_sale_amount']
        for field in financial_fields:
            if field in df.columns:
                # Make returns negative if they're not already (some APIs might already send negative values)
                df.loc[return_mask & (df[field] > 0), field] = -df.loc[return_mask & (df[field] > 0), field]
        # Calculate commission (difference between total_price and amount_for_pay)
        # This will be negative for returns, which is correct for accounting
        if 'total_price' in df.columns and 'amount_for_pay' in df.columns:
            df['sales_commission'] = df['total_price'] - df['amount_for_pay']
            df['sales_commission'] = df['sales_commission'].fillna(0)
        # Add sale_amount for compatibility (use amount_for_pay as seller's net amount)
        if 'sale_amount' not in df.columns:
            if 'amount_for_pay' in df.columns:
                df['sale_amount'] = df['amount_for_pay']
            else:
                df['sale_amount'] = df['total_price']
        # Add current_stock for inventory forecasting (default to 0)
        if 'current_stock' not in df.columns:
            df['current_stock'] = 0
        # Add transaction type for clarity
        if 'transaction_type' not in df.columns:
            df['transaction_type'] = df['is_return'].map({True: 'Return', False: 'Sale'})
        logger.info(f"Processed {len(df)} sales records")
        return df
    def _process_stock_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process and clean stock data from API response (v1 stocks endpoint).

        Args:
            df: Raw DataFrame built from the stocks API JSON array.
        Returns:
            The DataFrame with snake_case column names, numeric/date types,
            and compatibility columns (total_price, quantity, sale_date,
            sale_amount, inventory_value, is_low_stock, pipeline_stock) added
            so it can flow through the same dashboard code as sales data.
        """
        # Column mapping based on actual stocks API response structure
        column_mapping = {
            'lastChangeDate': 'last_change_date',
            'warehouseName': 'warehouse',
            'supplierArticle': 'supplier_article',
            'nmId': 'product_id',
            'barcode': 'barcode',
            'quantity': 'current_stock',
            'inWayToClient': 'in_way_to_client',
            'inWayFromClient': 'in_way_from_client',
            'quantityFull': 'quantity_full',
            'category': 'category',
            'subject': 'subject',
            'brand': 'brand',
            'techSize': 'tech_size',
            'Price': 'price',
            'Discount': 'discount_percent',
            'isSupply': 'is_supply',
            'isRealization': 'is_realization',
            'SCCode': 'sc_code'
        }
        # Rename columns that exist
        for old_name, new_name in column_mapping.items():
            if old_name in df.columns:
                df = df.rename(columns={old_name: new_name})
        # Convert date columns (invalid values become NaT)
        if 'last_change_date' in df.columns:
            df['last_change_date'] = pd.to_datetime(df['last_change_date'], errors='coerce')
        # Convert numeric columns (invalid values become NaN)
        numeric_columns = [
            'current_stock', 'in_way_to_client', 'in_way_from_client',
            'quantity_full', 'price', 'discount_percent', 'product_id'
        ]
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        # Add product name (use supplier_article as primary)
        if 'product_name' not in df.columns:
            if 'supplier_article' in df.columns:
                df['product_name'] = df['supplier_article']
            elif 'subject' in df.columns:
                df['product_name'] = df['subject']
            elif 'category' in df.columns:
                df['product_name'] = df['category']
            else:
                df['product_name'] = 'Unknown Product'
        # Calculate discounted price (discount_percent is a percentage, 0-100)
        if 'price_with_discount' not in df.columns and 'price' in df.columns:
            if 'discount_percent' in df.columns:
                df['price_with_discount'] = df['price'] * (1 - df['discount_percent'] / 100)
            else:
                df['price_with_discount'] = df['price']
        # Add total_price for compatibility (use price_with_discount)
        if 'total_price' not in df.columns:
            if 'price_with_discount' in df.columns:
                df['total_price'] = df['price_with_discount']
            elif 'price' in df.columns:
                df['total_price'] = df['price']
            else:
                df['total_price'] = 0
        # Add quantity for compatibility (use current_stock)
        if 'quantity' not in df.columns:
            if 'current_stock' in df.columns:
                df['quantity'] = df['current_stock']
            else:
                df['quantity'] = 0
        # Add sale_date for compatibility (use last_change_date)
        if 'sale_date' not in df.columns:
            if 'last_change_date' in df.columns:
                df['sale_date'] = df['last_change_date']
            else:
                df['sale_date'] = pd.NaT
        # Add sale_amount for compatibility (use total_price * quantity)
        if 'sale_amount' not in df.columns:
            df['sale_amount'] = df['total_price'] * df['quantity']
        # Calculate total inventory value
        if 'inventory_value' not in df.columns:
            df['inventory_value'] = df['total_price'] * df['current_stock']
        # Add article field for backward compatibility
        if 'article' not in df.columns and 'supplier_article' in df.columns:
            df['article'] = df['supplier_article']
        # Mark low stock items (less than 5)
        if 'is_low_stock' not in df.columns:
            if 'current_stock' in df.columns:
                df['is_low_stock'] = df['current_stock'] < 5
            else:
                df['is_low_stock'] = True
        # Calculate pipeline stock (items in transit); scalar 0 when a
        # direction column is missing
        if 'pipeline_stock' not in df.columns:
            in_way_to_client = df['in_way_to_client'] if 'in_way_to_client' in df.columns else 0
            in_way_from_client = df['in_way_from_client'] if 'in_way_from_client' in df.columns else 0
            df['pipeline_stock'] = in_way_to_client + in_way_from_client
        logger.info(f"Processed {len(df)} stock records")
        return df
    def _process_reportdetail_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process and clean reportDetailByPeriod data from API response (v5).

        Args:
            df: Raw DataFrame built from the v5 reportDetailByPeriod response.
        Returns:
            The DataFrame with snake_case column names, numeric/date types,
            derived columns (product_name, quantity, unit_price, total_price,
            sale_amount, current_stock), and non-sales rows filtered out.
        """
        # Column mapping based on actual API response structure
        column_mapping = {
            'realizationreport_id': 'report_id',
            'date_from': 'date_from',
            'date_to': 'date_to',
            'create_dt': 'create_date',
            'currency_name': 'currency',
            'suppliercontract_code': 'contract_code',
            'rrd_id': 'record_id',
            'gi_id': 'goods_id',
            'subject_name': 'category',
            'nm_id': 'product_id',
            'brand_name': 'brand',
            'sa_name': 'supplier_article',
            'ts_name': 'tech_size',
            'barcode': 'barcode',
            'doc_type_name': 'document_type',
            'quantity': 'quantity',
            'retail_price': 'retail_price',
            'retail_amount': 'retail_amount',
            'sale_percent': 'sale_percent',
            'commission_percent': 'commission_percent',
            'office_name': 'office_name',
            'supplier_oper_name': 'operation_name',
            'order_dt': 'order_date',
            'sale_dt': 'sale_date',
            'rr_dt': 'report_date',
            'shk_id': 'warehouse_code',
            'retail_price_withdisc_rub': 'discounted_price',
            'delivery_amount': 'delivery_amount',
            'return_amount': 'return_amount',
            'delivery_rub': 'delivery_cost',
            'gi_box_type_name': 'box_type',
            'product_discount_for_report': 'product_discount',
            'supplier_promo': 'supplier_promo',
            'rid': 'rid',
            'ppvz_spp_prc': 'spp_percent',
            'ppvz_kvw_prc_base': 'kvw_percent_base',
            'ppvz_kvw_prc': 'kvw_percent',
            'sup_rating_prc_up': 'rating_bonus',
            'is_kgvp_v2': 'is_kgvp',
            'ppvz_sales_commission': 'sales_commission',
            'ppvz_for_pay': 'amount_for_pay',
            'ppvz_reward': 'reward',
            'acquiring_fee': 'acquiring_fee',
            'acquiring_percent': 'acquiring_percent',
            'payment_processing': 'payment_processing',
            'acquiring_bank': 'acquiring_bank',
            'ppvz_vw': 'warehouse_cost',
            'ppvz_vw_nds': 'warehouse_cost_vat',
            'ppvz_office_name': 'pickup_office',
            'ppvz_office_id': 'pickup_office_id',
            'ppvz_supplier_id': 'supplier_id',
            'ppvz_supplier_name': 'supplier_name',
            'ppvz_inn': 'supplier_inn',
            'declaration_number': 'declaration_number',
            'sticker_id': 'sticker_id',
            'site_country': 'site_country',
            'penalty': 'penalty',
            'additional_payment': 'additional_payment',
            'rebill_logistic_cost': 'logistics_cost',
            'rebill_logistic_org': 'logistics_company',
            'storage_fee': 'storage_fee',
            'deduction': 'deduction',
            'acceptance': 'acceptance',
            'assembly_id': 'assembly_id',
            'srid': 'unique_id',
            'report_type': 'report_type'
        }
        # Rename columns that exist
        for old_name, new_name in column_mapping.items():
            if old_name in df.columns:
                df = df.rename(columns={old_name: new_name})
        # Convert date columns (invalid values become NaT)
        date_columns = ['create_date', 'order_date', 'sale_date', 'report_date']
        for col in date_columns:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
        # Convert numeric columns (invalid values become NaN)
        numeric_columns = [
            'quantity', 'retail_price', 'retail_amount', 'sale_percent', 'commission_percent',
            'discounted_price', 'delivery_amount', 'return_amount', 'delivery_cost',
            'product_discount', 'supplier_promo', 'spp_percent', 'kvw_percent_base',
            'kvw_percent', 'rating_bonus', 'sales_commission', 'amount_for_pay',
            'reward', 'acquiring_fee', 'acquiring_percent', 'warehouse_cost',
            'warehouse_cost_vat', 'penalty', 'additional_payment', 'logistics_cost',
            'storage_fee', 'deduction', 'acceptance'
        ]
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        # Add product name (use supplier_article if available)
        if 'product_name' not in df.columns:
            if 'supplier_article' in df.columns:
                df['product_name'] = df['supplier_article']
            elif 'category' in df.columns:
                df['product_name'] = df['category']
            else:
                df['product_name'] = 'Unknown Product'
        # Ensure quantity exists and handle zero quantities properly
        if 'quantity' not in df.columns:
            df['quantity'] = 1
        else:
            # Convert to numeric and fill NaN with 1
            df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce').fillna(1)
            # For sales records, quantity of 0 means 1 transaction
            df.loc[df['quantity'] == 0, 'quantity'] = 1
        # Add missing columns that dashboard expects
        # Create total_price as unit_price * quantity (total revenue per transaction)
        if 'total_price' not in df.columns:
            if 'discounted_price' in df.columns:
                df['unit_price'] = df['discounted_price']
            elif 'retail_price' in df.columns:
                df['unit_price'] = df['retail_price']
            elif 'amount_for_pay' in df.columns:
                df['unit_price'] = df['amount_for_pay']
            else:
                df['unit_price'] = 0
            # Calculate total_price as unit_price * quantity for correct revenue
            df['total_price'] = df['unit_price'] * df['quantity']
        # Add sale_amount if missing (use amount_for_pay or total_price)
        if 'sale_amount' not in df.columns:
            if 'amount_for_pay' in df.columns:
                df['sale_amount'] = df['amount_for_pay']
            else:
                df['sale_amount'] = df['total_price']
        # Add current_stock for inventory data (default to 0 if not available)
        if 'current_stock' not in df.columns:
            df['current_stock'] = 0
        # Filter out non-sales records (keep only actual sales)
        if 'document_type' in df.columns:
            # Filter to keep only sales transactions
            # NOTE(review): the second OR-clause keeps ANY non-empty document
            # type, which makes the 'Продажа' check redundant — confirm
            # whether an AND was intended here.
            sales_mask = (df['document_type'] == 'Продажа') | (df['document_type'].notna() & (df['document_type'] != ''))
            df = df[sales_mask]
            logger.info(f"Filtered to {len(df)} sales records")
        if 'operation_name' in df.columns:
            # Keep records that are actual sales or returns; drop reimbursements
            df = df[~df['operation_name'].str.contains('Возмещение', na=False)]
        return df
| def _process_orders_data(self, df: pd.DataFrame) -> pd.DataFrame: | |
| """Process and clean orders data from API response (legacy method)""" | |
| # This is now handled by _process_reportdetail_data | |
| return self._process_reportdetail_data(df) | |
| def test_token_validity(self) -> bool: | |
| """Test if the current token is valid by making a simple API call""" | |
| if not self.is_configured: | |
| return False | |
| try: | |
| response = self.ping("statistics") | |
| return response.get("status") == "success" | |
| except Exception: | |
| return False | |
| def test_connection(self) -> Dict[str, Any]: | |
| """Test API connection and return status""" | |
| try: | |
| # First test token validity | |
| if not self.test_token_validity(): | |
| return { | |
| "status": "error", | |
| "message": "Invalid or missing API token", | |
| "records_count": 0, | |
| "rate_limit_remaining": 0 | |
| } | |
| # Try to get stocks for yesterday (minimal data request) | |
| yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") | |
| result = self.get_stocks(yesterday) | |
| return { | |
| "status": "success", | |
| "message": "API connection successful", | |
| "records_count": len(result), | |
| "rate_limit_remaining": self.config.rate_limit_requests - len(self.rate_limiter.requests) | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "message": f"API connection failed: {str(e)}", | |
| "records_count": 0, | |
| "rate_limit_remaining": 0 | |
| } | |
| def ping(self, service: str = "statistics") -> Dict[str, Any]: | |
| """ | |
| Test connection to specific WB API service | |
| Args: | |
| service: API service to ping (statistics, content, analytics, common, marketplace, supplies) | |
| Returns: | |
| Dict with ping result | |
| Note: Rate limit is 3 requests every 30 seconds per service | |
| """ | |
| endpoint_key = f"ping_{service}" | |
| if endpoint_key not in self.config.get_endpoints(): | |
| raise WildberriesAPIError(f"Unknown service: {service}") | |
| endpoint = self.config.get_endpoints()[endpoint_key] | |
| try: | |
| # Simple request without using main rate limiter (ping has separate limits) | |
| response = self.session.get(endpoint, timeout=10) | |
| if response.ok: | |
| data = response.json() | |
| return { | |
| "status": "success", | |
| "service": service, | |
| "timestamp": data.get("TS"), | |
| "api_status": data.get("Status"), | |
| "message": f"Connection to {service} API successful" | |
| } | |
| else: | |
| return { | |
| "status": "error", | |
| "service": service, | |
| "message": f"HTTP {response.status_code}: {response.text}" | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "service": service, | |
| "message": f"Connection failed: {str(e)}" | |
| } | |
| def get_seller_info(self) -> Dict[str, Any]: | |
| """ | |
| Get seller information including name and account ID | |
| Returns: | |
| Dict with seller information | |
| Note: Maximum 1 request per minute per seller account | |
| """ | |
| endpoint = self.config.get_endpoints()["seller_info"] | |
| try: | |
| response = self._make_request("GET", endpoint) | |
| return { | |
| "name": response.get("name"), | |
| "seller_id": response.get("sid"), | |
| "trade_mark": response.get("tradeMark"), | |
| "status": "success" | |
| } | |
| except Exception as e: | |
| logger.error(f"Error fetching seller info: {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": f"Failed to fetch seller info: {str(e)}" | |
| } | |
| def get_news(self, from_date: str = None, from_id: int = None, limit: int = 500) -> Dict[str, Any]: | |
| """ | |
| Get seller portal news | |
| Args: | |
| from_date: Date from which to get news (YYYY-MM-DD format) | |
| from_id: News ID to start from (including it) | |
| limit: Maximum number of news items (up to 100) | |
| Returns: | |
| Dict with news data | |
| Note: Maximum 10 requests per 10 minutes per seller account | |
| """ | |
| endpoint = self.config.get_endpoints()["news"] | |
| params = {} | |
| if from_date: | |
| params["from"] = from_date | |
| if from_id: | |
| params["fromID"] = from_id | |
| if not from_date and not from_id: | |
| # Default to last 7 days if no parameters specified | |
| from_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") | |
| params["from"] = from_date | |
| try: | |
| response = self._make_request("GET", endpoint, params=params) | |
| news_items = response.get("data", []) | |
| return { | |
| "status": "success", | |
| "count": len(news_items), | |
| "news": news_items[:limit] if len(news_items) > limit else news_items | |
| } | |
| except Exception as e: | |
| logger.error(f"Error fetching news: {str(e)}") | |
| return { | |
| "status": "error", | |
| "message": f"Failed to fetch news: {str(e)}", | |
| "news": [] | |
| } |