File size: 1,778 Bytes
7e9a520 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | """Cloud Logging tool wrapper — gcloud subprocess + Python SDK."""
import json
import os
import subprocess
from typing import Any
PROJECT = os.getenv("GCP_PROJECT", "")
def gcloud_logs_read(filter_query: str, limit: int = 50, project: str = "") -> dict[str, Any]:
"""Read Cloud Logging entries.
filter_query examples:
- 'resource.type="k8s_container" AND severity>=ERROR'
- 'resource.labels.container_name="checkoutservice" AND textPayload:"timeout"'
"""
proj = project or PROJECT
if not proj:
return {"success": False, "error": "GCP_PROJECT env var not set"}
cmd = [
"gcloud", "logging", "read", filter_query,
"--project", proj,
"--limit", str(limit),
"--format", "json",
"--freshness", "1h",
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return {"success": False, "error": result.stderr, "returncode": result.returncode}
entries = json.loads(result.stdout) if result.stdout.strip() else []
return {
"success": True,
"count": len(entries),
"entries": [
{
"timestamp": e.get("timestamp"),
"severity": e.get("severity"),
"resource": e.get("resource", {}).get("labels", {}),
"message": e.get("textPayload") or e.get("jsonPayload", {}).get("message"),
}
for e in entries
],
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "gcloud logging read timed out"}
except json.JSONDecodeError as e:
return {"success": False, "error": f"Invalid JSON: {e}"}
|