# wb / main.py
# LevinAleksey's picture
# Update main.py
# 6317790 verified
from fastapi import FastAPI, HTTPException, Query
from typing import List, Optional, Literal
from pydantic import BaseModel
from database import engine
import pandas as pd
import numpy as np
from sqlalchemy import text
# Application object; every route in this file is registered on it via @app.get.
app = FastAPI(title="WB AI Analytics API - Neuron_AI Edition")
from fastapi.responses import RedirectResponse
@app.get("/", include_in_schema=False)
def root():
    """Answer "/" with a small 200 OK payload so the Hugging Face probe is satisfied."""
    payload = {"status": "ok", "message": "Server is running"}
    return payload
def clean_dataframe(df):
    """Return a copy of *df* in which inf, -inf and missing values are None.

    The result is meant to be passed to ``to_dict`` and then serialized as
    JSON / validated by Pydantic, neither of which accepts inf or NaN.

    Args:
        df: any DataFrame (may be empty).

    Returns:
        A new object-dtype DataFrame of the same shape; *df* is not modified.
    """
    # Fold +/-inf into NaN first so a single "is missing" mask covers all cases.
    finite = df.replace([np.inf, -np.inf], np.nan)
    # `replace(..., None)` has version-dependent meaning in pandas (older
    # releases treat value=None as "pad"), so the substitution is done with an
    # explicit mask. Casting to object is what allows None to be stored.
    return finite.astype(object).where(finite.notna(), None)
# --- Схемы для логистики ---
class LogisticsSummary(BaseModel):
    # Aggregate logistics totals. Field names match the column aliases of the
    # GET /logistics/summary query; that route does not declare this model as
    # its response_model in the visible code.
    total_products: int
    total_sales: int
    total_returns: int
    avg_return_rate: Optional[float]  # SQL yields NULL when total sales are 0 (NULLIF)
    total_delivery: float
    total_storage: float
    total_penalties: float
    total_logistics: float
    total_revenue: float
    total_gmv: float
    logistics_pct: Optional[float]  # SQL yields NULL when total GMV is 0 (NULLIF)
    high_logistics_count: int
    has_penalties_count: int
    high_returns_count: int
class LogisticsProduct(BaseModel):
    # One product row of logistics metrics. Field names match the columns
    # selected by GET /logistics/products (not declared as response_model there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    category: Optional[str]
    brand: Optional[str]
    sales_count: int
    returns_count: int
    return_rate_pct: Optional[float]
    delivery_total: float
    storage_total: float
    penalty_total: float
    logistics_total: float
    revenue_total: float
    logistics_pct_of_gmv: Optional[float]
    logistics_status: str
class LogisticsProductDetail(BaseModel):
    # Detailed logistics card for one product. Field names match the columns
    # selected by GET /logistics/product/{nm_id} (not declared as
    # response_model there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    category: Optional[str]
    brand: Optional[str]
    sales_count: int
    returns_count: int
    return_rate_pct: Optional[float]
    delivery_total: float
    storage_total: float
    penalty_total: float
    acceptance_total: float
    logistics_total: float
    revenue_total: float
    gmv_total: float
    avg_commission_pct: Optional[float]
    logistics_per_sale: Optional[float]
    logistics_pct_of_gmv: Optional[float]
    delivery_7d: float
    storage_7d: float
    revenue_7d: float
    supplier_reward_total: float
    acquiring_total: float
    logistics_status: str
class LogisticsTrendPoint(BaseModel):
    # One day of the logistics trend. Field names match the column aliases of
    # the GET /logistics/trend query (not declared as response_model there).
    date: str  # the route converts the date column to str before returning
    delivery: float
    storage: float
    penalties: float
    revenue: float
    sales: int
    returns: int
class CategoryCommission(BaseModel):
    # Commission rates for one category. Field names match the column aliases
    # of the GET /logistics/commissions query (not declared as response_model
    # there).
    category: str
    parent_category: Optional[str]
    commission_fbo_pct: Optional[float]      # aliased from kgvp_marketplace
    commission_fbs_pct: Optional[float]      # aliased from kgvp_supplier
    commission_express_pct: Optional[float]  # aliased from kgvp_supplier_express
    paid_storage_pct: Optional[float]        # aliased from paid_storage_kgvp
# --- Схемы для финансов ---
class FinanceSummary(BaseModel):
    # Aggregate finance totals. Field names match the column aliases of the
    # GET /finance/summary query (not declared as response_model there).
    total_products: int
    total_sales: int
    total_units: int
    total_gmv: float
    total_profit_wb: float
    total_cost: float
    total_profit_net: float  # computed in SQL as SUM(profit) - SUM(total_cost)
    avg_margin_wb_pct: Optional[float]   # NULL in SQL when SUM(gmv) is 0
    avg_margin_net_pct: Optional[float]  # NULL in SQL when SUM(gmv) is 0
    total_commission: float
    total_delivery: float
    total_storage: float
    total_wb_expenses: float
    products_loss: int        # rows with finance_status = 'loss'
    products_low_margin: int  # rows with finance_status = 'low_margin'
class FinanceProduct(BaseModel):
    # One product row of finance metrics. Field names match the columns
    # selected by GET /finance/products (not declared as response_model there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    category: Optional[str]
    brand: Optional[str]
    sales_count: int
    units_sold: int
    gmv: float
    profit: float
    total_cost: float
    profit_net: float  # computed in SQL as profit - total_cost
    margin_wb_pct: Optional[float]
    margin_net_pct: Optional[float]
    roi_pct: Optional[float]
    unit_profit_net: Optional[float]
    finance_status: str
class FinanceProductDetail(BaseModel):
    # Detailed finance card for one product. Field names match the columns
    # selected by GET /finance/product/{nm_id} (not declared as response_model
    # there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    category: Optional[str]
    brand: Optional[str]
    cost_price: float
    sales_count: int
    units_sold: int
    return_rate_pct: Optional[float]
    gmv: float
    net_revenue: float
    profit: float
    total_cost: float
    profit_net: float  # computed in SQL as profit - total_cost
    commission_total: float
    avg_commission_pct: Optional[float]
    acquiring_total: float
    delivery_total: float
    storage_total: float
    penalty_total: float
    wb_expenses: float
    # Per-unit figures.
    unit_revenue: Optional[float]
    unit_cost: Optional[float]
    unit_commission: Optional[float]
    unit_delivery: Optional[float]
    unit_storage: Optional[float]
    unit_acquiring: Optional[float]
    unit_profit_wb: Optional[float]
    unit_profit_net: Optional[float]
    # Ratios.
    margin_wb_pct: Optional[float]
    margin_net_pct: Optional[float]
    margin_before_wb_pct: Optional[float]
    roi_pct: Optional[float]
    # 7-day figures and growth.
    gmv_7d: float
    profit_7d: float
    sales_7d: int
    gmv_growth_pct: Optional[float]
    profit_growth_pct: Optional[float]
    finance_status: str
class LossProduct(BaseModel):
    # One loss-making product. Field names match the columns selected by
    # GET /finance/loss (not declared as response_model there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    category: Optional[str]
    sales_count: int
    gmv: float
    profit: float
    total_cost: float
    profit_net: float  # computed in SQL as profit - total_cost
    margin_net_pct: Optional[float]
    roi_pct: Optional[float]
    unit_profit_net: Optional[float]
    cost_price: float
    avg_price: Optional[float]
class UnitEconomics(BaseModel):
    # Per-unit economics of one product. Field names match the columns
    # selected by GET /finance/unit-economics (not declared as response_model
    # there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    category: Optional[str]
    sales_count: int
    unit_revenue: Optional[float]
    unit_cost: Optional[float]
    unit_commission: Optional[float]
    unit_delivery: Optional[float]
    unit_storage: Optional[float]
    unit_acquiring: Optional[float]
    unit_profit_wb: Optional[float]
    unit_profit_net: Optional[float]
    margin_before_wb_pct: Optional[float]
    margin_net_pct: Optional[float]
class ExpensesBreakdown(BaseModel):
    # Expense totals by kind. Field names match the column aliases of the
    # GET /finance/expenses-breakdown query (not declared as response_model
    # there).
    commission: float
    acquiring: float
    delivery: float
    storage: float
    penalties: float
    acceptance: float
    total: float  # aliased from SUM(wb_expenses)
    # Shares of `total`; NULL in SQL when the total is 0 (NULLIF).
    commission_pct: Optional[float]
    delivery_pct: Optional[float]
    storage_pct: Optional[float]
class FinanceTrendPoint(BaseModel):
    # One day of the finance trend. Field names match the column aliases of
    # the GET /finance/trend query (not declared as response_model there).
    date: str  # the route formats it as YYYY-MM-DD
    sales: int
    gmv: float
    profit: float
    delivery: float
    storage: float
    commission: float
class CategoryReport(BaseModel):
    # Finance totals for one category. Field names match the column aliases
    # of the GET /finance/category-report query (not declared as
    # response_model there).
    category: str
    products_count: int
    total_sales: int
    total_gmv: float
    total_profit_wb: float
    total_cost: float
    total_profit_net: float  # computed in SQL as SUM(profit) - SUM(total_cost)
    avg_margin_net_pct: Optional[float]
    avg_roi_pct: Optional[float]
# --- Схемы для остатков ---
class StockSummary(BaseModel):
    # Stock totals across all products. Field names match the column aliases
    # of the GET /stocks/summary query (not declared as response_model there).
    # NOTE(review): a second class named StockSummary is defined further down
    # in this file; after import the name refers to that later class, not
    # this one.
    total_products: int
    total_stock_fbo: int
    total_stock_fbs: int
    total_stock: int
    out_of_stock: int    # rows with stock_status = 'out_of_stock'
    critical_stock: int  # rows with stock_status = 'critical'
    low_stock: int       # rows with stock_status = 'low'
    ok_stock: int        # rows with stock_status = 'ok'
    total_restock_needed: int
class StockProduct(BaseModel):
    # One product row of stock metrics. Field names match the columns
    # selected by GET /stocks/products (not declared as response_model there).
    nm_id: int
    title: Optional[str]
    vendor_code: Optional[str]
    brand: Optional[str]
    category: Optional[str]
    stock_fbo: int
    stock_fbs: int
    total_stock: int
    fbo_warehouses: int
    fbs_warehouses: int
    avg_daily_sales_7d: Optional[float]
    days_of_stock: Optional[float]
    restock_needed_30d: Optional[int]
    stock_status: str
class WarehouseStock(BaseModel):
    # Stock held at one warehouse. Field names match the column aliases of
    # the GET /stocks/warehouses query (not declared as response_model there).
    warehouse_name: str
    products_count: int  # COUNT(DISTINCT product_nm_id) in the query
    total_quantity: int  # SUM(quantity) in the query
# --- Схемы данных ---
class ProductAnalytics(BaseModel):
    # Response model of GET /analytics/top-products (as a list) and
    # GET /analytics/product/{nm_id}; both read rows from wb_vitrine_for_ai.
    # All fields are required and non-nullable as declared.
    nm_id: int
    title: str
    brand: str
    current_price: float
    stock_fbo: int
    stock_fbs: int
    total_stock: int
    orders_7d: int
    net_revenue_30d: float
class SalesTrendPoint(BaseModel):
    # One point of a sales trend. No route in the visible part of the file
    # references this model.
    date: str
    orders_count: int
    sales_count: int
    returns_count: int
    revenue: float
    avg_check: float
class StockDistribution(BaseModel):
    # Response item of GET /analytics/product/{nm_id}/stocks: the quantity of
    # one product at one warehouse.
    warehouse_name: str
    quantity: int
    stock_type: str  # the route's query emits the literals 'fbo' or 'fbs'
class StockSummary(BaseModel):
    # Response model of GET /analytics/product/{nm_id}/stock-summary:
    # per-product stock totals and warehouse counts.
    # NOTE(review): this re-binds the name StockSummary, replacing the class
    # of the same name defined earlier in this file.
    nm_id: int
    stock_fbo: int
    stock_fbs: int
    total_stock: int
    fbo_warehouses: int
    fbs_warehouses: int
class AIInsight(BaseModel):
    # A tagged recommendation with a score for one product. No route in the
    # visible part of the file references this model.
    nm_id: int
    tag: str
    recommendation: str
    score: float
