# Commit 2794920 (Krishna1107): fixed inference
"""Workflow simulator — YAML parse + CI rule checks."""
import re
from typing import Any, Dict, List, Optional
import yaml
from server.models import FileContent
class WorkflowSimulator:
def validate(self, workflow: Optional[FileContent], files: Dict[str, FileContent]):
if workflow is None:
return {"parse_success": True, "execution_success": True}
content = workflow.content
# single-brace expressions: ${ } should be ${{ }}
# Match ${ ... } that is NOT ${{ ... }}
single_brace = re.findall(r'\$\{(?!\{)\s*[^}]+\}', content)
if single_brace:
return {
"parse_success": False,
"execution_success": False,
"error": (
"Unrecognized expression syntax. "
"Use ${{ expression }} with double braces for GitHub Actions expressions."
),
}
# parse yaml
try:
parsed = yaml.safe_load(content)
except yaml.YAMLError as exc:
return {
"parse_success": False,
"execution_success": False,
"error": f"YAML parse error: {exc}",
}
if not isinstance(parsed, dict):
return {
"parse_success": False,
"execution_success": False,
"error": "Workflow root must be a mapping",
}
# needs an 'on' trigger
if "on" not in parsed and True not in parsed:
# yaml.safe_load converts `on:` to True key in some contexts
return {
"parse_success": False,
"execution_success": False,
"error": "Workflow must define an 'on' trigger event",
}
# validate trigger structure
on_value = parsed.get("on") or parsed.get(True)
if isinstance(on_value, dict):
for event_key, event_config in on_value.items():
if isinstance(event_config, dict):
# Check branches is a list, not a bare string
branches_val = event_config.get("branches")
if isinstance(branches_val, str):
return {
"parse_success": False,
"execution_success": False,
"error": (
f"Unexpected value '{branches_val}' for 'on.{event_key}.branches'. "
"Expected a sequence (list) value."
),
}
# jobs block
jobs = parsed.get("jobs")
if not isinstance(jobs, dict) or not jobs:
return {
"parse_success": False,
"execution_success": False,
"error": "Workflow must define at least one job",
}
# Content-level flags for cross-cutting checks
has_buildx_setup = "docker/setup-buildx-action" in content
has_platforms = "platforms:" in content
has_docker_login = "docker login" in content
has_docker_push = "docker push" in content
has_username_secret = "secrets.DOCKER_USERNAME" in content
has_password_secret = "secrets.DOCKER_PASSWORD" in content
has_github_token_secret = "secrets.GITHUB_TOKEN" in content
# Collect job IDs for needs validation
job_ids = set(jobs.keys())
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
# runs-on is required
if "runs-on" not in job:
return {
"parse_success": False,
"execution_success": False,
"error": f"Job '{job_name}' is missing required field 'runs-on'",
}
# check 'needs' refs point to real jobs
needs = job.get("needs")
if needs:
needed = [needs] if isinstance(needs, str) else (needs if isinstance(needs, list) else [])
for dep in needed:
if dep not in job_ids:
return {
"parse_success": False,
"execution_success": False,
"error": f"Job '{job_name}' depends on unknown job '{dep}'",
}
steps = job.get("steps", [])
if not isinstance(steps, list):
return {
"parse_success": False,
"execution_success": False,
"error": f"Job '{job_name}' steps must be a list",
}
# every step needs 'uses' or 'run'
for step in steps:
if not isinstance(step, dict):
continue
has_uses = "uses" in step
has_run = "run" in step
if not has_uses and not has_run:
step_name = step.get("name", "unnamed")
return {
"parse_success": False,
"execution_success": False,
"error": f"Every step must define a 'uses' or 'run' key. Step '{step_name}' has neither.",
}
# checkout must come before docker build
checkout_index = -1
build_index = -1
for idx, step in enumerate(steps):
if not isinstance(step, dict):
continue
uses = step.get("uses", "")
run_cmd = step.get("run", "")
if isinstance(uses, str) and "actions/checkout" in uses:
checkout_index = idx
if (isinstance(run_cmd, str) and "docker build" in run_cmd) or (
isinstance(uses, str) and "docker/build-push-action" in uses
):
build_index = idx
if build_index != -1 and (checkout_index == -1 or checkout_index > build_index):
return {
"parse_success": True,
"execution_success": False,
"exec_error": "Checkout must happen before Docker build steps",
}
# cross-job artifact dependency: download needs 'needs'
# If a job uses download-artifact but doesn't declare needs on the upload job
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
steps = job.get("steps", [])
if not isinstance(steps, list):
continue
uses_download = any(
isinstance(s, dict) and "actions/download-artifact" in str(s.get("uses", ""))
for s in steps
)
if uses_download:
needs = job.get("needs")
if not needs:
return {
"parse_success": True,
"execution_success": False,
"exec_error": (
f"Job '{job_name}' uses download-artifact but has no 'needs' dependency — "
"add 'needs' to ensure the upload job completes first"
),
}
# docker login needs secrets wired via env
if has_docker_login:
# Check if the login step has env block with secrets
login_has_env_secrets = has_username_secret and has_password_secret
if not login_has_env_secrets:
# Check if login uses $DOCKER_USERNAME (env var) without secret mapping
if "$DOCKER_USERNAME" in content and not has_username_secret:
return {
"parse_success": True,
"execution_success": False,
"exec_error": "Docker login secrets not wired — add env block with secrets.DOCKER_USERNAME and secrets.DOCKER_PASSWORD",
}
# push without login
if has_docker_push and not has_docker_login:
# Check if using docker/login-action instead
has_login_action = "docker/login-action" in content
if not has_login_action:
return {
"parse_success": True,
"execution_success": False,
"exec_error": "Docker push without login — add a docker login step before pushing",
}
# ghcr.io needs GITHUB_TOKEN not DOCKER_PASSWORD
if "docker login ghcr.io" in content:
if has_password_secret and not has_github_token_secret:
return {
"parse_success": True,
"execution_success": False,
"exec_error": "GHCR requires GITHUB_TOKEN for authentication, not DOCKER_PASSWORD",
}
# ghcr push needs packages:write permission
if "ghcr.io" in content and "docker push" in content:
# Check if permissions block has packages: write
if "packages: write" not in content and "packages:write" not in content:
return {
"parse_success": True,
"execution_success": False,
"exec_error": "GITHUB_TOKEN does not have packages:write permission — add permissions block",
}
# multi-platform needs buildx
if has_platforms and not has_buildx_setup:
return {
"parse_success": True,
"execution_success": False,
"exec_error": "Multi-platform build requires docker/setup-buildx-action",
}
# GHA cache export needs mode=max
if "cache-to:" in content and "cache-from:" in content:
# Check for mode=max
if "cache-to: type=gha" in content and "mode=max" not in content:
return {
"parse_success": True,
"execution_success": False,
"exec_error": "GHA cache export needs mode=max for proper cache support",
}
# context vs dockerfile path mismatch
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
for step in job.get("steps", []):
if not isinstance(step, dict):
continue
with_block = step.get("with", {})
if not isinstance(with_block, dict):
continue
context = with_block.get("context")
file_path = with_block.get("file")
if context and file_path and isinstance(context, str) and isinstance(file_path, str):
# If context is a subdirectory but file is at root
if context not in {".", "./"} and not file_path.startswith(context):
return {
"parse_success": True,
"execution_success": False,
"exec_error": f"Dockerfile path '{file_path}' does not match build context '{context}'",
}
# shell env var from secret but not mapped in env block
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
for step in job.get("steps", []):
if not isinstance(step, dict):
continue
run_cmd = step.get("run", "")
if not isinstance(run_cmd, str):
continue
env_block = step.get("env", {})
if not isinstance(env_block, dict):
env_block = {}
# Find env vars used in run that look like they should come from secrets
env_var_refs = re.findall(r'\$([A-Z][A-Z0-9_]+)', run_cmd)
for var in env_var_refs:
# Skip GitHub expression vars (they're in ${{ }})
if var in ("GITHUB_SHA", "GITHUB_REF", "GITHUB_ACTOR", "GITHUB_REPOSITORY"):
continue
# Common secret-backed env vars
if var in ("SLACK_WEBHOOK_URL", "DEPLOY_TOKEN", "NPM_TOKEN", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
if var not in env_block:
return {
"parse_success": True,
"execution_success": False,
"exec_error": f"{var} is empty — secret not available in shell environment. Map it via env block.",
}
# build-push-action without load:true when image is used locally after
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
steps = job.get("steps", [])
if not isinstance(steps, list):
continue
build_push_idx = None
build_push_has_load = False
for idx, step in enumerate(steps):
if not isinstance(step, dict):
continue
uses = step.get("uses", "")
if isinstance(uses, str) and "docker/build-push-action" in uses:
build_push_idx = idx
with_block = step.get("with", {})
if isinstance(with_block, dict):
push_val = str(with_block.get("push", "")).lower()
load_val = str(with_block.get("load", "")).lower()
build_push_has_load = load_val == "true"
# Only flag if push is false (local use intended)
if push_val == "false" and not build_push_has_load:
# Check if a later step uses docker run
for later in steps[idx + 1:]:
if not isinstance(later, dict):
continue
run_cmd = later.get("run", "")
if isinstance(run_cmd, str) and "docker run" in run_cmd:
return {
"parse_success": True,
"execution_success": False,
"exec_error": (
"build-push-action with Buildx does not load images into local daemon by default — "
"add 'load: true' to make the image available for docker run"
),
}
# registry mismatch between build tag and push command
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
steps = job.get("steps", [])
if not isinstance(steps, list):
continue
build_registry = None
for step in steps:
if not isinstance(step, dict):
continue
run_cmd = step.get("run", "")
if not isinstance(run_cmd, str):
continue
# Extract registry from docker build -t
build_match = re.search(r'docker build\s+.*-t\s+(\S+)', run_cmd)
if build_match:
tag = build_match.group(1)
if "ghcr.io" in tag:
build_registry = "ghcr.io"
elif "docker.io" in tag or "/" in tag:
# docker.io is default for user/image format
build_registry = tag.split("/")[0] if "." in tag.split("/")[0] else "docker.io"
push_match = re.search(r'docker push\s+(\S+)', run_cmd)
if push_match and build_registry:
push_tag = push_match.group(1)
if "ghcr.io" in push_tag:
push_registry = "ghcr.io"
elif "docker.io" in push_tag:
push_registry = "docker.io"
else:
push_registry = push_tag.split("/")[0] if "." in push_tag.split("/")[0] else "docker.io"
if build_registry != push_registry:
return {
"parse_success": True,
"execution_success": False,
"exec_error": (
f"Registry mismatch: image built with {build_registry} tag "
f"but push targets {push_registry}"
),
}
# docker tag referencing non-existent image tag
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
steps = job.get("steps", [])
if not isinstance(steps, list):
continue
built_tags = set()
for step in steps:
if not isinstance(step, dict):
continue
run_cmd = step.get("run", "")
if not isinstance(run_cmd, str):
continue
# Collect tags from docker build -t
for m in re.finditer(r'docker build\s+.*-t\s+(\S+)', run_cmd):
built_tags.add(m.group(1))
# Check docker tag source exists
tag_match = re.search(r'docker tag\s+(\S+)\s+(\S+)', run_cmd)
if tag_match:
source = tag_match.group(1)
# If source contains ${{ it's a template — compare the template expression
if source not in built_tags and "${{" not in source:
return {
"parse_success": True,
"execution_success": False,
"exec_error": f"No such image: {source} — docker tag source does not match any built image",
}
# Check if source uses a different tag template than what was built
if "${{" in source:
# Normalize: extract the expression
source_expr = re.search(r'\$\{\{(.+?)\}\}', source)
if source_expr:
source_key = source_expr.group(1).strip()
found_matching = False
for bt in built_tags:
bt_expr = re.search(r'\$\{\{(.+?)\}\}', bt)
if bt_expr and bt_expr.group(1).strip() == source_key:
found_matching = True
break
# Also check if the base image name matches
source_base = source.split(":")[0] if ":" in source else source
built_bases = {bt.split(":")[0] if ":" in bt else bt for bt in built_tags}
if not found_matching and source_base in built_bases:
return {
"parse_success": True,
"execution_success": False,
"exec_error": f"No such image: docker tag source tag does not match any built image tag",
}
# node version vs package.json engines
for job_name, job in jobs.items():
if not isinstance(job, dict):
continue
strategy = job.get("strategy", {})
if not isinstance(strategy, dict):
continue
matrix = strategy.get("matrix", {})
if not isinstance(matrix, dict):
continue
node_versions = matrix.get("node", [])
if isinstance(node_versions, list):
# Check package.json engines constraint
pkg = files.get("package.json")
if pkg:
engines_match = re.search(r'"node"\s*:\s*">=(\d+)"', pkg.content)
if engines_match:
min_version = int(engines_match.group(1))
for v in node_versions:
if isinstance(v, int) and v < min_version:
return {
"parse_success": True,
"execution_success": False,
"exec_error": f"Matrix job (node: {v}) failed: package.json requires Node >= {min_version}",
}
return {"parse_success": True, "execution_success": True}