import json import os from dataclasses import asdict, dataclass from typing import Dict, List, Optional # Force CPU usage to avoid CUDA capability issues in WSL/GPU-mismatch environments. os.environ.setdefault("CUDA_VISIBLE_DEVICES", "") from transformers import pipeline from preprocess import PreprocessResult, preprocess_logs from retrieval import RunbookRetriever CANDIDATE_LABELS = [ "oom", "timeout", "auth_failure", "db_connection", "dns_resolution", "tls_handshake", "crashloop", "null_pointer", "resource_exhaustion", "network_partition", ] @dataclass class IncidentResult: """ Контейнер с результатом пайплайна по одному запуску. """ incident_label: str incident_score: float incident_alternatives: List[Dict] explanation: str likely_cause: str checks: List[str] retrieved: List[Dict] verification: List[Dict] signatures: List[str] class ModelStore: """ Хранит и переиспользует все необходимые ML-пайплайны. """ def __init__(self): """ Загружает и кэширует все необходимые трансформерные пайплайны. """ self.classifier = pipeline( "zero-shot-classification", model="facebook/bart-large-mnli", device=-1, ) self.summarizer = pipeline( "summarization", model="sshleifer/distilbart-cnn-12-6", device=-1, ) self.nli = pipeline( "text-classification", model="typeform/distilbert-base-uncased-mnli", device=-1, ) class IncidentPipeline: """ Компонуёт все стадии анализа логов в единый процесс. """ def __init__(self): """ Собирает модели и ретривер, готовые к переиспользованию. """ self.models = ModelStore() self.retriever = RunbookRetriever() def classify(self, text: str, source: str) -> Dict: """ Определяет тип инцидента zero-shot классификатором. """ labels = list(CANDIDATE_LABELS) if source and source != "auto": labels.append(f"{source}_specific") res = self.models.classifier(text, candidate_labels=labels, multi_label=False) label = res["labels"][0] score = float(res["scores"][0]) alternatives = [ {"label": res["labels"][i], "score": float(res["scores"][i])} for i in range(1, min(4, len(res["labels"]))) ] return {"label": label, "score": score, "alternatives": alternatives} def explain(self, text: str, verbosity: int = 1) -> str: """ Делает сжатое пояснение к логам при помощи summarizer. """ max_len = 180 + 60 * verbosity min_len = 40 + 20 * verbosity summary = self.models.summarizer( text, max_length=max_len, min_length=min_len, truncation=True, )[0]["summary_text"] return summary def generate_cause_and_checks( self, result: PreprocessResult, label: str, retrieved: List[Dict] ) -> tuple[str, List[str]]: """ Подбирает человеко-понятную причину и список проверок по категории. """ cause_map = { "oom": "Сервис, вероятно, исчерпал память и был аварийно завершён.", "crashloop": "Контейнер постоянно перезапускается из-за повторяющихся сбоев или неуспешных health-check.", "timeout": "Верхний уровень или зависимость не ответили вовремя.", "auth_failure": "Аутентификация/авторизация отклонена (истёкший токен, нехватка прав или неверная конфигурация).", "db_connection": "Пул подключений к базе данных исчерпан либо соединение отвергнуто.", "dns_resolution": "Не удалось разрешить DNS-имя целевого хоста.", "tls_handshake": "TLS-рукопожатие завершилось ошибкой (сертификат, протокол или шифр).", "null_pointer": "Приложение встретило null/None и аварийно завершилось.", "resource_exhaustion": "Системные ресурсы (CPU/дескрипторы файлов) исчерпаны.", "network_partition": "Сетевой разрыв или проблемы с связностью между компонентами.", } cause = cause_map.get(label, f"Наиболее вероятная категория инцидента: {label}.") checks: List[str] = [ "Подтвердите временной интервал сбоя в логах и сопоставьте с последними релизами.", "Проверьте метрики сервисов/подов (CPU, память, рестарты) вокруг окна инцидента.", "Изучите недавние изменения конфигураций и секретов.", ] if label == "oom" or "oom" in result.signatures: checks += [ "Проверьте лимиты/requests памяти контейнера и фактическое потребление.", "Если доступны, изучите дампы heap/thread.", "Исключите утечки памяти и неограниченные кэши.", "Убедитесь, что флаги памяти JVM/рантайма настроены корректно.", ] if label in ("timeout",): checks += [ "Замерьте задержки между сервисом и зависимостями.", "Проверьте настройки ретраев/бэк-оффов и circuit breaker.", "Поиск потенциально медленных запросов либо перегруженных зависимостей.", ] if label in ("auth_failure",): checks += [ "Проверьте валидность токенов/учётных данных и нужные scope.", "Сверьте время между сервисами (clock skew).", "Проверьте состояние провайдера аутентификации и его квоты.", ] if label in ("db_connection",): checks += [ "Сопоставьте размер пула БД с текущей нагрузкой.", "Проверьте базу на блокировки или медленные запросы.", "Убедитесь в корректности host/port/DNS для подключения.", ] if label in ("dns_resolution",): checks += [ "Попробуйте вручную резолвить хост из пода/хоста.", "Проверьте здоровье DNS-серверов и свежие изменения записей.", "Посмотрите search domains и /etc/resolv.conf внутри контейнера.", ] if label in ("tls_handshake",): checks += [ "Проверьте сертификаты (срок, SAN, цепочка).", "Сравните поддерживаемые протоколы/шифры клиента и сервера.", "Проверьте настройки ALPN/SNI.", ] if label in ("crashloop",): checks += [ "Проверьте startup/health‑пробы и переопределения команд.", "Посмотрите последние логи перед рестартом для поиска первопричины.", "Убедитесь, что конфиги/секреты смонтированы и права корректны.", ] if retrieved: checks.append(f"Изучите ранбук: {retrieved[0]['title']} (сходство {retrieved[0]['score']:.2f}).") # Ensure at least 5 checks while len(checks) < 5: checks.append("Добавьте шаг диагностики: снимите дополнительные логи и метрики.") return cause, checks[:10] def verify_hypotheses(self, premise: str, hypotheses: List[str]) -> List[Dict]: """ Прогоняет набор гипотез через NLI, чтобы отметить подтверждение/опровержение. """ results = [] for hyp in hypotheses: raw = self.models.nli({"text": premise, "text_pair": hyp}) pred = raw[0] if isinstance(raw, list) else raw results.append({"hypothesis": hyp, "label": pred["label"], "score": float(pred["score"])}) return results def process( self, raw_text: str, source: str = "auto", use_retrieval: bool = True, use_nli: bool = False, verbosity: int = 1, ) -> IncidentResult: """ Полный пайплайн обработки логов: от предобработки до верификации. """ if not raw_text or not raw_text.strip(): raise ValueError("Поле логов пустое. Пожалуйста, вставьте текст логов или стектрейса.") pre = preprocess_logs(raw_text) cls = self.classify(pre.cleaned_text, source) explanation = self.explain(pre.cleaned_text, verbosity=verbosity) retrieved = self.retriever.search(pre.cleaned_text, top_k=3) if use_retrieval else [] cause, checks = self.generate_cause_and_checks(pre, cls["label"], retrieved) verification = [] if use_nli: hypotheses = [cause] + [f"Совпадение с ранбуком: {r['title']}" for r in retrieved] verification = self.verify_hypotheses(pre.cleaned_text, hypotheses) return IncidentResult( incident_label=cls["label"], incident_score=cls["score"], incident_alternatives=cls["alternatives"], explanation=explanation, likely_cause=cause, checks=checks, retrieved=retrieved, verification=verification, signatures=pre.signatures, ) def serialize_result(result: IncidentResult) -> str: """ Упаковывает результат в JSON-строку. """ return json.dumps(asdict(result), indent=2, ensure_ascii=False)