logreader / pipeline.py
PatrickRedStar's picture
add
d76ef9a
import json
import os
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional
# Force CPU usage to avoid CUDA capability issues in WSL/GPU-mismatch environments.
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "")
from transformers import pipeline
from preprocess import PreprocessResult, preprocess_logs
from retrieval import RunbookRetriever
CANDIDATE_LABELS = [
"oom",
"timeout",
"auth_failure",
"db_connection",
"dns_resolution",
"tls_handshake",
"crashloop",
"null_pointer",
"resource_exhaustion",
"network_partition",
]
@dataclass
class IncidentResult:
"""
Контейнер с результатом пайплайна по одному запуску.
"""
incident_label: str
incident_score: float
incident_alternatives: List[Dict]
explanation: str
likely_cause: str
checks: List[str]
retrieved: List[Dict]
verification: List[Dict]
signatures: List[str]
class ModelStore:
"""
Хранит и переиспользует все необходимые ML-пайплайны.
"""
def __init__(self):
"""
Загружает и кэширует все необходимые трансформерные пайплайны.
"""
self.classifier = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=-1,
)
self.summarizer = pipeline(
"summarization",
model="sshleifer/distilbart-cnn-12-6",
device=-1,
)
self.nli = pipeline(
"text-classification",
model="typeform/distilbert-base-uncased-mnli",
device=-1,
)
class IncidentPipeline:
"""
Компонуёт все стадии анализа логов в единый процесс.
"""
def __init__(self):
"""
Собирает модели и ретривер, готовые к переиспользованию.
"""
self.models = ModelStore()
self.retriever = RunbookRetriever()
def classify(self, text: str, source: str) -> Dict:
"""
Определяет тип инцидента zero-shot классификатором.
"""
labels = list(CANDIDATE_LABELS)
if source and source != "auto":
labels.append(f"{source}_specific")
res = self.models.classifier(text, candidate_labels=labels, multi_label=False)
label = res["labels"][0]
score = float(res["scores"][0])
alternatives = [
{"label": res["labels"][i], "score": float(res["scores"][i])}
for i in range(1, min(4, len(res["labels"])))
]
return {"label": label, "score": score, "alternatives": alternatives}
def explain(self, text: str, verbosity: int = 1) -> str:
"""
Делает сжатое пояснение к логам при помощи summarizer.
"""
max_len = 180 + 60 * verbosity
min_len = 40 + 20 * verbosity
summary = self.models.summarizer(
text,
max_length=max_len,
min_length=min_len,
truncation=True,
)[0]["summary_text"]
return summary
def generate_cause_and_checks(
self, result: PreprocessResult, label: str, retrieved: List[Dict]
) -> tuple[str, List[str]]:
"""
Подбирает человеко-понятную причину и список проверок по категории.
"""
cause_map = {
"oom": "Сервис, вероятно, исчерпал память и был аварийно завершён.",
"crashloop": "Контейнер постоянно перезапускается из-за повторяющихся сбоев или неуспешных health-check.",
"timeout": "Верхний уровень или зависимость не ответили вовремя.",
"auth_failure": "Аутентификация/авторизация отклонена (истёкший токен, нехватка прав или неверная конфигурация).",
"db_connection": "Пул подключений к базе данных исчерпан либо соединение отвергнуто.",
"dns_resolution": "Не удалось разрешить DNS-имя целевого хоста.",
"tls_handshake": "TLS-рукопожатие завершилось ошибкой (сертификат, протокол или шифр).",
"null_pointer": "Приложение встретило null/None и аварийно завершилось.",
"resource_exhaustion": "Системные ресурсы (CPU/дескрипторы файлов) исчерпаны.",
"network_partition": "Сетевой разрыв или проблемы с связностью между компонентами.",
}
cause = cause_map.get(label, f"Наиболее вероятная категория инцидента: {label}.")
checks: List[str] = [
"Подтвердите временной интервал сбоя в логах и сопоставьте с последними релизами.",
"Проверьте метрики сервисов/подов (CPU, память, рестарты) вокруг окна инцидента.",
"Изучите недавние изменения конфигураций и секретов.",
]
if label == "oom" or "oom" in result.signatures:
checks += [
"Проверьте лимиты/requests памяти контейнера и фактическое потребление.",
"Если доступны, изучите дампы heap/thread.",
"Исключите утечки памяти и неограниченные кэши.",
"Убедитесь, что флаги памяти JVM/рантайма настроены корректно.",
]
if label in ("timeout",):
checks += [
"Замерьте задержки между сервисом и зависимостями.",
"Проверьте настройки ретраев/бэк-оффов и circuit breaker.",
"Поиск потенциально медленных запросов либо перегруженных зависимостей.",
]
if label in ("auth_failure",):
checks += [
"Проверьте валидность токенов/учётных данных и нужные scope.",
"Сверьте время между сервисами (clock skew).",
"Проверьте состояние провайдера аутентификации и его квоты.",
]
if label in ("db_connection",):
checks += [
"Сопоставьте размер пула БД с текущей нагрузкой.",
"Проверьте базу на блокировки или медленные запросы.",
"Убедитесь в корректности host/port/DNS для подключения.",
]
if label in ("dns_resolution",):
checks += [
"Попробуйте вручную резолвить хост из пода/хоста.",
"Проверьте здоровье DNS-серверов и свежие изменения записей.",
"Посмотрите search domains и /etc/resolv.conf внутри контейнера.",
]
if label in ("tls_handshake",):
checks += [
"Проверьте сертификаты (срок, SAN, цепочка).",
"Сравните поддерживаемые протоколы/шифры клиента и сервера.",
"Проверьте настройки ALPN/SNI.",
]
if label in ("crashloop",):
checks += [
"Проверьте startup/health‑пробы и переопределения команд.",
"Посмотрите последние логи перед рестартом для поиска первопричины.",
"Убедитесь, что конфиги/секреты смонтированы и права корректны.",
]
if retrieved:
checks.append(f"Изучите ранбук: {retrieved[0]['title']} (сходство {retrieved[0]['score']:.2f}).")
# Ensure at least 5 checks
while len(checks) < 5:
checks.append("Добавьте шаг диагностики: снимите дополнительные логи и метрики.")
return cause, checks[:10]
def verify_hypotheses(self, premise: str, hypotheses: List[str]) -> List[Dict]:
"""
Прогоняет набор гипотез через NLI, чтобы отметить подтверждение/опровержение.
"""
results = []
for hyp in hypotheses:
raw = self.models.nli({"text": premise, "text_pair": hyp})
pred = raw[0] if isinstance(raw, list) else raw
results.append({"hypothesis": hyp, "label": pred["label"], "score": float(pred["score"])})
return results
def process(
self,
raw_text: str,
source: str = "auto",
use_retrieval: bool = True,
use_nli: bool = False,
verbosity: int = 1,
) -> IncidentResult:
"""
Полный пайплайн обработки логов: от предобработки до верификации.
"""
if not raw_text or not raw_text.strip():
raise ValueError("Поле логов пустое. Пожалуйста, вставьте текст логов или стектрейса.")
pre = preprocess_logs(raw_text)
cls = self.classify(pre.cleaned_text, source)
explanation = self.explain(pre.cleaned_text, verbosity=verbosity)
retrieved = self.retriever.search(pre.cleaned_text, top_k=3) if use_retrieval else []
cause, checks = self.generate_cause_and_checks(pre, cls["label"], retrieved)
verification = []
if use_nli:
hypotheses = [cause] + [f"Совпадение с ранбуком: {r['title']}" for r in retrieved]
verification = self.verify_hypotheses(pre.cleaned_text, hypotheses)
return IncidentResult(
incident_label=cls["label"],
incident_score=cls["score"],
incident_alternatives=cls["alternatives"],
explanation=explanation,
likely_cause=cause,
checks=checks,
retrieved=retrieved,
verification=verification,
signatures=pre.signatures,
)
def serialize_result(result: IncidentResult) -> str:
"""
Упаковывает результат в JSON-строку.
"""
return json.dumps(asdict(result), indent=2, ensure_ascii=False)