File size: 11,537 Bytes
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
d76ef9a
 
 
29fdac9
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
d76ef9a
 
 
 
 
 
 
 
 
 
29fdac9
d76ef9a
29fdac9
d76ef9a
 
 
29fdac9
 
 
d76ef9a
 
 
 
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
d76ef9a
29fdac9
 
d76ef9a
29fdac9
 
 
d76ef9a
 
 
29fdac9
 
d76ef9a
 
29fdac9
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
d76ef9a
29fdac9
 
 
 
 
 
 
d76ef9a
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import json
import os
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional

# Force CPU usage to avoid CUDA capability issues in WSL/GPU-mismatch environments.
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "")

from transformers import pipeline

from preprocess import PreprocessResult, preprocess_logs
from retrieval import RunbookRetriever


CANDIDATE_LABELS = [
    "oom",
    "timeout",
    "auth_failure",
    "db_connection",
    "dns_resolution",
    "tls_handshake",
    "crashloop",
    "null_pointer",
    "resource_exhaustion",
    "network_partition",
]


@dataclass
class IncidentResult:
    """
    Контейнер с результатом пайплайна по одному запуску.
    """
    incident_label: str
    incident_score: float
    incident_alternatives: List[Dict]
    explanation: str
    likely_cause: str
    checks: List[str]
    retrieved: List[Dict]
    verification: List[Dict]
    signatures: List[str]


class ModelStore:
    """
    Хранит и переиспользует все необходимые ML-пайплайны.
    """
    def __init__(self):
        """
        Загружает и кэширует все необходимые трансформерные пайплайны.
        """
        self.classifier = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli",
            device=-1,
        )
        self.summarizer = pipeline(
            "summarization",
            model="sshleifer/distilbart-cnn-12-6",
            device=-1,
        )
        self.nli = pipeline(
            "text-classification",
            model="typeform/distilbert-base-uncased-mnli",
            device=-1,
        )


