LEGIONM36's picture
Upload 4 files
d31a75f verified
raw
history blame
8.81 kB
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import torchvision.models.video as models
import time
from model import FeatureFusionNetwork
# --- Configuration ---
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATASET_DIR = os.path.join(BASE_DIR, "Dataset")
MODEL_SAVE_PATH = "best_model_fusion.pth"
IMG_SIZE = 112
SEQ_LEN = 16
BATCH_SIZE = 16
EPOCHS = 80
LEARNING_RATE = 1e-4
PATIENCE = 5
# --- Dataset ---
class StandardDataset(Dataset):
def __init__(self, video_paths, labels):
self.video_paths = video_paths
self.labels = labels
def __len__(self):
return len(self.video_paths)
def __getitem__(self, idx):
path = self.video_paths[idx]
label = self.labels[idx]
cap = cv2.VideoCapture(path)
frames = []
try:
while True:
ret, frame = cap.read()
if not ret: break
frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
frames.append(frame)
finally:
cap.release()
if len(frames) == 0:
frames = np.zeros((SEQ_LEN, IMG_SIZE, IMG_SIZE, 3), dtype=np.float32)
elif len(frames) < SEQ_LEN:
while len(frames) < SEQ_LEN: frames.append(frames[-1])
elif len(frames) > SEQ_LEN:
indices = np.linspace(0, len(frames)-1, SEQ_LEN, dtype=int)
frames = [frames[i] for i in indices]
frames = np.array(frames, dtype=np.float32) / 255.0
# (T, H, W, C) -> (C, T, H, W)
frames = torch.tensor(frames).permute(3, 0, 1, 2)
return frames, label
# --- Data Preparation ---
def prepare_data():
violence_dir = os.path.join(DATASET_DIR, 'violence')
no_violence_dir = os.path.join(DATASET_DIR, 'no-violence')
if not os.path.exists(violence_dir) or not os.path.exists(no_violence_dir):
raise FileNotFoundError("Dataset directories not found.")
violence_files = [os.path.join(violence_dir, f) for f in os.listdir(violence_dir) if f.endswith('.avi') or f.endswith('.mp4')]
no_violence_files = [os.path.join(no_violence_dir, f) for f in os.listdir(no_violence_dir) if f.endswith('.avi') or f.endswith('.mp4')]
X = violence_files + no_violence_files
y = [1] * len(violence_files) + [0] * len(no_violence_files)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.30, random_state=42, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.50, random_state=42, stratify=y_temp)
return (X_train, y_train), (X_val, y_val), (X_test, y_test)
# --- Early Stopping ---
class EarlyStopping:
def __init__(self, patience=5, verbose=False, path='checkpoint.pth'):
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_score = None
self.early_stop = False
self.val_loss_min = np.inf
self.path = path
def __call__(self, val_loss, model):
score = -val_loss
if self.best_score is None:
self.best_score = score
self.save_checkpoint(val_loss, model)
elif score < self.best_score:
self.counter += 1
if self.verbose:
print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_score = score
self.save_checkpoint(val_loss, model)
self.counter = 0
def save_checkpoint(self, val_loss, model):
if self.verbose:
print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
torch.save(model, self.path) # FULL MODEL SAVE
self.val_loss_min = val_loss
if __name__ == "__main__":
start_time = time.time()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
try:
(X_train, y_train), (X_val, y_val), (X_test, y_test) = prepare_data()
print(f"Dataset Split Stats:")
print(f"Train: {len(X_train)} samples")
print(f"Val: {len(X_val)} samples")
print(f"Test: {len(X_test)} samples")
except Exception as e:
print(f"Data preparation failed: {e}")
exit(1)
train_dataset = StandardDataset(X_train, y_train)
val_dataset = StandardDataset(X_val, y_val)
test_dataset = StandardDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
model = FeatureFusionNetwork().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2)
early_stopping = EarlyStopping(patience=PATIENCE, verbose=True, path=MODEL_SAVE_PATH)
print("\nStarting Feature Fusion Training...")
for epoch in range(EPOCHS):
model.train()
train_loss = 0.0
correct = 0
total = 0
for batch_idx, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
if batch_idx % 10 == 0:
print(f"Epoch {epoch+1} Batch {batch_idx}/{len(train_loader)} Loss: {loss.item():.4f}", end='\r')
train_acc = 100 * correct / total
avg_train_loss = train_loss / len(train_loader)
model.eval()
val_loss = 0.0
correct_val = 0
total_val = 0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total_val += labels.size(0)
correct_val += (predicted == labels).sum().item()
val_acc = 100 * correct_val / total_val
avg_val_loss = val_loss / len(val_loader)
print(f'\nEpoch [{epoch+1}/{EPOCHS}] '
f'Train Loss: {avg_train_loss:.4f} Acc: {train_acc:.2f}% '
f'Val Loss: {avg_val_loss:.4f} Acc: {val_acc:.2f}%')
scheduler.step(avg_val_loss)
early_stopping(avg_val_loss, model)
if early_stopping.early_stop:
print("Early stopping triggered")
break
print("\nLoading best Fusion model for evaluation...")
if os.path.exists(MODEL_SAVE_PATH):
model = torch.load(MODEL_SAVE_PATH)
else:
print("Warning: Model file not found.")
model.eval()
all_preds = []
all_labels = []
print("Evaluating on Test set...")
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, predicted = torch.max(outputs.data, 1)
all_preds.extend(predicted.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
print("\n=== Feature Fusion Model Evaluation Report ===")
print(classification_report(all_labels, all_preds, target_names=['No Violence', 'Violence']))
print("Confusion Matrix:")
print(confusion_matrix(all_labels, all_preds))
acc = accuracy_score(all_labels, all_preds)
print(f"\nFinal Test Accuracy: {acc*100:.2f}%")
elapsed = time.time() - start_time
print(f"\nTotal execution time: {elapsed/60:.2f} minutes")