| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | import logging |
| | import asyncio |
| | import threading |
| | import pathlib |
| | import hashlib |
| | from typing import ( |
| | Dict, |
| | Any, |
| | Type, |
| | Optional, |
| | List, |
| | Sequence, |
| | Union, |
| | Set, |
| | Mapping, |
| | Tuple, |
| | ) |
| | from dataclasses import dataclass |
| | from PySide import QtCore, QtGui |
| | from .store.base import AssetStore |
| | from .asset import Asset |
| | from .serializer import AssetSerializer |
| | from .uri import AssetUri |
| | from .cache import AssetCache, CacheKey |
| |
|
| |
|
# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)
# NOTE(review): basicConfig() configures the process-wide *root* logger and does
# nothing when the root logger already has handlers, so importing this module
# can change logging for the whole application - confirm this is intended.
logging.basicConfig(level=logging.ERROR)
| |
|
| |
|
@dataclass
class _AssetConstructionData:
    """
    Raw, not-yet-instantiated description of one asset.

    Instances form a tree: each node carries the bytes read from a store plus
    the class to instantiate, and optionally the same information for each of
    its direct dependencies.
    """

    # Name of the store the raw bytes were read from.
    store: str
    # URI identifying the asset.
    uri: AssetUri
    # Serialized asset content exactly as returned by the store.
    raw_data: bytes
    # Asset subclass that will be instantiated from raw_data.
    asset_class: Type[Asset]
    # Direct dependencies keyed by URI (a value of None means that dependency
    # could not be found). The whole mapping is None when dependencies were
    # deliberately not resolved (shallow fetch).
    dependencies_data: Optional[Dict[AssetUri, Optional["_AssetConstructionData"]]] = None
| |
|
| |
|
| | class AssetManager: |
def __init__(self, cache_max_size_bytes: int = 100 * 1024 * 1024):
    """
    Set up an empty manager: no stores, no asset types, and a fresh cache.

    Args:
        cache_max_size_bytes: Size limit handed to the AssetCache
            (defaults to 100 * 1024 * 1024 bytes).
    """
    # Store registry keyed by store name, plus the names whose assets may be cached.
    self.stores: Dict[str, AssetStore] = {}
    self._cacheable_stores: Set[str] = set()
    # (serializer, asset class) pairs in registration order, and asset_type -> class.
    self._serializers: List[Tuple[Type[AssetSerializer], Type[Asset]]] = []
    self._asset_classes: Dict[str, Type[Asset]] = {}
    self.asset_cache = AssetCache(max_size_bytes=cache_max_size_bytes)
    thread_name = threading.current_thread().name
    logger.debug(f"AssetManager initialized (Thread: {thread_name})")
| |
|
def register_store(self, store: AssetStore, cacheable: bool = False):
    """
    Make a store available under its own ``name``.

    Registering a second store with the same name replaces the first one.

    Args:
        store: The store to register; it is keyed by ``store.name``.
        cacheable: When True, assets built from this store may be kept in
            the asset cache.
    """
    store_name = store.name
    logger.debug(f"Registering store: {store_name}, cacheable: {cacheable}")
    self.stores[store_name] = store
    if not cacheable:
        return
    self._cacheable_stores.add(store_name)
| |
|
def get_serializer_for_class(self, asset_class: Type[Asset]):
    """
    Return the serializer registered for ``asset_class``.

    Registrations are scanned in registration order and the first one whose
    asset class is ``asset_class`` itself or one of its base classes wins.

    Raises:
        ValueError: If no registered asset class matches.
    """
    not_found = object()
    matches = (
        serializer
        for serializer, registered_class in self._serializers
        if issubclass(asset_class, registered_class)
    )
    chosen = next(matches, not_found)
    if chosen is not_found:
        raise ValueError(f"No serializer found for class {asset_class}")
    return chosen
| |
|
def register_asset(self, asset_class: Type[Asset], serializer: Type[AssetSerializer]):
    """
    Register an Asset class together with the serializer used for it.

    All validation happens before any registry is modified, so a rejected
    registration leaves the manager unchanged.

    Args:
        asset_class: Subclass of Asset carrying a non-empty string
            ``asset_type`` attribute.
        serializer: Subclass of AssetSerializer to use for ``asset_class``.

    Raises:
        TypeError: If either argument has the wrong base class, or the asset
            class has no usable ``asset_type``.
    """
    if not issubclass(asset_class, Asset):
        raise TypeError(f"Item '{asset_class.__name__}' must be a subclass of Asset.")
    if not issubclass(serializer, AssetSerializer):
        raise TypeError(f"Item '{serializer.__name__}' must be a subclass of AssetSerializer.")

    asset_type_name = getattr(asset_class, "asset_type", None)
    if not isinstance(asset_type_name, str) or not asset_type_name:
        raise TypeError(
            f"Asset class '{asset_class.__name__}' must have a non-empty string 'asset_type' attribute."
        )

    # Only mutate the registries once every check has passed.
    logger.debug(f"Registering asset type: '{asset_type_name}' -> {asset_class.__name__}")
    self._serializers.append((serializer, asset_class))
    self._asset_classes[asset_type_name] = asset_class
