Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import re | |
| import joblib | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
# Look-back window, in log records, used by rolling_features for the
# per-severity rolling counts and as the failure-rate denominator.
WINDOW = 10
def parse_log_file(log_file):
    """Parse a text log into a DataFrame with one row per recognised line.

    A line is kept when, after stripping surrounding whitespace, it starts
    with ``<digits>ns [<severity>] <module> <message>``. Lines that do not
    match are skipped silently.

    Args:
        log_file: Path of the log file to read.

    Returns:
        A ``pandas.DataFrame`` with the columns ``time`` (the integer that
        precedes the ``ns`` suffix), ``severity``, ``module`` and
        ``message``. The four columns are present even when no line
        matches, so later column lookups do not fail on an empty log.
    """
    columns = ["time", "severity", "module", "message"]
    pattern = re.compile(r"(\d+)ns\s+\[(\w+)\]\s+(\w+)\s+(.*)")
    records = []
    # An explicit encoding keeps parsing independent of the platform default;
    # undecodable bytes are replaced instead of aborting the whole parse.
    with open(log_file, encoding="utf-8", errors="replace") as f:
        for line in f:
            m = pattern.match(line.strip())
            if m:
                records.append({
                    "time": int(m.group(1)),
                    "severity": m.group(2),
                    "module": m.group(3),
                    "message": m.group(4),
                })
    frame = pd.DataFrame(records, columns=columns)
    if frame.empty:
        # Without rows pandas cannot infer a numeric dtype for "time".
        frame["time"] = frame["time"].astype("int64")
    return frame
def severity_flags(df):
    """Add 0/1 indicator columns for the ERROR, CRITICAL and WARNING levels.

    The columns ``error_flag``, ``critical_flag`` and ``warning_flag`` are
    written onto ``df`` itself, and that same object is returned.
    """
    levels = df["severity"]
    flag_columns = (
        ("error_flag", "ERROR"),
        ("critical_flag", "CRITICAL"),
        ("warning_flag", "WARNING"),
    )
    for column, level in flag_columns:
        df[column] = levels.eq(level).astype(int)
    return df
def temporal_features(df):
    """Add elapsed-time features to a time-ordered copy of ``df``.

    Args:
        df: DataFrame with a numeric ``time`` column and a ``severity``
            column.

    Returns:
        A new DataFrame sorted by ``time`` (the input is not modified) with
        these extra columns:

        * ``time_since_last_event`` - gap to the previous row, 0 for the
          first row.
        * ``time_since_last_error`` / ``time_since_last_critical`` - gap to
          the most recent ERROR / CRITICAL row at or before the current
          row. It is 0 on such a row itself, and also 0 for rows that
          precede the first occurrence.
        * ``log_time_since_last_error`` / ``log_time_since_last_critical``
          - ``log1p`` of the two gaps above.
    """
    # A stable sort keeps records that share a timestamp in their original
    # log order; the default quicksort gives no such guarantee, and every
    # order-dependent feature computed afterwards would inherit the shuffle.
    df = df.sort_values("time", kind="mergesort")
    df["time_since_last_event"] = df["time"].diff().fillna(0)

    gap_columns = (
        ("time_since_last_error", "ERROR"),
        ("time_since_last_critical", "CRITICAL"),
    )
    for column, level in gap_columns:
        # Timestamp of the latest row with this severity, carried forward.
        last_seen = df["time"].where(df["severity"] == level).ffill()
        df[column] = (df["time"] - last_seen).fillna(0)

    # transform to reduce dominance
    df["log_time_since_last_error"] = np.log1p(df["time_since_last_error"])
    df["log_time_since_last_critical"] = np.log1p(df["time_since_last_critical"])
    return df
def rolling_features(df):
    """Add rolling-window counts, rates and an error trend column to ``df``.

    Reads the ``error_flag``, ``critical_flag`` and ``warning_flag``
    columns. The new columns are written onto ``df`` itself and the same
    object is returned. Windows are taken over rows in their current order.

    Columns added:

    * ``error_count_last_10``, ``critical_count_last_10``,
      ``warning_count_last_10`` - sum of the flag over a ``WINDOW``-row
      window, shifted down one row so the current record is excluded;
      missing leading values are filled with 0.
    * ``failure_rate_recent_window`` - (error count + critical count)
      divided by ``WINDOW``.
    * ``rolling_error_rate_20``, ``rolling_warning_rate_20`` - mean of the
      flag over a 20-row window, shifted one row; leading values are left
      as NaN.
    * ``error_acceleration`` - 10-row rolling sum of the row-to-row change
      in ``error_flag``; not shifted, so it includes the current row.
    """
    def lagged_count(flag_column):
        # Window total moved down one row so a record's own flag is excluded.
        window_total = df[flag_column].rolling(WINDOW).sum()
        return window_total.shift(1).fillna(0)

    count_columns = (
        ("error_count_last_10", "error_flag"),
        ("critical_count_last_10", "critical_flag"),
        ("warning_count_last_10", "warning_flag"),
    )
    for column, flag_column in count_columns:
        df[column] = lagged_count(flag_column)

    recent_failures = df["error_count_last_10"] + df["critical_count_last_10"]
    df["failure_rate_recent_window"] = recent_failures / WINDOW

    # trend features
    rate_columns = (
        ("rolling_error_rate_20", "error_flag"),
        ("rolling_warning_rate_20", "warning_flag"),
    )
    for column, flag_column in rate_columns:
        df[column] = df[flag_column].rolling(20).mean().shift(1)

    flag_change = df["error_flag"].diff()
    df["error_acceleration"] = flag_change.rolling(10).sum()
    return df
