| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from pathlib import Path |
|
|
| from .config import RuntimeConfig |
| from .evaluation import build_submission_summary |
| from .orchestrator import OrchestratorSettings, RemediationOrchestrator |
| from .schemas import IngestRequest |
| from .scanners import detect_iac_type |
|
|
|
|
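def cmd_remediate(args: argparse.Namespace) -> int:
    """Run the remediation orchestrator on one IaC file and print its decision.

    Args:
        args: Parsed ``remediate`` arguments; ``file``, ``iac_type``,
            ``intent``, ``max_retries`` and ``auto_approve`` are read.

    Returns:
        0 once the decision has been printed as JSON.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    path = Path(args.file)
    # Decode explicitly as UTF-8 so the text handed to detection and to the
    # orchestrator does not depend on the host locale; a file in another
    # encoding now fails loudly instead of being silently mis-decoded.
    content = path.read_text(encoding="utf-8")
    # An explicit --iac-type wins; otherwise the type is detected from the
    # file name and its content.
    iac_type = args.iac_type or detect_iac_type(path.name, content)
    config = RuntimeConfig.from_env()
    settings = OrchestratorSettings(
        max_retries=args.max_retries,
        # NOTE(review): --auto-approve is forwarded as auto_approve_validated.
        # What that bypasses is decided inside the orchestrator (presumably a
        # manual approval step) - confirm before enabling in unattended runs.
        auto_approve_validated=args.auto_approve,
    )
    orchestrator = RemediationOrchestrator(config=config, settings=settings)
    # NOTE(review): file content and intent are passed through unmodified. If
    # the agents are model-backed, treat the scanned file as untrusted input
    # that can carry instructions - verify how the orchestrator isolates it.
    decision = orchestrator.remediate(
        IngestRequest(
            file_name=path.name,  # only the base name is passed, not the full path
            file_content=content,
            iac_type=iac_type,
            user_intent=args.intent,
        )
    )
    # NOTE(review): the exit status is 0 whatever the decision contains; a
    # pipeline that must gate on the outcome has to inspect the printed JSON.
    print(json.dumps(decision.model_dump(mode="json"), indent=2))
    return 0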
|
|
|
|
def cmd_summarize_results(args: argparse.Namespace) -> int:
    """Print the submission summary for an evaluation directory as JSON.

    Args:
        args: Parsed ``summarize-results`` arguments; ``eval_dir`` is read.

    Returns:
        Always 0.
    """
    eval_dir = Path(args.eval_dir)
    rendered = json.dumps(build_submission_summary(eval_dir), indent=2)
    print(rendered)
    return 0
|
|
|
|
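def cmd_self_test(_: argparse.Namespace) -> int:
    """Build a sample ``IngestRequest`` to confirm imports and schemas load.

    Args:
        _: Parsed arguments; unused.

    Returns:
        0 when the sample request keeps its ``iac_type``, 1 otherwise.
    """
    import sys

    request = IngestRequest(
        file_name="example.tf",
        file_content='resource "aws_s3_bucket" "data" { bucket = "demo" }\n',
        iac_type="terraform",
        user_intent="Fix detected IaC security findings.",
    )
    # Explicit comparison rather than `assert`: assertions are stripped under
    # `python -O`, which would let the self-test pass without checking anything.
    if request.iac_type != "terraform":
        print(
            f"self-test failed: unexpected iac_type {request.iac_type!r}",
            file=sys.stderr,
        )
        return 1
    print("self-test passed: schemas and imports are available")
    return 0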
|
|
|
|
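def build_parser() -> argparse.ArgumentParser:
    """Create the ``iac-secfix-agents`` parser with its three subcommands.

    Each subparser stores its handler under ``func`` so the caller can
    dispatch with ``args.func(args)``.

    Returns:
        The configured parser; a subcommand is required.
    """
    parser = argparse.ArgumentParser(prog="iac-secfix-agents")
    subparsers = parser.add_subparsers(dest="command", required=True)

    remediate = subparsers.add_parser(
        "remediate",
        help="Run the multi-agent remediation loop for one file.",
    )
    remediate.add_argument("file", help="Path of the IaC file to remediate.")
    remediate.add_argument(
        "--iac-type",
        choices=["terraform", "kubernetes", "dockerfile"],
        help="IaC type; detected from the file name and content when omitted.",
    )
    remediate.add_argument(
        "--intent",
        default="Fix the detected IaC security findings.",
        help="User intent passed to the orchestrator with the file.",
    )
    remediate.add_argument(
        "--max-retries",
        type=int,
        default=3,
        help="Value for the orchestrator's max_retries setting (default: %(default)s).",
    )
    # Security-relevant switch: spelled out in --help so nobody enables it
    # without seeing which orchestrator setting it drives.
    remediate.add_argument(
        "--auto-approve",
        action="store_true",
        help="Enable the orchestrator's auto_approve_validated setting (off by default).",
    )
    remediate.set_defaults(func=cmd_remediate)

    summary = subparsers.add_parser(
        "summarize-results",
        help="Summarize notebook evaluation artifacts.",
    )
    summary.add_argument(
        "--eval-dir",
        default="../eval",
        help="Evaluation directory; the default %(default)s is relative to the current working directory.",
    )
    summary.set_defaults(func=cmd_summarize_results)

    self_test = subparsers.add_parser(
        "self-test",
        help="Run a lightweight import/schema check.",
    )
    self_test.set_defaults(func=cmd_self_test)

    return parser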
|
|
|
|
def main() -> int:
    """Parse the command line and run the selected subcommand.

    Returns:
        The integer status returned by the subcommand handler.
    """
    namespace = build_parser().parse_args()
    # Every subparser registers its handler under ``func``.
    handler = namespace.func
    return handler(namespace)
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|
|