Spaces:
Sleeping
Sleeping
| # test_integration_workflows.py | |
| """ | |
| Integration tests for complete verification workflows. | |
| Tests end-to-end workflows including: | |
| - Full verification workflow: select dataset → review message → provide feedback → view results → export CSV | |
| - Session resumption workflow | |
| - Error recovery workflows | |
| """ | |
| import pytest | |
| from datetime import datetime | |
| from src.core.verification_models import ( | |
| VerificationSession, | |
| TestMessage, | |
| ) | |
| from src.core.verification_store import JSONVerificationStore | |
| from src.core.message_queue_manager import MessageQueueManager | |
| from src.core.verification_feedback_handler import VerificationFeedbackHandler | |
| from src.core.verification_metrics import VerificationMetricsCalculator | |
| from src.core.verification_csv_exporter import VerificationCSVExporter | |
| from src.core.test_datasets import TestDatasetManager | |
class TestCompleteVerificationWorkflow:
    """End-to-end tests for the complete verification workflow.

    Each test selects a dataset, saves a new session, builds the message
    queue and feedback handler, submits feedback, and then checks the
    session statistics and/or the exported CSV.
    """

    @staticmethod
    def _start_workflow(storage_dir, test_data_generator, dataset, session_id):
        """Save a new session for *dataset* and wire up its collaborators.

        Args:
            storage_dir: Directory handed to ``JSONVerificationStore``.
            test_data_generator: Fixture providing
                ``create_verification_session``.
            dataset: Dataset whose messages are loaded into the queue.
            session_id: Identifier given to the new session.

        Returns:
            A ``(store, session, queue_manager, handler)`` tuple.
        """
        store = JSONVerificationStore(storage_dir=storage_dir)
        session = test_data_generator.create_verification_session(
            session_id=session_id,
            dataset_id=dataset.dataset_id,
            dataset_name=dataset.name,
            total_messages=len(dataset.messages),
        )
        store.save_session(session)
        queue_manager = MessageQueueManager(session)
        queue_manager.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(session, store, queue_manager)
        return store, session, queue_manager, handler

    def test_full_workflow_select_dataset_to_export_csv(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Run the whole workflow once, from dataset selection to CSV export.

        Steps: select dataset → review message → provide feedback →
        view results → export CSV.
        """
        # Step 1: select a dataset (mixed scenarios for variety).
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET
        assert dataset is not None
        assert len(dataset.messages) > 0

        # Steps 2-4: create the store and session, initialize the message
        # queue and build the feedback handler.
        store, session, queue_manager, handler = self._start_workflow(
            temp_storage_dir, test_data_generator, dataset, "workflow_test_001"
        )

        # Step 5: review the first three messages, alternating between
        # "correct" (even index) and "incorrect" (odd index) feedback.
        for index, message in enumerate(dataset.messages[:3]):
            # The queue's current message must be the one being reviewed.
            assert queue_manager.get_current_message_id() == message.message_id

            classifier_label = message.pre_classified_label
            if index % 2 == 0:
                handler.handle_correct_feedback(
                    message=message,
                    classifier_decision=classifier_label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test_indicator"],
                )
            else:
                # Pick a correction that differs from the classifier's label.
                correction = "red" if classifier_label != "red" else "green"
                handler.handle_incorrect_feedback(
                    message=message,
                    classifier_decision=classifier_label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test_indicator"],
                    ground_truth_label=correction,
                    verifier_notes="Test correction",
                )

        # Step 6: verify session statistics.
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 3
        assert stats["correct_count"] == 2  # First and third are correct
        assert stats["incorrect_count"] == 1  # Second is incorrect

        # Step 7: export to CSV.
        csv_content = store.export_to_csv(session.session_id)

        # Step 8: verify the CSV content.
        assertion_helpers.assert_csv_has_summary_section(csv_content)
        assertion_helpers.assert_csv_contains_columns(
            csv_content,
            ["Patient Message", "Classifier Said", "You Said", "Notes", "Date"],
        )

        # Summary + header + at least 3 data rows.
        lines = csv_content.split("\n")
        assert len(lines) > 5

        # Accuracy must be reported: 2/3 ≈ 66.67%.
        assert "Accuracy %" in csv_content
        assert "66" in csv_content or "67" in csv_content

    def test_workflow_with_all_correct_feedback(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow where all feedback is marked as correct."""
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        store, session, _, handler = self._start_workflow(
            temp_storage_dir, test_data_generator, dataset, "all_correct_001"
        )

        # Mark the first five messages as correct.
        for message in dataset.messages[:5]:
            handler.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.90,
                classifier_indicators=["positive"],
            )

        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 5
        assert stats["correct_count"] == 5
        assert stats["incorrect_count"] == 0
        assert stats["accuracy"] == 100.0

        # The export must report 100% accuracy.
        csv_content = store.export_to_csv(session.session_id)
        assert "100.0" in csv_content

    def test_workflow_with_all_incorrect_feedback(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow where all feedback is marked as incorrect."""
        import re

        dataset = TestDatasetManager.SUICIDAL_IDEATION_DATASET
        store, session, _, handler = self._start_workflow(
            temp_storage_dir, test_data_generator, dataset, "all_incorrect_001"
        )

        # Mark the first five messages as incorrect.
        for message in dataset.messages[:5]:
            classifier_label = message.pre_classified_label
            # Correct to "yellow" unless the classifier already said
            # "yellow", so the supplied ground truth always disagrees with
            # the classifier decision.
            correction = "yellow" if classifier_label != "yellow" else "green"
            handler.handle_incorrect_feedback(
                message=message,
                classifier_decision=classifier_label,
                classifier_confidence=0.90,
                classifier_indicators=["severe"],
                ground_truth_label=correction,
                verifier_notes="Classifier was wrong",
            )

        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 5
        assert stats["correct_count"] == 0
        assert stats["incorrect_count"] == 5
        assert stats["accuracy"] == 0.0

        # The export must report 0% accuracy. A plain substring test for
        # "0.0" also succeeds on "100.0" or "10.0", so require that the
        # match does not continue a longer number.
        csv_content = store.export_to_csv(session.session_id)
        assert re.search(r"(?<![\d.])0\.0", csv_content)

    def test_workflow_with_mixed_classifications(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow with mixed classification types."""
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET
        _, _, _, handler = self._start_workflow(
            temp_storage_dir, test_data_generator, dataset, "mixed_class_001"
        )

        # Confirm the first six messages so the statistics have data to
        # break down by classification type.
        for message in dataset.messages[:6]:
            handler.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
            )

        stats = handler.get_session_statistics()

        # Accuracy must be broken down for every classification type.
        assert "accuracy_by_type" in stats
        assert "green" in stats["accuracy_by_type"]
        assert "yellow" in stats["accuracy_by_type"]
        assert "red" in stats["accuracy_by_type"]
