Spaces:
Sleeping
Sleeping
| """ | |
| Machine Learning-based search utilities using TF-IDF and text similarity. | |
| """ | |
import logging
import re
from typing import List, Tuple, Dict, Any

import numpy as np
from django.contrib.postgres.search import SearchQuery, SearchRank
from django.db import connection
from django.db.models import Q, QuerySet, F
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from .models import Synonym
def normalize_text(text: str) -> str:
    """Return *text* prepared for search matching.

    The text is lowercased, stripped of leading/trailing whitespace, and every
    internal run of whitespace is collapsed to a single space. Falsy input
    (``None`` or an empty string) yields an empty string.
    """
    if text:
        trimmed = text.lower().strip()
        return re.sub(r'\s+', ' ', trimmed)
    return ""
def expand_query_with_synonyms(query: str) -> List[str]:
    """Expand a query into variants using keyword/alias pairs from ``Synonym``.

    The query is normalized with ``normalize_text``. For every ``Synonym`` row
    whose normalized keyword (or alias) occurs as a substring of the query, a
    variant with that substring replaced by its counterpart is added.

    Args:
        query: Raw search string.

    Returns:
        Unique query variants. The normalized original always comes first,
        followed by the generated variants in the order they were produced.
        If the synonym lookup fails, only the normalized original is
        guaranteed to be present.
    """
    query_normalized = normalize_text(query)
    variants = [query_normalized]
    try:
        for synonym in Synonym.objects.all():
            keyword = normalize_text(synonym.keyword)
            alias = normalize_text(synonym.alias)
            # An empty string is a substring of every query, and
            # str.replace("", x) inserts x between every character, so a blank
            # keyword/alias would produce a garbage variant. An identical pair
            # cannot produce anything new either.
            if not keyword or not alias or keyword == alias:
                continue
            # If the query contains the keyword, add the alias form.
            if keyword in query_normalized:
                variants.append(query_normalized.replace(keyword, alias))
            # If the query contains the alias, add the keyword form.
            if alias in query_normalized:
                variants.append(query_normalized.replace(alias, keyword))
    except Exception:
        # Deliberately best-effort (e.g. the Synonym table may not exist yet):
        # search must keep working without synonyms, but the failure should
        # be visible instead of silently swallowed.
        logging.getLogger(__name__).warning(
            "Synonym expansion skipped; using the plain query", exc_info=True
        )
    # dict.fromkeys removes duplicates while keeping a deterministic order.
    return list(dict.fromkeys(variants))
def create_search_vector(text_fields: List[str]) -> str:
    """Build one searchable string from several field values.

    Falsy values (``None``, ``""``, ``0``, ...) are dropped; the remaining
    values are converted with ``str`` and joined with single spaces.
    """
    kept_values = filter(None, text_fields)
    return " ".join(map(str, kept_values))
