Spaces:
Running
Running
| """ | |
| Azure SRE MCP Server | |
| """ | |
| import sys | |
| import os | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
# Add src to pythonpath
# The directory two levels above this file's folder is appended to sys.path
# (only if absent) -- presumably so the `core` package imported below resolves.
current_dir = os.path.split(os.path.abspath(__file__))[0]
src_dir = os.path.dirname(os.path.dirname(current_dir))
if src_dir not in sys.path:
    sys.path.append(src_dir)
| from mcp.server.fastmcp import FastMCP | |
| from core.mcp_telemetry import log_usage, log_trace, log_metric | |
| import uuid | |
| import time | |
| # Azure Imports | |
| try: | |
| from azure.identity import DefaultAzureCredential | |
| from azure.mgmt.resource import ResourceManagementClient | |
| from azure.mgmt.monitor import MonitorManagementClient | |
| from azure.mgmt.compute import ComputeManagementClient | |
| from azure.monitor.query import LogsQueryClient | |
| except ImportError: | |
| # Allow running without Azure SDKs installed (for testing/mocking) | |
| DefaultAzureCredential = None | |
| ResourceManagementClient = None | |
| MonitorManagementClient = None | |
| ComputeManagementClient = None | |
| LogsQueryClient = None | |
# Initialize Server
# One module-level FastMCP instance named "Azure SRE"; the host setting is
# the all-interfaces address "0.0.0.0".
mcp = FastMCP(name="Azure SRE", host="0.0.0.0")
# Helper to get credential
def get_credential():
    """
    Build and return a new DefaultAzureCredential.

    Raises:
        ImportError: when DefaultAzureCredential is unset, which is what the
            module's guarded Azure import leaves behind if the SDKs are missing.
    """
    if DefaultAzureCredential:
        return DefaultAzureCredential()
    raise ImportError("Azure SDKs not installed.")
