# Provenance (Hugging Face Hub page header): uploaded by mishrabp,
# commit message "Upload folder using huggingface_hub",
# revision f29cac7 (verified).
"""
Azure SRE MCP Server
"""
import sys
import os
import logging
from typing import List, Dict, Any, Optional
# Put the directory two levels above this file's folder on sys.path
# (presumably so the `core` package imported below resolves when this file
# is executed directly as a script).
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.abspath(os.path.join(current_dir, os.pardir, os.pardir))
if src_dir not in sys.path:
    sys.path.append(src_dir)
from mcp.server.fastmcp import FastMCP
from core.mcp_telemetry import log_usage, log_trace, log_metric
import uuid
import time
# Azure Imports
try:
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.monitor import MonitorManagementClient
from azure.mgmt.compute import ComputeManagementClient
from azure.monitor.query import LogsQueryClient
except ImportError:
# Allow running without Azure SDKs installed (for testing/mocking)
DefaultAzureCredential = None
ResourceManagementClient = None
MonitorManagementClient = None
ComputeManagementClient = None
LogsQueryClient = None
# Create the MCP server instance named "Azure SRE"; the @mcp.tool() functions
# in this module register themselves on it.
# NOTE(review): host="0.0.0.0" is handed to FastMCP — presumably it makes the
# HTTP-based transports listen on all network interfaces; confirm against the
# FastMCP settings.
mcp = FastMCP("Azure SRE", host="0.0.0.0")
# Helper to get credential
def get_credential():
    """Build and return a new ``DefaultAzureCredential``.

    Raises:
        ImportError: If ``DefaultAzureCredential`` is falsy, which is what the
            module-level import fallback sets when the Azure SDKs could not
            be imported.
    """
    if DefaultAzureCredential:
        return DefaultAzureCredential()
    raise ImportError("Azure SDKs not installed.")
