Spaces:
Sleeping
Sleeping
| """Comprehensive integration test for the DevOps Pipeline Environment.""" | |
| import os | |
| import sys | |
| import json | |
| import traceback | |
| # Add project to path | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| os.environ.pop("DEVOPS_TASK", None) | |
| from devops_pipeline_env.models import ( | |
| ActionType, | |
| ConfigEdit, | |
| PipelineAction, | |
| ) | |
| from server.pipeline_environment import PipelineEnvironment | |
| from server.graders import grade_task | |
PASS = "PASS"
FAIL = "FAIL"

# Every recorded outcome as a (test_name, status, detail) tuple; the summary
# section at the end of the script reads this list.
results = []


def report(test_name, passed, detail=""):
    """Record the outcome of one check and echo it to stdout.

    Args:
        test_name: Human-readable label for the check.
        passed: Truthy when the check succeeded; anything falsy counts as a
            failure.
        detail: Optional extra context appended to the printed line.
    """
    status = PASS if passed else FAIL
    results.append((test_name, status, detail))
    # The separator used to be a non-ASCII character that came out garbled
    # ("β"); a plain hyphen stays legible whatever the console encoding is.
    suffix = f" - {detail}" if detail else ""
    print(f" [{status}] {test_name}{suffix}", flush=True)
def make_action(action_type, service_name=None, target_version=None, config_edits=None,
                migration_name=None, migration_type=None, reason=None):
    """Build a ``PipelineAction``, forwarding every argument by keyword.

    Only ``action_type`` is required; every other field defaults to ``None``
    and is handed to ``PipelineAction`` unchanged.
    """
    fields = {
        "action_type": action_type,
        "service_name": service_name,
        "target_version": target_version,
        "config_edits": config_edits,
        "migration_name": migration_name,
        "migration_type": migration_type,
        "reason": reason,
    }
    return PipelineAction(**fields)
# ============================================================================
# TEST 2: POST /reset - services exposed by each task
# ============================================================================
print("\n=== TEST 2: POST /reset - 5 services ===", flush=True)

_base_services = {"database-primary", "auth-service", "api-gateway", "web-frontend"}
# Tasks that are additionally expected to expose cache-service.
_tasks_with_cache = {"broken_pipeline", "cascading_failure"}

for task in ["clean_deploy", "broken_pipeline", "judgment_call", "cascading_failure"]:
    # PipelineEnvironment is created after DEVOPS_TASK is set, once per task.
    os.environ["DEVOPS_TASK"] = task
    env = PipelineEnvironment()
    obs = env.reset()
    svc_names = sorted(s.name for s in obs.services)

    expected = set(_base_services)
    if task in _tasks_with_cache:
        expected.add("cache-service")

    # Previously `expected` was built but never compared, so the check only
    # counted services (>= 4) and could pass with the wrong names. Require
    # every expected service to be present; extra services are tolerated.
    missing = sorted(expected - set(svc_names))
    detail = f"names={svc_names}"
    if missing:
        detail += f", missing={missing}"
    report(f"reset {task}: services={len(obs.services)}", not missing, detail)
# ============================================================================
# TEST 3: GET /health (just test the function exists)
# ============================================================================
print("\n=== TEST 3: GET /health ===", flush=True)
# NOTE(review): this is a placeholder that always records a pass; nothing is
# imported or called here, and the line number in the detail text is a manual
# claim that may go stale.
report(
    "/health endpoint exists",
    passed=True,
    detail="Verified in app.py line 65",
)
# ============================================================================
# TEST 4: GET /tasks - 5 tasks
# ============================================================================
# The banner used to say "4 tasks" while the check below expects five; the
# banner and the count label are now both derived from / aligned with the
# expected list.
print("\n=== TEST 4: GET /tasks - 5 tasks ===", flush=True)
from server.app import get_tasks

expected_tasks = [
    "clean_deploy",
    "broken_pipeline",
    "judgment_call",
    "cascading_failure",
    "capacity_crisis",
]

tasks_resp = get_tasks()
task_names = [t["name"] for t in tasks_resp["tasks"]]
report(f"{len(expected_tasks)} tasks returned",
       len(task_names) == len(expected_tasks),
       f"tasks={task_names}")
for expected_task in expected_tasks:
    report(f" task '{expected_task}' present", expected_task in task_names)
# ============================================================================
# TEST 5: Optimal path tests
# ============================================================================
print("\n=== TEST 5: Optimal path scores ===", flush=True)


