Spaces:
Sleeping
Sleeping
| """ | |
| Утилиты для работы с историей запросов в PostgreSQL | |
| Используется на бэкенде для логирования запросов к RAG | |
| """ | |
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| import json | |
| from sqlalchemy import text | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from src.config import sql_client | |
| def init_history_table(): | |
| """ | |
| Инициализация таблицы истории запросов | |
| Создает таблицу, если она не существует | |
| """ | |
| try: | |
| with sql_client.begin() as conn: | |
| conn.execute(text(""" | |
| CREATE TABLE IF NOT EXISTS query_history ( | |
| id SERIAL PRIMARY KEY, | |
| timestamp TIMESTAMP NOT NULL DEFAULT NOW(), | |
| dialogue_id VARCHAR(255) NOT NULL, | |
| query TEXT NOT NULL, | |
| answer TEXT NOT NULL, | |
| reason TEXT, | |
| search_period JSONB, | |
| metadata JSONB | |
| ) | |
| """)) | |
| conn.execute(text(""" | |
| CREATE INDEX IF NOT EXISTS idx_query_history_dialogue_id | |
| ON query_history(dialogue_id) | |
| """)) | |
| conn.execute(text(""" | |
| CREATE INDEX IF NOT EXISTS idx_query_history_timestamp | |
| ON query_history(timestamp DESC) | |
| """)) | |
| print("✅ Таблица query_history инициализирована") | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при инициализации таблицы: {e}") | |
| raise | |
| def log_query( | |
| query: str, | |
| answer: str, | |
| reason: str, | |
| dialogue_id: Optional[str] = None, | |
| search_period: Optional[Dict] = None, | |
| metadata_: Optional[Dict] = None | |
| ) -> Optional[int]: | |
| """ | |
| Логировать запрос в историю (вызывается бэкендом после получения ответа от LLM) | |
| Args: | |
| query: Текст вопроса пользователя | |
| answer: Ответ системы | |
| reason: Обоснование ответа | |
| dialogue_id: ID диалога (опционально) | |
| search_period: Период поиска | |
| metadata_: Дополнительные метаданные | |
| Returns: | |
| ID созданной записи или None при ошибке | |
| """ | |
| # Генерируем dialogue_id если не передан | |
| if not dialogue_id: | |
| dialogue_id = f"single_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}" | |
| try: | |
| with sql_client.begin() as conn: | |
| result = conn.execute( | |
| text(""" | |
| INSERT INTO query_history | |
| (timestamp, dialogue_id, query, answer, reason, search_period, metadata) | |
| VALUES (:timestamp, :dialogue_id, :query, :answer, :reason, | |
| CAST(:search_period AS JSONB), CAST(:metadata AS JSONB)) | |
| RETURNING id | |
| """), | |
| { | |
| "timestamp": datetime.now(), | |
| "dialogue_id": dialogue_id, | |
| "query": query, | |
| "answer": answer, | |
| "reason": reason, | |
| "search_period": json.dumps(search_period or {}), | |
| "metadata": json.dumps(metadata_ or {}) | |
| } | |
| ) | |
| query_id = result.scalar() | |
| return query_id | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при логировании запроса: {e}") | |
| return None | |
| def get_all_history(limit: int = 100, offset: int = 0) -> List[Dict]: | |
| """Получить всю историю запросов""" | |
| try: | |
| with sql_client.connect() as conn: | |
| result = conn.execute( | |
| text(""" | |
| SELECT id, timestamp, dialogue_id, query, answer, reason, | |
| search_period, metadata | |
| FROM query_history | |
| ORDER BY timestamp DESC | |
| LIMIT :limit OFFSET :offset | |
| """), | |
| {"limit": limit, "offset": offset} | |
| ) | |
| rows = result.mappings().all() | |
| # Конвертируем datetime в ISO строку для JSON сериализации | |
| return [ | |
| { | |
| **dict(row), | |
| "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None | |
| } | |
| for row in rows | |
| ] | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при получении истории: {e}") | |
| return [] | |
| def get_history_by_dialogue(dialogue_id: str) -> List[Dict]: | |
| """Получить историю конкретного диалога""" | |
| try: | |
| with sql_client.connect() as conn: | |
| result = conn.execute( | |
| text(""" | |
| SELECT id, timestamp, dialogue_id, query, answer, reason, | |
| search_period, metadata | |
| FROM query_history | |
| WHERE dialogue_id = :dialogue_id | |
| ORDER BY timestamp ASC | |
| """), | |
| {"dialogue_id": dialogue_id} | |
| ) | |
| rows = result.mappings().all() | |
| return [ | |
| { | |
| **dict(row), | |
| "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None | |
| } | |
| for row in rows | |
| ] | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при получении диалога: {e}") | |
| return [] | |
| def search_history(search_text: str, limit: int = 50) -> List[Dict]: | |
| """Поиск по истории запросов""" | |
| try: | |
| with sql_client.connect() as conn: | |
| result = conn.execute( | |
| text(""" | |
| SELECT id, timestamp, dialogue_id, query, answer, reason, | |
| search_period, metadata | |
| FROM query_history | |
| WHERE query ILIKE :search_pattern | |
| OR answer ILIKE :search_pattern | |
| ORDER BY timestamp DESC | |
| LIMIT :limit | |
| """), | |
| { | |
| "search_pattern": f"%{search_text}%", | |
| "limit": limit | |
| } | |
| ) | |
| rows = result.mappings().all() | |
| return [ | |
| { | |
| **dict(row), | |
| "timestamp": row["timestamp"].isoformat() if row["timestamp"] else None | |
| } | |
| for row in rows | |
| ] | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при поиске в истории: {e}") | |
| return [] | |
| def get_history_stats() -> Dict: | |
| """Получить статистику по истории запросов""" | |
| try: | |
| with sql_client.connect() as conn: | |
| result = conn.execute( | |
| text(""" | |
| SELECT | |
| COUNT(*) as total_queries, | |
| COUNT(DISTINCT dialogue_id) as unique_dialogues, | |
| MAX(timestamp) as last_query_time, | |
| MIN(timestamp) as first_query_time | |
| FROM query_history | |
| """) | |
| ) | |
| row = result.mappings().first() | |
| if row: | |
| return { | |
| "total_queries": row["total_queries"], | |
| "unique_dialogues": row["unique_dialogues"], | |
| "last_query_time": row["last_query_time"].isoformat() if row["last_query_time"] else None, | |
| "first_query_time": row["first_query_time"].isoformat() if row["first_query_time"] else None | |
| } | |
| return {} | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при получении статистики: {e}") | |
| return {} | |
| def delete_history(dialogue_id: Optional[str] = None): | |
| """Удалить историю""" | |
| try: | |
| with sql_client.begin() as conn: | |
| if dialogue_id: | |
| conn.execute( | |
| text("DELETE FROM query_history WHERE dialogue_id = :dialogue_id"), | |
| {"dialogue_id": dialogue_id} | |
| ) | |
| print(f"✅ История диалога {dialogue_id} удалена") | |
| else: | |
| conn.execute(text("DELETE FROM query_history")) | |
| print("✅ Вся история удалена") | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при удалении истории: {e}") | |
| raise | |
| def get_recent_dialogues(limit: int = 10) -> List[Dict]: | |
| """Получить список последних диалогов""" | |
| try: | |
| with sql_client.connect() as conn: | |
| result = conn.execute( | |
| text(""" | |
| SELECT | |
| dialogue_id, | |
| COUNT(*) as message_count, | |
| MIN(timestamp) as started_at, | |
| MAX(timestamp) as last_message_at | |
| FROM query_history | |
| GROUP BY dialogue_id | |
| ORDER BY MAX(timestamp) DESC | |
| LIMIT :limit | |
| """), | |
| {"limit": limit} | |
| ) | |
| rows = result.mappings().all() | |
| return [ | |
| { | |
| "dialogue_id": row["dialogue_id"], | |
| "message_count": row["message_count"], | |
| "started_at": row["started_at"].isoformat() if row["started_at"] else None, | |
| "last_message_at": row["last_message_at"].isoformat() if row["last_message_at"] else None | |
| } | |
| for row in rows | |
| ] | |
| except SQLAlchemyError as e: | |
| print(f"❌ Ошибка при получении списка диалогов: {e}") | |
| return [] | |