| |
|
async def _fetch_asset_construction_data_recursive_async(
    self,
    uri: AssetUri,
    store_names: Sequence[str],
    visited_uris: Set[AssetUri],
    depth: Optional[int] = None,
) -> Optional[_AssetConstructionData]:
    """
    Read the raw bytes of an asset and, recursively, of its dependencies.

    No asset is instantiated here; the result is a tree of
    _AssetConstructionData nodes.

    Args:
        uri: The asset to fetch.
        store_names: Stores to try, in order; the first store that has the
            asset wins. Dependencies are looked up in the same stores.
        visited_uris: URIs currently being resolved further up the recursion,
            used to detect cycles. It is restored before this call returns.
        depth: How many dependency levels to follow. None means unlimited,
            0 means fetch only this asset (dependencies_data stays None).

    Returns:
        The construction data, or None if no store has the asset.

    Raises:
        RuntimeError: If ``uri`` is already being resolved (cyclic dependency).
        ValueError: If ``store_names`` is empty or the asset type is unknown.
    """
    is_library = uri.asset_type == "library"
    is_toolbit = uri.asset_type == "toolbit"

    if is_library:
        logger.info(
            f"LIBRARY FETCH: Loading library '{uri.asset_id}' with depth={depth} from stores {store_names}"
        )

    logger.debug(
        f"_fetch_asset_construction_data_recursive_async called {store_names} {uri} {depth}"
    )

    if uri in visited_uris:
        logger.error(f"Cyclic dependency detected for URI: {uri}")
        raise RuntimeError(f"Cyclic dependency encountered for URI: {uri}")

    if not store_names:
        raise ValueError("At least one store name must be provided.")

    asset_class = self._asset_classes.get(uri.asset_type)
    if not asset_class:
        raise ValueError(f"No asset class registered for URI: {uri}")

    raw_data = None
    found_store_name = None

    if is_toolbit:
        logger.info(
            f"TOOLBIT SEARCH: Looking for toolbit '{uri.asset_id}' in stores: {store_names}"
        )

    # Try the stores in order and stop at the first one that has the asset.
    for current_store_name in store_names:
        store = self.stores.get(current_store_name)
        if not store:
            logger.warning(f"Store '{current_store_name}' not registered. Skipping.")
            continue

        if is_toolbit:
            store_path = getattr(store, "base_path", "unknown")
            logger.info(
                f"TOOLBIT SEARCH: Checking store '{current_store_name}' at path: {store_path}"
            )

        try:
            raw_data = await store.get(uri)
        except FileNotFoundError:
            if is_toolbit:
                logger.info(
                    f"TOOLBIT SEARCH: '{uri.asset_id}' NOT found in store '{current_store_name}'"
                )
            logger.debug(
                f"_fetch_asset_construction_data_recursive_async: Asset {uri} not found in store {current_store_name}"
            )
            continue

        found_store_name = current_store_name
        if is_toolbit:
            logger.info(f"TOOLBIT FOUND: '{uri.asset_id}' found in store '{found_store_name}'")
        logger.debug(
            f"_fetch_asset_construction_data_recursive_async: Asset {uri} found in store {found_store_name}"
        )
        break

    if raw_data is None or not found_store_name:
        if is_toolbit:
            logger.warning(
                f"TOOLBIT NOT FOUND: '{uri.asset_id}' not found in any of the stores: {store_names}"
            )
        return None

    if depth == 0:
        # Shallow fetch: dependencies_data=None marks "dependencies not resolved".
        if is_library:
            logger.warning(
                f"LIBRARY SHALLOW: Library '{uri.asset_id}' loaded with depth=0 - no dependencies will be resolved"
            )
        return _AssetConstructionData(
            store=found_store_name,
            uri=uri,
            raw_data=raw_data,
            asset_class=asset_class,
            dependencies_data=None,
        )

    serializer = self.get_serializer_for_class(asset_class)
    dependency_uris = asset_class.extract_dependencies(raw_data, serializer)

    deps_construction_data: Dict[AssetUri, Optional[_AssetConstructionData]] = {}
    child_depth = None if depth is None else depth - 1

    # Mark this URI as "in progress" once for the whole dependency loop so that
    # a dependency pointing back at it is reported as a cycle; always unmark it
    # afterwards so sibling branches may legitimately reference the same asset.
    visited_uris.add(uri)
    try:
        for dep_uri in dependency_uris:
            deps_construction_data[dep_uri] = (
                await self._fetch_asset_construction_data_recursive_async(
                    dep_uri,
                    store_names,
                    visited_uris,
                    child_depth,
                )
            )
    finally:
        visited_uris.discard(uri)

    logger.debug(f"Asset '{uri}' fetched with {len(deps_construction_data)} dependencies")

    return _AssetConstructionData(
        store=found_store_name,
        uri=uri,
        raw_data=raw_data,
        asset_class=asset_class,
        dependencies_data=deps_construction_data,
    )
| |
|
def _calculate_cache_key_from_construction_data(
    self,
    construction_data: _AssetConstructionData,
) -> Optional[CacheKey]:
    """
    Derive the cache key for one construction-data node.

    The key combines the store name, the asset URI, a SHA-256 hash of the raw
    bytes, and the sorted URIs of the direct dependencies. A node whose
    dependencies were not resolved uses the marker ("shallow_children",).

    Returns:
        The key, or None when there is no data (or empty data) to hash.
    """
    if not (construction_data and construction_data.raw_data):
        return None

    dependencies = construction_data.dependencies_data
    if dependencies is not None:
        deps_signature_tuple: Tuple = tuple(sorted(map(str, dependencies)))
    else:
        deps_signature_tuple = ("shallow_children",)

    digest_hex = hashlib.sha256(construction_data.raw_data).hexdigest()

    return CacheKey(
        store_name=construction_data.store,
        asset_uri_str=str(construction_data.uri),
        raw_data_hash=int(digest_hex, 16),
        dependency_signature=deps_signature_tuple,
    )
