Spaces:
Running
Running
| """ | |
| Title: OCR model for reading Captchas | |
| Author: [A_K_Nain](https://twitter.com/A_K_Nain) | |
| Date created: 2020/06/14 | |
| Last modified: 2024/03/13 | |
| Description: How to implement an OCR model using CNNs, RNNs and CTC loss. | |
| Accelerator: GPU | |
| Converted to Keras 3 by: [Sitam Meur](https://github.com/sitamgithub-MSIT) | |
| """ | |
| """ | |
| ## Introduction | |
| This example demonstrates a simple OCR model built with the Functional API. Apart from | |
| combining CNN and RNN, it also illustrates how you can instantiate a new layer | |
| and use it as an "Endpoint layer" for implementing CTC loss. For a detailed | |
| guide to layer subclassing, please check out | |
| [this page](https://keras.io/guides/making_new_layers_and_models_via_subclassing/) | |
| in the developer guides. | |
| """ | |
| """ | |
| ## Setup | |
| """ | |
| import os | |
| os.environ["KERAS_BACKEND"] = "tensorflow" | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from pathlib import Path | |
| import tensorflow as tf | |
| import keras | |
| from keras import ops | |
| from keras import layers | |
| """ | |
| ## Load the data: [Captcha Images](https://www.kaggle.com/fournierp/captcha-version-2-images) | |
| Let's download the data. | |
| """ | |
| """shell | |
| curl -LO https://github.com/AakashKumarNain/CaptchaCracker/raw/master/captcha_images_v2.zip | |
| unzip -qq captcha_images_v2.zip | |
| """ | |
| """ | |
| The dataset contains 1040 captcha files as `png` images. The label for each sample is a string, | |
| the name of the file (minus the file extension). | |
| We will map each character in the string to an integer for training the model. Similarly, | |
| we will need to map the predictions of the model back to strings. For this purpose | |
| we will maintain two dictionaries, mapping characters to integers, and integers to characters, | |
| respectively. | |
| """ | |
# Path to the data directory
data_dir = Path("./captcha_images_v2/")

# Sorted list of all image paths (as strings)
images = sorted(map(str, data_dir.glob("*.png")))
if not images:
    # Fail early with an actionable message instead of the opaque
    # "max() arg is an empty sequence" that would be raised further down.
    raise FileNotFoundError(
        f"No .png files found in '{data_dir}'. Download and unzip "
        "captcha_images_v2.zip before running this script."
    )

# The label of each sample is its file name without the extension
labels = [Path(img).stem for img in images]

# Sorted vocabulary of every distinct character appearing in the labels
characters = sorted({char for label in labels for char in label})

print("Number of images found: ", len(images))
print("Number of labels found: ", len(labels))
print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)

# Batch size for training and validation
batch_size = 16

# Desired image dimensions
img_width = 200
img_height = 50

# Factor by which the image is downsampled by the convolutional blocks.
# The model uses two convolution blocks, each followed by a pooling layer
# that downsamples the features by a factor of 2, so the total
# downsampling factor is 4.
downsample_factor = 4

# Maximum length of any captcha in the dataset
max_length = max(len(label) for label in labels)
| """ | |
| ## Preprocessing | |
| """ | |
# Forward lookup: character -> integer index
char_to_num = layers.StringLookup(mask_token=None, vocabulary=list(characters))

# Inverse lookup: integer index -> character, built from the exact
# vocabulary held by the forward lookup so both stay in sync
num_to_char = layers.StringLookup(
    invert=True, mask_token=None, vocabulary=char_to_num.get_vocabulary()
)
def split_data(images, labels, train_size=0.9, shuffle=True):
    """Split samples and labels into a training and a validation subset.

    Args:
        images: Collection of samples that supports `len()` and indexing
            with an array of positions (the caller below passes a NumPy array).
        labels: Collection of labels, indexed the same way as `images`.
        train_size: Fraction of the samples assigned to the training subset.
        shuffle: Whether to shuffle the sample positions before splitting.

    Returns:
        A tuple `(x_train, x_valid, y_train, y_valid)`.
    """
    total = len(images)

    # Positions 0..total-1, optionally in random order
    order = ops.arange(total)
    if shuffle:
        order = keras.random.shuffle(order)

    # The first `cut` positions go to training, the remainder to validation
    cut = int(total * train_size)
    train_idx = order[:cut]
    valid_idx = order[cut:]

    x_train, x_valid = images[train_idx], images[valid_idx]
    y_train, y_valid = labels[train_idx], labels[valid_idx]
    return x_train, x_valid, y_train, y_valid