def run_clean_deploy():
    """Replay the scripted ``clean_deploy`` action sequence and return its grade."""
    os.environ["DEVOPS_TASK"] = "clean_deploy"
    env = PipelineEnvironment()
    env.reset()

    # Inspect both services first.
    script = [
        make_action(ActionType.VIEW_LOGS, service_name=name)
        for name in ("api-gateway", "web-frontend")
    ]
    # Each deploy is issued twice in a row (a later section of this script
    # describes the pair as "staging then prod").
    for name, version in (("api-gateway", "v2.3.1"), ("web-frontend", "v1.9.0")):
        script.extend(
            make_action(ActionType.DEPLOY, service_name=name, target_version=version)
            for _ in range(2)
        )
    script.append(make_action(ActionType.APPROVE, reason="Both services deployed successfully"))

    for step in script:
        env.step(step)
    return grade_task("clean_deploy", env.get_episode_history(), env.get_engine())
def run_broken_pipeline():
    """Replay the scripted ``broken_pipeline`` action sequence and return its grade."""
    os.environ["DEVOPS_TASK"] = "broken_pipeline"
    env = PipelineEnvironment()
    env.reset()

    # Investigation, config edit and migration come before any deploy.
    script = [
        make_action(ActionType.VIEW_LOGS, service_name="api-gateway"),
        make_action(ActionType.VIEW_LOGS, service_name="cache-service"),
        make_action(ActionType.VIEW_CONFIG, service_name="cache-service"),
        make_action(
            ActionType.EDIT_CONFIG,
            service_name="cache-service",
            config_edits=[ConfigEdit(key="redis.host", value="redis-prod.internal:6379")],
        ),
        make_action(
            ActionType.RUN_MIGRATION,
            migration_name="add_index_users_email",
            migration_type="schema",
        ),
    ]
    # Every service is deployed twice in a row with the same target version.
    rollout = (
        ("api-gateway", "v2.3.1"),
        ("cache-service", "v1.2.1"),
        ("web-frontend", "v1.9.0"),
    )
    for name, version in rollout:
        script.extend(
            make_action(ActionType.DEPLOY, service_name=name, target_version=version)
            for _ in range(2)
        )
    script.append(make_action(ActionType.APPROVE, reason="All services deployed"))

    for step in script:
        env.step(step)
    return grade_task("broken_pipeline", env.get_episode_history(), env.get_engine())
def run_judgment_call_expert():
    """Replay the scripted ``judgment_call`` action sequence and return its grade."""
    os.environ["DEVOPS_TASK"] = "judgment_call"
    env = PipelineEnvironment()
    env.reset()

    script = [
        make_action(ActionType.VIEW_LOGS, service_name=name)
        for name in ("api-gateway", "web-frontend")
    ]
    # Deploy api-gateway v2.3.2 twice in a row.
    script.extend(
        make_action(ActionType.DEPLOY, service_name="api-gateway", target_version="v2.3.2")
        for _ in range(2)
    )
    # web-frontend is only reconfigured in this script, not deployed.
    script.append(
        make_action(
            ActionType.EDIT_CONFIG,
            service_name="web-frontend",
            config_edits=[ConfigEdit(key="api.auth_version", value="v2")],
        )
    )
    script.append(make_action(ActionType.APPROVE, reason="Hotfix deployed, auth config fixed"))

    for step in script:
        env.step(step)
    return grade_task("judgment_call", env.get_episode_history(), env.get_engine())
def run_cascading_failure():
    """Replay the scripted ``cascading_failure`` action sequence and return its grade."""
    os.environ["DEVOPS_TASK"] = "cascading_failure"
    env = PipelineEnvironment()
    env.reset()

    # Inspect and reconfigure cache-service before any deploy.
    script = [
        make_action(ActionType.VIEW_LOGS, service_name="cache-service"),
        make_action(ActionType.VIEW_CONFIG, service_name="cache-service"),
        make_action(
            ActionType.EDIT_CONFIG,
            service_name="cache-service",
            config_edits=[ConfigEdit(key="redis.max_connections", value="50")],
        ),
    ]
    # cache-service is deployed first, then api-gateway and web-frontend;
    # each deploy is issued twice.
    rollout = (
        ("cache-service", "v1.2.1"),
        ("api-gateway", "v2.3.1"),
        ("web-frontend", "v1.9.0"),
    )
    for name, version in rollout:
        script.extend(
            make_action(ActionType.DEPLOY, service_name=name, target_version=version)
            for _ in range(2)
        )
    script.append(make_action(ActionType.APPROVE, reason="All services recovered and deployed"))

    for step in script:
        env.step(step)
    return grade_task("cascading_failure", env.get_episode_history(), env.get_engine())