class IncidentPipeline:
    """
    Компонуёт все стадии анализа логов в единый процесс.
    """
    def __init__(self):
        """
        Собирает модели и ретривер, готовые к переиспользованию.
        """
        self.models = ModelStore()
        self.retriever = RunbookRetriever()

    def classify(self, text: str, source: str) -> Dict:
        """
        Определяет тип инцидента zero-shot классификатором.
        """
        labels = list(CANDIDATE_LABELS)
        if source and source != "auto":
            labels.append(f"{source}_specific")
        res = self.models.classifier(text, candidate_labels=labels, multi_label=False)
        label = res["labels"][0]
        score = float(res["scores"][0])
        alternatives = [
            {"label": res["labels"][i], "score": float(res["scores"][i])}
            for i in range(1, min(4, len(res["labels"])))
        ]
        return {"label": label, "score": score, "alternatives": alternatives}

    def explain(self, text: str, verbosity: int = 1) -> str:
        """
        Делает сжатое пояснение к логам при помощи summarizer.
        """
        max_len = 180 + 60 * verbosity
        min_len = 40 + 20 * verbosity
        summary = self.models.summarizer(
            text,
            max_length=max_len,
            min_length=min_len,
            truncation=True,
        )[0]["summary_text"]
        return summary

    def generate_cause_and_checks(
        self, result: PreprocessResult, label: str, retrieved: List[Dict]
    ) -> tuple[str, List[str]]:
        """
        Подбирает человеко-понятную причину и список проверок по категории.
        """
        cause_map = {
            "oom": "Сервис, вероятно, исчерпал память и был аварийно завершён.",
            "crashloop": "Контейнер постоянно перезапускается из-за повторяющихся сбоев или неуспешных health-check.",
            "timeout": "Верхний уровень или зависимость не ответили вовремя.",
            "auth_failure": "Аутентификация/авторизация отклонена (истёкший токен, нехватка прав или неверная конфигурация).",
            "db_connection": "Пул подключений к базе данных исчерпан либо соединение отвергнуто.",
            "dns_resolution": "Не удалось разрешить DNS-имя целевого хоста.",
            "tls_handshake": "TLS-рукопожатие завершилось ошибкой (сертификат, протокол или шифр).",
            "null_pointer": "Приложение встретило null/None и аварийно завершилось.",
            "resource_exhaustion": "Системные ресурсы (CPU/дескрипторы файлов) исчерпаны.",
            "network_partition": "Сетевой разрыв или проблемы с связностью между компонентами.",
        }
        cause = cause_map.get(label, f"Наиболее вероятная категория инцидента: {label}.")
        checks: List[str] = [
            "Подтвердите временной интервал сбоя в логах и сопоставьте с последними релизами.",
            "Проверьте метрики сервисов/подов (CPU, память, рестарты) вокруг окна инцидента.",
            "Изучите недавние изменения конфигураций и секретов.",
        ]
        if label == "oom" or "oom" in result.signatures:
            checks += [
                "Проверьте лимиты/requests памяти контейнера и фактическое потребление.",
                "Если доступны, изучите дампы heap/thread.",
                "Исключите утечки памяти и неограниченные кэши.",
                "Убедитесь, что флаги памяти JVM/рантайма настроены корректно.",
            ]
        if label in ("timeout",):
            checks += [
                "Замерьте задержки между сервисом и зависимостями.",
                "Проверьте настройки ретраев/бэк-оффов и circuit breaker.",
                "Поиск потенциально медленных запросов либо перегруженных зависимостей.",
            ]
        if label in ("auth_failure",):
            checks += [
                "Проверьте валидность токенов/учётных данных и нужные scope.",
                "Сверьте время между сервисами (clock skew).",
                "Проверьте состояние провайдера аутентификации и его квоты.",
            ]
        if label in ("db_connection",):
            checks += [
                "Сопоставьте размер пула БД с текущей нагрузкой.",
                "Проверьте базу на блокировки или медленные запросы.",
                "Убедитесь в корректности host/port/DNS для подключения.",
            ]
        if label in ("dns_resolution",):
            checks += [
                "Попробуйте вручную резолвить хост из пода/хоста.",
                "Проверьте здоровье DNS-серверов и свежие изменения записей.",
                "Посмотрите search domains и /etc/resolv.conf внутри контейнера.",
            ]
        if label in ("tls_handshake",):
            checks += [
                "Проверьте сертификаты (срок, SAN, цепочка).",
                "Сравните поддерживаемые протоколы/шифры клиента и сервера.",
                "Проверьте настройки ALPN/SNI.",
            ]
        if label in ("crashloop",):
            checks += [
                "Проверьте startup/health‑пробы и переопределения команд.",
                "Посмотрите последние логи перед рестартом для поиска первопричины.",
                "Убедитесь, что конфиги/секреты смонтированы и права корректны.",
            ]
        if retrieved:
            checks.append(f"Изучите ранбук: {retrieved[0]['title']} (сходство {retrieved[0]['score']:.2f}).")
        # Ensure at least 5 checks
        while len(checks) < 5:
            checks.append("Добавьте шаг диагностики: снимите дополнительные логи и метрики.")
        return cause, checks[:10]

    def verify_hypotheses(self, premise: str, hypotheses: List[str]) -> List[Dict]:
        """
        Прогоняет набор гипотез через NLI, чтобы отметить подтверждение/опровержение.
        """
        results = []
        for hyp in hypotheses:
            raw = self.models.nli({"text": premise, "text_pair": hyp})
            pred = raw[0] if isinstance(raw, list) else raw
            results.append({"hypothesis": hyp, "label": pred["label"], "score": float(pred["score"])})
        return results

    def process(
        self,
        raw_text: str,
        source: str = "auto",
        use_retrieval: bool = True,
        use_nli: bool = False,
        verbosity: int = 1,
    ) -> IncidentResult:
        """
        Полный пайплайн обработки логов: от предобработки до верификации.
        """
        if not raw_text or not raw_text.strip():
            raise ValueError("Поле логов пустое. Пожалуйста, вставьте текст логов или стектрейса.")
        pre = preprocess_logs(raw_text)
        cls = self.classify(pre.cleaned_text, source)
        explanation = self.explain(pre.cleaned_text, verbosity=verbosity)
        retrieved = self.retriever.search(pre.cleaned_text, top_k=3) if use_retrieval else []
        cause, checks = self.generate_cause_and_checks(pre, cls["label"], retrieved)
        verification = []
        if use_nli:
            hypotheses = [cause] + [f"Совпадение с ранбуком: {r['title']}" for r in retrieved]
            verification = self.verify_hypotheses(pre.cleaned_text, hypotheses)
        return IncidentResult(
            incident_label=cls["label"],
            incident_score=cls["score"],
            incident_alternatives=cls["alternatives"],
            explanation=explanation,
            likely_cause=cause,
            checks=checks,
            retrieved=retrieved,
            verification=verification,
            signatures=pre.signatures,
        )


def serialize_result(result: IncidentResult) -> str:
    """
    Упаковывает результат в JSON-строку.
    """
    return json.dumps(asdict(result), indent=2, ensure_ascii=False)