File size: 10,353 Bytes
9a6a5aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""
Dynamic Pattern Manager for MCP Application

This module generates classification and extraction patterns dynamically
based on runtime MCP specs instead of using hardcoded patterns.
"""

import logging
import re
from typing import List, Dict, Any, Set

from src.mcp.compact_utils import CompactSpecsUtils
from config.conversation_config import conversation_config

class DynamicPatternManager:
    """Generates patterns dynamically based on MCP tool and resource specs.

    Regex patterns are built from the tool/resource names and parameter names
    reported by ``CompactSpecsUtils`` and cached per pattern family until
    ``clear_cache`` (or ``refresh_if_needed``) drops them. When dynamic
    generation is switched off in ``conversation_config`` the hardcoded
    fallback patterns are returned instead and nothing is cached.
    """

    _logger = logging.getLogger(__name__)

    # (substrings searched in a lower-cased tool/resource name, terms contributed).
    _DOMAIN_TERM_RULES = (
        (('challenge',), ('challenge',)),
        (('skill',), ('skill',)),
        (('member', 'user'), ('member', 'user')),
        (('contest',), ('contest',)),
        (('competition',), ('competition',)),
    )

    # (bucket, substrings searched in a lower-cased parameter name); the first
    # matching rule wins. Handles are tested before names because 'username'
    # contains 'name' and would otherwise never reach the handle bucket.
    # NOTE: matching is by substring, so e.g. 'width' still counts as an id field.
    _FIELD_RULES = (
        ("id_fields", ('id', 'identifier')),
        ("handle_fields", ('handle', 'username', 'login')),
        ("name_fields", ('name', 'title')),
    )

    # Regex for the value that follows a field name, per bucket.
    _ENTITY_VALUE_PATTERNS = {
        "id_fields": r'([a-zA-Z0-9_-]+)',
        "name_fields": r'([a-zA-Z0-9\s_-]+)',
        "handle_fields": r'([a-zA-Z0-9_-]+)',
    }

    def __init__(self):
        """Create the spec helper and start with an empty pattern cache."""
        self.compact_utils = CompactSpecsUtils()
        self._pattern_cache: Dict[str, Any] = {}
        # Hash of the tool/resource names seen by the last refresh_if_needed().
        self._last_spec_hash = None

    def get_history_patterns(self) -> List[str]:
        """Generate history query patterns dynamically.

        Returns:
            The fallback patterns when dynamic patterns are disabled in the
            configuration; otherwise the (cached) base patterns followed by two
            patterns per discovered domain term.
        """
        if not conversation_config.query_classification.use_dynamic_patterns:
            return self._get_default_history_patterns()

        cache_key = "history_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        patterns = self._build_history_patterns(self._sorted_domain_terms())
        self._pattern_cache[cache_key] = patterns
        return patterns

    def get_tool_patterns(self) -> List[str]:
        """Generate tool query patterns dynamically.

        Returns:
            The fallback patterns when dynamic patterns are disabled in the
            configuration; otherwise the (cached) base patterns followed by two
            patterns per discovered domain term.
        """
        if not conversation_config.query_classification.use_dynamic_patterns:
            return self._get_default_tool_patterns()

        cache_key = "tool_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        patterns = self._build_tool_patterns(self._sorted_domain_terms())
        self._pattern_cache[cache_key] = patterns
        return patterns

    def get_pronoun_patterns(self) -> List[str]:
        """Get pronoun patterns for reference detection."""
        return [r'\b(it|this|that|they|them|these|those)\b']

    def get_entity_patterns(self) -> Dict[str, str]:
        """Generate entity extraction patterns dynamically.

        Returns:
            A mapping of pattern name to regex. The fallback patterns are
            returned when dynamic patterns are disabled, when generation
            fails, or when no usable field name was discovered (an empty
            bucket would otherwise produce a regex that matches any token).
        """
        if not conversation_config.entity_extraction.use_dynamic_patterns:
            return self._get_default_entity_patterns()

        cache_key = "entity_patterns"
        if cache_key in self._pattern_cache:
            return self._pattern_cache[cache_key]

        try:
            patterns = self._build_entity_patterns(self._analyze_available_fields())
        except Exception as e:
            self._logger.warning("Error generating entity patterns: %s", e)
            patterns = {}

        if not patterns:
            patterns = self._get_default_entity_patterns()

        self._pattern_cache[cache_key] = patterns
        return patterns

    @staticmethod
    def _build_history_patterns(domain_terms: List[str]) -> List[str]:
        """Return the base history patterns plus two patterns per domain term."""
        patterns = [
            r'\b(last|previous|earlier|before|just|recent)\b',
            r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
            r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
            r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
            r'\b(the (same|one|result|data) (I|we|you) (asked|requested|mentioned))\b'
        ]
        for term in domain_terms:
            escaped = re.escape(term)
            patterns.append(rf'\b(that|those) {escaped}s?\b')
            patterns.append(rf'\bthe {escaped} (you|we) (found|mentioned|showed)\b')
        return patterns

    @staticmethod
    def _build_tool_patterns(domain_terms: List[str]) -> List[str]:
        """Return the base tool patterns plus two patterns per domain term."""
        patterns = [
            r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
            r'\b(latest|current|updated|active)\b',
            r'\b(how many|count|total)\b'
        ]
        for term in domain_terms:
            escaped = re.escape(term)
            patterns.append(rf'\b{escaped}s? (with|named|called|id)\b')
            patterns.append(rf'\b(get|find|show|list) {escaped}s?\b')
        return patterns

    @classmethod
    def _build_entity_patterns(cls, field_info: Dict[str, Set[str]]) -> Dict[str, str]:
        """Turn discovered field names into ``dynamic_<bucket>`` regexes.

        Buckets that are unknown or empty are skipped. Field names are sorted
        longest first so that a short name cannot shadow a longer one that
        starts with it (the regex engine takes the first alternative that
        matches), and so that the generated pattern is deterministic.
        """
        patterns: Dict[str, str] = {}
        for field_type, field_names in field_info.items():
            value_pattern = cls._ENTITY_VALUE_PATTERNS.get(field_type)
            if value_pattern is None or not field_names:
                continue
            ordered = sorted(field_names, key=lambda name: (-len(name), name))
            terms = "|".join(re.escape(name) for name in ordered)
            patterns[f"dynamic_{field_type}"] = rf'\b({terms})\s*:?\s*{value_pattern}\b'
        return patterns

    def _sorted_domain_terms(self) -> List[str]:
        """Return the discovered domain terms in sorted order (best effort).

        Sorting keeps the generated pattern lists deterministic. Any failure
        is logged and yields an empty list, so callers fall back to their
        base patterns.
        """
        try:
            return sorted(self._extract_domain_terms(self._get_all_available_names()))
        except Exception:
            self._logger.debug("Could not derive domain terms", exc_info=True)
            return []

    def _get_all_available_names(self) -> List[str]:
        """Get all available tool and resource names.

        Tools and resources are fetched independently, so a failure listing
        one of them does not discard the other. Never raises.
        """
        names: List[str] = []
        # Lambdas defer the attribute lookups so they happen inside the try.
        fetchers = (
            ("tools", lambda: self.compact_utils.get_working_tools()),
            ("resources", lambda: self.compact_utils.get_working_resources()),
        )
        for label, fetch in fetchers:
            try:
                names.extend(fetch() or [])
            except Exception:
                self._logger.debug("Could not list working %s", label, exc_info=True)
        return names

    def _extract_domain_terms(self, names: List[str]) -> Set[str]:
        """Extract domain-specific terms from tool/resource names.

        Args:
            names: Tool/resource names; entries that are not strings are
                ignored.

        Returns:
            The set of known domain terms (Topcoder vocabulary) whose trigger
            substring occurs, case-insensitively, in at least one name.
        """
        domain_terms: Set[str] = set()
        for name in names:
            if not isinstance(name, str):
                continue
            name_lower = name.lower()
            for needles, terms in self._DOMAIN_TERM_RULES:
                if any(needle in name_lower for needle in needles):
                    domain_terms.update(terms)
        return domain_terms

    def _analyze_available_fields(self) -> Dict[str, Set[str]]:
        """Analyze available tools/resources to discover field patterns.

        Returns:
            A dict with the buckets ``id_fields``, ``name_fields`` and
            ``handle_fields``; each holds the parameter names classified into
            it. Never raises: failures are logged and skipped.
        """
        field_info: Dict[str, Set[str]] = {
            "id_fields": set(),
            "name_fields": set(),
            "handle_fields": set(),
        }
        # Lambdas defer the attribute lookups so they happen inside the try.
        sources = (
            ("tool",
             lambda: self.compact_utils.get_working_tools(),
             lambda name: self.compact_utils.get_tool_spec(name)),
            ("resource",
             lambda: self.compact_utils.get_working_resources(),
             lambda name: self.compact_utils.get_resource_spec(name)),
        )
        for kind, list_names, get_spec in sources:
            try:
                for name in list_names() or []:
                    self._collect_spec_fields(kind, name, get_spec, field_info)
            except Exception:
                self._logger.debug("Could not list working %ss", kind, exc_info=True)
        return field_info

    def _collect_spec_fields(self, kind, name, get_spec, field_info: Dict[str, Set[str]]):
        """Classify the parameters of one spec, isolating its failures.

        A spec that cannot be fetched or parsed is logged and skipped so it
        does not abort the analysis of the remaining tools/resources.
        """
        try:
            spec = get_spec(name)
            if spec and "parameters" in spec:
                self._extract_field_names(spec["parameters"], field_info)
        except Exception:
            self._logger.debug("Skipping %s spec %r", kind, name, exc_info=True)

    def _extract_field_names(self, parameters: Dict[str, Any], field_info: Dict[str, Set[str]]):
        """Extract field names from parameter specifications.

        Args:
            parameters: Mapping keyed by parameter name; only the keys are
                inspected, and keys that are not strings are ignored.
            field_info: Buckets updated in place. Each name is added to the
                first bucket whose rule matches (id, then handle, then name).
        """
        for field_name in parameters:
            if not isinstance(field_name, str):
                continue
            field_lower = field_name.lower()
            for bucket, needles in self._FIELD_RULES:
                if any(needle in field_lower for needle in needles):
                    field_info[bucket].add(field_name)
                    break

    def _get_default_history_patterns(self) -> List[str]:
        """Fallback history patterns."""
        return [
            r'\b(last|previous|earlier|before|just|recent)\b',
            r'\b(what (did|was)|tell me about|show me) (the|that|it|them)\b',
            r'\b(from|in) (the|our|my) (last|previous|earlier) (conversation|chat|query|request)\b',
            r'\b(you (said|told|showed|mentioned)|we (discussed|talked about))\b',
            r'\b(that|those) (challenge|user|skill|member|result)s?\b'
        ]

    def _get_default_tool_patterns(self) -> List[str]:
        """Fallback tool patterns."""
        return [
            r'\b(find|search|get|show|list) (new|other|different|more|all)\b',
            r'\b(latest|current|updated|active)\b',
            r'\b(challenge|member|user|skill) (with|named|called|id)\b',
            r'\b(how many|count|total)\b'
        ]

    def _get_default_entity_patterns(self) -> Dict[str, str]:
        """Fallback entity patterns."""
        return {
            "user_handle": r'\b@?([a-zA-Z0-9_-]+)\b(?=\s*(user|member|profile|handle))',
            "challenge_id": r'\bchallenge\s*(?:id\s*)?:?\s*([0-9]+)',
            "skill_name": r'\bskill\s*:?\s*([A-Za-z+#.]{2,})',
            "status": r'\bstatus\s*:?\s*(active|completed|draft|new)\b',
        }

    def clear_cache(self):
        """Clear pattern cache to force regeneration."""
        self._pattern_cache.clear()
        self._last_spec_hash = None

    def refresh_if_needed(self):
        """Refresh patterns if MCP specs have changed.

        Change detection is a hash of the sorted tool/resource names; when it
        differs from the previous one the pattern cache is cleared. Failures
        while computing the hash are logged and leave the cache untouched.
        """
        try:
            # Simple hash of available tools/resources to detect changes
            current_names = sorted(self._get_all_available_names())
            current_hash = hash(str(current_names))
        except Exception:
            self._logger.debug("Could not compute spec hash", exc_info=True)
            return

        if current_hash != self._last_spec_hash:
            self.clear_cache()
            self._last_spec_hash = current_hash

# Global instance shared by importers of this module. It is constructed at
# import time, which runs DynamicPatternManager.__init__ and therefore
# instantiates CompactSpecsUtils as a side effect of importing this module.
dynamic_pattern_manager = DynamicPatternManager()