class SalesStats(BaseModel):
    # Order/sales counters for one product. No route in the visible part of
    # the file references this model.
    nm_id: int
    orders_total: int
    orders_7d: int
    orders_30d: int
    sales_total: int
    returns_total: int
    net_revenue_30d: float
    avg_order_value: float
class LowStockAlert(BaseModel):
    # Response item of GET /analytics/low-stock.
    nm_id: int
    title: str
    total_stock: int
    orders_7d: int
    days_of_stock: float
# --- Схемы для воронки ---
class FunnelSummary(BaseModel):
    # Aggregate funnel counters for a period. No route in the visible part of
    # the file references this model.
    total_products: int
    total_open_count: int
    total_cart_count: int
    total_order_count: int
    total_order_sum: float
    total_buyout_count: int
    total_buyout_sum: float
    total_cancel_count: int
    avg_add_to_cart_percent: float
    avg_cart_to_order_percent: float
    avg_buyout_percent: float
    products_falling: int
    products_rising: int
    period_start: str
    period_end: str
class FunnelProduct(BaseModel):
    # Funnel counters for one product. No route in the visible part of the
    # file references this model.
    nm_id: int
    title: str
    vendor_code: str
    brand_name: str
    subject_name: str
    product_rating: Optional[float]
    price_final: float
    total_stock_actual: int
    open_count: int
    cart_count: int
    order_count: int
    order_sum: float
    buyout_count: int
    buyout_sum: float
    cancel_count: int
    add_to_cart_percent: Optional[float]
    cart_to_order_percent: Optional[float]
    buyout_percent: Optional[float]
    # presumably "dyn" = change versus the previous period — TODO confirm
    order_count_dyn: Optional[int]
    buyout_sum_dyn: Optional[int]
    days_of_stock: Optional[float]
    is_falling: bool
    is_rising: bool
class FunnelAlert(BaseModel):
    # A funnel alert for one product. No route in the visible part of the
    # file references this model.
    nm_id: int
    title: str
    vendor_code: str
    metric_value: float
    metric_dyn: int
    alert_type: str
class ConversionLeak(BaseModel):
    # Funnel stage counters plus the stage flagged as weak for one product.
    # No route in the visible part of the file references this model.
    nm_id: int
    title: str
    open_count: int
    cart_count: int
    order_count: int
    buyout_count: int
    add_to_cart_percent: Optional[float]
    cart_to_order_percent: Optional[float]
    buyout_percent: Optional[float]
    weak_stage: str
    potential_orders_lost: int
# --- Схемы для рекламы ---
class AdsSummary(BaseModel):
    # Aggregate advertising totals. No route in the visible part of the file
    # references this model.
    products_count: int
    ad_spend_total: float
    ad_spend_prev: float
    ad_spend_dyn: Optional[float]
    views_total: int
    clicks_total: int
    orders_total: int
    orders_prev: int
    orders_dyn: Optional[float]
    revenue_total: float
    roas_avg: Optional[float]
    cpo_avg: Optional[float]
    ctr_avg: Optional[float]
class AdsTrendPoint(BaseModel):
    # Ad spend for one date. No route in the visible part of the file
    # references this model.
    date: str
    ad_spend_total: float
class AdsProduct(BaseModel):
    # Advertising metrics for one product. No route in the visible part of
    # the file references this model.
    nm_id: int
    vendor_code: Optional[str]
    title: Optional[str]
    brand: Optional[str]
    category: Optional[str]
    ad_spend_total: float
    views_total: int
    clicks_total: int
    orders_total: int
    revenue_total: float
    # Derived ratios; nullable.
    ctr: Optional[float]
    cpc: Optional[float]
    cpm: Optional[float]
    cr: Optional[float]
    roas: Optional[float]
    cpo: Optional[float]
    ad_spend_dyn: Optional[float]
    orders_dyn: Optional[float]
    revenue_dyn: Optional[float]
class AdsProductDetail(BaseModel):
    # Detailed advertising card for one product: each metric has a *_total,
    # *_prev and *_dyn field. No route in the visible part of the file
    # references this model.
    nm_id: int
    vendor_code: Optional[str]
    title: Optional[str]
    brand: Optional[str]
    category: Optional[str]
    period_start: str
    period_end: str
    ad_spend_total: float
    ad_spend_prev: float
    ad_spend_dyn: Optional[float]
    views_total: int
    views_prev: int
    views_dyn: Optional[float]
    clicks_total: int
    clicks_prev: int
    clicks_dyn: Optional[float]
    orders_total: int
    orders_prev: int
    orders_dyn: Optional[float]
    revenue_total: float
    revenue_prev: float
    revenue_dyn: Optional[float]
    # Derived ratios; nullable.
    ctr: Optional[float]
    cpc: Optional[float]
    cpm: Optional[float]
    cr: Optional[float]
    roas: Optional[float]
    cpo: Optional[float]
class AdsCampaign(BaseModel):
    # Metrics of one advertising campaign. No route in the visible part of
    # the file references this model.
    campaign_id: int
    campaign_name: Optional[str]
    campaign_status: Optional[int]  # integer code; meaning not defined in this file
    campaign_type: Optional[int]    # integer code; meaning not defined in this file
    daily_budget: Optional[int]
    budget_total: Optional[int]
    nm_cnt: int
    ad_spend_total: float
    views_total: int
    clicks_total: int
    orders_total: int
    revenue_total: float
    ctr: Optional[float]
    cpc: Optional[float]
    cpm: Optional[float]
    cr: Optional[float]
    roas: Optional[float]
    cpo: Optional[float]
class AdsCampaignInefficient(BaseModel):
    # A campaign flagged with an issue type. No route in the visible part of
    # the file references this model.
    campaign_id: int
    campaign_name: Optional[str]
    campaign_status: Optional[int]
    campaign_type: Optional[int]
    ad_spend_total: float
    orders_total: int
    revenue_total: float
    roas: Optional[float]
    cpo: Optional[float]
    issue_type: str
class AdsCategory(BaseModel):
    # Advertising metrics aggregated for one category. No route in the
    # visible part of the file references this model.
    category: str
    nm_cnt: int
    campaign_cnt: int
    ad_spend_total: float
    views_total: int
    clicks_total: int
    orders_total: int
    revenue_total: float
    ctr: Optional[float]
    cpc: Optional[float]
    cpm: Optional[float]
    cr: Optional[float]
    roas: Optional[float]
    cpo: Optional[float]
class AdsCategoryBenchmark(BaseModel):
    # For each ad metric: the product's value, an avg_* value and a *_diff
    # value. No route in the visible part of the file references this model.
    nm_id: int
    title: Optional[str]
    category: Optional[str]
    ctr: Optional[float]
    avg_ctr: Optional[float]
    ctr_diff: Optional[float]
    cpc: Optional[float]
    avg_cpc: Optional[float]
    cpc_diff: Optional[float]
    cr: Optional[float]
    avg_cr: Optional[float]
    cr_diff: Optional[float]
    roas: Optional[float]
    avg_roas: Optional[float]
    roas_diff: Optional[float]
    cpo: Optional[float]
    avg_cpo: Optional[float]
    cpo_diff: Optional[float]
class AdsAlert(BaseModel):
    # An advertising alert for one product with a type and a priority.
    # No route in the visible part of the file references this model.
    nm_id: int
    vendor_code: Optional[str]
    title: Optional[str]
    ad_spend_total: float
    orders_total: int
    revenue_total: float
    roas: Optional[float]
    cpo: Optional[float]
    orders_dyn: Optional[float]
    alert_type: str
    priority: int
class DashboardAdsSummary(BaseModel):
    # Compact advertising summary (7-day fields by name). No route in the
    # visible part of the file references this model.
    products_with_ads: int
    total_spend_7d: float
    total_orders_7d: int
    total_revenue_7d: float
    avg_roas: Optional[float]
    products_low_roas: int
    products_falling: int
# --- Health ---
@app.get("/health")
def health_check():
    """Liveness probe: always answers with a fixed "alive" status."""
    liveness = {"status": "alive"}
    return liveness
# ==================== FINANCE (ФИНАНСЫ) ====================
@app.get("/finance/summary")
def get_finance_summary():
    """Return a single dict of finance totals over all rows of `mv_finance_analytics`.

    Net profit is SUM(profit) - SUM(total_cost); both margin percentages are
    relative to SUM(gmv) and come back as None when that sum is zero.
    (The original note says the figures cover 30 days; the window is defined
    by the view, not here.)
    """
    stmt = text("""
        SELECT
        COUNT(*) as total_products,
        SUM(sales_count) as total_sales,
        SUM(units_sold) as total_units,
        SUM(gmv) as total_gmv,
        SUM(profit) as total_profit_wb,
        SUM(total_cost) as total_cost,
        SUM(profit) - SUM(total_cost) as total_profit_net,
        ROUND(SUM(profit) / NULLIF(SUM(gmv), 0) * 100, 2) as avg_margin_wb_pct,
        ROUND((SUM(profit) - SUM(total_cost)) / NULLIF(SUM(gmv), 0) * 100, 2) as avg_margin_net_pct,
        SUM(commission_total) as total_commission,
        SUM(delivery_total) as total_delivery,
        SUM(storage_total) as total_storage,
        SUM(wb_expenses) as total_wb_expenses,
        COUNT(*) FILTER (WHERE finance_status = 'loss') as products_loss,
        COUNT(*) FILTER (WHERE finance_status = 'low_margin') as products_low_margin
        FROM mv_finance_analytics
    """)
    frame = pd.read_sql(stmt, engine)
    records = clean_dataframe(frame).to_dict(orient="records")
    # An aggregate without GROUP BY yields exactly one row.
    return records[0]
@app.get("/finance/products")
def get_finance_products(
    limit: int = 50,
    status: str = Query(None, enum=["loss", "low_margin", "has_penalties", "profit_falling", "ok"]),
    sort_by: str = Query("profit", enum=["profit", "gmv", "margin_net_pct", "roi_pct", "unit_profit_net"])
):
    """List products with their finance metrics.

    Args:
        limit: maximum number of rows returned.
        status: when given, keep only rows whose `finance_status` equals it.
        sort_by: column to order by; must be one of the whitelisted names.

    Returns:
        A list of row dicts with inf/NaN replaced by None.

    Raises:
        HTTPException: 400 when `sort_by` is not an allowed column.
    """
    # `enum=` on Query only annotates the OpenAPI schema; it does not reject
    # other values. The column name is therefore checked here before it is
    # interpolated into ORDER BY (identifiers cannot be bound as parameters).
    allowed_sort = {"profit", "gmv", "margin_net_pct", "roi_pct", "unit_profit_net"}
    if sort_by not in allowed_sort:
        raise HTTPException(status_code=400, detail="Недопустимое значение sort_by")

    params = {"limit": limit}
    where = ""
    if status:
        # Bound parameter instead of string interpolation: no SQL injection.
        where = "WHERE finance_status = :status"
        params["status"] = status
    # The loss listing is ordered ascending, every other listing descending.
    order = "ASC" if status == "loss" else "DESC"
    query = text(f"""
        SELECT nm_id, title, vendor_code, category, brand,
        sales_count, units_sold, gmv, profit, total_cost,
        profit - total_cost as profit_net,
        margin_wb_pct, margin_net_pct, roi_pct,
        unit_profit_net, finance_status
        FROM mv_finance_analytics
        {where}
        ORDER BY {sort_by} {order} NULLS LAST
        LIMIT :limit
    """)
    df = pd.read_sql(query, engine, params=params)
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/finance/product/{nm_id}")
def get_finance_product(nm_id: int):
    """Return the detailed finance card of one product.

    Args:
        nm_id: product identifier matched against `mv_finance_analytics.nm_id`.

    Returns:
        The first matching row as a dict, with inf/NaN replaced by None.

    Raises:
        HTTPException: 404 when no row matches.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category, brand,
        cost_price, sales_count, units_sold, return_rate_pct,
        gmv, net_revenue, profit, total_cost,
        profit - total_cost as profit_net,
        commission_total, avg_commission_pct, acquiring_total,
        delivery_total, storage_total, penalty_total, wb_expenses,
        unit_revenue, unit_cost, unit_commission, unit_delivery,
        unit_storage, unit_acquiring, unit_profit_wb, unit_profit_net,
        margin_wb_pct, margin_net_pct, margin_before_wb_pct, roi_pct,
        gmv_7d, profit_7d, sales_7d,
        gmv_growth_pct, profit_growth_pct, finance_status
        FROM mv_finance_analytics
        WHERE nm_id = :nm_id
    """)
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден")
    records = clean_dataframe(frame).to_dict(orient="records")
    return records[0]
