# /*---------------------------------------------------------------------------------------------
#  * Copyright (c) 2023 STMicroelectronics. All rights reserved.
#  * This software is licensed under terms that can be found in the LICENSE
#  * file in the root directory of this software component.
#  * If no LICENSE file comes with this software, it is provided AS-IS.
#  *--------------------------------------------------------------------------------------------*/

import os
import functools
import typing

from common.stm32ai_dc.backend.cloud.generate_nbg_service import GenerateNbgService
from common.stm32ai_dc.backend.cloud.benchmark_service import BenchmarkService
from common.stm32ai_dc.backend.cloud.file_service import FileService
from common.stm32ai_dc.backend.cloud.helpers import get_supported_versions
from common.stm32ai_dc.backend.cloud.login_service import LoginService
from common.stm32ai_dc.backend.cloud.user_service import UserService
from common.stm32ai_dc.backend.cloud.stm32ai_service import Stm32AiService
from common.stm32ai_dc.errors import GenerateNbgFailure, AnalyzeServerError, BenchmarkServerError, InvalidCrendetialsException
from common.stm32ai_dc.errors import FileFormatError, GenerateServerError
from common.stm32ai_dc.errors import InternalErrorThatShouldNotHappened
from common.stm32ai_dc.errors import ParameterError, ValidateServerError
from common.stm32ai_dc.errors import LoginFailureException
from common.stm32ai_dc.types import AnalyzeResult, BackendVersionType, BenchmarkResult, BoardData, MpuBenchmarkResult, MpuParameters
from common.stm32ai_dc.types import GenerateResult, Stm32AiBackend, CliParameters
from common.stm32ai_dc.types import ValidateResult, ValidateResultMetrics

# Memory pools whose address is at or above this value are counted as
# external RAM by benchmark(), in addition to pools named "*EXTERNAL*".
_EXTERNAL_MEMORY_BASE_ADDRESS = 0x60000000

# Footprint entries summed into the "estimated library" flash / RAM sizes.
_LIBRARY_FLASH_KEYS = ('kernel_flash', 'extra_flash', 'toolchain_flash')
_LIBRARY_RAM_KEYS = ('kernel_ram', 'extra_ram', 'toolchain_ram')

_WRONG_BENCHMARK_FORMAT = "Benchmark service return wrong format"


def _missing_data_text(result) -> str:
    """Build the 'missing data' error text, adding the server message if any."""
    if result is not None and 'message' in result:
        return f"Missing data in server response: {result['message']}"
    return 'Missing data in server response'


def _to_validation_metrics(raw_metrics) -> typing.List[ValidateResultMetrics]:
    """Convert the server's ``val_metrics`` entries to ValidateResultMetrics."""
    return [
        ValidateResultMetrics(
            accuracy=entry['acc'],
            description=entry['desc'],
            l2r=entry['l2r'],
            mae=entry['mae'],
            mean=entry['mean'],
            rmse=entry['rmse'],
            std=entry['std'],
            ts_name=entry.get('ts_name', ''),
        )
        for entry in raw_metrics
    ]


def _graph_macc(cinfo_graph):
    """Sum the ``macc`` value of every node of a c-info graph."""
    return sum(node['macc'] for node in cinfo_graph['nodes'])


def _io_size(memory_footprint):
    """Sum the ``io`` entries of a memory footprint (0 when absent)."""
    return sum(memory_footprint.get('io', []))


def _rom_size(memory_footprint):
    """Return weights + kernel flash + toolchain flash of a footprint."""
    return (memory_footprint.get('weights', 0)
            + memory_footprint.get('kernel_flash', 0)
            + memory_footprint.get('toolchain_flash', 0))


def _ram_size(memory_footprint):
    """Return activations + io + kernel RAM + toolchain RAM of a footprint.

    ``extra_ram`` is only used as a fallback when ``toolchain_ram`` is absent.
    """
    return (memory_footprint.get('activations', 0)
            + _io_size(memory_footprint)
            + memory_footprint.get('kernel_ram', 0)
            + memory_footprint.get('toolchain_ram',
                                   memory_footprint.get('extra_ram', 0)))


def _library_size(memory_footprint, keys) -> int:
    """Sum the given footprint entries, treating missing/None values as 0."""
    return sum(int(memory_footprint.get(key, 0) or 0) for key in keys)


