"""
Model utility functions for saving, loading, and inspecting models.
"""

import json
import os
import sys
from pathlib import Path
from typing import Dict, Optional, Union

import tensorflow as tf
from tensorflow.keras.models import Model, load_model as keras_load_model

# Make the directory three levels above this file importable so that
# ``src.config`` resolves; skip the append when the entry is already present
# so repeated imports/reloads do not grow sys.path.
_PROJECT_ROOT = str(Path(__file__).parent.parent.parent)
if _PROJECT_ROOT not in sys.path:
    sys.path.append(_PROJECT_ROOT)

from src.config import MODELS_DIR, CUSTOM_CNN_PATH, MOBILENET_PATH, VGG_PATH


# ---------------------------------------------------------------------------
# Legacy preprocessing functions
# ---------------------------------------------------------------------------
# Older saved .h5 models used Lambda layers that baked these functions in.
# Current model code uses Rescaling layers instead, but these definitions
# must remain so keras_load_model() can deserialise the old .h5 files.
# ---------------------------------------------------------------------------

def preprocess_mobilenet(x):
    """
    Legacy MobileNetV2 preprocessor.

    Computes ``x / 127.5 - 1.0``, which maps values in [0, 255] to [-1, 1].
    NOTE(review): presumably the old models fed raw 0-255 pixels here —
    confirm against the training pipeline before relying on the range.
    """
    return x / 127.5 - 1.0


def preprocess_vgg(x):
    """
    Legacy VGG-19 preprocessor.

    Computes ``x * 255.0 - 127.5``.
    NOTE(review): this looks like it expects inputs already scaled to
    [0, 1] (giving [-127.5, 127.5]) — confirm against the training pipeline.
    """
    return x * 255.0 - 127.5


# Names that old Lambda-layer models reference; always offered to the loader.
_LEGACY_CUSTOM_OBJECTS: Dict = {
    "preprocess_mobilenet": preprocess_mobilenet,
    "preprocess_vgg": preprocess_vgg,
}


def _resolve_model_path(model_path: Union[str, Path]) -> Optional[Path]:
    """
    Locate a saved model on disk.

    Args:
        model_path: Path the model was (nominally) saved under

    Returns:
        ``model_path`` itself if it exists; otherwise the same path with its
        suffix stripped if that exists (save_model() writes SavedModel-format
        models to the suffix-less path); otherwise None.
    """
    candidate = Path(model_path)
    if candidate.exists():
        return candidate

    without_suffix = candidate.with_suffix('')
    if without_suffix.exists():
        return without_suffix

    return None


def save_model(
    model: Model,
    save_path: Union[str, Path],
    save_format: str = 'h5',
    include_optimizer: bool = True,
    save_metadata: bool = True,
    metadata: Optional[Dict] = None
) -> None:
    """
    Save a trained model to disk.

    Args:
        model: Keras model to save
        save_path: Path to save the model
        save_format: 'h5' saves to ``save_path`` as given; any other value is
            treated as SavedModel ('tf') and saves to ``save_path`` with its
            suffix removed
        include_optimizer: Whether to include optimizer state (forwarded to
            ``model.save`` for both formats)
        save_metadata: Whether to save training metadata
        metadata: Optional metadata dictionary; written next to the model as
            ``<save_path stem>.json`` only when ``save_metadata`` is true and
            the dictionary is non-empty
    """
    save_path = Path(save_path)

    # Create directory if needed
    save_path.parent.mkdir(parents=True, exist_ok=True)

    if save_format == 'h5':
        written_path = save_path
        model.save(str(written_path), include_optimizer=include_optimizer)
    else:
        # SavedModel format: a directory, so drop the file suffix.
        written_path = save_path.with_suffix('')
        model.save(
            str(written_path),
            include_optimizer=include_optimizer,
            save_format='tf',
        )

    # Save metadata if requested
    if save_metadata and metadata:
        metadata_path = save_path.with_suffix('.json')
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2)

    # Report where the model actually landed, not just the requested path.
    print(f"Model saved to: {written_path}")


