MukeshKapoor25 commited on
Commit
07237a3
·
1 Parent(s): fbf5fc4

Refactor imports and improve code formatting in logbert_rca_pipeline_api.py; update requirements.txt to include botocore

Browse files
Files changed (2) hide show
  1. logbert_rca_pipeline_api.py +72 -56
  2. requirements.txt +2 -1
logbert_rca_pipeline_api.py CHANGED
@@ -1,33 +1,30 @@
1
  import os
2
  import sys
3
- import re
4
- import ast
5
- import json
6
- import time
7
- import torch
8
- import pandas as pd
9
- import numpy as np
10
- from tqdm import tqdm
11
- from collections import defaultdict
12
- from transformers import AutoTokenizer, AutoModelForCausalLM
13
- from torch.utils.data import DataLoader
14
-
15
 
16
- sys.path.append('../')
17
- sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
18
-
19
- from logparser import Drain
20
- from bert_pytorch.dataset import LogDataset, WordVocab
21
- from bert_pytorch.model.bert import BERT
22
  from bert_pytorch.model.log_model import BERTLog
23
-
 
 
 
 
 
 
 
 
 
 
 
 
24
  # === Constants ===
25
  TOP_EVENTS = 5
26
  MAX_RCA_TOKENS = 200
27
- MISTRAL_MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
28
- # HF_CACHE = "/content/drive/MyDrive/hf_cache"
29
 
30
  # === Log Parsing ===
 
 
31
  def parse_log_with_drain(log_file, input_dir, output_dir):
32
  regex = [
33
  r"appattempt_\d+_\d+_\d+",
@@ -39,9 +36,11 @@ def parse_log_with_drain(log_file, input_dir, output_dir):
39
  r"[a-f0-9]{8,}"
40
  ]
41
  log_format = r'\[<AppId>] <Date> <Time> <Level> \[<Process>] <Component>: <Content>'
42
- parser = Drain.LogParser(log_format, indir=input_dir, outdir=output_dir, depth=5, st=0.5, rex=regex, keep_para=True)
 
43
  parser.parse(log_file)
44
 
 
45
  def hadoop_sampling(structured_log_path, sequence_output_path):
46
  df = pd.read_csv(structured_log_path)
47
  data_dict = defaultdict(list)
@@ -50,39 +49,51 @@ def hadoop_sampling(structured_log_path, sequence_output_path):
50
  event_id = row.get("EventId")
51
  if pd.notnull(app_id) and pd.notnull(event_id):
52
  data_dict[app_id].append(str(event_id))
53
- pd.DataFrame(list(data_dict.items()), columns=['AppId', 'EventSequence']).to_csv(sequence_output_path, index=False)
 
54
 
55
  # === Utility Functions ===
 
 
56
  def load_parameters(param_path):
57
  options = {}
58
  with open(param_path, 'r') as f:
59
  for line in f:
60
- if ':' not in line: continue
 
61
  key, val = line.strip().split(':', 1)
62
  key, val = key.strip(), val.strip()
63
  if val.lower() in ['true', 'false', 'none']:
64
  val = eval(val.capitalize())
65
  else:
66
- try: val = int(val)
 
67
  except ValueError:
68
- try: val = float(val)
69
- except ValueError: pass
 
 
70
  options[key] = val
71
  return options
72
 
 
73
  def load_logbert_model(options, vocab):
74
  try:
75
  return torch.load(options["model_path"], map_location=options["device"])
76
  except:
77
- bert = BERT(len(vocab), options["hidden"], options["layers"], options["attn_heads"], options["max_len"])
 
78
  model = BERTLog(bert, vocab_size=len(vocab)).to(options["device"])
79
- model.load_state_dict(torch.load(options["model_path"], map_location=options["device"]))
 
80
  return model
81
 
 
82
  def load_center(path, device):
83
  center = torch.load(path, map_location=device)
84
  return center["center"] if isinstance(center, dict) else center
85
 
 
86
  def extract_sequences(path, min_len):
