"""Extract ML features from RTL simulation log files.

Pipeline stages: parse raw logs -> severity flags -> temporal deltas ->
rolling-window failure counts -> per-module historical stats -> text/TF-IDF
features, then write the combined feature table to CSV.
"""

import re

import joblib
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer  # noqa: F401 — type of the joblib-loaded vectorizer

# Size (in events) of the rolling window used for recent-failure statistics.
WINDOW = 10


def parse_log_file(log_file):
    """Parse a simulation log into a DataFrame.

    Expected line format: ``<time>ns [<SEVERITY>] <module> <message>``.
    Lines that do not match the pattern are silently skipped.

    Returns a DataFrame with columns: time, severity, module, message.
    """
    records = []
    pattern = re.compile(r"(\d+)ns\s+\[(\w+)\]\s+(\w+)\s+(.*)")
    with open(log_file) as f:
        for line in f:
            m = pattern.match(line.strip())
            if m:
                records.append({
                    "time": int(m.group(1)),
                    "severity": m.group(2),
                    "module": m.group(3),
                    "message": m.group(4),
                })
    return pd.DataFrame(records)


def severity_flags(df):
    """Add 0/1 indicator columns for ERROR / CRITICAL / WARNING severities."""
    df["error_flag"] = (df["severity"] == "ERROR").astype(int)
    df["critical_flag"] = (df["severity"] == "CRITICAL").astype(int)
    df["warning_flag"] = (df["severity"] == "WARNING").astype(int)
    return df


def temporal_features(df):
    """Add time-delta features relative to the previous event / failure.

    Sorts by time, then computes the gap since the previous event and since
    the most recent ERROR/CRITICAL event (an ERROR row sees its own
    timestamp, so its delta is 0).
    """
    df = df.sort_values("time")
    df["time_since_last_event"] = df["time"].diff().fillna(0)
    # Timestamp of the most recent ERROR/CRITICAL, carried forward.
    last_error = df["time"].where(df["severity"] == "ERROR")
    last_critical = df["time"].where(df["severity"] == "CRITICAL")
    df["time_since_last_error"] = (df["time"] - last_error.ffill()).fillna(0)
    df["time_since_last_critical"] = (df["time"] - last_critical.ffill()).fillna(0)
    # Log-transform to reduce dominance of very large gaps.
    df["log_time_since_last_error"] = np.log1p(df["time_since_last_error"])
    df["log_time_since_last_critical"] = np.log1p(df["time_since_last_critical"])
    return df


def rolling_features(df):
    """Add rolling-window counts and rates of recent failures.

    Every window is shifted by one row so a feature only reflects strictly
    earlier events — the current row never leaks into its own feature.
    """
    for sev in ("error", "critical", "warning"):
        df[f"{sev}_count_last_10"] = (
            df[f"{sev}_flag"].rolling(WINDOW).sum().shift(1).fillna(0)
        )
    df["failure_rate_recent_window"] = (
        df["error_count_last_10"] + df["critical_count_last_10"]
    ) / WINDOW
    # Trend features over a longer window.
    df["rolling_error_rate_20"] = df["error_flag"].rolling(20).mean().shift(1)
    df["rolling_warning_rate_20"] = df["warning_flag"].rolling(20).mean().shift(1)
    # BUG FIX: previously used a hard-coded rolling(10) with no shift(1) and
    # no fillna(0), inconsistent with every other rolling feature above and
    # leaking the current row into its own feature.
    df["error_acceleration"] = (
        df["error_flag"].diff().rolling(WINDOW).sum().shift(1).fillna(0)
    )
    return df


def module_features(df):
    """Add per-module historical failure statistics, merged onto each row."""
    stats = df.groupby("module").agg(
        total_logs=("severity", "count"),
        error_logs=("error_flag", "sum"),
        critical_logs=("critical_flag", "sum"),
    )
    stats["historical_error_rate"] = stats["error_logs"] / stats["total_logs"]
    stats["historical_critical_ratio"] = stats["critical_logs"] / stats["total_logs"]
    stats["module_failure_density"] = (
        stats["error_logs"] + stats["critical_logs"]
    ) / stats["total_logs"]
    # "module" is the index level of `stats`; merge matches it against the
    # "module" column of `df` (supported since pandas 0.23).
    df = df.merge(stats, on="module", how="left")
    return df


def text_features(df):
    """Add message-text features: length, keyword indicators, TF-IDF scores.

    Requires a pre-fitted TfidfVectorizer at models/tfidf_vectorizer.pkl.
    """
    df["clean_message"] = df["message"].str.lower()
    df["message_length"] = df["clean_message"].str.len()
    keywords = ["timeout", "overflow", "stall", "violation"]
    for k in keywords:
        # regex=False: match the keyword literally, not as a regex pattern.
        df[f"kw_{k}"] = df["clean_message"].str.contains(k, regex=False).astype(int)
    vectorizer = joblib.load("models/tfidf_vectorizer.pkl")
    X = vectorizer.transform(df["clean_message"])
    tfidf = pd.DataFrame(
        X.toarray(),
        columns=[f"tfidf_{i}" for i in range(X.shape[1])],
    )
    df = pd.concat([df.reset_index(drop=True), tfidf], axis=1)
    # BUG FIX: previously re-dumped the unchanged vectorizer to
    # "tfidf_vectorizer.pkl" — a different path than it was loaded from —
    # silently creating a stale duplicate. The vectorizer is read-only here,
    # so no dump is needed.
    return df


def run_pipeline(input_file, output_file):
    """Run the full feature-extraction pipeline and write the result to CSV."""
    df = parse_log_file(input_file)
    df = severity_flags(df)
    df = temporal_features(df)
    df = rolling_features(df)
    df = module_features(df)
    df = text_features(df)
    df.to_csv(output_file, index=False)
    print("Feature extraction complete")


if __name__ == "__main__":
    run_pipeline("C:/Codes/SanDisk/rtl_logs_with_severity.txt", "data/features.csv")