# Splitting data into training and validation sets
x_train, x_valid, y_train, y_valid = split_data(np.array(images), np.array(labels))
def encode_single_sample(img_path, label):
    """Load one captcha image and encode its label for the model.

    Args:
        img_path: Path of a PNG file, as a string tensor.
        label: The captcha text, as a string tensor.

    Returns:
        A dict with two entries, matching the two named inputs of the model:
        `"image"` is the grayscale image as float32, resized to
        `(img_height, img_width)` and then transposed so that width is the
        first axis; `"label"` is the label's characters mapped to integers
        by `char_to_num`.
    """
    # Read the file and decode it as a single-channel (grayscale) image
    raw_bytes = tf.io.read_file(img_path)
    gray = tf.io.decode_png(raw_bytes, channels=1)

    # Convert to float32 in the [0, 1] range, then resize
    scaled = tf.image.convert_image_dtype(gray, tf.float32)
    resized = ops.image.resize(scaled, [img_height, img_width])

    # Swap height and width: the time dimension of the sequence model
    # has to correspond to the width of the image
    width_major = ops.transpose(resized, axes=[1, 0, 2])

    # Split the label into characters and map each one to its integer index
    chars = tf.strings.unicode_split(label, input_encoding="UTF-8")
    return {"image": width_major, "label": char_to_num(chars)}
| """ | |
| ## Create `Dataset` objects | |
| """ | |
def _build_dataset(paths, targets):
    """Return a batched, prefetched dataset of encoded (image, label) samples."""
    dataset = tf.data.Dataset.from_tensor_slices((paths, targets))
    dataset = dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(buffer_size=tf.data.AUTOTUNE)


train_dataset = _build_dataset(x_train, y_train)
validation_dataset = _build_dataset(x_valid, y_valid)
| """ | |
| ## Visualize the data | |
| """ | |
# Show up to 16 training samples, with their decoded labels, on a 4x4 grid
_, ax = plt.subplots(4, 4, figsize=(10, 5))
for batch in train_dataset.take(1):
    # Batch-local names: do not rebind the module-level `images` / `labels`
    # lists that hold the file paths and label strings.
    batch_images = batch["image"]
    batch_labels = batch["label"]
    # zip() stops at whichever is shorter, the grid or the batch, so a batch
    # with fewer than 16 samples no longer raises an IndexError.
    for axis, image, encoded in zip(ax.flat, batch_images, batch_labels):
        img = (image * 255).numpy().astype("uint8")
        label = tf.strings.reduce_join(num_to_char(encoded)).numpy().decode("utf-8")
        # Images are stored width-first; transpose back for display
        axis.imshow(img[:, :, 0].T, cmap="gray")
        axis.set_title(label)
        axis.axis("off")
plt.show()
| """ | |
| ## Model | |
| """ | |
def ctc_batch_cost(y_true, y_pred, input_length, label_length):
    """Compute the CTC loss for each element of a batch.

    Args:
        y_true: Dense tensor of integer labels, one row per sample.
        y_pred: Rank-3 tensor of per-step class probabilities; its first two
            axes are swapped before being passed to the TensorFlow CTC loss.
        input_length: Tensor whose last axis (size 1) is squeezed away,
            giving the prediction sequence length of each sample.
        label_length: Tensor whose last axis (size 1) is squeezed away,
            giving the label length of each sample.

    Returns:
        The per-sample loss with an extra axis inserted at position 1.
    """
    label_lens = ops.cast(ops.squeeze(label_length, axis=-1), dtype="int32")
    input_lens = ops.cast(ops.squeeze(input_length, axis=-1), dtype="int32")

    # The TensorFlow CTC loss takes the labels as a sparse tensor
    sparse_labels = ops.cast(
        ctc_label_dense_to_sparse(y_true, label_lens), dtype="int32"
    )

    # Swap the first two axes and move to log space; epsilon avoids log(0)
    swapped = ops.transpose(y_pred, axes=[1, 0, 2])
    log_probs = ops.log(swapped + keras.backend.epsilon())

    per_sample_loss = tf.compat.v1.nn.ctc_loss(
        inputs=log_probs, labels=sparse_labels, sequence_length=input_lens
    )
    return ops.expand_dims(per_sample_loss, 1)
