Spiritual_Health_Project / tests /verification_mode /test_integration_workflows.py
DocUA's picture
✅ Enhanced Verification Modes - Production Ready
7bbd836
# test_integration_workflows.py
"""
Integration tests for complete verification workflows.
Tests end-to-end workflows including:
- Full verification workflow: select dataset → review message → provide feedback → view results → export CSV
- Session resumption workflow
- Error recovery workflows
"""
import pytest
from datetime import datetime
from src.core.verification_models import (
VerificationSession,
TestMessage,
)
from src.core.verification_store import JSONVerificationStore
from src.core.message_queue_manager import MessageQueueManager
from src.core.verification_feedback_handler import VerificationFeedbackHandler
from src.core.verification_metrics import VerificationMetricsCalculator
from src.core.verification_csv_exporter import VerificationCSVExporter
from src.core.test_datasets import TestDatasetManager
class TestCompleteVerificationWorkflow:
    """Tests for complete verification workflow.

    Each test wires a ``JSONVerificationStore``, a ``MessageQueueManager`` and
    a ``VerificationFeedbackHandler`` around one of the built-in datasets,
    submits feedback, and checks the resulting statistics and/or CSV export.
    """

    @staticmethod
    def _start_session(store, test_data_generator, dataset, session_id):
        """Create, persist and wire up a verification session for ``dataset``.

        Args:
            store: Store the new session is saved to.
            test_data_generator: Fixture exposing ``create_verification_session``.
            dataset: Dataset whose messages are loaded into the queue.
            session_id: Identifier assigned to the new session.

        Returns:
            A ``(session, queue_manager, handler)`` tuple.
        """
        session = test_data_generator.create_verification_session(
            session_id=session_id,
            dataset_id=dataset.dataset_id,
            dataset_name=dataset.name,
            total_messages=len(dataset.messages),
        )
        store.save_session(session)
        queue_manager = MessageQueueManager(session)
        queue_manager.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(session, store, queue_manager)
        return session, queue_manager, handler

    def test_full_workflow_select_dataset_to_export_csv(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """
        Test full workflow: select dataset → review message → provide feedback → view results → export CSV

        This test verifies the complete end-to-end workflow of the verification mode.
        """
        # Step 1: Initialize storage
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Step 2: Select a dataset (using mixed scenarios for variety)
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET
        assert dataset is not None
        # The count assertions below require three processed messages; fail
        # here with a clear message instead of a confusing count mismatch.
        assert len(dataset.messages) >= 3, "dataset must contain at least 3 messages"

        # Steps 3-5: Create the session, initialize the queue, build the handler
        session, queue_manager, handler = self._start_session(
            store, test_data_generator, dataset, "workflow_test_001"
        )

        # Step 6: Process first 3 messages
        for i, message in enumerate(dataset.messages[:3]):
            # The queue is expected to be positioned on the message under review.
            current_msg_id = queue_manager.get_current_message_id()
            assert current_msg_id == message.message_id

            # Provide feedback (alternate between correct and incorrect)
            if i % 2 == 0:
                handler.handle_correct_feedback(
                    message=message,
                    classifier_decision=message.pre_classified_label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test_indicator"],
                )
            else:
                # The correction must differ from what the classifier said.
                correction = "red" if message.pre_classified_label != "red" else "green"
                handler.handle_incorrect_feedback(
                    message=message,
                    classifier_decision=message.pre_classified_label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test_indicator"],
                    ground_truth_label=correction,
                    verifier_notes="Test correction",
                )

        # Step 7: Verify session statistics
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 3
        assert stats["correct_count"] == 2  # First and third are correct
        assert stats["incorrect_count"] == 1  # Second is incorrect

        # Step 8: Export to CSV
        csv_content = store.export_to_csv(session.session_id)

        # Step 9: Verify CSV content
        assertion_helpers.assert_csv_has_summary_section(csv_content)
        assertion_helpers.assert_csv_contains_columns(
            csv_content,
            ["Patient Message", "Classifier Said", "You Said", "Notes", "Date"],
        )

        # Summary + header + at least 3 data rows
        lines = csv_content.split("\n")
        assert len(lines) > 5

        # Verify accuracy in CSV: 2/3 ≈ 66.67%
        assert "Accuracy %" in csv_content
        # NOTE(review): plain substring check; any "66"/"67" anywhere in the
        # export satisfies it. Tighten once the summary row format is pinned.
        assert "66" in csv_content or "67" in csv_content

    def test_workflow_with_all_correct_feedback(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow where all feedback is marked as correct."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        assert len(dataset.messages) >= 5, "dataset must contain at least 5 messages"

        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "all_correct_001"
        )

        # Mark the first five messages as correct
        for message in dataset.messages[:5]:
            handler.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.90,
                classifier_indicators=["positive"],
            )

        # Verify all are correct
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 5
        assert stats["correct_count"] == 5
        assert stats["incorrect_count"] == 0
        assert stats["accuracy"] == 100.0

        # Export and verify the 100% accuracy shows up
        csv_content = store.export_to_csv(session.session_id)
        assert "100.0" in csv_content

    def test_workflow_with_all_incorrect_feedback(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow where all feedback is marked as incorrect."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.SUICIDAL_IDEATION_DATASET
        assert len(dataset.messages) >= 5, "dataset must contain at least 5 messages"

        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "all_incorrect_001"
        )

        # Mark the first five messages as incorrect
        for message in dataset.messages[:5]:
            # Prefer "yellow", but never echo the classifier's own label back
            # as the "correction" (same rule the other tests in this class use).
            correction = "yellow" if message.pre_classified_label != "yellow" else "red"
            handler.handle_incorrect_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.90,
                classifier_indicators=["severe"],
                ground_truth_label=correction,
                verifier_notes="Classifier was wrong",
            )

        # Verify all are incorrect
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 5
        assert stats["correct_count"] == 0
        assert stats["incorrect_count"] == 5
        assert stats["accuracy"] == 0.0

        # Export and verify the 0% accuracy shows up
        csv_content = store.export_to_csv(session.session_id)
        # NOTE(review): "0.0" is also a substring of e.g. "100.0", so this
        # check is weak. Tighten once the summary row format is pinned.
        assert "0.0" in csv_content

    def test_workflow_with_mixed_classifications(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow with mixed classification types."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET

        _, _, handler = self._start_session(
            store, test_data_generator, dataset, "mixed_class_001"
        )

        # Process up to six messages, all confirmed as correct
        for message in dataset.messages[:6]:
            handler.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
            )

        stats = handler.get_session_statistics()

        # Verify accuracy by type is calculated for every label
        assert "accuracy_by_type" in stats
        assert "green" in stats["accuracy_by_type"]
        assert "yellow" in stats["accuracy_by_type"]
        assert "red" in stats["accuracy_by_type"]
