# File size: 11,213 Bytes (c4f5f25)
"""
Optimized query builder for OpenSearch to improve search performance.
"""

import logging
from datetime import datetime, timedelta
from typing import Any

logger = logging.getLogger(__name__)


class OptimizedQueryBuilder:
    """Builds optimized OpenSearch queries for better performance."""

    @staticmethod
    def build_bm25_query(
        query_text: str,
        *,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
        min_score: float = 0.5,
        boost_recent: bool = True
    ) -> dict[str, Any]:
        """Build optimized BM25 query with performance enhancements."""

        # Use function score for better relevance and performance
        query = {
            "size": top_k,
            "min_score": min_score,
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "must": [
                                {
                                    "multi_match": {
                                        "query": query_text,
                                        "fields": [
                                            "chunk_text^3",
                                            "title^2",
                                            "section_title^1.5",
                                            "abstract^1"
                                        ],
                                        "type": "best_fields",
                                        "fuzziness": "AUTO",
                                        "prefix_length": 2,
                                        "max_expansions": 50
                                    }
                                }
                            ]
                        }
                    },
                    "functions": [],
                    "score_mode": "multiply",
                    "boost_mode": "replace"
                }
            },
            # Optimize for performance
            "_source": ["_id", "chunk_text", "title", "section_title", "abstract", "metadata"],
            "sort": ["_score"],
            "track_total_hits": False  # Disable total hit counting for better performance
        }

        # Add recency boost if enabled
        if boost_recent:
            query["query"]["function_score"]["functions"].append({
                "gauss": {
                    "metadata.publication_date": {
                        "origin": "now",
                        "scale": "365d",
                        "offset": "30d",
                        "decay": 0.5
                    }
                },
                "weight": 1.2
            })

        # Add filters
        if filters:
            query["query"]["function_score"]["query"]["bool"]["filter"] = (
                OptimizedQueryBuilder._build_filters(filters)
            )

        return query

    @staticmethod
    def build_vector_query(
        query_vector: list[float],
        *,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
        min_score: float = 0.7,
        num_candidates: int = 100  # Larger candidate set for better recall
    ) -> dict[str, Any]:
        """Build optimized vector KNN query."""

        query = {
            "size": top_k,
            "min_score": min_score,
            "query": {
                "knn": {
                    "embedding": {
                        "vector": query_vector,
                        "k": top_k,
                        "num_candidates": num_candidates
                    }
                }
            },
            "_source": ["_id", "chunk_text", "title", "section_title", "abstract", "metadata"],
            "track_total_hits": False
        }

        # Add filters for KNN (must be in filter context)
        if filters:
            query["query"] = {
                "bool": {
                    "must": [query["query"]],
                    "filter": OptimizedQueryBuilder._build_filters(filters)
                }
            }

        return query

    @staticmethod
    def build_hybrid_query(
        query_text: str,
        query_vector: list[float],
        *,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
        rrf_window_size: int = 50,
        rrf_rank_constant: int = 60
    ) -> dict[str, Any]:
        """Build optimized hybrid query using RRF (Reciprocal Rank Fusion)."""

        # Build separate queries for BM25 and vector
        bm25_query = OptimizedQueryBuilder.build_bm25_query(
            query_text, top_k=rrf_window_size, filters=filters, min_score=0.1
        )

        vector_query = OptimizedQueryBuilder.build_vector_query(
            query_vector, top_k=rrf_window_size, filters=filters, min_score=0.1
        )

        # Combine using RRF
        query = {
            "size": top_k,
            "query": {
                "rrf": {
                    "queries": [bm25_query["query"], vector_query["query"]],
                    "rank_constant": rrf_rank_constant
                }
            },
            "_source": ["_id", "chunk_text", "title", "section_title", "abstract", "metadata"],
            "track_total_hits": False
        }

        return query

    @staticmethod
    def build_aggregation_query(
        query_text: str,
        agg_field: str,
        *,
        size: int = 10,
        filters: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """Build query with aggregations for analytics."""

        query = {
            "size": 0,  # We only want aggregations
            "query": {
                "multi_match": {
                    "query": query_text,
                    "fields": ["chunk_text", "title", "abstract"]
                }
            },
            "aggs": {
                "top_values": {
                    "terms": {
                        "field": f"{agg_field}.keyword",
                        "size": size,
                        "min_doc_count": 1
                    }
                }
            }
        }

        if filters:
            query["query"] = {
                "bool": {
                    "must": [query["query"]],
                    "filter": OptimizedQueryBuilder._build_filters(filters)
                }
            }

        return query

    @staticmethod
    def _build_filters(filters: dict[str, Any]) -> list[dict[str, Any]]:
        """Build optimized filter clauses."""
        filter_clauses = []

        for field, value in filters.items():
            if isinstance(value, list):
                # Multiple values - use terms query
                filter_clauses.append({
                    "terms": {f"{field}.keyword": value}
                })
            elif isinstance(value, dict):
                # Range query
                if "gte" in value or "lte" in value or "gt" in value or "lt" in value:
                    range_filter = {"range": {field: {}}}
                    for op, val in value.items():
                        if op in ["gte", "lte", "gt", "lt"]:
                            range_filter["range"][field][op] = val
                    filter_clauses.append(range_filter)
                else:
                    # Nested query
                    filter_clauses.append({
                        "nested": {
                            "path": field,
                            "query": {
                                "bool": {
                                    "must": [
                                        {"term": {f"{field}.{k}.keyword": v}}
                                        for k, v in value.items()
                                    ]
                                }
                            }
                        }
                    })
            else:
                # Single value - use term query
                filter_clauses.append({
                    "term": {f"{field}.keyword": value}
                })

        return filter_clauses

    @staticmethod
    def build_suggestion_query(
        text: str,
        *,
        field: str = "chunk_text",
        size: int = 5
    ) -> dict[str, Any]:
        """Build query for spell-check suggestions."""

        return {
            "suggest": {
                "text": text,
                "simple_phrase": {
                    "phrase": {
                        "field": field,
                        "size": size,
                        "gram_size": 3,
                        "direct_generator": [{
                            "field": field,
                            "suggest_mode": "missing"
                        }],
                        "highlight": {
                            "pre_tag": "<em>",
                            "post_tag": "</em>"
                        }
                    }
                }
            }
        }

    @staticmethod
    def build_more_like_this_query(
        doc_id: str,
        *,
        top_k: int = 10,
        min_term_freq: int = 1,
        max_query_terms: int = 25,
        min_doc_freq: int = 2
    ) -> dict[str, Any]:
        """Build More Like This query."""

        return {
            "size": top_k,
            "query": {
                "more_like_this": {
                    "fields": ["chunk_text", "title", "abstract"],
                    "like": [{"_index": "medical_chunks", "_id": doc_id}],
                    "min_term_freq": min_term_freq,
                    "max_query_terms": max_query_terms,
                    "min_doc_freq": min_doc_freq
                }
            },
            "_source": ["_id", "chunk_text", "title", "section_title", "abstract", "metadata"],
            "track_total_hits": False
        }


class QueryCache:
    """Simple query result cache for frequently executed queries."""

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
        self.cache: dict[str, dict[str, Any]] = {}
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds

    def get(self, query_hash: str) -> list[dict[str, Any]] | None:
        """Get cached results if not expired."""
        if query_hash in self.cache:
            entry = self.cache[query_hash]
            if datetime.now() - entry["timestamp"] < timedelta(seconds=self.ttl_seconds):
                return entry["results"]
            else:
                del self.cache[query_hash]
        return None

    def set(self, query_hash: str, results: list[dict[str, Any]]) -> None:
        """Cache query results."""
        # Remove oldest entries if cache is full
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.cache.keys(),
                           key=lambda k: self.cache[k]["timestamp"])
            del self.cache[oldest_key]

        self.cache[query_hash] = {
            "results": results,
            "timestamp": datetime.now()
        }

    def clear(self) -> None:
        """Clear the cache."""
        self.cache.clear()

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics."""
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "ttl_seconds": self.ttl_seconds
        }