File size: 2,043 Bytes
"""Cloud Monitoring tool wrapper — Google Cloud Monitoring API."""
import os
import time
from typing import Any
from google.cloud import monitoring_v3
# Default GCP project ID, read from the environment once at import time.
# Empty string when GCP_PROJECT is unset; cloud_monitoring_query falls back
# to this value when the caller does not pass an explicit project.
PROJECT = os.environ.get("GCP_PROJECT", "")
# Upper bound on the number of points returned per time series, so a long
# lookback on a high-resolution metric cannot blow up the response size.
_MAX_POINTS_PER_SERIES = 100


def _epoch_seconds(ts: Any) -> int:
    """Return whole epoch seconds for a point's end time.

    The proto-plus based client surfaces Timestamp fields as timezone-aware
    ``datetime`` objects, which have ``.timestamp()`` but no ``.seconds``.
    A raw protobuf Timestamp has ``.seconds`` instead. Both are accepted.
    """
    if hasattr(ts, "timestamp"):
        return int(ts.timestamp())
    return int(ts.seconds)


def _point_value(typed_value: Any) -> Any:
    """Convert a Monitoring ``TypedValue`` into a plain Python value.

    Reads whichever member of the ``value`` oneof is actually set, so a
    genuine zero is not confused with "unset" and BOOL / STRING metrics are
    not silently reported as 0. Distribution metrics are summarized by their
    mean so the result stays a plain scalar. Returns None when no member of
    the oneof is set.
    """
    kind = monitoring_v3.TypedValue.pb(typed_value).WhichOneof("value")
    if kind is None:
        return None
    if kind == "distribution_value":
        return typed_value.distribution_value.mean
    return getattr(typed_value, kind)


def cloud_monitoring_query(metric_type: str, lookback_seconds: int = 900,
                           project: str = "") -> dict[str, Any]:
    """Query a GCP-native metric over a trailing time window.

    metric_type examples:
    - 'kubernetes.io/container/cpu/core_usage_time'
    - 'kubernetes.io/container/memory/used_bytes'
    - 'cloudsql.googleapis.com/database/cpu/utilization'

    Args:
        metric_type: Full metric type, matched exactly via
            ``metric.type="<metric_type>"``. Must be non-empty and must not
            contain double quotes or backslashes, which would alter the
            filter expression.
        lookback_seconds: Length of the query window, ending now. Must be a
            positive integer.
        project: GCP project ID. Falls back to the module-level ``PROJECT``
            (the ``GCP_PROJECT`` environment variable) when empty.

    Returns:
        On success: ``{"success": True, "count": <number of series>,
        "series": [...]}``, where each series has ``resource_labels``,
        ``metric_labels`` and up to 100 ``points`` of the form
        ``{"value": ..., "timestamp": <epoch seconds of the point's end>}``.
        On failure: ``{"success": False, "error": <message>}``. This function
        does not raise; invalid arguments and API errors are reported through
        the failure form.
    """
    proj = project or PROJECT
    if not proj:
        return {"success": False, "error": "GCP_PROJECT env var not set"}

    # Validate before touching the API: a bad argument should produce a clear
    # message rather than an opaque server-side error.
    if not isinstance(metric_type, str) or not metric_type.strip():
        return {"success": False,
                "error": "metric_type must be a non-empty string"}
    if '"' in metric_type or "\\" in metric_type:
        # The value is interpolated into a quoted filter literal; a quote or
        # backslash could terminate it early and change what is queried.
        return {"success": False,
                "error": "metric_type must not contain quotes or backslashes"}
    try:
        lookback = int(lookback_seconds)
    except (TypeError, ValueError):
        return {"success": False,
                "error": "lookback_seconds must be a positive integer"}
    if lookback <= 0:
        # start_time must be strictly before end_time for a valid interval.
        return {"success": False,
                "error": "lookback_seconds must be a positive integer"}

    try:
        # NOTE(review): a new client is created on every call; consider
        # reusing one client if this function is called frequently.
        client = monitoring_v3.MetricServiceClient()
        now = time.time()
        seconds = int(now)
        nanos = int((now - seconds) * 10**9)
        interval = monitoring_v3.TimeInterval(
            {
                "end_time": {"seconds": seconds, "nanos": nanos},
                "start_time": {"seconds": seconds - lookback, "nanos": nanos},
            }
        )
        results = client.list_time_series(
            request={
                "name": f"projects/{proj}",
                "filter": f'metric.type="{metric_type}"',
                "interval": interval,
                "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
            }
        )
        series = []
        for ts in results:
            points = []
            for p in ts.points:
                # Stop early instead of converting every point and slicing.
                if len(points) >= _MAX_POINTS_PER_SERIES:
                    break
                points.append({
                    "value": _point_value(p.value),
                    "timestamp": _epoch_seconds(p.interval.end_time),
                })
            series.append({
                "resource_labels": dict(ts.resource.labels),
                "metric_labels": dict(ts.metric.labels),
                "points": points,
            })
        return {"success": True, "count": len(series), "series": series}
    except Exception as e:
        # Tool boundary: report any client/API failure as a result dict so
        # the caller never has to handle an exception.
        return {"success": False, "error": str(e)}
|