def calculate_similarity_scores(
    query: str,
    documents: List[str],
    top_k: int = 20
) -> List[Tuple[int, float]]:
    """Score documents against a query with TF-IDF cosine similarity.

    The query is expanded with ``expand_query_with_synonyms``; the expanded
    variants and the documents are vectorized together (unigrams and bigrams)
    and each document is compared with the mean of the variant vectors.

    Args:
        query: Search string.
        documents: Texts to score; result indices refer to this list.
        top_k: Maximum number of results to return.

    Returns:
        ``(index, score)`` tuples sorted by score descending, containing only
        documents with a score above zero. An empty list is returned when
        ``query`` or ``documents`` is empty. If vectorization fails, documents
        containing the normalized query as a substring are returned instead,
        scored higher the earlier the match starts.
    """
    if not query or not documents:
        return []

    # Expand query with synonyms and vectorize variants together with the docs.
    expanded_queries = expand_query_with_synonyms(query)
    n_queries = len(expanded_queries)
    all_texts = expanded_queries + documents

    # max_df=0.95 is meant to drop near-universal terms. With 20 texts or
    # fewer, 0.95 * n >= n - 1, so the cutoff can only remove terms that occur
    # in *every* text - i.e. exactly the terms the query shares with all
    # documents. That zeroes the similarity (or empties the vocabulary and
    # raises) for small corpora, so pruning is only enabled above that size.
    max_df = 0.95 if len(all_texts) > 20 else 1.0

    try:
        vectorizer = TfidfVectorizer(
            analyzer='word',
            ngram_range=(1, 2),  # Unigrams and bigrams
            min_df=1,
            max_df=max_df,
            lowercase=True,
            token_pattern=r'\b\w+\b'
        )
        tfidf_matrix = vectorizer.fit_transform(all_texts)

        # Query vector is the average of all expanded query variants.
        query_vector = np.mean(
            tfidf_matrix[:n_queries].toarray(), axis=0
        ).reshape(1, -1)
        doc_vectors = tfidf_matrix[n_queries:]

        similarities = cosine_similarity(query_vector, doc_vectors)[0]

        # Highest scores first, limited to top_k, zero-score docs dropped.
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [
            (int(idx), float(similarities[idx]))
            for idx in top_indices
            if similarities[idx] > 0.0
        ]
    except Exception:
        # Best-effort fallback to plain substring matching; record why the
        # TF-IDF path was abandoned instead of hiding it.
        logging.getLogger(__name__).warning(
            "TF-IDF scoring failed; falling back to substring matching",
            exc_info=True,
        )
        query_lower = normalize_text(query)
        results = []
        for idx, doc in enumerate(documents):
            doc_lower = normalize_text(doc)
            position = doc_lower.find(query_lower)
            if position != -1:
                # Earlier matches (relative to document length) score higher.
                score = 1.0 - (position / max(len(doc_lower), 1))
                results.append((idx, score))
        return sorted(results, key=lambda x: x[1], reverse=True)[:top_k]
def search_with_ml(
    queryset: QuerySet,
    query: str,
    text_fields: List[str],
    top_k: int = 20,
    min_score: float = 0.1,
    use_hybrid: bool = True
) -> QuerySet:
    """Search a queryset, trying progressively simpler strategies.

    Strategies are attempted in this order, and the first one that yields
    results wins:

    1. Hybrid search via ``.hybrid_search.search_with_hybrid`` (only when
       ``use_hybrid`` is true); any error falls through to the next step.
    2. PostgreSQL full-text ranking with ``SearchRank`` over ``tsv_body``,
       when the connection is PostgreSQL and the model has that attribute.
    3. TF-IDF cosine similarity over the concatenated ``text_fields``.
    4. ``icontains`` matching of 2- and 3-word phrases on the first field.
    5. ``icontains`` matching of the whole query on any of ``text_fields``.

    Args:
        queryset: Django QuerySet to search.
        query: Search query string.
        text_fields: Names of the fields to search in.
        top_k: Maximum number of results.
        min_score: Minimum similarity score for the TF-IDF strategy.
        use_hybrid: Whether to try the hybrid search strategy first.

    Returns:
        The matching objects, best match first. Strategies 3 and 5 (and the
        empty-query case) return a QuerySet; strategies 2 and 4 return a plain
        list of model instances (strategy 2 also sets ``_ml_score`` on each).
        The return type of strategy 1 is whatever ``search_with_hybrid``
        returns.
    """
    if not query:
        return queryset[:top_k]

    log = logging.getLogger(__name__)

    # 1. Hybrid search, if enabled and importable.
    if use_hybrid:
        try:
            from .hybrid_search import search_with_hybrid
            from .config.hybrid_search_config import get_config

            content_type = _content_type_for_model(queryset.model.__name__.lower())
            config = get_config(content_type)
            return search_with_hybrid(
                queryset,
                query,
                text_fields,
                top_k=top_k,
                min_score=min_score,
                use_hybrid=True,
                bm25_weight=config.bm25_weight,
                vector_weight=config.vector_weight
            )
        except Exception as e:
            log.warning("Hybrid search not available, using BM25/TF-IDF: %s", e)

    # 2. PostgreSQL full-text ranking when available.
    if connection.vendor == "postgresql" and hasattr(queryset.model, "tsv_body"):
        try:
            ranked = _postgres_rank_search(queryset, query, top_k)
        except Exception:
            # e.g. missing extensions; fall through to the TF-IDF search.
            log.warning(
                "PostgreSQL full-text ranking failed; using TF-IDF",
                exc_info=True,
            )
            ranked = []
        if ranked:
            return ranked

    # 3. TF-IDF over every object in the queryset.
    all_objects = list(queryset)
    if not all_objects:
        return queryset.none()

    documents = [
        create_search_vector([getattr(obj, field, "") for field in text_fields])
        for obj in all_objects
    ]
    try:
        ordered = _order_by_similarity(
            queryset, all_objects, documents, query, top_k, min_score
        )
    except Exception:
        log.warning(
            "TF-IDF search failed; using icontains fallback", exc_info=True
        )
        ordered = None
    if ordered is not None:
        return ordered

    # 4 + 5. Plain substring matching.
    return _icontains_fallback(queryset, query, text_fields, top_k)