87
  df = pd.read_csv(path)
88
  data, app_ids = [], []
@@ -96,20 +107,25 @@ def extract_sequences(path, min_len):
96
  continue
97
  return data, app_ids
98
 
 
99
  def prepare_dataloader(sequences, vocab, options):
100
  dummy_times = [[0] * len(seq) for seq in sequences]
101
- dataset = LogDataset(sequences, dummy_times, vocab, seq_len=options["seq_len"], on_memory=True, mask_ratio=options["mask_ratio"])
 
102
  return DataLoader(dataset, batch_size=1, shuffle=False, collate_fn=dataset.collate_fn)
103
 
 
104
  def calculate_mean_std(loader, model, center, device):
105
  scores = []
106
  with torch.no_grad():
107
  for batch in tqdm(loader, desc="📏 Computing train distances..."):
108
  batch = {k: v.to(device) for k, v in batch.items()}
109
- cls_output = model(batch["bert_input"], batch["time_input"])["cls_output"]
 
110
  scores.append(torch.norm(cls_output - center, dim=1).item())
111
  return np.mean(scores), np.std(scores)
112
 
 
113
  def generate_prompt(event_templates):
114
  prompt = "The system encountered a failure. Below are the key log events preceding the anomaly:\n\n"
115
  for i, event in enumerate(event_templates, 1):
@@ -118,16 +134,8 @@ def generate_prompt(event_templates):
118
  prompt += "Explain the cause in one or two sentences, using technical reasoning if possible.\n"
119
  return prompt
120
 
121
- def call_mistral(prompt, tokenizer, model, device):
122
- inputs = tokenizer(prompt, return_tensors="pt").to(device)
123
- outputs = model.generate(
124
- **inputs,
125
- max_length=inputs['input_ids'].shape[1] + MAX_RCA_TOKENS,
126
- do_sample=False,
127
- top_k=50,
128
- pad_token_id=tokenizer.eos_token_id
129
- )
130
- return tokenizer.decode(outputs[0], skip_special_tokens=True)[len(prompt):].strip()
131
 
132
  def compute_logkey_anomaly(masked_output, masked_label, top_k=5):
133
  num_undetected = 0
@@ -137,12 +145,16 @@ def compute_logkey_anomaly(masked_output, masked_label, top_k=5):
137
  return num_undetected, len(masked_label)
138
 
139
  # === API-Compatible RCA Pipeline ===
 
 
140
  def detect_anomalies_and_explain(input_log_path):
141
  log_file = os.path.basename(input_log_path)
142
  input_dir = os.path.dirname(input_log_path)
143
- output_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "model", "bert"))
 
144
 
145
- log_structured_file = os.path.join(output_dir, log_file + "_structured.csv")
 
146
  log_templates_file = os.path.join(output_dir, log_file + "_templates.csv")
147
  log_sequence_file = os.path.join(output_dir, "rca_abnormal_sequence.csv")
148
  PARAMS_FILE = os.path.join(output_dir, "bert", "parameters.txt")
@@ -155,26 +167,28 @@ def detect_anomalies_and_explain(input_log_path):
155
 
156
  # Step 2: Load Models and Parameters
157
  options = load_parameters(PARAMS_FILE)
158
- options["device"] = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 
159
 
160
- # tokenizer = AutoTokenizer.from_pretrained(MISTRAL_MODEL)
161
- # model_mistral = AutoModelForCausalLM.from_pretrained(MISTRAL_MODEL, torch_dtype=torch.float32).to(options["device"])
162
- # model_mistral.eval()
163
 
164
  vocab = WordVocab.load_vocab(options["vocab_path"])
165
  model = load_logbert_model(options, vocab).to(options["device"]).eval()
166
  center = load_center(CENTER_PATH, options["device"])
167
 
168
  # Step 3: Prepare Data
169
- test_sequences, app_ids = extract_sequences(log_sequence_file, options["min_len"])
 
