Nora / tests /test_storage.py
GitHub Action
Deploy clean version of Nora
59bd45e
"""Unit tests for storage service.
This module tests the StorageService class to ensure proper JSON file
persistence, error handling, and data integrity.
Requirements: 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7
"""
import json
import pytest
import tempfile
import shutil
from pathlib import Path
from datetime import datetime
from app.storage import StorageService, StorageError
from app.models import (
RecordData,
ParsedData,
MoodData,
InspirationData,
TodoData
)
@pytest.fixture
def temp_data_dir():
"""Create a temporary directory for test data."""
temp_dir = tempfile.mkdtemp()
yield temp_dir
shutil.rmtree(temp_dir)
@pytest.fixture
def storage_service(temp_data_dir):
"""Create a StorageService instance with temporary directory."""
return StorageService(temp_data_dir)
class TestStorageServiceInitialization:
"""Tests for StorageService initialization."""
def test_init_creates_data_directory(self, temp_data_dir):
"""Test that initialization creates the data directory if it doesn't exist."""
# Remove the directory
shutil.rmtree(temp_data_dir)
assert not Path(temp_data_dir).exists()
# Initialize service
service = StorageService(temp_data_dir)
# Verify directory was created
assert Path(temp_data_dir).exists()
assert Path(temp_data_dir).is_dir()
def test_init_sets_file_paths(self, storage_service, temp_data_dir):
"""Test that initialization sets correct file paths."""
assert storage_service.records_file == Path(temp_data_dir) / "records.json"
assert storage_service.moods_file == Path(temp_data_dir) / "moods.json"
assert storage_service.inspirations_file == Path(temp_data_dir) / "inspirations.json"
assert storage_service.todos_file == Path(temp_data_dir) / "todos.json"
class TestFileInitialization:
"""Tests for file initialization logic.
Requirements: 7.5
"""
def test_ensure_file_exists_creates_new_file(self, storage_service):
"""Test that _ensure_file_exists creates a new file with empty array."""
test_file = storage_service.data_dir / "test.json"
assert not test_file.exists()
storage_service._ensure_file_exists(test_file)
assert test_file.exists()
with open(test_file, 'r', encoding='utf-8') as f:
data = json.load(f)
assert data == []
def test_ensure_file_exists_preserves_existing_file(self, storage_service):
"""Test that _ensure_file_exists doesn't overwrite existing files."""
test_file = storage_service.data_dir / "test.json"
existing_data = [{"key": "value"}]
with open(test_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f)
storage_service._ensure_file_exists(test_file)
with open(test_file, 'r', encoding='utf-8') as f:
data = json.load(f)
assert data == existing_data
class TestSaveRecord:
"""Tests for save_record method.
Requirements: 7.1, 7.7
"""
def test_save_record_creates_file_if_not_exists(self, storage_service):
"""Test that save_record creates records.json if it doesn't exist."""
assert not storage_service.records_file.exists()
record = RecordData(
record_id="test-id",
timestamp="2024-01-01T12:00:00Z",
input_type="text",
original_text="测试文本",
parsed_data=ParsedData()
)
storage_service.save_record(record)
assert storage_service.records_file.exists()
def test_save_record_generates_uuid_if_not_set(self, storage_service):
"""Test that save_record generates a UUID if record_id is not set."""
record = RecordData(
record_id="",
timestamp="2024-01-01T12:00:00Z",
input_type="text",
original_text="测试文本",
parsed_data=ParsedData()
)
record_id = storage_service.save_record(record)
assert record_id
assert len(record_id) == 36 # UUID format
assert record.record_id == record_id
def test_save_record_preserves_existing_id(self, storage_service):
"""Test that save_record preserves existing record_id."""
existing_id = "my-custom-id"
record = RecordData(
record_id=existing_id,
timestamp="2024-01-01T12:00:00Z",
input_type="text",
original_text="测试文本",
parsed_data=ParsedData()
)
record_id = storage_service.save_record(record)
assert record_id == existing_id
def test_save_record_appends_to_existing_records(self, storage_service):
"""Test that save_record appends to existing records."""
# Save first record
record1 = RecordData(
record_id="id-1",
timestamp="2024-01-01T12:00:00Z",
input_type="text",
original_text="文本1",
parsed_data=ParsedData()
)
storage_service.save_record(record1)
# Save second record
record2 = RecordData(
record_id="id-2",
timestamp="2024-01-01T13:00:00Z",
input_type="text",
original_text="文本2",
parsed_data=ParsedData()
)
storage_service.save_record(record2)
# Verify both records exist
with open(storage_service.records_file, 'r', encoding='utf-8') as f:
records = json.load(f)
assert len(records) == 2
assert records[0]["record_id"] == "id-1"
assert records[1]["record_id"] == "id-2"
def test_save_record_with_complete_data(self, storage_service):
"""Test saving a record with complete parsed data."""
record = RecordData(
record_id="complete-id",
timestamp="2024-01-01T12:00:00Z",
input_type="text",
original_text="今天很开心",
parsed_data=ParsedData(
mood=MoodData(type="开心", intensity=8, keywords=["愉快"]),
inspirations=[InspirationData(core_idea="新想法", category="工作")],
todos=[TodoData(task="完成任务")]
)
)
storage_service.save_record(record)
with open(storage_service.records_file, 'r', encoding='utf-8') as f:
records = json.load(f)
assert len(records) == 1
saved_record = records[0]
assert saved_record["record_id"] == "complete-id"
assert saved_record["parsed_data"]["mood"]["type"] == "开心"
assert len(saved_record["parsed_data"]["inspirations"]) == 1
assert len(saved_record["parsed_data"]["todos"]) == 1
class TestAppendMood:
"""Tests for append_mood method.
Requirements: 7.2
"""
def test_append_mood_creates_file_if_not_exists(self, storage_service):
"""Test that append_mood creates moods.json if it doesn't exist."""
assert not storage_service.moods_file.exists()
mood = MoodData(type="开心", intensity=8, keywords=["愉快"])
storage_service.append_mood(mood, "record-1", "2024-01-01T12:00:00Z")
assert storage_service.moods_file.exists()
def test_append_mood_adds_metadata(self, storage_service):
"""Test that append_mood adds record_id and timestamp."""
mood = MoodData(type="开心", intensity=8, keywords=["愉快"])
storage_service.append_mood(mood, "record-1", "2024-01-01T12:00:00Z")
with open(storage_service.moods_file, 'r', encoding='utf-8') as f:
moods = json.load(f)
assert len(moods) == 1
assert moods[0]["record_id"] == "record-1"
assert moods[0]["timestamp"] == "2024-01-01T12:00:00Z"
assert moods[0]["type"] == "开心"
assert moods[0]["intensity"] == 8
def test_append_mood_multiple_moods(self, storage_service):
"""Test appending multiple moods."""
mood1 = MoodData(type="开心", intensity=8)
mood2 = MoodData(type="焦虑", intensity=6)
storage_service.append_mood(mood1, "record-1", "2024-01-01T12:00:00Z")
storage_service.append_mood(mood2, "record-2", "2024-01-01T13:00:00Z")
with open(storage_service.moods_file, 'r', encoding='utf-8') as f:
moods = json.load(f)
assert len(moods) == 2
assert moods[0]["type"] == "开心"
assert moods[1]["type"] == "焦虑"
class TestAppendInspirations:
"""Tests for append_inspirations method.
Requirements: 7.3
"""
def test_append_inspirations_creates_file_if_not_exists(self, storage_service):
"""Test that append_inspirations creates inspirations.json if it doesn't exist."""
assert not storage_service.inspirations_file.exists()
inspirations = [InspirationData(core_idea="想法", category="工作")]
storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z")
assert storage_service.inspirations_file.exists()
def test_append_inspirations_empty_list(self, storage_service):
"""Test that append_inspirations handles empty list gracefully."""
storage_service.append_inspirations([], "record-1", "2024-01-01T12:00:00Z")
# File should not be created for empty list
assert not storage_service.inspirations_file.exists()
def test_append_inspirations_adds_metadata(self, storage_service):
"""Test that append_inspirations adds record_id and timestamp."""
inspirations = [
InspirationData(core_idea="想法1", tags=["标签1"], category="工作")
]
storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z")
with open(storage_service.inspirations_file, 'r', encoding='utf-8') as f:
all_inspirations = json.load(f)
assert len(all_inspirations) == 1
assert all_inspirations[0]["record_id"] == "record-1"
assert all_inspirations[0]["timestamp"] == "2024-01-01T12:00:00Z"
assert all_inspirations[0]["core_idea"] == "想法1"
def test_append_inspirations_multiple_items(self, storage_service):
"""Test appending multiple inspirations at once."""
inspirations = [
InspirationData(core_idea="想法1", category="工作"),
InspirationData(core_idea="想法2", category="生活"),
InspirationData(core_idea="想法3", category="学习")
]
storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z")
with open(storage_service.inspirations_file, 'r', encoding='utf-8') as f:
all_inspirations = json.load(f)
assert len(all_inspirations) == 3
assert all_inspirations[0]["core_idea"] == "想法1"
assert all_inspirations[1]["core_idea"] == "想法2"
assert all_inspirations[2]["core_idea"] == "想法3"
class TestAppendTodos:
"""Tests for append_todos method.
Requirements: 7.4
"""
def test_append_todos_creates_file_if_not_exists(self, storage_service):
"""Test that append_todos creates todos.json if it doesn't exist."""
assert not storage_service.todos_file.exists()
todos = [TodoData(task="任务1")]
storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z")
assert storage_service.todos_file.exists()
def test_append_todos_empty_list(self, storage_service):
"""Test that append_todos handles empty list gracefully."""
storage_service.append_todos([], "record-1", "2024-01-01T12:00:00Z")
# File should not be created for empty list
assert not storage_service.todos_file.exists()
def test_append_todos_adds_metadata(self, storage_service):
"""Test that append_todos adds record_id and timestamp."""
todos = [
TodoData(task="任务1", time="明天", location="办公室")
]
storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z")
with open(storage_service.todos_file, 'r', encoding='utf-8') as f:
all_todos = json.load(f)
assert len(all_todos) == 1
assert all_todos[0]["record_id"] == "record-1"
assert all_todos[0]["timestamp"] == "2024-01-01T12:00:00Z"
assert all_todos[0]["task"] == "任务1"
assert all_todos[0]["status"] == "pending"
def test_append_todos_multiple_items(self, storage_service):
"""Test appending multiple todos at once."""
todos = [
TodoData(task="任务1", time="今天"),
TodoData(task="任务2", location="家里"),
TodoData(task="任务3")
]
storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z")
with open(storage_service.todos_file, 'r', encoding='utf-8') as f:
all_todos = json.load(f)
assert len(all_todos) == 3
assert all_todos[0]["task"] == "任务1"
assert all_todos[1]["task"] == "任务2"
assert all_todos[2]["task"] == "任务3"
class TestErrorHandling:
"""Tests for error handling.
Requirements: 7.6
"""
def test_storage_error_on_write_failure(self, storage_service, monkeypatch):
"""Test that StorageError is raised when file writing fails."""
# Mock the open function to raise an exception
def mock_open_error(*args, **kwargs):
if 'w' in args or kwargs.get('mode') == 'w':
raise IOError("Permission denied")
return open(*args, **kwargs)
monkeypatch.setattr("builtins.open", mock_open_error)
with pytest.raises(StorageError) as exc_info:
storage_service._write_json_file(storage_service.records_file, [])
assert "Failed to write file" in str(exc_info.value)
def test_storage_error_on_read_failure(self, storage_service):
"""Test that StorageError is raised when file reading fails."""
# Create an invalid JSON file
with open(storage_service.records_file, 'w') as f:
f.write("invalid json content")
with pytest.raises(StorageError) as exc_info:
storage_service._read_json_file(storage_service.records_file)
assert "Failed to read file" in str(exc_info.value)
def test_save_record_write_failure(self, storage_service, monkeypatch):
"""Test that save_record raises StorageError when file writing fails."""
record = RecordData(
record_id="test-id",
timestamp="2024-01-01T12:00:00Z",
input_type="text",
original_text="测试文本",
parsed_data=ParsedData()
)
# Mock json.dump to raise an exception
import json
original_dump = json.dump
def mock_dump_error(*args, **kwargs):
raise IOError("Disk full")
monkeypatch.setattr("json.dump", mock_dump_error)
with pytest.raises(StorageError) as exc_info:
storage_service.save_record(record)
# Error can occur during initialization or write
assert "Failed to" in str(exc_info.value)
def test_append_mood_write_failure(self, storage_service, monkeypatch):
"""Test that append_mood raises StorageError when file writing fails."""
mood = MoodData(type="开心", intensity=8, keywords=["愉快"])
# Mock json.dump to raise an exception
import json
def mock_dump_error(*args, **kwargs):
raise IOError("Disk full")
monkeypatch.setattr("json.dump", mock_dump_error)
with pytest.raises(StorageError) as exc_info:
storage_service.append_mood(mood, "record-1", "2024-01-01T12:00:00Z")
# Error can occur during initialization or write
assert "Failed to" in str(exc_info.value)
def test_append_inspirations_write_failure(self, storage_service, monkeypatch):
"""Test that append_inspirations raises StorageError when file writing fails."""
inspirations = [InspirationData(core_idea="想法", category="工作")]
# Mock json.dump to raise an exception
import json
def mock_dump_error(*args, **kwargs):
raise IOError("Disk full")
monkeypatch.setattr("json.dump", mock_dump_error)
with pytest.raises(StorageError) as exc_info:
storage_service.append_inspirations(inspirations, "record-1", "2024-01-01T12:00:00Z")
# Error can occur during initialization or write
assert "Failed to" in str(exc_info.value)
def test_append_todos_write_failure(self, storage_service, monkeypatch):
"""Test that append_todos raises StorageError when file writing fails."""
todos = [TodoData(task="任务1")]
# Mock json.dump to raise an exception
import json
def mock_dump_error(*args, **kwargs):
raise IOError("Disk full")
monkeypatch.setattr("json.dump", mock_dump_error)
with pytest.raises(StorageError) as exc_info:
storage_service.append_todos(todos, "record-1", "2024-01-01T12:00:00Z")
# Error can occur during initialization or write
assert "Failed to" in str(exc_info.value)
def test_ensure_file_exists_creation_failure(self, storage_service, monkeypatch):
"""Test that _ensure_file_exists raises StorageError when file creation fails."""
test_file = storage_service.data_dir / "test.json"
# Mock open to raise an exception
def mock_open_error(*args, **kwargs):
if 'w' in kwargs.get('mode', ''):
raise IOError("Permission denied")
return open(*args, **kwargs)
monkeypatch.setattr("builtins.open", mock_open_error)
with pytest.raises(StorageError) as exc_info:
storage_service._ensure_file_exists(test_file)
assert "Failed to initialize file" in str(exc_info.value)
def test_read_json_file_with_corrupted_data(self, storage_service):
"""Test that _read_json_file raises StorageError with corrupted JSON."""
# Create a file with corrupted JSON
with open(storage_service.records_file, 'w') as f:
f.write('{"incomplete": "json"')
with pytest.raises(StorageError) as exc_info:
storage_service._read_json_file(storage_service.records_file)
assert "Failed to read file" in str(exc_info.value)
def test_read_json_file_with_non_list_data(self, storage_service):
"""Test that _read_json_file can read non-list JSON (returns as-is)."""
# Create a file with valid JSON but not a list
with open(storage_service.records_file, 'w') as f:
json.dump({"key": "value"}, f)
# This should not raise an error - it returns the data as-is
result = storage_service._read_json_file(storage_service.records_file)
assert result == {"key": "value"}
class TestConcurrentWriteSafety:
"""Tests for concurrent write safety.
These tests document the current behavior of the storage service under
concurrent access. The current implementation does NOT provide thread-safe
file operations, so these tests verify that race conditions can occur.
In a production system, you would need to add file locking or use a
proper database to ensure thread safety.
Requirements: 7.6
"""
def test_concurrent_save_record_race_condition(self, storage_service):
"""Test that demonstrates race conditions can occur with concurrent save_record calls.
This test documents that the current implementation is NOT thread-safe.
Multiple threads writing simultaneously can cause data corruption or loss.
"""
import threading
num_threads = 5
records_per_thread = 3
threads = []
errors = []
successful_saves = []
lock = threading.Lock()
def save_records(thread_id):
try:
for i in range(records_per_thread):
record = RecordData(
record_id="", # Force UUID generation
timestamp=f"2024-01-01T{thread_id:02d}:{i:02d}:00Z",
input_type="text",
original_text=f"Thread {thread_id} Record {i}",
parsed_data=ParsedData()
)
record_id = storage_service.save_record(record)
with lock:
successful_saves.append(record_id)
except Exception as e:
with lock:
errors.append(e)
# Start all threads
for thread_id in range(num_threads):
thread = threading.Thread(target=save_records, args=(thread_id,))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Document the behavior: either errors occur or some data may be lost
# This is expected with the current non-thread-safe implementation
if errors:
# Race conditions caused errors - this is expected
assert all(isinstance(e, StorageError) for e in errors), \
"All errors should be StorageError instances"
else:
# No errors, but verify data integrity
try:
with open(storage_service.records_file, 'r', encoding='utf-8') as f:
records = json.load(f)
# Due to race conditions, we may have lost some records
# Just verify the file is still valid JSON and contains some records
assert isinstance(records, list), "Records file should contain a list"
assert len(records) > 0, "At least some records should be saved"
except json.JSONDecodeError:
# File may be corrupted due to concurrent writes
pytest.skip("File corrupted due to concurrent writes (expected behavior)")
def test_sequential_writes_are_safe(self, storage_service):
"""Test that sequential (non-concurrent) writes work correctly.
This test verifies that when operations are performed sequentially,
all data is saved correctly without corruption.
"""
num_records = 20
saved_ids = []
# Save records sequentially
for i in range(num_records):
record = RecordData(
record_id="",
timestamp=f"2024-01-01T00:{i:02d}:00Z",
input_type="text",
original_text=f"Record {i}",
parsed_data=ParsedData()
)
record_id = storage_service.save_record(record)
saved_ids.append(record_id)
# Verify all records were saved
with open(storage_service.records_file, 'r', encoding='utf-8') as f:
records = json.load(f)
assert len(records) == num_records, \
f"Expected {num_records} records, found {len(records)}"
# Verify all IDs are unique
assert len(set(saved_ids)) == num_records, \
"All record IDs should be unique"
# Verify all saved IDs are in the file
file_ids = [r["record_id"] for r in records]
for saved_id in saved_ids:
assert saved_id in file_ids, \
f"Record {saved_id} should be in the file"
def test_concurrent_writes_with_different_files(self, storage_service):
"""Test that concurrent writes to DIFFERENT files work better.
When threads write to different files (records vs moods vs inspirations vs todos),
there's less chance of corruption since they don't share the same file.
"""
import threading
errors = []
lock = threading.Lock()
def save_record():
try:
record = RecordData(
record_id="",
timestamp="2024-01-01T00:00:00Z",
input_type="text",
original_text="Test record",
parsed_data=ParsedData()
)
storage_service.save_record(record)
except Exception as e:
with lock:
errors.append(("record", e))
def save_mood():
try:
mood = MoodData(type="开心", intensity=8)
storage_service.append_mood(mood, "test-id", "2024-01-01T00:00:00Z")
except Exception as e:
with lock:
errors.append(("mood", e))
def save_inspiration():
try:
inspirations = [InspirationData(core_idea="想法", category="工作")]
storage_service.append_inspirations(inspirations, "test-id", "2024-01-01T00:00:00Z")
except Exception as e:
with lock:
errors.append(("inspiration", e))
def save_todo():
try:
todos = [TodoData(task="任务")]
storage_service.append_todos(todos, "test-id", "2024-01-01T00:00:00Z")
except Exception as e:
with lock:
errors.append(("todo", e))
# Start threads writing to different files
threads = [
threading.Thread(target=save_record),
threading.Thread(target=save_mood),
threading.Thread(target=save_inspiration),
threading.Thread(target=save_todo)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# When writing to different files, operations should succeed
# (though there's still a small chance of issues during file initialization)
if errors:
# Document which operations failed
error_types = [e[0] for e in errors]
pytest.skip(f"Some operations failed due to race conditions: {error_types}")
# Verify all files were created
assert storage_service.records_file.exists()
assert storage_service.moods_file.exists()
assert storage_service.inspirations_file.exists()
assert storage_service.todos_file.exists()
def test_error_handling_preserves_file_integrity(self, storage_service):
"""Test that when errors occur, existing file data is not corrupted.
This verifies that even if a write operation fails, the existing
data in the file remains intact and readable.
"""
# Save some initial data
record1 = RecordData(
record_id="initial-id",
timestamp="2024-01-01T00:00:00Z",
input_type="text",
original_text="Initial record",
parsed_data=ParsedData()
)
storage_service.save_record(record1)
# Verify initial data is saved
with open(storage_service.records_file, 'r', encoding='utf-8') as f:
initial_records = json.load(f)
assert len(initial_records) == 1
# Now try to save another record (this should succeed)
record2 = RecordData(
record_id="second-id",
timestamp="2024-01-01T01:00:00Z",
input_type="text",
original_text="Second record",
parsed_data=ParsedData()
)
storage_service.save_record(record2)
# Verify both records are saved
with open(storage_service.records_file, 'r', encoding='utf-8') as f:
final_records = json.load(f)
assert len(final_records) == 2
assert final_records[0]["record_id"] == "initial-id"
assert final_records[1]["record_id"] == "second-id"