@app.get("/finance/loss")
def get_loss_products(limit: int = 50):
    """List products whose net profit (profit - total_cost) is negative, largest loss first.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        sales_count, gmv, profit, total_cost,
        profit - total_cost as profit_net,
        margin_net_pct, roi_pct,
        unit_profit_net, cost_price, avg_price
        FROM mv_finance_analytics
        WHERE profit - total_cost < 0
        ORDER BY profit - total_cost ASC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/finance/low-margin")
def get_low_margin_products(threshold: float = 5.0, limit: int = 50):
    """List products whose net margin is below `threshold` percent, lowest margin first.

    Rows with a NULL `margin_net_pct` are excluded.

    Args:
        threshold: upper bound (exclusive) for `margin_net_pct`.
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        sales_count, gmv, profit, total_cost,
        profit - total_cost as profit_net,
        margin_net_pct, roi_pct, unit_profit_net
        FROM mv_finance_analytics
        WHERE margin_net_pct < :threshold AND margin_net_pct IS NOT NULL
        ORDER BY margin_net_pct ASC
        LIMIT :limit
    """)
    bind = {"threshold": threshold, "limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/finance/top-profit")
def get_top_profit_products(limit: int = 50):
    """List products with positive net profit (profit - total_cost), highest first.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        sales_count, gmv, profit, total_cost,
        profit - total_cost as profit_net,
        margin_net_pct, roi_pct, unit_profit_net
        FROM mv_finance_analytics
        WHERE profit - total_cost > 0
        ORDER BY profit - total_cost DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/finance/top-roi")
def get_top_roi_products(limit: int = 50):
    """List products with a positive, non-NULL `roi_pct`, highest ROI first.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        sales_count, gmv, cost_price, total_cost,
        profit - total_cost as profit_net,
        margin_net_pct, roi_pct
        FROM mv_finance_analytics
        WHERE roi_pct IS NOT NULL AND roi_pct > 0
        ORDER BY roi_pct DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/finance/unit-economics")
def get_unit_economics(limit: int = 50, sort_by: str = Query("unit_profit_net", enum=["unit_profit_net", "unit_revenue", "unit_cost"])):
    """List per-unit economics for products that have at least one sale.

    Args:
        limit: maximum number of rows returned.
        sort_by: column to order by (descending); must be a whitelisted name.

    Returns:
        A list of row dicts with inf/NaN replaced by None.

    Raises:
        HTTPException: 400 when `sort_by` is not an allowed column.
    """
    # `enum=` on Query only annotates the OpenAPI schema; it does not reject
    # other values. Validate before interpolating the name into ORDER BY.
    allowed_sort = {"unit_profit_net", "unit_revenue", "unit_cost"}
    if sort_by not in allowed_sort:
        raise HTTPException(status_code=400, detail="Недопустимое значение sort_by")

    query = text(f"""
        SELECT nm_id, title, vendor_code, category,
        sales_count,
        unit_revenue, unit_cost, unit_commission,
        unit_delivery, unit_storage, unit_acquiring,
        unit_profit_wb, unit_profit_net,
        margin_before_wb_pct, margin_net_pct
        FROM mv_finance_analytics
        WHERE sales_count > 0
        ORDER BY {sort_by} DESC NULLS LAST
        LIMIT :limit
    """)
    df = pd.read_sql(query, engine, params={"limit": limit})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/finance/expenses-breakdown")
def get_expenses_breakdown():
    """Return one dict with expense totals by kind over `mv_finance_analytics`.

    `total` is SUM(wb_expenses); the three *_pct values are shares of that
    total and are None when the total is zero.
    """
    stmt = text("""
        SELECT
        SUM(commission_total) as commission,
        SUM(acquiring_total) as acquiring,
        SUM(delivery_total) as delivery,
        SUM(storage_total) as storage,
        SUM(penalty_total) as penalties,
        SUM(acceptance_total) as acceptance,
        SUM(wb_expenses) as total,
        ROUND(SUM(commission_total) / NULLIF(SUM(wb_expenses), 0) * 100, 1) as commission_pct,
        ROUND(SUM(delivery_total) / NULLIF(SUM(wb_expenses), 0) * 100, 1) as delivery_pct,
        ROUND(SUM(storage_total) / NULLIF(SUM(wb_expenses), 0) * 100, 1) as storage_pct
        FROM mv_finance_analytics
    """)
    frame = pd.read_sql(stmt, engine)
    records = clean_dataframe(frame).to_dict(orient="records")
    # An aggregate without GROUP BY yields exactly one row.
    return records[0]
@app.get("/finance/trend")
def get_finance_trend(days: int = Query(30, ge=1, le=365)):
    """Return per-day finance figures from `wb_report_details` for the last `days` days.

    Sales, GMV and commission only count rows whose `doc_type_name` is
    'Продажа'; profit, delivery and storage are summed over all rows of the day.

    Args:
        days: look-back window in days (1..365).

    Returns:
        A list of per-day dicts ordered by date ascending, with `date`
        formatted as YYYY-MM-DD; an empty list when there are no rows.
    """
    stmt = text("""
        SELECT
        DATE(rd.sale_dt) AS date,
        COUNT(*) FILTER (
        WHERE rd.doc_type_name = 'Продажа'
        ) AS sales,
        COALESCE(
        SUM(rd.retail_price_withdisc_rub) FILTER (
        WHERE rd.doc_type_name = 'Продажа'
        ),
        0
        ) AS gmv,
        COALESCE(SUM(rd.supplier_reward), 0) AS profit,
        COALESCE(SUM(rd.delivery_rub), 0) AS delivery,
        COALESCE(SUM(rd.storage_fee), 0) AS storage,
        COALESCE(
        SUM(
        rd.commission_percent * rd.retail_price_withdisc_rub / 100
        ) FILTER (
        WHERE rd.doc_type_name = 'Продажа'
        ),
        0
        ) AS commission
        FROM wb_report_details rd
        WHERE rd.sale_dt > NOW() - (:days * INTERVAL '1 day')
        GROUP BY DATE(rd.sale_dt)
        ORDER BY date ASC
    """)
    frame = pd.read_sql_query(stmt, engine, params={"days": days})
    if frame.empty:
        return []
    # Render the date column as plain strings.
    as_dates = pd.to_datetime(frame["date"])
    frame["date"] = as_dates.dt.strftime("%Y-%m-%d")
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/finance/category-report")
def get_category_report():
    """Return finance totals per category, highest net profit first.

    Rows with a NULL category are excluded. The margin and ROI columns are
    plain averages of the per-product percentages.
    """
    stmt = text("""
        SELECT
        category,
        COUNT(*) as products_count,
        SUM(sales_count) as total_sales,
        SUM(gmv) as total_gmv,
        SUM(profit) as total_profit_wb,
        SUM(total_cost) as total_cost,
        SUM(profit) - SUM(total_cost) as total_profit_net,
        ROUND(AVG(margin_net_pct), 2) as avg_margin_net_pct,
        ROUND(AVG(roi_pct), 2) as avg_roi_pct
        FROM mv_finance_analytics
        WHERE category IS NOT NULL
        GROUP BY category
        ORDER BY total_profit_net DESC
    """)
    frame = pd.read_sql(stmt, engine)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
# ==================== LOGISTICS (ЛОГИСТИКА) ====================
@app.get("/logistics/summary")
def get_logistics_summary():
    """Return a single dict of logistics totals over all rows of `mv_logistics_analytics`.

    `avg_return_rate` is returns / sales in percent and `logistics_pct` is
    logistics / GMV in percent; each is None when its divisor is zero.
    (The original note says the figures cover 30 days; the window is defined
    by the view, not here.)
    """
    stmt = text("""
        SELECT
        COUNT(*) as total_products,
        SUM(sales_count) as total_sales,
        SUM(returns_count) as total_returns,
        ROUND(SUM(returns_count)::numeric / NULLIF(SUM(sales_count), 0) * 100, 1) as avg_return_rate,
        SUM(delivery_total) as total_delivery,
        SUM(storage_total) as total_storage,
        SUM(penalty_total) as total_penalties,
        SUM(logistics_total) as total_logistics,
        SUM(revenue_total) as total_revenue,
        SUM(gmv_total) as total_gmv,
        ROUND(SUM(logistics_total) / NULLIF(SUM(gmv_total), 0) * 100, 2) as logistics_pct,
        COUNT(*) FILTER (WHERE logistics_status = 'high_logistics') as high_logistics_count,
        COUNT(*) FILTER (WHERE logistics_status = 'has_penalties') as has_penalties_count,
        COUNT(*) FILTER (WHERE logistics_status = 'high_returns') as high_returns_count
        FROM mv_logistics_analytics
    """)
    frame = pd.read_sql(stmt, engine)
    records = clean_dataframe(frame).to_dict(orient="records")
    # An aggregate without GROUP BY yields exactly one row.
    return records[0]
@app.get("/logistics/products")
def get_logistics_products(
    limit: int = 50,
    status: str = Query(None, enum=["high_logistics", "has_penalties", "high_returns", "ok"]),
    sort_by: str = Query("logistics_total", enum=["logistics_total", "delivery_total", "storage_total", "penalty_total", "return_rate_pct"])
):
    """List products with their logistics metrics.

    Args:
        limit: maximum number of rows returned.
        status: when given, keep only rows whose `logistics_status` equals it.
        sort_by: column to order by (descending); must be a whitelisted name.

    Returns:
        A list of row dicts with inf/NaN replaced by None.

    Raises:
        HTTPException: 400 when `sort_by` is not an allowed column.
    """
    # `enum=` on Query only annotates the OpenAPI schema; it does not reject
    # other values. Validate before interpolating the name into ORDER BY.
    allowed_sort = {"logistics_total", "delivery_total", "storage_total", "penalty_total", "return_rate_pct"}
    if sort_by not in allowed_sort:
        raise HTTPException(status_code=400, detail="Недопустимое значение sort_by")

    params = {"limit": limit}
    where = ""
    if status:
        # Bound parameter instead of string interpolation: no SQL injection.
        where = "WHERE logistics_status = :status"
        params["status"] = status
    query = text(f"""
        SELECT nm_id, title, vendor_code, category, brand,
        sales_count, returns_count, return_rate_pct,
        delivery_total, storage_total, penalty_total, logistics_total,
        revenue_total, logistics_pct_of_gmv, logistics_status
        FROM mv_logistics_analytics
        {where}
        ORDER BY {sort_by} DESC NULLS LAST
        LIMIT :limit
    """)
    df = pd.read_sql(query, engine, params=params)
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/logistics/product/{nm_id}")
def get_logistics_product(nm_id: int):
    """Return the detailed logistics card of one product.

    Args:
        nm_id: product identifier matched against `mv_logistics_analytics.nm_id`.

    Returns:
        The first matching row as a dict, with inf/NaN replaced by None.

    Raises:
        HTTPException: 404 when no row matches.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category, brand,
        sales_count, returns_count, return_rate_pct,
        delivery_total, storage_total, penalty_total, acceptance_total, logistics_total,
        revenue_total, gmv_total, avg_commission_pct,
        logistics_per_sale, logistics_pct_of_gmv,
        delivery_7d, storage_7d, revenue_7d,
        supplier_reward_total, acquiring_total, logistics_status
        FROM mv_logistics_analytics
        WHERE nm_id = :nm_id
    """)
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден")
    records = clean_dataframe(frame).to_dict(orient="records")
    return records[0]
@app.get("/logistics/high-cost")
def get_high_logistics_cost(limit: int = 50):
    """List products whose `logistics_status` is 'high_logistics', costliest first.

    The original note describes this status as logistics above 25% of GMV;
    that threshold is applied by the view, not by this query.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        sales_count, delivery_total, storage_total, logistics_total,
        gmv_total, logistics_pct_of_gmv
        FROM mv_logistics_analytics
        WHERE logistics_status = 'high_logistics'
        ORDER BY logistics_total DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/logistics/penalties")
