ChatWB / api /analytics.py
Levin-Aleksey's picture
add
3871f09
import numpy as np
import pandas as pd
from typing import Any, Optional
from sqlalchemy import text
from langchain_core.tools import tool
# Импортируем подключение к БД
from database import engine
def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Заменяет inf и NaN на None для корректной сериализации."""
return df.replace([np.inf, -np.inf, np.nan], None)
# ==================== ДАШБОРД ====================
@tool
def get_dashboard_summary() -> Any:
"""Общая сводка бизнеса.
Сводка: товары, остатки FBO/FBS, заказы за 24ч/7д, выручка 30д, out-of-stock.
"""
query = text("""
SELECT
(SELECT COUNT(*) FROM wb_products) as total_products,
(SELECT SUM(total_stock) FROM wb_vitrine_for_ai) as total_stock,
(SELECT SUM(stock_fbo) FROM wb_vitrine_for_ai) as total_stock_fbo,
(SELECT SUM(stock_fbs) FROM wb_vitrine_for_ai) as total_stock_fbs,
(SELECT COUNT(*) FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - INTERVAL '24 hours') as orders_24h,
(SELECT COUNT(*) FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - INTERVAL '7 days') as orders_7d,
(SELECT COALESCE(SUM(for_pay), 0) FROM wb_transactions WHERE type = 'sale' AND status = 'sale' AND date_at > NOW() - INTERVAL '30 days') as revenue_30d,
(SELECT COUNT(*) FROM wb_vitrine_for_ai WHERE total_stock = 0) as out_of_stock_count
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_dashboard_funnel_summary() -> Any:
"""Сводка воронки для дашборда.
Быстрая сводка: заказы, выкупы, товары с падением/ростом, проблемы конверсии.
"""
query = text("""
SELECT
COUNT(*) as total_products,
SUM(order_count) as total_orders,
SUM(order_sum) as total_order_sum,
SUM(buyout_sum) as total_buyout_sum,
COUNT(*) FILTER (WHERE is_falling = true) as products_falling,
COUNT(*) FILTER (WHERE is_rising = true) as products_rising,
COUNT(*) FILTER (WHERE is_low_stock = true) as products_low_stock,
COUNT(*) FILTER (WHERE has_conversion_problem = true) as products_conversion_issues
FROM mv_funnel_with_finance
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")[0]
# ==================== АНАЛИТИКА ТОВАРОВ ====================
@tool
def get_top_products(limit: int = 10, sort_by: str = "orders_7d") -> Any:
"""ТОП товаров.
Топ товаров с сортировкой по заказам, выручке или остаткам.
Допустимые sort_by: "orders_7d", "net_revenue_30d", "total_stock".
"""
allowed_sorts = {"orders_7d", "net_revenue_30d", "total_stock"}
if sort_by not in allowed_sorts:
return {"error": f"Неверная сортировка. Разрешено: {allowed_sorts}"}
query = text(f"SELECT * FROM wb_vitrine_for_ai ORDER BY {sort_by} DESC LIMIT :limit")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_product_details(nm_id: int) -> Any:
"""Детали товара.
Детальная информация по артикулу: цена, остатки, заказы, выручка.
"""
query = text("SELECT * FROM wb_vitrine_for_ai WHERE nm_id = :nm_id")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
if df.empty:
return {"error": "Товар не найден"}
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_product_sales(nm_id: int) -> Any:
"""Статистика продаж товара.
Заказы, продажи, возвраты, выручка, средний чек по конкретному товару.
"""
query = text("""
SELECT
:nm_id as nm_id,
COUNT(*) FILTER (WHERE type = 'order') as orders_total,
COUNT(*) FILTER (WHERE type = 'order' AND date_at > NOW() - INTERVAL '7 days') as orders_7d,
COUNT(*) FILTER (WHERE type = 'order' AND date_at > NOW() - INTERVAL '30 days') as orders_30d,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_total,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') as returns_total,
COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale' AND date_at > NOW() - INTERVAL '30 days'), 0) as net_revenue_30d,
COALESCE(AVG(finished_price) FILTER (WHERE type = 'order'), 0) as avg_order_value
FROM wb_transactions
WHERE product_nm_id = :nm_id
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_product_stocks(nm_id: int) -> Any:
"""Остатки по складам.
Распределение остатков товара по складам FBO и FBS.
"""
query = text("""
SELECT warehouse_name, quantity, 'fbo' as stock_type
FROM wb_stocks
WHERE product_nm_id = :nm_id AND quantity > 0
UNION ALL
SELECT CONCAT('FBS #', warehouse_id) as warehouse_name, amount as quantity, 'fbs' as stock_type
FROM wb_stocks_fbs
WHERE nm_id = :nm_id AND amount > 0
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_stock_summary(nm_id: int) -> Any:
"""Сводка остатков товара.
Общие остатки FBO/FBS, количество складов для артикула.
"""
query = text("""
SELECT
:nm_id as nm_id,
COALESCE((SELECT SUM(quantity_full) FROM wb_stocks WHERE product_nm_id = :nm_id), 0) as stock_fbo,
COALESCE((SELECT SUM(amount) FROM wb_stocks_fbs WHERE nm_id = :nm_id), 0) as stock_fbs,
COALESCE((SELECT SUM(quantity_full) FROM wb_stocks WHERE product_nm_id = :nm_id), 0) +
COALESCE((SELECT SUM(amount) FROM wb_stocks_fbs WHERE nm_id = :nm_id), 0) as total_stock,
(SELECT COUNT(DISTINCT warehouse_name) FROM wb_stocks WHERE product_nm_id = :nm_id AND quantity > 0) as fbo_warehouses,
(SELECT COUNT(DISTINCT warehouse_id) FROM wb_stocks_fbs WHERE nm_id = :nm_id AND amount > 0) as fbs_warehouses
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_low_stock_alerts(days_threshold: int = 7) -> Any:
"""Низкие остатки.
Товары с запасом менее days_threshold дней при текущих темпах продаж.
"""
query = text("""
SELECT
nm_id, title, total_stock, orders_7d,
CASE
WHEN orders_7d > 0 THEN ROUND(total_stock::numeric / (orders_7d / 7.0), 1)
ELSE 999
END as days_of_stock
FROM wb_vitrine_for_ai
WHERE total_stock > 0 AND orders_7d > 0
HAVING CASE
WHEN orders_7d > 0 THEN total_stock::numeric / (orders_7d / 7.0)
ELSE 999
END < :days_threshold
ORDER BY days_of_stock ASC
""")
df = pd.read_sql(query, engine, params={"days_threshold": days_threshold})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_out_of_stock() -> Any:
"""Нулевые остатки.
Товары с нулевыми остатками, но с заказами за 30 дней (упущенные продажи).
"""
query = text("""
SELECT nm_id, title, brand, orders_7d, net_revenue_30d
FROM wb_vitrine_for_ai
WHERE total_stock = 0 AND orders_7d > 0
ORDER BY orders_7d DESC
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_profitability_report(limit: int = 50) -> Any:
"""Прибыльность товаров.
Анализ маржинальности: выручка, чистая прибыль, процент маржи.
"""
query = text("""
SELECT
t.product_nm_id as nm_id,
p.title,
COUNT(*) as sales_count,
SUM(t.finished_price) as total_customer_paid,
SUM(t.for_pay) as total_net_profit,
ROUND(AVG(t.for_pay / NULLIF(t.finished_price, 0) * 100), 1) as margin_pct
FROM wb_transactions t
LEFT JOIN wb_products p ON t.product_nm_id = p.nm_id
WHERE t.type = 'sale' AND t.status = 'sale'
GROUP BY t.product_nm_id, p.title
ORDER BY total_net_profit DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_returns_report() -> Any:
"""Анализ возвратов.
Товары с возвратами: количество, процент возвратов.
"""
query = text("""
SELECT
t.product_nm_id as nm_id,
p.title,
COUNT(*) FILTER (WHERE t.status = 'return') as returns_count,
COUNT(*) FILTER (WHERE t.status = 'sale') as sales_count,
ROUND(
COUNT(*) FILTER (WHERE t.status = 'return')::numeric /
NULLIF(COUNT(*), 0) * 100, 1
) as return_rate_pct
FROM wb_transactions t
LEFT JOIN wb_products p ON t.product_nm_id = p.nm_id
WHERE t.type = 'sale'
GROUP BY t.product_nm_id, p.title
HAVING COUNT(*) FILTER (WHERE t.status = 'return') > 0
ORDER BY return_rate_pct DESC
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_regions_revenue(limit: int = 15) -> Any:
"""ТОП регионов по выручке.
Статистика по регионам: заказы, продажи, выручка, средний чек.
"""
query = text("""
SELECT
region,
COUNT(*) FILTER (WHERE type = 'order') as orders_count,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_count,
COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale'), 0) as revenue,
ROUND(AVG(finished_price) FILTER (WHERE type = 'order'), 2) as avg_check
FROM wb_transactions
WHERE region IS NOT NULL
GROUP BY region
ORDER BY revenue DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_sales_trend(days: int = 30) -> Any:
"""Тренд продаж по дням.
Продажи, заказы, возвраты, выручка по дням для построения графика.
"""
query = text("""
SELECT
DATE(date_at) as date,
COUNT(*) FILTER (WHERE type = 'order') as orders_count,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_count,
COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') as returns_count,
COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale'), 0) as revenue,
COALESCE(AVG(finished_price) FILTER (WHERE type = 'order'), 0) as avg_check
FROM wb_transactions
WHERE date_at > NOW() - (:days * INTERVAL '1 day')
GROUP BY DATE(date_at)
ORDER BY date ASC
""")
df = pd.read_sql(query, engine, params={"days": days})
df['date'] = df['date'].astype(str)
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_avg_check_trend(days: int = 30) -> Any:
"""Динамика среднего чека.
Средний чек по дням для анализа трендов.
"""
query = text("""
SELECT
DATE(date_at) as date,
ROUND(AVG(finished_price), 2) as avg_check,
COUNT(*) as orders_count
FROM wb_transactions
WHERE type = 'order' AND date_at > NOW() - (:days * INTERVAL '1 day')
GROUP BY DATE(date_at)
ORDER BY date ASC
""")
df = pd.read_sql(query, engine, params={"days": days})
df['date'] = df['date'].astype(str)
return clean_dataframe(df).to_dict(orient="records")
# ==================== ВОРОНКА ====================
@tool
def get_funnel_summary() -> Any:
"""Сводка воронки продаж.
Общая воронка: просмотры, корзина, заказы, выкупы, конверсии, динамика.
"""
query = text("""
SELECT
COUNT(*) as total_products,
SUM(open_count) as total_open_count,
SUM(cart_count) as total_cart_count,
SUM(order_count) as total_order_count,
SUM(order_sum) as total_order_sum,
SUM(buyout_count) as total_buyout_count,
SUM(buyout_sum) as total_buyout_sum,
SUM(cancel_count) as total_cancel_count,
ROUND(AVG(add_to_cart_percent), 2) as avg_add_to_cart_percent,
ROUND(AVG(cart_to_order_percent), 2) as avg_cart_to_order_percent,
ROUND(AVG(buyout_percent), 2) as avg_buyout_percent,
COUNT(*) FILTER (WHERE is_falling = true) as products_falling,
COUNT(*) FILTER (WHERE is_rising = true) as products_rising,
MIN(period_start)::text as period_start,
MAX(period_end)::text as period_end
FROM mv_funnel_with_finance
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_funnel_product(nm_id: int) -> Any:
"""Воронка товара.
Детальная воронка конкретного товара с конверсиями и динамикой.
"""
query = text("""
SELECT
nm_id, title, vendor_code, brand_name, subject_name,
product_rating, price_final, total_stock_actual,
open_count, cart_count, order_count, order_sum,
buyout_count, buyout_sum, cancel_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
order_count_dyn, buyout_sum_dyn, days_of_stock,
is_falling, is_rising
FROM mv_funnel_with_finance
WHERE nm_id = :nm_id
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
if df.empty:
return {"error": "Товар не найден в воронке"}
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_funnel_top(limit: int = 10, sort_by: str = "order_sum") -> Any:
"""ТОП по воронке.
ТОП товаров по метрикам воронки: заказы, выкупы, просмотры.
Допустимые sort_by: "order_sum", "order_count", "buyout_sum", "open_count", "cart_count".
"""
allowed_sorts = {"order_sum", "order_count", "buyout_sum", "open_count", "cart_count"}
if sort_by not in allowed_sorts:
return {"error": f"Неверная сортировка. Разрешено: {allowed_sorts}"}
query = text(f"""
SELECT
nm_id, title, vendor_code, brand_name, subject_name,
product_rating, price_final, total_stock_actual,
open_count, cart_count, order_count, order_sum,
buyout_count, buyout_sum, cancel_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
order_count_dyn, buyout_sum_dyn, days_of_stock,
is_falling, is_rising
FROM mv_funnel_with_finance
ORDER BY {sort_by} DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_dynamics_alerts(threshold: int = -20, limit: int = 20) -> Any:
"""Товары с падением.
Товары с падением заказов относительно прошлой недели.
"""
query = text("""
SELECT
nm_id, title, vendor_code,
order_count as metric_value,
order_count_dyn as metric_dyn,
'falling_orders' as alert_type
FROM mv_funnel_with_finance
WHERE order_count_dyn IS NOT NULL AND order_count_dyn < :threshold
ORDER BY order_count_dyn ASC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"threshold": threshold, "limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_rising_stars(threshold: int = 20, limit: int = 20) -> Any:
"""Растущие товары.
Товары с ростом заказов относительно прошлой недели.
"""
query = text("""
SELECT
nm_id, title, vendor_code,
order_count as metric_value,
order_count_dyn as metric_dyn,
'rising_orders' as alert_type
FROM mv_funnel_with_finance
WHERE order_count_dyn IS NOT NULL AND order_count_dyn > :threshold
ORDER BY order_count_dyn DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"threshold": threshold, "limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_conversion_leaks(limit: int = 20) -> Any:
"""Проблемы конверсии.
Товары с низкой конверсией на этапах воронки — где теряем клиентов.
"""
query = text("""
SELECT
nm_id, title,
open_count, cart_count, order_count, buyout_count,
add_to_cart_percent, cart_to_order_percent, buyout_percent,
CASE
WHEN add_to_cart_percent < 5 THEN 'open_to_cart'
WHEN cart_to_order_percent < 10 THEN 'cart_to_order'
WHEN buyout_percent < 50 THEN 'order_to_buyout'
ELSE 'unknown'
END as weak_stage,
CASE
WHEN add_to_cart_percent < 5 THEN ROUND(open_count * 0.10 - cart_count)
WHEN cart_to_order_percent < 10 THEN ROUND(cart_count * 0.20 - order_count)
ELSE ROUND(order_count * 0.80 - buyout_count)
END as potential_orders_lost
FROM mv_funnel_with_finance
WHERE open_count > 100
AND (add_to_cart_percent < 5 OR cart_to_order_percent < 10 OR buyout_percent < 50)
ORDER BY open_count DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_high_demand_low_stock(limit: int = 20) -> Any:
"""Горячие товары с низким остатком.
Популярные товары, которые скоро закончатся.
"""
query = text("""
SELECT
nm_id, title, vendor_code, brand_name,
open_count, order_count, order_sum,
total_stock_actual, days_of_stock,
order_count_dyn
FROM mv_funnel_with_finance
WHERE is_low_stock = true
ORDER BY order_count DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_category_benchmark(subject_id: Optional[int] = None, limit: int = 20) -> Any:
"""Сравнение с категорией.
Сравнение конверсий товара со средним по категории.
"""
if subject_id:
query = text("""
WITH category_avg AS (
SELECT
subject_id, subject_name,
ROUND(AVG(add_to_cart_percent), 2) as avg_cart_pct,
ROUND(AVG(cart_to_order_percent), 2) as avg_order_pct,
ROUND(AVG(buyout_percent), 2) as avg_buyout_pct,
ROUND(AVG(order_count), 0) as avg_orders
FROM mv_funnel_with_finance
WHERE subject_id = :subject_id
GROUP BY subject_id, subject_name
)
SELECT
f.nm_id, f.title, f.vendor_code, f.subject_name,
f.add_to_cart_percent, c.avg_cart_pct, ROUND(f.add_to_cart_percent - c.avg_cart_pct, 2) as cart_diff,
f.cart_to_order_percent, c.avg_order_pct, ROUND(f.cart_to_order_percent - c.avg_order_pct, 2) as order_diff,
f.buyout_percent, c.avg_buyout_pct, ROUND(f.buyout_percent - c.avg_buyout_pct, 2) as buyout_diff,
f.order_count, c.avg_orders
FROM mv_funnel_with_finance f
JOIN category_avg c ON f.subject_id = c.subject_id
ORDER BY f.order_count DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"subject_id": subject_id, "limit": limit})
else:
query = text("""
WITH category_avg AS (
SELECT
subject_id, subject_name,
ROUND(AVG(add_to_cart_percent), 2) as avg_cart_pct,
ROUND(AVG(cart_to_order_percent), 2) as avg_order_pct,
ROUND(AVG(buyout_percent), 2) as avg_buyout_pct,
COUNT(*) as products_count
FROM mv_funnel_with_finance
GROUP BY subject_id, subject_name
HAVING COUNT(*) > 1
)
SELECT * FROM category_avg ORDER BY products_count DESC LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
# ==================== ИНСАЙТЫ (ИИ) ====================
@tool
def add_ai_insight(nm_id: int, tag: str, recommendation: str, score: float) -> Any:
"""Записать рекомендацию.
Сохранить AI-рекомендацию для товара в базу.
"""
query = text("""
INSERT INTO ai_product_insights (product_nm_id, ai_strategy_tag, ai_recommendation, confidence_score)
VALUES (:nm_id, :tag, :rec, :score)
""")
try:
with engine.begin() as conn:
conn.execute(query, {
"nm_id": nm_id,
"tag": tag,
"rec": recommendation,
"score": score
})
return {"status": "success"}
except Exception as e:
return {"error": str(e)}
@tool
def get_insights(nm_id: int) -> Any:
"""Получить рекомендации.
Получить сохранённые AI-рекомендации для товара.
"""
query = text("""
SELECT ai_strategy_tag as tag, ai_recommendation as recommendation,
confidence_score as score, analyzed_at
FROM ai_product_insights
WHERE product_nm_id = :nm_id
ORDER BY analyzed_at DESC
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
return clean_dataframe(df).to_dict(orient="records")
# Экспорт всех инструментов одним списком
all_tools = [
get_dashboard_summary,
get_dashboard_funnel_summary,
get_top_products,
get_product_details,
get_product_sales,
get_product_stocks,
get_stock_summary,
get_low_stock_alerts,
get_out_of_stock,
get_profitability_report,
get_returns_report,
get_regions_revenue,
get_sales_trend,
get_avg_check_trend,
get_funnel_summary,
get_funnel_product,
get_funnel_top,
get_dynamics_alerts,
get_rising_stars,
get_conversion_leaks,
get_high_demand_low_stock,
get_category_benchmark,
add_ai_insight,
get_insights
]
# Единый контракт экспорта как в других модулях api/*
analytics_tools = all_tools