170
  test_loader = prepare_dataloader(test_sequences, vocab, options)
171
 
172
- train_sequences = [line.strip().split() for line in open(TRAIN_FILE) if len(line.strip().split()) >= options["min_len"]]
 
173
  train_loader = prepare_dataloader(train_sequences, vocab, options)
174
- mean, std = calculate_mean_std(train_loader, model, center, options["device"])
 
175
 
176
  templates_df = pd.read_csv(log_templates_file)
177
- event_template_dict = dict(zip(templates_df["EventId"], templates_df["EventTemplate"]))
 
178
 
179
  # Step 4: Analyze & Explain Anomalies
180
  results = []
@@ -185,7 +199,8 @@ def detect_anomalies_and_explain(input_log_path):
185
  score = torch.norm(cls_output - center, dim=1).item()
186
  z_score = (score - mean) / std
187
 
188
- num_undetected, masked_total = compute_logkey_anomaly(output["logkey_output"][0], batch["bert_label"][0])
 
189
  undetected_ratio = num_undetected / masked_total if masked_total else 0
190
 
191
  status = "Abnormal" if z_score > 2 or undetected_ratio > 0.5 else "Normal"
@@ -193,16 +208,17 @@ def detect_anomalies_and_explain(input_log_path):
193
  continue
194
 
195
  top_eids = test_sequences[i][:TOP_EVENTS]
196
- event_templates = [event_template_dict.get(eid, f"[Missing Event {eid}]") for eid in top_eids]
197
- #prompt = ''#generate_prompt(event_templates)
198
- #explanation = ''#call_mistral(prompt, tokenizer, model_mistral, options["device"])
 
199
 
200
  results.append({
201
  "AppId": app_ids[i],
202
  "Score": score,
203
  "z_score": z_score,
204
  "UndetectedRatio": undetected_ratio,
205
- "status":status,
206
  "Events": event_templates,
207
  "Explanation": None
208
  })
 
1
  import os
2
  import sys
3
+ # Ensure local logbert_processor and logparser are first in sys.path for all imports
4
+ sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
5
+ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'logparser')))
 
 
 
 
 
 
 
 
 
6
 
 
 
 
 
 
 
7
  from bert_pytorch.model.log_model import BERTLog
8
+ from bert_pytorch.model.bert import BERT
9
+ from bert_pytorch.dataset import LogDataset, WordVocab
10
+ import Drain
11
+ from torch.utils.data import DataLoader
12
+ from collections import defaultdict
13
+ from tqdm import tqdm
14
+ import numpy as np
15
+ import pandas as pd
16
+ import torch
17
+ import time
18
+ import json
19
+ import ast
20
+ import re
21
  # === Constants ===
22
  TOP_EVENTS = 5
23
  MAX_RCA_TOKENS = 200
 
 
24
 
25
  # === Log Parsing ===
26
+
27
+
28
  def parse_log_with_drain(log_file, input_dir, output_dir):
29
  regex = [
30
  r"appattempt_\d+_\d+_\d+",
 
36
  r"[a-f0-9]{8,}"
37
  ]
38
  log_format = r'\[<AppId>] <Date> <Time> <Level> \[<Process>] <Component>: <Content>'
39
+ parser = Drain.LogParser(log_format, indir=input_dir,
40
+ outdir=output_dir, depth=5, st=0.5, rex=regex, keep_para=True)
41
  parser.parse(log_file)
42
 
43
+
44
  def hadoop_sampling(structured_log_path, sequence_output_path):
45
  df = pd.read_csv(structured_log_path)
46
  data_dict = defaultdict(list)
 
49
  event_id = row.get("EventId")
50
  if pd.notnull(app_id) and pd.notnull(event_id):
51
  data_dict[app_id].append(str(event_id))
52
+ pd.DataFrame(list(data_dict.items()), columns=['AppId', 'EventSequence']).to_csv(
53
+ sequence_output_path, index=False)
54
 
55
  # === Utility Functions ===
