""" Утилиты для работы с историей запросов в PostgreSQL Используется на бэкенде для логирования запросов к RAG """ from datetime import datetime from typing import List, Dict, Optional import json from sqlalchemy import text from sqlalchemy.exc import SQLAlchemyError from src.config import sql_client def init_history_table(): """ Инициализация таблицы истории запросов Создает таблицу, если она не существует """ try: with sql_client.begin() as conn: conn.execute(text(""" CREATE TABLE IF NOT EXISTS query_history ( id SERIAL PRIMARY KEY, timestamp TIMESTAMP NOT NULL DEFAULT NOW(), dialogue_id VARCHAR(255) NOT NULL, query TEXT NOT NULL, answer TEXT NOT NULL, reason TEXT, search_period JSONB, metadata JSONB ) """)) conn.execute(text(""" CREATE INDEX IF NOT EXISTS idx_query_history_dialogue_id ON query_history(dialogue_id) """)) conn.execute(text(""" CREATE INDEX IF NOT EXISTS idx_query_history_timestamp ON query_history(timestamp DESC) """)) print("✅ Таблица query_history инициализирована") except SQLAlchemyError as e: print(f"❌ Ошибка при инициализации таблицы: {e}") raise def log_query( query: str, answer: str, reason: str, dialogue_id: Optional[str] = None, search_period: Optional[Dict] = None, metadata_: Optional[Dict] = None ) -> Optional[int]: """ Логировать запрос в историю (вызывается бэкендом после получения ответа от LLM) Args: query: Текст вопроса пользователя answer: Ответ системы reason: Обоснование ответа dialogue_id: ID диалога (опционально) search_period: Период поиска metadata_: Дополнительные метаданные Returns: ID созданной записи или None при ошибке """ # Генерируем dialogue_id если не передан if not dialogue_id: dialogue_id = f"single_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}" try: with sql_client.begin() as conn: result = conn.execute( text(""" INSERT INTO query_history (timestamp, dialogue_id, query, answer, reason, search_period, metadata) VALUES (:timestamp, :dialogue_id, :query, :answer, :reason, CAST(:search_period AS JSONB), CAST(:metadata AS JSONB)) RETURNING id """), { "timestamp": datetime.now(), "dialogue_id": dialogue_id, "query": query, "answer": answer, "reason": reason, "search_period": json.dumps(search_period or {}), "metadata": json.dumps(metadata_ or {}) } ) query_id = result.scalar() return query_id except SQLAlchemyError as e: print(f"❌ Ошибка при логировании запроса: {e}") return None def get_all_history(limit: int = 100, offset: int = 0) -> List[Dict]: """Получить всю историю запросов""" try: with sql_client.connect() as conn: result = conn.execute( text(""" SELECT id, timestamp, dialogue_id, query, answer, reason, search_period, metadata FROM query_history ORDER BY timestamp DESC LIMIT :limit OFFSET :offset """), {"limit": limit, "offset": offset} ) rows = result.mappings().all() # Конвертируем datetime в ISO строку для JSON сериализации return [ { **dict(row), "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None } for row in rows ] except SQLAlchemyError as e: print(f"❌ Ошибка при получении истории: {e}") return [] def get_history_by_dialogue(dialogue_id: str) -> List[Dict]: """Получить историю конкретного диалога""" try: with sql_client.connect() as conn: result = conn.execute( text(""" SELECT id, timestamp, dialogue_id, query, answer, reason, search_period, metadata FROM query_history WHERE dialogue_id = :dialogue_id ORDER BY timestamp ASC """), {"dialogue_id": dialogue_id} ) rows = result.mappings().all() return [ { **dict(row), "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None } for row in rows ] except SQLAlchemyError as e: print(f"❌ Ошибка при получении диалога: {e}") return [] def search_history(search_text: str, limit: int = 50) -> List[Dict]: """Поиск по истории запросов""" try: with sql_client.connect() as conn: result = conn.execute( text(""" SELECT id, timestamp, dialogue_id, query, answer, reason, search_period, metadata FROM query_history WHERE query ILIKE :search_pattern OR answer ILIKE :search_pattern ORDER BY timestamp DESC LIMIT :limit """), { "search_pattern": f"%{search_text}%", "limit": limit } ) rows = result.mappings().all() return [ { **dict(row), "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None } for row in rows ] except SQLAlchemyError as e: print(f"❌ Ошибка при поиске в истории: {e}") return [] def get_history_stats() -> Dict: """Получить статистику по истории запросов""" try: with sql_client.connect() as conn: result = conn.execute( text(""" SELECT COUNT(*) as total_queries, COUNT(DISTINCT dialogue_id) as unique_dialogues, MAX(timestamp) as last_query_time, MIN(timestamp) as first_query_time FROM query_history """) ) row = result.mappings().first() if row: return { "total_queries": row["total_queries"], "unique_dialogues": row["unique_dialogues"], "last_query_time": row["last_query_time"].isoformat() if row["last_query_time"] else None, "first_query_time": row["first_query_time"].isoformat() if row["first_query_time"] else None } return {} except SQLAlchemyError as e: print(f"❌ Ошибка при получении статистики: {e}") return {} def delete_history(dialogue_id: Optional[str] = None): """Удалить историю""" try: with sql_client.begin() as conn: if dialogue_id: conn.execute( text("DELETE FROM query_history WHERE dialogue_id = :dialogue_id"), {"dialogue_id": dialogue_id} ) print(f"✅ История диалога {dialogue_id} удалена") else: conn.execute(text("DELETE FROM query_history")) print("✅ Вся история удалена") except SQLAlchemyError as e: print(f"❌ Ошибка при удалении истории: {e}") raise def get_recent_dialogues(limit: int = 10) -> List[Dict]: """Получить список последних диалогов""" try: with sql_client.connect() as conn: result = conn.execute( text(""" SELECT dialogue_id, COUNT(*) as message_count, MIN(timestamp) as started_at, MAX(timestamp) as last_message_at FROM query_history GROUP BY dialogue_id ORDER BY MAX(timestamp) DESC LIMIT :limit """), {"limit": limit} ) rows = result.mappings().all() return [ { "dialogue_id": row["dialogue_id"], "message_count": row["message_count"], "started_at": row["started_at"].isoformat() if row["started_at"] else None, "last_message_at": row["last_message_at"].isoformat() if row["last_message_at"] else None } for row in rows ] except SQLAlchemyError as e: print(f"❌ Ошибка при получении списка диалогов: {e}") return []