Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import hmac
import json
import logging
import os
import time

import requests
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from datasets import load_dataset
from fastapi import FastAPI, Request, Response
# Logging: timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Slack / webhook settings read from the environment (each is None when unset).
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")
CATALOG_WEBHOOKS_FLEET_DATASET = os.environ.get("CATALOG_WEBHOOKS_FLEET_DATASET")
SIMSHIP_WEBHOOK = os.environ.get("SIMSHIP_WEBHOOK")
SIMSHIP_WEBHOOK_SECRET = os.environ.get("SIMSHIP_WEBHOOK_SECRET")

# Azure subscription and resource group handed to MLClient in
# archive_model_in_azure (each is None when unset).
AZURE_SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID")
AZURE_RESOURCE_GROUP = os.environ.get("AZURE_RESOURCE_GROUP")
def archive_model_in_azure(model_id: str) -> None:
    """Archive every registry entry that matches ``model_id`` in Azure ML.

    The Azure ML asset name is derived from ``model_id`` by replacing ``/``
    and ``_`` with ``-`` and lowercasing the result. Each entry returned by
    ``client.models.list(name=...)`` on the ``"HuggingFace"`` registry is then
    archived by name and version.

    Args:
        model_id: Model identifier; the caller in this file passes the
            ``repo.name`` field of the webhook payload.

    Raises:
        ValueError: If the registry lists no model under the derived name.
        RuntimeError: If archiving an entry fails. The underlying exception is
            chained as ``__cause__`` so the original traceback is preserved.
    """
    client = MLClient(
        credential=DefaultAzureCredential(),
        subscription_id=AZURE_SUBSCRIPTION_ID,
        resource_group_name=AZURE_RESOURCE_GROUP,
        registry_name="HuggingFace",
    )
    name = model_id.replace("/", "-").replace("_", "-").lower()
    models = list(client.models.list(name=name))
    if not models:
        msg = f"Model {name} not available in Azure ML, then no need to delete anything."
        logger.error(msg)
        raise ValueError(msg)

    for model in models:
        try:
            client.models.archive(name=model.name, version=model.version)  # type: ignore
        except Exception as e:
            msg = f"Failed when archiving model with {name=} with exception {e}"
            logger.error(msg)
            # Chain the cause so callers and logs keep the SDK's traceback.
            raise RuntimeError(msg) from e
        else:
            logger.info(f"Archived {model.name=} {model.version=}")

    logger.info(f"Model {name} already archived on Azure ML!")
