Spaces:
Runtime error
Runtime error
| import logging | |
| import os | |
| import requests | |
| import streamlit as st | |
| from random import getrandbits, randint | |
| from time import time | |
| from typing import List, Tuple | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %I:%M:%S') | |
| data_dir = os.path.join(os.path.dirname(__file__), '..', 'data') | |
| img_dir = os.path.join(os.path.dirname(__file__), '..', 'img') | |
| config_dir = os.path.join(os.path.dirname(__file__), '..', 'config') | |
| def random_true_false() -> bool: | |
| """ | |
| Utility to get a random True or False to make it clear what I am doing | |
| and not peppering getrandbits around the codebase | |
| """ | |
| return bool(getrandbits(1)) | |
| def pop_n(items: List, n: int) -> List: | |
| """ | |
| Utility to pop a certain number from the end of a list. | |
| Modifies the original list and returns the popped items. | |
| Automagically pops <n if the list does not contain n items | |
| """ | |
| if len(items) < n: | |
| n = len(items) | |
| popped = [] | |
| for _ in range(n): | |
| popped.append(items.pop()) | |
| return popped[::-1] | |
| def generate_group_tag() -> str: | |
| """ | |
| Generates a pseduo-random test group ID for tagging requests which | |
| should be considered together | |
| """ | |
| return f"TestGroup_{randint(100, 999)}_{int(time())}" | |
| def hf_api_token(write: bool = False) -> str: | |
| """ | |
| Utility single access point to look up the hugging face access token. | |
| """ | |
| if write: | |
| token = st.secrets['hf_write_token'] | |
| else: | |
| token = st.secrets['hf_token'] | |
| if token is None: | |
| raise ValueError('No HF access token found in streamlit secrets') | |
| return token | |
| # Constants for the status values returned from Hugging Face | |
| HF_RUNNING = 'running' | |
| HF_SCALEDTOZERO = 'scaleToZero' | |
| HF_PAUSED = 'paused' | |
| HF_FAILED = 'failed' | |
| def hf_endpoint_status(username: str, endpoint_name: str, api_token: str = None) -> Tuple[bool, bool]: | |
| """ | |
| Utility to check the status of a hugging face inference endpoint | |
| :return: tuple of booleans first is if it is available, second is if it fully shutdown. | |
| False, True indicates it is not fully available or fully shutdown so is in process of starting up or shutting down | |
| """ | |
| url = f'https://api.endpoints.huggingface.cloud/v2/endpoint/{username}/{endpoint_name}' | |
| if api_token is None: | |
| api_token = hf_api_token() | |
| response = requests.get(url, headers={"Authorization" : f"Bearer {api_token}"}) | |
| if response.status_code != 200: | |
| raise ValueError(f"Likely config error. Received status code {response.status_code} - {response.text}") | |
| else: | |
| state = response.json()['status']['state'] | |
| return state | |
| def pause_hf_endpoint(username: str, endpoint_name: str, api_token: str = None) -> None: | |
| """ | |
| Utility to pause a hf inference endpoint via API call. Username and endpoint fully qualify the endpoing on | |
| hugging face | |
| """ | |
| url = f'https://api.endpoints.huggingface.cloud/v2/endpoint/{username}/{endpoint_name}/pause' | |
| if api_token is None: | |
| api_token = hf_api_token(write=True) | |
| requests.post(url, headers={"Authorization": f"Bearer {api_token}"}) | |
| def resume_hf_endpoint(username: str, endpoint_name: str, api_token: str = None) -> None: | |
| """ | |
| Utility to resume a hf inference endpoint via API call. Username and endpoint fully qualify the endpoing on | |
| hugging face | |
| """ | |
| url = f'https://api.endpoints.huggingface.cloud/v2/endpoint/{username}/{endpoint_name}/resume' | |
| if api_token is None: | |
| api_token = hf_api_token(write=True) | |
| logging.info(f"Pinging resume endpoint ({url})") | |
| requests.post(url, headers={"Authorization": f"Bearer {api_token}"}) | |
| def join_items_comma_and(items: List[str]) -> str: | |
| """ | |
| Utility to convert a list of items to lowercase strings, comma separated and ending with and | |
| """ | |
| items = [str(i).strip() for i in items] | |
| string_count = len(items) | |
| if string_count == 0: | |
| return "" | |
| if string_count == 1: | |
| return items[0] | |
| return f"{', '.join(items[:-1])} and {items[-1]}" | |
| def escape_dollars(text: str) -> str: | |
| """ | |
| Convenience function to escape dollar signs for prices in markdown | |
| """ | |
| if text is None: | |
| return text | |
| return text.replace("$", "\\$") | |