class TestSessionResumptionWorkflow:
    """Tests for session resumption workflow.

    These tests persist a session through ``JSONVerificationStore``, load it
    back, and check that the loaded copy carries the recorded verifications.
    """

    @staticmethod
    def _start_session(store, test_data_generator, dataset, session_id):
        """Create, persist and wire up a verification session for ``dataset``.

        Args:
            store: Store the new session is saved to.
            test_data_generator: Fixture exposing ``create_verification_session``.
            dataset: Dataset whose messages are loaded into the queue.
            session_id: Identifier assigned to the new session.

        Returns:
            A ``(session, queue_manager, handler)`` tuple.
        """
        session = test_data_generator.create_verification_session(
            session_id=session_id,
            dataset_id=dataset.dataset_id,
            dataset_name=dataset.name,
            total_messages=len(dataset.messages),
        )
        store.save_session(session)
        queue_manager = MessageQueueManager(session)
        queue_manager.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(session, store, queue_manager)
        return session, queue_manager, handler

    def test_resume_session_after_partial_verification(
        self, temp_storage_dir, test_data_generator
    ):
        """Test resuming a session after partial verification."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.ANXIETY_WORRY_DATASET
        # Three messages before the reload plus two after it.
        assert len(dataset.messages) >= 5, "dataset must contain at least 5 messages"

        # Create and partially complete a session
        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "resume_test_001"
        )

        # Process first 3 messages
        for message in dataset.messages[:3]:
            handler.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["anxiety"],
            )

        # Get stats before closing
        stats_before = handler.get_session_statistics()
        assert stats_before["verified_count"] == 3

        # Simulate closing and reopening the session
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 3

        # Resume with a fresh queue manager and handler on the loaded session
        queue_manager_resumed = MessageQueueManager(loaded_session)
        queue_manager_resumed.initialize_queue(dataset.messages)
        handler_resumed = VerificationFeedbackHandler(
            loaded_session, store, queue_manager_resumed
        )

        # Verify we can continue from where we left off
        stats_after = handler_resumed.get_session_statistics()
        assert stats_after["verified_count"] == 3
        assert stats_after["correct_count"] == 3

        # Process two more messages
        for message in dataset.messages[3:5]:
            handler_resumed.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["anxiety"],
            )

        # Verify total count increased
        stats_final = handler_resumed.get_session_statistics()
        assert stats_final["verified_count"] == 5

    def test_resume_session_preserves_all_data(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test that resuming a session preserves all verification data."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET
        assert len(dataset.messages) >= 3, "dataset must contain at least 3 messages"

        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "preserve_data_001"
        )

        # One note per processed message. Only indices 1 and 2 are submitted,
        # because the first message goes through the "correct" path, which
        # takes no notes.
        test_notes = [
            "First message note",
            "Second message note",
            "Third message note",
        ]

        for i, message in enumerate(dataset.messages[:3]):
            if i == 0:
                handler.handle_correct_feedback(
                    message=message,
                    classifier_decision=message.pre_classified_label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test"],
                )
            else:
                handler.handle_incorrect_feedback(
                    message=message,
                    classifier_decision=message.pre_classified_label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test"],
                    # The correction must differ from the classifier's label.
                    ground_truth_label="green" if message.pre_classified_label != "green" else "red",
                    verifier_notes=test_notes[i],
                )

        # Load session and verify data is preserved
        loaded_session = store.load_session(session.session_id)
        # Fail clearly if nothing was persisted rather than with an
        # AttributeError on ``None`` below.
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 3
        assert loaded_session.verifications[0].is_correct is True
        assert loaded_session.verifications[1].verifier_notes == test_notes[1]
        assert loaded_session.verifications[2].verifier_notes == test_notes[2]

    def test_get_last_session_returns_most_recent(
        self, temp_storage_dir, test_data_generator
    ):
        """Test that get_last_session returns the most recently created session."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Create and save three sessions in order
        session1 = test_data_generator.create_verification_session(
            session_id="session_1",
            verifier_name="Verifier 1",
        )
        store.save_session(session1)

        session2 = test_data_generator.create_verification_session(
            session_id="session_2",
            verifier_name="Verifier 2",
        )
        store.save_session(session2)

        session3 = test_data_generator.create_verification_session(
            session_id="session_3",
            verifier_name="Verifier 3",
        )
        store.save_session(session3)

        # Get last session
        last_session = store.get_last_session()

        # Should be session 3 (most recent)
        assert last_session is not None
        assert last_session.session_id == "session_3"