def send_slack_message(payload: dict, max_retries: int = 3, timeout: int = 10, retry_delay: int = 2):
    """Post ``payload`` to the Slack webhook, retrying transient failures.

    Nothing is raised to the caller: every outcome is reported through the
    module logger. When ``SLACK_WEBHOOK_URL`` is not configured the payload is
    only logged and no request is made.

    Args:
        payload: JSON-serialisable body sent to the webhook.
        max_retries: Maximum number of retries after the initial attempt
            (default: 3).
        timeout: Per-request timeout in seconds (default: 10).
        retry_delay: Delay between attempts in seconds (default: 2).
    """
    if not SLACK_WEBHOOK_URL:
        logger.warning(f"No Slack webhook URL configured. Payload: {payload}")
        return

    # HTTP statuses worth retrying; any other HTTP error aborts immediately.
    retryable_statuses = (429, 500, 502, 503, 504)
    total_attempts = max_retries + 1  # initial attempt plus the retries

    for attempt in range(total_attempts):
        try:
            logger.info(f"Sending Slack message (attempt {attempt + 1}/{total_attempts})")
            response = requests.post(SLACK_WEBHOOK_URL, json=payload, timeout=timeout)
            response.raise_for_status()
            logger.info(f"Slack message sent successfully: {payload}")
            return
        # Handler order matters: Timeout and ConnectionError are both
        # RequestException subclasses, so the generic handler comes last.
        except requests.exceptions.Timeout as e:
            logger.warning(f"Slack request timeout on attempt {attempt + 1}: {e}")
            failure = "timeout"
        except requests.exceptions.HTTPError as e:
            status_code = getattr(e.response, "status_code", None)
            if status_code is None:
                logger.error(f"Failed to send Slack message due to HTTP error: {e}")
                return
            if status_code not in retryable_statuses:
                logger.error(f"Failed to send Slack message due to non-retryable HTTP error {status_code}: {e}")
                return
            logger.warning(f"Slack HTTP error {status_code} on attempt {attempt + 1}: {e}")
            failure = f"HTTP error {status_code}"
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"Slack connection error on attempt {attempt + 1}: {e}")
            failure = "connection error"
        except requests.exceptions.RequestException as e:
            logger.warning(f"Slack request error on attempt {attempt + 1}: {e}")
            failure = "request error"

        # Only retryable failures reach this point.
        if attempt < max_retries:
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            logger.error(f"Failed to send Slack message after {total_attempts} attempts due to {failure}")
def process_model_catalog_webhook(data: dict):
    """Handle a model catalog webhook payload.

    Reacts to two repo-scoped events on repositories of type ``"model"``:

    * ``move``: sends a Slack alert with the old name and the new URL.
    * ``delete``: sends a Slack alert, then tries to archive the model in
      Azure ML; an archiving failure is logged and not propagated.

    Every other event is ignored.

    Args:
        data: Decoded webhook payload. The ``event``, ``repo`` and ``movedTo``
            keys are read; each may be missing or null.
    """
    # `or {}` also covers keys that are present but null, which a
    # `.get(key, {})` default would pass through as None.
    event = data.get("event") or {}
    repo = data.get("repo") or {}
    moved_to = data.get("movedTo") or {}

    if repo.get("type") != "model" or event.get("scope") != "repo":
        return

    action = event.get("action")
    repo_name = repo.get("name", "")

    # NOTE(review): the non-ASCII characters in the messages below look
    # mis-encoded (possibly emoji) - confirm against the deployed source.
    if action == "move":
        message = (
            "π Model Catalog Sync Alert π\n\n"
            "Model in the catalog renamed π \n"
            f"{repo_name} β https://hf.co/{moved_to.get('name', '')}"
        )
        send_slack_message({"text": message, "type": "", "link": ""})
    elif action == "delete":
        message = (
            "π Model Catalog Sync Alert π\n\n"
            "Model in the catalog deleted ποΈ\n"
            f"https://hf.co/{repo_name}"
        )
        send_slack_message({"text": message, "type": "", "link": ""})
        try:
            archive_model_in_azure(model_id=repo_name)
        except Exception as e:
            # Best effort: the Slack alert has already gone out, so a failed
            # archive is only logged.
            logger.error(f"Model {repo_name} couldn't be archived with {e}")
def process_simship_webhook(data: dict):
    """Handle a simship webhook payload.

    Sends a Slack alert for repositories of type ``"model"`` when either:

    * the repository is created (scope ``repo``, action ``create``), or
    * its configuration is updated with ``private`` set to ``False``
      (scope ``repo.config``, action ``update``).

    Every other event is ignored.

    Args:
        data: Decoded webhook payload. The ``event``, ``repo`` and
            ``updatedConfig`` keys are read; each may be missing or null.
    """
    # `or {}` also covers keys that are present but null, which a
    # `.get(key, {})` default would pass through as None.
    event = data.get("event") or {}
    repo = data.get("repo") or {}
    updated_config = data.get("updatedConfig") or {}

    if repo.get("type") != "model":
        return

    scope = event.get("scope")
    action = event.get("action")

    # NOTE(review): the non-ASCII characters in the messages below look
    # mis-encoded (possibly emoji) - confirm against the deployed source.
    if scope == "repo" and action == "create":
        headline = "SimShip Model created π\n"
    elif (
        scope == "repo.config"
        and action == "update"
        # Identity check on purpose: a missing or null `private` must not match.
        and updated_config.get("private") is False
    ):
        headline = "SimShip Model made public π\n"
    else:
        return

    link = f"https://hf.co/{repo.get('name', '')}"
    message = (
        "π₯οΈ SimShip Provider Alert π₯οΈ\n\n"
        f"{headline}"
        f"{link}"
    )
    send_slack_message({"text": message, "link": link, "type": "simship"})