| |
|
def _build_asset_tree_from_data_sync(
    self,
    construction_data: Optional[_AssetConstructionData],
) -> Optional[Asset]:
    """
    Turn a construction-data tree into an asset instance (synchronously).

    Dependencies are built first, depth-first, and handed to the asset class'
    ``from_bytes``. For cacheable stores the cache is consulted before
    building and updated afterwards.

    Returns:
        The asset, or None when there is no construction data or when
        ``from_bytes`` raised. A dependency that fails to build is logged and
        left out of the dependency mapping; it does not abort the parent.
    """
    if not construction_data:
        return None

    # Cache lookup (only for stores registered as cacheable).
    is_cacheable = construction_data.store in self._cacheable_stores
    cache_key: Optional[CacheKey] = (
        self._calculate_cache_key_from_construction_data(construction_data)
        if is_cacheable
        else None
    )
    if cache_key:
        cache_hit = self.asset_cache.get(cache_key)
        if cache_hit is not None:
            return cache_hit

    asset_uri = construction_data.uri
    asset_class = construction_data.asset_class
    deps_data = construction_data.dependencies_data
    is_library = asset_uri.asset_type == "library"

    logger.debug(
        f"BuildAssetTreeSync: Instantiating '{asset_uri}' "
        f"of type '{asset_class.__name__}'"
    )

    resolved_dependencies: Optional[Mapping[AssetUri, Any]] = None
    if deps_data is None:
        if is_library:
            logger.warning(
                f"LIBRARY NO DEPS: Library '{asset_uri.asset_id}' has dependencies_data=None - was loaded with depth=0"
            )
    else:
        resolved_dependencies = {}

        if is_library:
            logger.info(
                f"LIBRARY DEPS: Resolving {len(deps_data)} dependencies for library '{asset_uri.asset_id}'"
            )

        for dep_uri, dep_data_node in deps_data.items():
            dep_is_toolbit = dep_uri.asset_type == "toolbit"
            if dep_is_toolbit:
                logger.info(
                    f"TOOLBIT DEP: Resolving dependency '{dep_uri.asset_id}' for library '{asset_uri.asset_id}'"
                )

            try:
                dep = self._build_asset_tree_from_data_sync(dep_data_node)
                if dep_is_toolbit and dep:
                    logger.info(
                        f"TOOLBIT DEP: Successfully resolved '{dep_uri.asset_id}' -> {type(dep).__name__}"
                    )
                elif dep_is_toolbit:
                    logger.warning(
                        f"TOOLBIT DEP: Dependency '{dep_uri.asset_id}' resolved to None"
                    )
            except Exception as e:
                if dep_is_toolbit:
                    logger.error(f"TOOLBIT DEP: Error resolving '{dep_uri.asset_id}': {e}")
                logger.error(
                    f"Error building dependency '{dep_uri}' for asset '{asset_uri}': {e}",
                    exc_info=True,
                )
                # A failed dependency is skipped, not recorded as None.
                continue
            resolved_dependencies[dep_uri] = dep

        if is_library:
            toolbit_deps = [
                resolved_uri
                for resolved_uri in resolved_dependencies
                if resolved_uri.asset_type == "toolbit"
            ]
            logger.info(
                f"LIBRARY DEPS: Resolved {len(resolved_dependencies)} total dependencies ({len(toolbit_deps)} toolbits) for library '{asset_uri.asset_id}'"
            )

    serializer = self.get_serializer_for_class(asset_class)
    try:
        if is_library:
            dep_count = len(resolved_dependencies) if resolved_dependencies else 0
            logger.info(
                f"LIBRARY INSTANTIATE: Creating library '{asset_uri.asset_id}' with {dep_count} dependencies"
            )

        final_asset = asset_class.from_bytes(
            construction_data.raw_data,
            asset_uri.asset_id,
            resolved_dependencies,
            serializer,
        )
    except Exception as e:
        logger.error(
            f"Error instantiating asset '{asset_uri}' of type '{asset_class.__name__}': {e}",
            exc_info=True,
        )
        return None

    if final_asset is not None and cache_key:
        # Record the direct dependency URIs alongside the cached asset.
        direct_deps_uris_strs: Set[str] = (
            set() if deps_data is None else {str(dep_uri) for dep_uri in deps_data}
        )
        self.asset_cache.put(
            cache_key,
            final_asset,
            len(construction_data.raw_data),
            direct_deps_uris_strs,
        )
    return final_asset