def ctc_label_dense_to_sparse(labels, label_lengths):
    """Convert a dense batch of labels into a `tf.SparseTensor`.

    Only the first `label_lengths[i]` entries of row `i` are kept.

    Args:
        labels: Rank-2 dense tensor of labels, one row per sample.
        label_lengths: Rank-1 tensor with the number of valid labels per row.

    Returns:
        A `tf.SparseTensor` with int64 indices, the kept label values, and
        the dense shape of `labels`.
    """
    dense_shape = ops.shape(labels)
    batch_size_vec = ops.stack([dense_shape[0]])
    max_len_vec = ops.stack([dense_shape[1]])

    def _row_mask(previous, length):
        # True for the column positions that lie below `length`
        positions = ops.expand_dims(ops.arange(ops.shape(previous)[1]), 0)
        return positions < tf.fill(max_len_vec, length)

    # Build one boolean mask row per sample by scanning over the lengths
    initial_mask = ops.cast(tf.fill([1, dense_shape[1]], 0), dtype="bool")
    valid_mask = tf.compat.v1.scan(
        _row_mask, label_lengths, initializer=initial_mask, parallel_iterations=1
    )
    valid_mask = valid_mask[:, 0, :]

    # Column index of every cell, keeping only the valid cells
    column_grid = ops.reshape(
        ops.tile(ops.arange(0, dense_shape[1]), batch_size_vec), dense_shape
    )
    column_ids = tf.compat.v1.boolean_mask(column_grid, valid_mask)

    # Row (batch) index of every cell, keeping only the valid cells
    tiled_rows = ops.tile(ops.arange(0, dense_shape[0]), max_len_vec)
    row_grid = ops.transpose(ops.reshape(tiled_rows, tf.reverse(dense_shape, [0])))
    row_ids = tf.compat.v1.boolean_mask(row_grid, valid_mask)

    # Pair the indices up as (row, column) coordinates
    stacked = ops.reshape(ops.concatenate([row_ids, column_ids], axis=0), [2, -1])
    coords = ops.transpose(stacked)

    values = tf.compat.v1.gather_nd(labels, coords)
    return tf.SparseTensor(
        ops.cast(coords, dtype="int64"),
        values,
        ops.cast(dense_shape, dtype="int64"),
    )
class CTCLayer(layers.Layer):
    """Endpoint layer that registers the CTC loss and passes predictions through.

    The layer takes the labels and the predictions, adds the CTC loss to
    the layer via `add_loss()`, and returns the predictions unchanged.
    """

    def __init__(self, name=None, **kwargs):
        # Forward the remaining standard Layer arguments (e.g. `trainable`,
        # `dtype`) to the base class so the layer can be re-created from its
        # config; with a `name`-only signature those arguments are rejected.
        super().__init__(name=name, **kwargs)
        self.loss_fn = ctc_batch_cost

    def call(self, y_true, y_pred):
        """Add the CTC loss for this batch and return `y_pred` unchanged.

        Args:
            y_true: Dense batch of labels, one row per sample.
            y_pred: Batch of per-step predictions; axis 1 is the sequence axis.
        """
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = ops.cast(ops.shape(y_true)[0], dtype="int64")
        input_length = ops.cast(ops.shape(y_pred)[1], dtype="int64")
        label_length = ops.cast(ops.shape(y_true)[1], dtype="int64")

        # Every sample is given the full sequence / label width as its length.
        # NOTE(review): this assumes all labels in a batch have the same
        # number of characters - confirm against the dataset.
        input_length = input_length * ops.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * ops.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred
def build_model():
    """Build and compile the CNN + bidirectional-LSTM OCR model.

    The model has two inputs, `"image"` and `"label"`. The image passes
    through two conv/max-pool blocks, a dense layer, two bidirectional LSTM
    layers and a softmax layer named `"dense2"`. A `CTCLayer` then attaches
    the CTC loss, so the model is compiled with an optimizer only.

    Returns:
        The compiled `keras.models.Model`.
    """
    # Inputs to the model
    input_img = layers.Input(
        shape=(img_width, img_height, 1), name="image", dtype="float32"
    )
    labels = layers.Input(name="label", shape=(None,), dtype="float32")

    # Number of filters in the last conv layer; also needed for the reshape
    last_conv_filters = 64

    # First conv block
    x = layers.Conv2D(
        32,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv1",
    )(input_img)
    x = layers.MaxPooling2D((2, 2), name="pool1")(x)

    # Second conv block
    x = layers.Conv2D(
        last_conv_filters,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv2",
    )(x)
    x = layers.MaxPooling2D((2, 2), name="pool2")(x)

    # Two max-pool layers with pool size 2 shrink the feature maps by
    # `downsample_factor` along both spatial axes. Reshape to a sequence
    # along the width axis before passing the output to the RNN part of
    # the model. The module-level constant is used instead of a literal 4
    # so the two cannot drift apart.
    new_shape = (
        img_width // downsample_factor,
        (img_height // downsample_factor) * last_conv_filters,
    )
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = layers.Dense(64, activation="relu", name="dense1")(x)
    x = layers.Dropout(0.2)(x)

    # RNNs
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=0.25))(x)
    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.25))(x)

    # Output layer: one class per vocabulary entry plus one extra class
    x = layers.Dense(
        len(char_to_num.get_vocabulary()) + 1, activation="softmax", name="dense2"
    )(x)

    # Add CTC layer for calculating CTC loss at each step
    output = CTCLayer(name="ctc_loss")(labels, x)

    # Define the model
    model = keras.models.Model(
        inputs=[input_img, labels], outputs=output, name="ocr_model_v1"
    )

    # The loss is added by the CTC layer, so only an optimizer is passed
    opt = keras.optimizers.Adam()
    model.compile(optimizer=opt)
    return model


