def test_create_list_update_delete_task(api_client, api_context, sample_task_payload):
    """Walk one task through create, list, patch and delete over the HTTP API.

    Every step must answer with HTTP 200. The task is addressed as id ``0``
    in the patch and delete URLs, and the final listing must be empty.

    NOTE(review): ``api_context`` is not referenced in the body; presumably it
    is requested only so the fixture's setup runs -- confirm against conftest.
    """
    # Create: the response carries the new task under the "task" key.
    create_resp = api_client.post("/api/tasks/", json=sample_task_payload)
    assert create_resp.status_code == 200
    new_task = create_resp.json()["task"]
    assert new_task["task_name"] == sample_task_payload["task_name"]

    # List: exactly one task is returned and it echoes the submitted keyword.
    list_resp = api_client.get("/api/tasks")
    assert list_resp.status_code == 200
    listed = list_resp.json()
    assert len(listed) == 1
    assert listed[0]["keyword"] == sample_task_payload["keyword"]

    # Update: a partial PATCH flips the "enabled" flag off.
    patch_resp = api_client.patch("/api/tasks/0", json={"enabled": False})
    assert patch_resp.status_code == 200
    patched_task = patch_resp.json()["task"]
    assert patched_task["enabled"] is False

    # Delete: removing the task succeeds.
    delete_resp = api_client.delete("/api/tasks/0")
    assert delete_resp.status_code == 200

    # List again: the collection is now an empty list.
    final_resp = api_client.get("/api/tasks")
    assert final_resp.status_code == 200
    assert final_resp.json() == []
def test_start_stop_task_updates_status(api_client, api_context, sample_task_payload):
    """Check that starting and stopping a task toggles its ``is_running`` flag.

    After the HTTP round trips, the ``process_service`` object taken from
    ``api_context`` must have recorded one start as ``(0, task_name)`` and
    one stop as ``0``.
    """

    def fetch_is_running():
        # Read task 0 back and return its reported running state.
        task_resp = api_client.get("/api/tasks/0")
        assert task_resp.status_code == 200
        return task_resp.json()["is_running"]

    # Arrange: a single task must exist before it can be started.
    create_resp = api_client.post("/api/tasks/", json=sample_task_payload)
    assert create_resp.status_code == 200

    # Start the task and confirm the API reports it as running.
    start_resp = api_client.post("/api/tasks/start/0")
    assert start_resp.status_code == 200
    assert fetch_is_running() is True

    # Stop the task and confirm the API reports it as no longer running.
    stop_resp = api_client.post("/api/tasks/stop/0")
    assert stop_resp.status_code == 200
    assert fetch_is_running() is False

    # The process service saw exactly one start and one stop.
    recorder = api_context["process_service"]
    expected_start = (0, sample_task_payload["task_name"])
    assert recorder.started == [expected_start]
    assert recorder.stopped == [0]