def _pool_address(pool) -> int:
    """Return a memory pool's address as an int.

    Accepts an int, a decimal string or a ``0x``-prefixed hexadecimal string.
    A missing, None or empty address is treated as 0.
    """
    address = pool.get('address', 0)
    if address is None:
        return 0
    if isinstance(address, int):
        return address
    text = str(address).strip()
    if not text:
        return 0
    return int(text, 16 if text.lower().startswith('0x') else 10)


def _is_external_pool(pool) -> bool:
    """Tell whether a memory pool is counted as external RAM."""
    return ("EXTERNAL" in pool['name']
            or _pool_address(pool) >= _EXTERNAL_MEMORY_BASE_ADDRESS)


def _format_tool_version(tool_version) -> str:
    """Format a ``tool_version`` mapping as ``"<name> <major>.<minor>.<micro>"``.

    Uses string formatting rather than ``+`` so that non-string fields do not
    raise; returns ``'N/A'`` when no version information is available.
    """
    if not tool_version:
        return 'N/A'
    return (f"{tool_version.get('name', '')} "
            f"{tool_version.get('major', '')}."
            f"{tool_version.get('minor', '')}."
            f"{tool_version.get('micro', '')}")


class CloudBackend(Stm32AiBackend):
    """Stm32AiBackend implementation backed by the cloud services.

    The constructor authenticates through LoginService and then creates the
    user, STM32AI, file, benchmark and NBG-generation services with the
    resulting token; every public method delegates to one of those services
    and converts the raw server reply into the matching result type.
    """

    def __init__(
            self,
            username: str,
            password: str,
            version: typing.Union[str, None] = None,
            platform: BackendVersionType = BackendVersionType.STM32,
            silent: bool = False,
    ) -> None:
        """Select a backend version, authenticate and create the services.

        Args:
            username: Account login. When empty or None, a previously stored
                access token is requested from the login service instead.
            password: Account password (same empty/None handling).
            version: Requested tool version.
            platform: Platform used in the warning messages and to look up
                the platform-specific version of the latest release.
            silent: Forwarded to the STM32AI and benchmark services.

        Raises:
            InvalidCrendetialsException: Re-raised unchanged from the login
                service.
            LoginFailureException: No token could be obtained.
        """
        self.username = username
        self.password = password
        self.version = version
        self.supportedVersions = get_supported_versions()
        self.silent = silent
        self.login_service = LoginService()

        try:
            # user_version = next(x for x in self.supportedVersions if x.get('platform',{}).get(platform, x['version']) == version)
            # self.version = user_version['version']
            # NOTE(review): the per-platform lookup above is disabled. The
            # lookup below only succeeds if get_supported_versions() returns
            # a mapping with a 'version' key; otherwise the fallback to the
            # latest version always runs -- TODO confirm the expected shape.
            self.version = self.supportedVersions['version']
        except Exception:
            print(
                f'[WARN] Version {self.version} for platform {platform} is not supported by Developer Cloud.')
            # Must be initialised: no entry may be flagged as latest.
            latest = None
            for candidate in self.supportedVersions:
                if candidate.get('isLatest', False) is True:
                    latest = candidate
            if latest:
                self.version = latest.get('version', None)
                platform_versions = latest.get('platform') or {}
                print(
                    f"[WARN] It will use the latest version by default ({self.version}, {platform} version: {platform_versions.get(platform, '---')})")

        if not username or not password:
            # Try to use previous tokens saved in home directory
            sso_resp = self.login_service.get_access_token()
            if not sso_resp:
                raise LoginFailureException(
                    username,
                    password,
                    details='Empty login or stored token invalid.')
            self.auth_token = sso_resp
        else:
            try:
                self.auth_token = self.login_service.login(
                    username=username,
                    password=password)
            except InvalidCrendetialsException:
                raise
            except Exception as err:
                raise LoginFailureException(
                    username,
                    password,
                    details='Login failure, try again and please check your credentials and/or your network'
                ) from err

        if self.auth_token is None:
            raise LoginFailureException(
                username,
                password,
                details='Please check your credentials')

        self.user_service = UserService(self.auth_token)
        self.stm32ai_service = Stm32AiService(self.auth_token, self.version, self.silent)
        self.file_service = FileService(self.auth_token)
        self.benchmark_service = BenchmarkService(self.auth_token, self.silent)
        self.generate_nbg_service = GenerateNbgService(self.auth_token)

    def get_user(self):
        """Return whatever the user service reports for the current user."""
        return self.user_service.get_user()

    def analyze(self, options: CliParameters) -> AnalyzeResult:
        """Run an analyze job and convert the server reply.

        Values are read from the ``info`` entry of the reply when present and
        from the ``report`` entry otherwise.

        Raises:
            AnalyzeServerError: The reply is None or carries neither ``info``
                nor ``report``.
        """
        rid = self.stm32ai_service.trigger_analyze(options)
        result = self.stm32ai_service.wait_for_run(rid)

        if result is None or (result.get('info') is None
                              and result.get('report') is None):
            raise AnalyzeServerError(_missing_data_text(result))

        report = result.get('report', None)
        graph = result.get('graph', None)
        info = result.get('info', None)

        if info is not None:
            cinfo_graph = info['graphs'][0]
            environment = info['environment']
            tool = environment['tools'][0]
            date_time = environment['generated_model']['generated_time']
            cli_version_str = tool['version']
            cli_parameters = tool['arguments']
        else:
            cinfo_graph = None
            date_time = report['date_time']
            cli_version_str = report['cli_version_str']
            cli_parameters = report['cli_parameters']

        # The graph footprint wins when a graph is present; info may be None
        # here (report-only reply), hence the ``or {}``.
        if graph:
            memory_footprint = graph.get('memory_footprint', {})
        else:
            memory_footprint = (info or {}).get('memory_footprint', {})

        if cinfo_graph:
            macc = _graph_macc(cinfo_graph)
        elif graph:
            macc = graph['macc']
        else:
            macc = 0

        return AnalyzeResult(
            activations_size=memory_footprint.get('activations', 0),
            weights=memory_footprint.get('weights', 0),
            macc=macc,
            rom_size=_rom_size(memory_footprint),
            ram_size=_ram_size(memory_footprint),
            total_ram_io_size=_io_size(memory_footprint),
            report=report,
            graph=graph,
            info=info,
            date_time=date_time,
            cli_version_str=cli_version_str,
            cli_parameters=cli_parameters,
            estimated_library_flash_size=memory_footprint.get(
                'kernel_flash', -1),
            estimated_library_ram_size=memory_footprint.get(
                'kernel_ram', -1)
        )

    def generate(self, options: CliParameters) -> GenerateResult:
        """Run a generate job, download the archive and extract it.

        The archive is downloaded into ``options.output`` (created if needed)
        and extracted in place.

        Raises:
            GenerateServerError: The reply is None or has no ``url``.
            FileFormatError: The downloaded file is not a zip archive.
            InternalErrorThatShouldNotHappened: No local file was returned by
                the file service.
        """
        import zipfile  # Local import to avoid global perf drawback

        os.makedirs(options.output, exist_ok=True)

        rid = self.stm32ai_service.trigger_generate(options)
        result = self.stm32ai_service.wait_for_run(rid)

        if result is None or 'url' not in result:
            raise GenerateServerError(_missing_data_text(result))

        server_url = result['url']
        local_filepath = self.file_service.download_generated_file(
            server_url,
            options.output)
        if not local_filepath:
            raise InternalErrorThatShouldNotHappened("No local file received")

        # TODO: Check if the file is really a zip file or JSON content
        if not zipfile.is_zipfile(local_filepath):
            raise FileFormatError("A zip file was expected as server reply")

        # ``with`` closes the archive even if extraction fails.
        with zipfile.ZipFile(local_filepath) as archive:
            archive.extractall(options.output)

        return GenerateResult(
            server_url=server_url,
            zipfile_path=local_filepath,
            output_path=options.output,
            graph=result.get('graph', {}),
            report=result.get('report', {}))

    def validate(self, options: CliParameters) -> ValidateResult:
        """Run a validate job and convert the server reply.

        Values are read from the ``info`` entry of the reply when present and
        from the ``report`` / ``graph`` entries otherwise.

        Raises:
            ValidateServerError: The reply is None or carries neither
                ``info`` nor a non-empty ``report``.
        """
        rid = self.stm32ai_service.trigger_validate(options)
        result = self.stm32ai_service.wait_for_run(rid)

        if result is None:
            raise ValidateServerError(_missing_data_text(result))

        report = result.get('report', {})
        graph = result.get('graph', {})
        cinfo = result.get('info', None)

        if cinfo is None and not report:
            raise ValidateServerError(_missing_data_text(result))

        if cinfo is not None:
            validation = cinfo['validation']
            environment = cinfo['environment']
            tool = environment['tools'][0]
            cinfo_graph = cinfo['graphs'][0]
            memory_footprint = cinfo.get('memory_footprint', {})
            return ValidateResult(
                rom_size=memory_footprint['weights'],
                macc=_graph_macc(cinfo_graph),
                ram_size=memory_footprint['activations'],
                total_ram_io_size=sum(memory_footprint['io']),
                validation_metrics=_to_validation_metrics(
                    validation['val_metrics']),
                validation_error=validation['val_error'],
                validation_error_description=validation['val_error_desc'],
                date_time=environment['generated_model']['generated_time'],
                cli_version_str=tool['version'],
                cli_parameters=tool['arguments'],
                report=report,
                graph=graph,
                info=cinfo,
                estimated_library_flash_size=_library_size(
                    memory_footprint, _LIBRARY_FLASH_KEYS),
                estimated_library_ram_size=_library_size(
                    memory_footprint, _LIBRARY_RAM_KEYS),
            )

        # Report-only reply: the footprint comes from the graph, as in the
        # report-only branch of benchmark().
        memory_footprint = (graph or {}).get('memory_footprint', {})
        return ValidateResult(
            rom_size=report['rom_size'],
            macc=report['rom_n_macc'],
            ram_size=report['ram_size'][0],
            total_ram_io_size=sum(report.get('ram_io_size') or []),
            validation_metrics=_to_validation_metrics(report['val_metrics']),
            validation_error=report['val_error'],
            validation_error_description=report['val_error_desc'],
            date_time=report['date_time'],
            cli_version_str=report['cli_version_str'],
            cli_parameters=report['cli_parameters'],
            report=report,
            graph=graph,
            info=None,
            estimated_library_flash_size=sum(
                memory_footprint.get(key, 0) for key in _LIBRARY_FLASH_KEYS),
            estimated_library_ram_size=sum(
                memory_footprint.get(key, 0) for key in _LIBRARY_RAM_KEYS),
        )

    def benchmark(self, options: typing.Union[CliParameters, MpuParameters], board_name: str, timeout: int):
        """Run a benchmark of an already uploaded model on a board.

        Args:
            options: MpuParameters yields an MpuBenchmarkResult,
                CliParameters yields a BenchmarkResult.
            board_name: Board passed to the benchmark service.
            timeout: Passed to the benchmark service's ``wait_for_run``.

        Returns:
            The converted result, or None when ``options`` is neither an
            MpuParameters nor a CliParameters instance.

        Raises:
            ParameterError: ``options.model`` is not among the models listed
                by the file service.
            BenchmarkServerError: The service reply is empty or lacks the
                expected entries.
        """
        if not any(model['name'] == options.model
                   for model in self.file_service.list_models()):
            raise ParameterError(
                "options.model should be a file name that is already uploaded on the cloud")

        bid = self.benchmark_service.trigger_benchmark(
            options, board_name, self.version)
        result = self.benchmark_service.wait_for_run(bid, timeout=timeout)

        if not result:
            raise BenchmarkServerError(_WRONG_BENCHMARK_FORMAT)

        if isinstance(options, MpuParameters):
            return self._mpu_benchmark_result(result)

        if isinstance(options, CliParameters):
            benchmark = result.get('benchmark') or {}
            if benchmark.get('info') is not None:
                return self._benchmark_result_from_info(result)
            return self._benchmark_result_from_report(result)

        return None

    @staticmethod
    def _mpu_benchmark_result(result) -> MpuBenchmarkResult:
        """Build an MpuBenchmarkResult from the ``benchmark`` entry of a reply."""
        benchmark_report = result.get('benchmark') or {}
        exec_time = benchmark_report.get('exec_time') or {}
        tool_version = benchmark_report.get('tool_version') or {}
        return MpuBenchmarkResult(
            device=benchmark_report.get('board', 'N/A'),
            duration_ms=exec_time.get('duration_ms', -1),
            npu_percent=exec_time.get('npu_percent', -1),
            gpu_percent=exec_time.get('gpu_percent', -1),
            cpu_percent=exec_time.get('cpu_percent', -1),
            rom_size=benchmark_report.get('model_size', -1),
            ram_size=benchmark_report.get('ram_peak', -1),
            date_time=benchmark_report.get('date'),
            info=benchmark_report,
            cli_version_str=_format_tool_version(tool_version)
        )

    @staticmethod
    def _benchmark_result_from_info(result) -> BenchmarkResult:
        """Build a BenchmarkResult from a reply whose benchmark has ``info``."""
        benchmark = result['benchmark']
        cinfo = benchmark['info']
        memory_mapping = result.get('memoryMapping') or {}
        report = benchmark['report']
        graph = benchmark['graph']
        cinfo_graph = cinfo['graphs'][0]
        validation = cinfo.get('validation', {})
        environment = cinfo['environment']
        tool = environment['tools'][0]

        if cinfo_graph:
            exec_time = cinfo_graph.get('exec_time')
        else:
            exec_time = graph.get('exec_time')
        exec_time = exec_time or {}

        # Parameter buffers indexed by name, used to size the layers that the
        # memory mapping places in internal / external flash.
        c_arrays = {buffer['name']: buffer
                    for buffer in cinfo['buffers']
                    if buffer['is_param'] is True}

        memory_footprint = memory_mapping.get(
            'memoryFootprint', cinfo.get("memory_footprint", {}))

        # Keep only sized, non-virtual leaf pools that are not read-only.
        memory_pools = [
            pool for pool in cinfo.get('memory_pools', [])
            if pool.get('size_bytes', -1) >= 0
            and not pool.get('virtual', False)
            and len(pool.get('subpools', [])) == 0
            and pool.get('rights', 'ACC_WRITE') != 'ACC_READ'
        ]
        internal_ram_consumption = 0
        external_ram_consumption = 0
        for pool in memory_pools:
            if _is_external_pool(pool):
                external_ram_consumption += pool.get("used_size_bytes", 0)
            else:
                internal_ram_consumption += pool.get("used_size_bytes", 0)

        layers_in_internal_flash = memory_mapping.get('layersInInternalFlash', [])
        layers_in_external_flash = memory_mapping.get('layersInExternalFlash', [])
        internal_flash_usage = sum(
            c_arrays.get(layer + "_array", {}).get('size_bytes', 0)
            for layer in layers_in_internal_flash)
        external_flash_usage = sum(
            c_arrays.get(layer + "_array", {}).get('size_bytes', 0)
            for layer in layers_in_external_flash)

        return BenchmarkResult(
            activations_size=memory_footprint.get('activations', 0),
            weights=memory_footprint.get('weights', 0),
            rom_size=_rom_size(memory_footprint),
            ram_size=_ram_size(memory_footprint),
            macc=_graph_macc(cinfo_graph),
            total_ram_io_size=_io_size(memory_footprint),
            validation_metrics=_to_validation_metrics(
                validation.get('val_metrics', [])),
            validation_error=validation.get('val_error', ''),
            validation_error_description=validation.get('val_error_desc', ''),
            date_time=environment['generated_model']['generated_time'],
            cli_version_str=tool['version'],
            cli_parameters=tool['arguments'],
            report=report,
            graph=graph,
            info=cinfo,
            cycles=exec_time.get('cycles', -1),
            duration_ms=exec_time.get('duration_ms', -1),
            device=environment.get('device', ""),
            cycles_by_macc=exec_time.get('cycles_by_macc', -1),
            estimated_library_flash_size=_library_size(
                memory_footprint, _LIBRARY_FLASH_KEYS),
            estimated_library_ram_size=_library_size(
                memory_footprint, _LIBRARY_RAM_KEYS),
            use_external_ram=memory_mapping.get(
                "useExternalRam", external_ram_consumption > 0),
            use_external_flash=memory_mapping.get("useExternalFlash", False),
            internal_ram_consumption=internal_ram_consumption,
            external_ram_consumption=external_ram_consumption,
            internal_flash_consumption=internal_flash_usage,
            external_flash_consumption=external_flash_usage,
        )

    @staticmethod
    def _benchmark_result_from_report(result) -> BenchmarkResult:
        """Build a BenchmarkResult from a reply without ``info`` (report/graph only)."""
        benchmark = result.get('benchmark') or {}
        if 'report' not in benchmark or 'graph' not in benchmark:
            raise BenchmarkServerError(_WRONG_BENCHMARK_FORMAT)

        memory_mapping = result.get('memoryMapping') or {}
        report = benchmark['report']
        graph = benchmark['graph']
        exec_time = graph['exec_time']

        c_arrays = {array['name']: array for array in graph.get('c_arrays', [])}
        memory_footprint = memory_mapping.get(
            'memoryFootprint', graph.get("memory_footprint", {}))

        memory_pools = [pool for pool in graph.get('memory_pools', [])
                        if pool.get('size_bytes', -1) >= 0]
        internal_ram_consumption = sum(
            pool.get("used_size", 0) for pool in memory_pools
            if "EXTERNAL" not in pool['name'])
        external_ram_consumption = sum(
            pool.get("used_size", 0) for pool in memory_pools
            if "EXTERNAL" in pool['name'])

        layers_in_internal_flash = memory_mapping.get('layersInInternalFlash', [])
        layers_in_external_flash = memory_mapping.get('layersInExternalFlash', [])
        internal_flash_usage = sum(
            c_arrays.get(layer + "_array", {}).get('c_size_in_byte', 0)
            for layer in layers_in_internal_flash)
        external_flash_usage = sum(
            c_arrays.get(layer + "_array", {}).get('c_size_in_byte', 0)
            for layer in layers_in_external_flash)

        return BenchmarkResult(
            activations_size=memory_footprint.get('activations', 0),
            weights=memory_footprint.get('weights', 0),
            rom_size=report['rom_size'],
            macc=report['rom_n_macc'],
            ram_size=report['ram_size'][0],
            total_ram_io_size=sum(report.get('ram_io_size') or []),
            validation_metrics=_to_validation_metrics(report['val_metrics']),
            validation_error=report['val_error'],
            validation_error_description=report['val_error_desc'],
            date_time=report['date_time'],
            cli_version_str=report['cli_version_str'],
            cli_parameters=report['cli_parameters'],
            report=report,
            graph=graph,
            info=None,
            cycles=exec_time.get('cycles', -1),
            duration_ms=exec_time.get('duration_ms', -1),
            device=exec_time.get('device', ""),
            cycles_by_macc=exec_time.get('cycles_by_macc', -1),
            estimated_library_flash_size=sum(
                memory_footprint.get(key, 0) for key in _LIBRARY_FLASH_KEYS),
            estimated_library_ram_size=sum(
                memory_footprint.get(key, 0) for key in _LIBRARY_RAM_KEYS),
            use_external_ram=memory_mapping.get("useExternalRam", False),
            use_external_flash=memory_mapping.get("useExternalFlash", False),
            internal_ram_consumption=internal_ram_consumption,
            external_ram_consumption=external_ram_consumption,
            internal_flash_consumption=internal_flash_usage,
            external_flash_consumption=external_flash_usage,
        )

    def generate_nbg(self, model_name, timeout=300):
        """Trigger an NBG optimization and return the resulting blob name.

        Raises:
            GenerateNbgFailure: The reply is empty or has no ``blobName``.
        """
        bid = self.generate_nbg_service.trigger_optimize(model_name)
        result = self.generate_nbg_service.wait_for_run(bid, timeout=timeout)

        blob_name = result.get('blobName', None) if result else None
        if blob_name is None:
            raise GenerateNbgFailure('NBG Generation failed. No file found')
        return blob_name

    def get_benchmark_boards(self):
        """Return one BoardData per board listed by the benchmark service."""
        out: typing.List[BoardData] = []
        boards_data = self.benchmark_service.list_boards()
        for name, board in boards_data.items():
            out.append(BoardData(
                name=name,
                boardCount=board.get('boardCount', -1),
                flashSize=board.get('flashSize', ''),
                deviceCpu=board.get('deviceCpu', ''),
                deviceId=board.get('deviceId', '')))
        return out

    def list_models(self):
        """Return the file service's model listing."""
        return self.file_service.list_models()

    def upload_model(self, model_path: str):
        """Upload the model at ``model_path`` through the file service."""
        return self.file_service.upload_model(model_path)

    def download_model(self, model_name: str, path: str):
        """Download ``model_name`` to ``path`` through the file service."""
        return self.file_service.download_model(model_name, path)

    def delete_model(self, model_name: str):
        """Delete ``model_name`` through the file service."""
        return self.file_service.delete_model(model_name)