"""
Dynamic Pattern Manager for MCP Application

This module generates classification and extraction patterns dynamically
based on runtime MCP specs instead of using hardcoded patterns.
"""

import logging
import re
from typing import List, Dict, Any, Set

from src.mcp.compact_utils import CompactSpecsUtils
from config.conversation_config import conversation_config

logger = logging.getLogger(__name__)


class DynamicPatternManager:
    """Generates patterns dynamically based on MCP tool and resource specs.

    Generated patterns are memoised in ``_pattern_cache`` under a fixed key
    per pattern family; ``clear_cache`` / ``refresh_if_needed`` invalidate it.
    """

    # Substring looked for in a (lower-cased) tool/resource name -> domain
    # terms contributed when it is found. 'member' and 'user' are treated as
    # synonyms: seeing either one adds both.
    _DOMAIN_KEYWORDS = (
        ('challenge', ('challenge',)),
        ('skill', ('skill',)),
        ('member', ('member', 'user')),
        ('user', ('member', 'user')),
        ('contest', ('contest',)),
        ('competition', ('competition',)),
    )

    # Character class used for the captured value of each dynamic entity
    # pattern, keyed by the field category from _analyze_available_fields.
    # Only name fields allow whitespace inside the value.
    _ENTITY_VALUE_CLASSES = {
        "id_fields": r'[a-zA-Z0-9_-]+',
        "name_fields": r'[a-zA-Z0-9\s_-]+',
        "handle_fields": r'[a-zA-Z0-9_-]+',
    }

    def __init__(self):
        self.compact_utils = CompactSpecsUtils()
        self._pattern_cache = {}
        self._last_spec_hash = None

    def get_history_patterns(self) -> List[str]:
        """Generate history query patterns dynamically.

        Returns the static defaults when dynamic patterns are disabled in
        ``conversation_config``. Otherwise returns the base patterns plus two
        patterns per discovered domain term, cached after the first call.
        """
        if not conversation_config.query_classification.use_dynamic_patterns:
            return self._get_default_history_patterns()

        cache_key = "history_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        # Base patterns that do not depend on the available tools/resources
        patterns = [
            r'\b(last|previous|earlier|before|just|recent)\b',
            r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
            r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
            r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
            r'\b(the (same|one|result|data) (I|we|you) (asked|requested|mentioned))\b'
        ]

        # Add domain-specific patterns based on available tools/resources
        try:
            tools_and_resources = self._get_all_available_names()
            domain_terms = self._extract_domain_terms(tools_and_resources)
            # Sorted so the generated list is identical from run to run
            # (set iteration order varies with string hash randomisation).
            for term in sorted(domain_terms):
                escaped = re.escape(term)
                patterns.append(rf'\b(that|those) {escaped}s?\b')
                patterns.append(rf'\bthe {escaped} (you|we) (found|mentioned|showed)\b')
        except Exception:
            # Best effort: keep whatever was built so far, but leave a trace.
            logger.warning(
                "Could not add domain-specific history patterns", exc_info=True
            )

        self._pattern_cache[cache_key] = patterns
        return patterns

    def get_tool_patterns(self) -> List[str]:
        """Generate tool query patterns dynamically.

        Returns the static defaults when dynamic patterns are disabled in
        ``conversation_config``. Otherwise returns the base patterns plus two
        patterns per discovered domain term, cached after the first call.
        """
        if not conversation_config.query_classification.use_dynamic_patterns:
            return self._get_default_tool_patterns()

        cache_key = "tool_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        # Base patterns for new data requests
        patterns = [
            r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
            r'\b(latest|current|updated|active)\b',
            r'\b(how many|count|total)\b'
        ]

        # Add patterns based on available tools/resources
        try:
            tools_and_resources = self._get_all_available_names()
            domain_terms = self._extract_domain_terms(tools_and_resources)
            # Sorted for a reproducible pattern order across runs.
            for term in sorted(domain_terms):
                escaped = re.escape(term)
                patterns.append(rf'\b{escaped}s? (with|named|called|id)\b')
                patterns.append(rf'\b(get|find|show|list) {escaped}s?\b')
        except Exception:
            # Best effort: keep whatever was built so far, but leave a trace.
            logger.warning(
                "Could not add domain-specific tool patterns", exc_info=True
            )

        self._pattern_cache[cache_key] = patterns
        return patterns

    def get_pronoun_patterns(self) -> List[str]:
        """Get pronoun patterns for reference detection."""
        return [r'\b(it|this|that|they|them|these|those)\b']

    def get_entity_patterns(self) -> Dict[str, str]:
        """Generate entity extraction patterns dynamically.

        Returns a mapping of pattern name to regex source. Dynamic entries are
        named ``dynamic_<field category>`` and capture the field name in
        group 1 and the value in group 2. The default patterns are returned
        when dynamic patterns are disabled, when generation raises, or when
        no fields were discovered at all.
        """
        if not conversation_config.entity_extraction.use_dynamic_patterns:
            return self._get_default_entity_patterns()

        cache_key = "entity_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        patterns = {}
        try:
            # Get field information from all available tools/resources
            field_info = self._analyze_available_fields()

            # Generate one pattern per field category that has any fields
            for field_type, field_names in field_info.items():
                value_class = self._ENTITY_VALUE_CLASSES.get(field_type)
                # An empty name set would yield the alternation "()", which
                # matches the empty string and so turns the whole pattern
                # into "match any word" -- skip such categories.
                if value_class is None or not field_names:
                    continue
                # Longest name first so a short name cannot shadow a longer
                # one that shares its prefix; secondary sort keeps the
                # result deterministic.
                ordered = sorted(field_names, key=lambda n: (-len(n), n))
                terms = "|".join(re.escape(name) for name in ordered)
                patterns[f"dynamic_{field_type}"] = (
                    rf'\b({terms})\s*:?\s*({value_class})\b'
                )
        except Exception as e:
            logger.warning("Error generating entity patterns: %s", e)
            patterns = self._get_default_entity_patterns()

        if not patterns:
            # Nothing usable was discovered from the specs; the defaults are
            # more useful than an empty mapping.
            patterns = self._get_default_entity_patterns()

        self._pattern_cache[cache_key] = patterns
        return patterns

    def _get_all_available_names(self) -> List[str]:
        """Get all available tool and resource names.

        Never raises: if a lookup fails, the names collected up to that
        point are returned.
        """
        names = []
        try:
            names.extend(self.compact_utils.get_working_tools())
            names.extend(self.compact_utils.get_working_resources())
        except Exception:
            # Debug level: refresh_if_needed calls this on every check, so a
            # persistent failure would otherwise flood the log.
            logger.debug("Could not list MCP tools/resources", exc_info=True)
        return names

    def _extract_domain_terms(self, names: List[str]) -> Set[str]:
        """Extract domain-specific terms from tool/resource names.

        Matching is a case-insensitive substring test against the keywords
        in ``_DOMAIN_KEYWORDS`` (common domain terms in Topcoder).
        """
        domain_terms = set()
        for name in names:
            name_lower = name.lower()
            for keyword, terms in self._DOMAIN_KEYWORDS:
                if keyword in name_lower:
                    domain_terms.update(terms)
        return domain_terms

    def _analyze_available_fields(self) -> Dict[str, Set[str]]:
        """Analyze available tools/resources to discover field patterns.

        Walks the ``"parameters"`` entry of every tool and resource spec and
        buckets the parameter names into ID, name and handle fields. Always
        returns all three keys; a bucket may be empty. Never raises: on
        failure the fields gathered so far are returned.
        """
        field_info = {
            "id_fields": set(),
            "name_fields": set(),
            "handle_fields": set()
        }

        try:
            # Analyze tool specs
            for tool_name in self.compact_utils.get_working_tools():
                spec = self.compact_utils.get_tool_spec(tool_name)
                if spec and "parameters" in spec:
                    self._extract_field_names(spec["parameters"], field_info)

            # Analyze resource specs
            for resource_name in self.compact_utils.get_working_resources():
                spec = self.compact_utils.get_resource_spec(resource_name)
                if spec and "parameters" in spec:
                    self._extract_field_names(spec["parameters"], field_info)
        except Exception:
            logger.warning("Could not analyze MCP spec fields", exc_info=True)

        return field_info

    def _extract_field_names(self, parameters: Dict[str, Any], field_info: Dict[str, Set[str]]):
        """Extract field names from parameter specifications.

        Adds each key of ``parameters`` to at most one bucket of
        ``field_info`` (mutated in place); ID takes precedence over name,
        which takes precedence over handle. The original spelling of the
        field name is stored, not the lower-cased one.
        """
        for field_name, _field_spec in parameters.items():
            field_lower = field_name.lower()

            # Classify fields by type.
            # NOTE(review): these are substring tests, so e.g. "valid" or
            # "provider" is classified as an ID field -- confirm whether a
            # token-based match is wanted before tightening.
            if any(term in field_lower for term in ['id', 'identifier']):
                field_info["id_fields"].add(field_name)
            elif any(term in field_lower for term in ['name', 'title']):
                field_info["name_fields"].add(field_name)
            elif any(term in field_lower for term in ['handle', 'username', 'login']):
                field_info["handle_fields"].add(field_name)

    def _get_default_history_patterns(self) -> List[str]:
        """Fallback history patterns."""
        return [
            r'\b(last|previous|earlier|before|just|recent)\b',
            r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
            r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
            r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
            r'\b(that|those) (challenge|user|skill|member|result)s?\b'
        ]

    def _get_default_tool_patterns(self) -> List[str]:
        """Fallback tool patterns."""
        return [
            r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
            r'\b(latest|current|updated|active)\b',
            r'\b(challenge|member|user|skill) (with|named|called|id)\b',
            r'\b(how many|count|total)\b'
        ]

    def _get_default_entity_patterns(self) -> Dict[str, str]:
        """Fallback entity patterns."""
        return {
            "user_handle": r'\b@?([a-zA-Z0-9_-]+)\b(?=\s*(user|member|profile|handle))',
            "challenge_id": r'\bchallenge\s*(?:id\s*)?:?\s*([0-9]+)',
            "skill_name": r'\bskill\s*:?\s*([A-Za-z+#.]{2,})',
            "status": r'\bstatus\s*:?\s*(active|completed|draft|new)\b',
        }

    def clear_cache(self):
        """Clear pattern cache to force regeneration."""
        self._pattern_cache.clear()
        self._last_spec_hash = None

    def refresh_if_needed(self):
        """Refresh patterns if MCP specs have changed.

        Change detection is based only on the sorted list of tool/resource
        names; a spec whose contents change under the same name is not
        detected.
        """
        try:
            # Simple hash of available tools/resources to detect changes
            current_names = sorted(self._get_all_available_names())
            current_hash = hash(str(current_names))

            if current_hash != self._last_spec_hash:
                self.clear_cache()
                # clear_cache resets the stored hash, so record it afterwards
                self._last_spec_hash = current_hash
        except Exception:
            logger.debug("Could not check MCP specs for changes", exc_info=True)


# Global instance
dynamic_pattern_manager = DynamicPatternManager()