In [1]:
import numpy as np
from tensorflow.keras.layers import BatchNormalization, Conv2D, MaxPooling2D
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.metrics import mean_squared_error
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import backend as K
from tensorflow.keras.models import model_from_json
from matplotlib import cm as CM
import matplotlib.pyplot as plt
import tensorflow as tf
from tqdm import tqdm
import scipy.io as io
from PIL import Image
import PIL
import h5py
import os
import glob
import cv2
import random
import math
import sys

In [18]:
# Reset the global Keras/TensorFlow backend state so a re-run in the same kernel
# starts from a clean slate.
# NOTE(review): this cell's execution count (18) is out of sequence with its
# neighbours — confirm the notebook still works under Restart & Run All.
K.clear_session()
# Dataset root directory; the train/test image folders are resolved relative to it.
# NOTE(review): the recorded output of the ground-truth check shows a lowercase
# 'data' prefix — confirm the directory's actual case for case-sensitive filesystems.
root = 'DATA'

In [3]:
def _images_dir(part, split):
    """Return the image folder for one dataset part ('A' or 'B') and split ('train' or 'test')."""
    return os.path.join(root, f'part_{part}_final/{split}_data', 'images')


# Image folders for every part/split combination, all resolved under `root`.
part_A_train = _images_dir('A', 'train')
part_A_test = _images_dir('A', 'test')
part_B_train = _images_dir('B', 'train')
part_B_test = _images_dir('B', 'test')
temp = 'test_images'

# Folders that are actually scanned for training images (part A train only).
path_sets = [part_A_train]

In [4]:
# Gather every .jpg file found directly inside each folder listed in path_sets.
img_paths = [
    str(jpg_file)
    for image_dir in path_sets
    for jpg_file in glob.glob(os.path.join(image_dir, '*.jpg'))
]
print("Total images:", len(img_paths))

Total images: 300


In [5]:
def create_img(path):
    """Load an image from disk and return it as a normalised float array.

    The file is decoded as RGB, scaled to [0, 1] by dividing by 255, and then
    each channel is standardised as ``(value - mean) / std`` with the
    per-channel constants below (R, G, B order).

    Args:
        path: Path of the image file to open.

    Returns:
        A float array with the image's height and width and three channels.

    NOTE(review): init_weights_vgg loads Keras' VGG16 ImageNet weights, whose
    reference preprocessing (keras.applications.vgg16.preprocess_input) is
    BGR with mean subtraction on 0-255 values — confirm that this
    [0, 1] mean/std normalisation is the intended input for those weights.
    """
    rgb = np.array(Image.open(path).convert('RGB')) / 255.0

    # Per-channel statistics, broadcast across the trailing (channel) axis.
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    return (rgb - channel_mean) / channel_std


def get_input(path):
    """Return the normalised image for the first entry of ``path``.

    Args:
        path: Indexable collection of image paths; only element 0 is used
            (the generator passes the one-element array from np.random.choice).

    Returns:
        The array produced by ``create_img`` for that path.
    """
    return create_img(path[0])


def get_output(path):
    """Load a ground-truth density map and downsample it by a factor of 8.

    The 'density' dataset is read from the HDF5 file at ``path``, resized to
    1/8 of its height and width with bicubic interpolation, and multiplied by
    64 (= 8 * 8) to compensate for the 64x reduction in pixel count, so the
    map's total stays approximately the same after downsampling.

    The file is opened in a ``with`` block so its handle is released as soon
    as the data has been copied into memory; this function is called on every
    generator step, so an unclosed handle would accumulate indefinitely.

    Args:
        path: Path to an .h5 file containing a 2-D 'density' dataset.

    Returns:
        Array of shape (height // 8, width // 8, 1).
    """
    with h5py.File(path, 'r') as gt_file:
        # np.asarray copies the dataset into memory, so it outlives the file.
        target = np.asarray(gt_file['density'])

    out_height = target.shape[0] // 8
    out_width = target.shape[1] // 8

    # cv2.resize takes the destination size as (width, height).
    img = cv2.resize(target, (out_width, out_height),
                     interpolation=cv2.INTER_CUBIC) * 64

    # Append a trailing channel axis: (H, W) -> (H, W, 1).
    return np.expand_dims(img, axis=2)