def get_logistics_penalties(limit: int = 50):
    """List products with a positive `penalty_total`, largest penalty first.

    Each row also carries the penalty as a percentage of revenue (None when
    revenue is zero).

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        penalty_total, sales_count, revenue_total,
        ROUND(penalty_total / NULLIF(revenue_total, 0) * 100, 2) as penalty_pct_of_revenue
        FROM mv_logistics_analytics
        WHERE penalty_total > 0
        ORDER BY penalty_total DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/logistics/returns")
def get_high_returns(limit: int = 50):
    """List products with a return rate above 20% and at least 5 sales, worst first.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        sales_count, returns_count, return_rate_pct,
        delivery_total, revenue_total
        FROM mv_logistics_analytics
        WHERE return_rate_pct > 20 AND sales_count >= 5
        ORDER BY return_rate_pct DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/logistics/profitability")
def get_logistics_profitability(limit: int = 50):
    """List products with positive GMV, ordered by supplier reward descending.

    `margin_pct` is supplier reward as a percentage of GMV; `net_revenue` is
    the view's `revenue_total` column under another name.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, category,
        gmv_total,
        logistics_total,
        revenue_total as net_revenue,
        supplier_reward_total,
        ROUND(supplier_reward_total / NULLIF(gmv_total, 0) * 100, 2) as margin_pct,
        logistics_pct_of_gmv
        FROM mv_logistics_analytics
        WHERE gmv_total > 0
        ORDER BY supplier_reward_total DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/logistics/trend")
def get_logistics_trend(days: int = 30):
    """Return per-day logistics figures from `wb_report_details` for the last `days` days.

    Sales and returns count rows whose `doc_type_name` is 'Продажа' and
    'Возврат' respectively; the money columns are summed over all rows of
    the day.

    Args:
        days: look-back window in days.

    Returns:
        A list of per-day dicts ordered by date ascending, with `date` as a
        string and inf/NaN replaced by None.
    """
    # The window is written as `:days * INTERVAL '1 day'` because PostgreSQL
    # does not accept a bound parameter after the INTERVAL keyword
    # (`INTERVAL :days || ' days'` is a syntax error). This is the same form
    # the /finance/trend query uses.
    query = text("""
        SELECT
        DATE(sale_dt) as date,
        SUM(delivery_rub) as delivery,
        SUM(storage_fee) as storage,
        SUM(penalty) as penalties,
        SUM(ppvz_for_pay) as revenue,
        COUNT(*) FILTER (WHERE doc_type_name = 'Продажа') as sales,
        COUNT(*) FILTER (WHERE doc_type_name = 'Возврат') as returns
        FROM wb_report_details
        WHERE sale_dt > NOW() - (:days * INTERVAL '1 day')
        GROUP BY DATE(sale_dt)
        ORDER BY date ASC
    """)
    df = pd.read_sql(query, engine, params={"days": days})
    df['date'] = df['date'].astype(str)
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/logistics/commissions")
def get_commissions_by_category():
    """Return commission rates per category from `wb_tariff_commissions`.

    Rows are ordered by `kgvp_marketplace` (exposed as `commission_fbo_pct`)
    descending.
    """
    stmt = text("""
        SELECT
        subject_name as category,
        parent_name as parent_category,
        kgvp_marketplace as commission_fbo_pct,
        kgvp_supplier as commission_fbs_pct,
        kgvp_supplier_express as commission_express_pct,
        paid_storage_kgvp as paid_storage_pct
        FROM wb_tariff_commissions
        ORDER BY kgvp_marketplace DESC
    """)
    frame = pd.read_sql(stmt, engine)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
# ==================== STOCKS (ОСТАТКИ) ====================
@app.get("/stocks/summary")
def get_stocks_summary():
    """Return a single dict of stock totals over all rows of `mv_stock_analytics`.

    Includes per-status product counts ('out_of_stock', 'critical', 'low',
    'ok') and the sum of `restock_needed_30d`.
    """
    stmt = text("""
        SELECT
        COUNT(*) as total_products,
        SUM(stock_fbo) as total_stock_fbo,
        SUM(stock_fbs) as total_stock_fbs,
        SUM(total_stock) as total_stock,
        COUNT(*) FILTER (WHERE stock_status = 'out_of_stock') as out_of_stock,
        COUNT(*) FILTER (WHERE stock_status = 'critical') as critical_stock,
        COUNT(*) FILTER (WHERE stock_status = 'low') as low_stock,
        COUNT(*) FILTER (WHERE stock_status = 'ok') as ok_stock,
        SUM(restock_needed_30d) as total_restock_needed
        FROM mv_stock_analytics
    """)
    frame = pd.read_sql(stmt, engine)
    records = clean_dataframe(frame).to_dict(orient="records")
    # An aggregate without GROUP BY yields exactly one row.
    return records[0]
@app.get("/stocks/products")
def get_stocks_products(
    limit: int = 50,
    status: str = Query(None, enum=["out_of_stock", "critical", "low", "ok"]),
    sort_by: str = Query("days_of_stock", enum=["days_of_stock", "total_stock", "avg_daily_sales_7d", "restock_needed_30d"])
):
    """List products with their stock metrics.

    Args:
        limit: maximum number of rows returned.
        status: when given, keep only rows whose `stock_status` equals it.
        sort_by: column to order by; must be a whitelisted name.
            `days_of_stock` sorts ascending, every other column descending.

    Returns:
        A list of row dicts with inf/NaN replaced by None.

    Raises:
        HTTPException: 400 when `sort_by` is not an allowed column.
    """
    # `enum=` on Query only annotates the OpenAPI schema; it does not reject
    # other values. Validate before interpolating the name into ORDER BY.
    allowed_sort = {"days_of_stock", "total_stock", "avg_daily_sales_7d", "restock_needed_30d"}
    if sort_by not in allowed_sort:
        raise HTTPException(status_code=400, detail="Недопустимое значение sort_by")

    params = {"limit": limit}
    where = ""
    if status:
        # Bound parameter instead of string interpolation: no SQL injection.
        where = "WHERE stock_status = :status"
        params["status"] = status
    order = "ASC" if sort_by == "days_of_stock" else "DESC"
    query = text(f"""
        SELECT nm_id, title, vendor_code, brand, category,
        stock_fbo, stock_fbs, total_stock, fbo_warehouses, fbs_warehouses,
        avg_daily_sales_7d, days_of_stock, restock_needed_30d, stock_status
        FROM mv_stock_analytics
        {where}
        ORDER BY {sort_by} {order} NULLS LAST
        LIMIT :limit
    """)
    df = pd.read_sql(query, engine, params=params)
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/stocks/product/{nm_id}")
def get_stock_product(nm_id: int):
    """Return the stock details of one product.

    Args:
        nm_id: product identifier matched against `mv_stock_analytics.nm_id`.

    Returns:
        The first matching row as a dict, with inf/NaN replaced by None.

    Raises:
        HTTPException: 404 when no row matches.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, brand, category,
        stock_fbo, stock_fbs, total_stock, fbo_warehouses, fbs_warehouses,
        fbo_warehouse_list, avg_daily_sales_7d, avg_daily_sales_30d,
        days_of_stock, restock_needed_30d, stock_status
        FROM mv_stock_analytics
        WHERE nm_id = :nm_id
    """)
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден")
    records = clean_dataframe(frame).to_dict(orient="records")
    return records[0]
@app.get("/stocks/out-of-stock")
def get_out_of_stock_products(limit: int = 50):
    """List 'out_of_stock' products that still sold in the last 7 days, fastest sellers first.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, brand, category,
        avg_daily_sales_7d, avg_daily_sales_30d, restock_needed_30d
        FROM mv_stock_analytics
        WHERE stock_status = 'out_of_stock' AND avg_daily_sales_7d > 0
        ORDER BY avg_daily_sales_7d DESC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/stocks/critical")
def get_critical_stock_products(limit: int = 50):
    """List products whose `stock_status` is 'critical', fewest days of stock first.

    The original note describes 'critical' as under 7 days of stock; that
    threshold is applied by the view, not by this query.

    Args:
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, brand, category,
        total_stock, avg_daily_sales_7d, days_of_stock, restock_needed_30d
        FROM mv_stock_analytics
        WHERE stock_status = 'critical'
        ORDER BY days_of_stock ASC
        LIMIT :limit
    """)
    bind = {"limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/stocks/restock-plan")
def get_restock_plan(days: int = 30, limit: int = 50):
    """Return a restock plan covering the next `days` days.

    For every product with positive 7-day average daily sales,
    `restock_needed` is that average times `days` minus current stock,
    rounded and floored at 0. Rows are ordered by `restock_needed` descending.

    Args:
        days: planning horizon in days.
        limit: maximum number of rows returned.
    """
    stmt = text("""
        SELECT nm_id, title, vendor_code, brand,
        total_stock, avg_daily_sales_7d, days_of_stock,
        GREATEST(0, ROUND(avg_daily_sales_7d * :days - total_stock, 0)) as restock_needed,
        stock_status
        FROM mv_stock_analytics
        WHERE avg_daily_sales_7d > 0
        ORDER BY restock_needed DESC
        LIMIT :limit
    """)
    bind = {"days": days, "limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/stocks/warehouses")
def get_warehouse_distribution():
    """Return, per warehouse in `wb_stocks`, the distinct product count and total quantity.

    Only rows with a positive quantity are counted; warehouses are ordered
    by total quantity descending.
    """
    stmt = text("""
        SELECT warehouse_name,
        COUNT(DISTINCT product_nm_id) as products_count,
        SUM(quantity) as total_quantity
        FROM wb_stocks
        WHERE quantity > 0
        GROUP BY warehouse_name
        ORDER BY total_quantity DESC
    """)
    frame = pd.read_sql(stmt, engine)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/stocks/product/{nm_id}/warehouses")
def get_product_warehouse_distribution(nm_id: int):
    """Return one product's positive stock per warehouse, largest quantity first.

    Combines `wb_stocks` rows (tagged 'fbo') with `wb_stocks_fbs` rows
    (tagged 'fbs', named 'FBS #<warehouse_id>').

    Args:
        nm_id: product identifier.
    """
    stmt = text("""
        SELECT warehouse_name, quantity, 'fbo' as stock_type
        FROM wb_stocks WHERE product_nm_id = :nm_id AND quantity > 0
        UNION ALL
        SELECT CONCAT('FBS #', warehouse_id), amount, 'fbs'
        FROM wb_stocks_fbs WHERE nm_id = :nm_id AND amount > 0
        ORDER BY quantity DESC
    """)
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
# ==================== ANALYTICS ====================
@app.get("/analytics/top-products", response_model=List[ProductAnalytics])
def get_top_products(
    limit: int = Query(10, ge=1, le=1000),
    sort_by: Literal["orders_7d", "net_revenue_30d", "total_stock"] = "orders_7d"
):
    """Return the top rows of `wb_vitrine_for_ai`, ordered by `sort_by` descending.

    Args:
        limit: number of rows to return (1..1000).
        sort_by: one of the three sortable columns.

    Returns:
        A list of row dicts; selected numeric columns have unparsable or
        missing values replaced by a default so the response model validates.

    Raises:
        HTTPException: 500 wrapping any error raised while reading or
            post-processing the data.
    """
    # Public sort key -> SQL column; only these names reach the query text.
    sort_columns = {
        "orders_7d": "orders_7d",
        "net_revenue_30d": "net_revenue_30d",
        "total_stock": "total_stock",
    }
    order_field = sort_columns[sort_by]
    stmt = text(f"""
        SELECT *
        FROM wb_vitrine_for_ai
        ORDER BY {order_field} DESC NULLS LAST
        LIMIT :limit
    """)
    try:
        raw = pd.read_sql(stmt, engine, params={"limit": limit})
        frame = clean_dataframe(raw)
        # Repair problematic numeric fields before handing rows to response_model.
        fallback_values = {
            "current_price": 0.0,
            "orders_7d": 0,
            "net_revenue_30d": 0.0,
            "total_stock": 0,
        }
        for column, fallback in fallback_values.items():
            if column not in frame.columns:
                continue
            as_number = pd.to_numeric(frame[column], errors="coerce")
            frame[column] = as_number.fillna(fallback)
        return frame.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Ошибка получения топ-товаров: {str(e)}"
        )
