Spaces:
Sleeping
Sleeping
| # app.py | |
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification | |
| from sklearn.ensemble import RandomForestClassifier | |
| import joblib | |
| import os | |
| # Load Hugging Face model for anomaly detection | |
| tokenizer = AutoTokenizer.from_pretrained("huggingface-course/distilbert-base-uncased-finetuned-imdb") | |
| model = AutoModelForSequenceClassification.from_pretrained("huggingface-course/distilbert-base-uncased-finetuned-imdb") | |
| anomaly_detection = pipeline("text-classification", model=model, tokenizer=tokenizer) | |
| # Train or load Random Forest model for failure prediction | |
| if not os.path.exists('failure_prediction_model.pkl'): | |
| data = pd.DataFrame({ | |
| 'cpu_usage': [10, 20, 15, 35, 55], | |
| 'memory_usage': [30, 60, 45, 50, 80], | |
| 'error_rate': [0, 1, 0, 2, 5], | |
| 'failure': [0, 1, 0, 1, 1] | |
| }) | |
| X = data[['cpu_usage', 'memory_usage', 'error_rate']] | |
| y = data['failure'] | |
| failure_prediction_model = RandomForestClassifier(n_estimators=100, random_state=42) | |
| failure_prediction_model.fit(X, y) | |
| joblib.dump(failure_prediction_model, 'failure_prediction_model.pkl') | |
| else: | |
| failure_prediction_model = joblib.load('failure_prediction_model.pkl') | |
| # Preprocess logs for anomaly detection | |
| def preprocess_logs(logs): | |
| logs['timestamp'] = pd.to_datetime(logs['timestamp']) | |
| logs['log_message'] = logs['log_message'].str.lower() | |
| return logs | |
| # Detect anomalies in logs with label mapping | |
| def detect_anomaly(logs): | |
| preprocessed_logs = preprocess_logs(logs) | |
| label_map = { # Map Hugging Face output labels to meaningful labels | |
| "LABEL_0": "Normal", | |
| "LABEL_1": "Anomaly" | |
| } | |
| results = [] | |
| for log in preprocessed_logs['log_message']: | |
| anomaly_result = anomaly_detection(log) | |
| label = anomaly_result[0]['label'] | |
| results.append(label_map.get(label, label)) # Map the label or return the original label | |
| return results | |
| # Predict failures based on device metrics | |
| def predict_failure(device_metrics): | |
| if device_metrics is None: | |
| return "Device metrics are missing." | |
| if 'cpu_usage' not in device_metrics or 'memory_usage' not in device_metrics or 'error_rate' not in device_metrics: | |
| return "Invalid metrics format. Please provide 'cpu_usage', 'memory_usage', and 'error_rate'." | |
| metrics_array = np.array([device_metrics['cpu_usage'], device_metrics['memory_usage'], device_metrics['error_rate']]).reshape(1, -1) | |
| failure_prediction = failure_prediction_model.predict(metrics_array) | |
| return failure_prediction | |
| # Process logs and predict anomalies and failures | |
| def process_logs_and_predict(log_file, metrics): | |
| # Read and validate log file format | |
| try: | |
| logs = pd.read_json(log_file) | |
| if not isinstance(logs, pd.DataFrame) or logs.empty: | |
| return "Invalid log file format. Please upload a JSON array of log entries." | |
| except ValueError as e: | |
| return f"Error reading JSON file: {str(e)}" | |
| # Detect anomalies | |
| anomalies = detect_anomaly(logs) | |
| # Predict failures using device metrics | |
| failure_pred = predict_failure(metrics) | |
| return f"Anomalies Detected: {anomalies}, Failure Prediction: {failure_pred}" | |
| # Gradio interface | |
| iface = gr.Interface(fn=process_logs_and_predict, | |
| inputs=["file", "json"], | |
| outputs="text", | |
| title="Cisco Device Monitoring", | |
| description="Upload log files to detect anomalies and predict potential device failures.") | |
| iface.launch() | |