class TestErrorRecoveryWorkflows:
    """Tests for error recovery workflows.

    Each test triggers a failure (rejected feedback, failed export, missing
    session, completed session) and then checks that the stored session is
    unchanged and that a valid follow-up operation works.
    """

    @staticmethod
    def _start_session(store, test_data_generator, dataset, session_id):
        """Create, persist and wire up a verification session for ``dataset``.

        Args:
            store: Store the new session is saved to.
            test_data_generator: Fixture exposing ``create_verification_session``.
            dataset: Dataset whose messages are loaded into the queue.
            session_id: Identifier assigned to the new session.

        Returns:
            A ``(session, queue_manager, handler)`` tuple.
        """
        session = test_data_generator.create_verification_session(
            session_id=session_id,
            dataset_id=dataset.dataset_id,
            dataset_name=dataset.name,
            total_messages=len(dataset.messages),
        )
        store.save_session(session)
        queue_manager = MessageQueueManager(session)
        queue_manager.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(session, store, queue_manager)
        return session, queue_manager, handler

    def test_recovery_from_failed_feedback_submission(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when feedback submission fails."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "error_recovery_001"
        )
        first = dataset.messages[0]

        # Try to handle feedback with missing correction (should fail).
        # NOTE(review): ``Exception`` is very broad and would also accept an
        # unrelated error such as a TypeError. Narrow it once the concrete
        # exception type raised for a missing correction is confirmed.
        with pytest.raises(Exception):
            handler.handle_incorrect_feedback(
                message=first,
                classifier_decision=first.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
                ground_truth_label="",  # Missing correction
                verifier_notes="",
            )

        # Verify session is still in valid state: nothing was recorded
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 0

        # Should be able to retry with valid data
        result = handler.handle_correct_feedback(
            message=first,
            classifier_decision=first.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
        )
        assert result is True

        # Verify record was saved on retry
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 1

    def test_recovery_from_csv_export_failure(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when CSV export fails."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        session = test_data_generator.create_verification_session(
            session_id="csv_error_001",
            total_messages=0,
        )
        store.save_session(session)

        # Try to export with no verified messages (should fail)
        # The error message is formatted by the error handler
        with pytest.raises((ValueError, RuntimeError)):
            store.export_to_csv(session.session_id)

        # Add one verified message and retry
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        queue_manager = MessageQueueManager(session)
        queue_manager.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(session, store, queue_manager)
        first = dataset.messages[0]
        handler.handle_correct_feedback(
            message=first,
            classifier_decision=first.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
        )

        # Now export should succeed
        csv_content = store.export_to_csv(session.session_id)
        assert csv_content is not None
        assert len(csv_content) > 0

    def test_recovery_from_session_load_failure(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when session load fails."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Try to load non-existent session
        loaded_session = store.load_session("non_existent_session")
        assert loaded_session is None

        # Should be able to create new session
        session = test_data_generator.create_verification_session(
            session_id="recovery_new_session",
        )
        store.save_session(session)

        # Now load should succeed
        loaded_session = store.load_session("recovery_new_session")
        assert loaded_session is not None
        assert loaded_session.session_id == "recovery_new_session"

    def test_recovery_from_invalid_correction_selection(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when invalid correction is selected."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.ANXIETY_WORRY_DATASET
        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "invalid_correction_001"
        )
        first = dataset.messages[0]

        # Try with invalid correction.
        # NOTE(review): ``Exception`` is very broad. Narrow it once the
        # concrete exception type for an invalid label is confirmed.
        with pytest.raises(Exception):
            handler.handle_incorrect_feedback(
                message=first,
                classifier_decision=first.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
                ground_truth_label="invalid_option",
                verifier_notes="",
            )

        # Verify session is still valid: nothing was recorded
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 0

        # Retry with a valid correction. Prefer "red", but never echo the
        # classifier's own label back as the "correction".
        valid_correction = "red" if first.pre_classified_label != "red" else "green"
        result = handler.handle_incorrect_feedback(
            message=first,
            classifier_decision=first.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
            ground_truth_label=valid_correction,
            verifier_notes="",
        )
        assert result is True

    def test_recovery_from_completed_session_modification_attempt(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when attempting to modify a completed session."""
        from src.core.verification_feedback_handler import FeedbackValidationError

        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        session, _, handler = self._start_session(
            store, test_data_generator, dataset, "completed_session_001"
        )

        # Add some feedback
        handler.handle_correct_feedback(
            message=dataset.messages[0],
            classifier_decision=dataset.messages[0].pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
        )

        # Mark session as complete
        store.mark_session_complete(session.session_id)

        # Try to add more feedback (should fail with FeedbackValidationError)
        with pytest.raises(FeedbackValidationError, match="Cannot modify completed session"):
            handler.handle_correct_feedback(
                message=dataset.messages[1],
                classifier_decision=dataset.messages[1].pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
            )

        # Verify original feedback is still there
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 1
        assert loaded_session.is_complete is True