| """Workflow simulator — YAML parse + CI rule checks.""" |
|
|
| import re |
| from typing import Any, Dict, List, Optional |
|
|
| import yaml |
|
|
| from server.models import FileContent |
|
|
|
|
class WorkflowSimulator:
    """Static simulator for GitHub Actions workflows.

    Parses a workflow YAML file and runs an ordered chain of rule checks.
    Each rule early-returns a result dict, so rule ORDER is significant:
    parse/structure errors are reported before execution-level errors.
    """

    def validate(self, workflow: Optional[FileContent], files: Dict[str, FileContent]) -> Dict[str, Any]:
        """Validate *workflow* and return a result dict.

        Args:
            workflow: The workflow file to check, or None (treated as a no-op
                success — there is nothing to validate).
            files: Other repo files by path; only "package.json" is consulted
                (for the Node engines vs. matrix check).

        Returns:
            A dict with "parse_success" and "execution_success" booleans.
            Parse-level failures add an "error" key; execution-level failures
            add an "exec_error" key. The first failing rule wins.
        """
        if workflow is None:
            return {"parse_success": True, "execution_success": True}

        content = workflow.content

        # Rule: GitHub Actions expressions must use ${{ ... }}. Reject any
        # single-brace ${ ... } occurrence before even parsing the YAML.
        # NOTE(review): this regex also matches legitimate shell parameter
        # expansion like ${VAR} inside `run:` scripts — confirm that false
        # positive is intended by the simulator's rules.
        single_brace = re.findall(r'\$\{(?!\{)\s*[^}]+\}', content)
        if single_brace:
            return {
                "parse_success": False,
                "execution_success": False,
                "error": (
                    "Unrecognized expression syntax. "
                    "Use ${{ expression }} with double braces for GitHub Actions expressions."
                ),
            }

        # Rule: the file must be well-formed YAML.
        try:
            parsed = yaml.safe_load(content)
        except yaml.YAMLError as exc:
            return {
                "parse_success": False,
                "execution_success": False,
                "error": f"YAML parse error: {exc}",
            }

        # Rule: the document root must be a mapping (not a list/scalar/None).
        if not isinstance(parsed, dict):
            return {
                "parse_success": False,
                "execution_success": False,
                "error": "Workflow root must be a mapping",
            }

        # Rule: an 'on' trigger is required. PyYAML (YAML 1.1) parses the
        # bare key `on:` as the boolean True, so check both spellings.
        if "on" not in parsed and True not in parsed:
            return {
                "parse_success": False,
                "execution_success": False,
                "error": "Workflow must define an 'on' trigger event",
            }

        # Rule: per-event 'branches' must be a sequence, not a scalar string.
        on_value = parsed.get("on") or parsed.get(True)
        if isinstance(on_value, dict):
            for event_key, event_config in on_value.items():
                if isinstance(event_config, dict):
                    branches_val = event_config.get("branches")
                    if isinstance(branches_val, str):
                        return {
                            "parse_success": False,
                            "execution_success": False,
                            "error": (
                                f"Unexpected value '{branches_val}' for 'on.{event_key}.branches'. "
                                "Expected a sequence (list) value."
                            ),
                        }

        # Rule: at least one job must be defined under a 'jobs' mapping.
        jobs = parsed.get("jobs")
        if not isinstance(jobs, dict) or not jobs:
            return {
                "parse_success": False,
                "execution_success": False,
                "error": "Workflow must define at least one job",
            }

        # Raw substring flags over the whole file, used by the Docker rules
        # below. These are deliberately text-based, not structure-based.
        has_buildx_setup = "docker/setup-buildx-action" in content
        has_platforms = "platforms:" in content
        has_docker_login = "docker login" in content
        has_docker_push = "docker push" in content
        has_username_secret = "secrets.DOCKER_USERNAME" in content
        has_password_secret = "secrets.DOCKER_PASSWORD" in content
        has_github_token_secret = "secrets.GITHUB_TOKEN" in content

        # All declared job ids, for validating 'needs' references.
        job_ids = set(jobs.keys())

        # Per-job structural checks.
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue

            # Rule: every job needs a 'runs-on' runner label.
            if "runs-on" not in job:
                return {
                    "parse_success": False,
                    "execution_success": False,
                    "error": f"Job '{job_name}' is missing required field 'runs-on'",
                }

            # Rule: every 'needs' entry must name a declared job.
            # 'needs' may be a single string or a list; anything else is
            # silently ignored here.
            # NOTE(review): a job that needs ITSELF (or a dependency cycle)
            # passes this check, though GitHub Actions rejects both — confirm
            # whether cycle detection is out of scope.
            needs = job.get("needs")
            if needs:
                needed = [needs] if isinstance(needs, str) else (needs if isinstance(needs, list) else [])
                for dep in needed:
                    if dep not in job_ids:
                        return {
                            "parse_success": False,
                            "execution_success": False,
                            "error": f"Job '{job_name}' depends on unknown job '{dep}'",
                        }

            # Rule: 'steps' must be a list when present.
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                return {
                    "parse_success": False,
                    "execution_success": False,
                    "error": f"Job '{job_name}' steps must be a list",
                }

            # Rule: every step needs either 'uses' or 'run'.
            for step in steps:
                if not isinstance(step, dict):
                    continue
                has_uses = "uses" in step
                has_run = "run" in step
                if not has_uses and not has_run:
                    step_name = step.get("name", "unnamed")
                    return {
                        "parse_success": False,
                        "execution_success": False,
                        "error": f"Every step must define a 'uses' or 'run' key. Step '{step_name}' has neither.",
                    }

            # Rule (execution-level): actions/checkout must come before any
            # Docker build step within this job. Indices are per-job.
            checkout_index = -1
            build_index = -1
            for idx, step in enumerate(steps):
                if not isinstance(step, dict):
                    continue
                uses = step.get("uses", "")
                run_cmd = step.get("run", "")
                if isinstance(uses, str) and "actions/checkout" in uses:
                    checkout_index = idx
                if (isinstance(run_cmd, str) and "docker build" in run_cmd) or (
                    isinstance(uses, str) and "docker/build-push-action" in uses
                ):
                    build_index = idx

            if build_index != -1 and (checkout_index == -1 or checkout_index > build_index):
                return {
                    "parse_success": True,
                    "execution_success": False,
                    "exec_error": "Checkout must happen before Docker build steps",
                }

        # Rule: a job using download-artifact must declare 'needs' so the
        # uploading job is guaranteed to have finished.
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            uses_download = any(
                isinstance(s, dict) and "actions/download-artifact" in str(s.get("uses", ""))
                for s in steps
            )
            if uses_download:
                needs = job.get("needs")
                if not needs:
                    return {
                        "parse_success": True,
                        "execution_success": False,
                        "exec_error": (
                            f"Job '{job_name}' uses download-artifact but has no 'needs' dependency — "
                            "add 'needs' to ensure the upload job completes first"
                        ),
                    }

        # Rule: a `docker login` that reads $DOCKER_USERNAME from the shell
        # must have the secret wired in via an env block somewhere in the file.
        if has_docker_login:
            login_has_env_secrets = has_username_secret and has_password_secret
            if not login_has_env_secrets:
                if "$DOCKER_USERNAME" in content and not has_username_secret:
                    return {
                        "parse_success": True,
                        "execution_success": False,
                        "exec_error": "Docker login secrets not wired — add env block with secrets.DOCKER_USERNAME and secrets.DOCKER_PASSWORD",
                    }

        # Rule: `docker push` requires a prior login (shell `docker login`
        # or the docker/login-action).
        if has_docker_push and not has_docker_login:
            has_login_action = "docker/login-action" in content
            if not has_login_action:
                return {
                    "parse_success": True,
                    "execution_success": False,
                    "exec_error": "Docker push without login — add a docker login step before pushing",
                }

        # Rule: logging in to GHCR must use GITHUB_TOKEN, not a Docker Hub
        # password secret.
        if "docker login ghcr.io" in content:
            if has_password_secret and not has_github_token_secret:
                return {
                    "parse_success": True,
                    "execution_success": False,
                    "exec_error": "GHCR requires GITHUB_TOKEN for authentication, not DOCKER_PASSWORD",
                }

        # Rule: pushing to GHCR needs an explicit `packages: write` permission
        # (either spacing variant accepted).
        if "ghcr.io" in content and "docker push" in content:
            if "packages: write" not in content and "packages:write" not in content:
                return {
                    "parse_success": True,
                    "execution_success": False,
                    "exec_error": "GITHUB_TOKEN does not have packages:write permission — add permissions block",
                }

        # Rule: multi-platform builds (a `platforms:` key) require Buildx.
        if has_platforms and not has_buildx_setup:
            return {
                "parse_success": True,
                "execution_success": False,
                "exec_error": "Multi-platform build requires docker/setup-buildx-action",
            }

        # Rule: GHA cache export must use mode=max to cache all layers.
        if "cache-to:" in content and "cache-from:" in content:
            if "cache-to: type=gha" in content and "mode=max" not in content:
                return {
                    "parse_success": True,
                    "execution_success": False,
                    "exec_error": "GHA cache export needs mode=max for proper cache support",
                }

        # Rule: a step's 'with.file' Dockerfile path must live under its
        # 'with.context' directory (context "." / "./" means anywhere).
        # NOTE(review): this is a plain string-prefix test — context "./app"
        # with file "app/Dockerfile" would be flagged, and context "app" with
        # file "app2/Dockerfile" would pass; confirm path normalization is
        # intentionally omitted.
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            for step in job.get("steps", []):
                if not isinstance(step, dict):
                    continue
                with_block = step.get("with", {})
                if not isinstance(with_block, dict):
                    continue
                context = with_block.get("context")
                file_path = with_block.get("file")
                if context and file_path and isinstance(context, str) and isinstance(file_path, str):
                    if context not in {".", "./"} and not file_path.startswith(context):
                        return {
                            "parse_success": True,
                            "execution_success": False,
                            "exec_error": f"Dockerfile path '{file_path}' does not match build context '{context}'",
                        }

        # Rule: known secret-backed env vars referenced as $VAR in a `run:`
        # script must be mapped in that step's env block, or they expand
        # empty at runtime. Built-in GITHUB_* vars are always available.
        # NOTE(review): only the STEP-level env block is consulted; a secret
        # mapped at job- or workflow-level env would be flagged incorrectly —
        # confirm against the simulator's intended scope.
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            for step in job.get("steps", []):
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                env_block = step.get("env", {})
                if not isinstance(env_block, dict):
                    env_block = {}
                # $VAR references (two+ chars, UPPER_SNAKE). The brace form
                # ${VAR} is not matched here (it was rejected earlier by the
                # single-brace expression rule).
                env_var_refs = re.findall(r'\$([A-Z][A-Z0-9_]+)', run_cmd)
                for var in env_var_refs:
                    # Runner-provided defaults need no env mapping.
                    if var in ("GITHUB_SHA", "GITHUB_REF", "GITHUB_ACTOR", "GITHUB_REPOSITORY"):
                        continue
                    # Secret-backed vars must be explicitly wired.
                    if var in ("SLACK_WEBHOOK_URL", "DEPLOY_TOKEN", "NPM_TOKEN", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
                        if var not in env_block:
                            return {
                                "parse_success": True,
                                "execution_success": False,
                                "exec_error": f"{var} is empty — secret not available in shell environment. Map it via env block.",
                            }

        # Rule: build-push-action with push:false and no load:true leaves no
        # image in the local daemon, so a later `docker run` step would fail.
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            build_push_idx = None
            build_push_has_load = False
            for idx, step in enumerate(steps):
                if not isinstance(step, dict):
                    continue
                uses = step.get("uses", "")
                if isinstance(uses, str) and "docker/build-push-action" in uses:
                    build_push_idx = idx
                    with_block = step.get("with", {})
                    if isinstance(with_block, dict):
                        push_val = str(with_block.get("push", "")).lower()
                        load_val = str(with_block.get("load", "")).lower()
                        build_push_has_load = load_val == "true"

                        if push_val == "false" and not build_push_has_load:
                            # Scan only steps AFTER the build for docker run.
                            for later in steps[idx + 1:]:
                                if not isinstance(later, dict):
                                    continue
                                run_cmd = later.get("run", "")
                                if isinstance(run_cmd, str) and "docker run" in run_cmd:
                                    return {
                                        "parse_success": True,
                                        "execution_success": False,
                                        "exec_error": (
                                            "build-push-action with Buildx does not load images into local daemon by default — "
                                            "add 'load: true' to make the image available for docker run"
                                        ),
                                    }

        # Rule: within a job, the registry a `docker push` targets must match
        # the registry of the tag used in the preceding `docker build -t`.
        # build_registry persists across steps of the same job.
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            build_registry = None
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                build_match = re.search(r'docker build\s+.*-t\s+(\S+)', run_cmd)
                if build_match:
                    tag = build_match.group(1)
                    if "ghcr.io" in tag:
                        build_registry = "ghcr.io"
                    elif "docker.io" in tag or "/" in tag:
                        # First path segment is a registry only if it contains
                        # a dot (hostname); otherwise default to Docker Hub.
                        build_registry = tag.split("/")[0] if "." in tag.split("/")[0] else "docker.io"
                push_match = re.search(r'docker push\s+(\S+)', run_cmd)
                if push_match and build_registry:
                    push_tag = push_match.group(1)
                    if "ghcr.io" in push_tag:
                        push_registry = "ghcr.io"
                    elif "docker.io" in push_tag:
                        push_registry = "docker.io"
                    else:
                        push_registry = push_tag.split("/")[0] if "." in push_tag.split("/")[0] else "docker.io"
                    if build_registry != push_registry:
                        return {
                            "parse_success": True,
                            "execution_success": False,
                            "exec_error": (
                                f"Registry mismatch: image built with {build_registry} tag "
                                f"but push targets {push_registry}"
                            ),
                        }

        # Rule: `docker tag SOURCE TARGET` must reference an image built
        # earlier in the same job (accumulated in built_tags).
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            built_tags = set()
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                # Collect every -t tag from docker build commands.
                for m in re.finditer(r'docker build\s+.*-t\s+(\S+)', run_cmd):
                    built_tags.add(m.group(1))
                tag_match = re.search(r'docker tag\s+(\S+)\s+(\S+)', run_cmd)
                if tag_match:
                    source = tag_match.group(1)
                    # Literal source tag: must exactly match a built tag.
                    if source not in built_tags and "${{" not in source:
                        return {
                            "parse_success": True,
                            "execution_success": False,
                            "exec_error": f"No such image: {source} — docker tag source does not match any built image",
                        }
                    # Expression-bearing source: compare the inner expression
                    # text against expressions in the built tags.
                    if "${{" in source:
                        source_expr = re.search(r'\$\{\{(.+?)\}\}', source)
                        if source_expr:
                            source_key = source_expr.group(1).strip()
                            found_matching = False
                            for bt in built_tags:
                                bt_expr = re.search(r'\$\{\{(.+?)\}\}', bt)
                                if bt_expr and bt_expr.group(1).strip() == source_key:
                                    found_matching = True
                                    break
                            # Same repo base but differing tag expression ⇒
                            # the tagged image was never built.
                            source_base = source.split(":")[0] if ":" in source else source
                            built_bases = {bt.split(":")[0] if ":" in bt else bt for bt in built_tags}
                            if not found_matching and source_base in built_bases:
                                return {
                                    "parse_success": True,
                                    "execution_success": False,
                                    "exec_error": f"No such image: docker tag source tag does not match any built image tag",
                                }

        # Rule: every Node version in a job's strategy matrix must satisfy
        # the ">=N" engines constraint in package.json (when present).
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            strategy = job.get("strategy", {})
            if not isinstance(strategy, dict):
                continue
            matrix = strategy.get("matrix", {})
            if not isinstance(matrix, dict):
                continue
            node_versions = matrix.get("node", [])
            if isinstance(node_versions, list):
                pkg = files.get("package.json")
                if pkg:
                    # Only the exact `"node": ">=N"` spelling is recognized.
                    engines_match = re.search(r'"node"\s*:\s*">=(\d+)"', pkg.content)
                    if engines_match:
                        min_version = int(engines_match.group(1))
                        for v in node_versions:
                            # Only integer matrix entries are compared;
                            # strings like "18.x" are ignored.
                            if isinstance(v, int) and v < min_version:
                                return {
                                    "parse_success": True,
                                    "execution_success": False,
                                    "exec_error": f"Matrix job (node: {v}) failed: package.json requires Node >= {min_version}",
                                }

        # All rules passed.
        return {"parse_success": True, "execution_success": True}
|
|