def load_model(
    model_path: Union[str, Path],
    custom_objects: Optional[Dict] = None,
    compile_model: bool = True
) -> Model:
    """
    Load a saved model from disk.

    Args:
        model_path: Path to the saved model; if it does not exist, the same
            path without its suffix is tried (SavedModel directory)
        custom_objects: Optional custom objects for loading; entries here
            override the built-in legacy preprocessing functions of the
            same name
        compile_model: Whether to compile the model

    Returns:
        Loaded Keras model

    Raises:
        FileNotFoundError: If neither ``model_path`` nor its suffix-less
            variant exists
    """
    model_path = Path(model_path)

    # Always include legacy preprocessing functions so that old .h5 models
    # saved with Lambda layers can be loaded without extra steps.
    merged_objects = dict(_LEGACY_CUSTOM_OBJECTS)
    if custom_objects:
        merged_objects.update(custom_objects)

    resolved_path = _resolve_model_path(model_path)
    if resolved_path is None:
        raise FileNotFoundError(f"Model not found: {model_path}")

    model = keras_load_model(
        str(resolved_path),
        custom_objects=merged_objects,
        compile=compile_model
    )

    print(f"Model loaded from: {resolved_path}")
    return model


def load_model_metadata(model_path: Union[str, Path]) -> Optional[Dict]:
    """
    Load metadata for a saved model.

    Args:
        model_path: Path to the saved model; the metadata file is the same
            path with its suffix replaced by ``.json``

    Returns:
        Metadata dictionary, or None if no metadata file exists
    """
    metadata_path = Path(model_path).with_suffix('.json')

    if not metadata_path.exists():
        return None

    with open(metadata_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def get_model_summary(model: Model, print_summary: bool = True) -> Dict:
    """
    Get a summary of the model architecture.

    Args:
        model: Keras model
        print_summary: Whether to print the summary via ``model.summary()``

    Returns:
        Dictionary with keys ``name``, ``total_params``, ``trainable_params``,
        ``non_trainable_params``, ``num_layers``, ``input_shape`` and
        ``output_shape``
    """
    if print_summary:
        model.summary()

    # Calculate parameters
    count_params = tf.keras.backend.count_params
    trainable = sum(count_params(w) for w in model.trainable_weights)
    non_trainable = sum(count_params(w) for w in model.non_trainable_weights)

    return {
        "name": model.name,
        "total_params": trainable + non_trainable,
        "trainable_params": trainable,
        "non_trainable_params": non_trainable,
        "num_layers": len(model.layers),
        "input_shape": model.input_shape,
        "output_shape": model.output_shape
    }


def get_available_models() -> Dict[str, Dict]:
    """
    Get information about available pre-trained models.

    A model counts as existing when load_model() would be able to find it,
    i.e. at its configured path or at that path without its suffix.

    Returns:
        Dictionary mapping model name to a dictionary with keys ``path``
        (configured path as a string), ``exists`` and ``metadata`` (None
        when the model is missing or has no metadata file)
    """
    model_paths = {
        "custom_cnn": CUSTOM_CNN_PATH,
        "mobilenet": MOBILENET_PATH,
        "vgg19": VGG_PATH
    }

    models: Dict[str, Dict] = {}
    for name, path in model_paths.items():
        exists = _resolve_model_path(path) is not None
        models[name] = {
            "path": str(path),
            "exists": exists,
            "metadata": load_model_metadata(path) if exists else None
        }

    return models


def compare_models(models: Dict[str, Model]) -> Dict:
    """
    Compare multiple models.

    Args:
        models: Dictionary of model name -> model

    Returns:
        Dictionary of model name -> ``{"params", "trainable_params",
        "layers"}`` taken from get_model_summary()
    """
    comparison = {}

    for name, model in models.items():
        summary = get_model_summary(model, print_summary=False)
        comparison[name] = {
            "params": summary["total_params"],
            "trainable_params": summary["trainable_params"],
            "layers": summary["num_layers"]
        }

    return comparison


def export_to_tflite(
    model: Model,
    save_path: Union[str, Path],
    quantize: bool = False
) -> None:
    """
    Export model to TensorFlow Lite format.

    Args:
        model: Keras model to export
        save_path: Path to save the TFLite model; parent directories are
            created if missing
        quantize: Whether to apply quantization (sets the converter's
            optimizations to ``tf.lite.Optimize.DEFAULT``)
    """
    converter = tf.lite.TFLiteConverter.from_keras_model(model)

    if quantize:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]

    tflite_model = converter.convert()

    save_path = Path(save_path)
    save_path.parent.mkdir(parents=True, exist_ok=True)

    with open(save_path, 'wb') as f:
        f.write(tflite_model)

    print(f"TFLite model saved to: {save_path}")


if __name__ == "__main__":
    print("Available models:")
    models = get_available_models()
    for name, info in models.items():
        status = "✓ Trained" if info["exists"] else "✗ Not trained"
        print(f"  {name}: {status}")