# rtl-reliability-engine / features / log_feature_extraction.py
# Author: abhinavvvvv — commit fb121b9 ("fixed embed dim errors")
import pandas as pd
import numpy as np
import re
import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
WINDOW = 10  # rolling-window size, in log rows, used by rolling_features()
def parse_log_file(log_file):
    """Parse a simulation log file into a DataFrame.

    Expected line format: "<time>ns [<SEVERITY>] <module> <message>",
    e.g. "120ns [ERROR] alu timeout on bus". Lines that do not match
    the pattern are silently skipped.

    Parameters
    ----------
    log_file : str or path-like
        Path to the log file.

    Returns
    -------
    pd.DataFrame
        Columns: time (int), severity, module, message.
    """
    pattern = re.compile(r"(\d+)ns\s+\[(\w+)\]\s+(\w+)\s+(.*)")
    records = []
    with open(log_file) as f:
        for line in f:
            m = pattern.match(line.strip())
            if m:
                records.append({
                    "time": int(m.group(1)),
                    "severity": m.group(2),
                    "module": m.group(3),
                    "message": m.group(4),
                })
    # Explicit columns guard the empty/no-match case: pd.DataFrame([])
    # would otherwise have no columns at all and every downstream step
    # (severity_flags, temporal_features, ...) would raise KeyError.
    return pd.DataFrame(records, columns=["time", "severity", "module", "message"])
def severity_flags(df):
    """Add binary indicator columns (0/1) for each severity of interest.

    Creates error_flag, critical_flag and warning_flag from the
    ``severity`` column and returns the (mutated) DataFrame.
    """
    for level in ("error", "critical", "warning"):
        df[f"{level}_flag"] = df["severity"].eq(level.upper()).astype(int)
    return df
def temporal_features(df):
    """Add inter-event timing features after sorting chronologically.

    New columns:
      time_since_last_event      gap to the previous log line (0 for the first)
      time_since_last_error      gap since the most recent ERROR (0 before any)
      time_since_last_critical   gap since the most recent CRITICAL (0 before any)
      log_time_since_last_*      log1p-compressed versions of the two gaps
    """
    df = df.sort_values("time")
    df["time_since_last_event"] = df["time"].diff().fillna(0)
    gaps = {}
    for level in ("error", "critical"):
        # timestamp of the most recent event at this severity, carried forward
        marker = df["time"].where(df["severity"] == level.upper()).ffill()
        gaps[level] = (df["time"] - marker).fillna(0)
        df[f"time_since_last_{level}"] = gaps[level]
    # log1p keeps very long quiet periods from dominating the raw gaps
    for level in ("error", "critical"):
        df[f"log_time_since_last_{level}"] = np.log1p(gaps[level])
    return df
def rolling_features(df):
    """Add rolling-window activity features over the last WINDOW / 20 rows.

    Every feature is shifted by one row so that row i summarises only
    *strictly earlier* events, keeping the columns safe as model inputs
    (no leakage of the current row's own severity). Warm-up rows where
    the window is not yet full are filled with 0.
    """
    for level in ("error", "critical", "warning"):
        df[f"{level}_count_last_10"] = (
            df[f"{level}_flag"].rolling(WINDOW).sum().shift(1).fillna(0)
        )
    df["failure_rate_recent_window"] = (
        df["error_count_last_10"] + df["critical_count_last_10"]
    ) / WINDOW
    # trend features — fillna(0) added so warm-up rows are consistent with
    # the zero-filled count features above instead of propagating NaN.
    df["rolling_error_rate_20"] = df["error_flag"].rolling(20).mean().shift(1).fillna(0)
    df["rolling_warning_rate_20"] = df["warning_flag"].rolling(20).mean().shift(1).fillna(0)
    # shift(1) added: the original window included the current row's own
    # diff, leaking the present event into its own feature.
    df["error_acceleration"] = (
        df["error_flag"].diff().rolling(10).sum().shift(1).fillna(0)
    )
    return df
def module_features(df):
    """Attach per-module historical reliability statistics to every row.

    Aggregates log counts per module (total_logs, error_logs,
    critical_logs), derives three ratio features from them
    (historical_error_rate, historical_critical_ratio,
    module_failure_density), and broadcasts the statistics back onto
    each log line via a left merge on ``module``.
    """
    per_module = df.groupby("module").agg(
        total_logs=("severity", "count"),
        error_logs=("error_flag", "sum"),
        critical_logs=("critical_flag", "sum"),
    )
    total = per_module["total_logs"]
    per_module["historical_error_rate"] = per_module["error_logs"] / total
    per_module["historical_critical_ratio"] = per_module["critical_logs"] / total
    per_module["module_failure_density"] = (
        per_module["error_logs"] + per_module["critical_logs"]
    ) / total
    return df.merge(per_module, on="module", how="left")
def text_features(df):
    """Add text-derived features from the log message column.

    Creates a lowercase ``clean_message``, its length, one 0/1 keyword
    indicator per hardware-failure keyword, and TF-IDF columns produced
    by the pre-fitted vectorizer stored at models/tfidf_vectorizer.pkl.
    The vectorizer is used for inference only — it is not refitted here.
    (The previous version re-dumped the unchanged vectorizer to a
    *different* path, "tfidf_vectorizer.pkl"; that redundant, path-
    inconsistent write has been removed.)
    """
    df["clean_message"] = df["message"].str.lower()
    df["message_length"] = df["clean_message"].str.len()
    # keywords are literal substrings, so disable regex interpretation
    for keyword in ("timeout", "overflow", "stall", "violation"):
        df[f"kw_{keyword}"] = (
            df["clean_message"].str.contains(keyword, regex=False).astype(int)
        )
    vectorizer = joblib.load("models/tfidf_vectorizer.pkl")
    matrix = vectorizer.transform(df["clean_message"])
    tfidf = pd.DataFrame(
        matrix.toarray(),
        columns=[f"tfidf_{i}" for i in range(matrix.shape[1])],
    )
    # reset_index keeps positional alignment with the fresh TF-IDF frame
    return pd.concat([df.reset_index(drop=True), tfidf], axis=1)
def run_pipeline(input_file, output_file):
    """Run the full log -> feature-CSV extraction pipeline.

    Parses ``input_file``, applies each feature stage in order, and
    writes the resulting feature table to ``output_file`` as CSV.
    """
    stages = (
        severity_flags,
        temporal_features,
        rolling_features,
        module_features,
        text_features,
    )
    df = parse_log_file(input_file)
    for stage in stages:
        df = stage(df)
    df.to_csv(output_file, index=False)
    print("Feature extraction complete")
if __name__ == "__main__":
    # Defaults preserve the previous hard-coded behaviour; pass paths on
    # the command line to override: argv[1] = input log, argv[2] = output CSV.
    import sys

    cli_args = sys.argv[1:]
    input_path = cli_args[0] if len(cli_args) > 0 else "C:/Codes/SanDisk/rtl_logs_with_severity.txt"
    output_path = cli_args[1] if len(cli_args) > 1 else "data/features.csv"
    run_pipeline(input_path, output_path)