Spaces:
Sleeping
Sleeping
| """ | |
| Shared components for centralized prompt management. | |
| This module provides catalogs for indicators, rules, templates, and category definitions | |
| that are shared across all AI agents to ensure consistency. | |
| """ | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| from .data_models import ( | |
| Indicator, Rule, Template, QuestionPattern, | |
| IndicatorCategory, ScenarioType, ValidationResult | |
| ) | |
| class SharedComponentBase: | |
| """Base class for shared component catalogs.""" | |
| def __init__(self, data_file: str): | |
| self.data_file = Path(__file__).parent / "data" / data_file | |
| self._data: Dict[str, Any] = {} | |
| self._load_data() | |
| def _load_data(self): | |
| """Load data from JSON file.""" | |
| if self.data_file.exists(): | |
| try: | |
| with open(self.data_file, 'r', encoding='utf-8') as f: | |
| self._data = json.load(f) | |
| except (json.JSONDecodeError, IOError) as e: | |
| print(f"Warning: Could not load {self.data_file}: {e}") | |
| self._data = {} | |
| else: | |
| # Create directory if it doesn't exist | |
| self.data_file.parent.mkdir(parents=True, exist_ok=True) | |
| self._initialize_default_data() | |
| self._save_data() | |
| def _save_data(self): | |
| """Save data to JSON file.""" | |
| try: | |
| with open(self.data_file, 'w', encoding='utf-8') as f: | |
| json.dump(self._data, f, indent=2, ensure_ascii=False) | |
| except IOError as e: | |
| print(f"Warning: Could not save {self.data_file}: {e}") | |
| def _initialize_default_data(self): | |
| """Initialize with default data. Override in subclasses.""" | |
| self._data = {} | |
| class IndicatorCatalog(SharedComponentBase): | |
| """Catalog of spiritual distress indicators.""" | |
| def __init__(self): | |
| super().__init__("indicators.json") | |
| def _initialize_default_data(self): | |
| """Initialize with default spiritual distress indicators.""" | |
| default_indicators = [ | |
| { | |
| "name": "sleep_difficulties", | |
| "category": "emotional", | |
| "definition": "Insomnia, difficulty sleeping, or disrupted sleep patterns that may indicate emotional distress", | |
| "examples": ["I can't sleep at night", "my mind won't stop racing", "I've been having trouble sleeping"], | |
| "severity_weight": 0.6, | |
| "context_requirements": [] | |
| }, | |
| { | |
| "name": "anxiety_worry", | |
| "category": "emotional", | |
| "definition": "Expressions of anxiety, worry, or fear about current or future situations", | |
| "examples": ["I'm worried about", "I feel anxious", "I'm scared that"], | |
| "severity_weight": 0.7, | |
| "context_requirements": [] | |
| }, | |
| { | |
| "name": "spiritual_questioning", | |
| "category": "spiritual", | |
| "definition": "Questions about faith, God, meaning, or spiritual beliefs", | |
| "examples": ["Why is God doing this to me?", "What's the meaning of all this?", "I don't understand why this is happening"], | |
| "severity_weight": 0.8, | |
| "context_requirements": [] | |
| }, | |
| { | |
| "name": "loss_of_interest", | |
| "category": "emotional", | |
| "definition": "Loss of interest in previously enjoyed activities or hobbies", | |
| "examples": ["I used to love gardening, but now I can't", "I don't enjoy things anymore", "Nothing seems fun"], | |
| "severity_weight": 0.7, | |
| "context_requirements": [] | |
| }, | |
| { | |
| "name": "isolation_loneliness", | |
| "category": "social", | |
| "definition": "Feelings of loneliness, isolation, or being disconnected from others", | |
| "examples": ["I feel so alone", "Nobody understands", "I don't have anyone"], | |
| "severity_weight": 0.8, | |
| "context_requirements": [] | |
| }, | |
| { | |
| "name": "hopelessness", | |
| "category": "existential", | |
| "definition": "Expressions of hopelessness, despair, or loss of future orientation", | |
| "examples": ["There's no point", "Nothing will get better", "I have no hope"], | |
| "severity_weight": 0.9, | |
| "context_requirements": [] | |
| }, | |
| { | |
| "name": "crisis_language", | |
| "category": "existential", | |
| "definition": "Language indicating crisis, suicidal ideation, or desire to die", | |
| "examples": ["I want to die", "I can't go on", "Better off dead"], | |
| "severity_weight": 1.0, | |
| "context_requirements": [] | |
| } | |
| ] | |
| self._data = { | |
| "indicators": default_indicators, | |
| "version": "1.0", | |
| "last_updated": "2025-12-18" | |
| } | |
| def get_all_indicators(self) -> List[Indicator]: | |
| """Get all indicators as Indicator objects.""" | |
| indicators = [] | |
| for indicator_data in self._data.get("indicators", []): | |
| try: | |
| indicators.append(Indicator.from_dict(indicator_data)) | |
| except (KeyError, ValueError) as e: | |
| print(f"Warning: Invalid indicator data: {e}") | |
| return indicators | |
| def get_indicators_by_category(self, category: IndicatorCategory) -> List[Indicator]: | |
| """Get indicators filtered by category.""" | |
| return [ind for ind in self.get_all_indicators() if ind.category == category] | |
| def add_indicator(self, indicator: Indicator) -> bool: | |
| """Add a new indicator to the catalog.""" | |
| try: | |
| if "indicators" not in self._data: | |
| self._data["indicators"] = [] | |
| # Check if indicator already exists | |
| existing_names = [ind["name"] for ind in self._data["indicators"]] | |
| if indicator.name in existing_names: | |
| return False | |
| self._data["indicators"].append(indicator.to_dict()) | |
| self._save_data() | |
| return True | |
| except Exception as e: | |
| print(f"Error adding indicator: {e}") | |
| return False | |
| def update_indicator(self, name: str, indicator: Indicator) -> bool: | |
| """Update an existing indicator.""" | |
| try: | |
| for i, ind_data in enumerate(self._data.get("indicators", [])): | |
| if ind_data["name"] == name: | |
| self._data["indicators"][i] = indicator.to_dict() | |
| self._save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error updating indicator: {e}") | |
| return False | |
| def remove_indicator(self, name: str) -> bool: | |
| """Remove an indicator from the catalog.""" | |
| try: | |
| indicators = self._data.get("indicators", []) | |
| original_length = len(indicators) | |
| self._data["indicators"] = [ind for ind in indicators if ind["name"] != name] | |
| if len(self._data["indicators"]) < original_length: | |
| self._save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error removing indicator: {e}") | |
| return False | |
| def get_indicator_by_name(self, name: str) -> Optional[Indicator]: | |
| """Get a specific indicator by name.""" | |
| for indicator in self.get_all_indicators(): | |
| if indicator.name == name: | |
| return indicator | |
| return None | |
| def search_indicators(self, query: str) -> List[Indicator]: | |
| """Search indicators by name, definition, or examples.""" | |
| query_lower = query.lower() | |
| results = [] | |
| for indicator in self.get_all_indicators(): | |
| # Search in name | |
| if query_lower in indicator.name.lower(): | |
| results.append(indicator) | |
| continue | |
| # Search in definition | |
| if query_lower in indicator.definition.lower(): | |
| results.append(indicator) | |
| continue | |
| # Search in examples | |
| if any(query_lower in example.lower() for example in indicator.examples): | |
| results.append(indicator) | |
| continue | |
| return results | |
| def get_version_info(self) -> Dict[str, str]: | |
| """Get version information for the indicator catalog.""" | |
| return { | |
| "version": self._data.get("version", "unknown"), | |
| "last_updated": self._data.get("last_updated", "unknown"), | |
| "total_indicators": str(len(self.get_all_indicators())) | |
| } | |
| def export_to_dict(self) -> Dict[str, Any]: | |
| """Export the entire catalog to a dictionary.""" | |
| return self._data.copy() | |
| def import_from_dict(self, data: Dict[str, Any], merge: bool = False) -> bool: | |
| """ | |
| Import indicators from a dictionary. | |
| Args: | |
| data: Dictionary containing indicator data | |
| merge: If True, merge with existing data. If False, replace all data. | |
| Returns: | |
| True if import was successful | |
| """ | |
| try: | |
| if merge: | |
| # Merge with existing indicators | |
| existing_names = {ind["name"] for ind in self._data.get("indicators", [])} | |
| new_indicators = [ind for ind in data.get("indicators", []) | |
| if ind["name"] not in existing_names] | |
| self._data.setdefault("indicators", []).extend(new_indicators) | |
| else: | |
| # Replace all data | |
| self._data = data.copy() | |
| self._save_data() | |
| return True | |
| except Exception as e: | |
| print(f"Error importing indicator data: {e}") | |
| return False | |
| def validate_consistency(self) -> ValidationResult: | |
| """Validate indicator catalog consistency.""" | |
| result = ValidationResult(is_valid=True) | |
| indicators = self.get_all_indicators() | |
| names = [ind.name for ind in indicators] | |
| # Check for duplicate names | |
| if len(names) != len(set(names)): | |
| result.add_error("Duplicate indicator names found") | |
| # Check for valid severity weights | |
| for ind in indicators: | |
| if not (0.0 <= ind.severity_weight <= 1.0): | |
| result.add_error(f"Invalid severity weight for {ind.name}: {ind.severity_weight}") | |
| # Check for empty definitions | |
| for ind in indicators: | |
| if not ind.definition.strip(): | |
| result.add_error(f"Empty definition for indicator: {ind.name}") | |
| # Check for missing examples | |
| for ind in indicators: | |
| if not ind.examples: | |
| result.add_warning(f"No examples provided for indicator: {ind.name}") | |
| # Check for valid categories | |
| valid_categories = set(cat.value for cat in IndicatorCategory) | |
| for ind in indicators: | |
| if ind.category.value not in valid_categories: | |
| result.add_error(f"Invalid category for {ind.name}: {ind.category.value}") | |
| return result | |
| class RulesCatalog(SharedComponentBase): | |
| """Catalog of classification rules.""" | |
| def __init__(self): | |
| super().__init__("rules.json") | |
| def _initialize_default_data(self): | |
| """Initialize with default classification rules.""" | |
| default_rules = [ | |
| { | |
| "rule_id": "suicide_mention", | |
| "description": "ANY mention of suicide, self-harm, death wishes is ALWAYS RED", | |
| "condition": "message contains suicide, self-harm, or death wish language", | |
| "action": "classify as RED", | |
| "priority": 1, | |
| "examples": ["I want to die", "I want to kill myself", "Better off dead"] | |
| }, | |
| { | |
| "rule_id": "crisis_language", | |
| "description": "Active crisis or emergency language indicates RED", | |
| "condition": "message contains crisis indicators with despair", | |
| "action": "classify as RED", | |
| "priority": 2, | |
| "examples": ["I can't take this anymore", "I can't go on", "No reason to live"] | |
| }, | |
| { | |
| "rule_id": "ambiguous_distress", | |
| "description": "Unclear if situation causes emotional/spiritual distress", | |
| "condition": "potentially distressing circumstances without clear emotional expression", | |
| "action": "classify as YELLOW for clarification", | |
| "priority": 5, | |
| "examples": ["My mother passed away last month", "I don't have anyone to help me"] | |
| }, | |
| { | |
| "rule_id": "medical_only", | |
| "description": "Medical symptoms without emotional/spiritual indicators", | |
| "condition": "only medical symptoms, appointments, medication questions", | |
| "action": "classify as GREEN", | |
| "priority": 8, | |
| "examples": ["When is my next appointment?", "What are the side effects?"] | |
| }, | |
| { | |
| "rule_id": "contextual_positive", | |
| "description": "Positive statements with distress history need verification", | |
| "condition": "positive statement with previous distress indicators in conversation", | |
| "action": "classify as YELLOW for verification", | |
| "priority": 6, | |
| "examples": ["I'm fine now (after previous distress)", "Everything is okay (defensive response)"] | |
| } | |
| ] | |
| self._data = { | |
| "rules": default_rules, | |
| "version": "1.0", | |
| "last_updated": "2025-12-18" | |
| } | |
| def get_all_rules(self) -> List[Rule]: | |
| """Get all rules as Rule objects.""" | |
| rules = [] | |
| for rule_data in self._data.get("rules", []): | |
| try: | |
| rules.append(Rule.from_dict(rule_data)) | |
| except (KeyError, ValueError) as e: | |
| print(f"Warning: Invalid rule data: {e}") | |
| return rules | |
| def get_rules_by_priority(self) -> List[Rule]: | |
| """Get rules sorted by priority (lower number = higher priority).""" | |
| rules = self.get_all_rules() | |
| return sorted(rules, key=lambda r: r.priority) | |
| def add_rule(self, rule: Rule) -> bool: | |
| """Add a new rule to the catalog.""" | |
| try: | |
| if "rules" not in self._data: | |
| self._data["rules"] = [] | |
| # Check if rule already exists | |
| existing_ids = [r["rule_id"] for r in self._data["rules"]] | |
| if rule.rule_id in existing_ids: | |
| return False | |
| self._data["rules"].append(rule.to_dict()) | |
| self._save_data() | |
| return True | |
| except Exception as e: | |
| print(f"Error adding rule: {e}") | |
| return False | |
| def update_rule(self, rule_id: str, rule: Rule) -> bool: | |
| """Update an existing rule.""" | |
| try: | |
| for i, rule_data in enumerate(self._data.get("rules", [])): | |
| if rule_data["rule_id"] == rule_id: | |
| self._data["rules"][i] = rule.to_dict() | |
| self._save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error updating rule: {e}") | |
| return False | |
| def remove_rule(self, rule_id: str) -> bool: | |
| """Remove a rule from the catalog.""" | |
| try: | |
| rules = self._data.get("rules", []) | |
| original_length = len(rules) | |
| self._data["rules"] = [rule for rule in rules if rule["rule_id"] != rule_id] | |
| if len(self._data["rules"]) < original_length: | |
| self._save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error removing rule: {e}") | |
| return False | |
| def get_rule_by_id(self, rule_id: str) -> Optional[Rule]: | |
| """Get a specific rule by ID.""" | |
| for rule in self.get_all_rules(): | |
| if rule.rule_id == rule_id: | |
| return rule | |
| return None | |
| def search_rules(self, query: str) -> List[Rule]: | |
| """Search rules by ID, description, condition, or action.""" | |
| query_lower = query.lower() | |
| results = [] | |
| for rule in self.get_all_rules(): | |
| # Search in rule_id | |
| if query_lower in rule.rule_id.lower(): | |
| results.append(rule) | |
| continue | |
| # Search in description | |
| if query_lower in rule.description.lower(): | |
| results.append(rule) | |
| continue | |
| # Search in condition | |
| if query_lower in rule.condition.lower(): | |
| results.append(rule) | |
| continue | |
| # Search in action | |
| if query_lower in rule.action.lower(): | |
| results.append(rule) | |
| continue | |
| return results | |
| def get_rules_by_action(self, action_pattern: str) -> List[Rule]: | |
| """Get rules that match a specific action pattern.""" | |
| action_lower = action_pattern.lower() | |
| return [rule for rule in self.get_all_rules() | |
| if action_lower in rule.action.lower()] | |
| def reorder_rule_priority(self, rule_id: str, new_priority: int) -> bool: | |
| """Change the priority of a rule.""" | |
| rule = self.get_rule_by_id(rule_id) | |
| if rule: | |
| rule.priority = new_priority | |
| return self.update_rule(rule_id, rule) | |
| return False | |
| def get_version_info(self) -> Dict[str, str]: | |
| """Get version information for the rules catalog.""" | |
| return { | |
| "version": self._data.get("version", "unknown"), | |
| "last_updated": self._data.get("last_updated", "unknown"), | |
| "total_rules": str(len(self.get_all_rules())) | |
| } | |
| def export_to_dict(self) -> Dict[str, Any]: | |
| """Export the entire catalog to a dictionary.""" | |
| return self._data.copy() | |
| def import_from_dict(self, data: Dict[str, Any], merge: bool = False) -> bool: | |
| """ | |
| Import rules from a dictionary. | |
| Args: | |
| data: Dictionary containing rule data | |
| merge: If True, merge with existing data. If False, replace all data. | |
| Returns: | |
| True if import was successful | |
| """ | |
| try: | |
| if merge: | |
| # Merge with existing rules | |
| existing_ids = {rule["rule_id"] for rule in self._data.get("rules", [])} | |
| new_rules = [rule for rule in data.get("rules", []) | |
| if rule["rule_id"] not in existing_ids] | |
| self._data.setdefault("rules", []).extend(new_rules) | |
| else: | |
| # Replace all data | |
| self._data = data.copy() | |
| self._save_data() | |
| return True | |
| except Exception as e: | |
| print(f"Error importing rule data: {e}") | |
| return False | |
| def validate_consistency(self) -> ValidationResult: | |
| """Validate rules catalog consistency.""" | |
| result = ValidationResult(is_valid=True) | |
| rules = self.get_all_rules() | |
| rule_ids = [rule.rule_id for rule in rules] | |
| # Check for duplicate rule IDs | |
| if len(rule_ids) != len(set(rule_ids)): | |
| result.add_error("Duplicate rule IDs found") | |
| # Check for valid priorities | |
| priorities = [rule.priority for rule in rules] | |
| if len(priorities) != len(set(priorities)): | |
| result.add_warning("Duplicate rule priorities found - may cause conflicts") | |
| # Check for empty fields | |
| for rule in rules: | |
| if not rule.rule_id.strip(): | |
| result.add_error("Empty rule ID found") | |
| if not rule.description.strip(): | |
| result.add_error(f"Empty description for rule: {rule.rule_id}") | |
| if not rule.condition.strip(): | |
| result.add_error(f"Empty condition for rule: {rule.rule_id}") | |
| if not rule.action.strip(): | |
| result.add_error(f"Empty action for rule: {rule.rule_id}") | |
| # Check for valid priority range | |
| for rule in rules: | |
| if rule.priority < 1: | |
| result.add_error(f"Invalid priority for {rule.rule_id}: {rule.priority} (must be >= 1)") | |
| return result | |
| class TemplateCatalog(SharedComponentBase): | |
| """Catalog of reusable prompt templates.""" | |
| def __init__(self): | |
| super().__init__("templates.json") | |
| def _initialize_default_data(self): | |
| """Initialize with default prompt templates.""" | |
| default_templates = [ | |
| { | |
| "template_id": "consent_request", | |
| "name": "Consent Request Template", | |
| "content": "Some patients who feel this way find it helpful to talk with someone from our {team_name}. Would you be open to me sharing your information so they can reach out to you?", | |
| "variables": ["team_name"], | |
| "category": "consent" | |
| }, | |
| { | |
| "template_id": "clarifying_question", | |
| "name": "Clarifying Question Template", | |
| "content": "You mentioned {situation}. Is that something that's been weighing on you emotionally, or is it more about {alternative_cause}?", | |
| "variables": ["situation", "alternative_cause"], | |
| "category": "triage" | |
| }, | |
| { | |
| "template_id": "empathetic_response", | |
| "name": "Empathetic Response Template", | |
| "content": "I hear that {situation} has been {impact_description} for you. {follow_up_question}", | |
| "variables": ["situation", "impact_description", "follow_up_question"], | |
| "category": "response" | |
| } | |
| ] | |
| self._data = { | |
| "templates": default_templates, | |
| "version": "1.0", | |
| "last_updated": "2025-12-18" | |
| } | |
| def get_all_templates(self) -> List[Template]: | |
| """Get all templates as Template objects.""" | |
| templates = [] | |
| for template_data in self._data.get("templates", []): | |
| try: | |
| templates.append(Template.from_dict(template_data)) | |
| except (KeyError, ValueError) as e: | |
| print(f"Warning: Invalid template data: {e}") | |
| return templates | |
| def get_templates_by_category(self, category: str) -> List[Template]: | |
| """Get templates filtered by category.""" | |
| return [tmpl for tmpl in self.get_all_templates() if tmpl.category == category] | |
| def add_template(self, template: Template) -> bool: | |
| """Add a new template to the catalog.""" | |
| try: | |
| if "templates" not in self._data: | |
| self._data["templates"] = [] | |
| # Check if template already exists | |
| existing_ids = [t["template_id"] for t in self._data["templates"]] | |
| if template.template_id in existing_ids: | |
| return False | |
| self._data["templates"].append(template.to_dict()) | |
| self._save_data() | |
| return True | |
| except Exception as e: | |
| print(f"Error adding template: {e}") | |
| return False | |
| def update_template(self, template_id: str, template: Template) -> bool: | |
| """Update an existing template.""" | |
| try: | |
| for i, tmpl_data in enumerate(self._data.get("templates", [])): | |
| if tmpl_data["template_id"] == template_id: | |
| self._data["templates"][i] = template.to_dict() | |
| self._save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error updating template: {e}") | |
| return False | |
| def remove_template(self, template_id: str) -> bool: | |
| """Remove a template from the catalog.""" | |
| try: | |
| templates = self._data.get("templates", []) | |
| original_length = len(templates) | |
| self._data["templates"] = [tmpl for tmpl in templates if tmpl["template_id"] != template_id] | |
| if len(self._data["templates"]) < original_length: | |
| self._save_data() | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error removing template: {e}") | |
| return False | |
| def get_template_by_id(self, template_id: str) -> Optional[Template]: | |
| """Get a specific template by ID.""" | |
| for template in self.get_all_templates(): | |
| if template.template_id == template_id: | |
| return template | |
| return None | |
| def search_templates(self, query: str) -> List[Template]: | |
| """Search templates by ID, name, content, or category.""" | |
| query_lower = query.lower() | |
| results = [] | |
| for template in self.get_all_templates(): | |
| # Search in template_id | |
| if query_lower in template.template_id.lower(): | |
| results.append(template) | |
| continue | |
| # Search in name | |
| if query_lower in template.name.lower(): | |
| results.append(template) | |
| continue | |
| # Search in content | |
| if query_lower in template.content.lower(): | |
| results.append(template) | |
| continue | |
| # Search in category | |
| if query_lower in template.category.lower(): | |
| results.append(template) | |
| continue | |
| return results | |
| def render_template(self, template_id: str, variables: Dict[str, str]) -> Optional[str]: | |
| """ | |
| Render a template with provided variables. | |
| Args: | |
| template_id: ID of the template to render | |
| variables: Dictionary of variable name -> value mappings | |
| Returns: | |
| Rendered template content or None if template not found | |
| """ | |
| template = self.get_template_by_id(template_id) | |
| if not template: | |
| return None | |
| try: | |
| # Simple variable substitution using format | |
| rendered = template.content | |
| for var_name, var_value in variables.items(): | |
| placeholder = "{" + var_name + "}" | |
| rendered = rendered.replace(placeholder, str(var_value)) | |
| return rendered | |
| except Exception as e: | |
| print(f"Error rendering template: {e}") | |
| return None | |
| def validate_template_variables(self, template_id: str, variables: Dict[str, str]) -> ValidationResult: | |
| """ | |
| Validate that all required variables are provided for a template. | |
| Args: | |
| template_id: ID of the template to validate | |
| variables: Dictionary of variable name -> value mappings | |
| Returns: | |
| ValidationResult indicating if all variables are provided | |
| """ | |
| result = ValidationResult(is_valid=True) | |
| template = self.get_template_by_id(template_id) | |
| if not template: | |
| result.add_error(f"Template not found: {template_id}") | |
| return result | |
| # Check if all required variables are provided | |
| provided_vars = set(variables.keys()) | |
| required_vars = set(template.variables) | |
| missing_vars = required_vars - provided_vars | |
| if missing_vars: | |
| for var in missing_vars: | |
| result.add_error(f"Missing required variable: {var}") | |
| # Check for extra variables (warning only) | |
| extra_vars = provided_vars - required_vars | |
| if extra_vars: | |
| for var in extra_vars: | |
| result.add_warning(f"Extra variable provided: {var}") | |
| return result | |
| def get_version_info(self) -> Dict[str, str]: | |
| """Get version information for the template catalog.""" | |
| return { | |
| "version": self._data.get("version", "unknown"), | |
| "last_updated": self._data.get("last_updated", "unknown"), | |
| "total_templates": str(len(self.get_all_templates())) | |
| } | |
| def export_to_dict(self) -> Dict[str, Any]: | |
| """Export the entire catalog to a dictionary.""" | |
| return self._data.copy() | |
| def import_from_dict(self, data: Dict[str, Any], merge: bool = False) -> bool: | |
| """ | |
| Import templates from a dictionary. | |
| Args: | |
| data: Dictionary containing template data | |
| merge: If True, merge with existing data. If False, replace all data. | |
| Returns: | |
| True if import was successful | |
| """ | |
| try: | |
| if merge: | |
| # Merge with existing templates | |
| existing_ids = {tmpl["template_id"] for tmpl in self._data.get("templates", [])} | |
| new_templates = [tmpl for tmpl in data.get("templates", []) | |
| if tmpl["template_id"] not in existing_ids] | |
| self._data.setdefault("templates", []).extend(new_templates) | |
| else: | |
| # Replace all data | |
| self._data = data.copy() | |
| self._save_data() | |
| return True | |
| except Exception as e: | |
| print(f"Error importing template data: {e}") | |
| return False | |
| def validate_consistency(self) -> ValidationResult: | |
| """Validate template catalog consistency.""" | |
| result = ValidationResult(is_valid=True) | |
| templates = self.get_all_templates() | |
| template_ids = [tmpl.template_id for tmpl in templates] | |
| # Check for duplicate template IDs | |
| if len(template_ids) != len(set(template_ids)): | |
| result.add_error("Duplicate template IDs found") | |
| # Check for empty fields | |
| for tmpl in templates: | |
| if not tmpl.template_id.strip(): | |
| result.add_error("Empty template ID found") | |
| if not tmpl.name.strip(): | |
| result.add_error(f"Empty name for template: {tmpl.template_id}") | |
| if not tmpl.content.strip(): | |
| result.add_error(f"Empty content for template: {tmpl.template_id}") | |
| if not tmpl.category.strip(): | |
| result.add_error(f"Empty category for template: {tmpl.template_id}") | |
| # Check for valid variable references in content | |
| for tmpl in templates: | |
| content = tmpl.content | |
| declared_vars = set(tmpl.variables) | |
| # Find variables referenced in content (simple {var} pattern) | |
| import re | |
| referenced_vars = set(re.findall(r'\{(\w+)\}', content)) | |
| # Check for undeclared variables | |
| undeclared = referenced_vars - declared_vars | |
| if undeclared: | |
| for var in undeclared: | |
| result.add_warning(f"Template {tmpl.template_id} references undeclared variable: {var}") | |
| # Check for unused declared variables | |
| unused = declared_vars - referenced_vars | |
| if unused: | |
| for var in unused: | |
| result.add_warning(f"Template {tmpl.template_id} declares unused variable: {var}") | |
| return result | |
| class CategoryDefinitions(SharedComponentBase): | |
| """Catalog of category definitions for consistent classification.""" | |
| def __init__(self): | |
| super().__init__("categories.json") | |
| def _initialize_default_data(self): | |
| """Initialize with default category definitions.""" | |
| default_categories = { | |
| "GREEN": { | |
| "name": "GREEN", | |
| "severity": "no_distress", | |
| "description": "Medical symptoms, routine questions, appointment scheduling, medication inquiries, or other standard healthcare topics. No indicators of emotional or spiritual distress.", | |
| "criteria": [ | |
| "Only medical symptoms without emotional context", | |
| "Routine healthcare questions", | |
| "Appointment scheduling", | |
| "Medication inquiries", | |
| "Clearly neutral or positive statements without distress context" | |
| ] | |
| }, | |
| "YELLOW": { | |
| "name": "YELLOW", | |
| "severity": "ambiguous_distress", | |
| "description": "Indicators where it is UNCLEAR whether the patient's situation is caused by or is causing emotional/spiritual distress, or if it is due to external factors. YELLOW is about AMBIGUITY, not severity.", | |
| "criteria": [ | |
| "Potentially distressing circumstances without expressed emotional distress", | |
| "Loss of loved one without emotional context expressed", | |
| "Mentions having no help without indicating distress", | |
| "Difficult situation but cause of distress unclear", | |
| "Previous distress with current positive statements (may be defensive)" | |
| ] | |
| }, | |
| "RED": { | |
| "name": "RED", | |
| "severity": "severe_distress", | |
| "description": "Indicators of severe distress or crisis requiring immediate spiritual care attention.", | |
| "criteria": [ | |
| "ANY mention of suicide, self-harm, death wishes", | |
| "Active crisis or emergency language", | |
| "Severe hopelessness with crisis language", | |
| "Explicit severe emotional/spiritual distress", | |
| "Complete loss of hope or meaning with despair", | |
| "Spiritual anger toward God/higher power", | |
| "Unbearable suffering expressions" | |
| ] | |
| } | |
| } | |
| self._data = { | |
| "categories": default_categories, | |
| "version": "1.0", | |
| "last_updated": "2025-12-18" | |
| } | |
| def get_category_definition(self, category: str) -> Optional[Dict[str, Any]]: | |
| """Get definition for a specific category.""" | |
| return self._data.get("categories", {}).get(category.upper()) | |
| def get_all_categories(self) -> Dict[str, Dict[str, Any]]: | |
| """Get all category definitions.""" | |
| return self._data.get("categories", {}) | |
| def validate_consistency(self) -> ValidationResult: | |
| """Validate category definitions consistency.""" | |
| result = ValidationResult(is_valid=True) | |
| categories = self.get_all_categories() | |
| required_categories = ["GREEN", "YELLOW", "RED"] | |
| for cat in required_categories: | |
| if cat not in categories: | |
| result.add_error(f"Missing required category: {cat}") | |
| for cat_name, cat_data in categories.items(): | |
| required_fields = ["name", "severity", "description", "criteria"] | |
| for field in required_fields: | |
| if field not in cat_data: | |
| result.add_error(f"Missing field '{field}' in category {cat_name}") | |
| return result |