# test_integration_workflows.py
"""
Integration tests for complete verification workflows.

Tests end-to-end workflows including:
- Full verification workflow:
  select dataset → review message → provide feedback → view results → export CSV
- Session resumption workflow
- Error recovery workflows
"""

from datetime import datetime

import pytest

from src.core.verification_models import (
    VerificationSession,
    TestMessage,
)
from src.core.verification_store import JSONVerificationStore
from src.core.message_queue_manager import MessageQueueManager
from src.core.verification_feedback_handler import (
    FeedbackValidationError,
    VerificationFeedbackHandler,
)
from src.core.verification_metrics import VerificationMetricsCalculator
from src.core.verification_csv_exporter import VerificationCSVExporter
from src.core.test_datasets import TestDatasetManager


def _start_workflow(store, test_data_generator, session_id, dataset):
    """Create, persist and wire up a fresh verification session for *dataset*.

    Args:
        store: Verification store the new session is saved to.
        test_data_generator: Fixture exposing ``create_verification_session``.
        session_id: Identifier given to the new session.
        dataset: Dataset whose id, name and messages seed the session/queue.

    Returns:
        A ``(session, queue_manager, handler)`` tuple, built in that order.
    """
    session = test_data_generator.create_verification_session(
        session_id=session_id,
        dataset_id=dataset.dataset_id,
        dataset_name=dataset.name,
        total_messages=len(dataset.messages),
    )
    store.save_session(session)

    queue_manager = MessageQueueManager(session)
    queue_manager.initialize_queue(dataset.messages)

    handler = VerificationFeedbackHandler(session, store, queue_manager)
    return session, queue_manager, handler


def _mark_correct(handler, message, *, confidence=0.85, indicator="test"):
    """Submit "classifier was correct" feedback for *message*.

    The classifier decision is taken from ``message.pre_classified_label``.
    Returns whatever ``handler.handle_correct_feedback`` returns.
    """
    return handler.handle_correct_feedback(
        message=message,
        classifier_decision=message.pre_classified_label,
        classifier_confidence=confidence,
        classifier_indicators=[indicator],
    )


def _mark_incorrect(
    handler,
    message,
    ground_truth_label,
    *,
    notes="",
    confidence=0.85,
    indicator="test",
):
    """Submit "classifier was wrong" feedback for *message*.

    The classifier decision is taken from ``message.pre_classified_label``;
    *ground_truth_label* is the verifier's correction. Returns whatever
    ``handler.handle_incorrect_feedback`` returns.
    """
    return handler.handle_incorrect_feedback(
        message=message,
        classifier_decision=message.pre_classified_label,
        classifier_confidence=confidence,
        classifier_indicators=[indicator],
        ground_truth_label=ground_truth_label,
        verifier_notes=notes,
    )


