| import json |
|
|
| import elasticache_auto_discovery |
| from pymemcache.client.hash import HashClient |
|
|
| from inference.core.env import ELASTICACHE_ENDPOINT |
| from inference.core.logger import logger |
|
|
| nodes = elasticache_auto_discovery.discover(ELASTICACHE_ENDPOINT) |
|
|
| |
| nodes = map(lambda x: (x[1], int(x[2])), nodes) |
| memcache_client = HashClient(nodes) |
|
|
|
|
| def trackUsage(endpoint, actor, n=1): |
| """Tracks the usage of an endpoint by an actor. |
| |
| This function increments the usage count for a given endpoint by an actor. |
| It also handles initialization if the count does not exist. |
| |
| Args: |
| endpoint (str): The endpoint being accessed. |
| actor (str): The actor accessing the endpoint. |
| n (int, optional): The number of times the endpoint was accessed. Defaults to 1. |
| |
| Returns: |
| None: This function does not return anything but updates the memcache client. |
| """ |
| |
| try: |
| job = endpoint + "endpoint:::actor" + actor |
| current_infers = memcache_client.incr(job, n) |
| if current_infers is None: |
| memcache_client.set(job, n) |
| current_infers = n |
|
|
| |
| job_keys = memcache_client.get("JOB_KEYS") |
| if job_keys is None: |
| memcache_client.add("JOB_KEYS", json.dumps([job])) |
| else: |
| decoded = json.loads(job_keys) |
| decoded.append(job) |
| decoded = list(set(decoded)) |
| memcache_client.set("JOB_KEYS", json.dumps(decoded)) |
|
|
| actor_keys = memcache_client.get("ACTOR_KEYS") |
| if actor_keys is None: |
| ak = {} |
| ak[actor] = n |
| memcache_client.add("ACTOR_KEYS", json.dumps(ak)) |
| else: |
| decoded = json.loads(actor_keys) |
| if actor in actor_keys: |
| actor_keys[actor] += n |
| else: |
| actor_keys[actor] = n |
| memcache_client.set("ACTOR_KEYS", json.dumps(actor_keys)) |
|
|
| except Exception as e: |
| logger.debug("WARNING: there was an error in counting this inference") |
| logger.debug(e) |
|
|