def preprocess_input(image, target):
    """Take a random half-height, half-width crop of an image and its target.

    The crop window is half the image's height and width. Its top-left corner
    is drawn uniformly from the top-left quadrant, and the same row/column
    window is applied to both arrays.

    Args:
        image: Array indexed as [row, col, ...]; its shape defines the crop.
        target: Array sliced with the same row/column window as ``image``.

    Returns:
        Tuple ``(cropped_image, cropped_target)``.

    NOTE(review): the same pixel offsets are used for both arrays, so
    ``target`` is presumably expected at image resolution; get_output returns
    a 1/8-scale map — confirm before wiring this function into the generator.
    """
    height, width = image.shape[0], image.shape[1]
    crop_h, crop_w = int(height / 2), int(width / 2)

    # randint(0, 9) can never be <= -1, so the quadrant-aligned mode below is
    # effectively switched off; the draw is kept so the RNG stream is unchanged.
    snap_to_quadrant = random.randint(0, 9) <= -1
    if snap_to_quadrant:
        row_frac = random.randint(0, 1)
        col_frac = random.randint(0, 1)
    else:
        row_frac = random.random()
        col_frac = random.random()

    # Offsets of the crop's top-left corner.
    dx = int(row_frac * height * 1. / 2)
    dy = int(col_frac * width * 1. / 2)

    rows = slice(dx, crop_h + dx)
    cols = slice(dy, crop_w + dy)
    return (image[rows, cols], target[rows, cols])

