Spaces:
Configuration error
Configuration error
| import threading | |
| from contextlib import contextmanager | |
| from datetime import datetime | |
| from typing import Generator, List, Optional, OrderedDict, Union | |
| import redis.lock | |
| from inference.core import logger | |
| from inference.core.active_learning.entities import StrategyLimit, StrategyLimitType | |
| from inference.core.active_learning.utils import TIMESTAMP_FORMAT | |
| from inference.core.cache.base import BaseCache | |
# Expiry value handed to `cache.lock` while usage counters are being updated.
MAX_LOCK_TIME = 5
SECONDS_IN_HOUR = 60 * 60
# Name of the field holding the counter inside each cached usage entry.
USAGE_KEY = "usage"


def _minutely_key_infix() -> str:
    """Return the cache-key infix built from the current UTC minute."""
    return f"minute_{datetime.utcnow().minute}"


def _hourly_key_infix() -> str:
    """Return the cache-key infix built from the current UTC hour."""
    return f"hour_{datetime.utcnow().hour}"


def _daily_key_infix() -> str:
    """Return the cache-key infix built from the current UTC time rendered with TIMESTAMP_FORMAT."""
    return f"day_{datetime.utcnow().strftime(TIMESTAMP_FORMAT)}"


# Maps each limit type to a zero-argument callable producing the time-dependent
# part of the usage cache key; the callable is evaluated on every key generation.
LIMIT_TYPE2KEY_INFIX_GENERATOR = {
    StrategyLimitType.MINUTELY: _minutely_key_infix,
    StrategyLimitType.HOURLY: _hourly_key_infix,
    StrategyLimitType.DAILY: _daily_key_infix,
}

# Maps each limit type to the `expire` value used when its usage counter is
# written to the cache (presumably seconds - confirm against BaseCache.set).
LIMIT_TYPE2KEY_EXPIRATION = {
    StrategyLimitType.MINUTELY: 120,
    StrategyLimitType.HOURLY: 2 * SECONDS_IN_HOUR,
    StrategyLimitType.DAILY: 25 * SECONDS_IN_HOUR,
}
def use_credit_of_matching_strategy(
    cache: BaseCache,
    workspace: str,
    project: str,
    matching_strategies_limits: OrderedDict[str, List[StrategyLimit]],
) -> Optional[str]:
    """Find a matching strategy with spare credit and consume one credit from it.

    Both the lookup and the increment happen while the usage-limits lock for
    the given workspace and project is held, so that the increment is done
    atomically. Limits are accounted at the moment of registration, which may
    introduce inaccuracy given that registration is postponed from prediction.

    Returns:
        The strategy with spare credit if one was found, otherwise None.
    """
    with lock_limits(cache=cache, workspace=workspace, project=project):
        selected_strategy = find_strategy_with_spare_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            matching_strategies_limits=matching_strategies_limits,
        )
        if selected_strategy is not None:
            # Only a strategy that passed the limits check gets charged.
            consume_strategy_limits_usage_credit(
                cache=cache,
                workspace=workspace,
                project=project,
                strategy_name=selected_strategy,
            )
        return selected_strategy
def return_strategy_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    """Give back one usage credit previously taken for the given strategy.

    The decrement runs while the usage-limits lock for the given workspace and
    project is held, so that it is done atomically.

    Returning a credit is deliberately naive, in favour of an easier
    implementation: a credit taken in one period may be returned to the pool
    of the next period (only if some credits are already used in that new
    pool).
    """
    limits_guard = lock_limits(cache=cache, workspace=workspace, project=project)
    with limits_guard:
        return_strategy_limits_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
        )