56
+
57
+
58
def load_parameters(param_path):
    """Parse a key:value parameters file into a typed options dict.

    Each non-empty line of the form ``key: value`` is read; values are
    coerced to bool/None, then int, then float, falling back to the raw
    string. Lines without a colon are ignored.

    Args:
        param_path: Path to the parameters text file.

    Returns:
        dict mapping parameter names to typed values.
    """
    # Explicit literal table instead of eval() — eval on file contents is
    # an injection risk and unnecessary for three known keywords.
    literals = {'true': True, 'false': False, 'none': None}
    options = {}
    with open(param_path, 'r') as f:
        for line in f:
            if ':' not in line:
                continue
            key, val = line.strip().split(':', 1)
            key, val = key.strip(), val.strip()
            if val.lower() in literals:
                val = literals[val.lower()]
            else:
                try:
                    val = int(val)
                except ValueError:
                    try:
                        val = float(val)
                    except ValueError:
                        # Not numeric — keep as string (e.g. paths, device names).
                        pass
            options[key] = val
    return options
78
 
79
+
80
def load_logbert_model(options, vocab):
    """Load the LogBERT model from ``options["model_path"]``.

    First tries to load a fully pickled model object; if that fails
    (e.g. the file holds only a state_dict), rebuilds the architecture
    from ``options`` and loads the weights into it.

    Args:
        options: dict with "model_path", "device", "hidden", "layers",
            "attn_heads", "max_len".
        vocab: vocabulary object; ``len(vocab)`` sets the embedding size.

    Returns:
        A BERTLog model on ``options["device"]``.
    """
    try:
        return torch.load(options["model_path"], map_location=options["device"])
    # Narrowed from bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate; any load failure falls through to the state_dict path.
    except Exception:
        bert = BERT(len(vocab), options["hidden"], options["layers"],
                    options["attn_heads"], options["max_len"])
        model = BERTLog(bert, vocab_size=len(vocab)).to(options["device"])
        model.load_state_dict(torch.load(
            options["model_path"], map_location=options["device"]))
        return model
90
 
91
+
92
def load_center(path, device):
    """Load the hypersphere center tensor from ``path`` onto ``device``.

    Supports both storage formats: a checkpoint dict holding the tensor
    under the "center" key, or the bare tensor itself.
    """
    loaded = torch.load(path, map_location=device)
    if isinstance(loaded, dict):
        return loaded["center"]
    return loaded
95
 
96
+
97
  def extract_sequences(path, min_len):
98
  df = pd.read_csv(path)
99
  data, app_ids = [], []
 
107
  continue
108
  return data, app_ids
109
 
110
+
111
def prepare_dataloader(sequences, vocab, options):
    """Wrap event sequences in a LogDataset and a one-sample DataLoader.

    Timestamps are not used by this pipeline, so each sequence gets a
    zero-filled time vector of matching length.
    """
    zero_times = [[0] * len(seq) for seq in sequences]
    dataset = LogDataset(
        sequences,
        zero_times,
        vocab,
        seq_len=options["seq_len"],
        on_memory=True,
        mask_ratio=options["mask_ratio"],
    )
    return DataLoader(dataset, batch_size=1, shuffle=False,
                      collate_fn=dataset.collate_fn)
116
 
117
+
118
def calculate_mean_std(loader, model, center, device):
    """Return (mean, std) of distances between cls_output and the center.

    Runs the model over every batch in ``loader`` without gradients and
    measures the L2 distance of each cls embedding from ``center``;
    these statistics calibrate later z-score anomaly thresholds.
    """
    distances = []
    with torch.no_grad():
        for batch in tqdm(loader, desc="📏 Computing train distances..."):
            batch = {name: tensor.to(device) for name, tensor in batch.items()}
            cls_out = model(batch["bert_input"], batch["time_input"])["cls_output"]
            distances.append(torch.norm(cls_out - center, dim=1).item())
    return np.mean(distances), np.std(distances)
127
 