def run_capacity_crisis():
    """Replay the scripted ``capacity_crisis`` action sequence and return its grade."""
    os.environ["DEVOPS_TASK"] = "capacity_crisis"
    env = PipelineEnvironment()
    env.reset()

    # Two single-key config edits, applied in this order.
    tuning = (
        ("database-primary", "max_connections", "100"),
        ("cache-service", "max_memory", "4GB"),
    )

    script = [make_action(ActionType.VIEW_LOGS, service_name="database-primary")]
    script.extend(
        make_action(
            ActionType.EDIT_CONFIG,
            service_name=name,
            config_edits=[ConfigEdit(key=key, value=value)],
        )
        for name, key, value in tuning
    )
    script.append(make_action(ActionType.VIEW_PIPELINE))
    script.append(make_action(ActionType.APPROVE, reason="Stabilized"))

    for step in script:
        env.step(step)
    return grade_task("capacity_crisis", env.get_episode_history(), env.get_engine())
# task name -> (scripted runner, minimum acceptable score)
targets = {
    name: (fn, floor)
    for name, fn, floor in [
        ("clean_deploy", run_clean_deploy, 0.95),
        ("broken_pipeline", run_broken_pipeline, 0.80),
        ("judgment_call", run_judgment_call_expert, 0.90),
        ("cascading_failure", run_cascading_failure, 0.70),
        ("capacity_crisis", run_capacity_crisis, 0.60),
    ]
}

# Scores of the runs that completed; printed again in the final summary.
scores = {}
for task, (run_script, floor) in targets.items():
    try:
        achieved = run_script()
        scores[task] = achieved
        met = achieved >= floor
        verdict = "OK" if met else "BELOW TARGET"
        report(f"optimal {task}: {achieved:.3f} (target {floor:.2f}+)", met, f"{verdict}")
    except Exception as exc:
        # A crashing runner is recorded as a failed check (with traceback)
        # so the remaining tasks still run.
        report(f"optimal {task}", False, f"EXCEPTION: {exc}\n{traceback.format_exc()}")
# ============================================================================
# TEST 6: Determinism - the same scripted run twice must give the same score
# ============================================================================
print("\n=== TEST 6: Determinism ===", flush=True)
for task, entry in targets.items():
    run_script = entry[0]
    try:
        first = run_script()
        second = run_script()
        # Exact equality is intentional: the two runs must match exactly.
        report(f"determinism {task}: {first:.3f} == {second:.3f}", first == second)
    except Exception as exc:
        report(f"determinism {task}", False, f"EXCEPTION: {exc}")
# ============================================================================
# TEST 7: Action validation for ALL 5 services
# ============================================================================
print("\n=== TEST 7: Action validation for all services ===", flush=True)


def _action_detail(observation, fallback=""):
    """Return the error text if any, else the first 80 chars of the result.

    The original inline form ``err or res[:80] if res else ""`` parses as
    ``(err or res[:80]) if res else ""``, which silently dropped the error
    message whenever the result text was empty or ``None``.
    """
    if observation.last_action_error:
        return observation.last_action_error
    if observation.last_action_result:
        return observation.last_action_result[:80]
    return fallback


# Use cascading_failure which has all 5 services
os.environ["DEVOPS_TASK"] = "cascading_failure"
env = PipelineEnvironment()
obs = env.reset()
svc_names = [s.name for s in obs.services]
report("5 services present", len(svc_names) == 5, f"{sorted(svc_names)}")

# Test deploy on database-primary and auth-service
for svc in ["database-primary", "auth-service"]:
    obs = env.step(make_action(ActionType.DEPLOY, service_name=svc, target_version="v99.0.0"))
    report(f"deploy {svc}", obs.last_action_error is None, _action_detail(obs))

# Rollback (fresh environment, so earlier deploys do not interfere)
env2 = PipelineEnvironment()
obs = env2.reset()
for svc in ["database-primary", "auth-service"]:
    obs = env2.step(make_action(ActionType.ROLLBACK, service_name=svc))
    report(f"rollback {svc}", obs.last_action_error is None, _action_detail(obs))

