Spaces:
Sleeping
Sleeping
| """ | |
| Hybrid search combining BM25 and vector similarity. | |
| """ | |
| from typing import List, Tuple, Optional, Dict, Any | |
| import numpy as np | |
| from django.db import connection | |
| from django.db.models import QuerySet, F | |
| from django.contrib.postgres.search import SearchQuery, SearchRank | |
| from .embeddings import ( | |
| get_embedding_model, | |
| generate_embedding, | |
| cosine_similarity | |
| ) | |
| from .embedding_utils import load_embedding | |
| from .search_ml import expand_query_with_synonyms | |
| # Default weights for hybrid search | |
| DEFAULT_BM25_WEIGHT = 0.4 | |
| DEFAULT_VECTOR_WEIGHT = 0.6 | |
| # Minimum scores | |
| DEFAULT_MIN_BM25_SCORE = 0.0 | |
| DEFAULT_MIN_VECTOR_SCORE = 0.1 | |
| def calculate_exact_match_boost(obj: Any, query: str, text_fields: List[str]) -> float: | |
| """ | |
| Calculate boost score for exact keyword matches in title/name fields. | |
| Args: | |
| obj: Django model instance. | |
| query: Search query string. | |
| text_fields: List of field names to check (first 2 are usually title/name). | |
| Returns: | |
| Boost score (0.0 to 1.0). | |
| """ | |
| if not query or not text_fields: | |
| return 0.0 | |
| query_lower = query.lower().strip() | |
| # Extract key phrases (2-3 word combinations) from query | |
| query_words = query_lower.split() | |
| key_phrases = [] | |
| for i in range(len(query_words) - 1): | |
| phrase = " ".join(query_words[i:i+2]) | |
| if len(phrase) > 3: | |
| key_phrases.append(phrase) | |
| for i in range(len(query_words) - 2): | |
| phrase = " ".join(query_words[i:i+3]) | |
| if len(phrase) > 5: | |
| key_phrases.append(phrase) | |
| # Also add individual words (longer than 2 chars) | |
| query_words_set = set(word for word in query_words if len(word) > 2) | |
| boost = 0.0 | |
| # Check primary fields (title, name) for exact matches | |
| # First 2 fields are usually title/name | |
| for field in text_fields[:2]: | |
| if hasattr(obj, field): | |
| field_value = str(getattr(obj, field, "")).lower() | |
| if field_value: | |
| # Check for key phrases first (highest priority) | |
| for phrase in key_phrases: | |
| if phrase in field_value: | |
| # Major boost for phrase match | |
| boost += 0.5 | |
| # Extra boost if it's the exact field value | |
| if field_value.strip() == phrase.strip(): | |
| boost += 0.3 | |
| # Check for full query match | |
| if query_lower in field_value: | |
| boost += 0.4 | |
| # Count matched individual words | |
| matched_words = sum(1 for word in query_words_set if word in field_value) | |
| if matched_words > 0: | |
| # Moderate boost for word matches | |
| boost += 0.1 * min(matched_words, 3) # Cap at 3 words | |
| return min(boost, 1.0) # Cap at 1.0 for very strong matches | |
| def get_bm25_scores( | |
| queryset: QuerySet, | |
| query: str, | |
| top_k: int = 20 | |
| ) -> List[Tuple[Any, float]]: | |
| """ | |
| Get BM25 scores for queryset. | |
| Args: | |
| queryset: Django QuerySet to search. | |
| query: Search query string. | |
| top_k: Maximum number of results. | |
| Returns: | |
| List of (object, bm25_score) tuples. | |
| """ | |
| if not query or connection.vendor != "postgresql": | |
| return [] | |
| if not hasattr(queryset.model, "tsv_body"): | |
| return [] | |
| try: | |
| expanded_queries = expand_query_with_synonyms(query) | |
| combined_query = None | |
| for q_variant in expanded_queries: | |
| variant_query = SearchQuery(q_variant, config="simple") | |
| combined_query = variant_query if combined_query is None else combined_query | variant_query | |
| if combined_query is not None: | |
| ranked_qs = ( | |
| queryset | |
| .annotate(rank=SearchRank(F("tsv_body"), combined_query)) | |
| .filter(rank__gt=DEFAULT_MIN_BM25_SCORE) | |
| .order_by("-rank") | |
| ) | |
| results = list(ranked_qs[:top_k * 2]) # Get more for hybrid ranking | |
| return [(obj, float(getattr(obj, "rank", 0.0))) for obj in results] | |
| except Exception as e: | |
| print(f"Error in BM25 search: {e}") | |
| return [] | |
| def get_vector_scores( | |
| queryset: QuerySet, | |
| query: str, | |
| top_k: int = 20 | |
| ) -> List[Tuple[Any, float]]: | |
| """ | |
| Get vector similarity scores for queryset. | |
| Args: | |
| queryset: Django QuerySet to search. | |
| query: Search query string. | |
| top_k: Maximum number of results. | |
| Returns: | |
| List of (object, vector_score) tuples. | |
| """ | |
| if not query: | |
| return [] | |
| # Generate query embedding | |
| model = get_embedding_model() | |
| if model is None: | |
| return [] | |
| query_embedding = generate_embedding(query, model=model) | |
| if query_embedding is None: | |
| return [] | |
| # Get all objects with embeddings | |
| all_objects = list(queryset) | |
| if not all_objects: | |
| return [] | |
| # Check dimension compatibility first | |
| query_dim = len(query_embedding) | |
| dimension_mismatch = False | |
| # Calculate similarities | |
| scores = [] | |
| for obj in all_objects: | |
| obj_embedding = load_embedding(obj) | |
| if obj_embedding is not None: | |
| obj_dim = len(obj_embedding) | |
| if obj_dim != query_dim: | |
| # Dimension mismatch - skip vector search for this object | |
| if not dimension_mismatch: | |
| print(f"⚠️ Dimension mismatch: query={query_dim}, stored={obj_dim}. Skipping vector search.") | |
| dimension_mismatch = True | |
| continue | |
| similarity = cosine_similarity(query_embedding, obj_embedding) | |
| if similarity >= DEFAULT_MIN_VECTOR_SCORE: | |
| scores.append((obj, similarity)) | |
| # If dimension mismatch detected, return empty to fall back to BM25 + exact match | |
| if dimension_mismatch and not scores: | |
| return [] | |
| # Sort by score descending | |
| scores.sort(key=lambda x: x[1], reverse=True) | |
| return scores[:top_k * 2] # Get more for hybrid ranking | |
| def normalize_scores(scores: List[Tuple[Any, float]]) -> Dict[Any, float]: | |
| """ | |
| Normalize scores to 0-1 range. | |
| Args: | |
| scores: List of (object, score) tuples. | |
| Returns: | |
| Dictionary mapping object to normalized score. | |
| """ | |
| if not scores: | |
| return {} | |
| max_score = max(score for _, score in scores) if scores else 1.0 | |
| min_score = min(score for _, score in scores) if scores else 0.0 | |
| if max_score == min_score: | |
| # All scores are the same, return uniform distribution | |
| return {obj: 1.0 for obj, _ in scores} | |
| # Normalize to 0-1 | |
| normalized = {} | |
| for obj, score in scores: | |
| normalized[obj] = (score - min_score) / (max_score - min_score) | |
| return normalized | |
| def hybrid_search( | |
| queryset: QuerySet, | |
| query: str, | |
| top_k: int = 20, | |
| bm25_weight: float = DEFAULT_BM25_WEIGHT, | |
| vector_weight: float = DEFAULT_VECTOR_WEIGHT, | |
| min_hybrid_score: float = 0.1, | |
| text_fields: Optional[List[str]] = None | |
| ) -> List[Any]: | |
| """ | |
| Perform hybrid search combining BM25 and vector similarity. | |
| Args: | |
| queryset: Django QuerySet to search. | |
| query: Search query string. | |
| top_k: Maximum number of results. | |
| bm25_weight: Weight for BM25 score (0-1). | |
| vector_weight: Weight for vector score (0-1). | |
| min_hybrid_score: Minimum combined score threshold. | |
| text_fields: List of field names for exact match boost (optional). | |
| Returns: | |
| List of objects sorted by hybrid score. | |
| """ | |
| if not query: | |
| return list(queryset[:top_k]) | |
| # Normalize weights | |
| total_weight = bm25_weight + vector_weight | |
| if total_weight > 0: | |
| bm25_weight = bm25_weight / total_weight | |
| vector_weight = vector_weight / total_weight | |
| else: | |
| bm25_weight = 0.5 | |
| vector_weight = 0.5 | |
| # Get BM25 scores | |
| bm25_results = get_bm25_scores(queryset, query, top_k=top_k) | |
| bm25_scores = normalize_scores(bm25_results) | |
| # Get vector scores | |
| vector_results = get_vector_scores(queryset, query, top_k=top_k) | |
| vector_scores = normalize_scores(vector_results) | |
| # Combine scores | |
| combined_scores = {} | |
| all_objects = set() | |
| # Add BM25 objects | |
| for obj, _ in bm25_results: | |
| all_objects.add(obj) | |
| combined_scores[obj] = bm25_scores.get(obj, 0.0) * bm25_weight | |
| # Add vector objects | |
| for obj, _ in vector_results: | |
| all_objects.add(obj) | |
| if obj in combined_scores: | |
| combined_scores[obj] += vector_scores.get(obj, 0.0) * vector_weight | |
| else: | |
| combined_scores[obj] = vector_scores.get(obj, 0.0) * vector_weight | |
| # CRITICAL: Find exact matches FIRST using icontains, then apply boost | |
| # This ensures exact matches are always found and prioritized | |
| if text_fields: | |
| query_lower = query.lower() | |
| # Extract key phrases (2-word and 3-word) from query | |
| query_words = query_lower.split() | |
| key_phrases = [] | |
| # 2-word phrases | |
| for i in range(len(query_words) - 1): | |
| phrase = " ".join(query_words[i:i+2]) | |
| if len(phrase) > 3: | |
| key_phrases.append(phrase) | |
| # 3-word phrases | |
| for i in range(len(query_words) - 2): | |
| phrase = " ".join(query_words[i:i+3]) | |
| if len(phrase) > 5: | |
| key_phrases.append(phrase) | |
| # Find potential exact matches using icontains on name/title field | |
| # This ensures we don't miss exact matches even if BM25/vector don't find them | |
| exact_match_candidates = set() | |
| primary_field = text_fields[0] if text_fields else "name" | |
| if hasattr(queryset.model, primary_field): | |
| # Search for key phrases in the primary field | |
| for phrase in key_phrases: | |
| filter_kwargs = {f"{primary_field}__icontains": phrase} | |
| candidates = queryset.filter(**filter_kwargs)[:top_k * 2] | |
| exact_match_candidates.update(candidates) | |
| # Apply exact match boost to all candidates | |
| for obj in exact_match_candidates: | |
| if obj not in all_objects: | |
| all_objects.add(obj) | |
| combined_scores[obj] = 0.0 | |
| # Apply exact match boost (this should dominate) | |
| boost = calculate_exact_match_boost(obj, query, text_fields) | |
| if boost > 0: | |
| # Exact match boost should dominate - set it high | |
| combined_scores[obj] = max(combined_scores.get(obj, 0.0), boost) | |
| # Also check objects already in results for exact matches | |
| for obj in list(all_objects): | |
| boost = calculate_exact_match_boost(obj, query, text_fields) | |
| if boost > 0: | |
| # Boost existing scores | |
| combined_scores[obj] = max(combined_scores.get(obj, 0.0), boost) | |
| # Filter by minimum score and sort | |
| filtered_scores = [ | |
| (obj, score) for obj, score in combined_scores.items() | |
| if score >= min_hybrid_score | |
| ] | |
| filtered_scores.sort(key=lambda x: x[1], reverse=True) | |
| # Return top k | |
| results = [obj for obj, _ in filtered_scores[:top_k]] | |
| # Store hybrid score on objects for reference | |
| for obj, score in filtered_scores[:top_k]: | |
| obj._hybrid_score = score | |
| obj._bm25_score = bm25_scores.get(obj, 0.0) | |
| obj._vector_score = vector_scores.get(obj, 0.0) | |
| # Store exact match boost if applied | |
| if text_fields: | |
| obj._exact_match_boost = calculate_exact_match_boost(obj, query, text_fields) | |
| else: | |
| obj._exact_match_boost = 0.0 | |
| return results | |
| def semantic_query_expansion(query: str, top_n: int = 3) -> List[str]: | |
| """ | |
| Expand query with semantically similar terms using embeddings. | |
| Args: | |
| query: Original query string. | |
| top_n: Number of similar terms to add. | |
| Returns: | |
| List of expanded query variations. | |
| """ | |
| try: | |
| from hue_portal.chatbot.query_expansion import expand_query_semantically | |
| return expand_query_semantically(query, context=None) | |
| except Exception: | |
| # Fallback to basic synonym expansion | |
| return expand_query_with_synonyms(query) | |
| def rerank_results(query: str, results: List[Any], text_fields: List[str], top_k: int = 5) -> List[Any]: | |
| """ | |
| Rerank results using cross-encoder approach (recalculate similarity with query). | |
| Args: | |
| query: Search query. | |
| results: List of result objects. | |
| text_fields: List of field names to use for reranking. | |
| top_k: Number of top results to return. | |
| Returns: | |
| Reranked list of results. | |
| """ | |
| if not results or not query: | |
| return results[:top_k] | |
| try: | |
| # Generate query embedding | |
| model = get_embedding_model() | |
| if model is None: | |
| return results[:top_k] | |
| query_embedding = generate_embedding(query, model=model) | |
| if query_embedding is None: | |
| return results[:top_k] | |
| # Calculate similarity for each result | |
| scored_results = [] | |
| for obj in results: | |
| # Create text representation from text_fields | |
| text_parts = [] | |
| for field in text_fields: | |
| if hasattr(obj, field): | |
| value = getattr(obj, field, "") | |
| if value: | |
| text_parts.append(str(value)) | |
| if not text_parts: | |
| continue | |
| obj_text = " ".join(text_parts) | |
| obj_embedding = generate_embedding(obj_text, model=model) | |
| if obj_embedding is not None: | |
| similarity = cosine_similarity(query_embedding, obj_embedding) | |
| scored_results.append((obj, similarity)) | |
| # Sort by similarity and return top_k | |
| scored_results.sort(key=lambda x: x[1], reverse=True) | |
| return [obj for obj, _ in scored_results[:top_k]] | |
| except Exception as e: | |
| print(f"Error in reranking: {e}") | |
| return results[:top_k] | |
| def diversify_results(results: List[Any], top_k: int = 5, similarity_threshold: float = 0.8) -> List[Any]: | |
| """ | |
| Ensure diversity in results by removing very similar items. | |
| Args: | |
| results: List of result objects. | |
| top_k: Number of results to return. | |
| similarity_threshold: Maximum similarity allowed between results. | |
| Returns: | |
| Diversified list of results. | |
| """ | |
| if len(results) <= top_k: | |
| return results | |
| try: | |
| model = get_embedding_model() | |
| if model is None: | |
| return results[:top_k] | |
| # Generate embeddings for all results | |
| result_embeddings = [] | |
| valid_results = [] | |
| for obj in results: | |
| # Try to get embedding from object | |
| obj_embedding = load_embedding(obj) | |
| if obj_embedding is not None: | |
| result_embeddings.append(obj_embedding) | |
| valid_results.append(obj) | |
| if len(valid_results) <= top_k: | |
| return valid_results | |
| # Select diverse results using Maximal Marginal Relevance (MMR) | |
| selected = [valid_results[0]] # Always include first (highest score) | |
| selected_indices = {0} | |
| selected_embeddings = [result_embeddings[0]] | |
| for _ in range(min(top_k - 1, len(valid_results) - 1)): | |
| best_score = -1 | |
| best_idx = -1 | |
| for i, (obj, emb) in enumerate(zip(valid_results, result_embeddings)): | |
| if i in selected_indices: | |
| continue | |
| # Calculate max similarity to already selected results | |
| max_sim = 0.0 | |
| for sel_emb in selected_embeddings: | |
| sim = cosine_similarity(emb, sel_emb) | |
| max_sim = max(max_sim, sim) | |
| # Score: prefer results with lower similarity to selected ones | |
| score = 1.0 - max_sim | |
| if score > best_score: | |
| best_score = score | |
| best_idx = i | |
| if best_idx >= 0: | |
| selected.append(valid_results[best_idx]) | |
| selected_indices.add(best_idx) | |
| selected_embeddings.append(result_embeddings[best_idx]) | |
| return selected | |
| except Exception as e: | |
| print(f"Error in diversifying results: {e}") | |
| return results[:top_k] | |
| def search_with_hybrid( | |
| queryset: QuerySet, | |
| query: str, | |
| text_fields: List[str], | |
| top_k: int = 20, | |
| min_score: float = 0.1, | |
| use_hybrid: bool = True, | |
| bm25_weight: float = DEFAULT_BM25_WEIGHT, | |
| vector_weight: float = DEFAULT_VECTOR_WEIGHT, | |
| use_reranking: bool = False, | |
| use_diversification: bool = False | |
| ) -> QuerySet: | |
| """ | |
| Search with hybrid BM25 + vector, with fallback to BM25-only or TF-IDF. | |
| Args: | |
| queryset: Django QuerySet to search. | |
| query: Search query string. | |
| text_fields: List of field names (for fallback). | |
| top_k: Maximum number of results. | |
| min_score: Minimum score threshold. | |
| use_hybrid: Whether to use hybrid search. | |
| bm25_weight: Weight for BM25 in hybrid search. | |
| vector_weight: Weight for vector in hybrid search. | |
| Returns: | |
| Filtered and ranked QuerySet. | |
| """ | |
| if not query: | |
| return queryset[:top_k] | |
| # Try hybrid search if enabled | |
| if use_hybrid: | |
| try: | |
| hybrid_results = hybrid_search( | |
| queryset, | |
| query, | |
| top_k=top_k, | |
| bm25_weight=bm25_weight, | |
| vector_weight=vector_weight, | |
| min_hybrid_score=min_score, | |
| text_fields=text_fields | |
| ) | |
| if hybrid_results: | |
| # Apply reranking if enabled | |
| if use_reranking and len(hybrid_results) > top_k: | |
| hybrid_results = rerank_results(query, hybrid_results, text_fields, top_k=top_k * 2) | |
| # Apply diversification if enabled | |
| if use_diversification: | |
| hybrid_results = diversify_results(hybrid_results, top_k=top_k) | |
| # Convert to QuerySet with preserved order | |
| result_ids = [obj.id for obj in hybrid_results[:top_k]] | |
| if result_ids: | |
| from django.db.models import Case, When, IntegerField | |
| preserved = Case( | |
| *[When(pk=pk, then=pos) for pos, pk in enumerate(result_ids)], | |
| output_field=IntegerField() | |
| ) | |
| return queryset.filter(id__in=result_ids).order_by(preserved) | |
| except Exception as e: | |
| print(f"Hybrid search failed, falling back: {e}") | |
| # Fallback to BM25-only | |
| if connection.vendor == "postgresql" and hasattr(queryset.model, "tsv_body"): | |
| try: | |
| expanded_queries = expand_query_with_synonyms(query) | |
| combined_query = None | |
| for q_variant in expanded_queries: | |
| variant_query = SearchQuery(q_variant, config="simple") | |
| combined_query = variant_query if combined_query is None else combined_query | variant_query | |
| if combined_query is not None: | |
| ranked_qs = ( | |
| queryset | |
| .annotate(rank=SearchRank(F("tsv_body"), combined_query)) | |
| .filter(rank__gt=0) | |
| .order_by("-rank") | |
| ) | |
| results = list(ranked_qs[:top_k]) | |
| if results: | |
| for obj in results: | |
| obj._ml_score = getattr(obj, "rank", 0.0) | |
| return results | |
| except Exception: | |
| pass | |
| # Final fallback: import and use original search_with_ml | |
| from .search_ml import search_with_ml | |
| return search_with_ml(queryset, query, text_fields, top_k=top_k, min_score=min_score) | |