# shrimp-pond-anomaly / CNN-Autoencoder.py
# Uploaded by ducdatit2002 via huggingface_hub (commit e58cdae, verified)
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import (
confusion_matrix,
precision_score,
recall_score,
f1_score,
accuracy_score,
classification_report
)
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
# ============================================
# 1. GLOBAL CONFIGURATION
# ============================================
DATA_PATH = "dataset.xlsx"
# Feature columns read from the Excel sheet. Each string must match the
# spreadsheet header exactly (including stray spaces inside the parentheses).
FEATURE_COLUMNS = [
    "Temp",
    "Turbidity (cm)",
    "DO(mg/L)",
    "BOD (mg/L)",
    "CO2",
    "pH`",  # NOTE(review): trailing backtick looks like a typo, but it must mirror the actual column header — verify against dataset.xlsx
    "Alkalinity (mg L-1 )",
    "Hardness (mg L-1 )",
    "Calcium (mg L-1 )",
    "Ammonia (mg L-1 )",
    "Nitrite (mg L-1 )",
    "Phosphorus (mg L-1 )",
    "H2S (mg L-1 )",
    "Plankton (No. L-1)"
]
LABEL_COL = "Water Quality"
SEQUENCE_LENGTH = 10  # time steps per sliding window
TRAIN_RATIO = 0.8
VAL_RATIO = 0.1
TEST_RATIO = 0.1
# CNN-AE hyperparameters
INPUT_DIM = len(FEATURE_COLUMNS)  # 14 features
SEQ_LEN = SEQUENCE_LENGTH  # 10 time steps
CHANNELS = INPUT_DIM  # treat each feature as a Conv1d channel
AE_LR = 1e-3
AE_EPOCHS = 50
BATCH_SIZE = 64
RANDOM_STATE = 42
THRESHOLD_STD_FACTOR = 2  # threshold = mean + 2*std on validation normal
# ============================================
# 2. LOAD AND PREPROCESS DATA
# ============================================
df = pd.read_excel(DATA_PATH)
# Drop rows that are entirely empty.
df = df.dropna(how="all")
# Ensure the label column is integer-typed.
df[LABEL_COL] = df[LABEL_COL].astype(int)
labels_all = df[LABEL_COL].values  # shape = (num_total,)
# Convert decimal commas to dots in string-typed columns, then cast every
# feature column to float (astype is a no-op for already-numeric columns).
for col in FEATURE_COLUMNS:
    if df[col].dtype == object or df[col].dtype == str:
        df[col] = df[col].apply(lambda x: str(x).replace(",", "."))
    df[col] = df[col].astype(float)
data_raw = df[FEATURE_COLUMNS].values  # shape = (num_total, 14)
# Min-max scale all features into [0, 1].
# NOTE(review): the scaler is fitted on the FULL dataset, including rows that
# later become test windows — this leaks test statistics into training.
# Confirm whether that is acceptable for the experiment.
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data_raw)  # shape = (num_total, 14)
# ============================================
# 3. TIME-SERIES DATASET
# ============================================
class CNNTimeSeriesDataset(Dataset):
    """Sliding-window dataset for the 1-D CNN autoencoder.

    Each item is a window of `seq_len` consecutive rows, transposed to
    shape (channels, seq_len) so every feature becomes one Conv1d channel.
    """

    def __init__(self, data, seq_len):
        self.data = data
        self.seq_len = seq_len
        # One window per start position; the last `seq_len` rows cannot
        # start a complete window, hence the subtraction.
        self.num_items = data.shape[0] - seq_len

    def __len__(self):
        return self.num_items

    def __getitem__(self, idx):
        start, stop = idx, idx + self.seq_len
        window = self.data[start:stop]  # (seq_len, features)
        # Conv1d expects (channels, seq_len), so put features first.
        return torch.tensor(window.T, dtype=torch.float32)
