Spaces:
Paused
Paused
| """ | |
| Helper functions to query prometheus API | |
| """ | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from litellm import get_secret | |
| from litellm._logging import verbose_logger | |
| from litellm.llms.custom_httpx.http_handler import ( | |
| get_async_httpx_client, | |
| httpxSpecialProvider, | |
| ) | |
# Base URL of the Prometheus server (used as the prefix of every API path
# below); None when the secret is not configured.
PROMETHEUS_URL: Optional[str] = get_secret("PROMETHEUS_URL")  # type: ignore

# Optional value of the `instance` label; when set, fallback metrics whose
# `instance` label differs are skipped.
PROMETHEUS_SELECTED_INSTANCE: Optional[str] = get_secret(  # type: ignore
    "PROMETHEUS_SELECTED_INSTANCE"
)

# Shared async HTTP client used for all Prometheus requests in this module.
async_http_handler = get_async_httpx_client(
    llm_provider=httpxSpecialProvider.LoggingCallback
)
async def get_metric_from_prometheus(
    metric_name: str,
):
    """
    Fetch the last 24 hours of samples for a metric from Prometheus.

    Sends the range-vector selector ``<metric_name>[24h]`` to the instant
    query endpoint (``/api/v1/query``), evaluated at the current time.

    Args:
        metric_name: Name of the Prometheus metric to query.

    Returns:
        The ``data.result`` entry of the Prometheus JSON response.

    Raises:
        ValueError: If ``PROMETHEUS_URL`` is not configured.
    """
    if PROMETHEUS_URL is None:
        raise ValueError(
            "PROMETHEUS_URL not set please set 'PROMETHEUS_URL=<>' in .env"
        )

    query = f"{metric_name}[24h]"
    # Evaluate at "now" so the 24h window ends at the current moment.
    now = int(time.time())
    response = await async_http_handler.get(
        f"{PROMETHEUS_URL}/api/v1/query", params={"query": query, "time": now}
    )

    # Parse the body once and reuse it for both the debug log and the result.
    _json_response = response.json()
    verbose_logger.debug("json response from prometheus /query api %s", _json_response)
    return _json_response["data"]["result"]
async def get_fallback_metric_from_prometheus():
    """
    Gets fallback metrics from prometheus for the last 24 hours

    Queries the successful and failed fallback counters and renders one
    line per returned series, using the most recent sample of each series.
    When ``PROMETHEUS_SELECTED_INSTANCE`` is set, series whose ``instance``
    label does not match it are skipped.

    Returns:
        A string with one newline-terminated line per reported series
        (empty string when nothing was reported).
    """
    # Maps each queried counter to the wording used for it in the report, so
    # failed fallbacks are not mislabelled as successful ones.
    relevant_metrics = {
        "litellm_deployment_successful_fallbacks_total": "successful",
        "litellm_deployment_failed_fallbacks_total": "failed",
    }

    message_lines = []
    for metric_name, outcome in relevant_metrics.items():
        response_json = await get_metric_from_prometheus(
            metric_name=metric_name,
        )
        if not response_json:
            continue
        verbose_logger.debug("response json %s", response_json)

        for result in response_json:
            verbose_logger.debug("result= %s", result)
            labels = result["metric"]
            if (
                PROMETHEUS_SELECTED_INSTANCE is not None
                and labels.get("instance") != PROMETHEUS_SELECTED_INSTANCE
            ):
                continue

            samples = result["values"]
            if not samples:
                # Nothing to report for a series without samples.
                continue

            # Range-vector samples are returned in ascending time order, so
            # the most recent [timestamp, value] pair is the last one.
            most_recent_value = samples[-1]
            value = int(float(most_recent_value[1]))  # Convert value to integer
            primary_model = labels.get("primary_model", "Unknown")
            fallback_model = labels.get("fallback_model", "Unknown")
            message_lines.append(
                f"`{value} {outcome} fallback requests` with primary model=`{primary_model}` -> fallback model=`{fallback_model}`"
            )

    # Every reported series gets its own newline-terminated line.
    response_message = "".join(f"{line}\n" for line in message_lines)
    verbose_logger.debug("response message %s", response_message)
    return response_message
def is_prometheus_connected() -> bool:
    """
    Report whether a Prometheus URL is configured.

    Only checks that ``PROMETHEUS_URL`` is not None; no network request is
    made.
    """
    return PROMETHEUS_URL is not None
async def get_daily_spend_from_prometheus(api_key: Optional[str]):
    """
    Return per-day spend for the last 30 days from Prometheus.

    Runs ``sum(delta(litellm_spend_metric_total[1d]))`` against the
    ``/api/v1/query_range`` endpoint with a one-day step. When ``api_key``
    is given, the metric is restricted to series whose ``hashed_api_key``
    label equals it.

    Args:
        api_key: Value to match against the ``hashed_api_key`` label, or
            None to sum over all series.

    Returns:
        A list of ``{"date": <ISO 8601 UTC string>, "spend": <float>}``
        dicts, one per returned sample.

    Raises:
        ValueError: If ``PROMETHEUS_URL`` is not configured.

    Expected Response Format:
    [
    {
        "date": "2024-08-18T00:00:00+00:00",
        "spend": 1.001818099998933
    },
    ...]
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    if PROMETHEUS_URL is None:
        raise ValueError(
            "PROMETHEUS_URL not set please set 'PROMETHEUS_URL=<>' in .env"
        )

    # Calculate the start and end dates for the last 30 days (timezone-aware,
    # so isoformat() already carries the "+00:00" offset).
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=30)
    start_str = start_date.isoformat()
    end_str = end_date.isoformat()

    url = f"{PROMETHEUS_URL}/api/v1/query_range"

    if api_key is None:
        query = "sum(delta(litellm_spend_metric_total[1d]))"
    else:
        # Escape backslashes and double quotes so the key cannot terminate
        # the label-value string and alter the query.
        escaped_key = api_key.replace("\\", "\\\\").replace('"', '\\"')
        query = (
            f'sum(delta(litellm_spend_metric_total{{hashed_api_key="{escaped_key}"}}[1d]))'
        )

    params = {
        "query": query,
        "start": start_str,
        "end": end_str,
        "step": "86400",  # Step size of 1 day in seconds
    }

    response = await async_http_handler.get(url, params=params)
    # Parse the body once and reuse it for both the debug log and the result.
    _json_response = response.json()
    verbose_logger.debug("json response from prometheus /query api %s", _json_response)
    results = _json_response["data"]["result"]

    formatted_results = []
    for result in results:
        for timestamp, value in result["values"]:
            # Interpret the Unix timestamp as UTC; a naive fromtimestamp()
            # would use the host's local time and mislabel it as +00:00.
            date = datetime.fromtimestamp(float(timestamp), tz=timezone.utc).isoformat()
            formatted_results.append({"date": date, "spend": float(value)})

    return formatted_results