# view_logs
env3 = PipelineEnvironment()
obs = env3.reset()
for svc in ["database-primary", "auth-service"]:
    obs = env3.step(make_action(ActionType.VIEW_LOGS, service_name=svc))
    log_text = obs.last_action_result or ""
    # More than 10 characters is treated as "some log output was returned".
    report(f"view_logs {svc}", len(log_text) > 10, f"len={len(log_text)}")

# view_config (same environment as view_logs)
for svc in ["database-primary", "auth-service"]:
    obs = env3.step(make_action(ActionType.VIEW_CONFIG, service_name=svc))
    config_text = obs.last_action_result or ""
    # The check looks for "=" in the output, i.e. key=value style text.
    report(f"view_config {svc}", "=" in config_text,
           config_text[:80] if config_text else "none")

# edit_config
env4 = PipelineEnvironment()
obs = env4.reset()
obs = env4.step(make_action(ActionType.EDIT_CONFIG, service_name="database-primary",
                            config_edits=[ConfigEdit(key="max_connections", value="100")]))
report("edit_config database-primary", obs.last_action_error is None, _action_detail(obs))
obs = env4.step(make_action(ActionType.EDIT_CONFIG, service_name="auth-service",
                            config_edits=[ConfigEdit(key="token_ttl_seconds", value="7200")]))
report("edit_config auth-service", obs.last_action_error is None, _action_detail(obs))
# ============================================================================
# TEST 8: Invalid action tests
# ============================================================================
print("\n=== TEST 8: Invalid action tests ===", flush=True)
env5 = PipelineEnvironment()
obs = env5.reset()

# Each case is (label, zero-argument action builder). The builder is called
# inside the try block so a failure while constructing the action is reported
# as a crash as well.
invalid_cases = [
    (
        "deploy nonexistent-service: graceful error",
        lambda: make_action(ActionType.DEPLOY, service_name="nonexistent-service",
                            target_version="v1.0"),
    ),
    (
        "edit_config fake-service: graceful error",
        lambda: make_action(ActionType.EDIT_CONFIG, service_name="fake-service",
                            config_edits=[ConfigEdit(key="x", value="y")]),
    ),
]

for label, build_action in invalid_cases:
    try:
        obs = env5.step(build_action())
        error_text = obs.last_action_error
        # Passing means the environment reported an error instead of raising.
        report(label, error_text is not None,
               error_text[:80] if error_text else "no error msg")
    except Exception as exc:
        report(label, False, f"CRASHED: {exc}")
# ============================================================================
# TEST 9: Partial observability
# ============================================================================
print("\n=== TEST 9: Partial observability ===", flush=True)


def _find_service(observation, name):
    """Return the first service in *observation* named *name* (IndexError if none)."""
    return [svc for svc in observation.services if svc.name == name][0]


os.environ["DEVOPS_TASK"] = "cascading_failure"
env6 = PipelineEnvironment()
obs = env6.reset()

# Right after reset the metrics are expected to read exactly 0.0 ("hidden").
database = _find_service(obs, "database-primary")
report("CPU hidden after reset", database.cpu_percent == 0.0, f"cpu={database.cpu_percent}")
report("memory hidden after reset", database.memory_percent == 0.0,
       f"mem={database.memory_percent}")

# view_logs on the service should expose non-zero CPU/memory readings.
obs = env6.step(make_action(ActionType.VIEW_LOGS, service_name="database-primary"))
database = _find_service(obs, "database-primary")
report("CPU revealed after view_logs", database.cpu_percent > 0.0,
       f"cpu={database.cpu_percent}")
report("memory revealed after view_logs", database.memory_percent > 0.0,
       f"mem={database.memory_percent}")

# view_config should populate config_snapshot on the observation.
obs = env6.step(make_action(ActionType.VIEW_CONFIG, service_name="database-primary"))
snapshot = obs.config_snapshot
report("config_snapshot revealed after view_config", snapshot is not None,
       f"keys={list(snapshot.keys()) if snapshot else 'none'}")

# A service that was never inspected must still read 0.0.
cache = _find_service(obs, "cache-service")
report("other service CPU still hidden", cache.cpu_percent == 0.0,
       f"cache cpu={cache.cpu_percent}")
# ============================================================================
# TEST 10: Cascading effects
# ============================================================================
print("\n=== TEST 10: Cascading effects ===", flush=True)
os.environ["DEVOPS_TASK"] = "cascading_failure"
env7 = PipelineEnvironment()
obs = env7.reset()

# cache-service degraded -> api-gateway should already be degraded at reset
gateway = [svc for svc in obs.services if svc.name == "api-gateway"][0]
gateway_health = gateway.health.value
report("api-gateway degraded from cascade", gateway_health == "degraded",
       f"health={gateway_health}")

