"""Workflow simulator — YAML parse + CI rule checks."""
import re
from typing import Any, Dict, List, Optional
import yaml
from server.models import FileContent
class WorkflowSimulator:
    """Static simulator for GitHub Actions workflows.

    Parses a workflow YAML file and runs a battery of CI lint/"execution"
    rules (trigger shape, job structure, Docker auth wiring, buildx usage,
    artifact dependencies, registry/tag consistency, node matrix vs
    package.json engines).  Each rule short-circuits: the first violation
    found is returned.

    Result dict shapes (unchanged contract):
      parse failure: {"parse_success": False, "execution_success": False, "error": msg}
      exec  failure: {"parse_success": True,  "execution_success": False, "exec_error": msg}
      success:       {"parse_success": True,  "execution_success": True}
    """

    # Built-in GitHub shell variables that never need an env-block mapping.
    _GITHUB_BUILTIN_VARS = ("GITHUB_SHA", "GITHUB_REF", "GITHUB_ACTOR", "GITHUB_REPOSITORY")
    # Env vars that conventionally come from repository secrets and therefore
    # must be mapped through a step-level env block to reach the shell.
    _SECRET_BACKED_VARS = (
        "SLACK_WEBHOOK_URL",
        "DEPLOY_TOKEN",
        "NPM_TOKEN",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
    )

    def validate(self, workflow: Optional[FileContent], files: Dict[str, FileContent]) -> Dict[str, Any]:
        """Validate *workflow* against all simulator rules.

        Args:
            workflow: the workflow file, or None (a missing workflow is
                treated as trivially valid).
            files: other repo files by path; only ``package.json`` is
                consulted (for the node-engines matrix check).

        Returns:
            A result dict as documented on the class.
        """
        if workflow is None:
            return {"parse_success": True, "execution_success": True}
        content = workflow.content

        failure = self._check_expression_syntax(content)
        if failure:
            return failure

        parsed, failure = self._parse_yaml(content)
        if failure:
            return failure

        failure = self._check_triggers(parsed)
        if failure:
            return failure

        jobs = parsed.get("jobs")
        if not isinstance(jobs, dict) or not jobs:
            return self._parse_failure("Workflow must define at least one job")

        # Ordered rule pipeline; first non-None result wins.  Lambdas defer
        # evaluation so later (cheaper-to-skip) checks never run after a hit.
        checks = (
            lambda: self._check_job_structure(jobs),
            lambda: self._check_artifact_needs(jobs),
            lambda: self._check_docker_auth(content),
            lambda: self._check_buildx_cache(content),
            lambda: self._check_context_paths(jobs),
            lambda: self._check_shell_secrets(jobs),
            lambda: self._check_local_load(jobs),
            lambda: self._check_registry_match(jobs),
            lambda: self._check_tag_sources(jobs),
            lambda: self._check_node_matrix(jobs, files),
        )
        for check in checks:
            failure = check()
            if failure:
                return failure
        return {"parse_success": True, "execution_success": True}

    # ----- result constructors -------------------------------------------

    @staticmethod
    def _parse_failure(message: str) -> Dict[str, Any]:
        """Build a parse-stage failure result."""
        return {"parse_success": False, "execution_success": False, "error": message}

    @staticmethod
    def _exec_failure(message: str) -> Dict[str, Any]:
        """Build an execution-stage failure result (YAML parsed fine)."""
        return {"parse_success": True, "execution_success": False, "exec_error": message}

    # ----- parse-stage checks --------------------------------------------

    def _check_expression_syntax(self, content: str) -> Optional[Dict[str, Any]]:
        """Reject single-brace ``${ ... }`` expressions (must be ``${{ }}``)."""
        if re.search(r'\$\{(?!\{)\s*[^}]+\}', content):
            return self._parse_failure(
                "Unrecognized expression syntax. "
                "Use ${{ expression }} with double braces for GitHub Actions expressions."
            )
        return None

    def _parse_yaml(self, content: str):
        """Parse *content*; return ``(mapping, None)`` or ``(None, failure)``."""
        try:
            parsed = yaml.safe_load(content)
        except yaml.YAMLError as exc:
            return None, self._parse_failure(f"YAML parse error: {exc}")
        if not isinstance(parsed, dict):
            return None, self._parse_failure("Workflow root must be a mapping")
        return parsed, None

    def _check_triggers(self, parsed: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Require an 'on' trigger and list-typed ``branches`` filters."""
        # yaml.safe_load treats a bare `on:` key as the boolean True.
        if "on" not in parsed and True not in parsed:
            return self._parse_failure("Workflow must define an 'on' trigger event")
        on_value = parsed.get("on") or parsed.get(True)
        if isinstance(on_value, dict):
            for event_key, event_config in on_value.items():
                if not isinstance(event_config, dict):
                    continue
                # `branches: main` (bare string) is invalid — must be a list.
                branches_val = event_config.get("branches")
                if isinstance(branches_val, str):
                    return self._parse_failure(
                        f"Unexpected value '{branches_val}' for 'on.{event_key}.branches'. "
                        "Expected a sequence (list) value."
                    )
        return None

    def _check_job_structure(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Per-job structural rules: runs-on, needs refs, step shape, checkout order."""
        job_ids = set(jobs)
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            if "runs-on" not in job:
                return self._parse_failure(f"Job '{job_name}' is missing required field 'runs-on'")
            # 'needs' may be a string or a list; every entry must name a real job.
            needs = job.get("needs")
            if needs:
                needed = [needs] if isinstance(needs, str) else (needs if isinstance(needs, list) else [])
                for dep in needed:
                    if dep not in job_ids:
                        return self._parse_failure(f"Job '{job_name}' depends on unknown job '{dep}'")
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                return self._parse_failure(f"Job '{job_name}' steps must be a list")
            for step in steps:
                if isinstance(step, dict) and "uses" not in step and "run" not in step:
                    step_name = step.get("name", "unnamed")
                    return self._parse_failure(
                        f"Every step must define a 'uses' or 'run' key. Step '{step_name}' has neither."
                    )
            failure = self._check_checkout_order(steps)
            if failure:
                return failure
        return None

    def _check_checkout_order(self, steps) -> Optional[Dict[str, Any]]:
        """A Docker build step must be preceded by actions/checkout."""
        checkout_index = -1
        build_index = -1
        for idx, step in enumerate(steps):
            if not isinstance(step, dict):
                continue
            uses = step.get("uses", "")
            run_cmd = step.get("run", "")
            if isinstance(uses, str) and "actions/checkout" in uses:
                checkout_index = idx
            if (isinstance(run_cmd, str) and "docker build" in run_cmd) or (
                isinstance(uses, str) and "docker/build-push-action" in uses
            ):
                build_index = idx
        if build_index != -1 and (checkout_index == -1 or checkout_index > build_index):
            return self._exec_failure("Checkout must happen before Docker build steps")
        return None

    # ----- execution-stage checks ----------------------------------------

    def _check_artifact_needs(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """download-artifact only works if the job 'needs' the uploading job."""
        for job_name, job in jobs.items():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            uses_download = any(
                isinstance(s, dict) and "actions/download-artifact" in str(s.get("uses", ""))
                for s in steps
            )
            if uses_download and not job.get("needs"):
                return self._exec_failure(
                    f"Job '{job_name}' uses download-artifact but has no 'needs' dependency — "
                    "add 'needs' to ensure the upload job completes first"
                )
        return None

    def _check_docker_auth(self, content: str) -> Optional[Dict[str, Any]]:
        """Docker login/push secret wiring rules (content-level heuristics)."""
        has_docker_login = "docker login" in content
        has_docker_push = "docker push" in content
        has_username_secret = "secrets.DOCKER_USERNAME" in content
        has_password_secret = "secrets.DOCKER_PASSWORD" in content
        # `docker login` reading $DOCKER_USERNAME without the secret mapped in env.
        if has_docker_login and not (has_username_secret and has_password_secret):
            if "$DOCKER_USERNAME" in content and not has_username_secret:
                return self._exec_failure(
                    "Docker login secrets not wired — add env block with secrets.DOCKER_USERNAME and secrets.DOCKER_PASSWORD"
                )
        # Pushing with no `docker login` and no docker/login-action step.
        if has_docker_push and not has_docker_login and "docker/login-action" not in content:
            return self._exec_failure("Docker push without login — add a docker login step before pushing")
        # GHCR authenticates with GITHUB_TOKEN, not a Docker Hub password.
        if "docker login ghcr.io" in content:
            if has_password_secret and "secrets.GITHUB_TOKEN" not in content:
                return self._exec_failure("GHCR requires GITHUB_TOKEN for authentication, not DOCKER_PASSWORD")
        # Pushing to GHCR needs the packages:write permission.
        if "ghcr.io" in content and has_docker_push:
            if "packages: write" not in content and "packages:write" not in content:
                return self._exec_failure(
                    "GITHUB_TOKEN does not have packages:write permission — add permissions block"
                )
        return None

    def _check_buildx_cache(self, content: str) -> Optional[Dict[str, Any]]:
        """Multi-platform builds need buildx; GHA cache export needs mode=max."""
        if "platforms:" in content and "docker/setup-buildx-action" not in content:
            return self._exec_failure("Multi-platform build requires docker/setup-buildx-action")
        if "cache-to:" in content and "cache-from:" in content:
            if "cache-to: type=gha" in content and "mode=max" not in content:
                return self._exec_failure("GHA cache export needs mode=max for proper cache support")
        return None

    def _check_context_paths(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """The 'file' of a build step must live inside its 'context' directory."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            for step in steps:
                if not isinstance(step, dict):
                    continue
                with_block = step.get("with", {})
                if not isinstance(with_block, dict):
                    continue
                context = with_block.get("context")
                file_path = with_block.get("file")
                if not (isinstance(context, str) and context and isinstance(file_path, str) and file_path):
                    continue
                if context in {".", "./"}:
                    continue
                # BUG FIX: a bare startswith() treated context "app" as a
                # prefix of "application/Dockerfile"; anchor the comparison
                # on a whole path component instead.
                prefix = context.rstrip("/") + "/"
                if not file_path.startswith(prefix):
                    return self._exec_failure(
                        f"Dockerfile path '{file_path}' does not match build context '{context}'"
                    )
        return None

    def _check_shell_secrets(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Secret-backed $VARS used in `run` must be mapped via the step env block."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                env_block = step.get("env", {})
                if not isinstance(env_block, dict):
                    env_block = {}
                # $UPPER_CASE references in the shell script (GitHub ${{ }}
                # expressions don't match: they have '{' after the '$').
                for var in re.findall(r'\$([A-Z][A-Z0-9_]+)', run_cmd):
                    if var in self._GITHUB_BUILTIN_VARS:
                        continue
                    if var in self._SECRET_BACKED_VARS and var not in env_block:
                        return self._exec_failure(
                            f"{var} is empty — secret not available in shell environment. Map it via env block."
                        )
        return None

    def _check_local_load(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """build-push-action with push:false needs load:true before a docker run."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            for idx, step in enumerate(steps):
                if not isinstance(step, dict):
                    continue
                uses = step.get("uses", "")
                if not (isinstance(uses, str) and "docker/build-push-action" in uses):
                    continue
                with_block = step.get("with", {})
                if not isinstance(with_block, dict):
                    continue
                push_val = str(with_block.get("push", "")).lower()
                load_val = str(with_block.get("load", "")).lower()
                # Only push:false implies the image is meant for local use.
                if push_val != "false" or load_val == "true":
                    continue
                for later in steps[idx + 1:]:
                    if not isinstance(later, dict):
                        continue
                    run_cmd = later.get("run", "")
                    if isinstance(run_cmd, str) and "docker run" in run_cmd:
                        return self._exec_failure(
                            "build-push-action with Buildx does not load images into local daemon by default — "
                            "add 'load: true' to make the image available for docker run"
                        )
        return None

    def _check_registry_match(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """`docker build -t` registry must match the `docker push` registry."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            build_registry = None  # carries across steps within one job
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                build_match = re.search(r'docker build\s+.*-t\s+(\S+)', run_cmd)
                if build_match:
                    tag = build_match.group(1)
                    if "ghcr.io" in tag:
                        build_registry = "ghcr.io"
                    elif "docker.io" in tag or "/" in tag:
                        # user/image with no dotted host defaults to docker.io
                        first = tag.split("/")[0]
                        build_registry = first if "." in first else "docker.io"
                push_match = re.search(r'docker push\s+(\S+)', run_cmd)
                if push_match and build_registry:
                    push_tag = push_match.group(1)
                    if "ghcr.io" in push_tag:
                        push_registry = "ghcr.io"
                    elif "docker.io" in push_tag:
                        push_registry = "docker.io"
                    else:
                        first = push_tag.split("/")[0]
                        push_registry = first if "." in first else "docker.io"
                    if build_registry != push_registry:
                        return self._exec_failure(
                            f"Registry mismatch: image built with {build_registry} tag "
                            f"but push targets {push_registry}"
                        )
        return None

    def _check_tag_sources(self, jobs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """`docker tag SRC DST` — SRC must match an image built earlier in the job."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            steps = job.get("steps", [])
            if not isinstance(steps, list):
                continue
            built_tags = set()
            for step in steps:
                if not isinstance(step, dict):
                    continue
                run_cmd = step.get("run", "")
                if not isinstance(run_cmd, str):
                    continue
                for m in re.finditer(r'docker build\s+.*-t\s+(\S+)', run_cmd):
                    built_tags.add(m.group(1))
                tag_match = re.search(r'docker tag\s+(\S+)\s+(\S+)', run_cmd)
                if not tag_match:
                    continue
                source = tag_match.group(1)
                # Literal source: must exactly match a built tag.
                if "${{" not in source:
                    if source not in built_tags:
                        return self._exec_failure(
                            f"No such image: {source} — docker tag source does not match any built image"
                        )
                    continue
                # Templated source: compare the ${{ ... }} expression against
                # the expressions used in the built tags.
                source_expr = re.search(r'\$\{\{(.+?)\}\}', source)
                if not source_expr:
                    continue
                source_key = source_expr.group(1).strip()
                found_matching = False
                for bt in built_tags:
                    bt_expr = re.search(r'\$\{\{(.+?)\}\}', bt)
                    if bt_expr and bt_expr.group(1).strip() == source_key:
                        found_matching = True
                        break
                # Only flag when the image *name* matches but the tag template
                # differs — otherwise the source may come from another job.
                source_base = source.split(":")[0]
                built_bases = {bt.split(":")[0] for bt in built_tags}
                if not found_matching and source_base in built_bases:
                    return self._exec_failure(
                        "No such image: docker tag source tag does not match any built image tag"
                    )
        return None

    def _check_node_matrix(self, jobs: Dict[str, Any], files: Dict[str, FileContent]) -> Optional[Dict[str, Any]]:
        """Matrix node versions must satisfy package.json's engines.node >= N."""
        for job in jobs.values():
            if not isinstance(job, dict):
                continue
            strategy = job.get("strategy", {})
            if not isinstance(strategy, dict):
                continue
            matrix = strategy.get("matrix", {})
            if not isinstance(matrix, dict):
                continue
            node_versions = matrix.get("node", [])
            if not isinstance(node_versions, list):
                continue
            pkg = files.get("package.json")
            if not pkg:
                continue
            # Only the simple ">=N" engines constraint is understood.
            engines_match = re.search(r'"node"\s*:\s*">=(\d+)"', pkg.content)
            if not engines_match:
                continue
            min_version = int(engines_match.group(1))
            for v in node_versions:
                if isinstance(v, int) and v < min_version:
                    return self._exec_failure(
                        f"Matrix job (node: {v}) failed: package.json requires Node >= {min_version}"
                    )
        return None
|