Spaces:
Running
Running
| """ | |
| Utility functions and demo data for the Wildberries Analytics Dashboard | |
| Includes fallback data for when API is not available | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Any | |
| import random | |
| from config import get_config, DEMO_SETTINGS | |
def load_demo_sales_data(period: str = "week") -> pd.DataFrame:
    """Generate reproducible demo sales data for testing.

    Both ``random`` and ``numpy.random`` are re-seeded with 42 on every call
    (a global side effect), so the generated prices, products and flags are
    the same from call to call; only the dates shift, because the window is
    anchored to ``datetime.now()``.

    Args:
        period: ``"month"`` produces a 30-day window with 200-800 sales.
            ``"week"`` and any other value produce a 7-day window with
            50-200 sales.

    Returns:
        One row per sale with product, location, pricing and return-flag
        columns. ``sale_date`` always lies inside the window
        ``[now - days, now)``.
    """
    # Seed both generators so repeated calls yield the same rows.
    np.random.seed(42)
    random.seed(42)

    # The returned config is not used here; the call is kept in case
    # get_config() has side effects. NOTE(review): confirm and drop if not.
    get_config()
    demo_config = DEMO_SETTINGS

    # Only "month" widens the window; everything else behaves like "week".
    if period == "month":
        days = 30
        num_sales = random.randint(200, 800)
    else:
        days = 7
        num_sales = random.randint(50, 200)

    end_date = datetime.now()
    window = timedelta(days=days)
    start_date = end_date - window

    # Product catalogue the sales are drawn from.
    products = []
    for i in range(demo_config["demo_products_count"]):
        products.append({
            'product_id': 1000000 + i,
            'product_name': f'Товар {i+1}',
            'article': f'ART{1000+i}',
            'category': random.choice(demo_config["demo_categories"]),
            'brand': f'Бренд {chr(65 + i % 26)}',
            'base_price': random.randint(500, 5000),
        })

    sales_data = []
    for _ in range(num_sales):
        product = random.choice(products)

        offset = timedelta(
            days=random.random() * days,
            hours=random.randint(0, 23),
            minutes=random.randint(0, 59),
        )
        # The hour/minute jitter can push the offset past the end of the
        # window, which would date the sale in the future. Wrapping with
        # modulo keeps every sale inside [start_date, end_date) without
        # changing the sequence of random draws.
        sale_date = start_date + offset % window

        # Seller discount: applied to roughly 60% of sales.
        base_price = product['base_price']
        discount_percent = random.choice([0, 5, 10, 15, 20, 25, 30]) if random.random() < 0.6 else 0
        price_with_discount = base_price * (1 - discount_percent / 100)

        # Additional SPP discount: applied to roughly 30% of sales.
        spp_discount = random.randint(0, 10) if random.random() < 0.3 else 0
        finished_price = price_with_discount * (1 - spp_discount / 100)

        # About 10% of rows are returns; the id prefix marks the type.
        is_return = random.random() < 0.1
        sale_id = f'R{random.randint(100000, 999999)}' if is_return else f'S{random.randint(100000, 999999)}'

        # Demo payout model: the seller receives 75% of the discounted
        # price, and nothing for a return.
        if is_return:
            amount_for_pay = 0
        else:
            amount_for_pay = price_with_discount * 0.75

        sales_data.append({
            'sale_id': sale_id,
            'product_id': product['product_id'],
            'product_name': product['product_name'],
            'article': product['article'],
            'sale_date': sale_date,
            'last_change_date': sale_date,
            'warehouse': random.choice(['Коледино', 'Электросталь', 'Тула', 'Казань']),
            'country': 'Россия',
            'region': random.choice(['Московская', 'Санкт-Петербургская', 'Свердловская', 'Татарстан']),
            'city': random.choice(['Москва', 'Санкт-Петербург', 'Екатеринбург', 'Казань']),
            'total_price': price_with_discount,  # revenue uses the discounted price
            'original_price': base_price,
            'finished_price': finished_price,
            'discount_percent': discount_percent,
            'spp_discount': spp_discount,
            'price_with_discount': price_with_discount,
            'sale_amount': finished_price,
            'amount_for_pay': amount_for_pay,
            'sales_commission': price_with_discount - amount_for_pay,
            'quantity': 1,
            'category': product['category'],
            'brand': product['brand'],
            'is_supply': True,
            'is_realization': True,
            'is_return': is_return,
            'order_type': 'Возвратный' if is_return else 'Клиентский',
        })

    df = pd.DataFrame(sales_data)
    df['sale_date'] = pd.to_datetime(df['sale_date'])
    df['last_change_date'] = pd.to_datetime(df['last_change_date'])

    # Commission is never negative, and returns carry no commission.
    df['sales_commission'] = df['sales_commission'].clip(lower=0)
    df.loc[df['is_return'], 'sales_commission'] = 0
    return df
def load_demo_inventory_data() -> pd.DataFrame:
    """Build a reproducible demo inventory table, one row per demo product.

    Re-seeds ``random`` and ``numpy.random`` with 42 on each call (a global
    side effect). The first three products get a stock of 0-10 and the next
    three a stock of 10-50; the rest use ``DEMO_SETTINGS["demo_stock_range"]``.

    Returns:
        DataFrame with identity, stock, in-transit, warehouse, category,
        brand, price and ``last_change_date`` columns.
    """
    np.random.seed(42)
    random.seed(42)
    settings = DEMO_SETTINGS

    rows = []
    for idx in range(settings["demo_products_count"]):
        # The range-based draw always happens first, even when it is
        # overridden below, so the random stream stays the same.
        stock = random.randint(*settings["demo_stock_range"])
        if idx < 3:
            # Low-stock showcase products.
            stock = random.randint(0, 10)
        elif idx < 6:
            # Medium-stock showcase products.
            stock = random.randint(10, 50)

        # Draws below are made in the same order as the output columns.
        to_client = random.randint(0, 20)
        from_client = random.randint(0, 5)
        warehouse = random.choice(['Коледино', 'Электросталь', 'Тула', 'Казань'])
        category = random.choice(settings["demo_categories"])
        price = random.randint(500, 5000)
        changed_at = datetime.now() - timedelta(days=random.randint(0, 3))

        rows.append({
            'product_id': 1000000 + idx,
            'product_name': f'Товар {idx+1}',
            'article': f'ART{1000+idx}',
            'current_stock': stock,
            'in_way_to_client': to_client,
            'in_way_from_client': from_client,
            'warehouse': warehouse,
            'category': category,
            'brand': f'Бренд {chr(65 + idx % 26)}',
            'price': price,
            'last_change_date': changed_at,
        })

    inventory = pd.DataFrame(rows)
    inventory['last_change_date'] = pd.to_datetime(inventory['last_change_date'])
    return inventory
def process_sales_data(data: pd.DataFrame) -> pd.DataFrame:
    """Validate and clean sales data coming from the API or the demo loader.

    The input DataFrame is never modified; all work happens on a copy.

    Steps:
      1. Fill in missing required columns (``quantity`` defaults to 1,
         ``product_name`` falls back to ``article`` when present, anything
         else gets the placeholder ``'Unknown <column>'``).
      2. Drop rows whose ``product_id`` or ``total_price`` is missing.
      3. Coerce numeric columns, turning unparseable values into 0.
      4. Make ``total_price`` and ``quantity`` non-negative.
      5. Sort by ``sale_date``.

    Args:
        data: Raw sales rows.

    Returns:
        The cleaned DataFrame. An empty input is returned as-is.
    """
    if data.empty:
        return data

    # Copy before adding columns so the caller's frame is left untouched.
    data = data.copy()

    required_columns = ('product_id', 'product_name', 'sale_date', 'total_price', 'quantity')
    for col in required_columns:
        if col in data.columns:
            continue
        if col == 'quantity':
            data[col] = 1  # default quantity
        elif col == 'product_name' and 'article' in data.columns:
            data[col] = data['article']
        else:
            data[col] = f'Unknown {col}'

    # Rows without these two values cannot be attributed or priced.
    data = data.dropna(subset=['product_id', 'total_price'])

    # Unparseable numbers become NaN under errors='coerce', then 0.
    for col in ('total_price', 'quantity', 'sale_amount', 'finished_price'):
        if col in data.columns:
            data[col] = pd.to_numeric(data[col], errors='coerce').fillna(0)

    # Both columns are guaranteed to exist after the fill-in step above.
    for col in ('total_price', 'quantity'):
        data[col] = data[col].abs()

    if 'sale_date' in data.columns:
        data = data.sort_values('sale_date')
    return data
def calculate_daily_sales(sales_data: pd.DataFrame, product_id: int = None) -> pd.Series:
    """Sum sold quantity per calendar day.

    Args:
        sales_data: Sales rows with a datetime ``sale_date`` column and a
            ``quantity`` column (plus ``product_id`` when filtering).
        product_id: Restrict the calculation to this product. ``None``
            (the default) means all products; ``0`` is a valid id and is
            filtered on like any other.

    Returns:
        Series indexed by date holding the summed quantity for that day.
        An empty Series is returned when there is no data or no
        ``sale_date`` column.
    """
    if sales_data.empty:
        # Explicit dtype: a bare pd.Series() has a version-dependent dtype.
        return pd.Series(dtype="float64")

    # Compare against None rather than truthiness so that id 0 still filters.
    if product_id is not None:
        sales_data = sales_data[sales_data['product_id'] == product_id]

    if 'sale_date' not in sales_data.columns:
        return pd.Series(dtype="float64")

    return sales_data.groupby(sales_data['sale_date'].dt.date)['quantity'].sum()
def get_product_performance_metrics(sales_data: pd.DataFrame, period_days: int = 30) -> pd.DataFrame:
    """Calculate per-product performance metrics.

    Args:
        sales_data: Sales rows with ``product_id``, ``product_name``,
            ``quantity``, ``total_price`` and ``sale_date`` columns.
        period_days: Length in days of the period the data covers, used as
            the divisor for ``avg_daily_sales``. Defaults to 30, the value
            that was previously hard-coded; pass 7 for weekly data.

    Returns:
        One row per (product_id, product_name) with ``total_quantity``,
        ``total_revenue``, ``total_orders``, ``first_sale``, ``last_sale``,
        ``avg_order_value`` and ``avg_daily_sales``. An empty DataFrame is
        returned for empty input.

    Raises:
        ValueError: If ``period_days`` is not positive.
    """
    if period_days <= 0:
        raise ValueError("period_days must be a positive number of days")
    if sales_data.empty:
        return pd.DataFrame()

    # Named aggregation yields flat column names directly, in this order.
    product_metrics = sales_data.groupby(['product_id', 'product_name']).agg(
        total_quantity=('quantity', 'sum'),
        total_revenue=('total_price', 'sum'),
        total_orders=('sale_date', 'count'),
        first_sale=('sale_date', 'min'),
        last_sale=('sale_date', 'max'),
    ).round(2)

    product_metrics['avg_order_value'] = (
        product_metrics['total_revenue'] / product_metrics['total_orders']
    ).round(2)
    product_metrics['avg_daily_sales'] = product_metrics['total_quantity'] / period_days

    # Turn the group keys back into regular columns.
    return product_metrics.reset_index()
def validate_api_response(response: Dict[str, Any], required_fields: List[str] = None) -> bool:
    """Check that an API response is a dict containing the expected keys.

    Args:
        response: The decoded API response.
        required_fields: Keys that must be present. ``None`` or an empty
            list means no keys are required.

    Returns:
        ``False`` if ``response`` is not a dict or lacks a required key,
        otherwise ``True``.
    """
    if not isinstance(response, dict):
        return False
    # With no required fields, all() over the empty tuple is True.
    return all(name in response for name in (required_fields or ()))
def format_currency(amount: float, currency: str = "₽") -> str:
    """Render an amount with thousands separators, two decimals and a currency sign.

    Missing amounts (``None`` or NaN) are shown as ``"0 <currency>"``.
    """
    is_missing = amount is None or pd.isna(amount)
    return f"0 {currency}" if is_missing else f"{amount:,.2f} {currency}"
def format_number(number: float, decimals: int = 0) -> str:
    """Render a number with thousands separators.

    Args:
        number: Value to format; ``None`` or NaN is rendered as ``"0"``.
        decimals: Digits after the decimal point. Zero or a negative value
            produces no fractional part.
    """
    if number is None or pd.isna(number):
        return "0"
    precision = decimals if decimals > 0 else 0
    return f"{number:,.{precision}f}"
def get_risk_color(days_until_stockout: float) -> str:
    """Map the number of days until stockout to a hex colour.

    Under 7 days is red, under 14 days is orange, anything else is green.
    """
    # Thresholds are checked in ascending order; the first match wins.
    thresholds = (
        (7, "#ff4444"),   # Red
        (14, "#ffaa00"),  # Orange
    )
    for limit, colour in thresholds:
        if days_until_stockout < limit:
            return colour
    return "#44aa44"  # Green
def export_to_csv(data: pd.DataFrame, filename: str = None) -> str:
    """Write a DataFrame to a UTF-8 CSV file and return the file name.

    Timezone-naive datetime columns are written as ``YYYY-MM-DD HH:MM:SS``
    whatever resolution they are stored at. The input DataFrame is not
    modified.

    Args:
        data: Frame to export.
        filename: Target path. When omitted, a timestamped name of the form
            ``wildberries_data_<YYYYmmdd_HHMMSS>.csv`` is generated.

    Returns:
        The path that was written.
    """
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"wildberries_data_{timestamp}.csv"

    export_data = data.copy()

    # Use the dtype predicate rather than comparing with 'datetime64[ns]':
    # columns stored at second/millisecond/microsecond resolution are
    # datetimes too and must get the same format.
    for col in export_data.columns:
        if pd.api.types.is_datetime64_dtype(export_data[col]):
            export_data[col] = export_data[col].dt.strftime('%Y-%m-%d %H:%M:%S')

    export_data.to_csv(filename, index=False, encoding='utf-8')
    return filename
def create_sample_data_file():
    """Assemble a small, JSON-serialisable sample of the demo data.

    Takes the first ten rows of the weekly demo sales and of the demo
    inventory, converts their datetime values to ISO-8601 strings, and
    adds a metadata section. The result is returned as a dict; this
    function does not write anything to disk itself.
    """
    def _json_ready(frame):
        # First ten rows as dicts, with datetime-like values as ISO strings.
        return [
            {
                key: value.isoformat() if isinstance(value, (datetime, pd.Timestamp)) else value
                for key, value in record.items()
            }
            for record in frame.head(10).to_dict('records')
        ]

    sales_frame = load_demo_sales_data("week")
    inventory_frame = load_demo_inventory_data()

    return {
        "sales_data": _json_ready(sales_frame),
        "inventory_data": _json_ready(inventory_frame),
        "metadata": {
            "generated_at": datetime.now().isoformat(),
            "description": "Sample data for Wildberries Analytics Dashboard",
            "note": "This is demo data for testing purposes only",
        },
    }
# Module-level caches for demo data, so it is generated once and reused.
# Sales frames are cached per period string.
_demo_sales_cache = {}
# Inventory has a single cached frame; None means "not generated yet".
_demo_inventory_cache = None
def get_cached_demo_sales(period: str) -> pd.DataFrame:
    """Return demo sales for ``period``, generating them on first request only.

    A copy is returned so that callers cannot alter the cached frame.
    """
    frame = _demo_sales_cache.get(period)
    if frame is None:
        # First request for this period: generate and remember it.
        frame = load_demo_sales_data(period)
        _demo_sales_cache[period] = frame
    return frame.copy()
def get_cached_demo_inventory() -> pd.DataFrame:
    """Return the demo inventory, generating it on first request only.

    A copy is returned so that callers cannot alter the cached frame.
    """
    global _demo_inventory_cache
    if _demo_inventory_cache is not None:
        return _demo_inventory_cache.copy()
    # Nothing cached yet: generate once and keep it for later calls.
    _demo_inventory_cache = load_demo_inventory_data()
    return _demo_inventory_cache.copy()
# Convenience wrappers that expose the cached demo-data getters under loader-style names
def load_demo_sales_data_cached(period: str = "week") -> pd.DataFrame:
    """Return demo sales for ``period`` by delegating to ``get_cached_demo_sales``."""
    sales_frame = get_cached_demo_sales(period)
    return sales_frame
def load_demo_inventory_data_cached() -> pd.DataFrame:
    """Return the demo inventory by delegating to ``get_cached_demo_inventory``."""
    inventory_frame = get_cached_demo_inventory()
    return inventory_frame