Spaces:
Sleeping
Sleeping
| import os | |
| from datetime import datetime | |
| import pytest | |
| from swarms.structs.base_structure import BaseStructure | |
| class TestBaseStructure: | |
| def test_init(self): | |
| base_structure = BaseStructure( | |
| name="TestStructure", | |
| description="Test description", | |
| save_metadata=True, | |
| save_artifact_path="./test_artifacts", | |
| save_metadata_path="./test_metadata", | |
| save_error_path="./test_errors", | |
| ) | |
| assert base_structure.name == "TestStructure" | |
| assert base_structure.description == "Test description" | |
| assert base_structure.save_metadata is True | |
| assert base_structure.save_artifact_path == "./test_artifacts" | |
| assert base_structure.save_metadata_path == "./test_metadata" | |
| assert base_structure.save_error_path == "./test_errors" | |
| def test_save_to_file_and_load_from_file(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| file_path = os.path.join(tmp_dir, "test_file.json") | |
| data_to_save = {"key": "value"} | |
| base_structure = BaseStructure() | |
| base_structure.save_to_file(data_to_save, file_path) | |
| loaded_data = base_structure.load_from_file(file_path) | |
| assert loaded_data == data_to_save | |
| def test_save_metadata_and_load_metadata(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_metadata_path=tmp_dir) | |
| metadata = {"name": "Test", "description": "Test metadata"} | |
| base_structure.save_metadata(metadata) | |
| loaded_metadata = base_structure.load_metadata() | |
| assert loaded_metadata == metadata | |
| def test_log_error(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_error_path=tmp_dir) | |
| error_message = "Test error message" | |
| base_structure.log_error(error_message) | |
| log_file = os.path.join(tmp_dir, "TestStructure_errors.log") | |
| with open(log_file) as file: | |
| lines = file.readlines() | |
| assert len(lines) == 1 | |
| assert lines[0] == f"{error_message}\n" | |
| def test_save_artifact_and_load_artifact(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_artifact_path=tmp_dir) | |
| artifact = {"key": "value"} | |
| artifact_name = "test_artifact" | |
| base_structure.save_artifact(artifact, artifact_name) | |
| loaded_artifact = base_structure.load_artifact(artifact_name) | |
| assert loaded_artifact == artifact | |
| def test_current_timestamp(self): | |
| base_structure = BaseStructure() | |
| current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| timestamp = base_structure._current_timestamp() | |
| assert timestamp == current_time | |
| def test_log_event(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_metadata_path=tmp_dir) | |
| event = "Test event" | |
| event_type = "INFO" | |
| base_structure.log_event(event, event_type) | |
| log_file = os.path.join(tmp_dir, "TestStructure_events.log") | |
| with open(log_file) as file: | |
| lines = file.readlines() | |
| assert len(lines) == 1 | |
| assert ( | |
| lines[0] == f"[{base_structure._current_timestamp()}]" | |
| f" [{event_type}] {event}\n" | |
| ) | |
| async def test_run_async(self): | |
| base_structure = BaseStructure() | |
| async def async_function(): | |
| return "Async Test Result" | |
| result = await base_structure.run_async(async_function) | |
| assert result == "Async Test Result" | |
| async def test_save_metadata_async(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_metadata_path=tmp_dir) | |
| metadata = {"name": "Test", "description": "Test metadata"} | |
| await base_structure.save_metadata_async(metadata) | |
| loaded_metadata = base_structure.load_metadata() | |
| assert loaded_metadata == metadata | |
| async def test_log_error_async(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_error_path=tmp_dir) | |
| error_message = "Test error message" | |
| await base_structure.log_error_async(error_message) | |
| log_file = os.path.join(tmp_dir, "TestStructure_errors.log") | |
| with open(log_file) as file: | |
| lines = file.readlines() | |
| assert len(lines) == 1 | |
| assert lines[0] == f"{error_message}\n" | |
| async def test_save_artifact_async(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_artifact_path=tmp_dir) | |
| artifact = {"key": "value"} | |
| artifact_name = "test_artifact" | |
| await base_structure.save_artifact_async( | |
| artifact, artifact_name | |
| ) | |
| loaded_artifact = base_structure.load_artifact(artifact_name) | |
| assert loaded_artifact == artifact | |
| async def test_load_artifact_async(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_artifact_path=tmp_dir) | |
| artifact = {"key": "value"} | |
| artifact_name = "test_artifact" | |
| base_structure.save_artifact(artifact, artifact_name) | |
| loaded_artifact = await base_structure.load_artifact_async( | |
| artifact_name | |
| ) | |
| assert loaded_artifact == artifact | |
| async def test_log_event_async(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure(save_metadata_path=tmp_dir) | |
| event = "Test event" | |
| event_type = "INFO" | |
| await base_structure.log_event_async(event, event_type) | |
| log_file = os.path.join(tmp_dir, "TestStructure_events.log") | |
| with open(log_file) as file: | |
| lines = file.readlines() | |
| assert len(lines) == 1 | |
| assert ( | |
| lines[0] == f"[{base_structure._current_timestamp()}]" | |
| f" [{event_type}] {event}\n" | |
| ) | |
| async def test_asave_to_file(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| file_path = os.path.join(tmp_dir, "test_file.json") | |
| data_to_save = {"key": "value"} | |
| base_structure = BaseStructure() | |
| await base_structure.asave_to_file(data_to_save, file_path) | |
| loaded_data = base_structure.load_from_file(file_path) | |
| assert loaded_data == data_to_save | |
| async def test_aload_from_file(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| file_path = os.path.join(tmp_dir, "test_file.json") | |
| data_to_save = {"key": "value"} | |
| base_structure = BaseStructure() | |
| base_structure.save_to_file(data_to_save, file_path) | |
| loaded_data = await base_structure.aload_from_file(file_path) | |
| assert loaded_data == data_to_save | |
| def test_run_in_thread(self): | |
| base_structure = BaseStructure() | |
| result = base_structure.run_in_thread( | |
| lambda: "Thread Test Result" | |
| ) | |
| assert result.result() == "Thread Test Result" | |
| def test_save_and_decompress_data(self): | |
| base_structure = BaseStructure() | |
| data = {"key": "value"} | |
| compressed_data = base_structure.compress_data(data) | |
| decompressed_data = base_structure.decompres_data( | |
| compressed_data | |
| ) | |
| assert decompressed_data == data | |
| def test_run_batched(self): | |
| base_structure = BaseStructure() | |
| def run_function(data): | |
| return f"Processed {data}" | |
| batched_data = list(range(10)) | |
| result = base_structure.run_batched( | |
| batched_data, batch_size=5, func=run_function | |
| ) | |
| expected_result = [ | |
| f"Processed {data}" for data in batched_data | |
| ] | |
| assert result == expected_result | |
| def test_load_config(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| config_file = os.path.join(tmp_dir, "config.json") | |
| config_data = {"key": "value"} | |
| base_structure = BaseStructure() | |
| base_structure.save_to_file(config_data, config_file) | |
| loaded_config = base_structure.load_config(config_file) | |
| assert loaded_config == config_data | |
| def test_backup_data(self, tmpdir): | |
| tmp_dir = tmpdir.mkdir("test_dir") | |
| base_structure = BaseStructure() | |
| data_to_backup = {"key": "value"} | |
| base_structure.backup_data( | |
| data_to_backup, backup_path=tmp_dir | |
| ) | |
| backup_files = os.listdir(tmp_dir) | |
| assert len(backup_files) == 1 | |
| loaded_data = base_structure.load_from_file( | |
| os.path.join(tmp_dir, backup_files[0]) | |
| ) | |
| assert loaded_data == data_to_backup | |
| def test_monitor_resources(self): | |
| base_structure = BaseStructure() | |
| base_structure.monitor_resources() | |
| def test_run_with_resources(self): | |
| base_structure = BaseStructure() | |
| def run_function(): | |
| base_structure.monitor_resources() | |
| return "Resource Test Result" | |
| result = base_structure.run_with_resources(run_function) | |
| assert result == "Resource Test Result" | |
| def test_run_with_resources_batched(self): | |
| base_structure = BaseStructure() | |
| def run_function(data): | |
| base_structure.monitor_resources() | |
| return f"Processed {data}" | |
| batched_data = list(range(10)) | |
| result = base_structure.run_with_resources_batched( | |
| batched_data, batch_size=5, func=run_function | |
| ) | |
| expected_result = [ | |
| f"Processed {data}" for data in batched_data | |
| ] | |
| assert result == expected_result | |