Spaces:
Running
Running
| """Resource verification service — queries MiniStack for ground-truth state.""" | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from typing import Any | |
| from server.services.environment_strategy import EnvironmentStrategy | |
| logger = logging.getLogger(__name__) | |
| def _extract_json_path(data: Any, path: str) -> Any: | |
| """Simple JSON path extractor supporting dot notation and array indexing. | |
| Supports paths like: $.Table.ProvisionedThroughput.ReadCapacityUnits | |
| $.Rules[0].Expiration.Days | |
| $.Buckets[].Name | |
| """ | |
| parts = path.lstrip("$").lstrip(".").split(".") | |
| current = data | |
| for part in parts: | |
| if current is None: | |
| return None | |
| # Handle array index like Rules[0] | |
| if "[" in part: | |
| key, idx_str = part.split("[", 1) | |
| idx_str = idx_str.rstrip("]") | |
| if key: | |
| current = current.get(key) if isinstance(current, dict) else None | |
| if current is None: | |
| return None | |
| if idx_str == "": | |
| # Wildcard — return list of values | |
| if isinstance(current, list): | |
| remaining = ".".join(parts[parts.index(part) + 1 :]) | |
| if remaining: | |
| return [ | |
| _extract_json_path(item, f"$.{remaining}") | |
| for item in current | |
| ] | |
| return current | |
| return None | |
| try: | |
| current = current[int(idx_str)] | |
| except (IndexError, TypeError): | |
| return None | |
| else: | |
| current = current.get(part) if isinstance(current, dict) else None | |
| return current | |
| class ResourceVerifier: | |
| """Verifies resource state by querying MiniStack via AWS CLI.""" | |
| def __init__(self, backend: EnvironmentStrategy) -> None: | |
| self._backend = backend | |
| def resource_exists(self, service: str, name: str) -> bool: | |
| """Check if a specific resource exists in MiniStack. | |
| Uses service-specific verification commands and checks for the | |
| exact resource name (not just any resource of that type). | |
| """ | |
| service_lower = service.lower() | |
| verifiers = { | |
| "s3": self._check_s3_bucket, | |
| "dynamodb": self._check_dynamodb_table, | |
| "lambda": self._check_lambda_function, | |
| "sqs": self._check_sqs_queue, | |
| "sns": self._check_sns_topic, | |
| "iam": self._check_iam_resource, | |
| "apigateway": self._check_apigateway, | |
| "secretsmanager": self._check_secretsmanager, | |
| "ecs": self._check_ecs_cluster, | |
| "rds": self._check_rds_instance, | |
| "elasticache": self._check_elasticache_cluster, | |
| "route53": self._check_route53_hosted_zone, | |
| "elbv2": self._check_elbv2_load_balancer, | |
| "efs": self._check_efs_filesystem, | |
| "cognito-idp": self._check_cognito_user_pool, | |
| "ssm": self._check_ssm_parameter, | |
| "events": self._check_eventbridge_rule, | |
| "apigatewayv2": self._check_apigatewayv2, | |
| "cloudformation": self._check_cloudformation_stack, | |
| "glue": self._check_glue_database, | |
| "ebs": self._check_ebs_volume, | |
| "firehose": self._check_firehose_stream, | |
| } | |
| verifier = verifiers.get(service_lower) | |
| if verifier is None: | |
| logger.warning("No verifier for service: %s", service) | |
| return False | |
| return verifier(name) | |
| def check_state(self, state_check: dict[str, Any]) -> bool: | |
| """Run an arbitrary command and assert on its output. | |
| Supports: | |
| - output_contains: substring check on stdout | |
| - json_path + expected: extract value from JSON stdout and compare | |
| """ | |
| command = state_check.get("command", "") | |
| if not command: | |
| return False | |
| success, stdout, _ = self._backend.execute_command(command) | |
| if not success: | |
| return False | |
| # Check output_contains | |
| if "output_contains" in state_check: | |
| if state_check["output_contains"] not in stdout: | |
| return False | |
| # Check json_path + expected | |
| if "json_path" in state_check and "expected" in state_check: | |
| try: | |
| data = json.loads(stdout) | |
| value = _extract_json_path(data, state_check["json_path"]) | |
| expected = state_check["expected"] | |
| # Compare as strings for flexibility | |
| if str(value) != str(expected): | |
| return False | |
| except (json.JSONDecodeError, KeyError, TypeError): | |
| return False | |
| return True | |
| # -- Service-specific verifiers ------------------------------------------- | |
| def _check_s3_bucket(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws s3api list-buckets --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| buckets = data.get("Buckets", []) | |
| return any(b.get("Name") == name for b in buckets) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_dynamodb_table(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws dynamodb describe-table --table-name {name}" | |
| ) | |
| return success | |
| def _check_lambda_function(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws lambda get-function --function-name {name}" | |
| ) | |
| return success | |
| def _check_sqs_queue(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws sqs get-queue-url --queue-name {name}" | |
| ) | |
| return success | |
| def _check_sns_topic(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws sns list-topics --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| topics = data.get("Topics", []) | |
| return any(name in t.get("TopicArn", "") for t in topics) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_iam_resource(self, name: str) -> bool: | |
| """Check for IAM roles, users, and policies by name.""" | |
| # Try role first | |
| success, _, _ = self._backend.execute_command( | |
| f"aws iam get-role --role-name {name}" | |
| ) | |
| if success: | |
| return True | |
| # Try user | |
| success, _, _ = self._backend.execute_command( | |
| f"aws iam get-user --user-name {name}" | |
| ) | |
| if success: | |
| return True | |
| # Try policy (list and match by name) | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws iam list-policies --scope Local --output json" | |
| ) | |
| if success: | |
| try: | |
| data = json.loads(stdout) | |
| policies = data.get("Policies", []) | |
| if any(p.get("PolicyName") == name for p in policies): | |
| return True | |
| except (json.JSONDecodeError, TypeError): | |
| pass | |
| return False | |
| def _check_secretsmanager(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws secretsmanager describe-secret --secret-id {name}" | |
| ) | |
| return success | |
| def _check_apigateway(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws apigateway get-rest-apis --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| items = data.get("items", []) | |
| return any(i.get("name") == name for i in items) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_ecs_cluster(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| f"aws ecs describe-clusters --clusters {name}" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| clusters = data.get("clusters", []) | |
| return any( | |
| c.get("clusterName") == name and c.get("status") != "INACTIVE" | |
| for c in clusters | |
| ) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_rds_instance(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws rds describe-db-instances --db-instance-identifier {name}" | |
| ) | |
| return success | |
| def _check_elasticache_cluster(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws elasticache describe-cache-clusters --cache-cluster-id {name}" | |
| ) | |
| return success | |
| def _check_route53_hosted_zone(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws route53 list-hosted-zones --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| zones = data.get("HostedZones", []) | |
| return any(z.get("Name", "").rstrip(".") == name.rstrip(".") for z in zones) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_elbv2_load_balancer(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| f"aws elbv2 describe-load-balancers --names {name}" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| lbs = data.get("LoadBalancers", []) | |
| return any(lb.get("LoadBalancerName") == name for lb in lbs) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_efs_filesystem(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws efs describe-file-systems --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| filesystems = data.get("FileSystems", []) | |
| return any( | |
| fs.get("CreationToken") == name | |
| or any(t.get("Value") == name for t in fs.get("Tags", [])) | |
| for fs in filesystems | |
| ) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_cognito_user_pool(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws cognito-idp list-user-pools --max-results 60 --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| pools = data.get("UserPools", []) | |
| return any(p.get("Name") == name for p in pools) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_ssm_parameter(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws ssm get-parameter --name {name}" | |
| ) | |
| return success | |
| def _check_eventbridge_rule(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws events describe-rule --name {name}" | |
| ) | |
| return success | |
| def _check_apigatewayv2(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws apigatewayv2 get-apis --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| items = data.get("Items", []) | |
| return any(i.get("Name") == name for i in items) | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_cloudformation_stack(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws cloudformation describe-stacks --stack-name {name}" | |
| ) | |
| return success | |
| def _check_glue_database(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws glue get-database --name {name}" | |
| ) | |
| return success | |
| def _check_ebs_volume(self, name: str) -> bool: | |
| success, stdout, _ = self._backend.execute_command( | |
| "aws ec2 describe-volumes --output json" | |
| ) | |
| if not success: | |
| return False | |
| try: | |
| data = json.loads(stdout) | |
| volumes = data.get("Volumes", []) | |
| return len(volumes) > 0 | |
| except (json.JSONDecodeError, TypeError): | |
| return False | |
| def _check_firehose_stream(self, name: str) -> bool: | |
| success, _, _ = self._backend.execute_command( | |
| f"aws firehose describe-delivery-stream --delivery-stream-name {name}" | |
| ) | |
| return success | |