| import cv2 |
| from .plot import * |
| from abcli import file |
| from abcli import path |
| from abcli import string |
| from abcli.plugins import graphics |
| from abcli.tasks import host |
| from abcli.tasks import objects |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import tensorflow as tf |
| from tqdm import * |
| import time |
| import abcli.logging |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| default_window_size = 28 |
|
|
|
|
| class Image_Classifier(object): |
| def __init__(self): |
| self.class_names = [] |
| self.model = None |
|
|
| self.params = { |
| "convnet": False, |
| "object_name": "", |
| "model_size": "", |
| "window_size": default_window_size, |
| } |
|
|
| def load(self, model_path): |
| success, self.class_names = file.load_json(f"{model_path}/class_names.json") |
| if not success: |
| return False |
|
|
| success, self.params = file.load_json(f"{model_path}/params.json", default={}) |
| if not success: |
| return False |
|
|
| self.params["object_name"] = path.name(model_path) |
|
|
| self.params["model_size"] = file.size(f"{model_path}/image_classifier/model") |
|
|
| try: |
| self.model = tf.keras.models.load_model( |
| f"{model_path}/image_classifier/model" |
| ) |
| except: |
| from abcli.logging import crash_report |
|
|
| crash_report("image_classifier.load({}) failed".format(model_path)) |
| return False |
|
|
| logger.info( |
| "{}.load({}x{}:{}): {}{} class(es): {}".format( |
| self.__class__.__name__, |
| self.params["window_size"], |
| self.params["window_size"], |
| path.name(model_path), |
| "convnet - " if self.params["convnet"] else "", |
| len(self.class_names), |
| ",".join(self.class_names), |
| ) |
| ) |
| self.model.summary() |
|
|
| return True |
|
|
| def predict(self, test_images, test_labels, output_path="", page_count=-1): |
| logger.info( |
| "image_classifier.predict({},{}){}".format( |
| string.pretty_shape_of_matrix(test_images), |
| string.pretty_shape_of_matrix(test_labels), |
| "-> {}".format(output_path) if output_path else "", |
| ) |
| ) |
|
|
| prediction_time = time.time() |
| predictions = self.model.predict(test_images) |
| prediction_time = (time.time() - prediction_time) / test_images.shape[0] |
| logger.info( |
| "image_classifier.predict(): {} / frame".format( |
| string.pretty_duration( |
| prediction_time, |
| include_ms=True, |
| ) |
| ) |
| ) |
|
|
| if not output_path: |
| return True |
|
|
| if not file.save( |
| f"{output_path}/image_classifier/predictions.pyndarray", predictions |
| ): |
| return False |
|
|
| if test_labels is not None: |
| from sklearn.metrics import confusion_matrix |
|
|
| logger.info("image_classifier.predict(): rendering confusion_matrix...") |
|
|
| cm = confusion_matrix( |
| test_labels, |
| np.argmax(predictions, axis=1), |
| labels=range(len(self.class_names)), |
| |
| ) |
| cm = cm / np.sum(cm, axis=1)[:, np.newaxis] |
| logger.debug("confusion_matrix: {}".format(cm)) |
|
|
| if not file.save( |
| f"{output_path}/image_classifier/model/confusion_matrix.pyndarray", cm |
| ): |
| return False |
|
|
| if not graphics.render_confusion_matrix( |
| cm, |
| self.class_names, |
| f"{output_path}/image_classifier/model/confusion_matrix.jpg", |
| { |
| "header": [ |
| " | ".join(host.signature()), |
| " | ".join(objects.signature()), |
| ], |
| "footer": self.signature(prediction_time), |
| }, |
| ): |
| return False |
|
|
| if test_labels is not None: |
| logger.info( |
| "image_classifier.predict(): rendering test_labels distribution..." |
| ) |
|
|
| |
| |
| distribution = np.bincount(test_labels) |
| distribution = distribution / np.sum(distribution) |
|
|
| if not graphics.render_distribution( |
| distribution, |
| self.class_names, |
| f"{output_path}/image_classifier/model/label_distribution.jpg", |
| { |
| "header": [ |
| " | ".join(host.signature()), |
| " | ".join(objects.signature()), |
| ], |
| "footer": self.signature(prediction_time), |
| "title": "distribution of test_labels", |
| }, |
| ): |
| return False |
|
|
| max_index = test_images.shape[0] |
| if page_count != -1: |
| max_index = min(24 * page_count, max_index) |
| logger.info( |
| f"image_classifier.predict(): rendering {max_index / 24} frame(s)..." |
| ) |
| for index in tqdm(range(0, max_index, 24)): |
| self.render( |
| predictions[index : index + 24], |
| None if test_labels is None else test_labels[index : index + 24], |
| test_images[index : index + 24], |
| "{}/image_classifier/prediction/{:05d}.jpg".format( |
| output_path, |
| int(index / 24), |
| ), |
| prediction_time, |
| ) |
|
|
| return True |
|
|
| def predict_frame(self, frame): |
| prediction_time = time.time() |
| try: |
| prediction = self.model.predict( |
| np.expand_dims( |
| cv2.resize( |
| frame, (self.params["window_size"], self.params["window_size"]) |
| ) |
| / 255.0, |
| axis=0, |
| ) |
| ) |
| except: |
| from abcli.logging import crash_report |
|
|
| crash_report("image_classifier.predict_frame() crashed.") |
| return False, -1 |
|
|
| prediction_time = time.time() - prediction_time |
|
|
| output = np.argmax(prediction) |
|
|
| logger.info( |
| "image_classifier.prediction: [{}] -> {} - took {}".format( |
| ",".join( |
| [ |
| "{}:{:.2f}".format(class_name, value) |
| for class_name, value in zip(self.class_names, prediction[0]) |
| ] |
| ), |
| self.class_names[output], |
| string.pretty_duration( |
| prediction_time, |
| include_ms=True, |
| short=True, |
| ), |
| ) |
| ) |
|
|
| return True, output |
|
|
| def render( |
| self, |
| predictions, |
| test_labels, |
| test_images, |
| output_filename="", |
| prediction_time=0, |
| ): |
| num_rows = 4 |
| num_cols = 6 |
| num_images = num_rows * num_cols |
| plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows)) |
| for i in range(min(num_images, len(predictions))): |
| plt.subplot(num_rows, 2 * num_cols, 2 * i + 1) |
| plot_image(i, predictions[i], test_labels, test_images, self.class_names) |
| plt.subplot(num_rows, 2 * num_cols, 2 * i + 2) |
| plot_value_array(i, predictions[i], test_labels) |
| plt.tight_layout() |
|
|
| if output_filename: |
| filename_ = file.auxiliary("prediction", "png") |
| plt.savefig(filename_) |
| plt.close() |
|
|
| success, image = file.load_image(filename_) |
| if success: |
| image = graphics.add_signature( |
| image, |
| [" | ".join(host.signature()), " | ".join(objects.signature())], |
| self.signature(prediction_time), |
| ) |
| file.save_image(output_filename, image) |
|
|
| def save(self, model_path): |
| model_filename = "{}/image_classifier/model".format(model_path) |
| file.prepare_for_saving(model_filename) |
| try: |
| self.model.save(model_filename) |
| logger.info("image_classifier.model -> {}".format(model_filename)) |
| except: |
| from abcli.logging import crash_report |
|
|
| crash_report("image_classifier.save({}) failed".format(model_path)) |
| return False |
|
|
| self.params["object_name"] = path.name(model_path) |
|
|
| self.params["model_size"] = file.size( |
| "{}/image_classifier/model".format(model_path) |
| ) |
|
|
| if not file.save_json( |
| "{}/class_names.json".format(model_path), self.class_names |
| ): |
| return False |
|
|
| if not file.save_json("{}/params.json".format(model_path), self.params): |
| return False |
|
|
| return True |
|
|
| def signature(self, prediction_time): |
| return [ |
| " | ".join( |
| [ |
| "image_classifier", |
| self.params["object_name"], |
| string.pretty_bytes(self.params["model_size"]) |
| if self.params["model_size"] |
| else "", |
| string.pretty_shape(self.input_shape), |
| "/".join(string.shorten(self.class_names)), |
| "took {} / frame".format( |
| string.pretty_duration( |
| prediction_time, |
| include_ms=True, |
| largest=True, |
| short=True, |
| ) |
| ), |
| ] |
| ) |
| ] |
|
|
| @staticmethod |
| def train(data_path, model_path, color=False, convnet=True, epochs=10): |
| classifier = Image_Classifier() |
| classifier.params["convnet"] = convnet |
|
|
| logger.info( |
| "image_classifier.train({}) -{}> {}".format( |
| data_path, |
| "convnet-" if classifier.params["convnet"] else "", |
| model_path, |
| ) |
| ) |
|
|
| success, train_images = file.load(f"{data_path}/train_images.pyndarray") |
| if success: |
| success, train_labels = file.load(f"{data_path}/train_labels.pyndarray") |
| if success: |
| success, test_images = file.load(f"{data_path}/test_images.pyndarray") |
| if success: |
| success, test_labels = file.load(f"{data_path}/test_labels.pyndarray") |
| if success: |
| success, classifier.class_names = file.load_json( |
| f"{data_path}/class_names.json" |
| ) |
| if not success: |
| return False |
|
|
| from tensorflow.keras.utils import to_categorical |
|
|
| train_labels = to_categorical(train_labels) |
| test_labels = to_categorical(test_labels) |
|
|
| window_size = train_images.shape[1] |
| input_shape = ( |
| (window_size, window_size, 3) |
| if color |
| else (window_size, window_size, 1) |
| if convnet |
| else (window_size, window_size) |
| ) |
| logger.info(f"input:{string.pretty_shape(input_shape)}") |
|
|
| if convnet and not color: |
| train_images = np.expand_dims(train_images, axis=3) |
| test_images = np.expand_dims(test_images, axis=3) |
|
|
| for name, thing in zip( |
| "train_images,train_labels,test_images,test_labels".split(","), |
| [train_images, train_labels, test_images, test_labels], |
| ): |
| logger.info("{}: {}".format(name, string.pretty_shape_of_matrix(thing))) |
| logger.info( |
| f"{len(classifier.class_names)} class(es): {', '.join(classifier.class_names)}" |
| ) |
|
|
| train_images = train_images / 255.0 |
| test_images = test_images / 255.0 |
|
|
| if convnet: |
| |
| classifier.model = tf.keras.Sequential( |
| [ |
| tf.keras.layers.Conv2D( |
| filters=48, |
| kernel_size=3, |
| activation="relu", |
| input_shape=input_shape, |
| ), |
| tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
| tf.keras.layers.Conv2D( |
| filters=48, kernel_size=3, activation="relu" |
| ), |
| tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
| tf.keras.layers.Conv2D( |
| filters=32, kernel_size=3, activation="relu" |
| ), |
| tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
| tf.keras.layers.Flatten(), |
| tf.keras.layers.Dense(128, activation="relu"), |
| tf.keras.layers.Dense(64, activation="relu"), |
| tf.keras.layers.Dense(len(classifier.class_names)), |
| tf.keras.layers.Activation("softmax"), |
| ] |
| ) |
| else: |
| |
| classifier.model = tf.keras.Sequential( |
| [ |
| tf.keras.layers.Flatten(input_shape=input_shape), |
| tf.keras.layers.Dense(128, activation="relu"), |
| tf.keras.layers.Dense(len(classifier.class_names)), |
| tf.keras.layers.Activation("softmax"), |
| ] |
| ) |
|
|
| classifier.model.summary() |
|
|
| classifier.model.compile( |
| optimizer="adam", |
| loss=tf.keras.losses.categorical_crossentropy, |
| metrics=["accuracy"], |
| ) |
|
|
| classifier.model.fit(train_images, train_labels, epochs=epochs) |
|
|
| test_accuracy = float( |
| classifier.model.evaluate(test_images, test_labels, verbose=2)[1] |
| ) |
| logger.info("test accuracy: {:.4f}".format(test_accuracy)) |
|
|
| if not file.save_json( |
| f"{model_path}/eval.json", |
| {"metrics": {"test_accuracy": test_accuracy}}, |
| ): |
| return False |
|
|
| if not classifier.save(model_path): |
| return False |
|
|
| return classifier.predict( |
| test_images, |
| np.argmax(test_labels, axis=1), |
| model_path, |
| page_count=10, |
| ) |
|
|
| @property |
| def input_shape(self): |
| return self.model.layers[0].input_shape[1:] if self.model.layers else [] |
|
|