def module_features(df):
    """Attach per-module log statistics to every row of ``df``.

    The statistics are computed over all rows of the frame passed in,
    grouped by ``module``, and then left-joined back onto ``df``:

    * ``total_logs`` - number of non-null ``severity`` values.
    * ``error_logs`` / ``critical_logs`` - sums of ``error_flag`` and
      ``critical_flag``.
    * ``historical_error_rate`` - ``error_logs / total_logs``.
    * ``historical_critical_ratio`` - ``critical_logs / total_logs``.
    * ``module_failure_density`` - ``(error_logs + critical_logs) /
      total_logs``.

    Returns:
        The merged DataFrame (a new object).
    """
    by_module = df.groupby("module")
    stats = pd.DataFrame(
        {
            "total_logs": by_module["severity"].count(),
            "error_logs": by_module["error_flag"].sum(),
            "critical_logs": by_module["critical_flag"].sum(),
        }
    )

    totals = stats["total_logs"]
    failures = stats["error_logs"] + stats["critical_logs"]
    stats["historical_error_rate"] = stats["error_logs"] / totals
    stats["historical_critical_ratio"] = stats["critical_logs"] / totals
    stats["module_failure_density"] = failures / totals

    return df.merge(stats, on="module", how="left")
def text_features(df):
    """Add message-length, keyword and vectorizer features from ``message``.

    The columns ``clean_message`` (lower-cased message), ``message_length``
    and one 0/1 ``kw_<keyword>`` column per keyword are written onto ``df``.
    The messages are then passed through the vectorizer stored at
    ``models/tfidf_vectorizer.pkl`` (resolved against the current working
    directory) and its output is appended as ``tfidf_0 .. tfidf_<n-1>``.

    The vectorizer is used read-only: it is loaded and applied, and this
    function writes no files.

    Args:
        df: DataFrame with a string ``message`` column.

    Returns:
        A new DataFrame with a fresh 0..n-1 index containing the columns of
        ``df`` followed by the ``tfidf_<i>`` columns.
    """
    df["clean_message"] = df["message"].str.lower()
    df["message_length"] = df["clean_message"].str.len()

    for keyword in ("timeout", "overflow", "stall", "violation"):
        # The keywords are literal substrings, so regex matching is not needed.
        hits = df["clean_message"].str.contains(keyword, regex=False)
        df[f"kw_{keyword}"] = hits.astype(int)

    # NOTE(review): joblib.load unpickles the file - only point this at an
    # artifact from a trusted source. Presumably a fitted TfidfVectorizer.
    vectorizer = joblib.load("models/tfidf_vectorizer.pkl")
    matrix = vectorizer.transform(df["clean_message"])
    tfidf = pd.DataFrame(
        matrix.toarray(),
        columns=[f"tfidf_{i}" for i in range(matrix.shape[1])],
    )
    # Reset the index so rows line up positionally with the tfidf frame.
    return pd.concat([df.reset_index(drop=True), tfidf], axis=1)
def run_pipeline(input_file, output_file):
    """Run every feature-extraction stage on a log file and save the result.

    The log is parsed with ``parse_log_file`` and then passed, in order,
    through ``severity_flags``, ``temporal_features``, ``rolling_features``,
    ``module_features`` and ``text_features``. The final frame is written
    as CSV without the index, and a completion message is printed.

    Args:
        input_file: Path of the log file to parse.
        output_file: Destination handed to ``DataFrame.to_csv``. When it is
            a path, its parent directory is created if missing.
    """
    import os
    from pathlib import Path

    df = parse_log_file(input_file)
    stages = (
        severity_flags,
        temporal_features,
        rolling_features,
        module_features,
        text_features,
    )
    for stage in stages:
        df = stage(df)

    # Create the destination folder first so the write cannot fail on a
    # missing directory after all the work is done. Non-path targets
    # (e.g. file-like objects) are passed through untouched.
    if isinstance(output_file, (str, os.PathLike)):
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    df.to_csv(output_file, index=False)
    print("Feature extraction complete")
if __name__ == "__main__":
    import argparse

    # Both paths are optional; with no arguments the script behaves exactly
    # as before and uses the original hard-coded locations.
    parser = argparse.ArgumentParser(
        description="Extract features from a severity-tagged log file."
    )
    parser.add_argument(
        "input_file",
        nargs="?",
        default="C:/Codes/SanDisk/rtl_logs_with_severity.txt",
        help="log file to parse",
    )
    parser.add_argument(
        "output_file",
        nargs="?",
        default="data/features.csv",
        help="CSV file to write the features to",
    )
    args = parser.parse_args()
    run_pipeline(args.input_file, args.output_file)