class TestCompleteVerificationWorkflow:
    """Tests for complete verification workflow."""

    def test_full_workflow_select_dataset_to_export_csv(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """
        Test full workflow:
        select dataset → review message → provide feedback → view results → export CSV

        This test verifies the complete end-to-end workflow of the verification mode.
        """
        # Step 1: Initialize storage
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Step 2: Select a dataset (using mixed scenarios for variety)
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET
        assert dataset is not None
        # Three messages are processed below, so require at least that many.
        assert len(dataset.messages) >= 3

        # Steps 3-5: Create the session, message queue and feedback handler
        session, queue_manager, handler = _start_workflow(
            store, test_data_generator, "workflow_test_001", dataset
        )

        # Step 6: Process first 3 messages
        for i, message in enumerate(dataset.messages[:3]):
            # The queue must be positioned on the message about to be reviewed.
            assert queue_manager.get_current_message_id() == message.message_id

            # Provide feedback (alternate between correct and incorrect)
            if i % 2 == 0:
                _mark_correct(handler, message, indicator="test_indicator")
            else:
                # Pick a correction that differs from the classifier's label.
                correction = (
                    "red" if message.pre_classified_label != "red" else "green"
                )
                _mark_incorrect(
                    handler,
                    message,
                    correction,
                    notes="Test correction",
                    indicator="test_indicator",
                )

        # Step 7: Verify session statistics
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 3
        assert stats["correct_count"] == 2  # First and third are correct
        assert stats["incorrect_count"] == 1  # Second is incorrect

        # Step 8: Export to CSV
        csv_content = store.export_to_csv(session.session_id)

        # Step 9: Verify CSV content
        assertion_helpers.assert_csv_has_summary_section(csv_content)
        assertion_helpers.assert_csv_contains_columns(
            csv_content,
            ["Patient Message", "Classifier Said", "You Said", "Notes", "Date"],
        )

        # Verify CSV has correct number of data rows (3 messages + header + summary)
        lines = csv_content.split("\n")
        assert len(lines) > 5  # Summary + header + at least 3 data rows

        # Verify accuracy in CSV
        assert "Accuracy %" in csv_content
        # NOTE(review): plain substring check; "66"/"67" could also match
        # unrelated digits (e.g. in a date). Tighten once the CSV summary
        # layout is confirmed.
        assert "66" in csv_content or "67" in csv_content  # 2/3 ≈ 66.67%

    def test_workflow_with_all_correct_feedback(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow where all feedback is marked as correct."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET

        session, _, handler = _start_workflow(
            store, test_data_generator, "all_correct_001", dataset
        )

        # Mark all messages as correct
        for message in dataset.messages[:5]:
            _mark_correct(handler, message, confidence=0.90, indicator="positive")

        # Verify all are correct
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 5
        assert stats["correct_count"] == 5
        assert stats["incorrect_count"] == 0
        assert stats["accuracy"] == 100.0

        # Export and verify
        csv_content = store.export_to_csv(session.session_id)
        assert "100.0" in csv_content  # 100% accuracy

    def test_workflow_with_all_incorrect_feedback(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow where all feedback is marked as incorrect."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.SUICIDAL_IDEATION_DATASET

        session, _, handler = _start_workflow(
            store, test_data_generator, "all_incorrect_001", dataset
        )

        # Mark all messages as incorrect (change red to yellow)
        for message in dataset.messages[:5]:
            _mark_incorrect(
                handler,
                message,
                "yellow",  # Wrong correction
                notes="Classifier was wrong",
                confidence=0.90,
                indicator="severe",
            )

        # Verify all are incorrect
        stats = handler.get_session_statistics()
        assert stats["verified_count"] == 5
        assert stats["correct_count"] == 0
        assert stats["incorrect_count"] == 5
        assert stats["accuracy"] == 0.0

        # Export and verify
        csv_content = store.export_to_csv(session.session_id)
        # NOTE(review): "0.0" is also a substring of "100.0", so this check
        # alone cannot tell 0% from 100%; the stats assertion above is the
        # one that pins the value.
        assert "0.0" in csv_content  # 0% accuracy

    def test_workflow_with_mixed_classifications(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test workflow with mixed classification types."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET

        _, _, handler = _start_workflow(
            store, test_data_generator, "mixed_class_001", dataset
        )

        # Process messages and verify accuracy by type
        for message in dataset.messages[:6]:
            _mark_correct(handler, message)

        stats = handler.get_session_statistics()

        # Verify accuracy by type is calculated
        assert "accuracy_by_type" in stats
        for label in ("green", "yellow", "red"):
            assert label in stats["accuracy_by_type"]


class TestSessionResumptionWorkflow:
    """Tests for session resumption workflow."""

    def test_resume_session_after_partial_verification(
        self, temp_storage_dir, test_data_generator
    ):
        """Test resuming a session after partial verification."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.ANXIETY_WORRY_DATASET

        # Create and partially complete a session
        session, _, handler = _start_workflow(
            store, test_data_generator, "resume_test_001", dataset
        )

        # Process first 3 messages
        for message in dataset.messages[:3]:
            _mark_correct(handler, message, indicator="anxiety")

        # Get stats before closing
        stats_before = handler.get_session_statistics()
        assert stats_before["verified_count"] == 3

        # Simulate closing and reopening the session
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 3

        # Resume with new queue manager and handler
        queue_manager_resumed = MessageQueueManager(loaded_session)
        queue_manager_resumed.initialize_queue(dataset.messages)
        handler_resumed = VerificationFeedbackHandler(
            loaded_session, store, queue_manager_resumed
        )

        # Verify we can continue from where we left off
        stats_after = handler_resumed.get_session_statistics()
        assert stats_after["verified_count"] == 3
        assert stats_after["correct_count"] == 3

        # Process more messages
        for message in dataset.messages[3:5]:
            _mark_correct(handler_resumed, message, indicator="anxiety")

        # Verify total count increased
        stats_final = handler_resumed.get_session_statistics()
        assert stats_final["verified_count"] == 5

    def test_resume_session_preserves_all_data(
        self, temp_storage_dir, test_data_generator, assertion_helpers
    ):
        """Test that resuming a session preserves all verification data."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.MIXED_SCENARIOS_DATASET

        session, _, handler = _start_workflow(
            store, test_data_generator, "preserve_data_001", dataset
        )

        # Create records with specific notes. Index 0 is never submitted
        # because the first message is marked correct (no notes argument).
        test_notes = [
            "First message note",
            "Second message note",
            "Third message note",
        ]

        for i, message in enumerate(dataset.messages[:3]):
            if i == 0:
                _mark_correct(handler, message)
            else:
                # Pick a correction that differs from the classifier's label.
                correction = (
                    "green" if message.pre_classified_label != "green" else "red"
                )
                _mark_incorrect(handler, message, correction, notes=test_notes[i])

        # Load session and verify data is preserved
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 3
        assert loaded_session.verifications[0].is_correct is True
        assert loaded_session.verifications[1].verifier_notes == test_notes[1]
        assert loaded_session.verifications[2].verifier_notes == test_notes[2]

    def test_get_last_session_returns_most_recent(
        self, temp_storage_dir, test_data_generator
    ):
        """Test that get_last_session returns the most recently created session."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Create multiple sessions, saving each one before creating the next
        for n in (1, 2, 3):
            session = test_data_generator.create_verification_session(
                session_id=f"session_{n}",
                verifier_name=f"Verifier {n}",
            )
            store.save_session(session)

        # Get last session
        last_session = store.get_last_session()

        # Should be session 3 (most recent)
        assert last_session is not None
        assert last_session.session_id == "session_3"


class TestErrorRecoveryWorkflows:
    """Tests for error recovery workflows."""

    def test_recovery_from_failed_feedback_submission(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when feedback submission fails."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET

        session, _, handler = _start_workflow(
            store, test_data_generator, "error_recovery_001", dataset
        )
        first_message = dataset.messages[0]

        # Try to handle feedback with missing correction (should fail)
        # NOTE(review): Exception is very broad; narrow to the handler's
        # validation error type once it is confirmed for this path.
        with pytest.raises(Exception):
            _mark_incorrect(handler, first_message, "", notes="")  # Missing correction

        # Verify session is still in valid state
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 0  # No records added

        # Should be able to retry with valid data
        result = _mark_correct(handler, first_message)
        assert result is True

        # Verify record was saved on retry
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 1

    def test_recovery_from_csv_export_failure(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when CSV export fails."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        session = test_data_generator.create_verification_session(
            session_id="csv_error_001",
            total_messages=0,
        )
        store.save_session(session)

        # Try to export with no verified messages (should fail)
        # The error message is formatted by the error handler
        with pytest.raises((ValueError, RuntimeError)):
            store.export_to_csv(session.session_id)

        # Add some messages and retry
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET
        queue_manager = MessageQueueManager(session)
        queue_manager.initialize_queue(dataset.messages)
        handler = VerificationFeedbackHandler(session, store, queue_manager)

        _mark_correct(handler, dataset.messages[0])

        # Now export should succeed
        csv_content = store.export_to_csv(session.session_id)
        assert csv_content is not None
        assert len(csv_content) > 0

    def test_recovery_from_session_load_failure(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when session load fails."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)

        # Try to load non-existent session
        loaded_session = store.load_session("non_existent_session")
        assert loaded_session is None

        # Should be able to create new session
        session = test_data_generator.create_verification_session(
            session_id="recovery_new_session",
        )
        store.save_session(session)

        # Now load should succeed
        loaded_session = store.load_session("recovery_new_session")
        assert loaded_session is not None
        assert loaded_session.session_id == "recovery_new_session"

    def test_recovery_from_invalid_correction_selection(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when invalid correction is selected."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.ANXIETY_WORRY_DATASET

        session, _, handler = _start_workflow(
            store, test_data_generator, "invalid_correction_001", dataset
        )
        first_message = dataset.messages[0]

        # Try with invalid correction
        # NOTE(review): Exception is very broad; narrow to the handler's
        # validation error type once it is confirmed for this path.
        with pytest.raises(Exception):
            _mark_incorrect(handler, first_message, "invalid_option", notes="")

        # Verify session is still valid
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 0

        # Should be able to retry with valid correction
        result = _mark_incorrect(handler, first_message, "red", notes="")
        assert result is True

    def test_recovery_from_completed_session_modification_attempt(
        self, temp_storage_dir, test_data_generator
    ):
        """Test recovery when attempting to modify a completed session."""
        store = JSONVerificationStore(storage_dir=temp_storage_dir)
        dataset = TestDatasetManager.HEALTHY_POSITIVE_DATASET

        session, _, handler = _start_workflow(
            store, test_data_generator, "completed_session_001", dataset
        )

        # Add some feedback
        _mark_correct(handler, dataset.messages[0])

        # Mark session as complete
        store.mark_session_complete(session.session_id)

        # Try to add more feedback (should fail with FeedbackValidationError)
        with pytest.raises(
            FeedbackValidationError, match="Cannot modify completed session"
        ):
            _mark_correct(handler, dataset.messages[1])

        # Verify original feedback is still there
        loaded_session = store.load_session(session.session_id)
        assert loaded_session is not None
        assert len(loaded_session.verifications) == 1
        assert loaded_session.is_complete is True