def list_resources(subscription_id: str, resource_group: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Enumerate Azure resources in a subscription, optionally limited to one
    resource group.

    Returns:
        One dict per resource with the keys "name", "type", "location" and
        "id". Exceptions are not propagated: on failure the result is a
        single-element list of the form [{"error": <message>}].

    Telemetry: a usage record is logged up front, a trace record is logged for
    both outcomes ("ok" / "error"), and the "resources_scanned" metric is
    logged only on success.
    """
    started_at = time.time()
    trace_id = str(uuid.uuid4())
    span_id = str(uuid.uuid4())
    log_usage("mcp-azure-sre", "list_resources")

    def elapsed_ms():
        # Wall-clock time since the call began, in milliseconds.
        return (time.time() - started_at) * 1000

    try:
        credential = get_credential()
        client = ResourceManagementClient(credential, subscription_id)
        # A falsy resource_group (None or "") means "whole subscription".
        listing = (
            client.resources.list_by_resource_group(resource_group)
            if resource_group
            else client.resources.list()
        )
        results = []
        for resource in listing:
            results.append({
                "name": resource.name,
                "type": resource.type,
                "location": resource.location,
                "id": resource.id,
            })
        log_trace("mcp-azure-sre", trace_id, span_id, "list_resources", elapsed_ms(), "ok")
        log_metric("mcp-azure-sre", "resources_scanned", len(results), {"sub": subscription_id})
        return results
    except Exception as e:
        log_trace("mcp-azure-sre", trace_id, span_id, "list_resources", elapsed_ms(), "error")
        return [{"error": str(e)}]
def restart_vm(subscription_id: str, resource_group: str, vm_name: str) -> str:
    """
    Restart a virtual machine and wait for the operation to finish.

    Returns:
        "Successfully restarted VM: <vm_name>" on success, otherwise
        "Error restarting VM: <message>". Exceptions are reported through the
        returned string rather than raised.
    """
    log_usage("mcp-azure-sre", "restart_vm")
    try:
        credential = get_credential()
        compute = ComputeManagementClient(credential, subscription_id)
        # result() is called on the poller returned by begin_restart so the
        # function only returns once the restart has completed (or failed).
        compute.virtual_machines.begin_restart(resource_group, vm_name).result()
        return f"Successfully restarted VM: {vm_name}"
    except Exception as e:
        return f"Error restarting VM: {str(e)}"
def get_metrics(subscription_id: str, resource_id: str, metric_names: List[str]) -> List[Dict[str, Any]]:
    """
    Fetch metric data points for a single resource.

    The request asks for the "Average" aggregation with timespan "PT1H" and
    interval "PT1M"; the requested metric names are sent as one
    comma-separated string.

    Returns:
        A flat list with one dict per data point, holding "metric" (the
        metric's name value), "timestamp" (stringified) and "average".
        On any exception the result is [{"error": <message>}] instead.
    """
    log_usage("mcp-azure-sre", "get_metrics")
    try:
        credential = get_credential()
        monitor = MonitorManagementClient(credential, subscription_id)
        # Default to last 1 hour
        response = monitor.metrics.list(
            resource_id,
            metricnames=",".join(metric_names),
            timespan="PT1H",
            interval="PT1M",
            aggregation="Average",
        )
        # Flatten metric -> timeseries -> data point into one list of rows.
        return [
            {
                "metric": metric.name.value,
                "timestamp": str(point.time_stamp),
                "average": point.average,
            }
            for metric in response.value
            for series in metric.timeseries
            for point in series.data
        ]
    except Exception as e:
        return [{"error": str(e)}]
def analyze_logs(workspace_id: str, query: str) -> List[Dict[str, Any]]:
    """
    Execute a KQL query on a Log Analytics workspace over the last day.

    Args:
        workspace_id: Identifier of the Log Analytics workspace to query.
        query: The KQL query text.

    Returns:
        One dict per result row, mapping each entry of the table's `columns`
        to the corresponding row value; rows from all returned tables are
        concatenated. If the response status is not "Success" the result is
        [{"error": "Query failed"}]; if an exception occurs it is
        [{"error": <message>}]. Nothing is raised to the caller.
    """
    # Imported here so the block does not depend on a file-level import.
    from datetime import timedelta

    log_usage("mcp-azure-sre", "analyze_logs")
    try:
        cred = get_credential()
        client = LogsQueryClient(cred)
        # query_workspace is documented to take `timespan` as a timedelta or a
        # (start, end/duration) tuple. The previous ISO-8601 string "P1D" is
        # not one of those forms, so the one-day window is passed as a timedelta.
        response = client.query_workspace(workspace_id, query, timespan=timedelta(days=1))
        if response.status != "Success":
            return [{"error": "Query failed"}]
        # Convert each table to a list of dicts keyed by column.
        results = []
        for table in response.tables:
            columns = table.columns
            results.extend(dict(zip(columns, row)) for row in table.rows)
        return results
    except Exception as e:
        return [{"error": str(e)}]
def check_health(subscription_id: str, resource_group: str) -> Dict[str, str]:
    """
    Report the power state of every VM in a resource group.

    For each VM the instance view is fetched and the display status of the
    first status whose code starts with "PowerState" is recorded; VMs with no
    such status are reported as "Unknown".

    Returns:
        A mapping of VM name to power-state text, or {"error": <message>} if
        any exception occurs.
    """
    log_usage("mcp-azure-sre", "check_health")
    try:
        credential = get_credential()
        vm_ops = ComputeManagementClient(credential, subscription_id).virtual_machines
        report = {}
        for machine in vm_ops.list(resource_group):
            # One extra call per VM: the instance view carries the statuses.
            view = vm_ops.instance_view(resource_group, machine.name)
            power_states = []
            for status in view.statuses:
                if status.code.startswith('PowerState'):
                    power_states.append(status.display_status)
            report[machine.name] = power_states[0] if power_states else "Unknown"
        return report
    except Exception as e:
        return {"error": str(e)}
if __name__ == "__main__":
    import os

    # MCP_TRANSPORT selects how the server is exposed when run as a script.
    transport = os.environ.get("MCP_TRANSPORT")
    if transport != "sse":
        # Anything other than "sse" (including unset): run with FastMCP's defaults.
        mcp.run()
    else:
        # SSE mode: serve the SSE app with uvicorn on PORT (7860 if unset).
        import uvicorn

        port = int(os.environ.get("PORT", 7860))
        uvicorn.run(mcp.sse_app(), host="0.0.0.0", port=port)