# FastAPI application instance.
# NOTE(review): no route decorators are visible on the webhook handlers in
# this view - confirm where they are registered on `app`.
app = FastAPI()
async def model_catalog_webhook(request: Request):
    """Authenticate and dispatch a model catalog webhook notification.

    The ``X-Webhook-Secret`` header must match a non-empty entry of the
    ``secret`` column in the ``train`` split of
    ``CATALOG_WEBHOOKS_FLEET_DATASET``, and the payload's ``webhook.id`` must
    equal the ``webhook_id`` stored in that same row.

    Args:
        request: Incoming HTTP request.

    Returns:
        A ``Response`` with status 401 for an unknown secret or a mismatched
        webhook id, 400 when the body is not a JSON object, and 200 once
        ``process_model_catalog_webhook`` has run.
    """
    # request.client can be None, so do not dereference it blindly.
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"Received model catalog webhook request from {client_host}")

    # NOTE(review): the dataset is loaded on every request through a
    # synchronous call inside this coroutine - consider caching it.
    fleet = load_dataset(CATALOG_WEBHOOKS_FLEET_DATASET)["train"]
    known_secrets = list(fleet["secret"])

    provided_secret = request.headers.get("X-Webhook-Secret")
    idx = None
    if provided_secret:
        provided_bytes = provided_secret.encode("utf-8")
        for position, known in enumerate(known_secrets):
            # Constant-time comparison; bytes are used because
            # hmac.compare_digest rejects non-ASCII str arguments.
            # Empty or non-string entries never match.
            if (
                isinstance(known, str)
                and known
                and hmac.compare_digest(known.encode("utf-8"), provided_bytes)
            ):
                idx = position
                break
    if idx is None:
        logger.warning("Invalid webhook secret received for model catalog")
        return Response("Invalid secret", status_code=401)

    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
        logger.warning("Model catalog webhook body is not valid JSON")
        return Response("Invalid JSON payload", status_code=400)
    if not isinstance(data, dict):
        logger.warning("Model catalog webhook body is not a JSON object")
        return Response("Invalid JSON payload", status_code=400)
    logger.info(f"Model catalog webhook payload: {json.dumps(data, indent=2)}")

    webhook = data.get("webhook")
    webhook_id = webhook.get("id") if isinstance(webhook, dict) else None
    # A missing id is rejected outright so it can never equal a null cell.
    if webhook_id is None or webhook_id != fleet[idx]["webhook_id"]:
        logger.warning("Invalid webhook ID received for model catalog")
        return Response("Invalid webhook ID", status_code=401)

    logger.info("Processing model catalog webhook")
    process_model_catalog_webhook(data)
    return Response("Model catalog webhook notification received and processed!", status_code=200)
async def simship_webhook(request: Request):
    """Authenticate and dispatch a simship webhook notification.

    The ``X-Webhook-Secret`` header must equal ``SIMSHIP_WEBHOOK_SECRET`` and
    the payload's ``webhook.id`` must equal ``SIMSHIP_WEBHOOK``. Both settings
    must be configured: when either is unset the request is rejected, so a
    request without a secret header or webhook id cannot pass by comparing
    ``None`` with ``None``.

    Args:
        request: Incoming HTTP request.

    Returns:
        A ``Response`` with status 401 for a bad secret or webhook id, 400
        when the body is not a JSON object, and 200 once
        ``process_simship_webhook`` has run.
    """
    # request.client can be None, so do not dereference it blindly.
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"Received simship webhook request from {client_host}")

    if not SIMSHIP_WEBHOOK_SECRET:
        # Fail closed: with no configured secret nothing can authenticate.
        logger.error("SIMSHIP_WEBHOOK_SECRET is not configured; rejecting simship webhook")
        return Response("Invalid secret", status_code=401)

    provided_secret = request.headers.get("X-Webhook-Secret")
    # Constant-time comparison; bytes are used because hmac.compare_digest
    # rejects non-ASCII str arguments.
    if not provided_secret or not hmac.compare_digest(
        provided_secret.encode("utf-8"), SIMSHIP_WEBHOOK_SECRET.encode("utf-8")
    ):
        logger.warning("Invalid webhook secret received for simship")
        return Response("Invalid secret", status_code=401)

    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
        logger.warning("Simship webhook body is not valid JSON")
        return Response("Invalid JSON payload", status_code=400)
    if not isinstance(data, dict):
        logger.warning("Simship webhook body is not a JSON object")
        return Response("Invalid JSON payload", status_code=400)
    logger.info(f"Simship webhook payload: {json.dumps(data, indent=2)}")

    webhook = data.get("webhook")
    webhook_id = webhook.get("id") if isinstance(webhook, dict) else None
    # An unset SIMSHIP_WEBHOOK must not match a payload without a webhook id.
    if not SIMSHIP_WEBHOOK or webhook_id != SIMSHIP_WEBHOOK:
        logger.warning("Invalid webhook ID received for simship")
        return Response("Invalid webhook ID", status_code=401)

    logger.info("Processing simship webhook")
    process_simship_webhook(data)
    return Response("Simship webhook notification received and processed!", status_code=200)