import numpy as np import pandas as pd from typing import Any from sqlalchemy import text from langchain_core.tools import tool # Импортируем подключение к БД. Убедитесь, что путь соответствует вашей структуре проекта. from database import engine ALLOWED_PRODUCT_SORT = {"roas", "ad_spend_total", "orders_total", "revenue_total"} ALLOWED_CAMPAIGN_SORT = {"ad_spend_total", "roas", "orders_total", "cpo"} ALLOWED_CATEGORY_SORT = {"ad_spend_total", "roas", "orders_total"} def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame: """Заменяет inf и NaN на None для корректной JSON-сериализации агенту.""" return df.replace([np.inf, -np.inf, np.nan], None) @tool def get_dashboard_ads_summary() -> Any: """Сводка рекламы для дашборда. Быстрая сводка: количество товаров с рекламой, расход за 7 дней, заказы, выручка, средний ROAS, товары с низким ROAS и падающие товары. """ query = text(""" SELECT COUNT(*) as products_with_ads, SUM(ad_spend_total)::numeric(14,2) as total_spend_7d, SUM(orders_total) as total_orders_7d, SUM(revenue_total)::numeric(14,2) as total_revenue_7d, ROUND(SUM(revenue_total) / NULLIF(SUM(ad_spend_total), 0), 2) as avg_roas, COUNT(*) FILTER (WHERE roas < 1) as products_low_roas, COUNT(*) FILTER (WHERE orders_dyn < -20) as products_falling FROM dm_ad_spend_product_7d """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_ads_summary() -> Any: """Детальная сводка рекламы. Полная сводка за 7 дней с динамикой относительно прошлого периода: расход, просмотры, клики, заказы, выручка, ROAS, CPO, CTR. """ query = text(""" SELECT COUNT(DISTINCT nm_id) as products_count, SUM(ad_spend_total)::numeric(14,2) as ad_spend_total, SUM(ad_spend_prev)::numeric(14,2) as ad_spend_prev, ROUND((SUM(ad_spend_total) - SUM(ad_spend_prev)) / NULLIF(SUM(ad_spend_prev), 0) * 100, 1) as ad_spend_dyn, SUM(views_total) as views_total, SUM(clicks_total) as clicks_total, SUM(orders_total) as orders_total, SUM(orders_prev) as orders_prev, ROUND((SUM(orders_total) - SUM(orders_prev))::numeric / NULLIF(SUM(orders_prev), 0) * 100, 1) as orders_dyn, SUM(revenue_total)::numeric(14,2) as revenue_total, ROUND(SUM(revenue_total) / NULLIF(SUM(ad_spend_total), 0), 2) as roas_avg, ROUND(SUM(ad_spend_total) / NULLIF(SUM(orders_total), 0), 2) as cpo_avg, ROUND(SUM(clicks_total)::numeric / NULLIF(SUM(views_total), 0) * 100, 2) as ctr_avg FROM dm_ad_spend_product_7d """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_ads_trend(days: int = 30) -> Any: """Тренд рекламных расходов по дням. Args: days: Количество дней для графика. По умолчанию 30. """ query = text(""" SELECT day::text as date, ad_spend_total FROM dm_ad_spend_daily WHERE day >= CURRENT_DATE - :days ORDER BY day ASC """) df = pd.read_sql(query, engine, params={"days": days}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_products_top(limit: int = 20, sort_by: str = "roas") -> Any: """ТОП товаров по эффективности рекламы. Args: limit: Сколько товаров вернуть. sort_by: Поле сортировки. Допустимые значения: 'roas', 'ad_spend_total', 'orders_total', 'revenue_total'. """ if sort_by not in ALLOWED_PRODUCT_SORT: return {"error": f"Некорректный sort_by='{sort_by}'. Допустимо: {sorted(ALLOWED_PRODUCT_SORT)}"} query = text(f""" SELECT nm_id, vendor_code, title, brand, category, ad_spend_total, views_total, clicks_total, orders_total, revenue_total, ctr, cpc, cpm, cr, roas, cpo, ad_spend_dyn, orders_dyn, revenue_dyn FROM dm_ad_spend_product_7d WHERE {sort_by} IS NOT NULL ORDER BY {sort_by} DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_products_losers(limit: int = 20) -> Any: """Убыточные товары в рекламе. Возвращает товары с ROAS < 1 и заметным рекламным расходом (больше 100 руб). """ query = text(""" SELECT nm_id, vendor_code, title, brand, ad_spend_total, orders_total, revenue_total, roas, cpo, ad_spend_dyn, orders_dyn FROM dm_ad_spend_product_7d WHERE roas IS NOT NULL AND roas < 1 AND ad_spend_total > 100 ORDER BY ad_spend_total DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_products_rising(limit: int = 20) -> Any: """Товары с ростом эффективности рекламы. Возвращает товары с положительной динамикой заказов. """ query = text(""" SELECT nm_id, vendor_code, title, brand, ad_spend_total, orders_total, revenue_total, roas, cpo, orders_dyn, revenue_dyn FROM dm_ad_spend_product_7d WHERE orders_dyn IS NOT NULL AND orders_dyn > 0 ORDER BY orders_dyn DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_products_falling(limit: int = 20) -> Any: """Товары с падением эффективности рекламы. Возвращает товары с отрицательной динамикой заказов. """ query = text(""" SELECT nm_id, vendor_code, title, brand, ad_spend_total, orders_total, revenue_total, roas, cpo, orders_dyn, revenue_dyn FROM dm_ad_spend_product_7d WHERE orders_dyn IS NOT NULL AND orders_dyn < 0 ORDER BY orders_dyn ASC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_product(nm_id: int) -> Any: """Детали рекламы по конкретному товару. Args: nm_id: Артикул Wildberries. """ query = text(""" SELECT nm_id, vendor_code, title, brand, category, period_start::text, period_end::text, ad_spend_total, ad_spend_prev, ad_spend_dyn, views_total, views_prev, views_dyn, clicks_total, clicks_prev, clicks_dyn, orders_total, orders_prev, orders_dyn, revenue_total, revenue_prev, revenue_dyn, ctr, cpc, cpm, cr, roas, cpo FROM dm_ad_spend_product_7d WHERE nm_id = :nm_id """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) if df.empty: return {"error": f"Товар с артикулом {nm_id} не найден в рекламных данных."} return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_ads_campaigns(limit: int = 20, sort_by: str = "ad_spend_total") -> Any: """Список рекламных кампаний с метриками. Args: limit: Сколько кампаний вернуть. sort_by: Поле сортировки. Допустимые значения: 'ad_spend_total', 'roas', 'orders_total', 'cpo'. """ if sort_by not in ALLOWED_CAMPAIGN_SORT: return {"error": f"Некорректный sort_by='{sort_by}'. Допустимо: {sorted(ALLOWED_CAMPAIGN_SORT)}"} query = text(f""" SELECT campaign_id, campaign_name, campaign_status, campaign_type, daily_budget, budget_total, nm_cnt, ad_spend_total, views_total, clicks_total, orders_total, revenue_total, ctr, cpc, cpm, cr, roas, cpo FROM dm_ad_spend_campaign_7d ORDER BY {sort_by} DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_campaigns_inefficient(limit: int = 20) -> Any: """Неэффективные рекламные кампании. Возвращает кампании с низким ROAS (< 1) или высоким CPO (> 500). """ query = text(""" SELECT campaign_id, campaign_name, campaign_status, campaign_type, ad_spend_total, orders_total, revenue_total, roas, cpo, CASE WHEN roas < 1 THEN 'low_roas' WHEN cpo > 500 THEN 'high_cpo' ELSE 'other' END as issue_type FROM dm_ad_spend_campaign_7d WHERE (roas IS NOT NULL AND roas < 1) OR (cpo IS NOT NULL AND cpo > 500) ORDER BY ad_spend_total DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_campaign(campaign_id: int) -> Any: """Детали конкретной рекламной кампании. Args: campaign_id: ID рекламной кампании. """ query = text(""" SELECT campaign_id, campaign_name, campaign_status, campaign_type, daily_budget, budget_total, period_start::text, period_end::text, nm_cnt, ad_spend_total, views_total, clicks_total, atbs_total, orders_total, shks_total, revenue_total, ctr, cpc, cpm, cr, roas, cpo FROM dm_ad_spend_campaign_7d WHERE campaign_id = :campaign_id """) df = pd.read_sql(query, engine, params={"campaign_id": campaign_id}) if df.empty: return {"error": f"Кампания с ID {campaign_id} не найдена."} return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_ads_categories(sort_by: str = "ad_spend_total") -> Any: """Эффективность рекламы по категориям. Args: sort_by: Поле сортировки. Допустимые значения: 'ad_spend_total', 'roas', 'orders_total'. """ if sort_by not in ALLOWED_CATEGORY_SORT: return {"error": f"Некорректный sort_by='{sort_by}'. Допустимо: {sorted(ALLOWED_CATEGORY_SORT)}"} query = text(f""" SELECT category, nm_cnt, campaign_cnt, ad_spend_total, views_total, clicks_total, orders_total, revenue_total, ctr, cpc, cpm, cr, roas, cpo FROM dm_ad_spend_category_7d ORDER BY {sort_by} DESC """) df = pd.read_sql(query, engine) return clean_dataframe(df).to_dict(orient="records") @tool def get_ads_category_benchmark(nm_id: int) -> Any: """Сравнение товара со средними рекламными метриками по его категории. Args: nm_id: Артикул Wildberries. """ query = text(""" WITH product AS ( SELECT * FROM dm_ad_spend_product_7d WHERE nm_id = :nm_id ), cat_avg AS ( SELECT category, ROUND(AVG(ctr), 2) as avg_ctr, ROUND(AVG(cpc), 2) as avg_cpc, ROUND(AVG(cr), 2) as avg_cr, ROUND(AVG(roas), 2) as avg_roas, ROUND(AVG(cpo), 2) as avg_cpo FROM dm_ad_spend_product_7d WHERE category = (SELECT category FROM product) GROUP BY category ) SELECT p.nm_id, p.title, p.category, p.ctr, c.avg_ctr, ROUND(p.ctr - c.avg_ctr, 2) as ctr_diff, p.cpc, c.avg_cpc, ROUND(p.cpc - c.avg_cpc, 2) as cpc_diff, p.cr, c.avg_cr, ROUND(p.cr - c.avg_cr, 2) as cr_diff, p.roas, c.avg_roas, ROUND(p.roas - c.avg_roas, 2) as roas_diff, p.cpo, c.avg_cpo, ROUND(p.cpo - c.avg_cpo, 2) as cpo_diff FROM product p LEFT JOIN cat_avg c ON p.category = c.category """) df = pd.read_sql(query, engine, params={"nm_id": nm_id}) if df.empty: return {"error": f"Товар с артикулом {nm_id} не найден."} return clean_dataframe(df).to_dict(orient="records")[0] @tool def get_ads_alerts(limit: int = 30) -> Any: """Все проблемы с рекламой. Возвращает проблемные товары: низкий ROAS, высокий CPO, сильное падение заказов и другие рекламные алерты. """ query = text(""" SELECT nm_id, vendor_code, title, ad_spend_total, orders_total, revenue_total, roas, cpo, orders_dyn, CASE WHEN roas < 0.5 THEN 'critical_roas' WHEN roas < 1 THEN 'low_roas' WHEN orders_dyn < -50 THEN 'orders_crash' WHEN orders_dyn < -20 THEN 'orders_falling' WHEN cpo > 1000 THEN 'very_high_cpo' ELSE 'other' END as alert_type, CASE WHEN roas < 0.5 THEN 1 WHEN orders_dyn < -50 THEN 2 WHEN roas < 1 THEN 3 WHEN orders_dyn < -20 THEN 4 ELSE 5 END as priority FROM dm_ad_spend_product_7d WHERE roas < 1 OR orders_dyn < -20 OR cpo > 1000 ORDER BY priority ASC, ad_spend_total DESC LIMIT :limit """) df = pd.read_sql(query, engine, params={"limit": limit}) return clean_dataframe(df).to_dict(orient="records") @tool def add_insight(nm_id: int, tag: str, recommendation: str, score: float) -> Any: """Сохранить AI-рекомендацию по рекламе для товара в базу. Args: nm_id: Артикул Wildberries. tag: Тег рекомендации. Допустимые значения: 'ads_stop', 'ads_reduce', 'ads_boost', 'ads_optimize', 'ads_test'. recommendation: Конкретное действие с цифрами. score: Уверенность от 0.5 до 1.0. """ query = text(""" INSERT INTO ai_product_insights (product_nm_id, ai_strategy_tag, ai_recommendation, confidence_score) VALUES (:nm_id, :tag, :rec, :score) """) try: with engine.begin() as conn: conn.execute(query, { "nm_id": nm_id, "tag": tag, "rec": recommendation, "score": score }) return {"status": "success", "message": "Инсайт успешно сохранен."} except Exception as e: return {"error": f"Ошибка сохранения в базу данных: {str(e)}"} # Совместимость со старым именем инструмента get_ads_product_details = get_ads_product ads_tools = [ get_dashboard_ads_summary, get_ads_summary, get_ads_trend, get_ads_products_top, get_ads_products_losers, get_ads_products_rising, get_ads_products_falling, get_ads_product, get_ads_campaigns, get_ads_campaigns_inefficient, get_ads_campaign, get_ads_categories, get_ads_category_benchmark, get_ads_alerts, add_insight, ]