In [6]:
def image_generator(files, batch_size=1):
    """Yield ``(images, density_maps)`` training batches forever.

    Each sample is drawn at random (with replacement) from ``files``. The
    ground-truth path is derived from the image path by replacing '.jpg' with
    '.h5' and 'images' with 'ground'. Samples whose ground-truth file is
    missing are reported and skipped, without decoding the image.

    Args:
        files: Sequence of image paths to sample from.
        batch_size: Number of samples per yielded batch. With the default of 1
            each batch holds a single image of any size. For larger values all
            samples in a batch must have identical shapes.

    Yields:
        Tuple ``(batch_x, batch_y)``: arrays with a leading batch axis holding
        the normalised images and their downsampled density maps.

    Raises:
        ValueError: If ``batch_size`` is less than 1, or (from ``np.stack``)
            if samples of different shapes end up in the same batch.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    while True:
        images, targets = [], []

        while len(images) < batch_size:
            input_path = np.random.choice(a=files, size=1)

            # NOTE: str.replace substitutes every occurrence, so any other
            # path component containing 'images' is rewritten as well.
            output_path = input_path[0].replace('.jpg', '.h5').replace('images', 'ground')

            # Check for the ground truth first so no time is spent decoding
            # an image that cannot be used.
            if not os.path.exists(output_path):
                print(f"File not found: {output_path}")
                continue

            images.append(get_input(input_path))
            targets.append(get_output(output_path))

        batch_x = np.stack(images, axis=0)
        batch_y = np.stack(targets, axis=0)

        yield (batch_x, batch_y)


In [7]:
def save_mod(model, str1, str2):
    """Persist a model as a weights file plus a JSON architecture file.

    Args:
        model: Object exposing ``save_weights(path)`` and ``to_json()``.
        str1: Destination path passed to ``model.save_weights``.
        str2: Destination path of the JSON architecture description.
    """
    model.save_weights(str1)

    architecture = model.to_json()
    with open(str2, "w") as handle:
        handle.write(architecture)

In [8]:
def init_weights_vgg(model):
    """Copy ImageNet-pretrained VGG16 convolution weights into ``model``.

    A headless VGG16 (``include_top=False``) is instantiated with ImageNet
    weights. The weights of its layers whose name contains 'conv' are then
    copied, in order, into the first 10 layers of ``model`` whose name
    contains 'conv'. Layers of ``model`` without 'conv' in their name are
    skipped.

    Args:
        model: Built Keras model whose first 10 'conv'-named layers have
            weight shapes compatible with the corresponding VGG16 layers.

    Returns:
        The same ``model`` object, with the weights set in place.

    Raises:
        IndexError: If ``model`` has fewer than 10 layers named '*conv*'.
    """
    # Use built-in VGG16 with ImageNet weights
    vgg = VGG16(weights='imagenet', include_top=False)
    pretrained = [layer.get_weights() for layer in vgg.layers if 'conv' in layer.name]

    copied = 0      # number of conv layers initialised so far
    position = 0    # index of the layer of `model` currently being inspected
    while copied < 10:
        layer = model.layers[position]
        if 'conv' in layer.name:
            layer.set_weights(pretrained[copied])
            copied += 1
        position += 1

    return model


In [9]:
def euclidean_distance_loss(y_true, y_pred):
    """Loss: square root of the squared error summed over the last axis.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor, broadcast-compatible with ``y_true``.

    Returns:
        Tensor with the last axis reduced away.

    NOTE(review): only the last axis is reduced, so for single-channel density
    maps this is the per-pixel absolute error rather than a distance over the
    whole map — confirm that this is the intended definition.
    """
    squared_error = tf.square(y_pred - y_true)
    summed = tf.reduce_sum(squared_error, axis=-1)
    return tf.sqrt(summed)


In [10]:
# def CrowdNet():
#     """Neural network model: VGG + Conv"""
#     # Variable Input Size
#     rows = None
#     cols = None
    
#     # Batch Normalisation option
#     batch_norm = 0
#     kernel = (3, 3)
#     init = RandomNormal(stddev=0.01)
#     model = Sequential()
    
#     # Custom VGG
#     if batch_norm:
#         model.add(Conv2D(64, kernel_size=kernel, input_shape=(rows, cols, 3), 
#                         activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(Conv2D(64, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(MaxPooling2D(strides=2))
#         model.add(Conv2D(128, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(Conv2D(128, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(MaxPooling2D(strides=2))
#         model.add(Conv2D(256, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(Conv2D(256, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(Conv2D(256, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(MaxPooling2D(strides=2))
#         model.add(Conv2D(512, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(Conv2D(512, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#         model.add(Conv2D(512, kernel_size=kernel, activation='relu', padding='same'))
#         model.add(BatchNormalization())
#     else:
#         model.add(Conv2D(64, kernel_size=kernel, activation='relu', padding='same',
#                         input_shape=(rows, cols, 3), kernel_initializer=init))
#         model.add(Conv2D(64, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(MaxPooling2D(strides=2))
#         model.add(Conv2D(128, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(Conv2D(128, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(MaxPooling2D(strides=2))
#         model.add(Conv2D(256, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(Conv2D(256, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(Conv2D(256, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(MaxPooling2D(strides=2))
#         model.add(Conv2D(512, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(Conv2D(512, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
#         model.add(Conv2D(512, kernel_size=kernel, activation='relu', padding='same', 
#                         kernel_initializer=init))
    
#     # Dilated Conv2D layers
#     model.add(Conv2D(512, (3, 3), activation='relu', dilation_rate=2, 
#                     kernel_initializer=init, padding='same'))
#     model.add(Conv2D(512, (3, 3), activation='relu', dilation_rate=2, 
#                     kernel_initializer=init, padding='same'))
#     model.add(Conv2D(512, (3, 3), activation='relu', dilation_rate=2, 
#                     kernel_initializer=init, padding='same'))
#     model.add(Conv2D(256, (3, 3), activation='relu', dilation_rate=2, 
#                     kernel_initializer=init, padding='same'))
#     model.add(Conv2D(128, (3, 3), activation='relu', dilation_rate=2, 
#                     kernel_initializer=init, padding='same'))
#     model.add(Conv2D(64, (3, 3), activation='relu', dilation_rate=2, 
#                     kernel_initializer=init, padding='same'))
#     model.add(Conv2D(1, (1, 1), activation='relu', dilation_rate=1, 
#                     kernel_initializer=init, padding='same'))
    
#     # Fixed: Use learning_rate instead of lr, removed decay

#     sgd = SGD(learning_rate=1e-7, momentum=0.95)
#     model.compile(optimizer=sgd, loss='mse', metrics=['mse'])  # Use 'mse' instead
    
#     model = init_weights_vgg(model)
#     return model

def CrowdNet():
    """Build and compile the crowd-density network (VGG-style front end + dilated back end).

    Architecture, all convolutions being ReLU-activated with 'same' padding
    and a RandomNormal(stddev=0.01) kernel initialiser:

    * Front end: ten 3x3 convolutions (64, 64, 128, 128, 256, 256, 256,
      512, 512, 512 filters) with a stride-2 max-pool after the 2nd, 4th
      and 7th convolution, so the output is 1/8 of the input resolution.
    * Back end: six 3x3 convolutions with dilation rate 2 (512, 512, 512,
      256, 128, 64 filters) followed by a 1x1 convolution producing the
      single-channel density map.

    The input is declared with an explicit ``Input`` object of shape
    (None, None, 3), so images of any height and width are accepted. This
    replaces the ``input_shape=`` layer argument, which Keras warns against
    for Sequential models.

    The model is compiled with SGD (learning rate 1e-7, momentum 0.95) and
    mean-squared-error loss, after which the first ten convolution layers are
    initialised from ImageNet-pretrained VGG16 via ``init_weights_vgg``.

    Returns:
        The compiled Sequential model.
    """
    init = RandomNormal(stddev=0.01)

    def conv(filters, kernel_size=(3, 3), dilation_rate=1):
        """Create one ReLU / 'same'-padded convolution sharing the common initialiser."""
        return Conv2D(filters, kernel_size=kernel_size, activation='relu',
                      padding='same', dilation_rate=dilation_rate,
                      kernel_initializer=init)

    model = Sequential()
    model.add(tf.keras.Input(shape=(None, None, 3)))

    # VGG layers (without batch norm); 'M' marks a stride-2 max-pool.
    front_end = (64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512)
    for spec in front_end:
        if spec == 'M':
            model.add(MaxPooling2D(strides=2))
        else:
            model.add(conv(spec))

    # Dilated Conv layers
    for filters in (512, 512, 512, 256, 128, 64):
        model.add(conv(filters, dilation_rate=2))

    # 1x1 convolution collapsing the features to a one-channel density map.
    model.add(conv(1, kernel_size=(1, 1)))

    # NOTE(review): the recorded training loss does not decrease over 5 epochs;
    # a learning rate of 1e-7 may be too small for a mean-reduced MSE — confirm.
    sgd = SGD(learning_rate=1e-7, momentum=0.95)

    # run_eagerly=True executes every step without graph compilation, which is
    # slow (the recorded run shows roughly 100 s per step).
    # NOTE(review): confirm whether eager execution is still required.
    model.compile(optimizer=sgd, loss='mse', metrics=['mse'], run_eagerly=True)

    model = init_weights_vgg(model)
    return model

   

In [11]:
# Build model: CrowdNet() constructs the network, compiles it, and loads the
# pretrained VGG16 weights into its first ten convolution layers.
print("Building model...")
model = CrowdNet()


Building model...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [12]:
# Print the layer-by-layer architecture of the built network.
model.summary()

In [13]:
# Endless training-sample generator over the collected image paths, one image per batch.
train_gen = image_generator(img_paths, batch_size=1)

In [14]:
# Test if density maps exist: derive the ground-truth path for the first image
# with the same '.jpg' -> '.h5' and 'images' -> 'ground' substitution that
# image_generator uses, then report whether that file is present on disk.
test_path = img_paths[0].replace('.jpg', '.h5').replace('images', 'ground')
print(f"Checking: {test_path}")
print(f"Exists: {os.path.exists(test_path)}")


Checking: data\part_A_final/train_data\ground\IMG_1.h5
Exists: True


In [15]:

# Train model: 5 epochs of 20 generator steps each (one image per step).
# The generator never terminates, so steps_per_epoch defines the epoch length.
print("Starting training...")
model.fit(train_gen, epochs=5, steps_per_epoch=20, verbose=1)

Starting training...
Epoch 1/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 2057s 104s/step - loss: 0.0270 - mse: 0.0270
Epoch 2/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 1935s 96s/step - loss: 0.0596 - mse: 0.0596
Epoch 3/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 2106s 103s/step - loss: 0.0830 - mse: 0.0830
Epoch 4/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 3105s 158s/step - loss: 0.0255 - mse: 0.0255
Epoch 5/5
20/20 ━━━━━━━━━━━━━━━━━━━━ 1841s 92s/step - loss: 0.0355 - mse: 0.0355


<keras.src.callbacks.history.History at 0x1d1e927dfd0>

In [20]:
# Make sure both output folders exist, then write the trained weights and the
# JSON architecture description.
for output_dir in ('weights', 'models'):
    os.makedirs(output_dir, exist_ok=True)

save_mod(model, "weights/model_A.weights.h5", "models/Model.json")
