| """Unit tests for storage service. |
| |
| This module tests the StorageService class to ensure proper JSON file |
| persistence, error handling, and data integrity. |
| |
| Requirements: 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7 |
| """ |
|
|
| import json |
| import pytest |
| import tempfile |
| import shutil |
| from pathlib import Path |
| from datetime import datetime |
|
|
| from app.storage import StorageService, StorageError |
| from app.models import ( |
| RecordData, |
| ParsedData, |
| MoodData, |
| InspirationData, |
| TodoData |
| ) |
|
|
|
|
| @pytest.fixture |
| def temp_data_dir(): |
| """Create a temporary directory for test data.""" |
| temp_dir = tempfile.mkdtemp() |
| yield temp_dir |
| shutil.rmtree(temp_dir) |
|
|
|
|
| @pytest.fixture |
| def storage_service(temp_data_dir): |
| """Create a StorageService instance with temporary directory.""" |
| return StorageService(temp_data_dir) |
|
|
|
|
| class TestStorageServiceInitialization: |
| """Tests for StorageService initialization.""" |
| |
| def test_init_creates_data_directory(self, temp_data_dir): |
| """Test that initialization creates the data directory if it doesn't exist.""" |
| |
| shutil.rmtree(temp_data_dir) |
| assert not Path(temp_data_dir).exists() |
| |
| |
| service = StorageService(temp_data_dir) |
| |
| |
| assert Path(temp_data_dir).exists() |
| assert Path(temp_data_dir).is_dir() |
| |
| def test_init_sets_file_paths(self, storage_service, temp_data_dir): |
| """Test that initialization sets correct file paths.""" |
| assert storage_service.records_file == Path(temp_data_dir) / "records.json" |
| assert storage_service.moods_file == Path(temp_data_dir) / "moods.json" |
| assert storage_service.inspirations_file == Path(temp_data_dir) / "inspirations.json" |
| assert storage_service.todos_file == Path(temp_data_dir) / "todos.json" |
|
|
|
|
| class TestFileInitialization: |
| """Tests for file initialization logic. |
| |
| Requirements: 7.5 |
| """ |
| |
| def test_ensure_file_exists_creates_new_file(self, storage_service): |
| """Test that _ensure_file_exists creates a new file with empty array.""" |
| test_file = storage_service.data_dir / "test.json" |
| assert not test_file.exists() |
| |
| storage_service._ensure_file_exists(test_file) |
| |
| assert test_file.exists() |
| with open(test_file, 'r', encoding='utf-8') as f: |
| data = json.load(f) |
| assert data == [] |
| |
| def test_ensure_file_exists_preserves_existing_file(self, storage_service): |
| """Test that _ensure_file_exists doesn't overwrite existing files.""" |
| test_file = storage_service.data_dir / "test.json" |
| existing_data = [{"key": "value"}] |
| |
| with open(test_file, 'w', encoding='utf-8') as f: |
| json.dump(existing_data, f) |
| |
| storage_service._ensure_file_exists(test_file) |
| |
| with open(test_file, 'r', encoding='utf-8') as f: |
| data = json.load(f) |
| assert data == existing_data |
|
|
|
|
| class TestSaveRecord: |
| """Tests for save_record method. |
| |
| Requirements: 7.1, 7.7 |
| """ |
| |
| def test_save_record_creates_file_if_not_exists(self, storage_service): |
| """Test that save_record creates records.json if it doesn't exist.""" |
| assert not storage_service.records_file.exists() |
| |
| record = RecordData( |
| record_id="test-id", |
| timestamp="2024-01-01T12:00:00Z", |
| input_type="text", |
| original_text="测试文本", |
| parsed_data=ParsedData() |
| ) |
| |
| storage_service.save_record(record) |
| |
| assert storage_service.records_file.exists() |
| |
| def test_save_record_generates_uuid_if_not_set(self, storage_service): |
| """Test that save_record generates a UUID if record_id is not set.""" |
| record = RecordData( |
| record_id="", |
| timestamp="2024-01-01T12:00:00Z", |
| input_type="text", |
| original_text="测试文本", |
| parsed_data=ParsedData() |
| ) |
| |
| record_id = storage_service.save_record(record) |
| |
| assert record_id |
| assert len(record_id) == 36 |
| assert record.record_id == record_id |
| |
| def test_save_record_preserves_existing_id(self, storage_service): |
| """Test that save_record preserves existing record_id.""" |
| existing_id = "my-custom-id" |
| record = RecordData( |
| record_id=existing_id, |
| timestamp="2024-01-01T12:00:00Z", |
| input_type="text", |
| original_text="测试文本", |
| parsed_data=ParsedData() |
| ) |
| |
| record_id = storage_service.save_record(record) |
| |
| assert record_id == existing_id |
| |
| def test_save_record_appends_to_existing_records(self, storage_service): |
| """Test that save_record appends to existing records.""" |
| |
| record1 = RecordData( |
| record_id="id-1", |
| timestamp="2024-01-01T12:00:00Z", |
| input_type="text", |
| original_text="文本1", |
| parsed_data=ParsedData() |
| ) |
| storage_service.save_record(record1) |
| |
| |
| record2 = RecordData( |
| record_id="id-2", |
| timestamp="2024-01-01T13:00:00Z", |
| input_type="text", |
| original_text="文本2", |
| parsed_data=ParsedData() |
| ) |
| storage_service.save_record(record2) |
| |
| |
| with open(storage_service.records_file, 'r', encoding='utf-8') as f: |
| records = json.load(f) |
| |
| assert len(records) == 2 |
| assert records[0]["record_id"] == "id-1" |
| assert records[1]["record_id"] == "id-2" |
| |
| def test_save_record_with_complete_data(self, storage_service): |
| """Test saving a record with complete parsed data.""" |
| record = RecordData( |
| record_id="complete-id", |
| timestamp="2024-01-01T12:00:00Z", |
| input_type="text", |
| original_text="今天很开心", |
| parsed_data=ParsedData( |
| mood=MoodData(type="开心", intensity=8, keywords=["愉快"]), |
| inspirations=[InspirationData(core_idea="新想法", category="工作")], |
| todos=[TodoData(task="完成任务")] |
| ) |
| ) |
| |
| storage_service.save_record(record) |
| |
| with open(storage_service.records_file, 'r', encoding='utf-8') as f: |
| records = json.load(f) |
| |
| assert len(records) == 1 |
| saved_record = records[0] |
| assert saved_record["record_id"] == "complete-id" |
| assert saved_record["parsed_data"]["mood"]["type"] == "开心" |
| assert len(saved_record["parsed_data"]["inspirations"]) == 1 |
| assert len(saved_record["parsed_data"]["todos"]) == 1 |
|
|
|
|
| class TestAppendMood: |
| """Tests for append_mood method. |
| |
| Requirements: 7.2 |
| """ |
| |
| def test_append_mood_creates_file_if_not_exists(self, storage_service): |
| """Test that append_mood creates moods.json if it doesn't exist.""" |
| assert not storage_service.moods_file.exists() |
| |
| mood = MoodData(type="开心", intensity=8, keywords=["愉快"]) |
| storage_service.append_mood(mood, "record-1", "2024-01-01T12:00:00Z") |
| |
| assert storage_service.moods_file.exists() |
| |
| def test_append_mood_adds_metadata(self, storage_service): |
| """Test that append_mood adds record_id and timestamp.""" |
| mood = MoodData(type="开心", intensity=8, keywords=["愉快"]) |
| storage_service.append_mood(mood, "record-1", "2024-01-01T12:00:00Z") |
| |
| with open(storage_service.moods_file, 'r', encoding='utf-8') as f: |
| moods = json.load(f) |
| |
| assert len(moods) == 1 |
| assert moods[0]["record_id"] == "record-1" |
| assert moods[0]["timestamp"] == "2024-01-01T12:00:00Z" |
| assert moods[0]["type"] == "开心" |
| assert moods[0]["intensity"] == 8 |
| |
| def test_append_mood_multiple_moods(self, storage_service): |
| """Test appending multiple moods.""" |
| mood1 = MoodData(type="开心", intensity=8) |
| mood2 = MoodData(type="焦虑", intensity=6) |
| |
| storage_service.append_mood(mood1, "record-1", "2024-01-01T12:00:00Z") |
| storage_service.append_mood(mood2, "record-2", "2024-01-01T13:00:00Z") |
| |
| with open(storage_service.moods_file, 'r', encoding='utf-8') as f: |
| moods = json.load(f) |
| |
| assert len(moods) == 2 |
| assert moods[0]["type"] == "开心" |
| assert moods[1]["type"] == "焦虑" |
|
|
|
|
| class TestAppendInspirations: |
| """Tests for append_inspirations method. |
| |
| Requirements: 7.3 |
| """ |
| |
| def test_append_inspirations_creates_file_if_not_exists(self, storage_service): |
| """Test that append_inspirations creates inspirations.json if it doesn't exist.""" |
| assert not storage_service.inspirations_file.exists() |
| |
| inspirations = [InspirationData(core_idea="想法", category="工作")] |
| storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z") |
| |
| assert storage_service.inspirations_file.exists() |
| |
| def test_append_inspirations_empty_list(self, storage_service): |
| """Test that append_inspirations handles empty list gracefully.""" |
| storage_service.append_inspirations([], "record-1", "2024-01-01T12:00:00Z") |
| |
| |
| assert not storage_service.inspirations_file.exists() |
| |
| def test_append_inspirations_adds_metadata(self, storage_service): |
| """Test that append_inspirations adds record_id and timestamp.""" |
| inspirations = [ |
| InspirationData(core_idea="想法1", tags=["标签1"], category="工作") |
| ] |
| storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z") |
| |
| with open(storage_service.inspirations_file, 'r', encoding='utf-8') as f: |
| all_inspirations = json.load(f) |
| |
| assert len(all_inspirations) == 1 |
| assert all_inspirations[0]["record_id"] == "record-1" |
| assert all_inspirations[0]["timestamp"] == "2024-01-01T12:00:00Z" |
| assert all_inspirations[0]["core_idea"] == "想法1" |
| |
| def test_append_inspirations_multiple_items(self, storage_service): |
| """Test appending multiple inspirations at once.""" |
| inspirations = [ |
| InspirationData(core_idea="想法1", category="工作"), |
| InspirationData(core_idea="想法2", category="生活"), |
| InspirationData(core_idea="想法3", category="学习") |
| ] |
| storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z") |
| |
| with open(storage_service.inspirations_file, 'r', encoding='utf-8') as f: |
| all_inspirations = json.load(f) |
| |
| assert len(all_inspirations) == 3 |
| assert all_inspirations[0]["core_idea"] == "想法1" |
| assert all_inspirations[1]["core_idea"] == "想法2" |
| assert all_inspirations[2]["core_idea"] == "想法3" |
|
|
|
|
| class TestAppendTodos: |
| """Tests for append_todos method. |
| |
| Requirements: 7.4 |
| """ |
| |
| def test_append_todos_creates_file_if_not_exists(self, storage_service): |
| """Test that append_todos creates todos.json if it doesn't exist.""" |
| assert not storage_service.todos_file.exists() |
| |
| todos = [TodoData(task="任务1")] |
| storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z") |
| |
| assert storage_service.todos_file.exists() |
| |
| def test_append_todos_empty_list(self, storage_service): |
| """Test that append_todos handles empty list gracefully.""" |
| storage_service.append_todos([], "record-1", "2024-01-01T12:00:00Z") |
| |
| |
| assert not storage_service.todos_file.exists() |
| |
| def test_append_todos_adds_metadata(self, storage_service): |
| """Test that append_todos adds record_id and timestamp.""" |
| todos = [ |
| TodoData(task="任务1", time="明天", location="办公室") |
| ] |
| storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z") |
| |
| with open(storage_service.todos_file, 'r', encoding='utf-8') as f: |
| all_todos = json.load(f) |
| |
| assert len(all_todos) == 1 |
| assert all_todos[0]["record_id"] == "record-1" |
| assert all_todos[0]["timestamp"] == "2024-01-01T12:00:00Z" |
| assert all_todos[0]["task"] == "任务1" |
| assert all_todos[0]["status"] == "pending" |
| |
| def test_append_todos_multiple_items(self, storage_service): |
| """Test appending multiple todos at once.""" |
| todos = [ |
| TodoData(task="任务1", time="今天"), |
| TodoData(task="任务2", location="家里"), |
| TodoData(task="任务3") |
| ] |
| storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z") |
| |
| with open(storage_service.todos_file, 'r', encoding='utf-8') as f: |
| all_todos = json.load(f) |
| |
| assert len(all_todos) == 3 |
| assert all_todos[0]["task"] == "任务1" |
| assert all_todos[1]["task"] == "任务2" |
| assert all_todos[2]["task"] == "任务3" |
|
|
|
|
| class TestErrorHandling: |
| """Tests for error handling. |
| |
| Requirements: 7.6 |
| """ |
| |
| def test_storage_error_on_write_failure(self, storage_service, monkeypatch): |
| """Test that StorageError is raised when file writing fails.""" |
| |
| def mock_open_error(*args, **kwargs): |
| if 'w' in args or kwargs.get('mode') == 'w': |
| raise IOError("Permission denied") |
| return open(*args, **kwargs) |
| |
| monkeypatch.setattr("builtins.open", mock_open_error) |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service._write_json_file(storage_service.records_file, []) |
| |
| assert "Failed to write file" in str(exc_info.value) |
| |
| def test_storage_error_on_read_failure(self, storage_service): |
| """Test that StorageError is raised when file reading fails.""" |
| |
| with open(storage_service.records_file, 'w') as f: |
| f.write("invalid json content") |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service._read_json_file(storage_service.records_file) |
| |
| assert "Failed to read file" in str(exc_info.value) |
| |
| def test_save_record_write_failure(self, storage_service, monkeypatch): |
| """Test that save_record raises StorageError when file writing fails.""" |
| record = RecordData( |
| record_id="test-id", |
| timestamp="2024-01-01T12:00:00Z", |
| input_type="text", |
| original_text="测试文本", |
| parsed_data=ParsedData() |
| ) |
| |
| |
| import json |
| original_dump = json.dump |
| |
| def mock_dump_error(*args, **kwargs): |
| raise IOError("Disk full") |
| |
| monkeypatch.setattr("json.dump", mock_dump_error) |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service.save_record(record) |
| |
| |
| assert "Failed to" in str(exc_info.value) |
| |
| def test_append_mood_write_failure(self, storage_service, monkeypatch): |
| """Test that append_mood raises StorageError when file writing fails.""" |
| mood = MoodData(type="开心", intensity=8, keywords=["愉快"]) |
| |
| |
| import json |
| |
| def mock_dump_error(*args, **kwargs): |
| raise IOError("Disk full") |
| |
| monkeypatch.setattr("json.dump", mock_dump_error) |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service.append_mood(mood, "record-1", "2024-01-01T12:00:00Z") |
| |
| |
| assert "Failed to" in str(exc_info.value) |
| |
| def test_append_inspirations_write_failure(self, storage_service, monkeypatch): |
| """Test that append_inspirations raises StorageError when file writing fails.""" |
| inspirations = [InspirationData(core_idea="想法", category="工作")] |
| |
| |
| import json |
| |
| def mock_dump_error(*args, **kwargs): |
| raise IOError("Disk full") |
| |
| monkeypatch.setattr("json.dump", mock_dump_error) |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z") |
| |
| |
| assert "Failed to" in str(exc_info.value) |
| |
| def test_append_todos_write_failure(self, storage_service, monkeypatch): |
| """Test that append_todos raises StorageError when file writing fails.""" |
| todos = [TodoData(task="任务1")] |
| |
| |
| import json |
| |
| def mock_dump_error(*args, **kwargs): |
| raise IOError("Disk full") |
| |
| monkeypatch.setattr("json.dump", mock_dump_error) |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z") |
| |
| |
| assert "Failed to" in str(exc_info.value) |
| |
| def test_ensure_file_exists_creation_failure(self, storage_service, monkeypatch): |
| """Test that _ensure_file_exists raises StorageError when file creation fails.""" |
| test_file = storage_service.data_dir / "test.json" |
| |
| |
| def mock_open_error(*args, **kwargs): |
| if 'w' in kwargs.get('mode', ''): |
| raise IOError("Permission denied") |
| return open(*args, **kwargs) |
| |
| monkeypatch.setattr("builtins.open", mock_open_error) |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service._ensure_file_exists(test_file) |
| |
| assert "Failed to initialize file" in str(exc_info.value) |
| |
| def test_read_json_file_with_corrupted_data(self, storage_service): |
| """Test that _read_json_file raises StorageError with corrupted JSON.""" |
| |
| with open(storage_service.records_file, 'w') as f: |
| f.write('{"incomplete": "json"') |
| |
| with pytest.raises(StorageError) as exc_info: |
| storage_service._read_json_file(storage_service.records_file) |
| |
| assert "Failed to read file" in str(exc_info.value) |
| |
| def test_read_json_file_with_non_list_data(self, storage_service): |
| """Test that _read_json_file can read non-list JSON (returns as-is).""" |
| |
| with open(storage_service.records_file, 'w') as f: |
| json.dump({"key": "value"}, f) |
| |
| |
| result = storage_service._read_json_file(storage_service.records_file) |
| assert result == {"key": "value"} |
|
|
|
|
|
|
| class TestConcurrentWriteSafety: |
| """Tests for concurrent write safety. |
| |
| These tests document the current behavior of the storage service under |
| concurrent access. The current implementation does NOT provide thread-safe |
| file operations, so these tests verify that race conditions can occur. |
| |
| In a production system, you would need to add file locking or use a |
| proper database to ensure thread safety. |
| |
| Requirements: 7.6 |
| """ |
| |
| def test_concurrent_save_record_race_condition(self, storage_service): |
| """Test that demonstrates race conditions can occur with concurrent save_record calls. |
| |
| This test documents that the current implementation is NOT thread-safe. |
| Multiple threads writing simultaneously can cause data corruption or loss. |
| """ |
| import threading |
| |
| num_threads = 5 |
| records_per_thread = 3 |
| threads = [] |
| errors = [] |
| successful_saves = [] |
| lock = threading.Lock() |
| |
| def save_records(thread_id): |
| try: |
| for i in range(records_per_thread): |
| record = RecordData( |
| record_id="", |
| timestamp=f"2024-01-01T{thread_id:02d}:{i:02d}:00Z", |
| input_type="text", |
| original_text=f"Thread {thread_id} Record {i}", |
| parsed_data=ParsedData() |
| ) |
| record_id = storage_service.save_record(record) |
| with lock: |
| successful_saves.append(record_id) |
| except Exception as e: |
| with lock: |
| errors.append(e) |
| |
| |
| for thread_id in range(num_threads): |
| thread = threading.Thread(target=save_records, args=(thread_id,)) |
| threads.append(thread) |
| thread.start() |
| |
| |
| for thread in threads: |
| thread.join() |
| |
| |
| |
| if errors: |
| |
| assert all(isinstance(e, StorageError) for e in errors), \ |
| "All errors should be StorageError instances" |
| else: |
| |
| try: |
| with open(storage_service.records_file, 'r', encoding='utf-8') as f: |
| records = json.load(f) |
| |
| |
| |
| assert isinstance(records, list), "Records file should contain a list" |
| assert len(records) > 0, "At least some records should be saved" |
| except json.JSONDecodeError: |
| |
| pytest.skip("File corrupted due to concurrent writes (expected behavior)") |
| |
| def test_sequential_writes_are_safe(self, storage_service): |
| """Test that sequential (non-concurrent) writes work correctly. |
| |
| This test verifies that when operations are performed sequentially, |
| all data is saved correctly without corruption. |
| """ |
| num_records = 20 |
| saved_ids = [] |
| |
| |
| for i in range(num_records): |
| record = RecordData( |
| record_id="", |
| timestamp=f"2024-01-01T00:{i:02d}:00Z", |
| input_type="text", |
| original_text=f"Record {i}", |
| parsed_data=ParsedData() |
| ) |
| record_id = storage_service.save_record(record) |
| saved_ids.append(record_id) |
| |
| |
| with open(storage_service.records_file, 'r', encoding='utf-8') as f: |
| records = json.load(f) |
| |
| assert len(records) == num_records, \ |
| f"Expected {num_records} records, found {len(records)}" |
| |
| |
| assert len(set(saved_ids)) == num_records, \ |
| "All record IDs should be unique" |
| |
| |
| file_ids = [r["record_id"] for r in records] |
| for saved_id in saved_ids: |
| assert saved_id in file_ids, \ |
| f"Record {saved_id} should be in the file" |
| |
| def test_concurrent_writes_with_different_files(self, storage_service): |
| """Test that concurrent writes to DIFFERENT files work better. |
| |
| When threads write to different files (records vs moods vs inspirations vs todos), |
| there's less chance of corruption since they don't share the same file. |
| """ |
| import threading |
| |
| errors = [] |
| lock = threading.Lock() |
| |
| def save_record(): |
| try: |
| record = RecordData( |
| record_id="", |
| timestamp="2024-01-01T00:00:00Z", |
| input_type="text", |
| original_text="Test record", |
| parsed_data=ParsedData() |
| ) |
| storage_service.save_record(record) |
| except Exception as e: |
| with lock: |
| errors.append(("record", e)) |
| |
| def save_mood(): |
| try: |
| mood = MoodData(type="开心", intensity=8) |
| storage_service.append_mood(mood, "test-id", "2024-01-01T00:00:00Z") |
| except Exception as e: |
| with lock: |
| errors.append(("mood", e)) |
| |
| def save_inspiration(): |
| try: |
| inspirations = [InspirationData(core_idea="想法", category="工作")] |
| storage_service.append_inspirations(inspirations, "test-id", "2024-01-01T00:00:00Z") |
| except Exception as e: |
| with lock: |
| errors.append(("inspiration", e)) |
| |
| def save_todo(): |
| try: |
| todos = [TodoData(task="任务")] |
| storage_service.append_todos(todos, "test-id", "2024-01-01T00:00:00Z") |
| except Exception as e: |
| with lock: |
| errors.append(("todo", e)) |
| |
| |
| threads = [ |
| threading.Thread(target=save_record), |
| threading.Thread(target=save_mood), |
| threading.Thread(target=save_inspiration), |
| threading.Thread(target=save_todo) |
| ] |
| |
| for thread in threads: |
| thread.start() |
| |
| for thread in threads: |
| thread.join() |
| |
| |
| |
| if errors: |
| |
| error_types = [e[0] for e in errors] |
| pytest.skip(f"Some operations failed due to race conditions: {error_types}") |
| |
| |
| assert storage_service.records_file.exists() |
| assert storage_service.moods_file.exists() |
| assert storage_service.inspirations_file.exists() |
| assert storage_service.todos_file.exists() |
| |
| def test_error_handling_preserves_file_integrity(self, storage_service): |
| """Test that when errors occur, existing file data is not corrupted. |
| |
| This verifies that even if a write operation fails, the existing |
| data in the file remains intact and readable. |
| """ |
| |
| record1 = RecordData( |
| record_id="initial-id", |
| timestamp="2024-01-01T00:00:00Z", |
| input_type="text", |
| original_text="Initial record", |
| parsed_data=ParsedData() |
| ) |
| storage_service.save_record(record1) |
| |
| |
| with open(storage_service.records_file, 'r', encoding='utf-8') as f: |
| initial_records = json.load(f) |
| assert len(initial_records) == 1 |
| |
| |
| record2 = RecordData( |
| record_id="second-id", |
| timestamp="2024-01-01T01:00:00Z", |
| input_type="text", |
| original_text="Second record", |
| parsed_data=ParsedData() |
| ) |
| storage_service.save_record(record2) |
| |
| |
| with open(storage_service.records_file, 'r', encoding='utf-8') as f: |
| final_records = json.load(f) |
| assert len(final_records) == 2 |
| assert final_records[0]["record_id"] == "initial-id" |
| assert final_records[1]["record_id"] == "second-id" |
|
|