import gradio as gr
import pandas as pd
import numpy as np
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi
import re
from typing import List, Tuple, Dict
import threading
import time

# Configuration
DATASET_NAME = "hoololi/AI_Act_with_embeddings"
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
TOP_K = 5


class AIActSearchEngine:
    def __init__(self):
        self.dataset = None
        self.embedding_model = None
        self.tfidf_vectorizer = None
        self.tfidf_matrix = None
        self.bm25_model = None
        self.processed_docs = None
        self.load_data()
        self.setup_models()

    def load_data(self):
        """Load dataset from Hugging Face"""
        print("Loading dataset...")
        dataset = load_dataset(DATASET_NAME, split="train")
        self.dataset = dataset.to_pandas()
        print(f"Dataset loaded: {len(self.dataset)} articles")

    def setup_models(self):
        """Initialize models and vectorizers"""
        print("Initializing models...")

        # Embedding model
        self.embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)

        # TF-IDF
        self.tfidf_vectorizer = TfidfVectorizer(
            max_features=10000,
            stop_words='english',
            lowercase=True,
            ngram_range=(1, 2)
        )
        self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.dataset['processed_content'])

        # BM25 (using optimized library)
        self.setup_bm25()

        print("Models initialized successfully!")

    def setup_bm25(self):
        """Setup BM25 using optimized library"""
        print("Setting up BM25...")

        # Tokenize documents for BM25
        self.processed_docs = [doc.split() for doc in self.dataset['processed_content']]

        # Create BM25 model (much faster than a manual implementation)
        self.bm25_model = BM25Okapi(self.processed_docs)

        print("BM25 setup complete!")

    def search_tfidf(self, query: str) -> List[Tuple[str, str, float]]:
        """TF-IDF search"""
        query_vector = self.tfidf_vectorizer.transform([query])
        similarities = cosine_similarity(query_vector, self.tfidf_matrix).flatten()

        # Create a list of (score, index) pairs for stable sorting
        scored_docs = [(similarities[i], i) for i in range(len(similarities)) if similarities[i] > 0]

        # Sort by descending score, then by ascending index for tie-breaking
        scored_docs.sort(key=lambda x: (-x[0], x[1]))

        # Take top K and deduplicate by article number
        seen_articles = set()
        results = []
        for score, idx in scored_docs:
            article_num = self.dataset.iloc[idx]['article_number']
            if article_num not in seen_articles and len(results) < TOP_K:
                seen_articles.add(article_num)
                results.append((
                    article_num,
                    self.dataset.iloc[idx]['article_content'],
                    score
                ))

        return results

    def search_bm25(self, query: str) -> List[Tuple[str, str, float]]:
        """BM25 search using optimized library"""
        # Tokenize query
        query_tokens = query.lower().split()

        # Get BM25 scores (much faster!)
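        # For reference, BM25 scores each document D against query terms q_i as:
        #   score(D, q) = sum_i IDF(q_i) * tf(q_i, D) * (k1 + 1)
        #                 / (tf(q_i, D) + k1 * (1 - b + b * |D| / avgdl))
        # where tf is the term frequency in D, |D| the document length in tokens and
        # avgdl the average document length; BM25Okapi uses k1=1.5 and b=0.75 by default.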
        scores = self.bm25_model.get_scores(query_tokens)

        # Create scored documents list
        scored_docs = [(scores[i], i) for i in range(len(scores)) if scores[i] > 0]

        # Sort by descending score, then by ascending index for tie-breaking
        scored_docs.sort(key=lambda x: (-x[0], x[1]))

        # Take top K and deduplicate by article number
        seen_articles = set()
        results = []
        for score, idx in scored_docs:
            article_num = self.dataset.iloc[idx]['article_number']
            if article_num not in seen_articles and len(results) < TOP_K:
                seen_articles.add(article_num)
                results.append((
                    article_num,
                    self.dataset.iloc[idx]['article_content'],
                    score
                ))

        return results

    def search_embeddings(self, query: str) -> List[Tuple[str, str, float]]:
        """Embedding similarity search"""
        # Encode the query
        query_embedding = self.embedding_model.encode([query])

        # Get stored embeddings
        stored_embeddings = np.array(self.dataset['embedding'].tolist())

        # Calculate cosine similarity
        similarities = cosine_similarity(query_embedding, stored_embeddings).flatten()

        # Create a list of (score, index) pairs for stable sorting
        scored_docs = [(similarities[i], i) for i in range(len(similarities))]

        # Sort by descending score, then by ascending index for tie-breaking
        scored_docs.sort(key=lambda x: (-x[0], x[1]))

        # Take top K and deduplicate by article number
        seen_articles = set()
        results = []
        for score, idx in scored_docs:
            article_num = self.dataset.iloc[idx]['article_number']
            if article_num not in seen_articles and len(results) < TOP_K:
                seen_articles.add(article_num)
                results.append((
                    article_num,
                    self.dataset.iloc[idx]['article_content'],
                    score
                ))

        return results

    def search_all(self, query: str) -> Dict[str, List[Tuple[str, str, float]]]:
        """Perform all searches"""
        if not query.strip():
            return {
                'tfidf': [],
                'bm25': [],
                'embeddings': []
            }

        return {
            'tfidf': self.search_tfidf(query),
            'bm25': self.search_bm25(query),
            'embeddings': self.search_embeddings(query)
        }


def format_results_table(results: List[Tuple[str, str, float]], search_type: str, highlight_articles: set) -> str:
    """Format results as a markdown table"""
    if not results:
        return "No results found"

    # Header layout recovered from the original fragments: Score | Article | Content
    lines = [
        "| Score | Article | Content |",
        "|---|---|---|",
    ]

    for article_num, content, score in results:
        if article_num in highlight_articles:
            # Assumed behavior: emphasize articles returned by every search method
            article_num = f"**{article_num}**"
        # Truncate long article text (assumed 200-character limit) so the table stays readable
        snippet = content[:200] + "..." if len(content) > 200 else content
        snippet = snippet.replace("|", "\\|").replace("\n", " ")
        lines.append(f"| {score:.3f} | {article_num} | {snippet} |")

    return "\n".join(lines)
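
# Minimal sketch of how a highlight set for format_results_table could be built
# (the helper name compute_highlight_articles and this logic are assumptions, not
# part of the original app): articles returned by all three methods get highlighted.
def compute_highlight_articles(all_results: Dict[str, List[Tuple[str, str, float]]]) -> set:
    """Return article numbers that appear in every method's result list."""
    per_method = [
        {article_num for article_num, _, _ in results}
        for results in all_results.values()
    ]
    if not per_method:
        return set()
    return set.intersection(*per_method)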
# UI status messages (constant names are assumptions; the Gradio interface code
# that displays these strings is not shown in this excerpt)
EMPTY_QUERY_MESSAGE = "Enter a query to start searching"
SEARCHING_MESSAGE = "Searching..."
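
# Minimal usage sketch (assumption: a quick local sanity check, not part of the
# original Gradio wiring; the example query is hypothetical):
if __name__ == "__main__":
    engine = AIActSearchEngine()
    all_results = engine.search_all("biometric identification in public spaces")
    highlights = compute_highlight_articles(all_results)
    for method, results in all_results.items():
        print(f"\n=== {method} ({len(results)} results) ===")
        print(format_results_table(results, method, highlights))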