| |
|
def get(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> Asset:
    """
    Retrieve an asset by its URI (synchronous wrapper), to a specified depth.

    IMPORTANT: Assumes this method is CALLED ONLY from the main UI thread
    if Asset.from_bytes performs UI operations. The raw data is fetched with
    asyncio.run(); the asset tree is then built synchronously on the calling
    thread.

    Args:
        uri: The asset URI, as an AssetUri or its string form.
        store: One store name or a sequence of names, tried in order.
        depth: None means infinite depth; 0 means only this asset, no
            dependencies.

    Returns:
        The built asset.

    Raises:
        FileNotFoundError: If none of the stores has the asset.
        ValueError: If the asset was found but could not be built.
    """
    calling_thread_name = threading.current_thread().name
    stores_list = [store] if isinstance(store, str) else store

    # Parse the URI exactly once; it is used for logging and for fetching.
    asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
    if asset_uri_obj.asset_type == "library":
        logger.info(
            f"LIBRARY GET: Request for library '{asset_uri_obj.asset_id}' with depth={depth}"
        )
    elif asset_uri_obj.asset_type == "toolbit":
        logger.info(
            f"TOOLBIT GET: Direct request for toolbit '{asset_uri_obj.asset_id}' with depth={depth} from stores {stores_list}"
        )
        # Formatting the stack is comparatively expensive, so only do it when
        # the message would actually be emitted.
        if logger.isEnabledFor(logging.INFO):
            import traceback

            stack = traceback.format_stack()
            caller_info = "".join(stack[-3:-1])
            logger.info(f"TOOLBIT GET CALLER:\n{caller_info}")

    logger.debug(
        f"AssetManager.get(uri='{uri}', stores='{stores_list}', depth='{depth}') called from thread: {calling_thread_name}"
    )
    # NOTE(review): identity comparison of the Qt thread wrapper objects is kept
    # from the original - confirm the binding returns the same wrapper each time.
    app = QtGui.QApplication.instance()
    if app and QtCore.QThread.currentThread() is not app.thread():
        logger.warning(
            "AssetManager.get() called from a non-main thread! UI in from_bytes may fail!"
        )

    try:
        logger.debug(
            f"Get: Starting asyncio.run for data fetching of '{asset_uri_obj}', depth {depth}."
        )
        all_construction_data = asyncio.run(
            self._fetch_asset_construction_data_recursive_async(
                asset_uri_obj, stores_list, set(), depth
            )
        )
        logger.debug(
            f"Get: asyncio.run for data fetching of '{asset_uri_obj}', depth {depth} completed."
        )
    except Exception as e:
        logger.error(
            f"Get: Error during asyncio.run data fetching for '{asset_uri_obj}': {e}",
            exc_info=False,
        )
        raise

    if all_construction_data is None:
        raise FileNotFoundError(f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'")

    # Dependency counts are for the debug message only.
    deps_count = 0
    found_deps_count = 0
    if all_construction_data.dependencies_data is not None:
        deps_count = len(all_construction_data.dependencies_data)
        found_deps_count = sum(
            1 for d in all_construction_data.dependencies_data.values() if d is not None
        )

    logger.debug(
        f"Get: Starting synchronous asset tree build for '{asset_uri_obj}' "
        f"and {deps_count} dependencies ({found_deps_count} resolved)."
    )
    final_asset = self._build_asset_tree_from_data_sync(all_construction_data)
    if not final_asset:
        raise ValueError(f"failed to build asset {uri}")
    logger.debug(f"Get: Synchronous asset tree build for '{asset_uri_obj}' completed.")
    return final_asset
| |
|
def get_or_none(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> Optional[Asset]:
    """
    Like get(), but a missing asset yields None instead of FileNotFoundError.

    Every other exception raised by get() still propagates.
    """
    asset = None
    try:
        asset = self.get(uri, store, depth)
    except FileNotFoundError:
        pass
    return asset
| |
|
async def get_async(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> Optional[Asset]:
    """
    Retrieve an asset by its URI (asynchronous), to a specified depth.

    NOTE: The asset tree is built in whatever context awaits this coroutine.
    If Asset.from_bytes does UI work, await it from an asyncio loop that is
    integrated with the main UI thread (e.g., via QtAsyncio); awaited from a
    plain worker thread's loop, from_bytes runs on that worker.

    Returns:
        The built asset, or None if building it failed.

    Raises:
        FileNotFoundError: If none of the stores has the asset.
    """
    thread_name = threading.current_thread().name
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(
        f"AssetManager.get_async(uri='{uri}', stores='{stores_list}', depth='{depth}') called from thread: {thread_name}"
    )

    asset_uri_obj = uri if not isinstance(uri, str) else AssetUri(uri)
    fetch_coro = self._fetch_asset_construction_data_recursive_async(
        asset_uri_obj, stores_list, set(), depth
    )
    all_construction_data = await fetch_coro

    if all_construction_data is not None:
        logger.debug(
            f"get_async: Building asset tree for '{asset_uri_obj}', depth {depth} in current async context."
        )
        return self._build_asset_tree_from_data_sync(all_construction_data)

    raise FileNotFoundError(
        f"Asset '{asset_uri_obj}' not found in stores '{stores_list}' (async path)"
    )
| |
|
def get_raw(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
) -> bytes:
    """
    Retrieve the raw bytes of an asset (synchronous wrapper around get_raw_async).

    Any exception from the asynchronous call is logged and re-raised unchanged.
    """
    stores_list = store if not isinstance(store, str) else [store]
    thread_name = threading.current_thread().name
    logger.debug(
        f"AssetManager.get_raw(uri='{uri}', stores='{stores_list}') from T:{thread_name}"
    )

    try:
        raw_bytes = asyncio.run(self.get_raw_async(uri, stores_list))
    except Exception as e:
        logger.error(
            f"GetRaw: Error during asyncio.run for '{uri}': {e}",
            exc_info=False,
        )
        raise
    return raw_bytes
| |
|
async def get_raw_async(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
) -> bytes:
    """
    Retrieve the raw bytes of an asset (asynchronous).

    The stores are tried in the given order; unregistered store names are
    skipped with a warning, and the first store that has the asset wins.

    Raises:
        FileNotFoundError: If no store returned the asset.
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(
        f"AssetManager.get_raw_async(uri='{uri}', stores='{stores_list}') from T:{threading.current_thread().name}"
    )
    asset_uri_obj = uri if not isinstance(uri, str) else AssetUri(uri)

    for store_name in stores_list:
        backend = self.stores.get(store_name)
        if not backend:
            logger.warning(f"Store '{store_name}' not registered. Skipping.")
            continue
        try:
            raw_data = await backend.get(asset_uri_obj)
        except FileNotFoundError:
            logger.debug(
                f"GetRawAsync: Asset {asset_uri_obj} not found in store {store_name}"
            )
            continue
        logger.debug(f"GetRawAsync: Asset {asset_uri_obj} found in store {store_name}")
        return raw_data

    raise FileNotFoundError(f"Asset '{asset_uri_obj}' not found in stores '{stores_list}'")
| |
|
def get_bulk(
    self,
    uris: Sequence[Union[AssetUri, str]],
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> List[Any]:
    """
    Retrieve several assets by URI (synchronous wrapper), to a specified depth.

    The raw data of all assets is gathered concurrently; the assets are then
    built one after another on the calling thread.

    Returns:
        One entry per requested URI, in order: the built asset, or None when
        the asset was not found (or could not be built).

    Raises:
        Exception: The first exception any of the fetches produced.
        RuntimeError: If a fetch produced a value of an unexpected type.
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(
        f"AssetManager.get_bulk for {len(uris)} URIs from stores '{stores_list}', depth '{depth}'"
    )

    async def _fetch_all_construction_data_bulk_async():
        pending = []
        for requested in uris:
            target_uri = requested if not isinstance(requested, str) else AssetUri(requested)
            pending.append(
                self._fetch_asset_construction_data_recursive_async(
                    target_uri,
                    stores_list,
                    set(),
                    depth,
                )
            )
        # Exceptions are collected as results so each one can be reported
        # together with the URI it belongs to.
        return await asyncio.gather(*pending, return_exceptions=True)

    try:
        logger.debug("GetBulk: Starting bulk data fetching")
        all_construction_data_list = asyncio.run(_fetch_all_construction_data_bulk_async())
        logger.debug("GetBulk: bulk data fetching completed")
    except Exception as e:
        logger.error(
            f"GetBulk: Unexpected error during asyncio.run for bulk data: {e}",
            exc_info=False,
        )
        raise

    assets = []
    for original_uri_input, data_or_exc in zip(uris, all_construction_data_list):
        if data_or_exc is None:
            logger.debug(f"GetBulk: Asset '{original_uri_input}' not found")
            assets.append(None)
            continue
        if isinstance(data_or_exc, _AssetConstructionData):
            assets.append(self._build_asset_tree_from_data_sync(data_or_exc))
            continue
        if isinstance(data_or_exc, Exception):
            logger.error(
                f"GetBulk: Re-raising exception for '{original_uri_input}': {data_or_exc}",
                exc_info=False,
            )
            raise data_or_exc
        logger.error(
            f"GetBulk: Unexpected item in construction data list for '{original_uri_input}': {type(data_or_exc)}"
        )
        raise RuntimeError(
            f"Unexpected data type for {original_uri_input}: {type(data_or_exc)}"
        )
    return assets
| |
|
async def get_bulk_async(
    self,
    uris: Sequence[Union[AssetUri, str]],
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> List[Any]:
    """
    Retrieve several assets by URI (asynchronous), to a specified depth.

    Unlike get_bulk(), failures are not raised; they are reported in place.

    Returns:
        Exactly one entry per requested URI, in request order:
        the built asset (None if building failed), None when the asset was
        not found, or the exception instance the fetch ended with.
    """
    stores_list = [store] if isinstance(store, str) else store
    logger.debug(
        f"AssetManager.get_bulk_async for {len(uris)} URIs from stores '{stores_list}', depth '{depth}'"
    )
    tasks = [
        self._fetch_asset_construction_data_recursive_async(
            AssetUri(u) if isinstance(u, str) else u,
            stores_list,
            set(),
            depth,
        )
        for u in uris
    ]
    all_construction_data_list = await asyncio.gather(*tasks, return_exceptions=True)

    assets: List[Any] = []
    for data_or_exc in all_construction_data_list:
        if isinstance(data_or_exc, _AssetConstructionData):
            assets.append(self._build_asset_tree_from_data_sync(data_or_exc))
        elif data_or_exc is None or isinstance(data_or_exc, FileNotFoundError):
            assets.append(None)
        else:
            # Any other gathered outcome is an exception instance, including
            # ones that are not Exception subclasses (e.g. CancelledError).
            # Keep it in place so results stay aligned with `uris`.
            assets.append(data_or_exc)
    return assets
| |
|
def exists(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
) -> bool:
    """
    Return True if the asset exists in any of the specified stores, False otherwise.

    Stores are checked in order. An error raised by a store's own ``exists``
    check is logged and that store is treated as not having the asset.

    Raises:
        ValueError: If one of the store names is not registered.
    """

    async def _exists_async(stores_list: Sequence[str]):
        asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri
        logger.debug(
            f"ExistsAsync (internal): Trying stores '{stores_list}'. Available stores: {list(self.stores.keys())}"
        )
        for current_store_name in stores_list:
            # Deliberately not named `store`, so the outer parameter is not shadowed.
            candidate = self.stores.get(current_store_name)
            if candidate is None:
                logger.error(f"Store '{current_store_name}' not registered.")
                raise ValueError(f"No store registered for name: {current_store_name}")
            try:
                found = await candidate.exists(asset_uri_obj)
            except Exception as e:
                # Best effort: a failing store does not abort the search.
                logger.error(
                    f"ExistsAsync: Error checking store '{current_store_name}': {e}",
                    exc_info=False,
                )
                continue
            if found:
                logger.debug(
                    f"ExistsAsync: Asset {asset_uri_obj} found in store {current_store_name}"
                )
                return True
            logger.debug(
                f"ExistsAsync: Asset {asset_uri_obj} not found in store {current_store_name}"
            )
        return False

    stores_list = [store] if isinstance(store, str) else store
    try:
        return asyncio.run(_exists_async(stores_list))
    except Exception as e:
        logger.error(
            f"AssetManager.exists: Error during asyncio.run for '{uri}': {e}",
            exc_info=False,
        )
        raise
| |
|
def fetch(
    self,
    asset_type: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> List[Asset]:
    """
    Fetch asset instances by type, limit, and offset (synchronous), to a specified depth.

    Only the first store is used to list the assets; every given store is
    then searched when loading them. Entries that did not turn into an Asset
    are left out of the result.
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(f"Fetch(type='{asset_type}', stores='{stores_list}', depth='{depth}')")
    list_store = "local" if not stores_list else stores_list[0]
    asset_uris = self.list_assets(asset_type, limit, offset, list_store)
    loaded = self.get_bulk(asset_uris, stores_list, depth)
    return list(filter(lambda item: isinstance(item, Asset), loaded))
| |
|
async def fetch_async(
    self,
    asset_type: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    store: Union[str, Sequence[str]] = "local",
    depth: Optional[int] = None,
) -> List[Asset]:
    """
    Fetch asset instances by type, limit, and offset (asynchronous), to a specified depth.

    Only the first store is used to list the assets; every given store is
    then searched when loading them. Entries that are not Asset instances
    (None, or exceptions reported by get_bulk_async) are left out.
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(f"FetchAsync(type='{asset_type}', stores='{stores_list}', depth='{depth}')")
    list_store = "local" if not stores_list else stores_list[0]
    asset_uris = await self.list_assets_async(asset_type, limit, offset, list_store)
    loaded = await self.get_bulk_async(asset_uris, stores_list, depth)
    return [item for item in loaded if isinstance(item, Asset)]
| |
|
def list_assets(
    self,
    asset_type: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
    """
    List asset URIs (synchronous wrapper around list_assets_async).

    When several stores are given only the first one is listed; an empty
    sequence falls back to "local".
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(f"ListAssets(type='{asset_type}', stores='{stores_list}')")
    list_store = "local" if not stores_list else stores_list[0]
    listing = self.list_assets_async(asset_type, limit, offset, list_store)
    return asyncio.run(listing)
| |
|
async def list_assets_async(
    self,
    asset_type: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
    """
    List asset URIs from a single store (asynchronous).

    When several stores are given only the first one is listed; an empty
    sequence falls back to "local".

    Raises:
        ValueError: If the selected store is not registered.
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(f"ListAssetsAsync executing for type='{asset_type}', stores='{stores_list}'")
    list_store = "local" if not stores_list else stores_list[0]
    registered_names = list(self.stores.keys())
    logger.debug(
        f"ListAssetsAsync: Looking up store '{list_store}'. Available stores: {registered_names}"
    )
    try:
        selected_store = self.stores[list_store]
    except KeyError:
        raise ValueError(f"No store registered for name: {list_store}")
    listing = await selected_store.list_assets(asset_type, limit, offset)
    return listing
| |
|
def count_assets(
    self,
    asset_type: Optional[str] = None,
    store: Union[str, Sequence[str]] = "local",
) -> int:
    """
    Count assets (synchronous wrapper around count_assets_async).

    When several stores are given only the first one is counted; an empty
    sequence falls back to "local".
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(f"CountAssets(type='{asset_type}', stores='{stores_list}')")
    count_store = "local" if not stores_list else stores_list[0]
    counting = self.count_assets_async(asset_type, count_store)
    return asyncio.run(counting)
| |
|
async def count_assets_async(
    self,
    asset_type: Optional[str] = None,
    store: Union[str, Sequence[str]] = "local",
) -> int:
    """
    Count assets in a single store (asynchronous).

    When several stores are given only the first one is counted; an empty
    sequence falls back to "local".

    Raises:
        ValueError: If the selected store is not registered.
    """
    stores_list = store if not isinstance(store, str) else [store]
    logger.debug(f"CountAssetsAsync executing for type='{asset_type}', stores='{stores_list}'")
    count_store = "local" if not stores_list else stores_list[0]
    registered_names = list(self.stores.keys())
    logger.debug(
        f"CountAssetsAsync: Looking up store '{count_store}'. Available stores: {registered_names}"
    )
    try:
        selected_store = self.stores[count_store]
    except KeyError:
        raise ValueError(f"No store registered for name: {count_store}")
    total = await selected_store.count_assets(asset_type)
    return total
| |
|
def _is_registered_type(self, obj: Asset) -> bool:
    """Return True if ``obj`` is an instance of any registered asset class."""
    return any(
        isinstance(obj, registered_class)
        for registered_class in self._asset_classes.values()
    )
| |
|
async def add_async(self, obj: Asset, store: str = "local") -> AssetUri:
    """
    Serialize an asset and write it to a store (create or update).

    The target URI comes from ``obj.get_uri()``. An asset whose class is not
    registered only triggers a warning; serialization is still attempted.

    Returns:
        The URI reported by add_raw_async().
    """
    asset_type_label = type(obj).__name__
    logger.debug(f"AddAsync: Adding {asset_type_label} to store '{store}'")
    uri = obj.get_uri()
    if not self._is_registered_type(obj):
        logger.warning(f"Asset has unregistered type '{uri.asset_type}' ({type(obj).__name__})")

    payload = obj.to_bytes(self.get_serializer_for_class(obj.__class__))
    stored_uri = await self.add_raw_async(uri.asset_type, uri.asset_id, payload, store)
    return stored_uri
| |
|
def add(self, obj: Asset, store: str = "local") -> AssetUri:
    """Add an asset to a store (synchronous wrapper around add_async)."""
    thread_name = threading.current_thread().name
    logger.debug(f"Add: Adding {type(obj).__name__} to store '{store}' from T:{thread_name}")
    pending_add = self.add_async(obj, store)
    return asyncio.run(pending_add)
| |
|
async def add_raw_async(
    self, asset_type: str, asset_id: str, data: bytes, store: str = "local"
) -> AssetUri:
    """
    Write raw asset bytes to a store, updating the asset if it exists and
    creating it otherwise.

    For cacheable stores the cache entries for the resulting URI are
    invalidated afterwards.

    Raises:
        ValueError: If asset_type or asset_id is empty, or the store is not
            registered.
        TypeError: If ``data`` is not bytes.
    """
    logger.debug(f"AddRawAsync: type='{asset_type}', id='{asset_id}', store='{store}'")
    if not (asset_type and asset_id):
        raise ValueError("asset_type and asset_id must be provided for add_raw.")
    if not isinstance(data, bytes):
        raise TypeError("Data for add_raw must be bytes.")
    selected_store = self.stores.get(store)
    if not selected_store:
        raise ValueError(f"No store registered for name: {store}")

    target_uri = AssetUri.build(asset_type=asset_type, asset_id=asset_id)
    try:
        # Try an update first; the store signals "does not exist yet" with
        # FileNotFoundError, in which case the asset is created instead.
        stored_uri = await selected_store.update(target_uri, data)
    except FileNotFoundError:
        logger.debug(
            f"AddRawAsync: Asset not found, creating new asset with {asset_type} and {asset_id}"
        )
        stored_uri = await selected_store.create(asset_type, asset_id, data)
    else:
        logger.debug(f"AddRawAsync: Updated existing asset at {stored_uri}")

    if store in self._cacheable_stores:
        self.asset_cache.invalidate_for_uri(str(stored_uri))
    return stored_uri
| |
|
def add_raw(
    self, asset_type: str, asset_id: str, data: bytes, store: str = "local"
) -> AssetUri:
    """
    Write raw asset bytes to a store (synchronous wrapper around add_raw_async).

    Any exception from the asynchronous call is logged and re-raised unchanged.
    """
    thread_name = threading.current_thread().name
    logger.debug(
        f"AddRaw: type='{asset_type}', id='{asset_id}', store='{store}' from T:{thread_name}"
    )
    try:
        stored_uri = asyncio.run(self.add_raw_async(asset_type, asset_id, data, store))
    except Exception as e:
        logger.error(
            f"AddRaw: Error for type='{asset_type}', id='{asset_id}': {e}",
            exc_info=False,
        )
        raise
    return stored_uri
| |
|
async def copy_async(
    self,
    src: AssetUri,
    dest_store: str,
    store: str = "local",
    dest: Optional[AssetUri] = None,
) -> AssetUri:
    """
    Copy a single asset from one location to another (asynchronous).

    This is a shallow copy: the raw bytes are read with get_raw_async and
    written with add_raw_async; dependencies are not copied. An existing
    destination is updated by add_raw_async.

    Args:
        src: URI of the asset to copy.
        dest_store: Name of the store to write to.
        store: Name of the store to read from.
        dest: URI to write to; defaults to ``src``.

    Returns:
        The URI reported by add_raw_async().

    Raises:
        ValueError: If source and destination are the same asset in the same store.
    """
    target = src if dest is None else dest

    if src == target and store == dest_store:
        raise ValueError("Source and destination cannot be the same asset in the same store.")

    payload = await self.get_raw_async(src, store)
    copied_uri = await self.add_raw_async(
        target.asset_type, target.asset_id, payload, dest_store
    )
    return copied_uri
| |
|
def copy(
    self,
    src: AssetUri,
    dest_store: str,
    store: str = "local",
    dest: Optional[AssetUri] = None,
) -> AssetUri:
    """
    Shallow-copy one asset between locations (synchronous).

    Blocking counterpart of copy_async(): the coroutine is run to
    completion with asyncio.run().

    Args:
        src: URI of the asset to read.
        dest_store: Name of the store to write to.
        store: Name of the store to read from (defaults to "local").
        dest: URI to write the copy under. When None, the URI given in
            src is reused.

    Returns:
        The value returned by copy_async().

    Raises:
        ValueError: Propagated from copy_async() when source and
            destination are the same URI in the same store.
    """
    copy_job = self.copy_async(src, dest_store, store, dest)
    return asyncio.run(copy_job)
| |
|
async def deepcopy_async(
    self,
    src: AssetUri,
    dest_store: str,
    store: str = "local",
    dest: Optional[AssetUri] = None,
) -> AssetUri:
    """
    Asynchronously deep copies an asset and its dependencies from a source
    store to a destination store.

    Dependencies are written before the assets that reference them.
    A dependency that already exists in the destination store is left
    untouched; the top-level asset is always written, overwriting any
    asset already stored under its destination URI.

    Args:
        src: The AssetUri of the source asset.
        dest_store: The name of the destination store.
        store: The name of the source store (defaults to "local").
        dest: Optional. The new AssetUri for the top-level asset in the
            destination store. If None, the original URI is used.

    Returns:
        The AssetUri of the copied top-level asset in the destination
        store. When an existing asset was overwritten this is the URI
        returned by the store's update(); otherwise it is the requested
        destination URI.

    Raises:
        ValueError: If the source or destination store is not registered,
            or if source and destination are the same URI in the same
            store.
        FileNotFoundError: If the source asset is not found.
        RuntimeError: If a cyclic dependency is detected (presumably
            raised by the recursive dependency fetch; not visible here).
    """
    logger.debug(
        f"DeepcopyAsync URI '{src}' from store '{store}' to '{dest_store}'"
        f" with dest '{dest}'"
    )
    if dest is None:
        dest = src

    if store not in self.stores:
        raise ValueError(f"Source store '{store}' not registered.")
    if dest_store not in self.stores:
        raise ValueError(f"Destination store '{dest_store}' not registered.")
    if store == dest_store and src == dest:
        raise ValueError(f"File '{src}' cannot be copied to itself.")

    # Fetch the source asset together with its full dependency tree
    # (depth=None means "no depth limit" for the recursive fetch).
    construction_data = await self._fetch_asset_construction_data_recursive_async(
        src, [store], set(), depth=None
    )
    if construction_data is None:
        raise FileNotFoundError(f"Source asset '{src}' not found in store '{store}'.")

    # Flatten the tree in post-order so that every dependency is written
    # before the asset that refers to it; the top-level asset comes last.
    assets_to_copy: List[_AssetConstructionData] = []

    def collect_assets(data):
        if data.dependencies_data is not None:
            for dep_data in data.dependencies_data.values():
                if dep_data is not None:
                    collect_assets(dep_data)
        assets_to_copy.append(data)

    collect_assets(construction_data)

    # Keep `dest_store` as the store *name* (used in log messages and the
    # cache check) and use a separate local for the store object.
    target_store = self.stores[dest_store]
    invalidate_cache = dest_store in self._cacheable_stores
    copied_uris: Set[AssetUri] = set()
    for asset_data in assets_to_copy:
        # The top-level asset is the only one that may be renamed to `dest`.
        is_top_level = asset_data.uri == src
        asset_uri = dest if is_top_level else asset_data.uri

        # A dependency shared by several assets appears more than once in
        # the flattened list; write it only once.
        if asset_uri in copied_uris:
            logger.debug(
                f"Dependency '{asset_uri}' already added to '{dest_store}'," " skipping copy."
            )
            continue
        copied_uris.add(asset_uri)

        # Only existing *dependencies* are skipped. The top-level asset is
        # identified by its source URI, not by comparing the destination
        # URI with `src`, so it is overwritten even when it was renamed.
        exists_in_dest = await target_store.exists(asset_uri)
        if exists_in_dest and not is_top_level:
            logger.debug(
                f"Dependency '{asset_uri}' already exists in '{dest_store}'," " skipping copy."
            )
            continue

        if exists_in_dest:
            # Not skipped above, so this is the top-level asset.
            logger.debug(f"Updating asset '{asset_uri}' in '{dest_store}'")
            dest = await target_store.update(
                asset_uri,
                asset_data.raw_data,
            )
        else:
            logger.debug(f"Creating asset '{asset_uri}' in '{dest_store}'")
            # Log the payload size only; dumping the bytes floods the log.
            logger.debug(f"Raw data size before writing: {len(asset_data.raw_data)} bytes")
            await target_store.create(
                asset_uri.asset_type,
                asset_uri.asset_id,
                asset_data.raw_data,
            )

        # Drop stale cache entries for anything written to a cacheable store.
        if invalidate_cache:
            self.asset_cache.invalidate_for_uri(str(asset_uri))

    logger.debug(f"DeepcopyAsync completed for '{src}' to '{dest}'")
    return dest
| |
|
def deepcopy(
    self,
    src: AssetUri,
    dest_store: str,
    store: str = "local",
    dest: Optional[AssetUri] = None,
) -> AssetUri:
    """
    Synchronously deep copies an asset and its dependencies between stores.

    Blocking counterpart of deepcopy_async(): the request is logged and
    the coroutine is run to completion with asyncio.run().

    Args:
        src: The AssetUri of the source asset.
        dest_store: The name of the destination store.
        store: The name of the source store (defaults to "local").
        dest: Optional. The new AssetUri for the top-level asset in the
            destination store. If None, the original URI is used.

    Returns:
        The value returned by deepcopy_async() (the AssetUri of the copied
        top-level asset in the destination store).

    Raises:
        Exception: Anything raised by deepcopy_async() propagates
            unchanged.
    """
    message = f"Deepcopy URI '{src}' from store '{store}' to '{dest_store}' with dest '{dest}'"
    logger.debug(message)
    copy_job = self.deepcopy_async(src, dest_store, store, dest)
    return asyncio.run(copy_job)
| |
|
def add_file(
    self,
    asset_type: str,
    path: pathlib.Path,
    store: str = "local",
    asset_id: Optional[str] = None,
) -> AssetUri:
    """
    Add the contents of a file as a raw asset.

    Convenience wrapper around add_raw(): the file is read in full and
    its bytes are handed to add_raw().

    Args:
        asset_type: Type name of the asset being added.
        path: File whose bytes become the asset data.
        store: Name of the target store (defaults to "local").
        asset_id: Identifier for the asset. When it is None (or any other
            falsy value such as ""), path.stem is used instead.

    Returns:
        The value returned by add_raw().
    """
    resolved_id = asset_id if asset_id else path.stem
    payload = path.read_bytes()
    return self.add_raw(asset_type, resolved_id, payload, store=store)
| |
|
def delete(self, uri: Union[AssetUri, str], store: str = "local") -> None:
    """
    Delete an asset from a store, blocking until the deletion has finished.

    Synchronous wrapper around delete_async(). The URI normalisation, the
    store lookup and the cache invalidation all live in delete_async(), so
    the synchronous and asynchronous paths cannot drift apart.

    Args:
        uri: The asset to delete, as an AssetUri or its string form.
        store: Name of the store to delete from (defaults to "local").

    Raises:
        Exception: Anything raised by delete_async() propagates unchanged
            (for example the lookup error for an unregistered store name).
    """
    logger.debug(f"Delete URI '{uri}' from store '{store}'")
    asyncio.run(self.delete_async(uri, store))
| |
|
async def delete_async(self, uri: Union[AssetUri, str], store: str = "local") -> None:
    """
    Delete an asset from a store (asynchronous).

    Args:
        uri: The asset to delete. A string is converted with AssetUri();
            any other value is passed to the store as-is.
        store: Name of the store to delete from (defaults to "local").

    Raises:
        KeyError: If no store is registered under the given name.
    """
    logger.debug(f"DeleteAsync URI '{uri}' from store '{store}'")
    if isinstance(uri, str):
        target_uri = AssetUri(uri)
    else:
        target_uri = uri

    backend = self.stores[store]
    await backend.delete(target_uri)

    # Only stores registered as cacheable can have entries in the cache.
    if store not in self._cacheable_stores:
        return
    self.asset_cache.invalidate_for_uri(str(target_uri))
| |
|
async def is_empty_async(self, asset_type: Optional[str] = None, store: str = "local") -> bool:
    """
    Check whether a store holds no assets of a given type (asynchronous).

    Args:
        asset_type: Type name to check; passed through unchanged to the
            store's is_empty(), including the default of None.
        store: Name of the store to query (defaults to "local").

    Returns:
        The result of the store's is_empty() call.

    Raises:
        ValueError: If no store is registered under the given name.
    """
    logger.debug(f"IsEmptyAsync: type='{asset_type}', store='{store}'")
    logger.debug(
        f"IsEmptyAsync: Looking up store '{store}'. Available stores: {list(self.stores.keys())}"
    )
    selected_store = self.stores.get(store)
    # Compare against None explicitly: a registered store object that is
    # falsy (e.g. defines __len__ and is empty) must not be reported as
    # "not registered".
    if selected_store is None:
        raise ValueError(f"No store registered for name: {store}")
    return await selected_store.is_empty(asset_type)
| |
|
def is_empty(self, asset_type: Optional[str] = None, store: str = "local") -> bool:
    """
    Check whether a store holds no assets of a given type (synchronous).

    Blocking wrapper that runs is_empty_async() to completion with
    asyncio.run().

    Args:
        asset_type: Type name to check, forwarded to is_empty_async().
        store: Name of the store to query (defaults to "local").

    Returns:
        The value returned by is_empty_async().

    Raises:
        Exception: Any error raised by is_empty_async() is logged at error
            level (without traceback) and then re-raised unchanged.
    """
    caller_thread = threading.current_thread().name
    logger.debug(f"IsEmpty: type='{asset_type}', store='{store}' from T:{caller_thread}")
    try:
        outcome = asyncio.run(self.is_empty_async(asset_type, store))
    except Exception as err:
        # Log a one-line summary here; the caller still gets the exception.
        logger.error(
            f"IsEmpty: Error for type='{asset_type}', store='{store}': {err}",
            exc_info=False,
        )
        raise
    return outcome
| |
|
async def list_versions_async(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
    """
    List the available versions of an asset (asynchronous).

    Args:
        uri: The asset to look up. A string is converted with AssetUri();
            any other value is passed to the store as-is.
        store: A store name, or a sequence of store names. Only the first
            name of a sequence is consulted; an empty sequence falls back
            to "local".

    Returns:
        The result of the selected store's list_versions() call.

    Raises:
        ValueError: If no store is registered under the selected name.
    """
    stores_list = [store] if isinstance(store, str) else store
    logger.debug(f"ListVersionsAsync: uri='{uri}', stores='{stores_list}'")
    asset_uri_obj = AssetUri(uri) if isinstance(uri, str) else uri

    # Versions are listed from a single store: the first one given, or
    # "local" when the caller passed an empty sequence.
    list_store = stores_list[0] if stores_list else "local"
    logger.debug(
        f"ListVersionsAsync: Looking up store '{list_store}'. Available stores: {list(self.stores.keys())}"
    )
    selected_store = self.stores.get(list_store)
    # Compare against None explicitly: a registered store object that is
    # falsy (e.g. defines __len__ and is empty) must not be reported as
    # "not registered".
    if selected_store is None:
        raise ValueError(f"No store registered for name: {list_store}")
    return await selected_store.list_versions(asset_uri_obj)
| |
|
def list_versions(
    self,
    uri: Union[AssetUri, str],
    store: Union[str, Sequence[str]] = "local",
) -> List[AssetUri]:
    """
    List the available versions of an asset (synchronous).

    Blocking wrapper that runs list_versions_async() to completion with
    asyncio.run(). Unlike a plain pass-through, failures are not
    propagated: any exception is logged at error level (without
    traceback) and an empty list is returned instead.

    Args:
        uri: The asset to look up, as an AssetUri or its string form.
        store: A store name, or a sequence of store names. A single name
            is wrapped in a list before being forwarded.

    Returns:
        The value returned by list_versions_async(), or [] when that call
        raised an exception.
    """
    if isinstance(store, str):
        stores_list = [store]
    else:
        stores_list = store

    caller_thread = threading.current_thread().name
    logger.debug(f"ListVersions: uri='{uri}', stores='{stores_list}' from T:{caller_thread}")
    try:
        versions = asyncio.run(self.list_versions_async(uri, stores_list))
    except Exception as err:
        # Best effort: report the problem and hand back an empty result.
        logger.error(
            f"ListVersions: Error for uri='{uri}', stores='{stores_list}': {err}",
            exc_info=False,
        )
        versions = []
    return versions
| |
|
def get_registered_asset_types(self) -> List[str]:
    """
    Return the names of all registered asset types.

    Returns:
        A new list holding the keys of the internal asset-class registry,
        in the registry's iteration order. Mutating the list does not
        affect the registry.
    """
    registry = self._asset_classes
    return list(registry)
| |
|