Spaces:
Sleeping
Sleeping
| """ | |
| Property-based tests for prompt optimization system. | |
| These tests verify correctness properties across multiple inputs and scenarios | |
| using the Hypothesis library for property-based testing. | |
| """ | |
| import sys | |
| import os | |
| sys.path.append('src') | |
| import pytest | |
| from hypothesis import given, strategies as st, settings | |
| from datetime import datetime | |
| from typing import List, Dict, Any | |
| from config.prompt_management import ( | |
| PromptController, IndicatorCatalog, RulesCatalog, TemplateCatalog | |
| ) | |
| from config.prompt_management.data_models import ( | |
| Indicator, Rule, Template, IndicatorCategory, ValidationResult, | |
| ConversationHistory, Message, Classification, ScenarioType | |
| ) | |
| class TestSharedComponentPropagation: | |
| """ | |
| **Feature: prompt-optimization, Property 5: Shared Component Update Propagation** | |
| **Validates: Requirements 5.1, 5.2, 5.3, 5.4, 5.5** | |
| Property: For any update to shared prompt components (indicators, rules, categories), | |
| all dependent AI agents should receive the changes consistently while maintaining | |
| backward compatibility and validation integrity. | |
| """ | |
| def test_indicator_propagation_consistency(self, indicator_name: str, definition: str, | |
| severity_weight: float, examples: List[str]): | |
| """ | |
| Test that indicator updates propagate consistently to all AI agents. | |
| Property: When an indicator is added to the shared catalog, all AI agents | |
| should receive the same indicator definition in their prompt configurations. | |
| """ | |
| # Create controller and get initial state | |
| controller = PromptController() | |
| # Create test indicator | |
| test_indicator = Indicator( | |
| name=indicator_name, | |
| category=IndicatorCategory.EMOTIONAL, | |
| definition=definition, | |
| examples=examples, | |
| severity_weight=severity_weight | |
| ) | |
| # Add indicator to catalog | |
| success = controller.indicator_catalog.add_indicator(test_indicator) | |
| # Skip if indicator already exists (duplicate name) | |
| if not success: | |
| return | |
| # Clear cache to force reload | |
| controller._prompt_cache.clear() | |
| # Get prompt configurations for different agents | |
| spiritual_config = controller.get_prompt('spiritual_monitor') | |
| triage_config = controller.get_prompt('triage_question') | |
| evaluator_config = controller.get_prompt('triage_evaluator') | |
| # Verify all agents have the same indicator | |
| spiritual_indicators = {ind.name: ind for ind in spiritual_config.shared_indicators} | |
| triage_indicators = {ind.name: ind for ind in triage_config.shared_indicators} | |
| evaluator_indicators = {ind.name: ind for ind in evaluator_config.shared_indicators} | |
| # Property assertion: All agents should have the same indicator | |
| assert indicator_name in spiritual_indicators, f"Spiritual monitor missing indicator: {indicator_name}" | |
| assert indicator_name in triage_indicators, f"Triage question missing indicator: {indicator_name}" | |
| assert indicator_name in evaluator_indicators, f"Triage evaluator missing indicator: {indicator_name}" | |
| # Property assertion: Indicator definitions should be identical | |
| spiritual_ind = spiritual_indicators[indicator_name] | |
| triage_ind = triage_indicators[indicator_name] | |
| evaluator_ind = evaluator_indicators[indicator_name] | |
| assert spiritual_ind.definition == definition, "Spiritual monitor has different definition" | |
| assert triage_ind.definition == definition, "Triage question has different definition" | |
| assert evaluator_ind.definition == definition, "Triage evaluator has different definition" | |
| assert spiritual_ind.severity_weight == severity_weight, "Spiritual monitor has different weight" | |
| assert triage_ind.severity_weight == severity_weight, "Triage question has different weight" | |
| assert evaluator_ind.severity_weight == severity_weight, "Triage evaluator has different weight" | |
| assert spiritual_ind.examples == examples, "Spiritual monitor has different examples" | |
| assert triage_ind.examples == examples, "Triage question has different examples" | |
| assert evaluator_ind.examples == examples, "Triage evaluator has different examples" | |
| def test_rule_propagation_consistency(self, rule_id: str, description: str, | |
| condition: str, action: str, priority: int): | |
| """ | |
| Test that rule updates propagate consistently to all AI agents. | |
| Property: When a rule is added to the shared catalog, all AI agents | |
| should receive the same rule definition in their prompt configurations. | |
| """ | |
| # Create controller | |
| controller = PromptController() | |
| # Create test rule | |
| test_rule = Rule( | |
| rule_id=rule_id, | |
| description=description, | |
| condition=condition, | |
| action=action, | |
| priority=priority | |
| ) | |
| # Add rule to catalog | |
| success = controller.rules_catalog.add_rule(test_rule) | |
| # Skip if rule already exists (duplicate ID) | |
| if not success: | |
| return | |
| # Clear cache to force reload | |
| controller._prompt_cache.clear() | |
| # Get prompt configurations for different agents | |
| spiritual_config = controller.get_prompt('spiritual_monitor') | |
| triage_config = controller.get_prompt('triage_question') | |
| evaluator_config = controller.get_prompt('triage_evaluator') | |
| # Verify all agents have the same rule | |
| spiritual_rules = {rule.rule_id: rule for rule in spiritual_config.shared_rules} | |
| triage_rules = {rule.rule_id: rule for rule in triage_config.shared_rules} | |
| evaluator_rules = {rule.rule_id: rule for rule in evaluator_config.shared_rules} | |
| # Property assertion: All agents should have the same rule | |
| assert rule_id in spiritual_rules, f"Spiritual monitor missing rule: {rule_id}" | |
| assert rule_id in triage_rules, f"Triage question missing rule: {rule_id}" | |
| assert rule_id in evaluator_rules, f"Triage evaluator missing rule: {rule_id}" | |
| # Property assertion: Rule definitions should be identical | |
| spiritual_rule = spiritual_rules[rule_id] | |
| triage_rule = triage_rules[rule_id] | |
| evaluator_rule = evaluator_rules[rule_id] | |
| assert spiritual_rule.description == description, "Spiritual monitor has different description" | |
| assert triage_rule.description == description, "Triage question has different description" | |
| assert evaluator_rule.description == description, "Triage evaluator has different description" | |
| assert spiritual_rule.condition == condition, "Spiritual monitor has different condition" | |
| assert triage_rule.condition == condition, "Triage question has different condition" | |
| assert evaluator_rule.condition == condition, "Triage evaluator has different condition" | |
| assert spiritual_rule.priority == priority, "Spiritual monitor has different priority" | |
| assert triage_rule.priority == priority, "Triage question has different priority" | |
| assert evaluator_rule.priority == priority, "Triage evaluator has different priority" | |
| def test_template_propagation_consistency(self, template_id: str, name: str, | |
| content: str, category: str): | |
| """ | |
| Test that template updates propagate consistently to all AI agents. | |
| Property: When a template is added to the shared catalog, all AI agents | |
| should receive the same template definition in their prompt configurations. | |
| """ | |
| # Create controller | |
| controller = PromptController() | |
| # Create test template | |
| test_template = Template( | |
| template_id=template_id, | |
| name=name, | |
| content=content, | |
| variables=[], # Simplified for testing | |
| category=category | |
| ) | |
| # Add template to catalog | |
| success = controller.template_catalog.add_template(test_template) | |
| # Skip if template already exists (duplicate ID) | |
| if not success: | |
| return | |
| # Clear cache to force reload | |
| controller._prompt_cache.clear() | |
| # Get prompt configurations for different agents | |
| spiritual_config = controller.get_prompt('spiritual_monitor') | |
| triage_config = controller.get_prompt('triage_question') | |
| evaluator_config = controller.get_prompt('triage_evaluator') | |
| # Verify all agents have the same template | |
| spiritual_templates = {tmpl.template_id: tmpl for tmpl in spiritual_config.templates} | |
| triage_templates = {tmpl.template_id: tmpl for tmpl in triage_config.templates} | |
| evaluator_templates = {tmpl.template_id: tmpl for tmpl in evaluator_config.templates} | |
| # Property assertion: All agents should have the same template | |
| assert template_id in spiritual_templates, f"Spiritual monitor missing template: {template_id}" | |
| assert template_id in triage_templates, f"Triage question missing template: {template_id}" | |
| assert template_id in evaluator_templates, f"Triage evaluator missing template: {template_id}" | |
| # Property assertion: Template definitions should be identical | |
| spiritual_tmpl = spiritual_templates[template_id] | |
| triage_tmpl = triage_templates[template_id] | |
| evaluator_tmpl = evaluator_templates[template_id] | |
| assert spiritual_tmpl.name == name, "Spiritual monitor has different template name" | |
| assert triage_tmpl.name == name, "Triage question has different template name" | |
| assert evaluator_tmpl.name == name, "Triage evaluator has different template name" | |
| assert spiritual_tmpl.content == content, "Spiritual monitor has different template content" | |
| assert triage_tmpl.content == content, "Triage question has different template content" | |
| assert evaluator_tmpl.content == content, "Triage evaluator has different template content" | |
| assert spiritual_tmpl.category == category, "Spiritual monitor has different template category" | |
| assert triage_tmpl.category == category, "Triage question has different template category" | |
| assert evaluator_tmpl.category == category, "Triage evaluator has different template category" | |
| def test_validation_integrity_maintained(self): | |
| """ | |
| Test that validation integrity is maintained during component updates. | |
| Property: When shared components are updated, the validation system | |
| should continue to work correctly and catch inconsistencies. | |
| """ | |
| controller = PromptController() | |
| # Initial validation should pass | |
| initial_result = controller.validate_consistency() | |
| assert isinstance(initial_result, ValidationResult), "Validation should return ValidationResult" | |
| # Add a valid indicator | |
| valid_indicator = Indicator( | |
| name="test_valid_indicator", | |
| category=IndicatorCategory.EMOTIONAL, | |
| definition="A test indicator for validation", | |
| examples=["test example"], | |
| severity_weight=0.5 | |
| ) | |
| controller.indicator_catalog.add_indicator(valid_indicator) | |
| # Validation should still work | |
| post_update_result = controller.validate_consistency() | |
| assert isinstance(post_update_result, ValidationResult), "Validation should still work after update" | |
| # Add an invalid indicator (invalid severity weight) | |
| invalid_indicator = Indicator( | |
| name="test_invalid_indicator", | |
| category=IndicatorCategory.EMOTIONAL, | |
| definition="An invalid test indicator", | |
| examples=["test example"], | |
| severity_weight=2.0 # Invalid: > 1.0 | |
| ) | |
| controller.indicator_catalog.add_indicator(invalid_indicator) | |
| # Validation should catch the error | |
| validation_with_error = controller.validate_consistency() | |
| assert not validation_with_error.is_valid, "Validation should catch invalid severity weight" | |
| assert any("severity weight" in error.lower() for error in validation_with_error.errors), \ | |
| "Should have severity weight error" | |
| def test_session_isolation_property(self, session_id: str, agent_type: str, session_prompt: str): | |
| """ | |
| Test that session overrides don't affect other sessions or base prompts. | |
| Property: Session-level prompt overrides should be isolated and not affect | |
| other sessions or the base centralized prompts. | |
| """ | |
| controller = PromptController() | |
| # Get base prompt configuration | |
| base_config = controller.get_prompt(agent_type) | |
| base_prompt_content = base_config.base_prompt | |
| # Set session override | |
| success = controller.set_session_override(agent_type, session_prompt, session_id) | |
| assert success, "Session override should be set successfully" | |
| # Get prompt with session override | |
| session_config = controller.get_prompt(agent_type, session_id=session_id) | |
| # Property assertion: Session should have override content | |
| assert session_config.session_override == session_prompt, "Session should have override content" | |
| # Property assertion: Base prompt should be unchanged | |
| base_config_after = controller.get_prompt(agent_type) | |
| assert base_config_after.base_prompt == base_prompt_content, "Base prompt should be unchanged" | |
| # Property assertion: Different session should not be affected | |
| different_session_id = f"different_{session_id}" | |
| different_session_config = controller.get_prompt(agent_type, session_id=different_session_id) | |
| assert different_session_config.session_override is None, "Different session should not have override" | |
| # Clean up | |
| controller.clear_session_overrides(session_id) | |
| # Property assertion: After cleanup, session should revert to base | |
| cleaned_config = controller.get_prompt(agent_type, session_id=session_id) | |
| assert cleaned_config.session_override is None, "Session should revert to base after cleanup" | |
| class TestTargetedQuestionGeneration: | |
| """ | |
| **Feature: prompt-optimization, Property 2: Scenario-Targeted Question Generation** | |
| **Validates: Requirements 2.1, 2.2, 2.3, 2.4, 2.5** | |
| Property: For any YELLOW scenario (loss of interest, loss of loved one, lack of support, | |
| vague stress, sleep issues), the generated triage question should specifically address | |
| the distinction between emotional distress and external factors relevant to that scenario type. | |
| """ | |
| def test_scenario_specific_question_targeting(self, scenario_type: str, patient_statement: str, context_clues: List[str]): | |
| """ | |
| Test that questions are targeted to specific YELLOW scenarios. | |
| Property: Generated questions should address the specific ambiguity | |
| relevant to each scenario type (emotional vs external factors). | |
| """ | |
| from config.prompt_management.data_models import YellowScenario, ScenarioType | |
| # Create scenario based on the type | |
| try: | |
| scenario_enum = ScenarioType(scenario_type) | |
| except ValueError: | |
| # Skip invalid scenario types | |
| return | |
| scenario = YellowScenario( | |
| scenario_type=scenario_enum, | |
| patient_statement=patient_statement, | |
| context_clues=context_clues, | |
| target_clarification=f"Clarify if {scenario_type} causes emotional distress", | |
| question_patterns=[] | |
| ) | |
| # Property assertion: Scenario should have valid structure | |
| assert scenario.scenario_type == scenario_enum | |
| assert len(scenario.patient_statement) >= 10 | |
| assert len(scenario.context_clues) >= 1 | |
| # Property assertion: Target clarification should be scenario-specific | |
| assert scenario_type in scenario.target_clarification.lower() | |
| def test_loss_of_interest_question_patterns(self, loss_statements: List[str]): | |
| """ | |
| Test that loss of interest scenarios generate appropriate questions. | |
| Property: Questions for loss of interest should distinguish between | |
| emotional impact and practical circumstances. | |
| """ | |
| # Expected question elements for loss of interest scenarios | |
| expected_elements = [ | |
| "emotional", "emotionally", "weighing", "circumstances", | |
| "time", "practical", "meaningful", "distressing" | |
| ] | |
| for statement in loss_statements: | |
| # Property assertion: Statement should contain loss of interest indicators | |
| loss_indicators = ["used to", "don't", "can't", "stopped"] | |
| assert any(indicator in statement.lower() for indicator in loss_indicators), \ | |
| f"Statement should contain loss of interest indicators: {statement}" | |
| # Property assertion: Should be classifiable as loss of interest scenario | |
| engagement_indicators = ["love", "enjoy", "do", "playing", "hobbies"] | |
| assert any(indicator in statement.lower() for indicator in engagement_indicators), \ | |
| f"Statement should express previous engagement: {statement}" | |
| def test_loss_of_loved_one_question_patterns(self, grief_statements: List[str]): | |
| """ | |
| Test that loss of loved one scenarios generate appropriate questions. | |
| Property: Questions for grief should focus on coping mechanisms | |
| and emotional state rather than practical arrangements. | |
| """ | |
| # Expected question elements for grief scenarios | |
| expected_elements = [ | |
| "coping", "processing", "difficult", "emotionally", | |
| "grief", "loss", "feeling", "support" | |
| ] | |
| for statement in grief_statements: | |
| # Property assertion: Statement should contain loss indicators | |
| loss_indicators = ["passed away", "died", "lost", "put", "down"] | |
| assert any(indicator in statement.lower() for indicator in loss_indicators), \ | |
| f"Statement should contain loss indicators: {statement}" | |
| # Property assertion: Should reference a relationship | |
| relationship_indicators = ["mother", "father", "husband", "wife", "dog", "cat"] | |
| assert any(rel in statement.lower() for rel in relationship_indicators), \ | |
| f"Statement should reference a relationship: {statement}" | |
| def test_no_support_question_patterns(self, support_statements: List[str]): | |
| """ | |
| Test that lack of support scenarios generate appropriate questions. | |
| Property: Questions should distinguish between practical isolation | |
| and emotional distress from lack of support. | |
| """ | |
| # Expected question elements for support scenarios | |
| expected_elements = [ | |
| "affecting", "emotionally", "practical", "challenge", | |
| "support", "alone", "isolated", "help" | |
| ] | |
| for statement in support_statements: | |
| # Property assertion: Statement should contain isolation indicators | |
| isolation_indicators = ["don't have", "alone", "no one", "no family"] | |
| assert any(indicator in statement.lower() for indicator in isolation_indicators), \ | |
| f"Statement should contain isolation indicators: {statement}" | |
| def test_vague_stress_question_patterns(self, stress_statements: List[str]): | |
| """ | |
| Test that vague stress scenarios generate clarifying questions. | |
| Property: Questions should identify specific causes of stress | |
| to determine if it's emotional distress or external factors. | |
| """ | |
| # Expected question elements for vague stress scenarios | |
| expected_elements = [ | |
| "causing", "source", "specifically", "what", "more about", | |
| "tell me", "explain", "describe" | |
| ] | |
| for statement in stress_statements: | |
| # Property assertion: Statement should be vague about cause | |
| vague_indicators = ["some", "a bit", "things", "it's been"] | |
| assert any(indicator in statement.lower() for indicator in vague_indicators), \ | |
| f"Statement should be vague about cause: {statement}" | |
| # Property assertion: Should mention stress/difficulty without specifics | |
| stress_indicators = ["stress", "difficult", "worried", "hard"] | |
| assert any(indicator in statement.lower() for indicator in stress_indicators), \ | |
| f"Statement should mention stress/difficulty: {statement}" | |
| def test_sleep_issues_question_patterns(self, sleep_statements: List[str]): | |
| """ | |
| Test that sleep issue scenarios generate appropriate questions. | |
| Property: Questions should distinguish between medical causes | |
| and emotional/mental causes of sleep problems. | |
| """ | |
| # Expected question elements for sleep scenarios | |
| expected_elements = [ | |
| "mind", "thoughts", "worrying", "medical", "medication", | |
| "physical", "emotional", "keeping you awake" | |
| ] | |
| for statement in sleep_statements: | |
| # Property assertion: Statement should contain sleep indicators | |
| sleep_indicators = ["sleep", "racing", "wake", "night"] | |
| assert any(indicator in statement.lower() for indicator in sleep_indicators), \ | |
| f"Statement should contain sleep indicators: {statement}" | |
| def test_question_effectiveness_validation(self): | |
| """ | |
| Test that question effectiveness can be validated. | |
| Property: The system should be able to assess whether generated | |
| questions effectively target the intended clarification. | |
| """ | |
| from config.prompt_management.data_models import ScenarioType | |
| # Test scenarios with expected effectiveness | |
| test_cases = [ | |
| { | |
| "scenario": ScenarioType.LOSS_OF_INTEREST, | |
| "good_question": "Is that something that's been weighing on you emotionally, or is it more about time or circumstances?", | |
| "poor_question": "How are you feeling about that?", | |
| "expected_better": "good_question" | |
| }, | |
| { | |
| "scenario": ScenarioType.VAGUE_STRESS, | |
| "good_question": "Can you tell me more about what's been causing that stress?", | |
| "poor_question": "That sounds difficult.", | |
| "expected_better": "good_question" | |
| } | |
| ] | |
| for case in test_cases: | |
| scenario = case["scenario"] | |
| good_q = case["good_question"] | |
| poor_q = case["poor_question"] | |
| # Property assertion: Good questions should be more specific | |
| assert len(good_q.split()) > len(poor_q.split()) or "what" in good_q.lower() or "how" in good_q.lower(), \ | |
| f"Good question should be more specific: {good_q}" | |
| # Property assertion: Good questions should contain clarifying words | |
| clarifying_words = ["what", "how", "why", "can you", "tell me", "more about"] | |
| good_has_clarifying = any(word in good_q.lower() for word in clarifying_words) | |
| poor_has_clarifying = any(word in poor_q.lower() for word in clarifying_words) | |
| assert good_has_clarifying or not poor_has_clarifying, \ | |
| f"Good question should be more clarifying than poor question" | |
| def test_question_language_matching(self): | |
| """ | |
| Test that questions match the patient's language. | |
| Property: Generated questions should be in the same language | |
| as the patient's input message. | |
| """ | |
| # This is a simplified test - in practice, language detection would be more complex | |
| test_cases = [ | |
| {"input": "I feel stressed", "language": "english"}, | |
| {"input": "Je me sens stressé", "language": "french"}, | |
| {"input": "Me siento estresado", "language": "spanish"} | |
| ] | |
| for case in test_cases: | |
| input_text = case["input"] | |
| expected_lang = case["language"] | |
| # Property assertion: Input should be non-empty | |
| assert len(input_text.strip()) > 0, "Input should be non-empty" | |
| # Property assertion: Language should be identifiable | |
| assert expected_lang in ["english", "french", "spanish"], "Language should be supported" | |
| # In a real implementation, we would test that the generated question | |
| # matches the detected language of the input | |
| class TestComponentConsistency: | |
| """ | |
| **Feature: prompt-optimization, Property 1: Component Consistency Enforcement** | |
| **Validates: Requirements 1.1, 1.2, 1.3, 1.4, 1.5** | |
| Property: For any spiritual distress indicator or classification rule defined in shared components, | |
| all AI agents (Spiritual_Monitor, Triage_Evaluator) should apply identical definitions, | |
| terminology, and evaluation logic when processing the same message. | |
| """ | |
| def test_identical_shared_components_across_agents(self, message_content: str, agent_types: List[str]): | |
| """ | |
| Test that all AI agents receive identical shared components. | |
| Property: When multiple AI agents request prompt configurations, they should | |
| all receive identical shared indicators, rules, and category definitions. | |
| """ | |
| controller = PromptController() | |
| # Get prompt configurations for different agents | |
| configs = {} | |
| for agent_type in agent_types: | |
| configs[agent_type] = controller.get_prompt(agent_type) | |
| # Property assertion: All agents should have identical shared indicators | |
| if len(configs) > 1: | |
| agent_names = list(configs.keys()) | |
| base_agent = agent_names[0] | |
| base_indicators = {ind.name: ind.to_dict() for ind in configs[base_agent].shared_indicators} | |
| for other_agent in agent_names[1:]: | |
| other_indicators = {ind.name: ind.to_dict() for ind in configs[other_agent].shared_indicators} | |
| # Check that indicator sets are identical | |
| assert set(base_indicators.keys()) == set(other_indicators.keys()), \ | |
| f"Indicator sets differ between {base_agent} and {other_agent}" | |
| # Check that indicator definitions are identical | |
| for ind_name in base_indicators: | |
| assert base_indicators[ind_name] == other_indicators[ind_name], \ | |
| f"Indicator {ind_name} differs between {base_agent} and {other_agent}" | |
| # Property assertion: All agents should have identical shared rules | |
| if len(configs) > 1: | |
| base_rules = {rule.rule_id: rule.to_dict() for rule in configs[base_agent].shared_rules} | |
| for other_agent in agent_names[1:]: | |
| other_rules = {rule.rule_id: rule.to_dict() for rule in configs[other_agent].shared_rules} | |
| # Check that rule sets are identical | |
| assert set(base_rules.keys()) == set(other_rules.keys()), \ | |
| f"Rule sets differ between {base_agent} and {other_agent}" | |
| # Check that rule definitions are identical | |
| for rule_id in base_rules: | |
| assert base_rules[rule_id] == other_rules[rule_id], \ | |
| f"Rule {rule_id} differs between {base_agent} and {other_agent}" | |
| def test_consistent_category_definitions(self, category_name: str, agent_types: List[str]): | |
| """ | |
| Test that category definitions are consistent across all agents. | |
| Property: All AI agents should use identical category definitions | |
| for GREEN, YELLOW, and RED classifications. | |
| """ | |
| controller = PromptController() | |
| # Get category definition from shared components | |
| category_def = controller.category_definitions.get_category_definition(category_name) | |
| assert category_def is not None, f"Category {category_name} should be defined" | |
| # Verify all agents have access to the same category definitions | |
| for agent_type in agent_types: | |
| config = controller.get_prompt(agent_type) | |
| # The category definitions should be accessible through the controller | |
| agent_category_def = controller.category_definitions.get_category_definition(category_name) | |
| # Property assertion: Category definitions should be identical | |
| assert agent_category_def == category_def, \ | |
| f"Category {category_name} definition differs for agent {agent_type}" | |
| def test_terminology_consistency_validation(self): | |
| """ | |
| Test that terminology validation catches inconsistencies. | |
| Property: The validation system should detect when different agents | |
| use inconsistent terminology for the same concepts. | |
| """ | |
| controller = PromptController() | |
| # Run consistency validation | |
| validation_result = controller.validate_consistency() | |
| # Property assertion: Validation should complete successfully | |
| assert isinstance(validation_result, ValidationResult), \ | |
| "Validation should return a ValidationResult object" | |
| # If there are errors, they should be specific and actionable | |
| for error in validation_result.errors: | |
| assert isinstance(error, str) and len(error) > 0, \ | |
| "Validation errors should be non-empty strings" | |
| # Warnings should also be specific | |
| for warning in validation_result.warnings: | |
| assert isinstance(warning, str) and len(warning) > 0, \ | |
| "Validation warnings should be non-empty strings" | |
| def test_update_propagation_consistency(self, indicator_updates: List[tuple]): | |
| """ | |
| Test that updates to shared components propagate consistently. | |
| Property: When shared components are updated, all dependent AI agents | |
| should receive the updates in the same way. | |
| """ | |
| controller = PromptController() | |
| # Apply updates to indicators | |
| added_indicators = [] | |
| for name, definition, weight in indicator_updates: | |
| indicator = Indicator( | |
| name=f"test_{name}", | |
| category=IndicatorCategory.EMOTIONAL, | |
| definition=definition, | |
| examples=[f"Example for {name}"], | |
| severity_weight=weight | |
| ) | |
| success = controller.indicator_catalog.add_indicator(indicator) | |
| if success: | |
| added_indicators.append(indicator.name) | |
| if not added_indicators: | |
| return # Skip if no indicators were added | |
| # Clear cache to force reload | |
| controller._prompt_cache.clear() | |
| # Get configurations for multiple agents | |
| agent_types = ['spiritual_monitor', 'triage_question', 'triage_evaluator'] | |
| configs = {agent: controller.get_prompt(agent) for agent in agent_types} | |
| # Property assertion: All agents should have the same updated indicators | |
| for indicator_name in added_indicators: | |
| for agent_type in agent_types: | |
| agent_indicators = {ind.name: ind for ind in configs[agent_type].shared_indicators} | |
| assert indicator_name in agent_indicators, \ | |
| f"Agent {agent_type} missing updated indicator: {indicator_name}" | |
| # Clean up | |
| for indicator_name in added_indicators: | |
| controller.indicator_catalog.remove_indicator(indicator_name) | |
| def test_rule_priority_consistency(self): | |
| """ | |
| Test that rule priorities are applied consistently across agents. | |
| Property: All agents should receive rules in the same priority order | |
| and apply them consistently. | |
| """ | |
| controller = PromptController() | |
| # Get rules from multiple agents | |
| agent_types = ['spiritual_monitor', 'triage_question', 'triage_evaluator'] | |
| rule_orders = {} | |
| for agent_type in agent_types: | |
| config = controller.get_prompt(agent_type) | |
| # Sort rules by priority (lower number = higher priority) | |
| sorted_rules = sorted(config.shared_rules, key=lambda r: r.priority) | |
| rule_orders[agent_type] = [rule.rule_id for rule in sorted_rules] | |
| # Property assertion: All agents should have the same rule order | |
| if len(rule_orders) > 1: | |
| agent_names = list(rule_orders.keys()) | |
| base_order = rule_orders[agent_names[0]] | |
| for other_agent in agent_names[1:]: | |
| other_order = rule_orders[other_agent] | |
| assert base_order == other_order, \ | |
| f"Rule priority order differs between {agent_names[0]} and {other_agent}" | |
| class TestConsentLanguageCompliance: | |
| """ | |
| **Feature: prompt-optimization, Property 4: Consent-Based Language Compliance** | |
| **Validates: Requirements 4.1, 4.2, 4.3, 4.4, 4.5** | |
| Property: For any RED classification or consent interaction, the system should generate | |
| messages using only approved non-assumptive language patterns and handle patient responses | |
| (acceptance, decline, ambiguity) appropriately. | |
| """ | |
| def test_consent_message_language_compliance(self, consent_contexts): | |
| """ | |
| Test that all generated consent messages comply with non-assumptive language requirements. | |
| Property: All consent messages should use approved language patterns and avoid | |
| assumptive, pressuring, or religiously presumptive language. | |
| """ | |
| from config.prompt_management.consent_manager import ConsentManager, ConsentMessageType | |
| consent_manager = ConsentManager() | |
| for distress_level, spiritual_mention, context_text in consent_contexts: | |
| context = { | |
| 'distress_level': distress_level, | |
| 'previous_spiritual_mention': spiritual_mention, | |
| 'context_text': context_text | |
| } | |
| # Test all message types | |
| message_types = [ | |
| ConsentMessageType.INITIAL_REQUEST, | |
| ConsentMessageType.CLARIFICATION, | |
| ConsentMessageType.CONFIRMATION, | |
| ConsentMessageType.DECLINE_ACKNOWLEDGMENT | |
| ] | |
| for message_type in message_types: | |
| # Generate message | |
| message = consent_manager.generate_consent_message(message_type, context) | |
| # Property assertion: Message should not be empty | |
| assert len(message.strip()) > 0, f"Generated message should not be empty for {message_type}" | |
| # Property assertion: Message should comply with language requirements | |
| is_compliant, violations = consent_manager.validate_language_compliance(message) | |
| assert is_compliant, f"Message violates language compliance: {violations}. Message: '{message}'" | |
| # Property assertion: Message should contain respectful language | |
| assert consent_manager._contains_respectful_language(message), \ | |
| f"Message should contain respectful language: '{message}'" | |
| def test_patient_response_handling(self, patient_responses): | |
| """ | |
| Test that patient responses are handled appropriately based on their classification. | |
| Property: Patient responses should be correctly classified and handled with | |
| appropriate next steps (accept -> referral, decline -> medical dialogue, ambiguous -> clarification). | |
| """ | |
| from config.prompt_management.consent_manager import ConsentManager, ConsentResponse | |
| consent_manager = ConsentManager() | |
| for response_text, session_id in patient_responses: | |
| # Handle the consent interaction | |
| result = consent_manager.handle_consent_interaction(response_text, session_id) | |
| # Property assertion: Result should have required fields | |
| required_fields = ['action', 'message', 'generate_provider_summary', 'log_referral', 'interaction'] | |
| for field in required_fields: | |
| assert field in result, f"Result missing required field: {field}" | |
| # Property assertion: Action should be valid | |
| valid_actions = ['proceed_with_referral', 'return_to_medical_dialogue', 'request_clarification'] | |
| assert result['action'] in valid_actions, f"Invalid action: {result['action']}" | |
| # Property assertion: Response message should be non-empty and compliant | |
| response_message = result['message'] | |
| assert len(response_message.strip()) > 0, "Response message should not be empty" | |
| is_compliant, violations = consent_manager.validate_language_compliance(response_message) | |
| assert is_compliant, f"Response message violates compliance: {violations}. Message: '{response_message}'" | |
| # Property assertion: Interaction should be properly recorded | |
| interaction = result['interaction'] | |
| assert 'interaction_id' in interaction, "Interaction should have ID" | |
| assert 'patient_response' in interaction, "Interaction should record patient response" | |
| assert interaction['patient_response'] == response_text, "Should record original response" | |
| # Property assertion: Actions should be consistent with response classification | |
| response_classification = ConsentResponse(interaction['response_classification']) | |
| if response_classification == ConsentResponse.ACCEPT: | |
| assert result['action'] == 'proceed_with_referral', "Accept should proceed with referral" | |
| assert result['generate_provider_summary'] == True, "Accept should generate summary" | |
| assert result['log_referral'] == True, "Accept should log referral" | |
| elif response_classification == ConsentResponse.DECLINE: | |
| assert result['action'] == 'return_to_medical_dialogue', "Decline should return to medical dialogue" | |
| assert result['generate_provider_summary'] == False, "Decline should not generate summary" | |
| assert result['log_referral'] == False, "Decline should not log referral" | |
| elif response_classification in [ConsentResponse.AMBIGUOUS, ConsentResponse.UNCLEAR]: | |
| assert result['action'] == 'request_clarification', "Ambiguous should request clarification" | |
| assert result['generate_provider_summary'] == False, "Ambiguous should not generate summary" | |
| assert result['log_referral'] == False, "Ambiguous should not log referral" | |
| assert result.get('requires_follow_up') == True, "Ambiguous should require follow-up" | |
| def test_clarification_question_generation(self, ambiguous_responses): | |
| """ | |
| Test that clarifying questions are generated appropriately for ambiguous responses. | |
| Property: Clarifying questions should be contextually appropriate, non-assumptive, | |
| and help patients make informed decisions about spiritual care. | |
| """ | |
| from config.prompt_management.consent_manager import ConsentManager | |
| consent_manager = ConsentManager() | |
| for response in ambiguous_responses: | |
| # Generate clarification question | |
| clarification = consent_manager.generate_clarification_question(response) | |
| # Property assertion: Clarification should not be empty | |
| assert len(clarification.strip()) > 0, "Clarification question should not be empty" | |
| # Property assertion: Clarification should be compliant | |
| is_compliant, violations = consent_manager.validate_language_compliance(clarification) | |
| assert is_compliant, f"Clarification violates compliance: {violations}. Question: '{clarification}'" | |
| # Property assertion: Clarification should be respectful | |
| assert consent_manager._contains_respectful_language(clarification), \ | |
| f"Clarification should be respectful: '{clarification}'" | |
| # Property assertion: Clarification should be contextually appropriate | |
| response_lower = response.lower() | |
| clarification_lower = clarification.lower() | |
| # Information-seeking responses should get informative clarifications | |
| if any(word in response_lower for word in ['what', 'how', 'tell me', 'involve']): | |
| assert any(word in clarification_lower for word in ['chaplain', 'counselor', 'support', 'team']), \ | |
| f"Information-seeking response should get informative clarification: '{clarification}'" | |
| # Uncertainty responses should get supportive clarifications | |
| elif any(word in response_lower for word in ['maybe', 'not sure', 'don\'t know']): | |
| assert any(word in clarification_lower for word in ['no pressure', 'okay', 'comfortable']), \ | |
| f"Uncertainty response should get supportive clarification: '{clarification}'" | |
| def test_language_validation_accuracy(self, test_messages): | |
| """ | |
| Test that language validation accurately identifies compliant and non-compliant messages. | |
| Property: The validation system should correctly identify assumptive language, | |
| pressure tactics, and religious assumptions in messages. | |
| """ | |
| from config.prompt_management.consent_manager import ConsentManager | |
| consent_manager = ConsentManager() | |
| # Test with known compliant messages | |
| compliant_messages = [ | |
| "Would you be interested in speaking with someone from our spiritual care team?", | |
| "Our spiritual care team is available if you'd like to connect with them.", | |
| "I understand and respect your decision.", | |
| "Could you help me understand what would be most helpful for you?" | |
| ] | |
| for message in compliant_messages: | |
| is_compliant, violations = consent_manager.validate_language_compliance(message) | |
| assert is_compliant, f"Known compliant message should pass validation: '{message}'. Violations: {violations}" | |
| # Test with known non-compliant messages | |
| non_compliant_messages = [ | |
| "You need to speak with someone from spiritual care.", | |
| "This will help you feel better.", | |
| "Obviously you're struggling with faith issues.", | |
| "You should pray about this.", | |
| "God will help you through this." | |
| ] | |
| for message in non_compliant_messages: | |
| is_compliant, violations = consent_manager.validate_language_compliance(message) | |
| assert not is_compliant, f"Known non-compliant message should fail validation: '{message}'" | |
| assert len(violations) > 0, f"Non-compliant message should have violations: '{message}'" | |
| # Test generated messages | |
| for message in test_messages: | |
| is_compliant, violations = consent_manager.validate_language_compliance(message) | |
| # Property assertion: Validation should return boolean and list | |
| assert isinstance(is_compliant, bool), "Validation should return boolean" | |
| assert isinstance(violations, list), "Violations should be a list" | |
| # Property assertion: If not compliant, should have violations | |
| if not is_compliant: | |
| assert len(violations) > 0, f"Non-compliant message should have violations: '{message}'" | |
| # Property assertion: Violations should be descriptive | |
| for violation in violations: | |
| assert isinstance(violation, str), "Violations should be strings" | |
| assert len(violation) > 0, "Violations should be non-empty" | |
| class TestStructuredFeedbackCapture: | |
| """ | |
| **Feature: prompt-optimization, Property 3: Structured Feedback Data Capture** | |
| **Validates: Requirements 3.1, 3.2, 3.3, 3.4, 3.5** | |
| Property: For any system issue (classification error, question problem, referral issue), | |
| the feedback system should capture all predefined structured data fields and store them | |
| in analyzable format according to documentation categories. | |
| """ | |
| def test_structured_feedback_data_capture(self, classification_errors, question_issues, referral_problems): | |
| """ | |
| Test that the feedback system captures all predefined structured data fields | |
| and stores them in analyzable format according to documentation categories. | |
| """ | |
| from config.prompt_management.feedback_system import FeedbackSystem | |
| from config.prompt_management.data_models import ErrorType, ErrorSubcategory, QuestionIssueType, ReferralProblemType, ScenarioType | |
| # Create feedback system with temporary storage | |
| import tempfile | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| feedback_system = FeedbackSystem(storage_path=temp_dir) | |
| recorded_error_ids = [] | |
| recorded_question_ids = [] | |
| recorded_referral_ids = [] | |
| # Record classification errors | |
| for error_type_str, subcategory_str, expected, actual, message, comments, confidence in classification_errors: | |
| error_id = feedback_system.record_classification_error( | |
| error_type=ErrorType(error_type_str), | |
| subcategory=ErrorSubcategory(subcategory_str), | |
| expected_category=expected, | |
| actual_category=actual, | |
| message_content=message, | |
| reviewer_comments=comments, | |
| confidence_level=confidence, | |
| session_id="test_session", | |
| additional_context={"test": True} | |
| ) | |
| recorded_error_ids.append(error_id) | |
| # Record question issues | |
| for issue_type_str, question, scenario_str, comments, severity in question_issues: | |
| issue_id = feedback_system.record_question_issue( | |
| issue_type=QuestionIssueType(issue_type_str), | |
| question_content=question, | |
| scenario_type=ScenarioType(scenario_str), | |
| reviewer_comments=comments, | |
| severity=severity, | |
| session_id="test_session" | |
| ) | |
| recorded_question_ids.append(issue_id) | |
| # Record referral problems | |
| for problem_type_str, referral, comments, severity in referral_problems: | |
| problem_id = feedback_system.record_referral_problem( | |
| problem_type=ReferralProblemType(problem_type_str), | |
| referral_content=referral, | |
| reviewer_comments=comments, | |
| severity=severity, | |
| session_id="test_session", | |
| missing_fields=["contact_info", "urgency_level"] | |
| ) | |
| recorded_referral_ids.append(problem_id) | |
| # Verify all data was captured with required fields | |
| summary = feedback_system.get_feedback_summary() | |
| # Property assertion: All classification errors should be recorded | |
| assert summary['total_errors'] == len(classification_errors), "All classification errors should be recorded" | |
| assert len(recorded_error_ids) == len(classification_errors), "All error IDs should be returned" | |
| # Property assertion: All question issues should be recorded | |
| assert summary['total_question_issues'] == len(question_issues), "All question issues should be recorded" | |
| assert len(recorded_question_ids) == len(question_issues), "All question issue IDs should be returned" | |
| # Property assertion: All referral problems should be recorded | |
| assert summary['total_referral_problems'] == len(referral_problems), "All referral problems should be recorded" | |
| assert len(recorded_referral_ids) == len(referral_problems), "All referral problem IDs should be returned" | |
| # Property assertion: Structured data fields are present and valid | |
| if classification_errors: | |
| errors = feedback_system._load_errors() | |
| for error in errors: | |
| # Required fields must be present | |
| required_fields = ['error_id', 'error_type', 'subcategory', 'expected_category', | |
| 'actual_category', 'message_content', 'reviewer_comments', | |
| 'confidence_level', 'timestamp'] | |
| for field in required_fields: | |
| assert field in error, f"Required field {field} missing from error record" | |
| # Verify data types and constraints | |
| assert isinstance(error['confidence_level'], (int, float)), "Confidence level must be numeric" | |
| assert 0.0 <= error['confidence_level'] <= 1.0, "Confidence level must be between 0.0 and 1.0" | |
| assert error['expected_category'] in ['GREEN', 'YELLOW', 'RED'], "Expected category must be valid" | |
| assert error['actual_category'] in ['GREEN', 'YELLOW', 'RED'], "Actual category must be valid" | |
| assert len(error['error_id']) > 0, "Error ID must be non-empty" | |
| assert len(error['message_content']) >= 20, "Message content must meet minimum length" | |
| # Property assertion: Error pattern analysis works with sufficient data | |
| if len(classification_errors) >= 2: | |
| patterns = feedback_system.analyze_error_patterns(min_frequency=1) | |
| assert isinstance(patterns, list), "Pattern analysis should return list" | |
| # Verify pattern structure | |
| for pattern in patterns: | |
| pattern_dict = pattern.to_dict() | |
| assert 'pattern_id' in pattern_dict, "Pattern must have ID" | |
| assert 'frequency' in pattern_dict, "Pattern must have frequency" | |
| assert 'suggested_improvements' in pattern_dict, "Pattern must have suggestions" | |
| assert pattern_dict['frequency'] >= 1, "Pattern frequency must be positive" | |
| assert isinstance(pattern_dict['suggested_improvements'], list), "Suggestions must be list" | |
| # Property assertion: Improvement suggestions generation works | |
| suggestions = feedback_system.generate_improvement_suggestions() | |
| assert isinstance(suggestions, list), "Suggestions should be a list" | |
| assert all(isinstance(s, str) for s in suggestions), "All suggestions should be strings" | |
| assert all(len(s) > 0 for s in suggestions), "All suggestions should be non-empty" | |
| # Reduced examples for faster testing | |
| def test_error_pattern_analysis_accuracy(self, error_patterns): | |
| """ | |
| Test that error pattern analysis correctly identifies frequent error types. | |
| Property: When multiple errors of the same type are recorded, the pattern | |
| analysis should identify them as significant patterns with appropriate | |
| improvement suggestions. | |
| """ | |
| from config.prompt_management.feedback_system import FeedbackSystem | |
| from config.prompt_management.data_models import ErrorType, ErrorSubcategory | |
| import tempfile | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| feedback_system = FeedbackSystem(storage_path=temp_dir) | |
| # Record multiple errors of each pattern type | |
| total_recorded = {} | |
| for error_type_str, frequency in error_patterns: | |
| total_recorded[error_type_str] = total_recorded.get(error_type_str, 0) + frequency | |
| for i in range(frequency): | |
| feedback_system.record_classification_error( | |
| error_type=ErrorType(error_type_str), | |
| subcategory=ErrorSubcategory.GREEN_TO_YELLOW if error_type_str == 'wrong_classification' else ErrorSubcategory.UNDERESTIMATED_DISTRESS, | |
| expected_category="YELLOW", | |
| actual_category="GREEN", | |
| message_content=f"Unique test message {error_type_str}_{i}_{hash(str(error_patterns))}", | |
| reviewer_comments=f"Test comment {i}", | |
| confidence_level=0.8, | |
| session_id=f"test_session_{error_type_str}_{i}" | |
| ) | |
| # Analyze patterns | |
| patterns = feedback_system.analyze_error_patterns(min_frequency=3) | |
| # Property assertion: Patterns should be identified for frequent error types | |
| pattern_types = [p.pattern_type for p in patterns] | |
| for error_type_str, total_freq in total_recorded.items(): | |
| if total_freq >= 3: | |
| expected_pattern = f"error_type_{error_type_str}" | |
| assert any(expected_pattern in pt for pt in pattern_types), \ | |
| f"Pattern should be identified for frequent error type: {error_type_str}" | |
| # Property assertion: All patterns should have improvement suggestions | |
| for pattern in patterns: | |
| assert len(pattern.suggested_improvements) > 0, f"Pattern {pattern.pattern_type} should have improvement suggestions" | |
| for suggestion in pattern.suggested_improvements: | |
| assert len(suggestion) > 5, f"Suggestions should be meaningful: '{suggestion}'" | |
| def test_feedback_summary_completeness(self, feedback_categories): | |
| """ | |
| Test that feedback summaries include all required information categories. | |
| Property: Feedback summaries should provide comprehensive statistics | |
| and insights across all types of recorded feedback. | |
| """ | |
| from config.prompt_management.feedback_system import FeedbackSystem | |
| from config.prompt_management.data_models import ErrorType, ErrorSubcategory, QuestionIssueType, ReferralProblemType, ScenarioType | |
| import tempfile | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| feedback_system = FeedbackSystem(storage_path=temp_dir) | |
| # Record different types of feedback based on categories | |
| for category in feedback_categories: | |
| if category == 'classification_error': | |
| feedback_system.record_classification_error( | |
| error_type=ErrorType.WRONG_CLASSIFICATION, | |
| subcategory=ErrorSubcategory.GREEN_TO_YELLOW, | |
| expected_category="YELLOW", | |
| actual_category="GREEN", | |
| message_content="Test classification error message", | |
| reviewer_comments="Test classification error comment", | |
| confidence_level=0.9 | |
| ) | |
| elif category == 'question_issue': | |
| feedback_system.record_question_issue( | |
| issue_type=QuestionIssueType.INAPPROPRIATE_QUESTION, | |
| question_content="Test inappropriate question", | |
| scenario_type=ScenarioType.LOSS_OF_INTEREST, | |
| reviewer_comments="Test question issue comment", | |
| severity="medium" | |
| ) | |
| elif category == 'referral_problem': | |
| feedback_system.record_referral_problem( | |
| problem_type=ReferralProblemType.INCOMPLETE_SUMMARY, | |
| referral_content="Test incomplete referral summary", | |
| reviewer_comments="Test referral problem comment", | |
| severity="high" | |
| ) | |
| # Get feedback summary | |
| summary = feedback_system.get_feedback_summary() | |
| # Property assertion: Summary should contain all required fields | |
| required_fields = [ | |
| 'total_errors', 'total_question_issues', 'total_referral_problems', | |
| 'error_types', 'error_subcategories', 'question_issue_types', | |
| 'referral_problem_types', 'average_confidence', 'recent_errors', | |
| 'improvement_suggestions' | |
| ] | |
| for field in required_fields: | |
| assert field in summary, f"Summary missing required field: {field}" | |
| # Property assertion: Counts should match recorded feedback | |
| classification_count = feedback_categories.count('classification_error') | |
| question_count = feedback_categories.count('question_issue') | |
| referral_count = feedback_categories.count('referral_problem') | |
| assert summary['total_errors'] == classification_count, "Error count should match recorded errors" | |
| assert summary['total_question_issues'] == question_count, "Question issue count should match" | |
| assert summary['total_referral_problems'] == referral_count, "Referral problem count should match" | |
| # Property assertion: Statistics should be valid | |
| if classification_count > 0: | |
| assert 0.0 <= summary['average_confidence'] <= 1.0, "Average confidence should be valid" | |
| assert isinstance(summary['error_types'], dict), "Error types should be dictionary" | |
| assert isinstance(summary['error_subcategories'], dict), "Error subcategories should be dictionary" | |
| # Property assertion: Improvement suggestions should be provided | |
| assert isinstance(summary['improvement_suggestions'], list), "Improvement suggestions should be list" | |
| if __name__ == "__main__": | |
| # Run tests directly | |
| import subprocess | |
| import sys | |
| # Install hypothesis if not available | |
| try: | |
| import hypothesis | |
| except ImportError: | |
| print("Installing hypothesis for property-based testing...") | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "hypothesis"]) | |
| import hypothesis | |
| # Run the tests | |
| pytest.main([__file__, "-v"]) | |
| class TestContextAwareClassification: | |
| """ | |
| **Feature: prompt-optimization, Property 6: Context-Aware Classification Logic** | |
| **Validates: Requirements 6.1, 6.2, 6.3, 6.4, 6.5** | |
| Property: For any patient message with conversation history containing distress indicators, | |
| the classification should appropriately weight historical context against current statements, | |
| detect defensive patterns, and generate contextually relevant follow-up questions. | |
| """ | |
| def test_context_aware_classification_with_history(self, conversation_scenarios): | |
| """ | |
| Test that classification considers conversation history appropriately. | |
| Property: When patient previously expressed distress and now says "I'm fine", | |
| the system should classify as YELLOW for verification. | |
| """ | |
| from config.prompt_management.context_aware_classifier import ContextAwareClassifier | |
| classifier = ContextAwareClassifier() | |
| for prev_messages, prev_classifications, distress_indicators, current_message in conversation_scenarios: | |
| # Ensure lists are same length | |
| min_len = min(len(prev_messages), len(prev_classifications)) | |
| prev_messages = prev_messages[:min_len] | |
| prev_classifications = prev_classifications[:min_len] | |
| # Build conversation history | |
| history = ConversationHistory( | |
| messages=[ | |
| Message(content=msg, classification=cls, timestamp=datetime.now()) | |
| for msg, cls in zip(prev_messages, prev_classifications) | |
| ], | |
| distress_indicators_found=distress_indicators, | |
| context_flags=[] | |
| ) | |
| # Classify with context | |
| result = classifier.classify_with_context(current_message, history) | |
| # Property assertion: Result should have required fields | |
| assert isinstance(result, Classification), "Result should be Classification object" | |
| assert result.category in ['GREEN', 'YELLOW', 'RED'], "Category should be valid" | |
| assert 0.0 <= result.confidence <= 1.0, "Confidence should be between 0 and 1" | |
| # Property assertion: Historical distress should influence classification | |
| if distress_indicators and any(cls in ['YELLOW', 'RED'] for cls in prev_classifications): | |
| # If there's historical distress and current message is dismissive | |
| dismissive_phrases = ['fine', 'okay', 'good', 'better', 'no problem'] | |
| if any(phrase in current_message.lower() for phrase in dismissive_phrases): | |
| # Should be at least YELLOW for verification | |
| assert result.category in ['YELLOW', 'RED'], \ | |
| f"Historical distress with dismissive response should be YELLOW/RED, got {result.category}" | |
| assert 'historical_context' in result.reasoning.lower() or 'previous' in result.reasoning.lower(), \ | |
| "Reasoning should mention historical context" | |
| def test_defensive_response_detection(self, defensive_scenarios): | |
| """ | |
| Test that defensive responses are detected when they contradict history. | |
| Property: When conversation context contains distress indicators and patient | |
| gives defensive responses, the system should detect the pattern. | |
| """ | |
| from config.prompt_management.context_aware_classifier import ContextAwareClassifier | |
| classifier = ContextAwareClassifier() | |
| for defensive_message, prev_classifications, distress_count in defensive_scenarios: | |
| # Build history with distress | |
| history = ConversationHistory( | |
| messages=[ | |
| Message( | |
| content=f"I'm feeling stressed about things {i}", | |
| classification=prev_classifications[i % len(prev_classifications)], | |
| timestamp=datetime.now() | |
| ) | |
| for i in range(distress_count) | |
| ], | |
| distress_indicators_found=['stress', 'anxiety', 'worried'] * distress_count, | |
| context_flags=['distress_expressed'] | |
| ) | |
| # Detect defensive pattern | |
| is_defensive = classifier.detect_defensive_responses(defensive_message, history) | |
| # Property assertion: Should detect defensive pattern with sufficient history | |
| if distress_count >= 2: | |
| assert isinstance(is_defensive, bool), "Detection should return boolean" | |
| # With clear distress history and dismissive current message, should detect defensiveness | |
| assert is_defensive == True, \ | |
| f"Should detect defensive pattern with {distress_count} distress mentions and message: '{defensive_message}'" | |
| def test_contextual_indicator_weighting(self, contextual_indicators): | |
| """ | |
| Test that indicators are weighted based on conversation context. | |
| Property: Indicators that appear repeatedly in conversation history | |
| should receive higher weight in classification decisions. | |
| """ | |
| from config.prompt_management.context_aware_classifier import ContextAwareClassifier | |
| classifier = ContextAwareClassifier() | |
| for indicator_name, base_weight, historical_mentions, recent_mention in contextual_indicators: | |
| context = { | |
| 'historical_mentions': historical_mentions, | |
| 'recent_mention': recent_mention, | |
| 'conversation_length': 5 | |
| } | |
| # Evaluate contextual weight | |
| contextual_weight = classifier.evaluate_contextual_indicators( | |
| [indicator_name], | |
| context | |
| ) | |
| # Property assertion: Weight should be numeric and valid | |
| assert isinstance(contextual_weight, (int, float)), "Weight should be numeric" | |
| assert contextual_weight >= 0.0, "Weight should be non-negative" | |
| # Property assertion: Historical mentions should increase weight | |
| if historical_mentions >= 2: | |
| # Weight should be higher than minimum for repeated indicators | |
| assert contextual_weight >= 0.5, \ | |
| f"Repeated indicator should have weight >= 0.5, got {contextual_weight}" | |
| # Property assertion: Recent mentions should have stronger influence | |
| if recent_mention and historical_mentions > 0: | |
| # Recent + historical should have reasonable weight | |
| assert contextual_weight >= 0.6, \ | |
| f"Recent mention with history should have weight >= 0.6, got {contextual_weight}" | |
| def test_contextual_follow_up_generation(self, follow_up_scenarios): | |
| """ | |
| Test that follow-up questions reference conversation context. | |
| Property: When follow-up questions are generated, they should reference | |
| previous conversation elements appropriately. | |
| """ | |
| from config.prompt_management.context_aware_classifier import ContextAwareClassifier | |
| classifier = ContextAwareClassifier() | |
| for current_message, previous_topics, classification in follow_up_scenarios: | |
| # Build history | |
| history = ConversationHistory( | |
| messages=[ | |
| Message(content=topic, classification='YELLOW', timestamp=datetime.now()) | |
| for topic in previous_topics | |
| ], | |
| distress_indicators_found=['stress', 'worry'], | |
| context_flags=['follow_up_needed'] | |
| ) | |
| # Generate contextual follow-up | |
| follow_up = classifier.generate_contextual_follow_up( | |
| current_message, | |
| history, | |
| classification | |
| ) | |
| # Property assertion: Follow-up should not be empty | |
| assert len(follow_up.strip()) > 0, "Follow-up question should not be empty" | |
| # Property assertion: Follow-up should be a question | |
| assert '?' in follow_up, "Follow-up should be a question" | |
| # Property assertion: Follow-up should reference context when appropriate | |
| if len(previous_topics) >= 2: | |
| # With sufficient history, should reference previous conversation | |
| contextual_words = ['earlier', 'mentioned', 'said', 'discussed', 'talked about', 'before'] | |
| has_context_reference = any(word in follow_up.lower() for word in contextual_words) | |
| # Note: Not all follow-ups need explicit references, but many should | |
| # This is a soft assertion - we just check the capability exists | |
| assert isinstance(has_context_reference, bool), "Should check for context references" | |
| def test_medical_context_integration(self): | |
| """ | |
| Test that medical context is considered in classification. | |
| Property: When mental health conditions are mentioned in medical context, | |
| the system should consider this information in classification. | |
| """ | |
| from config.prompt_management.context_aware_classifier import ContextAwareClassifier | |
| classifier = ContextAwareClassifier() | |
| # Test scenarios with medical context | |
| test_cases = [ | |
| { | |
| 'message': "I'm managing my anxiety with medication", | |
| 'medical_context': {'conditions': ['anxiety disorder'], 'medications': ['SSRI']}, | |
| 'expected_consideration': True | |
| }, | |
| { | |
| 'message': "I feel stressed about work", | |
| 'medical_context': {'conditions': ['depression'], 'medications': []}, | |
| 'expected_consideration': True | |
| }, | |
| { | |
| 'message': "Everything is fine", | |
| 'medical_context': {'conditions': [], 'medications': []}, | |
| 'expected_consideration': False | |
| } | |
| ] | |
| for case in test_cases: | |
| history = ConversationHistory( | |
| messages=[], | |
| distress_indicators_found=[], | |
| context_flags=[], | |
| medical_context=case['medical_context'] | |
| ) | |
| result = classifier.classify_with_context(case['message'], history) | |
| # Property assertion: Result should be valid | |
| assert isinstance(result, Classification), "Should return Classification" | |
| assert result.category in ['GREEN', 'YELLOW', 'RED'], "Category should be valid" | |
| # Property assertion: Medical context should influence reasoning | |
| if case['expected_consideration'] and case['medical_context']['conditions']: | |
| # Reasoning should mention medical context when relevant | |
| reasoning_lower = result.reasoning.lower() | |
| medical_terms = ['medical', 'condition', 'medication', 'treatment', 'diagnosis'] | |
| # At least some awareness of medical context in reasoning | |
| # This is a capability check, not a strict requirement for every case | |
| assert isinstance(result.reasoning, str), "Reasoning should be string" | |
| assert len(result.reasoning) > 0, "Reasoning should not be empty" | |
| class TestProviderSummaryCompleteness: | |
| """ | |
| **Feature: prompt-optimization, Property 7: Complete Provider Summary Generation** | |
| **Validates: Requirements 7.1, 7.2, 7.3, 7.4, 7.5** | |
| Property: For any RED classification generating a referral, the provider summary should | |
| contain all required information fields (contact info, distress indicators, reasoning, | |
| triage context, conversation background) as specified in requirements. | |
| """ | |
| def test_complete_provider_summary_generation(self, red_classifications): | |
| """ | |
| Test that provider summaries contain all required information fields. | |
| Property: For any RED classification, the generated provider summary should | |
| include patient contact information, distress indicators, reasoning, | |
| triage context, and conversation background. | |
| """ | |
| from core.provider_summary_generator import ProviderSummaryGenerator | |
| generator = ProviderSummaryGenerator() | |
| for indicators, reasoning, confidence, patient_name, phone, triage_q, triage_r, context in red_classifications: | |
| # Ensure triage questions and responses are same length | |
| min_len = min(len(triage_q), len(triage_r)) | |
| triage_questions = triage_q[:min_len] if min_len > 0 else None | |
| triage_responses = triage_r[:min_len] if min_len > 0 else None | |
| # Generate provider summary | |
| summary = generator.generate_summary( | |
| indicators=indicators, | |
| reasoning=reasoning, | |
| confidence=confidence, | |
| patient_name=patient_name, | |
| patient_phone=phone, | |
| triage_questions=triage_questions, | |
| triage_responses=triage_responses, | |
| conversation_context=context | |
| ) | |
| # Property assertion: Required fields must be present (Requirement 7.1) | |
| assert summary.patient_name == patient_name, "Should include patient contact information" | |
| assert summary.patient_phone == phone, "Should include patient phone number" | |
| # Property assertion: Distress indicators must be included (Requirement 7.2) | |
| assert summary.indicators == indicators, "Should include specific distress indicators" | |
| assert len(summary.indicators) > 0, "Should have at least one distress indicator" | |
| # Property assertion: Classification reasoning must be provided (Requirement 7.3) | |
| assert summary.reasoning == reasoning, "Should provide clear explanation of RED determination" | |
| assert len(summary.reasoning) >= 20, "Reasoning should be sufficiently detailed" | |
| # Property assertion: Triage context must be included when available (Requirement 7.4) | |
| if triage_questions and triage_responses and min_len > 0: | |
| assert len(summary.triage_context) == min_len, "Should include all triage question-answer pairs" | |
| for i, exchange in enumerate(summary.triage_context): | |
| assert 'question' in exchange, "Triage context should include questions" | |
| assert 'response' in exchange, "Triage context should include responses" | |
| assert exchange['question'] == triage_questions[i], "Should preserve original questions" | |
| assert exchange['response'] == triage_responses[i], "Should preserve original responses" | |
| # Property assertion: Conversation background must be included (Requirement 7.5) | |
| assert summary.conversation_context == context, "Should provide relevant background context" | |
| # Property assertion: Summary should be complete and valid | |
| assert summary.classification == "RED", "Should be classified as RED" | |
| assert summary.confidence == confidence, "Should preserve confidence level" | |
| assert summary.generated_at is not None, "Should have generation timestamp" | |
| # Property assertion: Summary should be serializable | |
| summary_dict = summary.to_dict() | |
| required_fields = [ | |
| 'patient_name', 'patient_phone', 'situation_description', 'indicators', | |
| 'classification', 'confidence', 'reasoning', 'triage_context', | |
| 'conversation_context', 'generated_at' | |
| ] | |
| for field in required_fields: | |
| assert field in summary_dict, f"Summary dict should contain {field}" | |
| # Property assertion: Situation description should be meaningful | |
| assert len(summary.situation_description) > 0, "Should generate meaningful situation description" | |
| # If indicators provided, they should be mentioned in situation | |
| if indicators: | |
| situation_lower = summary.situation_description.lower() | |
| # At least some indicators should be reflected in the description | |
| assert any(indicator.lower() in situation_lower for indicator in indicators[:2]), \ | |
| "Situation description should reflect key indicators" | |
| def test_provider_summary_formatting_completeness(self, summary_data): | |
| """ | |
| Test that provider summary formatting includes all required information. | |
| Property: Formatted provider summaries should contain all required sections | |
| and be suitable for provider review and action. | |
| """ | |
| from core.provider_summary_generator import ProviderSummaryGenerator, ProviderSummary | |
| indicators, reasoning, confidence, patient_name, phone, triage_exchanges, context = summary_data | |
| # Create summary | |
| generator = ProviderSummaryGenerator() | |
| # Convert triage exchanges to separate lists | |
| triage_questions = [ex[0] for ex in triage_exchanges] if triage_exchanges else None | |
| triage_responses = [ex[1] for ex in triage_exchanges] if triage_exchanges else None | |
| summary = generator.generate_summary( | |
| indicators=indicators, | |
| reasoning=reasoning, | |
| confidence=confidence, | |
| patient_name=patient_name, | |
| patient_phone=phone, | |
| triage_questions=triage_questions, | |
| triage_responses=triage_responses, | |
| conversation_context=context | |
| ) | |
| # Test display formatting | |
| display_format = generator.format_for_display(summary) | |
| # Property assertion: Display format should contain all required sections | |
| required_sections = [ | |
| "PROVIDER SUMMARY", | |
| "PATIENT INFORMATION", | |
| "CLASSIFICATION & URGENCY", | |
| "SITUATION OVERVIEW", | |
| "DISTRESS INDICATORS", | |
| "CLINICAL REASONING", | |
| "RECOMMENDED ACTIONS" | |
| ] | |
| for section in required_sections: | |
| assert section in display_format, f"Display format should contain {section} section" | |
| # Property assertion: Patient information should be visible | |
| assert patient_name in display_format, "Display should show patient name" | |
| assert phone in display_format, "Display should show patient phone" | |
| # Property assertion: All indicators should be listed | |
| for indicator in indicators: | |
| assert indicator in display_format, f"Display should show indicator: {indicator}" | |
| # Property assertion: Reasoning should be included (may be cleaned) | |
| import re | |
| clean_reasoning = re.sub(r'\s+', ' ', reasoning).strip() | |
| assert clean_reasoning in display_format or reasoning in display_format, "Display should include reasoning" | |
| # Property assertion: Triage context should be shown when available | |
| if triage_exchanges: | |
| assert "TRIAGE EXCHANGES" in display_format, "Should show triage section when available" | |
| for question, response in triage_exchanges: | |
| assert question in display_format, f"Should show triage question: {question}" | |
| assert response in display_format, f"Should show triage response: {response}" | |
| # Property assertion: Conversation context should be included | |
| # (May be truncated if too long) | |
| context_preview = context[:100] # First 100 chars should be visible | |
| assert context_preview in display_format, "Should show conversation context" | |
| # Test export formatting | |
| export_format = generator.format_for_export(summary) | |
| # Property assertion: Export format should be compact but complete | |
| # Names and phones may be cleaned in export format | |
| clean_name = patient_name.replace('\n', ' ').replace('\r', ' ').strip() | |
| clean_phone = phone.replace('\n', ' ').replace('\r', ' ').strip() | |
| assert clean_name in export_format or patient_name in export_format, "Export should include patient name" | |
| assert clean_phone in export_format or phone in export_format, "Export should include phone" | |
| assert "RED" in export_format, "Export should show classification" | |
| # Reasoning may be cleaned in export format | |
| clean_reasoning = re.sub(r'\s+', ' ', reasoning).strip() | |
| assert clean_reasoning in export_format or reasoning in export_format, "Export should include reasoning" | |
| # Property assertion: Export should be single line (no newlines) | |
| assert '\n' not in export_format, "Export format should be single line" | |
| # Property assertion: Export should use separators for parsing | |
| assert '|' in export_format, "Export should use pipe separators" | |
| def test_provider_summary_validation_and_completeness(self, validation_scenarios): | |
| """ | |
| Test that provider summary validation ensures completeness. | |
| Property: Provider summaries should handle missing information gracefully | |
| while ensuring all critical information is captured or flagged as missing. | |
| """ | |
| from core.provider_summary_generator import ProviderSummaryGenerator | |
| generator = ProviderSummaryGenerator() | |
| for indicators, reasoning, confidence, patient_name, phone in validation_scenarios: | |
| # Generate summary with potentially missing information | |
| summary = generator.generate_summary( | |
| indicators=indicators, | |
| reasoning=reasoning, | |
| confidence=confidence, | |
| patient_name=patient_name, | |
| patient_phone=phone | |
| ) | |
| # Property assertion: Summary should always be generated | |
| assert summary is not None, "Should always generate a summary" | |
| assert summary.classification == "RED", "Should maintain RED classification" | |
| # Property assertion: Missing contact info should use placeholders | |
| if patient_name is None: | |
| assert summary.patient_name == "[Patient Name]", "Should use placeholder for missing name" | |
| else: | |
| assert summary.patient_name == patient_name, "Should use provided name" | |
| if phone is None: | |
| assert summary.patient_phone == "[Phone Number]", "Should use placeholder for missing phone" | |
| else: | |
| assert summary.patient_phone == phone, "Should use provided phone" | |
| # Property assertion: Empty indicators should be handled gracefully | |
| if not indicators: | |
| assert summary.indicators == [], "Should handle empty indicators list" | |
| # Situation description should still be meaningful | |
| assert len(summary.situation_description) > 0, "Should generate description even without indicators" | |
| else: | |
| assert summary.indicators == indicators, "Should preserve provided indicators" | |
| # Property assertion: Empty reasoning should be handled | |
| if not reasoning: | |
| # Should still have some default reasoning or description | |
| assert len(summary.situation_description) > 0, "Should have situation description when reasoning is empty" | |
| else: | |
| assert summary.reasoning == reasoning, "Should preserve provided reasoning" | |
| # Property assertion: Confidence should be preserved | |
| assert summary.confidence == confidence, "Should preserve confidence level" | |
| # Property assertion: Timestamp should always be present | |
| assert summary.generated_at is not None, "Should always have generation timestamp" | |
| assert len(summary.generated_at) > 0, "Timestamp should not be empty" | |
| def test_provider_summary_integration_with_context_aware_classification(self): | |
| """ | |
| Test integration between provider summary generation and context-aware classification. | |
| Property: Provider summaries should integrate with context-aware classification | |
| results to provide comprehensive patient context. | |
| """ | |
| from core.provider_summary_generator import ProviderSummaryGenerator | |
| from config.prompt_management.context_aware_classifier import ContextAwareClassifier | |
| from config.prompt_management.data_models import ConversationHistory, Message | |
| from datetime import datetime, timedelta | |
| # Create context-aware classification scenario | |
| classifier = ContextAwareClassifier() | |
| generator = ProviderSummaryGenerator() | |
| # Build conversation history with escalating distress | |
| history = ConversationHistory( | |
| messages=[ | |
| Message("I'm feeling anxious about my treatment", "YELLOW", datetime.now() - timedelta(hours=2)), | |
| Message("I can't sleep and feel hopeless", "RED", datetime.now() - timedelta(hours=1)), | |
| Message("I don't think I can go on like this", "RED", datetime.now() - timedelta(minutes=30)) | |
| ], | |
| distress_indicators_found=['anxiety', 'hopeless', 'insomnia'], | |
| context_flags=['escalating_distress'], | |
| medical_context={'conditions': ['cancer'], 'medications': ['chemotherapy']} | |
| ) | |
| # Classify current message with context | |
| current_message = "I just want the pain to stop" | |
| classification_result = classifier.classify_with_context(current_message, history) | |
| # Generate provider summary using classification results | |
| summary = generator.generate_summary( | |
| indicators=classification_result.indicators_found, | |
| reasoning=classification_result.reasoning, | |
| confidence=classification_result.confidence, | |
| patient_name="Test Patient", | |
| patient_phone="555-0123", | |
| conversation_context=f"Recent messages show escalating distress. Current: {current_message}" | |
| ) | |
| # Property assertion: Summary should reflect context-aware classification | |
| assert summary.classification == "RED", "Should maintain RED classification" | |
| assert classification_result.confidence == summary.confidence, "Should preserve classification confidence" | |
| assert classification_result.reasoning == summary.reasoning, "Should use classification reasoning" | |
| # Property assertion: Context factors should be reflected | |
| if classification_result.context_factors: | |
| # Context factors should influence the summary somehow | |
| context_mentioned = any( | |
| factor.lower() in summary.situation_description.lower() | |
| for factor in classification_result.context_factors | |
| ) | |
| # This is a soft assertion - context may be reflected in various ways | |
| assert isinstance(context_mentioned, bool), "Should check for context factor reflection" | |
| # Property assertion: Summary should be comprehensive | |
| display_format = generator.format_for_display(summary) | |
| # Should contain key information for provider action | |
| assert "Test Patient" in display_format, "Should show patient name" | |
| assert "555-0123" in display_format, "Should show contact info" | |
| assert "RED FLAG" in display_format, "Should clearly indicate urgency" | |
| assert "RECOMMENDED ACTION" in display_format, "Should provide action guidance" | |
| # Property assertion: Export format should be suitable for handoff | |
| export_format = generator.format_for_export(summary) | |
| assert len(export_format) > 50, "Export should contain substantial information" | |
| assert "Test Patient" in export_format, "Export should include patient identification" | |
| assert "RED" in export_format, "Export should indicate classification" | |
| class TestPerformanceMonitoring: | |
| """ | |
| **Feature: prompt-optimization, Property 8: Comprehensive Performance Monitoring** | |
| Test that the performance monitoring system accurately captures all performance metrics | |
| (response times, confidence levels, classification outcomes) and provides data-driven | |
| optimization recommendations when patterns are identified. | |
| **Validates: Requirements 8.1, 8.2, 8.3, 8.4, 8.5** | |
| """ | |
| def test_comprehensive_performance_monitoring(self, performance_data): | |
| """ | |
| Test that performance monitoring captures all required metrics. | |
| Property: For any sequence of prompt executions, the monitoring system should | |
| accurately capture response times, confidence levels, and outcomes, and provide | |
| meaningful performance analysis. | |
| **Validates: Requirements 8.1, 8.2, 8.3, 8.4, 8.5** | |
| """ | |
| from config.prompt_management.prompt_controller import PromptController | |
| from config.prompt_management.performance_monitor import PromptMonitor | |
| # Create fresh instances for each test | |
| controller = PromptController() | |
| monitor = PromptMonitor() | |
| # Property: Performance metrics should be captured for all executions | |
| for agent_type, response_time, confidence, error, classification_result in performance_data: | |
| # Log performance metric (Requirement 8.1) | |
| controller.log_performance_metric( | |
| agent_type=agent_type, | |
| response_time=response_time, | |
| confidence=confidence, | |
| error=error, | |
| classification_result=classification_result | |
| ) | |
| # Monitor should also track the execution (Requirement 8.2) | |
| monitor.track_execution( | |
| agent_type=agent_type, | |
| response_time=response_time, | |
| confidence=confidence, | |
| success=not error, | |
| metadata={'classification': classification_result} | |
| ) | |
| # Property: All logged metrics should be retrievable | |
| unique_agents = list(set(item[0] for item in performance_data)) | |
| for agent_type in unique_agents: | |
| # Get metrics from controller | |
| controller_metrics = controller.get_performance_metrics(agent_type) | |
| # Property assertion: Metrics should contain all required fields (Requirement 8.1) | |
| assert 'total_executions' in controller_metrics, "Should track total executions" | |
| assert 'average_response_time' in controller_metrics, "Should track average response time" | |
| assert 'average_confidence' in controller_metrics, "Should track average confidence" | |
| assert 'error_rate' in controller_metrics, "Should track error rate" | |
| # Property assertion: Metrics should be accurate | |
| agent_data = [item for item in performance_data if item[0] == agent_type] | |
| expected_executions = len(agent_data) | |
| assert controller_metrics['total_executions'] == expected_executions, \ | |
| "Should count all executions correctly" | |
| if expected_executions > 0: | |
| expected_avg_time = sum(item[1] for item in agent_data) / expected_executions | |
| expected_avg_confidence = sum(item[2] for item in agent_data) / expected_executions | |
| expected_error_rate = sum(1 for item in agent_data if item[3]) / expected_executions | |
| # Allow small floating point differences | |
| assert abs(controller_metrics['average_response_time'] - expected_avg_time) < 0.001, \ | |
| "Should calculate average response time correctly" | |
| assert abs(controller_metrics['average_confidence'] - expected_avg_confidence) < 0.001, \ | |
| "Should calculate average confidence correctly" | |
| assert abs(controller_metrics['error_rate'] - expected_error_rate) < 0.001, \ | |
| "Should calculate error rate correctly" | |
| # Get detailed metrics from monitor (Requirement 8.2) | |
| monitor_metrics = monitor.get_detailed_metrics(agent_type) | |
| # Property assertion: Monitor should provide detailed analysis | |
| assert 'performance_trend' in monitor_metrics, "Should analyze performance trends" | |
| assert 'confidence_distribution' in monitor_metrics, "Should analyze confidence distribution" | |
| assert 'error_patterns' in monitor_metrics, "Should identify error patterns" | |
| def test_ab_testing_framework(self, ab_test_data): | |
| """ | |
| Test A/B testing framework for prompt performance comparison. | |
| Property: For any two prompt versions, the A/B testing framework should | |
| enable statistical comparison and automated rollback for underperforming prompts. | |
| **Validates: Requirements 8.3** | |
| """ | |
| from config.prompt_management.performance_monitor import PromptMonitor | |
| monitor = PromptMonitor() | |
| # Property: A/B testing should handle multiple prompt versions | |
| for agent_type, response_time, confidence, prompt_version in ab_test_data: | |
| monitor.log_ab_test_result( | |
| agent_type=agent_type, | |
| prompt_version=prompt_version, | |
| response_time=response_time, | |
| confidence=confidence | |
| ) | |
| # Property: Should be able to compare versions | |
| unique_agents = list(set(item[0] for item in ab_test_data)) | |
| for agent_type in unique_agents: | |
| agent_data = [item for item in ab_test_data if item[0] == agent_type] | |
| unique_versions = list(set(item[3] for item in agent_data)) | |
| if len(unique_versions) >= 2: | |
| # Test version comparison | |
| comparison_result = monitor.compare_prompt_versions( | |
| agent_type=agent_type, | |
| version_a=unique_versions[0], | |
| version_b=unique_versions[1] | |
| ) | |
| # Property assertion: Comparison should provide statistical analysis | |
| assert 'statistical_significance' in comparison_result, \ | |
| "Should test statistical significance" | |
| assert 'performance_difference' in comparison_result, \ | |
| "Should quantify performance difference" | |
| assert 'recommendation' in comparison_result, \ | |
| "Should provide rollback recommendation" | |
| # Property assertion: Recommendation should be actionable | |
| recommendation = comparison_result['recommendation'] | |
| assert recommendation in ['keep_version_a', 'switch_to_version_b', 'insufficient_data'], \ | |
| "Should provide clear recommendation" | |
| def test_optimization_recommendation_engine(self, optimization_data): | |
| """ | |
| Test optimization recommendation engine for data-driven improvements. | |
| Property: For any pattern of errors and performance issues, the optimization | |
| engine should identify patterns and provide specific improvement recommendations. | |
| **Validates: Requirements 8.4, 8.5** | |
| """ | |
| from config.prompt_management.performance_monitor import PromptMonitor | |
| monitor = PromptMonitor() | |
| # Property: Should analyze error patterns and generate recommendations | |
| for agent_type, confidence, classification_error, error_pattern in optimization_data: | |
| monitor.log_classification_outcome( | |
| agent_type=agent_type, | |
| confidence=confidence, | |
| classification_error=classification_error, | |
| error_details={'pattern': error_pattern} | |
| ) | |
| # Property: Should generate optimization recommendations | |
| unique_agents = list(set(item[0] for item in optimization_data)) | |
| for agent_type in unique_agents: | |
| agent_data = [item for item in optimization_data if item[0] == agent_type] | |
| # Get optimization recommendations | |
| recommendations = monitor.get_optimization_recommendations(agent_type) | |
| # Property assertion: Should provide actionable recommendations | |
| assert isinstance(recommendations, list), "Should return list of recommendations" | |
| if len(agent_data) >= 3: # Need sufficient data for analysis | |
| # Should identify patterns if errors exist | |
| has_errors = any(item[2] for item in agent_data) | |
| if has_errors: | |
| # Should provide specific recommendations for improvement | |
| assert len(recommendations) > 0, "Should provide recommendations when errors detected" | |
| for recommendation in recommendations: | |
| assert hasattr(recommendation, 'type'), "Should specify recommendation type" | |
| assert hasattr(recommendation, 'description'), "Should provide description" | |
| assert hasattr(recommendation, 'priority'), "Should indicate priority" | |
| assert hasattr(recommendation, 'expected_impact'), "Should estimate impact" | |
| # Property assertion: Recommendation types should be valid | |
| from config.prompt_management.performance_monitor import RecommendationType | |
| valid_types = [rt.value for rt in RecommendationType] | |
| assert recommendation.type.value in valid_types, \ | |
| f"Should use valid recommendation type: {recommendation.type.value}" | |
| # Property: Should track improvement over time | |
| improvement_metrics = monitor.get_improvement_tracking(agent_type) | |
| assert 'baseline_performance' in improvement_metrics, \ | |
| "Should establish baseline performance" | |
| assert 'current_performance' in improvement_metrics, \ | |
| "Should track current performance" | |
| assert 'improvement_trend' in improvement_metrics, \ | |
| "Should analyze improvement trend" | |
| def test_performance_monitoring_integration(self): | |
| """ | |
| Test integration between performance monitoring and existing prompt system. | |
| Property: Performance monitoring should integrate seamlessly with existing | |
| prompt management without affecting core functionality. | |
| **Validates: Requirements 8.1, 8.2, 8.3, 8.4, 8.5** | |
| """ | |
| from config.prompt_management.prompt_controller import PromptController | |
| from config.prompt_management.performance_monitor import PromptMonitor | |
| controller = PromptController() | |
| monitor = PromptMonitor() | |
| # Property: Should work with existing prompt retrieval | |
| config = controller.get_prompt('spiritual_monitor') | |
| assert config is not None, "Should retrieve prompt configuration" | |
| # Property: Should integrate with session overrides | |
| session_id = "test_session_123" | |
| test_prompt = "Test prompt for performance monitoring" | |
| success = controller.set_session_override('spiritual_monitor', test_prompt, session_id) | |
| assert success, "Should set session override successfully" | |
| # Property: Performance monitoring should work with session overrides | |
| session_config = controller.get_prompt('spiritual_monitor', session_id=session_id) | |
| assert session_config.session_override == test_prompt, "Should use session override" | |
| # Property: Should log performance for session-based prompts | |
| controller.log_performance_metric( | |
| agent_type='spiritual_monitor', | |
| response_time=0.5, | |
| confidence=0.8, | |
| session_id=session_id | |
| ) | |
| metrics = controller.get_performance_metrics('spiritual_monitor') | |
| assert metrics['total_executions'] >= 1, "Should log session-based performance" | |
| # Property: Should maintain performance history across sessions | |
| controller.clear_session_overrides(session_id) | |
| # Metrics should persist after session cleanup | |
| metrics_after_cleanup = controller.get_performance_metrics('spiritual_monitor') | |
| assert metrics_after_cleanup['total_executions'] == metrics['total_executions'], \ | |
| "Should maintain performance history after session cleanup" |