num_total = data_scaled.shape[0]
num_items = num_total - SEQUENCE_LENGTH
# Label of each window = raw label at the time step right after the window,
# i.e. y_seq[i] == labels_all[i + SEQUENCE_LENGTH]. A vectorized slice
# (with a copy via astype) replaces the original element-by-element loop.
y_seq = labels_all[SEQUENCE_LENGTH : SEQUENCE_LENGTH + num_items].astype(int)
# ============================================
# 4. STRATIFIED SPLIT (TRAIN/VAL/TEST)
# ============================================
# n_splits=1, so take the single (train, test) index pair directly with
# next() instead of the original loop-and-pass idiom.
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=TEST_RATIO, random_state=RANDOM_STATE)
train_val_idx, test_idx = next(sss1.split(np.zeros(num_items), y_seq))
# Validation fraction expressed relative to the remaining train+val pool.
val_size_rel = VAL_RATIO / (TRAIN_RATIO + VAL_RATIO)
sss2 = StratifiedShuffleSplit(n_splits=1, test_size=val_size_rel, random_state=RANDOM_STATE)
train_idx_rel, val_idx_rel = next(sss2.split(np.zeros(len(train_val_idx)), y_seq[train_val_idx]))
# Map the relative indices back into the full window index space.
train_idx = train_val_idx[train_idx_rel]
val_idx = train_val_idx[val_idx_rel]
def count_labels(indices, y):
    """Return a {label: count} mapping for the labels of y selected by indices."""
    values, counts = np.unique(y[indices], return_counts=True)
    return {v: c for v, c in zip(values.tolist(), counts.tolist())}
