"""Workflow simulator — YAML parse + CI rule checks."""

import re
from typing import Any, Dict, List, Optional

import yaml

from server.models import FileContent


class WorkflowSimulator:
    """Static analyser for GitHub Actions workflow files.

    ``validate`` first parses the workflow YAML, then applies an ordered
    sequence of CI/Docker rule checks and returns the first failure found.

    Result dicts always carry ``parse_success`` and ``execution_success``;
    parse-stage failures add an ``error`` message, execution-stage failures
    add ``exec_error``.
    """

    # Env vars injected by the GitHub runner itself — never need an env: mapping.
    _RUNNER_VARS = ("GITHUB_SHA", "GITHUB_REF", "GITHUB_ACTOR", "GITHUB_REPOSITORY")
    # Env vars that conventionally come from repository secrets.
    _SECRET_VARS = (
        "SLACK_WEBHOOK_URL",
        "DEPLOY_TOKEN",
        "NPM_TOKEN",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
    )

    @staticmethod
    def _parse_failure(message: str) -> Dict[str, Any]:
        """Result dict for a parse/validation-stage failure (key: 'error')."""
        return {"parse_success": False, "execution_success": False, "error": message}

    @staticmethod
    def _exec_failure(message: str) -> Dict[str, Any]:
        """Result dict for an execution-stage failure (key: 'exec_error')."""
        return {"parse_success": True, "execution_success": False, "exec_error": message}

    def validate(
        self,
        workflow: Optional[FileContent],
        files: Dict[str, FileContent],
    ) -> Dict[str, Any]:
        """Validate *workflow* against YAML syntax and CI rules.

        :param workflow: the workflow file, or ``None`` (treated as trivially
            valid — there is nothing to check).
        :param files: sibling repository files keyed by path; only
            ``package.json`` is consulted (for the Node engines constraint).
        :returns: a dict with ``parse_success``/``execution_success`` flags
            plus ``error`` or ``exec_error`` describing the first failure.
        """
        if workflow is None:
            return {"parse_success": True, "execution_success": True}

        content = workflow.content

        # ${ expr } with single braces is not valid GitHub Actions expression
        # syntax. NOTE(review): this regex also matches legitimate shell
        # ${VAR} expansion inside run: commands — confirm that is intended.
        if re.findall(r'\$\{(?!\{)\s*[^}]+\}', content):
            return self._parse_failure(
                "Unrecognized expression syntax. "
                "Use ${{ expression }} with double braces for GitHub Actions expressions."
            )

        try:
            parsed = yaml.safe_load(content)
        except yaml.YAMLError as exc:
            return self._parse_failure(f"YAML parse error: {exc}")

        if not isinstance(parsed, dict):
            return self._parse_failure("Workflow root must be a mapping")

        # yaml.safe_load turns an unquoted `on:` key into boolean True, so the
        # trigger may live under either key.
        if "on" not in parsed and True not in parsed:
            return self._parse_failure("Workflow must define an 'on' trigger event")

        failure = self._check_triggers(parsed)
        if failure is not None:
            return failure

        jobs = parsed.get("jobs")
        if not isinstance(jobs, dict) or not jobs:
            return self._parse_failure("Workflow must define at least one job")

        # Ordered rule checks: the first one that reports a failure wins,
        # matching the original top-to-bottom check sequence.
        checks = (
            lambda: self._check_job_definitions(jobs),
            lambda: self._check_artifact_dependencies(jobs),
            lambda: self._check_docker_rules(content),
            lambda: self._check_build_context(jobs),
            lambda: self._check_shell_secrets(jobs),
            lambda: self._check_buildx_load(jobs),
            lambda: self._check_registry_consistency(jobs),
            lambda: self._check_tag_sources(jobs),
            lambda: self._check_node_matrix(jobs, files),
        )
        for check in checks:
            failure = check()
            if failure is not None:
                return failure

        return {"parse_success": True, "execution_success": True}

    def _check_triggers(self, parsed: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Reject event configs whose 'branches' is a bare string, not a list."""
        on_value = parsed.get("on") or parsed.get(True)
        if isinstance(on_value, dict):
            for event_key, event_config in on_value.items():
                if not isinstance(event_config, dict):
                    continue
                branches_val = event_config.get("branches")
                if isinstance(branches_val, str):
                    return self._parse_failure(
                        f"Unexpected value '{branches_val}' for 'on.{event_key}.branches'. "
                        "Expected a sequence (list) value."
                    )
        return None

    def _check_job_definitions(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Per-job structural checks: runs-on, needs refs, steps, step keys,
        and checkout-before-docker-build ordering."""
        job_ids = set(jobs.keys())
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue

            # runs-on is mandatory for every job.
            if "runs-on" not in job:
                return self._parse_failure(
                    f"Job '{job_name}' is missing required field 'runs-on'"
                )

            # Every 'needs' reference must name a job defined in this workflow.
            needs = job.get("needs")
            if needs:
                if isinstance(needs, str):
                    needed = [needs]
                elif isinstance(needs, list):
                    needed = needs
                else:
                    needed = []
                for dep in needed:
                    if dep not in job_ids:
                        return self._parse_failure(
                            f"Job '{job_name}' depends on unknown job '{dep}'"
                        )

            steps = job.get("steps", [])
            if not isinstance(steps, list):
                return self._parse_failure(f"Job '{job_name}' steps must be a list")

            # Each step must be actionable: either 'uses' or 'run'.
            for step in steps:
                if not isinstance(step, dict):
                    continue
                if "uses" not in step and "run" not in step:
                    step_name = step.get("name", "unnamed")
                    return self._parse_failure(
                        f"Every step must define a 'uses' or 'run' key. "
                        f"Step '{step_name}' has neither."
                    )

            # Checkout must precede any docker build step (last occurrence of
            # each wins, matching the original index-overwrite behavior).
            checkout_index = -1
            build_index = -1
            for idx, step in enumerate(steps):
                if not isinstance(step, dict):
                    continue
                uses = step.get("uses", "")
                run_cmd = step.get("run", "")
                if isinstance(uses, str) and "actions/checkout" in uses:
                    checkout_index = idx
                if (isinstance(run_cmd, str) and "docker build" in run_cmd) or (
                    isinstance(uses, str) and "docker/build-push-action" in uses
                ):
                    build_index = idx
            if build_index != -1 and (checkout_index == -1 or checkout_index > build_index):
                return self._exec_failure("Checkout must happen before Docker build steps")
        return None

    def _check_artifact_dependencies(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """A job using download-artifact must declare 'needs' so the uploading
        job is guaranteed to have finished."""
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            uses_download = any(
                isinstance(s, dict) and "actions/download-artifact" in str(s.get("uses", ""))
                for s in steps
            )
            if uses_download and not job.get("needs"):
                return self._exec_failure(
                    f"Job '{job_name}' uses download-artifact but has no 'needs' dependency — "
                    "add 'needs' to ensure the upload job completes first"
                )
        return None

    def _check_docker_rules(self, content: str) -> Optional[Dict[str, Any]]:
        """Content-level Docker checks: login secrets, push-without-login,
        GHCR auth/permissions, buildx for multi-platform, and GHA cache mode.

        NOTE(review): these are substring tests over the whole file, not
        step-scoped checks — e.g. 'login secrets wired' is satisfied by the
        secrets appearing anywhere in the workflow.
        """
        has_buildx_setup = "docker/setup-buildx-action" in content
        has_platforms = "platforms:" in content
        has_docker_login = "docker login" in content
        has_docker_push = "docker push" in content
        has_username_secret = "secrets.DOCKER_USERNAME" in content
        has_password_secret = "secrets.DOCKER_PASSWORD" in content
        has_github_token_secret = "secrets.GITHUB_TOKEN" in content

        # docker login referencing $DOCKER_USERNAME needs the secret mapped via env.
        if has_docker_login and not (has_username_secret and has_password_secret):
            if "$DOCKER_USERNAME" in content and not has_username_secret:
                return self._exec_failure(
                    "Docker login secrets not wired — add env block with "
                    "secrets.DOCKER_USERNAME and secrets.DOCKER_PASSWORD"
                )

        # A push needs either `docker login` or docker/login-action beforehand.
        if has_docker_push and not has_docker_login:
            if "docker/login-action" not in content:
                return self._exec_failure(
                    "Docker push without login — add a docker login step before pushing"
                )

        # GHCR authenticates with GITHUB_TOKEN, not DOCKER_PASSWORD.
        if "docker login ghcr.io" in content:
            if has_password_secret and not has_github_token_secret:
                return self._exec_failure(
                    "GHCR requires GITHUB_TOKEN for authentication, not DOCKER_PASSWORD"
                )

        # Pushing to GHCR requires the packages:write permission.
        if "ghcr.io" in content and has_docker_push:
            if "packages: write" not in content and "packages:write" not in content:
                return self._exec_failure(
                    "GITHUB_TOKEN does not have packages:write permission — add permissions block"
                )

        # Multi-platform builds require buildx to be set up.
        if has_platforms and not has_buildx_setup:
            return self._exec_failure(
                "Multi-platform build requires docker/setup-buildx-action"
            )

        # GHA cache export only caches all layers with mode=max.
        if "cache-to:" in content and "cache-from:" in content:
            if "cache-to: type=gha" in content and "mode=max" not in content:
                return self._exec_failure(
                    "GHA cache export needs mode=max for proper cache support"
                )
        return None

    def _check_build_context(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """A 'file:' path outside the 'context:' subdirectory is a mismatch."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            for step in job.get("steps", []):
                if not isinstance(step, dict):
                    continue
                with_block = step.get("with", {})
                if not isinstance(with_block, dict):
                    continue
                context = with_block.get("context")
                file_path = with_block.get("file")
                if not (context and file_path
                        and isinstance(context, str) and isinstance(file_path, str)):
                    continue
                # Root contexts ('.' / './') can reach any Dockerfile path.
                if context not in {".", "./"} and not file_path.startswith(context):
                    return self._exec_failure(
                        f"Dockerfile path '{file_path}' does not match build context '{context}'"
                    )
        return None

    def _check_shell_secrets(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Secret-backed shell vars ($SLACK_WEBHOOK_URL etc.) referenced in
        run: commands must be mapped in the step's env: block."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            for step in job.get("steps", []):
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                env_block = step.get("env", {})
                if not isinstance(env_block, dict):
                    env_block = {}
                for var in re.findall(r'\$([A-Z][A-Z0-9_]+)', run_cmd):
                    if var in self._RUNNER_VARS:
                        continue  # provided by the runner automatically
                    if var in self._SECRET_VARS and var not in env_block:
                        return self._exec_failure(
                            f"{var} is empty — secret not available in shell environment. "
                            "Map it via env block."
                        )
        return None

    def _check_buildx_load(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """build-push-action with push:false and no load:true, followed by a
        'docker run' step, leaves no image in the local daemon."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            for idx, step in enumerate(steps):
                if not isinstance(step, dict):
                    continue
                uses = step.get("uses", "")
                if not (isinstance(uses, str) and "docker/build-push-action" in uses):
                    continue
                with_block = step.get("with", {})
                if not isinstance(with_block, dict):
                    continue
                push_val = str(with_block.get("push", "")).lower()
                load_val = str(with_block.get("load", "")).lower()
                # Only flag when push is explicitly false (local use intended)
                # and the image is not loaded into the daemon.
                if push_val != "false" or load_val == "true":
                    continue
                for later in steps[idx + 1:]:
                    if not isinstance(later, dict):
                        continue
                    later_run = later.get("run", "")
                    if isinstance(later_run, str) and "docker run" in later_run:
                        return self._exec_failure(
                            "build-push-action with Buildx does not load images into local daemon by default — "
                            "add 'load: true' to make the image available for docker run"
                        )
        return None

    def _check_registry_consistency(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """The registry in the 'docker build -t' tag must match the registry
        the 'docker push' command targets."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            build_registry = None  # carries across steps within this job
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                build_match = re.search(r'docker build\s+.*-t\s+(\S+)', run_cmd)
                if build_match:
                    tag = build_match.group(1)
                    if "ghcr.io" in tag:
                        build_registry = "ghcr.io"
                    elif "docker.io" in tag or "/" in tag:
                        # docker.io is the implicit default for user/image tags;
                        # a dot in the first path segment marks a real hostname.
                        head = tag.split("/")[0]
                        build_registry = head if "." in head else "docker.io"
                push_match = re.search(r'docker push\s+(\S+)', run_cmd)
                if push_match and build_registry:
                    push_tag = push_match.group(1)
                    if "ghcr.io" in push_tag:
                        push_registry = "ghcr.io"
                    elif "docker.io" in push_tag:
                        push_registry = "docker.io"
                    else:
                        head = push_tag.split("/")[0]
                        push_registry = head if "." in head else "docker.io"
                    if build_registry != push_registry:
                        return self._exec_failure(
                            f"Registry mismatch: image built with {build_registry} tag "
                            f"but push targets {push_registry}"
                        )
        return None

    def _check_tag_sources(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """'docker tag SOURCE TARGET' must reference an image tag that a prior
        'docker build -t' in the same job actually produced."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            built_tags = set()
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                for m in re.finditer(r'docker build\s+.*-t\s+(\S+)', run_cmd):
                    built_tags.add(m.group(1))
                tag_match = re.search(r'docker tag\s+(\S+)\s+(\S+)', run_cmd)
                if not tag_match:
                    continue
                source = tag_match.group(1)
                if "${{" not in source:
                    # Literal source: must exactly match a built tag.
                    if source not in built_tags:
                        return self._exec_failure(
                            f"No such image: {source} — docker tag source does not match any built image"
                        )
                    continue
                # Templated source: compare the ${{ ... }} expression against
                # the expressions used when the tags were built.
                source_expr = re.search(r'\$\{\{(.+?)\}\}', source)
                if not source_expr:
                    continue
                source_key = source_expr.group(1).strip()
                found_matching = False
                for bt in built_tags:
                    bt_expr = re.search(r'\$\{\{(.+?)\}\}', bt)
                    if bt_expr and bt_expr.group(1).strip() == source_key:
                        found_matching = True
                        break
                # Same base image name but a different tag expression is the
                # signature of a tag-template mismatch.
                source_base = source.split(":")[0] if ":" in source else source
                built_bases = {bt.split(":")[0] if ":" in bt else bt for bt in built_tags}
                if not found_matching and source_base in built_bases:
                    return self._exec_failure(
                        "No such image: docker tag source tag does not match any built image tag"
                    )
        return None

    def _check_node_matrix(
        self, jobs: Dict[str, Any], files: Dict[str, FileContent]
    ) -> Optional[Dict[str, Any]]:
        """Matrix node versions must satisfy package.json's engines.node
        minimum (only the ">=N" form is recognized)."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            strategy = job.get("strategy", {})
            if not isinstance(strategy, dict):
                continue
            matrix = strategy.get("matrix", {})
            if not isinstance(matrix, dict):
                continue
            node_versions = matrix.get("node", [])
            if not isinstance(node_versions, list):
                continue
            pkg = files.get("package.json")
            if not pkg:
                continue
            engines_match = re.search(r'"node"\s*:\s*">=(\d+)"', pkg.content)
            if not engines_match:
                continue
            min_version = int(engines_match.group(1))
            for v in node_versions:
                if isinstance(v, int) and v < min_version:
                    return self._exec_failure(
                        f"Matrix job (node: {v}) failed: package.json requires Node >= {min_version}"
                    )
        return None