class TestSessionResumptionWorkflow:
    """Tests covering saving a session, reloading it and continuing work."""

    @staticmethod
    def _begin_session(storage_dir, test_data_generator, dataset, session_id):
        """Create and save a session for *dataset*, then build its handler.

        Returns:
            A ``(store, session, handler)`` tuple.
        """
        store = JSONVerificationStore(storage_dir=storage_dir)
        session = test_data_generator.create_verification_session(
            session_id=session_id,
            dataset_id=dataset.dataset_id,
            dataset_name=dataset.name,
            total_messages=len(dataset.messages),
        )
        store.save_session(session)
        queue = MessageQueueManager(session)
        queue.initialize_queue(dataset.messages)
        return store, session, VerificationFeedbackHandler(session, store, queue)

    @staticmethod
    def _confirm_as_correct(handler, messages):
        """Submit 'correct' feedback with the anxiety indicator for *messages*."""
        for message in messages:
            handler.handle_correct_feedback(
                message=message,
                classifier_decision=message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["anxiety"],
            )

    def test_resume_session_after_partial_verification(
        self, temp_storage_dir, test_data_generator
    ):
        """A partially verified session can be reloaded and continued."""
        dataset = TestDatasetManager.ANXIETY_WORRY_DATASET
        store, session, handler = self._begin_session(
            temp_storage_dir, test_data_generator, dataset, "resume_test_001"
        )

        # Verify the first three messages before "closing" the session.
        self._confirm_as_correct(handler, dataset.messages[:3])
        stats_before = handler.get_session_statistics()
        assert stats_before["verified_count"] == 3

        # Reopen: load the session back from the store.
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 3

        # Build a fresh queue manager and handler around the loaded session.
        resumed_queue = MessageQueueManager(loaded_session)
        resumed_queue.initialize_queue(dataset.messages)
        resumed_handler = VerificationFeedbackHandler(
            loaded_session, store, resumed_queue
        )

        # Counts seen by the resumed handler match what was recorded.
        stats_after = resumed_handler.get_session_statistics()
        assert stats_after["verified_count"] == 3
        assert stats_after["correct_count"] == 3

        # Two more messages bring the total to five.
        self._confirm_as_correct(resumed_handler, dataset.messages[3:5])
        stats_final = resumed_handler.get_session_statistics()
        assert stats_final["verified_count"] == 5

    def test_resume_session_preserves_all_data(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Reloading a session keeps correctness flags and verifier notes."""
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET
        store, session, handler = self._begin_session(
            temp_storage_dir, test_data_generator, dataset, "preserve_data_001"
        )

        test_notes = [
            "First message note",
            "Second message note",
            "Third message note",
        ]

        # First message is confirmed; the remaining ones are corrected and
        # carry the note at the same position.
        for position, (message, note) in enumerate(
            zip(dataset.messages[:3], test_notes)
        ):
            label = message.pre_classified_label
            if position == 0:
                handler.handle_correct_feedback(
                    message=message,
                    classifier_decision=label,
                    classifier_confidence=0.85,
                    classifier_indicators=["test"],
                )
                continue

            handler.handle_incorrect_feedback(
                message=message,
                classifier_decision=label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
                ground_truth_label="red" if label == "green" else "green",
                verifier_notes=note,
            )

        # Everything recorded above must survive a reload.
        loaded_session = store.load_session(session.session_id)
        saved = loaded_session.verifications
        assert len(saved) == 3
        assert saved[0].is_correct is True
        assert saved[1].verifier_notes == test_notes[1]
        assert saved[2].verifier_notes == test_notes[2]

    def test_get_last_session_returns_most_recent(
        self, temp_storage_dir, test_data_generator
    ):
        """get_last_session returns the session that was saved last."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Save three sessions one after another.
        for session_id, verifier_name in (
            ("session_1", "Verifier 1"),
            ("session_2", "Verifier 2"),
            ("session_3", "Verifier 3"),
        ):
            store.save_session(
                test_data_generator.create_verification_session(
                    session_id=session_id,
                    verifier_name=verifier_name,
                )
            )

        # The third (most recent) session is expected back.
        last_session = store.get_last_session()
        assert last_session is not None
        assert last_session.session_id == "session_3"
class TestErrorRecoveryWorkflows:
    """Tests that a failed operation leaves the session usable for a retry."""

    @staticmethod
    def _prepare_handler(storage_dir, test_data_generator, dataset, session_id):
        """Create and save a session for *dataset*, then build its handler.

        Returns:
            A ``(store, session, handler)`` tuple.
        """
        store = JSONVerificationStore(storage_dir=storage_dir)
        session = test_data_generator.create_verification_session(
            session_id=session_id,
            dataset_id=dataset.dataset_id,
            dataset_name=dataset.name,
            total_messages=len(dataset.messages),
        )
        store.save_session(session)
        queue = MessageQueueManager(session)
        queue.initialize_queue(dataset.messages)
        return store, session, VerificationFeedbackHandler(session, store, queue)

    def test_recovery_from_failed_feedback_submission(
        self, temp_storage_dir, test_data_generator
    ):
        """A rejected feedback submission adds no record and can be retried."""
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        store, session, handler = self._prepare_handler(
            temp_storage_dir, test_data_generator, dataset, "error_recovery_001"
        )
        first_message = dataset.messages[0]

        # "Incorrect" feedback without a correction label must be rejected.
        # NOTE(review): Exception is very broad here; narrowing it to the
        # handler's validation error would make this stricter - confirm
        # which type is actually raised.
        with pytest.raises(Exception):
            handler.handle_incorrect_feedback(
                message=first_message,
                classifier_decision=first_message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
                ground_truth_label="",  # Missing correction
                verifier_notes="",
            )

        # The stored session has no records after the failure.
        loaded_session = store.load_session(session.session_id)
        assert len(loaded_session.verifications) == 0

        # A retry with valid data goes through.
        result = handler.handle_correct_feedback(
            message=first_message,
            classifier_decision=first_message.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
        )
        assert result is True

        # The retried record was saved.
        loaded_session = store.load_session(session.session_id)
        assert len(loaded_session.verifications) == 1

    def test_recovery_from_csv_export_failure(
        self, temp_storage_dir, test_data_generator
    ):
        """Export fails for an empty session and succeeds once one record exists."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        empty_session = test_data_generator.create_verification_session(
            session_id="csv_error_001",
            total_messages=0,
        )
        store.save_session(empty_session)

        # Exporting with no verified messages is expected to raise.
        with pytest.raises((ValueError, RuntimeError)):
            store.export_to_csv(empty_session.session_id)

        # Record one verification on the same session, then retry.
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        queue = MessageQueueManager(empty_session)
        queue.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(empty_session, store, queue)
        first_message = dataset.messages[0]
        handler.handle_correct_feedback(
            message=first_message,
            classifier_decision=first_message.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
        )

        # The export now returns non-empty content.
        exported = store.export_to_csv(empty_session.session_id)
        assert exported is not None
        assert len(exported) > 0

    def test_recovery_from_session_load_failure(
        self, temp_storage_dir, test_data_generator
    ):
        """Loading an unknown session yields None; a new one can then be saved."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Unknown id: nothing to load.
        missing = store.load_session("non_existent_session")
        assert missing is None

        # Create and save a replacement session.
        replacement = test_data_generator.create_verification_session(
            session_id="recovery_new_session",
        )
        store.save_session(replacement)

        # The replacement can be loaded back by its id.
        reloaded = store.load_session("recovery_new_session")
        assert reloaded is not None
        assert reloaded.session_id == "recovery_new_session"

    def test_recovery_from_invalid_correction_selection(
        self, temp_storage_dir, test_data_generator
    ):
        """An invalid correction label is rejected; a valid one then succeeds."""
        dataset = TestDatasetManager.ANXIETY_WORRY_DATASET
        store, session, handler = self._prepare_handler(
            temp_storage_dir, test_data_generator, dataset, "invalid_correction_001"
        )
        first_message = dataset.messages[0]

        # A label outside the accepted options must be rejected.
        # NOTE(review): Exception is very broad here - confirm the concrete
        # type raised and narrow if possible.
        with pytest.raises(Exception):
            handler.handle_incorrect_feedback(
                message=first_message,
                classifier_decision=first_message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
                ground_truth_label="invalid_option",
                verifier_notes="",
            )

        # Nothing was recorded by the rejected call.
        loaded_session = store.load_session(session.session_id)
        assert len(loaded_session.verifications) == 0

        # Retrying with a valid correction is accepted.
        result = handler.handle_incorrect_feedback(
            message=first_message,
            classifier_decision=first_message.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
            ground_truth_label="red",
            verifier_notes="",
        )
        assert result is True

    def test_recovery_from_completed_session_modification_attempt(
        self, temp_storage_dir, test_data_generator
    ):
        """Feedback on a completed session is refused and existing data is kept."""
        from src.core.verification_feedback_handler import FeedbackValidationError

        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        store, session, handler = self._prepare_handler(
            temp_storage_dir, test_data_generator, dataset, "completed_session_001"
        )
        first_message, second_message = dataset.messages[0], dataset.messages[1]

        # Record one verification, then close the session.
        handler.handle_correct_feedback(
            message=first_message,
            classifier_decision=first_message.pre_classified_label,
            classifier_confidence=0.85,
            classifier_indicators=["test"],
        )
        store.mark_session_complete(session.session_id)

        # Any further feedback must raise FeedbackValidationError.
        with pytest.raises(
            FeedbackValidationError, match="Cannot modify completed session"
        ):
            handler.handle_correct_feedback(
                message=second_message,
                classifier_decision=second_message.pre_classified_label,
                classifier_confidence=0.85,
                classifier_indicators=["test"],
            )

        # The single original record is intact and the session is complete.
        loaded_session = store.load_session(session.session_id)
        assert len(loaded_session.verifications) == 1
        assert loaded_session.is_complete is True