topcoder-info-mcp-agent / src /application /services /dynamic_pattern_manager.py
abhishekrn's picture
history, follow-up
9a6a5aa
"""
Dynamic Pattern Manager for MCP Application
This module generates classification and extraction patterns dynamically
based on runtime MCP specs instead of using hardcoded patterns.
"""
import re
from typing import List, Dict, Any, Set
from src.mcp.compact_utils import CompactSpecsUtils
from config.conversation_config import conversation_config
class DynamicPatternManager:
"""Generates patterns dynamically based on MCP tool and resource specs."""
def __init__(self):
self.compact_utils = CompactSpecsUtils()
self._pattern_cache = {}
self._last_spec_hash = None
def get_history_patterns(self) -> List[str]:
"""Generate history query patterns dynamically."""
if not conversation_config.query_classification.use_dynamic_patterns:
return self._get_default_history_patterns()
cache_key = "history_patterns"
if cache_key in self._pattern_cache:
return self._pattern_cache[cache_key]
# Base patterns that are language-agnostic
patterns = [
r'\b(last|previous|earlier|before|just|recent)\b',
r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
r'\b(the (same|one|result|data) (I|we|you) (asked|requested|mentioned))\b'
]
# Add domain-specific patterns based on available tools/resources
try:
tools_and_resources = self._get_all_available_names()
domain_terms = self._extract_domain_terms(tools_and_resources)
for term in domain_terms:
patterns.append(rf'\b(that|those) {re.escape(term)}s?\b')
patterns.append(rf'\bthe {re.escape(term)} (you|we) (found|mentioned|showed)\b')
except Exception:
pass
self._pattern_cache[cache_key] = patterns
return patterns
def get_tool_patterns(self) -> List[str]:
"""Generate tool query patterns dynamically."""
if not conversation_config.query_classification.use_dynamic_patterns:
return self._get_default_tool_patterns()
cache_key = "tool_patterns"
if cache_key in self._pattern_cache:
return self._pattern_cache[cache_key]
# Base patterns for new data requests
patterns = [
r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
r'\b(latest|current|updated|active)\b',
r'\b(how many|count|total)\b'
]
# Add patterns based on available tools/resources
try:
tools_and_resources = self._get_all_available_names()
domain_terms = self._extract_domain_terms(tools_and_resources)
for term in domain_terms:
patterns.append(rf'\b{re.escape(term)}s? (with|named|called|id)\b')
patterns.append(rf'\b(get|find|show|list) {re.escape(term)}s?\b')
except Exception:
pass
self._pattern_cache[cache_key] = patterns
return patterns
def get_pronoun_patterns(self) -> List[str]:
"""Get pronoun patterns for reference detection."""
return [r'\b(it|this|that|they|them|these|those)\b']
def get_entity_patterns(self) -> Dict[str, str]:
"""Generate entity extraction patterns dynamically."""
if not conversation_config.entity_extraction.use_dynamic_patterns:
return self._get_default_entity_patterns()
cache_key = "entity_patterns"
if cache_key in self._pattern_cache:
return self._pattern_cache[cache_key]
patterns = {}
try:
# Get field information from all available tools/resources
field_info = self._analyze_available_fields()
# Generate patterns based on discovered fields
for field_type, field_names in field_info.items():
if field_type == "id_fields":
# Create pattern for ID fields
id_terms = "|".join(re.escape(name) for name in field_names)
patterns[f"dynamic_{field_type}"] = rf'\b({id_terms})\s*:?\s*([a-zA-Z0-9_-]+)\b'
elif field_type == "name_fields":
# Create pattern for name fields
name_terms = "|".join(re.escape(name) for name in field_names)
patterns[f"dynamic_{field_type}"] = rf'\b({name_terms})\s*:?\s*([a-zA-Z0-9\s_-]+)\b'
elif field_type == "handle_fields":
# Create pattern for handle/username fields
handle_terms = "|".join(re.escape(name) for name in field_names)
patterns[f"dynamic_{field_type}"] = rf'\b({handle_terms})\s*:?\s*([a-zA-Z0-9_-]+)\b'
except Exception as e:
print(f"Error generating entity patterns: {e}")
patterns = self._get_default_entity_patterns()
self._pattern_cache[cache_key] = patterns
return patterns
def _get_all_available_names(self) -> List[str]:
"""Get all available tool and resource names."""
names = []
try:
names.extend(self.compact_utils.get_working_tools())
names.extend(self.compact_utils.get_working_resources())
except Exception:
pass
return names
def _extract_domain_terms(self, names: List[str]) -> Set[str]:
"""Extract domain-specific terms from tool/resource names."""
domain_terms = set()
for name in names:
# Extract meaningful terms from names
name_lower = name.lower()
# Common domain terms in Topcoder
if 'challenge' in name_lower:
domain_terms.add('challenge')
if 'skill' in name_lower:
domain_terms.add('skill')
if 'member' in name_lower or 'user' in name_lower:
domain_terms.add('member')
domain_terms.add('user')
if 'contest' in name_lower:
domain_terms.add('contest')
if 'competition' in name_lower:
domain_terms.add('competition')
return domain_terms
def _analyze_available_fields(self) -> Dict[str, Set[str]]:
"""Analyze available tools/resources to discover field patterns."""
field_info = {
"id_fields": set(),
"name_fields": set(),
"handle_fields": set()
}
try:
# Analyze tool specs
for tool_name in self.compact_utils.get_working_tools():
spec = self.compact_utils.get_tool_spec(tool_name)
if spec and "parameters" in spec:
self._extract_field_names(spec["parameters"], field_info)
# Analyze resource specs
for resource_name in self.compact_utils.get_working_resources():
spec = self.compact_utils.get_resource_spec(resource_name)
if spec and "parameters" in spec:
self._extract_field_names(spec["parameters"], field_info)
except Exception:
pass
return field_info
def _extract_field_names(self, parameters: Dict[str, Any], field_info: Dict[str, Set[str]]):
"""Extract field names from parameter specifications."""
for field_name, field_spec in parameters.items():
field_lower = field_name.lower()
# Classify fields by type
if any(term in field_lower for term in ['id', 'identifier']):
field_info["id_fields"].add(field_name)
elif any(term in field_lower for term in ['name', 'title']):
field_info["name_fields"].add(field_name)
elif any(term in field_lower for term in ['handle', 'username', 'login']):
field_info["handle_fields"].add(field_name)
def _get_default_history_patterns(self) -> List[str]:
"""Fallback history patterns."""
return [
r'\b(last|previous|earlier|before|just|recent)\b',
r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
r'\b(that|those) (challenge|user|skill|member|result)s?\b'
]
def _get_default_tool_patterns(self) -> List[str]:
"""Fallback tool patterns."""
return [
r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
r'\b(latest|current|updated|active)\b',
r'\b(challenge|member|user|skill) (with|named|called|id)\b',
r'\b(how many|count|total)\b'
]
def _get_default_entity_patterns(self) -> Dict[str, str]:
"""Fallback entity patterns."""
return {
"user_handle": r'\b@?([a-zA-Z0-9_-]+)\b(?=\s*(user|member|profile|handle))',
"challenge_id": r'\bchallenge\s*(?:id\s*)?:?\s*([0-9]+)',
"skill_name": r'\bskill\s*:?\s*([A-Za-z+#.]{2,})',
"status": r'\bstatus\s*:?\s*(active|completed|draft|new)\b',
}
def clear_cache(self):
"""Clear pattern cache to force regeneration."""
self._pattern_cache.clear()
self._last_spec_hash = None
def refresh_if_needed(self):
"""Refresh patterns if MCP specs have changed."""
try:
# Simple hash of available tools/resources to detect changes
current_names = sorted(self._get_all_available_names())
current_hash = hash(str(current_names))
if current_hash != self._last_spec_hash:
self.clear_cache()
self._last_spec_hash = current_hash
except Exception:
pass
# Global instance
dynamic_pattern_manager = DynamicPatternManager()