status-law-gbot / analytics /chat_analyzer.py
Rulga's picture
Add analizer and tuner bot
59b4796
"""
Модуль для анализа истории чатов и извлечения полезных данных для обучения
"""
import json
from typing import List, Dict, Any, Tuple, Optional
from collections import Counter, defaultdict
import re
from datetime import datetime
from src.knowledge_base.dataset import DatasetManager
class ChatAnalyzer:
def __init__(self, dataset_manager: Optional[DatasetManager] = None):
"""
Инициализация анализатора чатов
Args:
dataset_manager: Менеджер датасетов для получения истории чатов
"""
self.dataset_manager = dataset_manager or DatasetManager()
def get_chat_data(self) -> List[Dict[str, Any]]:
"""
Получение всех данных чатов из датасета
Returns:
Список историй чатов
"""
success, chat_data = self.dataset_manager.get_chat_history()
if not success or not chat_data:
return []
return chat_data
def extract_question_answer_pairs(self, min_question_length: int = 10) -> List[Dict[str, str]]:
"""
Извлечение пар вопрос-ответ из истории чатов
Args:
min_question_length: Минимальная длина вопроса для включения в выборку
Returns:
Список пар вопрос-ответ в формате [{"question": "...", "answer": "..."}]
"""
chat_data = self.get_chat_data()
qa_pairs = []
for chat in chat_data:
messages = chat.get("messages", [])
# Проходим по сообщениям и собираем пары вопрос-ответ
for i in range(len(messages) - 1):
if messages[i].get("role") == "user" and messages[i+1].get("role") == "assistant":
question = messages[i].get("content", "").strip()
answer = messages[i+1].get("content", "").strip()
# Фильтруем по длине вопроса
if len(question) >= min_question_length and answer:
qa_pairs.append({
"question": question,
"answer": answer
})
return qa_pairs
def analyze_common_questions(self, top_n: int = 10) -> List[Tuple[str, int]]:
"""
Анализ наиболее часто задаваемых вопросов
Args:
top_n: Количество самых популярных вопросов для возврата
Returns:
Список кортежей (вопрос, количество)
"""
qa_pairs = self.extract_question_answer_pairs()
# Извлекаем только вопросы
questions = [qa["question"] for qa in qa_pairs]
# Предварительная обработка вопросов для лучшего группирования
processed_questions = []
for q in questions:
# Преобразуем в нижний регистр
q = q.lower()
# Удаляем пунктуацию и лишние пробелы
q = re.sub(r'[^\w\s]', ' ', q)
q = re.sub(r'\s+', ' ', q).strip()
processed_questions.append(q)
# Подсчет частоты вопросов
question_counter = Counter(processed_questions)
# Получаем top_n самых частых вопросов
return question_counter.most_common(top_n)
def analyze_user_satisfaction(self) -> Dict[str, Any]:
"""
Анализ удовлетворенности пользователей на основе истории чатов
Returns:
Словарь с метриками удовлетворенности
"""
chat_data = self.get_chat_data()
# Инициализация метрик
metrics = {
"total_conversations": len(chat_data),
"avg_messages_per_conversation": 0,
"avg_conversation_duration": 0, # в секундах
"follow_up_questions_rate": 0, # процент диалогов с дополнительными вопросами
}
if not chat_data:
return metrics
# Подсчет общего количества сообщений и длительности диалогов
total_messages = 0
conversations_with_followups = 0
total_duration = 0
for chat in chat_data:
messages = chat.get("messages", [])
total_messages += len(messages)
# Проверка наличия дополнительных вопросов от пользователя
user_messages = [m for m in messages if m.get("role") == "user"]
if len(user_messages) > 1:
conversations_with_followups += 1
# Расчет длительности диалога, если есть временные метки
if len(messages) >= 2 and all(["timestamp" in m for m in [messages[0], messages[-1]]]):
try:
start_time = datetime.fromisoformat(messages[0]["timestamp"])
end_time = datetime.fromisoformat(messages[-1]["timestamp"])
duration = (end_time - start_time).total_seconds()
total_duration += duration
except (ValueError, KeyError):
pass
# Расчет средних значений
metrics["avg_messages_per_conversation"] = total_messages / len(chat_data)
metrics["follow_up_questions_rate"] = conversations_with_followups / len(chat_data) * 100
# Расчет средней длительности, если есть данные
if total_duration > 0:
metrics["avg_conversation_duration"] = total_duration / len(chat_data)
return metrics
def extract_failed_questions(self) -> List[str]:
"""
Извлечение вопросов, на которые бот не смог дать удовлетворительный ответ
Returns:
Список вопросов, требующих улучшения
"""
chat_data = self.get_chat_data()
failed_questions = []
# Ключевые слова, указывающие на неудовлетворительный ответ
failure_indicators = [
"не знаю", "не могу ответить", "затрудняюсь ответить",
"у меня нет информации", "не имею данных"
]
for chat in chat_data:
messages = chat.get("messages", [])
for i in range(len(messages) - 1):
if messages[i].get("role") == "user" and messages[i+1].get("role") == "assistant":
question = messages[i].get("content", "").strip()
answer = messages[i+1].get("content", "").strip().lower()
# Проверяем, содержит ли ответ индикаторы неудачи
if any(indicator in answer for indicator in failure_indicators):
failed_questions.append(question)
return failed_questions
def export_training_data(self, output_file: str) -> Tuple[bool, str]:
"""
Экспорт данных для обучения в формате JSONL
Args:
output_file: Путь к выходному файлу
Returns:
(успех, сообщение)
"""
try:
qa_pairs = self.extract_question_answer_pairs()
if not qa_pairs:
return False, "Нет достаточного количества данных для экспорта"
with open(output_file, "w", encoding="utf-8") as f:
for pair in qa_pairs:
training_example = {
"messages": [
{"role": "user", "content": pair["question"]},
{"role": "assistant", "content": pair["answer"]}
]
}
f.write(json.dumps(training_example, ensure_ascii=False) + "\n")
return True, f"Данные для обучения успешно экспортированы в {output_file}. Экспортировано {len(qa_pairs)} примеров."
except Exception as e:
return False, f"Ошибка при экспорте данных для обучения: {str(e)}"
def generate_analytics_report(self) -> Dict[str, Any]:
"""
Генерация полного аналитического отчета
Returns:
Словарь с различными метриками и анализом
"""
report = {}
# Базовые метрики
chat_data = self.get_chat_data()
report["total_conversations"] = len(chat_data)
# Удовлетворенность пользователей
report["satisfaction_metrics"] = self.analyze_user_satisfaction()
# Частые вопросы
report["common_questions"] = self.analyze_common_questions(top_n=20)
# Вопросы без ответов
report["failed_questions"] = self.extract_failed_questions()
report["failed_questions_count"] = len(report["failed_questions"])
# Статистика по количеству пар вопрос-ответ
qa_pairs = self.extract_question_answer_pairs()
report["qa_pairs_count"] = len(qa_pairs)
return report
if __name__ == "__main__":
# Пример использования
analyzer = ChatAnalyzer()
report = analyzer.generate_analytics_report()
print(f"Всего диалогов: {report['total_conversations']}")
print(f"Пар вопрос-ответ для обучения: {report['qa_pairs_count']}")
print("\nСамые популярные вопросы:")
for question, count in report['common_questions'][:5]:
print(f" - {question} ({count} раз)")