@contextmanager
def lock_limits(
    cache: BaseCache,
    workspace: str,
    project: str,
) -> Generator[Union[threading.Lock, redis.lock.Lock], None, None]:
    """Context manager holding the usage-limits lock of a workspace/project pair.

    The function is a generator, so it must be wrapped with
    ``@contextmanager`` to be usable in ``with lock_limits(...)`` statements,
    which is how the callers in this module use it. Without the decorator a
    plain generator object would be handed to ``with`` and entering the block
    would fail.

    Args:
        cache: Cache backend providing the ``lock`` context manager.
        workspace: Workspace identifier used to build the lock key.
        project: Project identifier used to build the lock key.

    Yields:
        The lock object produced by ``cache.lock``; it stays held for the
        duration of the caller's ``with`` block.
    """
    limits_lock_key = generate_cache_key_for_active_learning_usage_lock(
        workspace=workspace,
        project=project,
    )
    # `expire=MAX_LOCK_TIME` bounds how long the cache keeps the lock.
    with cache.lock(key=limits_lock_key, expire=MAX_LOCK_TIME) as lock:
        yield lock
def find_strategy_with_spare_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    matching_strategies_limits: OrderedDict[str, List[StrategyLimit]],
) -> Optional[str]:
    """Return the first strategy, in mapping order, not rejected by its usage limits.

    Strategies are checked lazily one by one; checking stops at the first
    strategy that still has spare credit.

    Returns:
        Name of the first strategy with spare credit, or None if every
        strategy is rejected (or the mapping is empty).
    """
    strategies_with_credit = (
        candidate_name
        for candidate_name, candidate_limits in matching_strategies_limits.items()
        if not datapoint_should_be_rejected_based_on_strategy_usage_limits(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=candidate_name,
            strategy_limits=candidate_limits,
        )
    )
    return next(strategies_with_credit, None)
def datapoint_should_be_rejected_based_on_strategy_usage_limits(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    strategy_limits: List[StrategyLimit],
) -> bool:
    """Tell whether any of the strategy's limits is already exhausted.

    Limits are checked in order and checking stops at the first violated one,
    which is reported at debug level.

    Returns:
        True when at least one limit is reached, False otherwise (including
        when the list of limits is empty).
    """
    strategy_limit = next(
        (
            candidate_limit
            for candidate_limit in strategy_limits
            if datapoint_should_be_rejected_based_on_limit_usage(
                cache=cache,
                workspace=workspace,
                project=project,
                strategy_name=strategy_name,
                strategy_limit=candidate_limit,
            )
        ),
        None,
    )
    if strategy_limit is None:
        return False
    logger.debug(
        f"Violated Active Learning strategy limit: {strategy_limit.limit_type.name} "
        f"with value {strategy_limit.value} for sampling strategy: {strategy_name}."
    )
    return True
def datapoint_should_be_rejected_based_on_limit_usage(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    strategy_limit: StrategyLimit,
) -> bool:
    """Tell whether the recorded usage has reached the value of a single limit.

    Returns:
        True when the current usage is greater than or equal to
        ``strategy_limit.value``.
    """
    recorded_usage = get_current_strategy_limit_usage(
        cache=cache,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
        limit_type=strategy_limit.limit_type,
    )
    # No cached entry is treated as zero usage.
    effective_usage = 0 if recorded_usage is None else recorded_usage
    return effective_usage >= strategy_limit.value
def consume_strategy_limits_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    """Consume one usage credit of the strategy for every member of StrategyLimitType."""
    shared_arguments = {
        "cache": cache,
        "workspace": workspace,
        "project": project,
        "strategy_name": strategy_name,
    }
    for period_type in StrategyLimitType:
        consume_strategy_limit_usage_credit(limit_type=period_type, **shared_arguments)