@app.get("/analytics/product/{nm_id}", response_model=ProductAnalytics)
def get_product_details(nm_id: int):
    """Return the `wb_vitrine_for_ai` row of one product.

    Args:
        nm_id: product identifier.

    Raises:
        HTTPException: 404 when no row matches.
    """
    stmt = text("SELECT * FROM wb_vitrine_for_ai WHERE nm_id = :nm_id")
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден")
    records = clean_dataframe(frame).to_dict(orient="records")
    return records[0]
@app.get("/analytics/product/{nm_id}/stocks", response_model=List[StockDistribution])
def get_product_stocks(nm_id: int):
    """Return one product's positive stock per warehouse (FBO and FBS).

    `wb_stocks` rows are tagged 'fbo'; `wb_stocks_fbs` rows are tagged 'fbs'
    and named 'FBS #<warehouse_id>'. No ordering is applied.

    Args:
        nm_id: product identifier.
    """
    stmt = text("""
        SELECT warehouse_name, quantity, 'fbo' as stock_type
        FROM wb_stocks
        WHERE product_nm_id = :nm_id AND quantity > 0
        UNION ALL
        SELECT CONCAT('FBS #', warehouse_id) as warehouse_name, amount as quantity, 'fbs' as stock_type
        FROM wb_stocks_fbs
        WHERE nm_id = :nm_id AND amount > 0
    """)
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/analytics/product/{nm_id}/stock-summary", response_model=StockSummary)
def get_stock_summary(nm_id: int):
    """Return aggregated FBO/FBS stock totals and warehouse counts for a product.

    The query is a single-row SELECT built from scalar subqueries, so a row is
    always produced; missing sums are coalesced to 0.

    Args:
        nm_id: Article identifier used in every subquery.

    Returns:
        A dict with ``nm_id``, ``stock_fbo``, ``stock_fbs``, ``total_stock``,
        ``fbo_warehouses`` and ``fbs_warehouses``.
    """
    stmt = text("""
SELECT
:nm_id as nm_id,
COALESCE((SELECT SUM(quantity_full) FROM wb_stocks WHERE product_nm_id = :nm_id), 0) as stock_fbo,
COALESCE((SELECT SUM(amount) FROM wb_stocks_fbs WHERE nm_id = :nm_id), 0) as stock_fbs,
COALESCE((SELECT SUM(quantity_full) FROM wb_stocks WHERE product_nm_id = :nm_id), 0) +
COALESCE((SELECT SUM(amount) FROM wb_stocks_fbs WHERE nm_id = :nm_id), 0) as total_stock,
(SELECT COUNT(DISTINCT warehouse_name) FROM wb_stocks WHERE product_nm_id = :nm_id AND quantity > 0) as fbo_warehouses,
(SELECT COUNT(DISTINCT warehouse_id) FROM wb_stocks_fbs WHERE nm_id = :nm_id AND amount > 0) as fbs_warehouses
""")
    frame = pd.read_sql(stmt, engine, params={"nm_id": nm_id})
    summary_rows = clean_dataframe(frame).to_dict(orient="records")
    return summary_rows[0]
@app.get("/analytics/low-stock", response_model=List[LowStockAlert])
def get_low_stock_alerts(days_threshold: int = 7):
    """List products whose stock covers fewer than ``days_threshold`` days.

    Days of stock are estimated as ``total_stock / (orders_7d / 7.0)``. Only
    rows with positive stock and positive ``orders_7d`` are considered.

    Args:
        days_threshold: Upper bound (exclusive) on the estimated days of stock.

    Returns:
        A list of dicts with ``nm_id``, ``title``, ``total_stock``,
        ``orders_7d`` and ``days_of_stock``, ordered by ``days_of_stock``
        ascending.
    """
    # The threshold predicate lives in WHERE, not HAVING: in PostgreSQL a HAVING
    # clause without GROUP BY turns the statement into a grouped query, which
    # rejects the non-aggregated columns in the select list.
    # The CASE wrapper is kept in the predicate because SQL does not guarantee
    # evaluation order of AND-ed conditions; CASE prevents a division by zero.
    query = text("""
SELECT
nm_id, title, total_stock, orders_7d,
CASE
WHEN orders_7d > 0 THEN ROUND(total_stock::numeric / (orders_7d / 7.0), 1)
ELSE 999
END as days_of_stock
FROM wb_vitrine_for_ai
WHERE total_stock > 0
AND orders_7d > 0
AND CASE
WHEN orders_7d > 0 THEN total_stock::numeric / (orders_7d / 7.0)
ELSE 999
END < :days_threshold
ORDER BY days_of_stock ASC
""")
    df = pd.read_sql(query, engine, params={"days_threshold": days_threshold})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/analytics/out-of-stock")
def get_out_of_stock():
    """List products with zero total stock that still have ``orders_7d > 0``.

    Returns:
        A list of dicts with ``nm_id``, ``title``, ``brand``, ``orders_7d`` and
        ``net_revenue_30d``, ordered by ``orders_7d`` descending.
    """
    stmt = text("""
SELECT nm_id, title, brand, orders_7d, net_revenue_30d
FROM wb_vitrine_for_ai
WHERE total_stock = 0 AND orders_7d > 0
ORDER BY orders_7d DESC
""")
    frame = pd.read_sql(stmt, engine)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/analytics/product/{nm_id}/sales", response_model=SalesStats)
def get_product_sales(nm_id: int):
    """Return order/sale/return counters and revenue figures for one product.

    All figures are aggregated from ``wb_transactions`` rows whose
    ``product_nm_id`` equals ``nm_id``. The statement has no GROUP BY, so it
    yields exactly one row.

    Args:
        nm_id: Article identifier to aggregate.

    Returns:
        A dict with ``nm_id``, ``orders_total``, ``orders_7d``, ``orders_30d``,
        ``sales_total``, ``returns_total``, ``net_revenue_30d`` and
        ``avg_order_value``.
    """
    stmt = text("""
SELECT
:nm_id as nm_id,
COUNT(*) FILTER (WHERE type = 'order') as orders_total,
COUNT(*) FILTER (WHERE type = 'order' AND date_at > NOW() - INTERVAL '7 days') as orders_7d,
COUNT(*) FILTER (WHERE type = 'order' AND date_at > NOW() - INTERVAL '30 days') as orders_30d,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_total,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') as returns_total,
COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale' AND date_at > NOW() - INTERVAL '30 days'), 0) as net_revenue_30d,
COALESCE(AVG(finished_price) FILTER (WHERE type = 'order'), 0) as avg_order_value
FROM wb_transactions
WHERE product_nm_id = :nm_id
""")
    bind = {"nm_id": nm_id}
    frame = pd.read_sql(stmt, engine, params=bind)
    stats_rows = clean_dataframe(frame).to_dict(orient="records")
    return stats_rows[0]
