| """Integration test for code sandbox client""" |
|
|
| import pytest |
|
|
| import src.schemas.sandbox_models as dm |
| from src.tools.code_sandbox.async_sandbox_client import AsyncSandboxClient |
|
|
| |
| |
# Session identifier handed to every AsyncSandboxClient built in this module.
SAMPLE_SESSION_ID = "test_session_id"

# Text fixture: the name passed to upload_file/download_file, and the local
# path of the same file.
SAMPLE_FILE_NAME = "sample_file.txt"
SAMPLE_FILE_PATH = "tests/integration_tests/" + SAMPLE_FILE_NAME

# Image fixture, laid out the same way as the text fixture above.
SAMPLE_IMAGE_NAME = "sample_image.jpg"
SAMPLE_IMAGE_PATH = "tests/integration_tests/" + SAMPLE_IMAGE_NAME
|
|
|
|
def test_run_code_success():
    """Run a one-line ``print`` in the sandbox and check the captured stdout.

    Expects a ``dm.RunCodeOutput`` that is not partial and whose single
    output entry is a ``stdout`` record holding the printed text.
    """
    # NOTE(review): the client class is named "Async..." yet run_code is
    # called without ``await`` -- confirm it is not a coroutine function.
    client = AsyncSandboxClient(SAMPLE_SESSION_ID)

    result = client.run_code("print('Hello World!')", timeout=10)

    assert isinstance(result, dm.RunCodeOutput)
    assert result.data is not None
    # Identity check: the flag must be the boolean False, not merely falsy.
    assert result.data.is_partial is False
    assert result.data.result is not None

    code_output = result.data.result.code_output_result
    assert len(code_output) == 1
    # Outside a class body ``__type`` is not name-mangled, so this reads the
    # attribute literally named ``__type``.
    assert code_output[0].__type == "stdout"
    assert code_output[0].content == "Hello World!\n"
|
|
|
|
def test_run_code_timeout():
    """Run an endless loop with a 1-second timeout and expect a partial result.

    Expects a ``dm.RunCodeOutput`` flagged as partial, with a result object
    present and the timeout message set on the response.
    """
    client = AsyncSandboxClient(SAMPLE_SESSION_ID)

    result = client.run_code("while True:\n pass", timeout=1)

    assert isinstance(result, dm.RunCodeOutput)
    # Guard before dereferencing, as the other run_code tests do, so a missing
    # payload fails as an assertion instead of an AttributeError.
    assert result.data is not None
    # Identity check: the flag must be the boolean True, not merely truthy.
    assert result.data.is_partial is True
    assert result.data.result is not None
    assert result.message == "the code doesn't finish in timeout value 1"
|
|
|
|
def test_run_code_error():
    """Run code that raises in the sandbox and expect a single stderr entry.

    The snippet passes ``error=``, a keyword the built-in ``print`` does not
    accept. The test expects a ``dm.RunCodeOutput`` that is not partial and
    carries exactly one output record of type ``stderr``.
    """
    client = AsyncSandboxClient(SAMPLE_SESSION_ID)

    result = client.run_code("print('hello', error='abc')", timeout=10)

    assert isinstance(result, dm.RunCodeOutput)
    assert result.data is not None
    # Identity check: the flag must be the boolean False, not merely falsy.
    assert result.data.is_partial is False
    assert result.data.result is not None

    code_output = result.data.result.code_output_result
    assert len(code_output) == 1
    # Outside a class body ``__type`` is not name-mangled, so this reads the
    # attribute literally named ``__type``.
    assert code_output[0].__type == "stderr"
|
|
|
|
def test_upload_download_text_file():
    """Upload the sample text file, download it again and compare the text."""
    sandbox = AsyncSandboxClient(SAMPLE_SESSION_ID)

    # Upload: the response should report success and the path under /mnt.
    uploaded = sandbox.upload_file(SAMPLE_FILE_PATH, SAMPLE_FILE_NAME)
    assert isinstance(uploaded, dm.UploadOutput)
    assert uploaded.message == "succeed"
    expected_remote_path = f"/mnt/{SAMPLE_FILE_NAME}"
    assert uploaded.data == expected_remote_path

    # Download: the returned content must equal the local file's text.
    downloaded = sandbox.download_file(SAMPLE_FILE_NAME)
    assert isinstance(downloaded, dm.DownloadSuccessOutput)
    # NOTE(review): the local file is read with the platform default encoding.
    with open(SAMPLE_FILE_PATH, "r") as local_file:
        expected_text = local_file.read()
    assert downloaded.content == expected_text
|
|
|
|
def test_upload_download_image_file():
    """Upload the sample image, download it again and compare the bytes."""
    sandbox = AsyncSandboxClient(SAMPLE_SESSION_ID)

    # Upload: the response should report success and the path under /mnt.
    uploaded = sandbox.upload_file(SAMPLE_IMAGE_PATH, SAMPLE_IMAGE_NAME)
    assert isinstance(uploaded, dm.UploadOutput)
    assert uploaded.message == "succeed"
    expected_remote_path = f"/mnt/{SAMPLE_IMAGE_NAME}"
    assert uploaded.data == expected_remote_path

    # Download: compare the encoded content against the raw local bytes.
    downloaded = sandbox.download_file(SAMPLE_IMAGE_NAME)
    assert isinstance(downloaded, dm.DownloadSuccessOutput)
    with open(SAMPLE_IMAGE_PATH, "rb") as local_image:
        expected_bytes = local_image.read()
    # NOTE(review): ``.encode()`` uses UTF-8 by default -- confirm the client
    # returns binary content in a form that round-trips this way.
    assert downloaded.content.encode() == expected_bytes
|
|
|
|
def test_download_nonexist_file():
    """Request a file name this module never uploads; expect ``dm.ErrorResponse``."""
    missing_name = "this_file_should_not_exist"
    response = AsyncSandboxClient(SAMPLE_SESSION_ID).download_file(missing_name)
    assert isinstance(response, dm.ErrorResponse)
|
|
|
|
def test_heartbeat():
    """Call ``heartbeat`` and accept either a heartbeat or an error response."""
    response = AsyncSandboxClient(SAMPLE_SESSION_ID).heartbeat()
    # Both outcomes are accepted; only the response type is checked.
    accepted_types = (dm.HeartbeatOutput, dm.ErrorResponse)
    assert isinstance(response, accepted_types)
|
|
|
|
def test_refresh_sandbox():
    """Call ``refresh_sandbox`` and accept either a refresh or an error response."""
    response = AsyncSandboxClient(SAMPLE_SESSION_ID).refresh_sandbox()
    # Both outcomes are accepted; only the response type is checked.
    accepted_types = (dm.RefreshSandboxOutput, dm.ErrorResponse)
    assert isinstance(response, accepted_types)
|
|
|
|
| |
@pytest.fixture(scope="session", autouse=True)
def refresh_sandbox_after_all_tests():
    """Session-scoped autouse fixture that refreshes the sandbox on teardown."""
    # No setup is needed; the code after ``yield`` is the fixture's teardown.
    yield
    AsyncSandboxClient(SAMPLE_SESSION_ID).refresh_sandbox()