print("Train labels:", count_labels(train_idx, y_seq))
print("Val labels:", count_labels(val_idx, y_seq))
print("Test labels:", count_labels(test_idx, y_seq))
# ============================================
# 5. AUTOENCODER DATALOADERS (NORMAL WINDOWS ONLY)
# ============================================
dataset_all = CNNTimeSeriesDataset(data_scaled, SEQUENCE_LENGTH)
# The autoencoder is trained/validated only on "normal" windows (labels 0
# and 1); label 2 is the anomaly class. Vectorized boolean masks replace the
# original per-element Python list comprehensions; .tolist() keeps the index
# lists of plain ints, as before.
train_normal_idx = train_idx[y_seq[train_idx] < 2].tolist()
val_normal_idx = val_idx[y_seq[val_idx] < 2].tolist()
train_ae_dataset = Subset(dataset_all, train_normal_idx)
val_ae_dataset = Subset(dataset_all, val_normal_idx)
test_dataset = Subset(dataset_all, test_idx)
# drop_last=True only on the training loader; evaluation keeps every sample.
train_ae_loader = DataLoader(train_ae_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
val_ae_loader = DataLoader(val_ae_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)
# ============================================
# 6. CNN AUTOENCODER
# ============================================
class CNNAutoencoder(nn.Module):
    """Convolutional autoencoder over (channels, seq_len) windows.

    Every (de)convolution uses kernel_size=3 with padding=1, so the temporal
    length is preserved end to end; only the channel depth changes
    (channels -> 32 -> 64 -> 128 -> 64 -> 32 -> channels).
    """

    def __init__(self, channels, seq_len):
        super().__init__()
        self.channels = channels
        self.seq_len = seq_len
        # Encoder: progressively widen the channel depth, seq_len unchanged.
        self.encoder = nn.Sequential(
            nn.Conv1d(channels, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        # Decoder mirrors the encoder; the final Sigmoid keeps outputs in
        # [0, 1] to match the min-max scaled inputs.
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(32, channels, kernel_size=3, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Encode then decode; x and the result are (batch, channels, seq_len)."""
        return self.decoder(self.encoder(x))
# Select GPU when available; model and batches are moved to this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ae_model = CNNAutoencoder(channels=CHANNELS, seq_len=SEQ_LEN).to(device)
ae_criterion = nn.MSELoss()
ae_optimizer = torch.optim.Adam(ae_model.parameters(), lr=AE_LR)
# ============================================
# 7. TRAIN THE CNN AUTOENCODER
# ============================================
# Early-stopping-style checkpointing: keep the weights with the lowest
# validation loss seen so far and reload them after the last epoch.
best_val_loss = float("inf")
best_ae_path = "best_cnn_ae.pth"
for epoch in range(1, AE_EPOCHS + 1):
    ae_model.train()
    train_loss_sum = 0.0
    for x_batch in train_ae_loader:
        # x_batch: (batch, channels, seq_len)
        x_batch = x_batch.to(device)
        ae_optimizer.zero_grad()
        x_recon = ae_model(x_batch)
        loss = ae_criterion(x_recon, x_batch)
        loss.backward()
        ae_optimizer.step()
        # Weight the mean batch loss by batch size to accumulate a per-sample sum.
        train_loss_sum += loss.item() * x_batch.size(0)
    # NOTE(review): train_ae_loader uses drop_last=True, so dividing by the
    # full dataset length slightly understates the true mean whenever the
    # last partial batch was dropped.
    train_loss = train_loss_sum / len(train_ae_loader.dataset)
    ae_model.eval()
    val_loss_sum = 0.0
    with torch.no_grad():
        for x_batch in val_ae_loader:
            x_batch = x_batch.to(device)
            x_recon = ae_model(x_batch)
            loss = ae_criterion(x_recon, x_batch)
            val_loss_sum += loss.item() * x_batch.size(0)
    val_loss = val_loss_sum / len(val_ae_loader.dataset)
    print(f"Epoch {epoch:02d} | AE Train Loss: {train_loss:.6f} | AE Val Loss: {val_loss:.6f}")
    # Checkpoint on validation improvement.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(ae_model.state_dict(), best_ae_path)
# Restore the best checkpoint before deriving the anomaly threshold.
ae_model.load_state_dict(torch.load(best_ae_path, map_location=device))
# ============================================
# 8. RECONSTRUCTION ERROR ON VALIDATION NORMAL DATA
# ============================================
val_norm_errors = []
ae_model.eval()
with torch.no_grad():
    for x_batch in val_ae_loader:
        x_batch = x_batch.to(device)
        x_recon = ae_model(x_batch)
        # Per-sample MSE, averaged over all (channels x seq_len) positions.
        batch_errors = torch.mean((x_recon - x_batch) ** 2, dim=(1, 2))
        val_norm_errors.append(batch_errors.cpu().numpy())
val_norm_errors = np.concatenate(val_norm_errors, axis=0)
# Anomaly threshold = mean + k*std of the reconstruction error measured on
# *normal* validation windows only.
mu_val = np.mean(val_norm_errors)
sigma_val = np.std(val_norm_errors)
threshold = mu_val + THRESHOLD_STD_FACTOR * sigma_val
print(f"\nThreshold (mean + {THRESHOLD_STD_FACTOR}*std) từ validation normal: {threshold:.6f}")
# ============================================
# 9. RECONSTRUCTION ERROR ON TEST SET & ANOMALY DETECTION
# ============================================
test_errors = []
ae_model.eval()
with torch.no_grad():
    for x_batch in test_loader:
        x_batch = x_batch.to(device)
        x_recon = ae_model(x_batch)
        # Per-sample mean squared reconstruction error.
        batch_errors = torch.mean((x_recon - x_batch) ** 2, dim=(1, 2))
        test_errors.append(batch_errors.cpu().numpy())
test_errors = np.concatenate(test_errors, axis=0)
# A test window is flagged anomalous when its reconstruction error exceeds
# the validation-derived threshold.
anomalies = test_errors > threshold
num_anomalies = np.sum(anomalies)
print(f"Phát hiện {num_anomalies} samples bất thường trong tập test (trên tổng {len(test_errors)})")
print("Chỉ số sample bất thường (relative to test set):", np.where(anomalies)[0])
# ============================================
# 10. EVALUATE THE DETECTOR
# ============================================
# Ground truth per test window: anomalous (1) iff the raw label at the time
# step right after the window equals 2 — the same indexing used for y_seq.
y_true = np.array(
    [1 if labels_all[idx + SEQUENCE_LENGTH] == 2 else 0 for idx in test_idx],
    dtype=int,
)
y_pred = anomalies.astype(int)
# Fix: pass labels=[0, 1] so the confusion matrix is always 2x2. Without it,
# a degenerate test set where y_true and y_pred contain a single class yields
# a 1x1 matrix and the 4-way unpacking below raises ValueError.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
tn, fp, fn, tp = cm.ravel()
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
accuracy = accuracy_score(y_true, y_pred)
print("\n=== Confusion Matrix ===")
print(cm)
print(f"TN: {tn}, FP: {fp}")
print(f"FN: {fn}, TP: {tp}\n")
print("=== Metrics for Anomaly Detection ===")
print(f"Accuracy : {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall : {recall:.4f}")
print(f"F1-score : {f1:.4f}\n")
print("=== Classification Report ===")
print(
    classification_report(
        y_true,
        y_pred,
        # labels pins the report to both classes even when one is absent,
        # matching the two target_names below.
        labels=[0, 1],
        target_names=["Normal (0)", "Anomaly (1)"],
        zero_division=0
    )
)