goofish/tests/integration/test_api_tasks.py
host1syan's picture
Upload 212 files
5378afe verified
def test_create_list_update_delete_task(api_client, api_context, sample_task_payload):
    """Walk a single task through create, list, update and delete over HTTP.

    Every request is expected to answer with status 200. The task is
    addressed as ``0`` in the update and delete URLs.

    ``api_context`` is requested but never read in this test body
    (presumably it is needed for fixture setup -- confirm against conftest).
    """
    # Create: the response carries the new task under the "task" key.
    create_resp = api_client.post("/api/tasks/", json=sample_task_payload)
    assert create_resp.status_code == 200
    new_task = create_resp.json()["task"]
    assert new_task["task_name"] == sample_task_payload["task_name"]

    # List: exactly one task should come back, matching the payload keyword.
    list_resp = api_client.get("/api/tasks")
    assert list_resp.status_code == 200
    listed = list_resp.json()
    assert len(listed) == 1
    assert listed[0]["keyword"] == sample_task_payload["keyword"]

    # Update: switch the task off and check the echoed task reflects it.
    patch_resp = api_client.patch("/api/tasks/0", json={"enabled": False})
    assert patch_resp.status_code == 200
    patched_task = patch_resp.json()["task"]
    assert patched_task["enabled"] is False

    # Delete: remove the task ...
    delete_resp = api_client.delete("/api/tasks/0")
    assert delete_resp.status_code == 200

    # ... and confirm the listing is empty afterwards.
    final_resp = api_client.get("/api/tasks")
    assert final_resp.status_code == 200
    assert final_resp.json() == []
def test_start_stop_task_updates_status(api_client, api_context, sample_task_payload):
    """Start and stop a task, checking ``is_running`` and the recorded calls.

    After creating a task, the test starts it and expects ``is_running`` to be
    ``True``, then stops it and expects ``False``. Finally it checks that the
    ``process_service`` object held in ``api_context`` recorded one start as
    ``(0, task_name)`` and one stop as ``0``.
    """
    # Seed a single task to operate on.
    create_resp = api_client.post("/api/tasks/", json=sample_task_payload)
    assert create_resp.status_code == 200

    # Start it and read the task back: it must report as running.
    start_resp = api_client.post("/api/tasks/start/0")
    assert start_resp.status_code == 200

    running_resp = api_client.get("/api/tasks/0")
    assert running_resp.status_code == 200
    assert running_resp.json()["is_running"] is True

    # Stop it and read the task back again: it must no longer be running.
    stop_resp = api_client.post("/api/tasks/stop/0")
    assert stop_resp.status_code == 200

    stopped_resp = api_client.get("/api/tasks/0")
    assert stopped_resp.status_code == 200
    assert stopped_resp.json()["is_running"] is False

    # The process service must have seen exactly one start and one stop.
    recorder = api_context["process_service"]
    assert recorder.started == [(0, sample_task_payload["task_name"])]
    assert recorder.stopped == [0]