# Fix cache-service, then deploy it twice (staging then prod). Only the
# observation from the last step is inspected.
repair_steps = [
    make_action(ActionType.VIEW_CONFIG, service_name="cache-service"),
    make_action(ActionType.EDIT_CONFIG, service_name="cache-service",
                config_edits=[ConfigEdit(key="redis.max_connections", value="50")]),
    make_action(ActionType.DEPLOY, service_name="cache-service", target_version="v1.2.1"),
    make_action(ActionType.DEPLOY, service_name="cache-service", target_version="v1.2.1"),
]
for repair in repair_steps:
    obs = env7.step(repair)

cache = [svc for svc in obs.services if svc.name == "cache-service"][0]
cache_health = cache.health.value
report("cache-service healthy after fix", cache_health == "healthy",
       f"health={cache_health}")

# Recovery cascade - api-gateway should start recovering (may take steps),
# so take one more step before looking at it.
obs = env7.step(make_action(ActionType.VIEW_PIPELINE))
gateway = [svc for svc in obs.services if svc.name == "api-gateway"][0]
# After fixing the root cause the cascade should at minimum stop getting
# worse; the check accepts any error rate below 30.0.
report("api-gateway recovery started (cascade stopped or improving)",
       gateway.error_rate < 30.0,
       f"error_rate={gateway.error_rate}, health={gateway.health.value}")
# ============================================================================
# TEST 11: Trade-off effects in action results
# ============================================================================
print("\n=== TEST 11: Trade-off effects ===", flush=True)

# Deploy - the result text should mention a CPU/latency spike
os.environ["DEVOPS_TASK"] = "clean_deploy"
env8 = PipelineEnvironment()
obs = env8.reset()
# Same deploy issued twice; only the second result is inspected.
for _ in range(2):
    obs = env8.step(make_action(ActionType.DEPLOY, service_name="api-gateway",
                                target_version="v2.3.1"))
deploy_result = obs.last_action_result or ""
deploy_lower = deploy_result.lower()
has_spike = any(word in deploy_lower for word in ("spike", "warmup", "cpu"))
report("deploy mentions CPU/latency spike", has_spike, deploy_result[:100])

# Rollback - the result text should mention regression
os.environ["DEVOPS_TASK"] = "cascading_failure"
env9 = PipelineEnvironment()
obs = env9.reset()
obs = env9.step(make_action(ActionType.ROLLBACK, service_name="cache-service"))
rollback_result = obs.last_action_result or ""
rollback_lower = rollback_result.lower()
has_regression = any(
    word in rollback_lower for word in ("regress", "rolled back", "monitoring")
)
report("rollback mentions regression risk", has_regression, rollback_result[:120])

# edit_config - the result text should mention restart/latency
env10 = PipelineEnvironment()
obs = env10.reset()
obs = env10.step(make_action(ActionType.EDIT_CONFIG, service_name="cache-service",
                             config_edits=[ConfigEdit(key="redis.max_connections", value="50")]))
config_result = obs.last_action_result or ""
config_lower = config_result.lower()
has_restart = any(word in config_lower for word in ("restart", "latency", "spike"))
report("edit_config mentions restart/latency", has_restart, config_result[:120])
# ============================================================================
# SUMMARY
# ============================================================================
print("\n" + "=" * 70, flush=True)
print("INTEGRATION TEST SUMMARY", flush=True)
print("=" * 70, flush=True)

passed = sum(1 for _, s, _ in results if s == PASS)
failed = sum(1 for _, s, _ in results if s == FAIL)
print(f" PASSED: {passed}", flush=True)
print(f" FAILED: {failed}", flush=True)
print(f" TOTAL: {len(results)}", flush=True)

if failed > 0:
    print("\nFAILED TESTS:", flush=True)
    for name, status, detail in results:
        if status != FAIL:
            continue
        # The separator here used to be a garbled non-ASCII character and was
        # printed even when there was no detail; use a plain hyphen and only
        # when there is something to show, matching the per-check output.
        suffix = f" - {detail}" if detail else ""
        print(f" [FAIL] {name}{suffix}", flush=True)

print("\nSCORES:", flush=True)
for task, score in scores.items():
    print(f" {task}: {score:.3f}", flush=True)

# A non-zero exit status lets CI treat any failed check as a failed run.
sys.exit(1 if failed > 0 else 0)