128
+
129
  def generate_prompt(event_templates):
130
  prompt = "The system encountered a failure. Below are the key log events preceding the anomaly:\n\n"
131
  for i, event in enumerate(event_templates, 1):
 
134
  prompt += "Explain the cause in one or two sentences, using technical reasoning if possible.\n"
135
  return prompt
136
 
137
+
138
+
 
 
 
 
 
 
 
 
139
 
140
  def compute_logkey_anomaly(masked_output, masked_label, top_k=5):
141
  num_undetected = 0
 
145
  return num_undetected, len(masked_label)
146
 
147
  # === API-Compatible RCA Pipeline ===
148
+
149
+
150
  def detect_anomalies_and_explain(input_log_path):
151
  log_file = os.path.basename(input_log_path)
152
  input_dir = os.path.dirname(input_log_path)
153
+ output_dir = os.path.abspath(os.path.join(
154
+ os.path.dirname(__file__), "model", "bert"))
155
 
156
+ log_structured_file = os.path.join(
157
+ output_dir, log_file + "_structured.csv")
158
  log_templates_file = os.path.join(output_dir, log_file + "_templates.csv")
159
  log_sequence_file = os.path.join(output_dir, "rca_abnormal_sequence.csv")
160
  PARAMS_FILE = os.path.join(output_dir, "bert", "parameters.txt")
 
167
 
168
  # Step 2: Load Models and Parameters
169
  options = load_parameters(PARAMS_FILE)
170
+ options["device"] = torch.device(
171
+ "cuda" if torch.cuda.is_available() else "cpu")
172
 
 
 
 
173
 
174
  vocab = WordVocab.load_vocab(options["vocab_path"])
175
  model = load_logbert_model(options, vocab).to(options["device"]).eval()
176
  center = load_center(CENTER_PATH, options["device"])
177
 
178
  # Step 3: Prepare Data
179
+ test_sequences, app_ids = extract_sequences(
180
+ log_sequence_file, options["min_len"])
181
  test_loader = prepare_dataloader(test_sequences, vocab, options)
182
 
183
+ train_sequences = [line.strip().split() for line in open(
184
+ TRAIN_FILE) if len(line.strip().split()) >= options["min_len"]]
185
  train_loader = prepare_dataloader(train_sequences, vocab, options)
186
+ mean, std = calculate_mean_std(
187
+ train_loader, model, center, options["device"])
188
 
189
  templates_df = pd.read_csv(log_templates_file)
190
+ event_template_dict = dict(
191
+ zip(templates_df["EventId"], templates_df["EventTemplate"]))
192
 
193
  # Step 4: Analyze & Explain Anomalies
194
  results = []
 
199
  score = torch.norm(cls_output - center, dim=1).item()
200
  z_score = (score - mean) / std
201
 
202
+ num_undetected, masked_total = compute_logkey_anomaly(
203
+ output["logkey_output"][0], batch["bert_label"][0])
204
  undetected_ratio = num_undetected / masked_total if masked_total else 0
205
 
206
  status = "Abnormal" if z_score > 2 or undetected_ratio > 0.5 else "Normal"
 
208
  continue
209
 
210
  top_eids = test_sequences[i][:TOP_EVENTS]
211
+ event_templates = [event_template_dict.get(
212
+ eid, f"[Missing Event {eid}]") for eid in top_eids]
213
+
214
+ # Inject results to DB
215
 
216
  results.append({
217
  "AppId": app_ids[i],
218
  "Score": score,
219
  "z_score": z_score,
220
  "UndetectedRatio": undetected_ratio,
221
+ "status": status,
222
  "Events": event_templates,
223
  "Explanation": None
224
  })
requirements.txt CHANGED
@@ -16,4 +16,5 @@ sqlalchemy
16
  asyncpg
17
  logparser
18
  bert_pytorch
19
- seaborn
 
 
16
  asyncpg
17
  logparser
18
  bert_pytorch
19
+ seaborn
20
+ botocore