def consume_strategy_limit_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    """Increment by one the usage counter of a strategy for a single limit type.

    A missing cache entry is treated as zero usage, so the first consumption
    in a period stores the value 1.

    Args:
        cache: Cache backend holding the usage counters.
        workspace: Workspace identifier, part of the cache key.
        project: Project identifier, part of the cache key.
        strategy_name: Sampling strategy whose counter is incremented.
        limit_type: Limit type selecting the time bucket and the expiration.
    """
    # The cache key embeds the current time bucket, so it is resolved exactly
    # once and reused for both the read and the write. Generating it separately
    # for each operation could, on a minute/hour/day boundary, read the old
    # bucket and write its incremented value into the new one.
    usage_key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    cached_entry = cache.get(usage_key)
    current_value = 0 if cached_entry is None else cached_entry[USAGE_KEY]
    cache.set(  # type: ignore
        key=usage_key,
        value={USAGE_KEY: current_value + 1},
        expire=LIMIT_TYPE2KEY_EXPIRATION[limit_type],
    )
def return_strategy_limits_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    """Return one usage credit of the strategy for every member of StrategyLimitType."""
    shared_arguments = {
        "cache": cache,
        "workspace": workspace,
        "project": project,
        "strategy_name": strategy_name,
    }
    for period_type in StrategyLimitType:
        return_strategy_limit_usage_credit(limit_type=period_type, **shared_arguments)
def return_strategy_limit_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    """Decrement by one the usage counter of a strategy for a single limit type.

    Nothing is written when no counter exists for the current time bucket, and
    the stored value never drops below zero.

    Args:
        cache: Cache backend holding the usage counters.
        workspace: Workspace identifier, part of the cache key.
        project: Project identifier, part of the cache key.
        strategy_name: Sampling strategy whose counter is decremented.
        limit_type: Limit type selecting the time bucket and the expiration.
    """
    # The cache key embeds the current time bucket, so it is resolved exactly
    # once and reused for both the read and the write. Generating it separately
    # for each operation could, on a minute/hour/day boundary, read the old
    # bucket and write its decremented value into the new one.
    usage_key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    cached_entry = cache.get(usage_key)
    if cached_entry is None:
        return None
    cache.set(  # type: ignore
        key=usage_key,
        value={USAGE_KEY: max(cached_entry[USAGE_KEY] - 1, 0)},
        expire=LIMIT_TYPE2KEY_EXPIRATION[limit_type],
    )
def get_current_strategy_limit_usage(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> Optional[int]:
    """Read the usage counter of a strategy for the current time bucket.

    Returns:
        The value stored under USAGE_KEY in the cached entry, or None when the
        cache holds no entry for the generated key.
    """
    cached_entry = cache.get(
        generate_cache_key_for_active_learning_usage(
            limit_type=limit_type,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
        )
    )
    return None if cached_entry is None else cached_entry[USAGE_KEY]
def set_current_strategy_limit_usage(
    current_value: int,
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    """Store the usage counter of a strategy for the current time bucket.

    The counter is wrapped in a dict under USAGE_KEY and written with the
    expiration configured for the given limit type.
    """
    cache.set(  # type: ignore
        key=generate_cache_key_for_active_learning_usage(
            limit_type=limit_type,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
        ),
        value={USAGE_KEY: current_value},
        expire=LIMIT_TYPE2KEY_EXPIRATION[limit_type],
    )
def generate_cache_key_for_active_learning_usage_lock(
    workspace: str,
    project: str,
) -> str:
    """Build the cache key of the usage-limits lock for a workspace/project pair."""
    lock_key = f"active_learning:usage:{workspace}:{project}:usage:lock"
    return lock_key
def generate_cache_key_for_active_learning_usage(
    limit_type: StrategyLimitType,
    workspace: str,
    project: str,
    strategy_name: str,
) -> str:
    """Build the cache key of a strategy's usage counter for the current time bucket.

    The time-dependent suffix comes from the generator registered for
    ``limit_type`` and is evaluated at call time, so calls made in different
    time buckets yield different keys.
    """
    infix_generator = LIMIT_TYPE2KEY_INFIX_GENERATOR[limit_type]
    time_infix = infix_generator()
    return f"active_learning:usage:{workspace}:{project}:{strategy_name}:{time_infix}"