bd04 committed on
Commit
7650036
·
1 Parent(s): 54edf93

Add inference source

Browse files
Files changed (3) hide show
  1. Source/inference.py +105 -0
  2. Source/lstm.py +41 -0
  3. Source/preprocessing.py +10 -0
Source/inference.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import argparse
4
+ import numpy as np
5
+ import torch
6
+ import cv2
7
+ from torchvision import models
8
+ from torchvision.models import ResNet50_Weights
9
+ from lstm import MultiLayerBiLSTMClassifier
10
+ from preprocessing import preprocessingData
11
+
12
+
13
def load_label_map(dataset):
    """Load the index-to-label mapping for ``dataset``.

    The mapping is read from ``label_map_idx2label_<dataset>.json``, which is
    looked up in the directory containing this module.

    Args:
        dataset: Dataset identifier interpolated into the file name.

    Returns:
        The parsed JSON content of the label map file.

    Raises:
        FileNotFoundError: If the label map file does not exist.
    """
    module_dir = os.path.dirname(__file__)
    map_name = f"label_map_idx2label_{dataset}.json"
    map_file = os.path.join(module_dir, map_name)

    # Happy path first; the explicit error below keeps the custom message.
    if os.path.exists(map_file):
        with open(map_file, "r", encoding="utf-8") as handle:
            return json.load(handle)

    raise FileNotFoundError(f"Label map not found: {map_file}")
21
+
22
+
23
def read_video_frames(video_path, num_frames=16):
    """Sample ``num_frames`` RGB frames spread evenly across a video.

    Frame positions are chosen with ``np.linspace`` over the frame count
    reported by OpenCV. The video is decoded sequentially and every frame
    whose position was selected is converted from BGR to RGB and kept. If
    fewer than ``num_frames`` frames were collected (short video, or decoding
    stopped early), the last collected frame is repeated as padding.

    Args:
        video_path: Path of the video file to open.
        num_frames: Number of frames to return.

    Returns:
        A list of exactly ``num_frames`` RGB frames as returned by
        ``cv2.cvtColor``.

    Raises:
        RuntimeError: If the video cannot be opened, reports no frames, or
            yields no decodable frame at the selected positions.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video file: {video_path}")

    frames = []
    # try/finally guarantees the capture handle is released on every path,
    # including the empty-video error and any failure while decoding.
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # `<= 0` also rejects a negative count reported by the backend.
        if total_frames <= 0:
            raise RuntimeError(f"Video contains no frames: {video_path}")

        frame_indices = np.linspace(0, total_frames - 1, num_frames).astype(int)
        # A set of plain ints gives O(1) membership per decoded frame instead
        # of a linear scan over the NumPy array.
        wanted = set(frame_indices.tolist())
        # Nothing after the highest selected position is needed, so stop there.
        last_wanted = max(wanted, default=-1)

        for idx in range(last_wanted + 1):
            ret, frame = cap.read()
            if not ret:
                break
            if idx in wanted:
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        cap.release()

    if not frames:
        raise RuntimeError("No frames extracted from video.")

    # Duplicate positions are only collected once, so pad with the last frame.
    while len(frames) < num_frames:
        frames.append(frames[-1])

    return frames[:num_frames]
48
+
49
+
50
def load_model(model_path, input_size, hidden_size, num_layers, num_classes):
    """Build a ``MultiLayerBiLSTMClassifier`` and load trained weights into it.

    The model is placed on CUDA when available, otherwise on the CPU, and is
    switched to evaluation mode before being returned.

    Args:
        model_path: Path to the saved state dict.
        input_size: Feature dimension of each sequence element.
        hidden_size: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.

    Returns:
        The classifier with loaded weights, in eval mode.
    """
    has_cuda = torch.cuda.is_available()
    target_device = torch.device("cuda" if has_cuda else "cpu")

    classifier = MultiLayerBiLSTMClassifier(input_size, hidden_size, num_layers, num_classes)
    classifier = classifier.to(target_device)

    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # trusted sources (consider weights_only=True where the installed torch
    # version supports it).
    state = torch.load(model_path, map_location=target_device)
    classifier.load_state_dict(state)

    classifier.eval()
    return classifier
56
+
57
+
58
def predict_activity(dataset, video_path, model_path, num_frames=32, hidden_size=256, num_layers=2):
    """Run inference on a single video.

    This function is import-friendly for web apps (Gradio/Streamlit).

    Pipeline: sample frames from the video, preprocess them, extract one
    feature vector per frame with a pretrained ResNet50 (final layer removed),
    then classify the feature sequence with the trained BiLSTM model.

    Args:
        dataset: Dataset identifier used to locate the label map.
        video_path: Path to the input video file.
        model_path: Path to the trained classifier weights.
        num_frames: Number of frames sampled from the video.
        hidden_size: Hidden size the classifier was trained with.
        num_layers: Number of LSTM layers the classifier was trained with.

    Returns:
        A tuple ``(predicted_class_index, predicted_label)``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load label map and number of classes
    label_map = load_label_map(dataset)
    num_classes = len(label_map)

    # Step 1: Read and process video
    frames = read_video_frames(video_path, num_frames)
    transform = preprocessingData()
    frames_tensor = torch.stack([transform(frame) for frame in frames], dim=0).to(device)

    # Step 2: Extract features
    resnet = models.resnet50(weights=ResNet50_Weights.DEFAULT).to(device)
    # Drop the last child module so the network outputs pooled features.
    resnet_feat = torch.nn.Sequential(*list(resnet.children())[:-1])
    # Put the extractor that is actually called into eval mode.
    resnet_feat.eval()
    with torch.no_grad():
        # Keep the features as a tensor on `device`; converting to NumPy and
        # back only added a device -> host -> device copy.
        features = torch.flatten(resnet_feat(frames_tensor), start_dim=1)

    # Step 3: Load model
    input_size = features.shape[1]
    model = load_model(model_path, input_size, hidden_size, num_layers, num_classes)

    # Step 4: Predict
    with torch.no_grad():
        # Add a batch dimension of 1 in front of the per-frame features.
        input_seq = features.unsqueeze(0).float()
        outputs = model(input_seq)
        predicted_class = torch.argmax(outputs, dim=1).item()

    # The label map is read from JSON, so its keys are strings.
    predicted_label = label_map[str(predicted_class)]

    return predicted_class, predicted_label