@app.get("/analytics/profitability")
def get_profitability_report(limit: int = 50):
    """Return per-product profitability figures computed from completed sales.

    Only ``wb_transactions`` rows with ``type = 'sale'`` and
    ``status = 'sale'`` are aggregated; titles come from ``wb_products``.

    Args:
        limit: Maximum number of products to return.

    Returns:
        A list of dicts with ``nm_id``, ``title``, ``sales_count``,
        ``total_customer_paid``, ``total_net_profit`` and ``margin_pct``,
        ordered by ``total_net_profit`` descending.
    """
    stmt = text("""
SELECT
t.product_nm_id as nm_id,
p.title,
COUNT(*) as sales_count,
SUM(t.finished_price) as total_customer_paid,
SUM(t.for_pay) as total_net_profit,
ROUND(AVG(t.for_pay / NULLIF(t.finished_price, 0) * 100), 1) as margin_pct
FROM wb_transactions t
LEFT JOIN wb_products p ON t.product_nm_id = p.nm_id
WHERE t.type = 'sale' AND t.status = 'sale'
GROUP BY t.product_nm_id, p.title
ORDER BY total_net_profit DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/analytics/returns")
def get_returns_report():
    """Return per-product return statistics for products with at least one return.

    The return rate is the share of ``status = 'return'`` rows among all
    ``type = 'sale'`` rows of the product, expressed in percent.

    Returns:
        A list of dicts with ``nm_id``, ``title``, ``returns_count``,
        ``sales_count`` and ``return_rate_pct``, ordered by the rate descending.
    """
    stmt = text("""
SELECT
t.product_nm_id as nm_id,
p.title,
COUNT(*) FILTER (WHERE t.status = 'return') as returns_count,
COUNT(*) FILTER (WHERE t.status = 'sale') as sales_count,
ROUND(
COUNT(*) FILTER (WHERE t.status = 'return')::numeric /
NULLIF(COUNT(*), 0) * 100, 1
) as return_rate_pct
FROM wb_transactions t
LEFT JOIN wb_products p ON t.product_nm_id = p.nm_id
WHERE t.type = 'sale'
GROUP BY t.product_nm_id, p.title
HAVING COUNT(*) FILTER (WHERE t.status = 'return') > 0
ORDER BY return_rate_pct DESC
""")
    frame = pd.read_sql(stmt, engine)
    report_rows = clean_dataframe(frame).to_dict(orient="records")
    return report_rows
@app.get("/analytics/regions")
def get_regional_stats():
    """Return order counts and summed ``for_pay`` for the 20 busiest regions.

    Only ``type = 'order'`` rows with a non-null region are counted.

    Returns:
        A list of dicts with ``region``, ``orders_count`` and
        ``total_revenue``, ordered by ``orders_count`` descending.
    """
    stmt = text("""
SELECT
region,
COUNT(*) as orders_count,
SUM(for_pay) as total_revenue
FROM wb_transactions
WHERE type = 'order' AND region IS NOT NULL
GROUP BY region
ORDER BY orders_count DESC
LIMIT 20
""")
    frame = pd.read_sql(stmt, engine)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
# ==================== FUNNEL (ВОРОНКА) ====================
@app.get("/funnel/summary", response_model=FunnelSummary)
def get_funnel_summary():
    """Return sales-funnel totals aggregated over ``mv_funnel_with_finance``.

    The statement aggregates without GROUP BY, so exactly one row is produced.

    Returns:
        A dict of totals (opens, carts, orders, buyouts, cancels), average
        conversion percentages, falling/rising product counts and the
        covered period boundaries as text.
    """
    stmt = text("""
SELECT
COUNT(*) as total_products,
SUM(open_count) as total_open_count,
SUM(cart_count) as total_cart_count,
SUM(order_count) as total_order_count,
SUM(order_sum) as total_order_sum,
SUM(buyout_count) as total_buyout_count,
SUM(buyout_sum) as total_buyout_sum,
SUM(cancel_count) as total_cancel_count,
ROUND(AVG(add_to_cart_percent), 2) as avg_add_to_cart_percent,
ROUND(AVG(cart_to_order_percent), 2) as avg_cart_to_order_percent,
ROUND(AVG(buyout_percent), 2) as avg_buyout_percent,
COUNT(*) FILTER (WHERE is_falling = true) as products_falling,
COUNT(*) FILTER (WHERE is_rising = true) as products_rising,
MIN(period_start)::text as period_start,
MAX(period_end)::text as period_end
FROM mv_funnel_with_finance
""")
    frame = pd.read_sql(stmt, engine)
    summary_rows = clean_dataframe(frame).to_dict(orient="records")
    return summary_rows[0]
@app.get("/funnel/product/{nm_id}", response_model=FunnelProduct)
def get_funnel_product(nm_id: int):
    """Return the funnel metrics row of one product from ``mv_funnel_with_finance``.

    Args:
        nm_id: Article identifier matched against the ``nm_id`` column.

    Returns:
        The first matching row as a dict (NaN/inf replaced by ``None``).

    Raises:
        HTTPException: 404 when the product has no row in the funnel view.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code, brand_name, subject_name,
product_rating, price_final, total_stock_actual,
open_count, cart_count, order_count, order_sum,
buyout_count, buyout_sum, cancel_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
order_count_dyn, buyout_sum_dyn, days_of_stock,
is_falling, is_rising
FROM mv_funnel_with_finance
WHERE nm_id = :nm_id
""")
    frame = pd.read_sql(stmt, engine, params={"nm_id": nm_id})
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден в воронке")
    product_rows = clean_dataframe(frame).to_dict(orient="records")
    return product_rows[0]
@app.get("/funnel/top", response_model=List[FunnelProduct])
def get_funnel_top(
    limit: int = 10,
    sort_by: Literal["order_sum", "order_count", "buyout_sum", "open_count", "cart_count"] = "order_sum"
):
    """Return the top products of ``mv_funnel_with_finance`` by a funnel metric.

    Args:
        limit: Maximum number of rows to return.
        sort_by: Metric column used for descending ordering. Restricted to a
            fixed set of column names because it is interpolated into SQL.

    Returns:
        A list of row dicts ordered by ``sort_by`` descending, NULLs last.

    Raises:
        HTTPException: 400 if ``sort_by`` is not one of the allowed columns.
    """
    # ``Query(enum=...)`` only documents the values in OpenAPI; it does not
    # validate them. The Literal annotation makes FastAPI reject other values,
    # and this whitelist protects direct Python callers, so no caller-supplied
    # text can reach the interpolated ORDER BY clause.
    allowed_sort_fields = ("order_sum", "order_count", "buyout_sum", "open_count", "cart_count")
    if sort_by not in allowed_sort_fields:
        raise HTTPException(status_code=400, detail="Недопустимое поле сортировки")
    # NULLS LAST: PostgreSQL sorts NULLs first for DESC, which would put rows
    # without the metric at the top of a "top" list.
    query = text(f"""
SELECT
nm_id, title, vendor_code, brand_name, subject_name,
product_rating, price_final, total_stock_actual,
open_count, cart_count, order_count, order_sum,
buyout_count, buyout_sum, cancel_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
order_count_dyn, buyout_sum_dyn, days_of_stock,
is_falling, is_rising
FROM mv_funnel_with_finance
ORDER BY {sort_by} DESC NULLS LAST
LIMIT :limit
""")
    df = pd.read_sql(query, engine, params={"limit": limit})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/funnel/dynamics-alerts", response_model=List[FunnelAlert])
def get_dynamics_alerts(
    threshold: int = -20,
    limit: int = 20
):
    """List products whose ``order_count_dyn`` is below ``threshold``.

    Args:
        threshold: Exclusive upper bound on ``order_count_dyn``.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``nm_id``, ``title``, ``vendor_code``,
        ``metric_value``, ``metric_dyn`` and ``alert_type`` set to
        ``'falling_orders'``, ordered by the dynamics value ascending.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code,
order_count as metric_value,
order_count_dyn as metric_dyn,
'falling_orders' as alert_type
FROM mv_funnel_with_finance
WHERE order_count_dyn IS NOT NULL AND order_count_dyn < :threshold
ORDER BY order_count_dyn ASC
LIMIT :limit
""")
    bind = {"threshold": threshold, "limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/funnel/rising-stars", response_model=List[FunnelAlert])
def get_rising_stars(
    threshold: int = 20,
    limit: int = 20
):
    """List products whose ``order_count_dyn`` is above ``threshold``.

    Args:
        threshold: Exclusive lower bound on ``order_count_dyn``.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with ``nm_id``, ``title``, ``vendor_code``,
        ``metric_value``, ``metric_dyn`` and ``alert_type`` set to
        ``'rising_orders'``, ordered by the dynamics value descending.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code,
order_count as metric_value,
order_count_dyn as metric_dyn,
'rising_orders' as alert_type
FROM mv_funnel_with_finance
WHERE order_count_dyn IS NOT NULL AND order_count_dyn > :threshold
ORDER BY order_count_dyn DESC
LIMIT :limit
""")
    bind = {"threshold": threshold, "limit": limit}
    frame = pd.read_sql(stmt, engine, params=bind)
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/funnel/conversion-leaks", response_model=List[ConversionLeak])
def get_conversion_leaks(limit: int = 20):
    """List products with a weak funnel stage, i.e. where customers drop off.

    A product qualifies when ``open_count > 100`` and at least one of the
    conversion percentages is below its cut-off (5 / 10 / 50). The first
    failing stage is reported as ``weak_stage`` together with an estimate of
    lost orders computed in SQL.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts ordered by ``open_count`` descending.
    """
    stmt = text("""
SELECT
nm_id, title,
open_count, cart_count, order_count, buyout_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
CASE
WHEN add_to_cart_percent < 5 THEN 'open_to_cart'
WHEN cart_to_order_percent < 10 THEN 'cart_to_order'
WHEN buyout_percent < 50 THEN 'order_to_buyout'
ELSE 'unknown'
END as weak_stage,
CASE
WHEN add_to_cart_percent < 5 THEN ROUND(open_count * 0.10 - cart_count)
WHEN cart_to_order_percent < 10 THEN ROUND(cart_count * 0.20 - order_count)
ELSE ROUND(order_count * 0.80 - buyout_count)
END as potential_orders_lost
FROM mv_funnel_with_finance
WHERE open_count > 100
AND (add_to_cart_percent < 5 OR cart_to_order_percent < 10 OR buyout_percent < 50)
ORDER BY open_count DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    leak_rows = clean_dataframe(frame).to_dict(orient="records")
    return leak_rows
@app.get("/funnel/high-demand-low-stock")
def get_high_demand_low_stock(limit: int = 20):
    """List products flagged ``is_low_stock`` ordered by order count.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts from ``mv_funnel_with_finance`` ordered by
        ``order_count`` descending.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code, brand_name,
open_count, order_count, order_sum,
total_stock_actual, days_of_stock,
order_count_dyn
FROM mv_funnel_with_finance
WHERE is_low_stock = true
ORDER BY order_count DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/funnel/wishlist-leaders")
def get_wishlist_leaders(limit: int = 20):
    """List products with the highest ``add_to_wishlist`` values.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts for products with ``add_to_wishlist > 0``,
        ordered by ``add_to_wishlist`` descending.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code, brand_name,
add_to_wishlist, add_to_wishlist_dyn,
open_count, order_count, order_sum,
price_final, total_stock_actual
FROM mv_funnel_with_finance
WHERE add_to_wishlist > 0
ORDER BY add_to_wishlist DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/funnel/cancel-analysis")
def get_cancel_analysis(limit: int = 20):
    """List products with cancellations and their cancellation rate.

    ``cancel_rate_pct`` is ``cancel_count / order_count * 100`` rounded to one
    decimal; it is NULL when ``order_count`` is 0.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts for products with ``cancel_count > 0``, ordered
        by ``cancel_count`` descending.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code, brand_name,
order_count, buyout_count, cancel_count,
cancel_sum, cancel_count_dyn,
ROUND(cancel_count::numeric / NULLIF(order_count, 0) * 100, 1) as cancel_rate_pct,
buyout_percent
FROM mv_funnel_with_finance
WHERE cancel_count > 0
ORDER BY cancel_count DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/funnel/category-benchmark")
def get_category_benchmark(subject_id: Optional[int] = None, limit: int = 20):
    """Compare products with their category average, or list category averages.

    Args:
        subject_id: When given, return the products of that category together
            with the category's average conversion metrics and the per-product
            differences. When omitted, return one row of averages per category
            that has more than one product.
        limit: Maximum number of rows to return in either mode.

    Returns:
        A list of row dicts (products in the first mode, categories in the
        second).
    """
    # Compare against None explicitly: a truthiness test would treat an
    # explicitly supplied ``subject_id=0`` as "no filter".
    if subject_id is not None:
        query = text("""
WITH category_avg AS (
SELECT
subject_id,
subject_name,
ROUND(AVG(add_to_cart_percent), 2) as avg_cart_pct,
ROUND(AVG(cart_to_order_percent), 2) as avg_order_pct,
ROUND(AVG(buyout_percent), 2) as avg_buyout_pct,
ROUND(AVG(order_count), 0) as avg_orders
FROM mv_funnel_with_finance
WHERE subject_id = :subject_id
GROUP BY subject_id, subject_name
)
SELECT
f.nm_id, f.title, f.vendor_code,
f.subject_name,
f.add_to_cart_percent,
c.avg_cart_pct,
ROUND(f.add_to_cart_percent - c.avg_cart_pct, 2) as cart_diff,
f.cart_to_order_percent,
c.avg_order_pct,
ROUND(f.cart_to_order_percent - c.avg_order_pct, 2) as order_diff,
f.buyout_percent,
c.avg_buyout_pct,
ROUND(f.buyout_percent - c.avg_buyout_pct, 2) as buyout_diff,
f.order_count,
c.avg_orders
FROM mv_funnel_with_finance f
JOIN category_avg c ON f.subject_id = c.subject_id
ORDER BY f.order_count DESC
LIMIT :limit
""")
        df = pd.read_sql(query, engine, params={"subject_id": subject_id, "limit": limit})
    else:
        query = text("""
WITH category_avg AS (
SELECT
subject_id,
subject_name,
ROUND(AVG(add_to_cart_percent), 2) as avg_cart_pct,
ROUND(AVG(cart_to_order_percent), 2) as avg_order_pct,
ROUND(AVG(buyout_percent), 2) as avg_buyout_pct,
COUNT(*) as products_count
FROM mv_funnel_with_finance
GROUP BY subject_id, subject_name
HAVING COUNT(*) > 1
)
SELECT
subject_id, subject_name,
avg_cart_pct, avg_order_pct, avg_buyout_pct,
products_count
FROM category_avg
ORDER BY products_count DESC
LIMIT :limit
""")
        df = pd.read_sql(query, engine, params={"limit": limit})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/funnel/rating-impact")
def get_rating_impact(limit: int = 20):
    """List products rated below 4.5 together with their funnel conversions.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts for products with a non-null
        ``product_rating < 4.5``, ordered by ``open_count`` descending.
    """
    stmt = text("""
SELECT
nm_id, title, vendor_code,
product_rating, feedback_rating,
open_count, order_count, buyout_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
order_count_dyn
FROM mv_funnel_with_finance
WHERE product_rating IS NOT NULL AND product_rating < 4.5
ORDER BY open_count DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
# ==================== AI INSIGHTS ====================
@app.post("/insights/add")
def add_ai_insight(insight: AIInsight):
    """Store one AI recommendation as a new row of ``ai_product_insights``.

    Args:
        insight: Request payload; its ``nm_id``, ``tag``, ``recommendation``
            and ``score`` attributes are written to the row.

    Returns:
        ``{"status": "success"}`` once the insert has been executed.
    """
    stmt = text("""
INSERT INTO ai_product_insights (product_nm_id, ai_strategy_tag, ai_recommendation, confidence_score)
VALUES (:nm_id, :tag, :rec, :score)
""")
    # engine.begin() wraps the insert in a transaction that is committed when
    # the block exits without an error.
    with engine.begin() as connection:
        row_values = {
            "nm_id": insight.nm_id,
            "tag": insight.tag,
            "rec": insight.recommendation,
            "score": insight.score
        }
        connection.execute(stmt, row_values)
    return {"status": "success"}
@app.get("/insights/{nm_id}")
def get_insights(nm_id: int):
    """Return the stored AI recommendations of a product, newest first.

    Args:
        nm_id: Article identifier matched against ``product_nm_id``.

    Returns:
        A list of dicts with ``tag``, ``recommendation``, ``score`` and
        ``analyzed_at``; empty when the product has no insights.
    """
    stmt = text("""
SELECT ai_strategy_tag as tag, ai_recommendation as recommendation,
confidence_score as score, analyzed_at
FROM ai_product_insights
WHERE product_nm_id = :nm_id
ORDER BY analyzed_at DESC
""")
    frame = pd.read_sql(stmt, engine, params={"nm_id": nm_id})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/analytics/sales-trend")
def get_sales_trend(days: int = Query(30, ge=1, le=365)):
    """Return one row of sales figures per calendar day, for charting.

    A generated date series covering the last ``days`` days (today included)
    is left-joined with per-day aggregates from ``wb_transactions``, so days
    without transactions are present with zero values.

    Args:
        days: Number of days to cover (validated to 1..365).

    Returns:
        A list of dicts with ``date`` (as string), ``orders_count``,
        ``sales_count``, ``returns_count``, ``revenue`` and ``avg_check``,
        ordered by date ascending.

    Raises:
        HTTPException: 500 if the query or the conversion fails.
    """
    stmt = text("""
WITH date_series AS (
SELECT generate_series(
CURRENT_DATE - ((:days - 1) * INTERVAL '1 day'),
CURRENT_DATE,
INTERVAL '1 day'
)::date AS date
),
sales_agg AS (
SELECT
DATE(date_at) AS date,
COUNT(*) FILTER (WHERE type = 'order') AS orders_count,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') AS sales_count,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') AS returns_count,
COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale'), 0) AS revenue,
COALESCE(AVG(finished_price) FILTER (WHERE type = 'order'), 0) AS avg_check
FROM wb_transactions
WHERE date_at >= CURRENT_DATE - ((:days - 1) * INTERVAL '1 day')
AND date_at < CURRENT_DATE + INTERVAL '1 day'
GROUP BY DATE(date_at)
)
SELECT
ds.date,
COALESCE(sa.orders_count, 0) AS orders_count,
COALESCE(sa.sales_count, 0) AS sales_count,
COALESCE(sa.returns_count, 0) AS returns_count,
COALESCE(sa.revenue, 0) AS revenue,
COALESCE(sa.avg_check, 0) AS avg_check
FROM date_series ds
LEFT JOIN sales_agg sa ON ds.date = sa.date
ORDER BY ds.date ASC
""")
    try:
        frame = pd.read_sql(stmt, engine, params={"days": days})
        # Dates are stringified before the records are built.
        frame["date"] = frame["date"].astype(str)
        trend_rows = clean_dataframe(frame).to_dict(orient="records")
        return trend_rows
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Ошибка получения тренда продаж: {str(e)}"
        )