# Get the model
model = build_model()
model.summary()
| """ | |
| ## Training | |
| """ | |
# Upper bound on the number of epochs; early stopping may end training sooner
epochs = 100
early_stopping_patience = 10

# Stop once the validation loss has not improved for
# `early_stopping_patience` epochs, and restore the best weights
early_stopping = keras.callbacks.EarlyStopping(
    restore_best_weights=True,
    patience=early_stopping_patience,
    monitor="val_loss",
)

# Train the model
history = model.fit(
    train_dataset,
    epochs=epochs,
    callbacks=[early_stopping],
    validation_data=validation_dataset,
)
| """ | |
| ## Inference | |
| You can use the trained model hosted on [Hugging Face Hub](https://huggingface.co/keras-io/ocr-for-captcha) | |
| and try the demo on [Hugging Face Spaces](https://huggingface.co/spaces/keras-io/ocr-for-captcha). | |
| """ | |
def ctc_decode(y_pred, input_length, greedy=True, beam_width=100, top_paths=1):
    """Decode a batch of per-step class probabilities with CTC.

    Args:
        y_pred: Rank-3 tensor of predictions; its first two axes are swapped
            before decoding.
        input_length: Sequence length of each sample (cast to int32).
        greedy: If true use the greedy decoder, otherwise beam search.
        beam_width: Beam width, used only by beam search.
        top_paths: Number of paths to return, used only by beam search.

    Returns:
        A tuple `(decoded_dense, log_prob)`. `decoded_dense` is a list of
        dense tensors of shape `(num_samples, num_steps)` in which missing
        positions are filled with -1; `log_prob` is the second value returned
        by the underlying TensorFlow decoder.
    """
    pred_shape = ops.shape(y_pred)
    num_samples, num_steps = pred_shape[0], pred_shape[1]

    # Swap the first two axes and move to log space; epsilon avoids log(0)
    log_probs = ops.log(
        ops.transpose(y_pred, axes=[1, 0, 2]) + keras.backend.epsilon()
    )
    seq_lengths = ops.cast(input_length, dtype="int32")

    if not greedy:
        decoded, log_prob = tf.compat.v1.nn.ctc_beam_search_decoder(
            inputs=log_probs,
            sequence_length=seq_lengths,
            beam_width=beam_width,
            top_paths=top_paths,
        )
    else:
        decoded, log_prob = tf.nn.ctc_greedy_decoder(
            inputs=log_probs, sequence_length=seq_lengths
        )

    # Densify every decoded path to a fixed (num_samples, num_steps) shape
    decoded_dense = [
        tf.sparse.to_dense(
            sp_input=tf.SparseTensor(
                path.indices, path.values, (num_samples, num_steps)
            ),
            default_value=-1,
        )
        for path in decoded
    ]
    return (decoded_dense, log_prob)
# Inference model: from the image input (first input of the training model)
# up to the softmax output layer, leaving out the CTC loss layer
prediction_model = keras.models.Model(
    inputs=model.input[0], outputs=model.get_layer(name="dense2").output
)
prediction_model.summary()
# A utility function to decode the output of the network
def decode_batch_predictions(pred):
    """Turn a batch of network outputs into a list of predicted strings.

    Args:
        pred: Array of predictions whose first two axes are
            (sample, sequence step), as returned by `prediction_model`.

    Returns:
        A list with one decoded string per sample.
    """
    # Every sample uses the full sequence length of the prediction
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search. For complex tasks, you can use beam search
    results = ctc_decode(pred, input_length=input_len, greedy=True)[0][0][
        :, :max_length
    ]
    # Iterate over the results and get back the text
    output_text = []
    for res in results:
        # `ctc_decode` pads short sequences with -1. Drop the padding before
        # the lookup so it is not mapped to an out-of-vocabulary token.
        res = tf.boolean_mask(res, tf.math.not_equal(res, -1))
        text = tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8")
        output_text.append(text)
    return output_text
# Predict on one validation batch and plot each image with its prediction
for batch in validation_dataset.take(1):
    batch_images = batch["image"]
    batch_labels = batch["label"]

    preds = prediction_model.predict(batch_images)
    pred_texts = decode_batch_predictions(preds)

    # Ground-truth strings, decoded from the integer labels
    orig_texts = [
        tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        for label in batch_labels
    ]

    _, ax = plt.subplots(4, 4, figsize=(15, 5))
    for i, pred_text in enumerate(pred_texts):
        row, col = divmod(i, 4)
        # Images are stored width-first; transpose back for display
        img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8).T
        ax[row, col].imshow(img, cmap="gray")
        ax[row, col].set_title(f"Prediction: {pred_text}")
        ax[row, col].axis("off")
plt.show()