|
|
import os |
|
|
import random |
|
|
import numpy as np |
|
|
from tqdm import tqdm |
|
|
from Generate_caption import load_model |
|
|
from tensorflow.keras.preprocessing.image import load_img, img_to_array |
|
|
from tensorflow.keras.applications.vgg16 import preprocess_input |
|
|
from tensorflow.keras.preprocessing.text import Tokenizer |
|
|
from tensorflow.keras.utils import pad_sequences, to_categorical |
|
|
from tensorflow.keras.models import Model, load_model |
|
|
from tensorflow.keras.layers import Input, Dense, Dropout, BatchNormalization, Embedding, GRU, add, LayerNormalization |
|
|
from tensorflow.keras.optimizers import Adam |
|
|
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau |
|
|
|
|
|
|
|
|
def extract_image_features(model, image_folder): |
|
|
features = {} |
|
|
directory = os.path( image_folder) |
|
|
for item in tqdm(os.listdir(directory), desc="Extracting Features"): |
|
|
item_path = os.path.join(directory, item) |
|
|
if os.path.isfile(item_path): |
|
|
try: |
|
|
image = load_img(item_path, target_size=(224, 224)) |
|
|
img_array = img_to_array(image) |
|
|
img_array = img_array.reshape((1, img_array.shape[0], img_array.shape[1], img_array.shape[2])) |
|
|
img_array = preprocess_input(img_array) |
|
|
feature = model.predict(img_array, verbose=0) |
|
|
image_id = item.split('.')[0] |
|
|
features[image_id] = feature |
|
|
except Exception as e: |
|
|
print(f"Error processing image {item_path}: {e}") |
|
|
return features |
|
|
|
|
|
|
|
|
def read_captions_file(file_path): |
|
|
try: |
|
|
with open(file_path, 'r') as file: |
|
|
next(file) |
|
|
captions = file.read() |
|
|
return captions |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Error reading the file: {e}") |
|
|
|
|
|
|
|
|
def create_image_caption_mapping(captions): |
|
|
mapping = {} |
|
|
for line in tqdm(captions.split('\n'), desc="Processing Captions"): |
|
|
tokens = line.split(',') |
|
|
if len(tokens) < 2: |
|
|
continue |
|
|
image_id, caption = tokens[0], tokens[1:] |
|
|
caption = " ".join(caption) |
|
|
if image_id not in mapping: |
|
|
mapping[image_id] = [] |
|
|
mapping[image_id].append(caption) |
|
|
return mapping |
|
|
|
|
|
|
|
|
def preprocess_text(mapping): |
|
|
for key, captions in mapping.items(): |
|
|
for i in range(len(captions)): |
|
|
caption = captions[i].lower() |
|
|
caption = caption.replace('[^A-Za-z]', ' ').replace('\s+', ' ') |
|
|
caption = 'startseq ' + " ".join([word for word in caption.split() if len(word) > 1]) + ' endseq' |
|
|
captions[i] = caption |
|
|
|
|
|
|
|
|
def extract_captions(mapping): |
|
|
captions_list = [] |
|
|
for key in mapping: |
|
|
captions_list.extend(mapping[key]) |
|
|
return captions_list |
|
|
|
|
|
|
|
|
def prepare_tokenizer(captions_list): |
|
|
tokenizer = Tokenizer() |
|
|
tokenizer.fit_on_texts(captions_list) |
|
|
vocab_size = len(tokenizer.word_index) + 1 |
|
|
return tokenizer, vocab_size |
|
|
|
|
|
|
|
|
def calculate_max_length(captions_list): |
|
|
return max(len(caption.split()) for caption in captions_list) |
|
|
|
|
|
|
|
|
def split(image_ids, train_ratio, val_ratio=None): |
|
|
random.shuffle(image_ids) |
|
|
total = len(image_ids) |
|
|
train_split = int(total * train_ratio) |
|
|
val_split = int(total * (train_ratio + val_ratio)) if val_ratio else train_split |
|
|
train_ids = image_ids[:train_split] |
|
|
val_ids = image_ids[train_split:val_split] if val_ratio else [] |
|
|
test_ids = image_ids[val_split:] |
|
|
return train_ids, val_ids, test_ids |
|
|
|
|
|
|
|
|
def data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size): |
|
|
X1, X2, y = [], [], [] |
|
|
n = 0 |
|
|
while True: |
|
|
for key in data_keys: |
|
|
n += 1 |
|
|
captions = mapping[key] |
|
|
for caption in captions: |
|
|
seq = tokenizer.texts_to_sequences([caption])[0] |
|
|
for i in range(1, len(seq)): |
|
|
in_seq, out_seq = seq[:i], seq[i] |
|
|
in_seq = pad_sequences([in_seq], maxlen=max_length, padding='post')[0] |
|
|
out_seq = to_categorical([out_seq], num_classes=vocab_size)[0] |
|
|
X1.append(features[key][0]) |
|
|
X2.append(in_seq) |
|
|
y.append(out_seq) |
|
|
if n == batch_size: |
|
|
yield {"image": np.array(X1), "text": np.array(X2)}, np.array(y) |
|
|
X1, X2, y = [], [], [] |
|
|
n = 0 |
|
|
|
|
|
|
|
|
def build_model(vocab_size, max_length): |
|
|
inputs1 = Input(shape=(4096,), name="image") |
|
|
fe1 = Dropout(0.4)(inputs1) |
|
|
fe2 = Dense(256, activation='relu')(fe1) |
|
|
fe3 = BatchNormalization()(fe2) |
|
|
|
|
|
inputs2 = Input(shape=(max_length,), name="text") |
|
|
se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2) |
|
|
se2 = Dropout(0.4)(se1) |
|
|
se3 = GRU(256, recurrent_dropout=0.3, return_sequences=False)(se2) |
|
|
|
|
|
decoder1 = add([fe3, se3]) |
|
|
decoder2 = LayerNormalization()(decoder1) |
|
|
decoder3 = Dense(512, activation='relu')(decoder2) |
|
|
decoder4 = Dropout(0.3)(decoder3) |
|
|
outputs = Dense(vocab_size, activation='softmax')(decoder4) |
|
|
|
|
|
model = Model(inputs=[inputs1, inputs2], outputs=outputs) |
|
|
optimizer = Adam(learning_rate=0.001) |
|
|
model.compile(loss='categorical_crossentropy', optimizer=optimizer) |
|
|
|
|
|
return model |
|
|
|
|
|
|
|
|
def load_existing_or_new_model(vocab_size, max_length, model_path="seven_version_model.keras"): |
|
|
if os.path.exists(model_path): |
|
|
print("Loading existing model...") |
|
|
return load_model(model_path) |
|
|
else: |
|
|
print("No existing model found. Creating a new one...") |
|
|
return build_model(vocab_size, max_length) |
|
|
|
|
|
|
|
|
def continue_training(model, train, val, mapping, features, tokenizer, max_length, vocab_size, batch_size, epochs): |
|
|
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True, verbose=1) |
|
|
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6, verbose=1) |
|
|
|
|
|
steps = len(train) // batch_size |
|
|
|
|
|
for i in range(epochs): |
|
|
print(f"Epoch {i + 1}/{epochs}") |
|
|
generator = data_generator(train, mapping, features, tokenizer, max_length, vocab_size, batch_size) |
|
|
validation_generator = data_generator(val, mapping, features, tokenizer, max_length, vocab_size, batch_size) |
|
|
|
|
|
model.fit(generator, validation_data=validation_generator, epochs=1, steps_per_epoch=steps, |
|
|
validation_steps=len(val) // batch_size, verbose=1, callbacks=[early_stopping, lr_scheduler]) |
|
|
|
|
|
model.save("seven_version_model.keras") |
|
|
print("Updated model saved successfully.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|