@app.get("/analytics/conversion-rate")
def get_conversion_rate(limit: int = 30):
    """Return the order-to-sale conversion per product.

    Only products with more than 5 ``type = 'order'`` rows are included;
    ``conversion_pct`` is completed sales divided by orders, in percent.

    Args:
        limit: Maximum number of products to return.

    Returns:
        A list of dicts with ``nm_id``, ``orders``, ``sales``, ``returns``
        and ``conversion_pct``, ordered by ``orders`` descending.
    """
    stmt = text("""
SELECT
product_nm_id as nm_id,
COUNT(*) FILTER (WHERE type = 'order') as orders,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') as returns,
ROUND(
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale')::numeric /
NULLIF(COUNT(*) FILTER (WHERE type = 'order'), 0) * 100, 1
) as conversion_pct
FROM wb_transactions
GROUP BY product_nm_id
HAVING COUNT(*) FILTER (WHERE type = 'order') > 5
ORDER BY orders DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/analytics/avg-check-trend")
def get_avg_check_trend(days: int = 30):
    """Return the average order value per day over the last ``days`` days.

    Only ``type = 'order'`` rows of ``wb_transactions`` newer than
    ``NOW() - days`` are aggregated.

    Args:
        days: Size of the look-back window in days.

    Returns:
        A list of dicts with ``date`` (as string), ``avg_check`` and
        ``orders_count``, ordered by date ascending.
    """
    # The window is built as ``:days * INTERVAL '1 day'``. The previous form,
    # ``INTERVAL :days || ' days'``, is not valid PostgreSQL: the INTERVAL
    # keyword only accepts a string literal, not a bound parameter expression.
    query = text("""
SELECT
DATE(date_at) as date,
ROUND(AVG(finished_price), 2) as avg_check,
COUNT(*) as orders_count
FROM wb_transactions
WHERE type = 'order' AND date_at > NOW() - (:days * INTERVAL '1 day')
GROUP BY DATE(date_at)
ORDER BY date ASC
""")
    df = pd.read_sql(query, engine, params={"days": days})
    df['date'] = df['date'].astype(str)
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/analytics/regions-revenue")
def get_regions_revenue(limit: int = 15):
    """Return the top regions by revenue from completed sales.

    Args:
        limit: Maximum number of regions to return.

    Returns:
        A list of dicts with ``region``, ``orders_count``, ``sales_count``,
        ``revenue`` and ``avg_check``, ordered by ``revenue`` descending.
        Rows with a NULL region are excluded.
    """
    stmt = text("""
SELECT
region,
COUNT(*) FILTER (WHERE type = 'order') as orders_count,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_count,
COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale'), 0) as revenue,
ROUND(AVG(finished_price) FILTER (WHERE type = 'order'), 2) as avg_check
FROM wb_transactions
WHERE region IS NOT NULL
GROUP BY region
ORDER BY revenue DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
# ==================== ADS (РЕКЛАМА) ====================
@app.get("/ads/summary")
def get_ads_summary():
    """Return overall advertising totals and dynamics from ``dm_ad_spend_product_7d``.

    The statement aggregates without GROUP BY, so exactly one row is produced.
    Ratios (ROAS, CPO, CTR, dynamics) are NULL when their denominator is 0.

    Returns:
        A dict with spend, views, clicks, orders and revenue totals plus
        ``ad_spend_dyn``, ``orders_dyn``, ``roas_avg``, ``cpo_avg`` and
        ``ctr_avg``.
    """
    stmt = text("""
SELECT
COUNT(DISTINCT nm_id) as products_count,
SUM(ad_spend_total)::numeric(14,2) as ad_spend_total,
SUM(ad_spend_prev)::numeric(14,2) as ad_spend_prev,
ROUND((SUM(ad_spend_total) - SUM(ad_spend_prev)) / NULLIF(SUM(ad_spend_prev), 0) * 100, 1) as ad_spend_dyn,
SUM(views_total) as views_total,
SUM(clicks_total) as clicks_total,
SUM(orders_total) as orders_total,
SUM(orders_prev) as orders_prev,
ROUND((SUM(orders_total) - SUM(orders_prev))::numeric / NULLIF(SUM(orders_prev), 0) * 100, 1) as orders_dyn,
SUM(revenue_total)::numeric(14,2) as revenue_total,
ROUND(SUM(revenue_total) / NULLIF(SUM(ad_spend_total), 0), 2) as roas_avg,
ROUND(SUM(ad_spend_total) / NULLIF(SUM(orders_total), 0), 2) as cpo_avg,
ROUND(SUM(clicks_total)::numeric / NULLIF(SUM(views_total), 0) * 100, 2) as ctr_avg
FROM dm_ad_spend_product_7d
""")
    frame = pd.read_sql(stmt, engine)
    summary_rows = clean_dataframe(frame).to_dict(orient="records")
    return summary_rows[0]
@app.get("/ads/trend")
def get_ads_trend(days: int = 30):
    """Return daily advertising spend from ``dm_ad_spend_daily``, for charting.

    Args:
        days: Number of days subtracted from the current date to form the
            lower bound of the window.

    Returns:
        A list of dicts with ``date`` (as text) and ``ad_spend_total``,
        ordered by day ascending.
    """
    stmt = text("""
SELECT
day::text as date,
ad_spend_total
FROM dm_ad_spend_daily
WHERE day >= CURRENT_DATE - :days
ORDER BY day ASC
""")
    frame = pd.read_sql(stmt, engine, params={"days": days})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/ads/products/top")
def get_ads_products_top(
    limit: int = 20,
    sort_by: Literal["roas", "ad_spend_total", "orders_total", "revenue_total"] = "roas"
):
    """Return the top products of ``dm_ad_spend_product_7d`` by an ad metric.

    Rows where the chosen metric is NULL are excluded.

    Args:
        limit: Maximum number of rows to return.
        sort_by: Metric column used for filtering and descending ordering.
            Restricted to a fixed set of column names because it is
            interpolated into SQL.

    Returns:
        A list of row dicts ordered by ``sort_by`` descending.

    Raises:
        HTTPException: 400 if ``sort_by`` is not one of the allowed columns.
    """
    # ``Query(enum=...)`` only documents the values in OpenAPI; it does not
    # validate them. The Literal annotation makes FastAPI reject other values,
    # and this whitelist protects direct Python callers, so no caller-supplied
    # text can reach the interpolated WHERE / ORDER BY clauses.
    allowed_sort_fields = ("roas", "ad_spend_total", "orders_total", "revenue_total")
    if sort_by not in allowed_sort_fields:
        raise HTTPException(status_code=400, detail="Недопустимое поле сортировки")
    query = text(f"""
SELECT
nm_id, vendor_code, title, brand, category,
ad_spend_total, views_total, clicks_total, orders_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo,
ad_spend_dyn, orders_dyn, revenue_dyn
FROM dm_ad_spend_product_7d
WHERE {sort_by} IS NOT NULL
ORDER BY {sort_by} DESC
LIMIT :limit
""")
    df = pd.read_sql(query, engine, params={"limit": limit})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/ads/products/losers")
def get_ads_products_losers(limit: int = 20):
    """List loss-making advertised products (ROAS below 1).

    Only rows with a non-null ``roas < 1`` and ``ad_spend_total > 100``
    are returned.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts ordered by ``ad_spend_total`` descending.
    """
    stmt = text("""
SELECT
nm_id, vendor_code, title, brand,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
ad_spend_dyn, orders_dyn
FROM dm_ad_spend_product_7d
WHERE roas IS NOT NULL AND roas < 1 AND ad_spend_total > 100
ORDER BY ad_spend_total DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/ads/products/rising")
def get_ads_products_rising(limit: int = 20):
    """List advertised products whose ``orders_dyn`` is positive.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts from ``dm_ad_spend_product_7d`` ordered by
        ``orders_dyn`` descending.
    """
    stmt = text("""
SELECT
nm_id, vendor_code, title, brand,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
orders_dyn, revenue_dyn
FROM dm_ad_spend_product_7d
WHERE orders_dyn IS NOT NULL AND orders_dyn > 0
ORDER BY orders_dyn DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/ads/products/falling")
def get_ads_products_falling(limit: int = 20):
    """List advertised products whose ``orders_dyn`` is negative.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts from ``dm_ad_spend_product_7d`` ordered by
        ``orders_dyn`` ascending (largest drop first).
    """
    stmt = text("""
SELECT
nm_id, vendor_code, title, brand,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
orders_dyn, revenue_dyn
FROM dm_ad_spend_product_7d
WHERE orders_dyn IS NOT NULL AND orders_dyn < 0
ORDER BY orders_dyn ASC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/ads/product/{nm_id}")
def get_ads_product(nm_id: int):
    """Return the advertising metrics row of one product.

    Args:
        nm_id: Article identifier matched against ``dm_ad_spend_product_7d``.

    Returns:
        The first matching row as a dict, including current/previous values
        and dynamics for spend, views, clicks, orders and revenue.

    Raises:
        HTTPException: 404 when the product has no advertising data.
    """
    stmt = text("""
SELECT
nm_id, vendor_code, title, brand, category,
period_start::text, period_end::text,
ad_spend_total, ad_spend_prev, ad_spend_dyn,
views_total, views_prev, views_dyn,
clicks_total, clicks_prev, clicks_dyn,
orders_total, orders_prev, orders_dyn,
revenue_total, revenue_prev, revenue_dyn,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_product_7d
WHERE nm_id = :nm_id
""")
    frame = pd.read_sql(stmt, engine, params={"nm_id": nm_id})
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден в рекламных данных")
    product_rows = clean_dataframe(frame).to_dict(orient="records")
    return product_rows[0]
@app.get("/ads/campaigns")
def get_ads_campaigns(
    limit: int = 20,
    sort_by: Literal["ad_spend_total", "roas", "orders_total", "cpo"] = "ad_spend_total"
):
    """List campaigns of ``dm_ad_spend_campaign_7d`` with their metrics.

    Args:
        limit: Maximum number of rows to return.
        sort_by: Metric column used for descending ordering. Restricted to a
            fixed set of column names because it is interpolated into SQL.

    Returns:
        A list of row dicts ordered by ``sort_by`` descending, NULLs last.

    Raises:
        HTTPException: 400 if ``sort_by`` is not one of the allowed columns.
    """
    # ``Query(enum=...)`` only documents the values in OpenAPI; it does not
    # validate them. The Literal annotation makes FastAPI reject other values,
    # and this whitelist protects direct Python callers, so no caller-supplied
    # text can reach the interpolated ORDER BY clause.
    allowed_sort_fields = ("ad_spend_total", "roas", "orders_total", "cpo")
    if sort_by not in allowed_sort_fields:
        raise HTTPException(status_code=400, detail="Недопустимое поле сортировки")
    # NULLS LAST: PostgreSQL sorts NULLs first for DESC, which would list
    # campaigns without the metric (e.g. NULL roas) ahead of real values.
    query = text(f"""
SELECT
campaign_id, campaign_name, campaign_status, campaign_type,
daily_budget, budget_total,
nm_cnt, ad_spend_total, views_total, clicks_total, orders_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_campaign_7d
ORDER BY {sort_by} DESC NULLS LAST
LIMIT :limit
""")
    df = pd.read_sql(query, engine, params={"limit": limit})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/ads/campaigns/inefficient")
def get_ads_campaigns_inefficient(limit: int = 20):
    """List inefficient campaigns: ROAS below 1 or CPO above 500.

    ``issue_type`` is ``'low_roas'`` when ``roas < 1``, otherwise
    ``'high_cpo'`` when ``cpo > 500``, otherwise ``'other'``.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts ordered by ``ad_spend_total`` descending.
    """
    stmt = text("""
SELECT
campaign_id, campaign_name, campaign_status, campaign_type,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
CASE
WHEN roas < 1 THEN 'low_roas'
WHEN cpo > 500 THEN 'high_cpo'
ELSE 'other'
END as issue_type
FROM dm_ad_spend_campaign_7d
WHERE (roas IS NOT NULL AND roas < 1) OR (cpo IS NOT NULL AND cpo > 500)
ORDER BY ad_spend_total DESC
LIMIT :limit
""")
    frame = pd.read_sql(stmt, engine, params={"limit": limit})
    cleaned = clean_dataframe(frame)
    return cleaned.to_dict(orient="records")
@app.get("/ads/campaign/{campaign_id}")
def get_ads_campaign(campaign_id: int):
    """Return the metrics row of one advertising campaign.

    Args:
        campaign_id: Identifier matched against ``dm_ad_spend_campaign_7d``.

    Returns:
        The first matching row as a dict (NaN/inf replaced by ``None``).

    Raises:
        HTTPException: 404 when no campaign matches ``campaign_id``.
    """
    stmt = text("""
SELECT
campaign_id, campaign_name, campaign_status, campaign_type,
daily_budget, budget_total,
period_start::text, period_end::text,
nm_cnt, ad_spend_total, views_total, clicks_total,
atbs_total, orders_total, shks_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_campaign_7d
WHERE campaign_id = :campaign_id
""")
    frame = pd.read_sql(stmt, engine, params={"campaign_id": campaign_id})
    if frame.empty:
        raise HTTPException(status_code=404, detail="Кампания не найдена")
    campaign_rows = clean_dataframe(frame).to_dict(orient="records")
    return campaign_rows[0]
@app.get("/ads/categories")
def get_ads_categories(
    sort_by: Literal["ad_spend_total", "roas", "orders_total"] = "ad_spend_total"
):
    """Return advertising efficiency per category from ``dm_ad_spend_category_7d``.

    Args:
        sort_by: Metric column used for descending ordering. Restricted to a
            fixed set of column names because it is interpolated into SQL.

    Returns:
        A list of row dicts, one per category, ordered by ``sort_by``
        descending with NULLs last.

    Raises:
        HTTPException: 400 if ``sort_by`` is not one of the allowed columns.
    """
    # ``Query(enum=...)`` only documents the values in OpenAPI; it does not
    # validate them. The Literal annotation makes FastAPI reject other values,
    # and this whitelist protects direct Python callers, so no caller-supplied
    # text can reach the interpolated ORDER BY clause.
    allowed_sort_fields = ("ad_spend_total", "roas", "orders_total")
    if sort_by not in allowed_sort_fields:
        raise HTTPException(status_code=400, detail="Недопустимое поле сортировки")
    # NULLS LAST: PostgreSQL sorts NULLs first for DESC, which would list
    # categories without the metric ahead of real values.
    query = text(f"""
SELECT
category,
nm_cnt, campaign_cnt,
ad_spend_total, views_total, clicks_total, orders_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_category_7d
ORDER BY {sort_by} DESC NULLS LAST
""")
    df = pd.read_sql(query, engine)
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/ads/category-benchmark")
def get_ads_category_benchmark(nm_id: int):
    """Compare a product's ad metrics with the averages of its category.

    The product row is taken from ``dm_ad_spend_product_7d``; the averages are
    computed over all rows of the same category and left-joined, so the
    average and diff columns are NULL when no category average is available.

    Args:
        nm_id: Article identifier of the product to benchmark.

    Returns:
        A dict with the product's ``ctr``, ``cpc``, ``cr``, ``roas`` and
        ``cpo``, each with its category average and difference.

    Raises:
        HTTPException: 404 when the product has no row in the ads data mart.
    """
    stmt = text("""
WITH product AS (
SELECT * FROM dm_ad_spend_product_7d WHERE nm_id = :nm_id
),
cat_avg AS (
SELECT
category,
ROUND(AVG(ctr), 2) as avg_ctr,
ROUND(AVG(cpc), 2) as avg_cpc,
ROUND(AVG(cr), 2) as avg_cr,
ROUND(AVG(roas), 2) as avg_roas,
ROUND(AVG(cpo), 2) as avg_cpo
FROM dm_ad_spend_product_7d
WHERE category = (SELECT category FROM product)
GROUP BY category
)
SELECT
p.nm_id, p.title, p.category,
p.ctr, c.avg_ctr, ROUND(p.ctr - c.avg_ctr, 2) as ctr_diff,
p.cpc, c.avg_cpc, ROUND(p.cpc - c.avg_cpc, 2) as cpc_diff,
p.cr, c.avg_cr, ROUND(p.cr - c.avg_cr, 2) as cr_diff,
p.roas, c.avg_roas, ROUND(p.roas - c.avg_roas, 2) as roas_diff,
p.cpo, c.avg_cpo, ROUND(p.cpo - c.avg_cpo, 2) as cpo_diff
FROM product p
LEFT JOIN cat_avg c ON p.category = c.category
""")
    frame = pd.read_sql(stmt, engine, params={"nm_id": nm_id})
    if frame.empty:
        raise HTTPException(status_code=404, detail="Товар не найден")
    benchmark_rows = clean_dataframe(frame).to_dict(orient="records")
    return benchmark_rows[0]
@app.get("/ads/alerts")
def get_ads_alerts(limit: int = 30):
    """List advertised products with a problem, most severe first.

    A product is included when ``roas < 1``, ``orders_dyn < -20`` or
    ``cpo > 1000``. Each row carries an ``alert_type`` label and a numeric
    ``priority`` (1 = most severe).

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of row dicts ordered by ``priority`` ascending, then
        ``ad_spend_total`` descending.
    """
    # The alert_type branches are tested in the same order as the priority
    # branches, so the label always names the condition that set the priority.
    # Previously a row with roas < 1 and orders_dyn < -50 was labelled
    # 'low_roas' while receiving the orders-crash priority 2.
    query = text("""
SELECT
nm_id, vendor_code, title,
ad_spend_total, orders_total, revenue_total,
roas, cpo, orders_dyn,
CASE
WHEN roas < 0.5 THEN 'critical_roas'
WHEN orders_dyn < -50 THEN 'orders_crash'
WHEN roas < 1 THEN 'low_roas'
WHEN orders_dyn < -20 THEN 'orders_falling'
WHEN cpo > 1000 THEN 'very_high_cpo'
ELSE 'other'
END as alert_type,
CASE
WHEN roas < 0.5 THEN 1
WHEN orders_dyn < -50 THEN 2
WHEN roas < 1 THEN 3
WHEN orders_dyn < -20 THEN 4
ELSE 5
END as priority
FROM dm_ad_spend_product_7d
WHERE roas < 1 OR orders_dyn < -20 OR cpo > 1000
ORDER BY priority ASC, ad_spend_total DESC
LIMIT :limit
""")
    df = pd.read_sql(query, engine, params={"limit": limit})
    return clean_dataframe(df).to_dict(orient="records")
@app.get("/dashboard/ads-summary")
def get_dashboard_ads_summary():
    """Return a compact advertising summary for the dashboard.

    The statement aggregates ``dm_ad_spend_product_7d`` without GROUP BY, so
    exactly one row is produced.

    Returns:
        A dict with ``products_with_ads``, ``total_spend_7d``,
        ``total_orders_7d``, ``total_revenue_7d``, ``avg_roas``,
        ``products_low_roas`` and ``products_falling``.
    """
    stmt = text("""
SELECT
COUNT(*) as products_with_ads,
SUM(ad_spend_total)::numeric(14,2) as total_spend_7d,
SUM(orders_total) as total_orders_7d,
SUM(revenue_total)::numeric(14,2) as total_revenue_7d,
ROUND(SUM(revenue_total) / NULLIF(SUM(ad_spend_total), 0), 2) as avg_roas,
COUNT(*) FILTER (WHERE roas < 1) as products_low_roas,
COUNT(*) FILTER (WHERE orders_dyn < -20) as products_falling
FROM dm_ad_spend_product_7d
""")
    frame = pd.read_sql(stmt, engine)
    summary_rows = clean_dataframe(frame).to_dict(orient="records")
    return summary_rows[0]
# ==================== DASHBOARD ====================
@app.get("/dashboard/summary")
def get_dashboard_summary():
    """Return headline figures for the dashboard as a single row.

    Every figure is a scalar subquery over ``wb_products``,
    ``wb_vitrine_for_ai`` or ``wb_transactions``.

    Returns:
        A dict with ``total_products``, ``total_stock``, ``total_stock_fbo``,
        ``total_stock_fbs``, ``orders_24h``, ``orders_7d``, ``revenue_30d``
        and ``out_of_stock_count``.
    """
    stmt = text("""
SELECT
(SELECT COUNT(*) FROM wb_products) as total_products,
(SELECT SUM(total_stock) FROM wb_vitrine_for_ai) as total_stock,
(SELECT SUM(stock_fbo) FROM wb_vitrine_for_ai) as total_stock_fbo,
(SELECT SUM(stock_fbs) FROM wb_vitrine_for_ai) as total_stock_fbs,
(SELECT COUNT(*) FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - INTERVAL '24 hours') as orders_24h,
(SELECT COUNT(*) FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - INTERVAL '7 days') as orders_7d,
(SELECT COALESCE(SUM(for_pay), 0) FROM wb_transactions WHERE type = 'sale' AND status = 'sale' AND date_at > NOW() - INTERVAL '30 days') as revenue_30d,
(SELECT COUNT(*) FROM wb_vitrine_for_ai WHERE total_stock = 0) as out_of_stock_count
""")
    frame = pd.read_sql(stmt, engine)
    summary_rows = clean_dataframe(frame).to_dict(orient="records")
    return summary_rows[0]
@app.get("/dashboard/funnel-summary")
def get_dashboard_funnel_summary():
    """Return a compact sales-funnel summary for the dashboard.

    The statement aggregates ``mv_funnel_with_finance`` without GROUP BY, so
    exactly one row is produced.

    Returns:
        A dict with ``total_products``, ``total_orders``,
        ``total_order_sum``, ``total_buyout_sum`` and the counts of falling,
        rising, low-stock and conversion-problem products.
    """
    stmt = text("""
SELECT
COUNT(*) as total_products,
SUM(order_count) as total_orders,
SUM(order_sum) as total_order_sum,
SUM(buyout_sum) as total_buyout_sum,
COUNT(*) FILTER (WHERE is_falling = true) as products_falling,
COUNT(*) FILTER (WHERE is_rising = true) as products_rising,
COUNT(*) FILTER (WHERE is_low_stock = true) as products_low_stock,
COUNT(*) FILTER (WHERE has_conversion_problem = true) as products_conversion_issues
FROM mv_funnel_with_finance
""")
    frame = pd.read_sql(stmt, engine)
    summary_rows = clean_dataframe(frame).to_dict(orient="records")
    return summary_rows[0]