95
+
96
+
97
if __name__ == "__main__":
    # Command-line entry point: classify one video and print the prediction.
    cli = argparse.ArgumentParser(description="Inference on a single video using trained HAR model")
    positional_args = (
        ("dataset", "Dataset used to train model (ucf11 or ucf50)"),
        ("video_path", "Path to input video file"),
        ("model_path", "Path to trained model (.pt)"),
    )
    for arg_name, arg_help in positional_args:
        cli.add_argument(arg_name, type=str, help=arg_help)
    options = cli.parse_args()

    # The dataset name is lower-cased before it is used to find the label map.
    class_index, class_label = predict_activity(
        options.dataset.lower(),
        options.video_path,
        options.model_path,
    )
    print(f"Predicted class index: {class_index} ({class_label})")
Source/lstm.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import torch
2
+ import torch.nn as nn
3
+
4
+
5
class LSTMClassifier(nn.Module):
    """Single-layer unidirectional LSTM followed by a linear classifier.

    Args:
        input_size: Feature dimension of each sequence element.
        hidden_size: Hidden size of the LSTM.
        num_classes: Number of output classes.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with softmax
            probabilities along dimension 1.
        """
        # Forward propagate LSTM. Omitting the initial states makes nn.LSTM use
        # zeros that match the input's device and dtype, so the module also
        # works after .double()/.half() without allocating a state per call.
        out, _ = self.lstm(x)
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return nn.functional.softmax(out, dim=1)
21
+
22
class MultiLayerBiLSTMClassifier(nn.Module):
    """Stacked bidirectional LSTM followed by dropout and a linear classifier.

    Args:
        input_size: Feature dimension of each sequence element.
        hidden_size: Hidden size of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            # Inter-layer dropout only exists between stacked layers; passing a
            # non-zero value with a single layer just triggers a warning.
            dropout=0.2 if num_layers > 1 else 0.0,
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # *2 to account for bidirectional LSTM
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, num_classes)`` holding raw class scores
            (no softmax is applied).
        """
        # Forward propagate bidirectional LSTM. Omitting the initial states
        # makes nn.LSTM use zeros that match the input's device and dtype.
        out, _ = self.lstm(x)
        # Take the last time step and apply dropout before the FC layer.
        # NOTE(review): this readout is kept unchanged because trained
        # checkpoints depend on it.
        out = self.dropout(out[:, -1, :])
        # Decode the hidden state of the last time step
        return self.fc(out)
Source/preprocessing.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ import torchvision.transforms as transforms
2
+
3
def preprocessingData():
    """Build the per-frame transform pipeline used before ResNet50.

    Returns:
        A ``transforms.Compose`` that converts a NumPy frame to a PIL image,
        resizes it to 224x224, converts it to a tensor scaled to [0, 1], and
        normalizes it with the ImageNet mean and standard deviation.
    """
    # ImageNet statistics, which ResNet50 was trained on.
    imagenet_mean = [0.485, 0.456, 0.406]
    imagenet_std = [0.229, 0.224, 0.225]

    steps = [
        # NumPy array -> PIL image, required by the following transforms.
        transforms.ToPILImage(),
        # 224x224 is the input size expected by ResNet50.
        transforms.Resize((224, 224)),
        # PIL image -> tensor with pixel values scaled to [0, 1].
        transforms.ToTensor(),
        transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]
    return transforms.Compose(steps)