def _content_type_for_model(model_name: str):
    """Map a lowercased model class name to a hybrid-search content type (or None)."""
    # Checked in order; the first marker contained in the name wins.
    markers = (
        ('procedure', 'procedure'),
        ('fine', 'fine'),
        ('office', 'office'),
        ('advisory', 'advisory'),
        ('legalsection', 'legal'),
    )
    for marker, content_type in markers:
        if marker in model_name:
            return content_type
    return None


def _postgres_rank_search(queryset: QuerySet, query: str, top_k: int) -> List[Any]:
    """Return up to top_k objects ranked by SearchRank over ``tsv_body``, tagged with ``_ml_score``."""
    combined_query = None
    # OR together one SearchQuery per synonym-expanded variant.
    for q_variant in expand_query_with_synonyms(query):
        variant_query = SearchQuery(q_variant, config="simple")
        combined_query = variant_query if combined_query is None else combined_query | variant_query
    if combined_query is None:
        return []

    ranked_qs = (
        queryset
        .annotate(rank=SearchRank(F("tsv_body"), combined_query))
        .filter(rank__gt=0)
        .order_by("-rank")
    )
    results = list(ranked_qs[:top_k])
    for obj in results:
        obj._ml_score = getattr(obj, "rank", 0.0)
    return results


def _order_by_similarity(
    queryset: QuerySet,
    all_objects: List[Any],
    documents: List[str],
    query: str,
    top_k: int,
    min_score: float,
):
    """Return a QuerySet ordered by TF-IDF score, or None when nothing reaches min_score."""
    scored = calculate_similarity_scores(query, documents, top_k=top_k)
    # Scores arrive best-first; keep that order and drop weak or repeated hits.
    ordered_pks = list(dict.fromkeys(
        all_objects[idx].pk for idx, score in scored if score >= min_score
    ))[:top_k]
    if not ordered_pks:
        return None

    # Case/When lets the database return rows in the computed ranking order,
    # so no extra fetch-and-sort round trip is needed.
    from django.db.models import Case, When, IntegerField
    preserved = Case(
        *[When(pk=pk, then=pos) for pos, pk in enumerate(ordered_pks)],
        output_field=IntegerField()
    )
    return queryset.filter(pk__in=ordered_pks).order_by(preserved)


def _icontains_fallback(queryset: QuerySet, query: str, text_fields: List[str], top_k: int):
    """Match 2-/3-word phrases on the first field, else the whole query on any field."""
    query_words = normalize_text(query).split()

    # Key phrases: all 2-word windows longer than 3 characters, then all
    # 3-word windows longer than 5 characters.
    key_phrases = []
    for size, min_len in ((2, 3), (3, 5)):
        for start in range(len(query_words) - size + 1):
            phrase = " ".join(query_words[start:start + size])
            if len(phrase) > min_len:
                key_phrases.append(phrase)

    primary_field = text_fields[0] if text_fields else None
    if primary_field:
        seen = set()
        unique_matches = []
        for phrase in key_phrases:
            # Once top_k unique hits are collected, later phrases cannot
            # change the returned slice, so skip their queries.
            if len(unique_matches) >= top_k:
                break
            lookup = {f"{primary_field}__icontains": phrase}
            for obj in queryset.filter(**lookup)[:top_k]:
                if obj.pk not in seen:
                    seen.add(obj.pk)
                    unique_matches.append(obj)
        if unique_matches:
            return unique_matches[:top_k]

    # Last resort: the whole query as a substring of any searched field.
    q_objects = Q()
    for field in text_fields:
        q_objects |= Q(**{f"{field}__icontains": query})
    return queryset.filter(q_objects)[:top_k]