Spaces:
Sleeping
Sleeping
| """ | |
| Dynamic Pattern Manager for MCP Application | |
| This module generates classification and extraction patterns dynamically | |
| based on runtime MCP specs instead of using hardcoded patterns. | |
| """ | |
import logging
import re
from typing import List, Dict, Any, Set

from src.mcp.compact_utils import CompactSpecsUtils
from config.conversation_config import conversation_config
class DynamicPatternManager:
    """Generates patterns dynamically based on MCP tool and resource specs.

    Regex pattern lists/dicts are built from the tool and resource names (and
    their parameter names) reported by ``CompactSpecsUtils`` and memoised in an
    in-memory cache. Spec lookups are best-effort: failures are logged and the
    base or default patterns are used instead of raising.
    """

    _logger = logging.getLogger(__name__)

    # Substring found in a tool/resource name -> domain terms it contributes.
    # "member" and "user" are treated as synonyms, so either adds both.
    _DOMAIN_KEYWORDS = (
        ('challenge', ('challenge',)),
        ('skill', ('skill',)),
        ('member', ('member', 'user')),
        ('user', ('member', 'user')),
        ('contest', ('contest',)),
        ('competition', ('competition',)),
    )

    # Capture group for the value that follows each category of field name.
    _ENTITY_VALUE_PATTERNS = {
        "id_fields": r'([a-zA-Z0-9_-]+)',
        "name_fields": r'([a-zA-Z0-9\s_-]+)',
        "handle_fields": r'([a-zA-Z0-9_-]+)',
    }

    def __init__(self):
        """Create the spec helper and start with an empty pattern cache."""
        self.compact_utils = CompactSpecsUtils()
        self._pattern_cache = {}
        self._last_spec_hash = None

    def get_history_patterns(self) -> List[str]:
        """Generate history query patterns dynamically.

        Returns the default list when dynamic patterns are disabled in the
        conversation config; otherwise the cached list, or a freshly built
        one (generic patterns plus two patterns per discovered domain term).
        """
        if not conversation_config.query_classification.use_dynamic_patterns:
            return self._get_default_history_patterns()

        cache_key = "history_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        # Base patterns that are language-agnostic
        patterns = [
            r'\b(last|previous|earlier|before|just|recent)\b',
            r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
            r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
            r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
            r'\b(the (same|one|result|data) (I|we|you) (asked|requested|mentioned))\b'
        ]

        # Add domain-specific patterns based on available tools/resources
        try:
            for term in self._sorted_domain_terms():
                escaped = re.escape(term)
                patterns.append(rf'\b(that|those) {escaped}s?\b')
                patterns.append(rf'\bthe {escaped} (you|we) (found|mentioned|showed)\b')
        except Exception:
            # Best-effort: keep whatever was built so far.
            self._logger.debug("Could not add domain history patterns", exc_info=True)

        self._pattern_cache[cache_key] = patterns
        return patterns

    def get_tool_patterns(self) -> List[str]:
        """Generate tool query patterns dynamically.

        Returns the default list when dynamic patterns are disabled in the
        conversation config; otherwise the cached list, or a freshly built
        one (generic patterns plus two patterns per discovered domain term).
        """
        if not conversation_config.query_classification.use_dynamic_patterns:
            return self._get_default_tool_patterns()

        cache_key = "tool_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        # Base patterns for new data requests
        patterns = [
            r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
            r'\b(latest|current|updated|active)\b',
            r'\b(how many|count|total)\b'
        ]

        # Add patterns based on available tools/resources
        try:
            for term in self._sorted_domain_terms():
                escaped = re.escape(term)
                patterns.append(rf'\b{escaped}s? (with|named|called|id)\b')
                patterns.append(rf'\b(get|find|show|list) {escaped}s?\b')
        except Exception:
            # Best-effort: keep whatever was built so far.
            self._logger.debug("Could not add domain tool patterns", exc_info=True)

        self._pattern_cache[cache_key] = patterns
        return patterns

    def get_pronoun_patterns(self) -> List[str]:
        """Get pronoun patterns for reference detection."""
        return [r'\b(it|this|that|they|them|these|those)\b']

    def get_entity_patterns(self) -> Dict[str, str]:
        """Generate entity extraction patterns dynamically.

        Returns the default patterns when dynamic patterns are disabled,
        when pattern generation raises, or when no field names were
        discovered at all. Otherwise returns ``dynamic_<category>`` patterns
        whose group 1 is the field name and group 2 the value. The result is
        cached.
        """
        if not conversation_config.entity_extraction.use_dynamic_patterns:
            return self._get_default_entity_patterns()

        cache_key = "entity_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        try:
            patterns = self._build_entity_patterns(self._analyze_available_fields())
        except Exception as e:
            self._logger.warning("Error generating entity patterns: %s", e)
            patterns = {}

        if not patterns:
            # Nothing usable was discovered; defaults beat an empty mapping.
            patterns = self._get_default_entity_patterns()

        self._pattern_cache[cache_key] = patterns
        return patterns

    @staticmethod
    def _build_entity_patterns(field_info: Dict[str, Set[str]]) -> Dict[str, str]:
        """Turn discovered field names into ``dynamic_<category>`` regexes.

        Categories with no field names are skipped: an empty alternation
        ``()`` would match before every word. Names are ordered longest
        first so that a short name (``id``) cannot shadow a longer one
        (``identifier``) inside the alternation, and so the output is
        deterministic.
        """
        patterns = {}
        for field_type, field_names in field_info.items():
            value_pattern = DynamicPatternManager._ENTITY_VALUE_PATTERNS.get(field_type)
            if value_pattern is None or not field_names:
                continue
            ordered = sorted(field_names, key=lambda name: (-len(name), name))
            terms = "|".join(re.escape(name) for name in ordered)
            patterns[f"dynamic_{field_type}"] = rf'\b({terms})\s*:?\s*{value_pattern}\b'
        return patterns

    def _get_all_available_names(self) -> List[str]:
        """Get all available tool and resource names.

        Best-effort: if a lookup raises, the names gathered so far are
        returned.
        """
        names = []
        try:
            names.extend(self.compact_utils.get_working_tools())
            names.extend(self.compact_utils.get_working_resources())
        except Exception:
            self._logger.debug("Could not list MCP tools/resources", exc_info=True)
        return names

    def _sorted_domain_terms(self) -> List[str]:
        """Return the discovered domain terms in a stable (sorted) order."""
        return sorted(self._extract_domain_terms(self._get_all_available_names()))

    def _extract_domain_terms(self, names: List[str]) -> Set[str]:
        """Extract domain-specific terms from tool/resource names.

        Matching is a case-insensitive substring test against a fixed list
        of common Topcoder domain keywords.
        """
        domain_terms = set()
        for name in names:
            name_lower = name.lower()
            for keyword, terms in self._DOMAIN_KEYWORDS:
                if keyword in name_lower:
                    domain_terms.update(terms)
        return domain_terms

    def _analyze_available_fields(self) -> Dict[str, Set[str]]:
        """Analyze available tools/resources to discover field patterns.

        Returns a dict with the keys ``id_fields``, ``name_fields`` and
        ``handle_fields``; each value is the set of matching parameter names
        (possibly empty). Best-effort: on error, what was collected so far
        is returned.
        """
        field_info = {
            "id_fields": set(),
            "name_fields": set(),
            "handle_fields": set()
        }
        try:
            # Analyze tool specs
            for tool_name in self.compact_utils.get_working_tools():
                spec = self.compact_utils.get_tool_spec(tool_name)
                if spec and "parameters" in spec:
                    self._extract_field_names(spec["parameters"], field_info)
            # Analyze resource specs
            for resource_name in self.compact_utils.get_working_resources():
                spec = self.compact_utils.get_resource_spec(resource_name)
                if spec and "parameters" in spec:
                    self._extract_field_names(spec["parameters"], field_info)
        except Exception:
            self._logger.debug("Could not analyze MCP spec fields", exc_info=True)
        return field_info

    def _extract_field_names(self, parameters: Dict[str, Any], field_info: Dict[str, Set[str]]):
        """Extract field names from parameter specifications.

        Each parameter name is added (in its original case) to at most one
        category of ``field_info``, chosen by case-insensitive substring
        match. Handle terms are tested before name terms because "username"
        contains "name" and would otherwise never count as a handle.
        """
        for field_name in parameters:
            field_lower = field_name.lower()
            # NOTE(review): the 'id' substring test also matches names such
            # as "width" or "valid"; confirm whether that is acceptable.
            if any(term in field_lower for term in ['id', 'identifier']):
                field_info["id_fields"].add(field_name)
            elif any(term in field_lower for term in ['handle', 'username', 'login']):
                field_info["handle_fields"].add(field_name)
            elif any(term in field_lower for term in ['name', 'title']):
                field_info["name_fields"].add(field_name)

    def _get_default_history_patterns(self) -> List[str]:
        """Fallback history patterns."""
        return [
            r'\b(last|previous|earlier|before|just|recent)\b',
            r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
            r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
            r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
            r'\b(that|those) (challenge|user|skill|member|result)s?\b'
        ]

    def _get_default_tool_patterns(self) -> List[str]:
        """Fallback tool patterns."""
        return [
            r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
            r'\b(latest|current|updated|active)\b',
            r'\b(challenge|member|user|skill) (with|named|called|id)\b',
            r'\b(how many|count|total)\b'
        ]

    def _get_default_entity_patterns(self) -> Dict[str, str]:
        """Fallback entity patterns."""
        return {
            "user_handle": r'\b@?([a-zA-Z0-9_-]+)\b(?=\s*(user|member|profile|handle))',
            "challenge_id": r'\bchallenge\s*(?:id\s*)?:?\s*([0-9]+)',
            "skill_name": r'\bskill\s*:?\s*([A-Za-z+#.]{2,})',
            "status": r'\bstatus\s*:?\s*(active|completed|draft|new)\b',
        }

    def clear_cache(self):
        """Clear pattern cache to force regeneration."""
        self._pattern_cache.clear()
        self._last_spec_hash = None

    def refresh_if_needed(self):
        """Refresh patterns if MCP specs have changed.

        Compares a hash of the sorted tool/resource names with the one from
        the previous call and clears the cache when they differ.
        """
        try:
            # Simple hash of available tools/resources to detect changes
            current_names = sorted(self._get_all_available_names())
            current_hash = hash(str(current_names))
            if current_hash != self._last_spec_hash:
                self.clear_cache()
                # clear_cache() resets the stored hash, so record it afterwards.
                self._last_spec_hash = current_hash
        except Exception:
            self._logger.debug("Could not check MCP specs for changes", exc_info=True)
# Shared module-level manager, constructed once when this module is imported.
dynamic_pattern_manager = DynamicPatternManager()