| | import cv2 |
| | from .plot import * |
| | from abcli import file |
| | from abcli import path |
| | from abcli import string |
| | from abcli.plugins import graphics |
| | from abcli.tasks import host |
| | from abcli.tasks import objects |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| | import tensorflow as tf |
| | from tqdm import * |
| | import time |
| | from abcli.logging import crash_report |
| | import abcli.logging |
| | import logging |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | default_window_size = 28 |
| |
|
| |
|
| | class Image_Classifier(object): |
| | def __init__(self): |
| | self.class_names = [] |
| | self.model = None |
| |
|
| | self.params = { |
| | "convnet": False, |
| | "object_name": "", |
| | "model_size": "", |
| | "window_size": default_window_size, |
| | } |
| |
|
| | def load(self, model_path): |
| | success, self.class_names = file.load_json( |
| | f"{model_path}/image_classifier/model/class_names.json" |
| | ) |
| | if not success: |
| | return False |
| |
|
| | success, self.params = file.load_json( |
| | f"{model_path}/image_classifier/model/params.json" |
| | ) |
| | if not success: |
| | return False |
| |
|
| | self.params["object_name"] = path.name(model_path) |
| |
|
| | self.params["model_size"] = file.size(f"{model_path}/image_classifier/model") |
| |
|
| | try: |
| | self.model = tf.keras.models.load_model( |
| | f"{model_path}/image_classifier/model" |
| | ) |
| | except: |
| | crash_report("-f image_classifier: load({model_path}): failed.") |
| | return False |
| |
|
| | logger.info( |
| | "{}.load({}): {}x{} {}{} class(es): {}".format( |
| | self.__class__.__name__, |
| | model_path, |
| | self.params["window_size"], |
| | self.params["window_size"], |
| | "convnet - " if self.params["convnet"] else "", |
| | len(self.class_names), |
| | ",".join(self.class_names), |
| | ) |
| | ) |
| | self.model.summary() |
| |
|
| | return True |
| |
|
| | def predict( |
| | self, |
| | test_images, |
| | test_labels=None, |
| | output_path="", |
| | page_count=2, |
| | ): |
| | logger.info( |
| | "image_classifier.predict({},{}){}".format( |
| | string.pretty_shape_of_matrix(test_images), |
| | string.pretty_shape_of_matrix(test_labels), |
| | "-> {}".format(output_path) if output_path else "", |
| | ) |
| | ) |
| |
|
| | prediction_time = time.time() |
| | predictions = self.model.predict(test_images) |
| | prediction_time = (time.time() - prediction_time) / test_images.shape[0] |
| | logger.info( |
| | "image_classifier.predict(): {} / frame".format( |
| | string.pretty_duration( |
| | prediction_time, |
| | include_ms=True, |
| | ) |
| | ) |
| | ) |
| |
|
| | if not output_path: |
| | return True, predictions |
| |
|
| | if not file.save( |
| | f"{output_path}/image_classifier/predictions.pyndarray", predictions |
| | ): |
| | return False, predictions |
| |
|
| | if test_labels is not None: |
| | from sklearn.metrics import confusion_matrix |
| |
|
| | logger.info("image_classifier.predict(): rendering confusion_matrix...") |
| |
|
| | cm = confusion_matrix( |
| | test_labels, |
| | np.argmax(predictions, axis=1), |
| | labels=range(len(self.class_names)), |
| | |
| | ) |
| | cm = cm / np.sum(cm, axis=1)[:, np.newaxis] |
| | logger.debug("confusion_matrix: {}".format(cm)) |
| |
|
| | if not file.save( |
| | f"{output_path}/image_classifier/model/confusion_matrix.pyndarray", cm |
| | ): |
| | return False, predictions |
| |
|
| | if not graphics.render_confusion_matrix( |
| | cm, |
| | self.class_names, |
| | f"{output_path}/image_classifier/model/confusion_matrix.jpg", |
| | header=[ |
| | " | ".join(host.signature()), |
| | " | ".join(objects.signature()), |
| | ], |
| | footer=self.signature(prediction_time), |
| | ): |
| | return False, predictions |
| |
|
| | if test_labels is not None: |
| | logger.info( |
| | "image_classifier.predict(): rendering test_labels distribution..." |
| | ) |
| |
|
| | |
| | |
| | distribution = np.bincount(test_labels) |
| | distribution = distribution / np.sum(distribution) |
| |
|
| | if not graphics.render_distribution( |
| | distribution, |
| | self.class_names, |
| | f"{output_path}/image_classifier/model/label_distribution.jpg", |
| | header=[ |
| | " | ".join(host.signature()), |
| | " | ".join(objects.signature()), |
| | ], |
| | footer=self.signature(prediction_time), |
| | title="distribution of test_labels", |
| | ): |
| | return False, predictions |
| |
|
| | max_index = test_images.shape[0] |
| | if page_count != -1: |
| | max_index = min(24 * page_count, max_index) |
| | logger.info( |
| | f"image_classifier.predict(): rendering {max_index / 24:.0f} sheet(s)..." |
| | ) |
| | for index in tqdm(range(0, max_index, 24)): |
| | self.render( |
| | predictions[index : index + 24], |
| | None if test_labels is None else test_labels[index : index + 24], |
| | test_images[index : index + 24], |
| | "{}/image_classifier/prediction/{:05d}.jpg".format( |
| | output_path, |
| | int(index / 24), |
| | ), |
| | prediction_time, |
| | ) |
| |
|
| | return True, predictions |
| |
|
| | def predict_frame(self, frame): |
| | prediction_time = time.time() |
| | try: |
| | prediction = self.model.predict( |
| | np.expand_dims( |
| | cv2.resize( |
| | frame, (self.params["window_size"], self.params["window_size"]) |
| | ) |
| | / 255.0, |
| | axis=0, |
| | ) |
| | ) |
| | except: |
| | crash_report("-image_classifier: predict_frame(): failed.") |
| | return False, -1 |
| |
|
| | prediction_time = time.time() - prediction_time |
| |
|
| | output = np.argmax(prediction) |
| |
|
| | logger.info( |
| | "image_classifier.prediction: [{}] -> {} - took {}".format( |
| | ",".join( |
| | [ |
| | "{}:{:.2f}".format(class_name, value) |
| | for class_name, value in zip(self.class_names, prediction[0]) |
| | ] |
| | ), |
| | self.class_names[output], |
| | string.pretty_duration( |
| | prediction_time, |
| | include_ms=True, |
| | short=True, |
| | ), |
| | ) |
| | ) |
| |
|
| | return True, output |
| |
|
| | def render( |
| | self, |
| | predictions, |
| | test_labels, |
| | test_images, |
| | output_filename="", |
| | prediction_time=0, |
| | ): |
| | num_rows = 4 |
| | num_cols = 6 |
| | num_images = num_rows * num_cols |
| | plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows)) |
| | for i in range(min(num_images, len(predictions))): |
| | plt.subplot(num_rows, 2 * num_cols, 2 * i + 1) |
| | plot_image(i, predictions[i], test_labels, test_images, self.class_names) |
| | plt.subplot(num_rows, 2 * num_cols, 2 * i + 2) |
| | plot_value_array(i, predictions[i], test_labels) |
| | plt.tight_layout() |
| |
|
| | if output_filename: |
| | filename_ = file.auxiliary("prediction", "png") |
| | plt.savefig(filename_) |
| | plt.close() |
| |
|
| | success, image = file.load_image(filename_) |
| | if success: |
| | image = graphics.add_signature( |
| | image, |
| | [" | ".join(host.signature()), " | ".join(objects.signature())], |
| | self.signature(prediction_time), |
| | ) |
| | file.save_image(output_filename, image) |
| |
|
| | def save(self, model_path): |
| | model_filename = f"{model_path}/image_classifier/model" |
| |
|
| | if not file.prepare_for_saving(model_filename): |
| | return False |
| |
|
| | try: |
| | self.model.save(model_filename) |
| | logger.info(f"image_classifier.model -> {model_filename}") |
| | except: |
| | crash_report(f"-image_classifier: save({model_path}): failed.") |
| | return False |
| |
|
| | self.params["object_name"] = path.name(model_path) |
| |
|
| | self.params["model_size"] = file.size(f"{model_path}/image_classifier/model") |
| |
|
| | if not file.save_json( |
| | f"{model_path}/image_classifier/model/class_names.json", |
| | self.class_names, |
| | ): |
| | return False |
| |
|
| | if not file.save_json( |
| | f"{model_path}/image_classifier/model/params.json", |
| | self.params, |
| | ): |
| | return False |
| |
|
| | return True |
| |
|
| | def signature(self, prediction_time): |
| | return [ |
| | " | ".join( |
| | [ |
| | "image_classifier", |
| | self.params["object_name"], |
| | string.pretty_bytes(self.params["model_size"]) |
| | if self.params["model_size"] |
| | else "", |
| | string.pretty_shape(self.input_shape), |
| | "/".join(string.shorten(self.class_names)), |
| | "took {} / image".format( |
| | string.pretty_duration( |
| | prediction_time, |
| | include_ms=True, |
| | largest=True, |
| | short=True, |
| | ) |
| | ), |
| | ] |
| | ) |
| | ] |
| |
|
| | @staticmethod |
| | def train(data_path, model_path, color=False, convnet=True, epochs=10): |
| | classifier = Image_Classifier() |
| | classifier.params["color"] = color |
| | classifier.params["convnet"] = convnet |
| |
|
| | logger.info( |
| | "image_classifier.train({}) -{}> {}".format( |
| | data_path, |
| | "convnet-" if classifier.params["convnet"] else "", |
| | model_path, |
| | ) |
| | ) |
| |
|
| | success, train_images = file.load(f"{data_path}/train_images.pyndarray") |
| | if success: |
| | success, train_labels = file.load(f"{data_path}/train_labels.pyndarray") |
| | if success: |
| | success, test_images = file.load(f"{data_path}/test_images.pyndarray") |
| | if success: |
| | success, test_labels = file.load(f"{data_path}/test_labels.pyndarray") |
| | if success: |
| | success, classifier.class_names = file.load_json( |
| | f"{data_path}/class_names.json" |
| | ) |
| | if not success: |
| | return False |
| |
|
| | from tensorflow.keras.utils import to_categorical |
| |
|
| | train_labels = to_categorical(train_labels) |
| | test_labels = to_categorical(test_labels) |
| |
|
| | window_size = train_images.shape[1] |
| | input_shape = ( |
| | (window_size, window_size, 3) |
| | if color |
| | else (window_size, window_size, 1) |
| | if convnet |
| | else (window_size, window_size) |
| | ) |
| | logger.info(f"input:{string.pretty_shape(input_shape)}") |
| |
|
| | if convnet and not color: |
| | train_images = np.expand_dims(train_images, axis=3) |
| | test_images = np.expand_dims(test_images, axis=3) |
| |
|
| | for name, thing in zip( |
| | "train_images,train_labels,test_images,test_labels".split(","), |
| | [train_images, train_labels, test_images, test_labels], |
| | ): |
| | logger.info("{}: {}".format(name, string.pretty_shape_of_matrix(thing))) |
| | logger.info( |
| | f"{len(classifier.class_names)} class(es): {', '.join(classifier.class_names)}" |
| | ) |
| |
|
| | train_images = train_images / 255.0 |
| | test_images = test_images / 255.0 |
| |
|
| | if convnet: |
| | |
| | classifier.model = tf.keras.Sequential( |
| | [ |
| | tf.keras.layers.Conv2D( |
| | filters=48, |
| | kernel_size=3, |
| | activation="relu", |
| | input_shape=input_shape, |
| | ), |
| | tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
| | tf.keras.layers.Conv2D( |
| | filters=48, kernel_size=3, activation="relu" |
| | ), |
| | tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
| | tf.keras.layers.Conv2D( |
| | filters=32, kernel_size=3, activation="relu" |
| | ), |
| | tf.keras.layers.MaxPool2D(pool_size=2, strides=2), |
| | tf.keras.layers.Flatten(), |
| | tf.keras.layers.Dense(128, activation="relu"), |
| | tf.keras.layers.Dense(64, activation="relu"), |
| | tf.keras.layers.Dense(len(classifier.class_names)), |
| | tf.keras.layers.Activation("softmax"), |
| | ] |
| | ) |
| | else: |
| | |
| | classifier.model = tf.keras.Sequential( |
| | [ |
| | tf.keras.layers.Flatten(input_shape=input_shape), |
| | tf.keras.layers.Dense(128, activation="relu"), |
| | tf.keras.layers.Dense(len(classifier.class_names)), |
| | tf.keras.layers.Activation("softmax"), |
| | ] |
| | ) |
| |
|
| | classifier.model.summary() |
| |
|
| | classifier.model.compile( |
| | optimizer="adam", |
| | loss=tf.keras.losses.categorical_crossentropy, |
| | metrics=["accuracy"], |
| | ) |
| |
|
| | classifier.model.fit(train_images, train_labels, epochs=epochs) |
| |
|
| | test_accuracy = float( |
| | classifier.model.evaluate(test_images, test_labels, verbose=2)[1] |
| | ) |
| | logger.info("test accuracy: {:.4f}".format(test_accuracy)) |
| |
|
| | if not file.save_json( |
| | f"{model_path}/image_classifier/model/evaluation.json", |
| | {"metrics": {"test_accuracy": test_accuracy}}, |
| | ): |
| | return False |
| |
|
| | if not classifier.save(model_path): |
| | return False |
| |
|
| | return classifier.predict( |
| | test_images, |
| | np.argmax(test_labels, axis=1), |
| | model_path, |
| | ) |
| |
|
| | @property |
| | def input_shape(self): |
| | return self.model.layers[0].input_shape[1:] if self.model.layers else [] |
| |
|