@mcp.tool()
def list_resources(subscription_id: str, resource_group: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Enumerate Azure resources for a subscription, optionally narrowed to one
    resource group.

    Args:
        subscription_id: Subscription passed to ``ResourceManagementClient``.
        resource_group: When truthy, only resources of this group are listed;
            otherwise the whole subscription is listed.

    Returns:
        One dict per resource with the keys ``name``, ``type``, ``location``
        and ``id``. If anything raises, a single-element list of the form
        ``[{"error": "<message>"}]`` is returned instead.

    Telemetry: records a usage event, a trace with the call duration in
    milliseconds ("ok" or "error"), and on success a ``resources_scanned``
    metric carrying the number of results.
    """
    started_at = time.time()
    trace = str(uuid.uuid4())
    span = str(uuid.uuid4())
    log_usage("mcp-azure-sre", "list_resources")

    try:
        rm_client = ResourceManagementClient(get_credential(), subscription_id)
        listing = (
            rm_client.resources.list_by_resource_group(resource_group)
            if resource_group
            else rm_client.resources.list()
        )

        # Materialise the listing into plain dicts before measuring duration.
        found = []
        for res in listing:
            found.append(
                {"name": res.name, "type": res.type, "location": res.location, "id": res.id}
            )

        elapsed_ms = (time.time() - started_at) * 1000
        log_trace("mcp-azure-sre", trace, span, "list_resources", elapsed_ms, "ok")
        log_metric("mcp-azure-sre", "resources_scanned", len(found), {"sub": subscription_id})
        return found
    except Exception as exc:
        elapsed_ms = (time.time() - started_at) * 1000
        log_trace("mcp-azure-sre", trace, span, "list_resources", elapsed_ms, "error")
        return [{"error": str(exc)}]
@mcp.tool()
def restart_vm(subscription_id: str, resource_group: str, vm_name: str) -> str:
    """
    Restart one Virtual Machine and wait for the operation to finish.

    Args:
        subscription_id: Subscription passed to ``ComputeManagementClient``.
        resource_group: Resource group containing the VM.
        vm_name: Name of the VM to restart.

    Returns:
        A success message naming the VM, or an ``"Error restarting VM: ..."``
        message containing the exception text if anything raises.
    """
    log_usage("mcp-azure-sre", "restart_vm")
    try:
        compute = ComputeManagementClient(get_credential(), subscription_id)
        # begin_restart hands back a poller; result() blocks until it completes.
        compute.virtual_machines.begin_restart(resource_group, vm_name).result()
        return f"Successfully restarted VM: {vm_name}"
    except Exception as exc:
        return f"Error restarting VM: {exc!s}"
@mcp.tool()
def get_metrics(subscription_id: str, resource_id: str, metric_names: List[str]) -> List[Dict[str, Any]]:
    """
    Fetch averaged metric data points for a single resource.

    The query is fixed to timespan ``PT1H`` with interval ``PT1M`` and the
    ``Average`` aggregation.

    Args:
        subscription_id: Subscription passed to ``MonitorManagementClient``.
        resource_id: Resource whose metrics are requested.
        metric_names: Metric names; sent to the API as one comma-joined string.

    Returns:
        A flat list with one dict per data point, holding ``metric`` (the
        metric's name value), ``timestamp`` (stringified) and ``average``.
        If anything raises, ``[{"error": "<message>"}]`` is returned.
    """
    log_usage("mcp-azure-sre", "get_metrics")
    try:
        monitor = MonitorManagementClient(get_credential(), subscription_id)
        response = monitor.metrics.list(
            resource_id,
            metricnames=",".join(metric_names),
            timespan="PT1H",
            interval="PT1M",
            aggregation="Average",
        )
        # Flatten metric -> timeseries -> data point into one list of rows.
        return [
            {
                "metric": metric.name.value,
                "timestamp": str(point.time_stamp),
                "average": point.average,
            }
            for metric in response.value
            for series in metric.timeseries
            for point in series.data
        ]
    except Exception as exc:
        return [{"error": str(exc)}]
@mcp.tool()
def analyze_logs(workspace_id: str, query: str) -> List[Dict[str, Any]]:
    """
    Execute a KQL query on a Log Analytics Workspace over the last 24 hours.

    Args:
        workspace_id: Identifier of the Log Analytics workspace to query.
        query: KQL query text.

    Returns:
        One dict per result row (column -> value) across all returned tables
        when the response status is ``"Success"``; ``[{"error": "Query failed"}]``
        for any other status; ``[{"error": "<message>"}]`` if anything raises.
    """
    # Imported locally so this function does not depend on a new file-level import.
    from datetime import timedelta

    log_usage("mcp-azure-sre", "analyze_logs")
    try:
        cred = get_credential()
        client = LogsQueryClient(cred)
        # query_workspace documents `timespan` as a timedelta (or a tuple of
        # datetimes/timedelta), not an ISO-8601 string such as "P1D"; passing
        # the string makes the SDK's timespan conversion fail, so use a
        # timedelta for the same one-day window.
        response = client.query_workspace(workspace_id, query, timespan=timedelta(days=1))

        if response.status != "Success":
            return [{"error": "Query failed"}]

        # Convert every table into row dicts keyed by that table's columns.
        return [
            dict(zip(table.columns, row))
            for table in response.tables
            for row in table.rows
        ]
    except Exception as e:
        return [{"error": str(e)}]
@mcp.tool()
def check_health(subscription_id: str, resource_group: str) -> Dict[str, str]:
    """
    Perform a health check on key resources in a resource group.
    Checks status of VMs.

    For each VM in the group, the instance view is fetched and the display
    text of its first ``PowerState`` status is reported.

    Args:
        subscription_id: Subscription passed to ``ComputeManagementClient``.
        resource_group: Resource group whose VMs are inspected.

    Returns:
        Mapping of VM name to its power-state display text, or ``"Unknown"``
        when no usable power-state status is present. If anything raises,
        ``{"error": "<message>"}`` is returned instead.
    """
    log_usage("mcp-azure-sre", "check_health")
    try:
        cred = get_credential()
        compute_client = ComputeManagementClient(cred, subscription_id)

        health_status = {}
        for vm in compute_client.virtual_machines.list(resource_group):
            # Get instance view for power state
            instance_view = compute_client.virtual_machines.instance_view(resource_group, vm.name)
            # statuses, code and display_status are optional on the SDK
            # models; tolerate missing values so one incomplete VM neither
            # aborts the whole report nor stores None in the result.
            power_states = [
                s.display_status
                for s in (instance_view.statuses or [])
                if s.code and s.code.startswith("PowerState") and s.display_status
            ]
            health_status[vm.name] = power_states[0] if power_states else "Unknown"

        return health_status
    except Exception as e:
        return {"error": str(e)}
if __name__ == "__main__":
    import os

    # MCP_TRANSPORT=sse serves the SSE app through uvicorn on $PORT
    # (7860 when unset); any other value uses FastMCP's own run().
    transport = os.environ.get("MCP_TRANSPORT")
    if transport != "sse":
        mcp.run()
    else:
        import uvicorn

        listen_port = int(os.environ.get("PORT", 7860))
        uvicorn.run(mcp.sse_app(), host="0.0.0.0", port=listen_port)