import numpy as np import pandas as pd from typing import Any, Optional from sqlalchemy import text from langchain_core.tools import tool # Импортируем подключение к БД from database import engine def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame: """Заменяет inf и NaN на None для корректной сериализации.""" return df.replace([np.inf, -np.inf, np.nan], None) # ==================== ДАШБОРД ==================== @tool def get_dashboard_summary() -> Any: """Общая сводка бизнеса. Сводка: товары, остатки FBO/FBS, заказы за 24ч/7д, выручка 30д, out-of-stock. """ query = text(""" SELECT (SELECT COUNT(*) FROM wb_products) as total_products, (SELECT SUM(total_stock) FROM wb_vitrine_for_ai) as total_stock, (SELECT SUM(stock_fbo) FROM wb_vitrine_for_ai) as total_stock_fbo, (SELECT SUM(stock_fbs) FROM wb_vitrine_for_ai) as total_stock_fbs, (SELECT COUNT(*) FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - INTERVAL '24 hours') as orders_24h, (SELECT COUNT(*) FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - INTERVAL '7 days') as orders_7d, (SELECT COALESCE(SUM(for_pay), 0) FROM wb_transactions WHERE type = 'sale' AND status = 'sale' AND date_at > NOW() - INTERVAL '30 days') as revenue_30d, (SELECT COUNT(*) FROM wb_vitrine_for_ai WHERE total_stock = 0) as out_of_stock_count """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_dashboard_funnel_summary() -> Any: """Сводка воронки для дашборда. Быстрая сводка: заказы, выкупы, товары с падением/ростом, проблемы конверсии. """ query = text(""" SELECT COUNT(*) as total_products, SUM(order_count) as total_orders, SUM(order_sum) as total_order_sum, SUM(buyout_sum) as total_buyout_sum, COUNT(*) FILTER (WHERE is_falling = true) as products_falling, COUNT(*) FILTER (WHERE is_rising = true) as products_rising, COUNT(*) FILTER (WHERE is_low_stock = true) as products_low_stock, COUNT(*) FILTER (WHERE has_conversion_problem = true) as products_conversion_issues FROM mv_funnel_with_finance """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records")[0] # ==================== АНАЛИТИКА ТОВАРОВ ==================== @tool def get_top_products(limit: int = 10, sort_by: str = "orders_7d") -> Any: """ТОП товаров. Топ товаров с сортировкой по заказам, выручке или остаткам. Допустимые sort_by: "orders_7d", "net_revenue_30d", "total_stock". """ allowed_sorts = {"orders_7d", "net_revenue_30d", "total_stock"} if sort_by not in allowed_sorts: return {"error": f"Неверная сортировка. Разрешено: {allowed_sorts}"} query = text(f"SELECT * FROM wb_vitrine_for_ai ORDER BY {sort_by} DESC LIMIT :limit") df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_product_details(nm_id: int) -> Any: """Детали товара. Детальная информация по артикулу: цена, остатки, заказы, выручка. """ query = text("SELECT * FROM wb_vitrine_for_ai WHERE nm_id = :nm_id") df = pd.read_sql(query, engine, params={"nm_id": nm_id}) if df.empty: return {"error": "Товар не найден"} return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_product_sales(nm_id: int) -> Any: """Статистика продаж товара. Заказы, продажи, возвраты, выручка, средний чек по конкретному товару. """ query = text(""" SELECT :nm_id as nm_id, COUNT(*) FILTER (WHERE type = 'order') as orders_total, COUNT(*) FILTER (WHERE type = 'order' AND date_at > NOW() - INTERVAL '7 days') as orders_7d, COUNT(*) FILTER (WHERE type = 'order' AND date_at > NOW() - INTERVAL '30 days') as orders_30d, COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_total, COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') as returns_total, COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale' AND date_at > NOW() - INTERVAL '30 days'), 0) as net_revenue_30d, COALESCE(AVG(finished_price) FILTER (WHERE type = 'order'), 0) as avg_order_value FROM wb_transactions WHERE product_nm_id = :nm_id """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_product_stocks(nm_id: int) -> Any: """Остатки по складам. Распределение остатков товара по складам FBO и FBS. """ query = text(""" SELECT warehouse_name, quantity, 'fbo' as stock_type FROM wb_stocks WHERE product_nm_id = :nm_id AND quantity > 0 UNION ALL SELECT CONCAT('FBS #', warehouse_id) as warehouse_name, amount as quantity, 'fbs' as stock_type FROM wb_stocks_fbs WHERE nm_id = :nm_id AND amount > 0 """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) return clean_dataframe(df).to_dict(orient="records") @tool def get_stock_summary(nm_id: int) -> Any: """Сводка остатков товара. Общие остатки FBO/FBS, количество складов для артикула. """ query = text(""" SELECT :nm_id as nm_id, COALESCE((SELECT SUM(quantity_full) FROM wb_stocks WHERE product_nm_id = :nm_id), 0) as stock_fbo, COALESCE((SELECT SUM(amount) FROM wb_stocks_fbs WHERE nm_id = :nm_id), 0) as stock_fbs, COALESCE((SELECT SUM(quantity_full) FROM wb_stocks WHERE product_nm_id = :nm_id), 0) + COALESCE((SELECT SUM(amount) FROM wb_stocks_fbs WHERE nm_id = :nm_id), 0) as total_stock, (SELECT COUNT(DISTINCT warehouse_name) FROM wb_stocks WHERE product_nm_id = :nm_id AND quantity > 0) as fbo_warehouses, (SELECT COUNT(DISTINCT warehouse_id) FROM wb_stocks_fbs WHERE nm_id = :nm_id AND amount > 0) as fbs_warehouses """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_low_stock_alerts(days_threshold: int = 7) -> Any: """Низкие остатки. Товары с запасом менее days_threshold дней при текущих темпах продаж. """ query = text(""" SELECT nm_id, title, total_stock, orders_7d, CASE WHEN orders_7d > 0 THEN ROUND(total_stock::numeric / (orders_7d / 7.0), 1) ELSE 999 END as days_of_stock FROM wb_vitrine_for_ai WHERE total_stock > 0 AND orders_7d > 0 HAVING CASE WHEN orders_7d > 0 THEN total_stock::numeric / (orders_7d / 7.0) ELSE 999 END < :days_threshold ORDER BY days_of_stock ASC """) df = pd.read_sql(query, engine, params={"days_threshold": days_threshold}) return clean_dataframe(df).to_dict(orient="records") @tool def get_out_of_stock() -> Any: """Нулевые остатки. Товары с нулевыми остатками, но с заказами за 30 дней (упущенные продажи). """ query = text(""" SELECT nm_id, title, brand, orders_7d, net_revenue_30d FROM wb_vitrine_for_ai WHERE total_stock = 0 AND orders_7d > 0 ORDER BY orders_7d DESC """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records") @tool def get_profitability_report(limit: int = 50) -> Any: """Прибыльность товаров. Анализ маржинальности: выручка, чистая прибыль, процент маржи. """ query = text(""" SELECT t.product_nm_id as nm_id, p.title, COUNT(*) as sales_count, SUM(t.finished_price) as total_customer_paid, SUM(t.for_pay) as total_net_profit, ROUND(AVG(t.for_pay / NULLIF(t.finished_price, 0) * 100), 1) as margin_pct FROM wb_transactions t LEFT JOIN wb_products p ON t.product_nm_id = p.nm_id WHERE t.type = 'sale' AND t.status = 'sale' GROUP BY t.product_nm_id, p.title ORDER BY total_net_profit DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_returns_report() -> Any: """Анализ возвратов. Товары с возвратами: количество, процент возвратов. """ query = text(""" SELECT t.product_nm_id as nm_id, p.title, COUNT(*) FILTER (WHERE t.status = 'return') as returns_count, COUNT(*) FILTER (WHERE t.status = 'sale') as sales_count, ROUND( COUNT(*) FILTER (WHERE t.status = 'return')::numeric / NULLIF(COUNT(*), 0) * 100, 1 ) as return_rate_pct FROM wb_transactions t LEFT JOIN wb_products p ON t.product_nm_id = p.nm_id WHERE t.type = 'sale' GROUP BY t.product_nm_id, p.title HAVING COUNT(*) FILTER (WHERE t.status = 'return') > 0 ORDER BY return_rate_pct DESC """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records") @tool def get_regions_revenue(limit: int = 15) -> Any: """ТОП регионов по выручке. Статистика по регионам: заказы, продажи, выручка, средний чек. """ query = text(""" SELECT region, COUNT(*) FILTER (WHERE type = 'order') as orders_count, COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_count, COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale'), 0) as revenue, ROUND(AVG(finished_price) FILTER (WHERE type = 'order'), 2) as avg_check FROM wb_transactions WHERE region IS NOT NULL GROUP BY region ORDER BY revenue DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_sales_trend(days: int = 30) -> Any: """Тренд продаж по дням. Продажи, заказы, возвраты, выручка по дням для построения графика. """ query = text(""" SELECT DATE(date_at) as date, COUNT(*) FILTER (WHERE type = 'order') as orders_count, COUNT(*) FILTER (WHERE type = 'sale' AND status = 'sale') as sales_count, COUNT(*) FILTER (WHERE type = 'sale' AND status = 'return') as returns_count, COALESCE(SUM(for_pay) FILTER (WHERE type = 'sale' AND status = 'sale'), 0) as revenue, COALESCE(AVG(finished_price) FILTER (WHERE type = 'order'), 0) as avg_check FROM wb_transactions WHERE date_at > NOW() - (:days * INTERVAL '1 day') GROUP BY DATE(date_at) ORDER BY date ASC """) df = pd.read_sql(query, engine, params={"days": days}) df['date'] = df['date'].astype(str) return clean_dataframe(df).to_dict(orient="records") @tool def get_avg_check_trend(days: int = 30) -> Any: """Динамика среднего чека. Средний чек по дням для анализа трендов. """ query = text(""" SELECT DATE(date_at) as date, ROUND(AVG(finished_price), 2) as avg_check, COUNT(*) as orders_count FROM wb_transactions WHERE type = 'order' AND date_at > NOW() - (:days * INTERVAL '1 day') GROUP BY DATE(date_at) ORDER BY date ASC """) df = pd.read_sql(query, engine, params={"days": days}) df['date'] = df['date'].astype(str) return clean_dataframe(df).to_dict(orient="records") # ==================== ВОРОНКА ==================== @tool def get_funnel_summary() -> Any: """Сводка воронки продаж. Общая воронка: просмотры, корзина, заказы, выкупы, конверсии, динамика. """ query = text(""" SELECT COUNT(*) as total_products, SUM(open_count) as total_open_count, SUM(cart_count) as total_cart_count, SUM(order_count) as total_order_count, SUM(order_sum) as total_order_sum, SUM(buyout_count) as total_buyout_count, SUM(buyout_sum) as total_buyout_sum, SUM(cancel_count) as total_cancel_count, ROUND(AVG(add_to_cart_percent), 2) as avg_add_to_cart_percent, ROUND(AVG(cart_to_order_percent), 2) as avg_cart_to_order_percent, ROUND(AVG(buyout_percent), 2) as avg_buyout_percent, COUNT(*) FILTER (WHERE is_falling = true) as products_falling, COUNT(*) FILTER (WHERE is_rising = true) as products_rising, MIN(period_start)::text as period_start, MAX(period_end)::text as period_end FROM mv_funnel_with_finance """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_funnel_product(nm_id: int) -> Any: """Воронка товара. Детальная воронка конкретного товара с конверсиями и динамикой. """ query = text(""" SELECT nm_id, title, vendor_code, brand_name, subject_name, product_rating, price_final, total_stock_actual, open_count, cart_count, order_count, order_sum, buyout_count, buyout_sum, cancel_count, add_to_cart_percent, cart_to_order_percent, buyout_percent, order_count_dyn, buyout_sum_dyn, days_of_stock, is_falling, is_rising FROM mv_funnel_with_finance WHERE nm_id = :nm_id """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) if df.empty: return {"error": "Товар не найден в воронке"} return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_funnel_top(limit: int = 10, sort_by: str = "order_sum") -> Any: """ТОП по воронке. ТОП товаров по метрикам воронки: заказы, выкупы, просмотры. Допустимые sort_by: "order_sum", "order_count", "buyout_sum", "open_count", "cart_count". """ allowed_sorts = {"order_sum", "order_count", "buyout_sum", "open_count", "cart_count"} if sort_by not in allowed_sorts: return {"error": f"Неверная сортировка. Разрешено: {allowed_sorts}"} query = text(f""" SELECT nm_id, title, vendor_code, brand_name, subject_name, product_rating, price_final, total_stock_actual, open_count, cart_count, order_count, order_sum, buyout_count, buyout_sum, cancel_count, add_to_cart_percent, cart_to_order_percent, buyout_percent, order_count_dyn, buyout_sum_dyn, days_of_stock, is_falling, is_rising FROM mv_funnel_with_finance ORDER BY {sort_by} DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_dynamics_alerts(threshold: int = -20, limit: int = 20) -> Any: """Товары с падением. Товары с падением заказов относительно прошлой недели. """ query = text(""" SELECT nm_id, title, vendor_code, order_count as metric_value, order_count_dyn as metric_dyn, 'falling_orders' as alert_type FROM mv_funnel_with_finance WHERE order_count_dyn IS NOT NULL AND order_count_dyn < :threshold ORDER BY order_count_dyn ASC LIMIT :limit """) df = pd.read_sql(query, engine, params={"threshold": threshold, "limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_rising_stars(threshold: int = 20, limit: int = 20) -> Any: """Растущие товары. Товары с ростом заказов относительно прошлой недели. """ query = text(""" SELECT nm_id, title, vendor_code, order_count as metric_value, order_count_dyn as metric_dyn, 'rising_orders' as alert_type FROM mv_funnel_with_finance WHERE order_count_dyn IS NOT NULL AND order_count_dyn > :threshold ORDER BY order_count_dyn DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"threshold": threshold, "limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_conversion_leaks(limit: int = 20) -> Any: """Проблемы конверсии. Товары с низкой конверсией на этапах воронки — где теряем клиентов. """ query = text(""" SELECT nm_id, title, open_count, cart_count, order_count, buyout_count, add_to_cart_percent, cart_to_order_percent, buyout_percent, CASE WHEN add_to_cart_percent < 5 THEN 'open_to_cart' WHEN cart_to_order_percent < 10 THEN 'cart_to_order' WHEN buyout_percent < 50 THEN 'order_to_buyout' ELSE 'unknown' END as weak_stage, CASE WHEN add_to_cart_percent < 5 THEN ROUND(open_count * 0.10 - cart_count) WHEN cart_to_order_percent < 10 THEN ROUND(cart_count * 0.20 - order_count) ELSE ROUND(order_count * 0.80 - buyout_count) END as potential_orders_lost FROM mv_funnel_with_finance WHERE open_count > 100 AND (add_to_cart_percent < 5 OR cart_to_order_percent < 10 OR buyout_percent < 50) ORDER BY open_count DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_high_demand_low_stock(limit: int = 20) -> Any: """Горячие товары с низким остатком. Популярные товары, которые скоро закончатся. """ query = text(""" SELECT nm_id, title, vendor_code, brand_name, open_count, order_count, order_sum, total_stock_actual, days_of_stock, order_count_dyn FROM mv_funnel_with_finance WHERE is_low_stock = true ORDER BY order_count DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_category_benchmark(subject_id: Optional[int] = None, limit: int = 20) -> Any: """Сравнение с категорией. Сравнение конверсий товара со средним по категории. """ if subject_id: query = text(""" WITH category_avg AS ( SELECT subject_id, subject_name, ROUND(AVG(add_to_cart_percent), 2) as avg_cart_pct, ROUND(AVG(cart_to_order_percent), 2) as avg_order_pct, ROUND(AVG(buyout_percent), 2) as avg_buyout_pct, ROUND(AVG(order_count), 0) as avg_orders FROM mv_funnel_with_finance WHERE subject_id = :subject_id GROUP BY subject_id, subject_name ) SELECT f.nm_id, f.title, f.vendor_code, f.subject_name, f.add_to_cart_percent, c.avg_cart_pct, ROUND(f.add_to_cart_percent - c.avg_cart_pct, 2) as cart_diff, f.cart_to_order_percent, c.avg_order_pct, ROUND(f.cart_to_order_percent - c.avg_order_pct, 2) as order_diff, f.buyout_percent, c.avg_buyout_pct, ROUND(f.buyout_percent - c.avg_buyout_pct, 2) as buyout_diff, f.order_count, c.avg_orders FROM mv_funnel_with_finance f JOIN category_avg c ON f.subject_id = c.subject_id ORDER BY f.order_count DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"subject_id": subject_id, "limit": limit}) else: query = text(""" WITH category_avg AS ( SELECT subject_id, subject_name, ROUND(AVG(add_to_cart_percent), 2) as avg_cart_pct, ROUND(AVG(cart_to_order_percent), 2) as avg_order_pct, ROUND(AVG(buyout_percent), 2) as avg_buyout_pct, COUNT(*) as products_count FROM mv_funnel_with_finance GROUP BY subject_id, subject_name HAVING COUNT(*) > 1 ) SELECT * FROM category_avg ORDER BY products_count DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") # ==================== ИНСАЙТЫ (ИИ) ==================== @tool def add_ai_insight(nm_id: int, tag: str, recommendation: str, score: float) -> Any: """Записать рекомендацию. Сохранить AI-рекомендацию для товара в базу. """ query = text(""" INSERT INTO ai_product_insights (product_nm_id, ai_strategy_tag, ai_recommendation, confidence_score) VALUES (:nm_id, :tag, :rec, :score) """) try: with engine.begin() as conn: conn.execute(query, { "nm_id": nm_id, "tag": tag, "rec": recommendation, "score": score }) return {"status": "success"} except Exception as e: return {"error": str(e)} @tool def get_insights(nm_id: int) -> Any: """Получить рекомендации. Получить сохранённые AI-рекомендации для товара. """ query = text(""" SELECT ai_strategy_tag as tag, ai_recommendation as recommendation, confidence_score as score, analyzed_at FROM ai_product_insights WHERE product_nm_id = :nm_id ORDER BY analyzed_at DESC """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) return clean_dataframe(df).to_dict(orient="records") # Экспорт всех инструментов одним списком all_tools = [ get_dashboard_summary, get_dashboard_funnel_summary, get_top_products, get_product_details, get_product_sales, get_product_stocks, get_stock_summary, get_low_stock_alerts, get_out_of_stock, get_profitability_report, get_returns_report, get_regions_revenue, get_sales_trend, get_avg_check_trend, get_funnel_summary, get_funnel_product, get_funnel_top, get_dynamics_alerts, get_rising_stars, get_conversion_leaks, get_high_demand_low_stock, get_category_benchmark, add_ai_insight, get_insights ] # Единый контракт экспорта как в других модулях api/* analytics_tools = all_tools