Spaces:
Runtime error
Runtime error
| from src.pytorch_modules.datasets.schema_string_dataset import SchemaStringDataset | |
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import json | |
| import faiss | |
| import torch | |
| class UtilsSearch: | |
| def __init__(self, config): | |
| self.config = config | |
| def dataframe_to_index(df): | |
| embeddings = np.stack(df['embeddings'].to_numpy()) | |
| norm_embeddings = np.ascontiguousarray(embeddings / np.linalg.norm(embeddings, axis=1)[:, None]) | |
| # Create a FAISS index (Step 2, unchanged but using normalized embeddings) | |
| dimension = norm_embeddings.shape[1] | |
| index = faiss.IndexFlatL2(dimension) | |
| index.add(norm_embeddings) | |
| return index # Ad | |
| def retrieve(self, query, df, model, index, top_k=100, api=False): | |
| query += "Represent this sentence for searching relevant passages: " | |
| """ | |
| Search the DataFrame for the given query and return a sorted DataFrame based on similarity. | |
| :param query: The search query string. | |
| :param df: The input DataFrame containing embeddings. | |
| :param model: The model to encode the query and compute embeddings. | |
| :param index: The search index for querying. | |
| :param top_k: The number of top results to return. | |
| :return: A new DataFrame sorted by similarity to the query, with a 'similarities' column. | |
| """ | |
| # Check if CUDA is available and set the device accordingly | |
| if not api: | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| model.to(device) | |
| # Compute the query embedding | |
| query_vector = model.encode(query, convert_to_tensor=True, device=device).cpu().numpy() | |
| query_vector /= np.linalg.norm(query_vector) | |
| else: | |
| res = model.embeddings( | |
| input=[query], | |
| model=self.config["sentence_transformer_name"], | |
| prompt=None, | |
| ) | |
| query_vector = np.array([entry.embedding for entry in res.data][0]).astype(np.float32) | |
| # Normalize the query vector | |
| # Perform the search | |
| distances, indices = index.search(np.array([query_vector]), top_k) | |
| # Retrieve the rows from the DataFrame corresponding to the indices | |
| retrieved_df = df.iloc[indices[0]] | |
| # Attach the distances as a new column named 'similarities' | |
| # Ensure the distances array matches the size of the retrieved DataFrame, especially if using slicing or other operations that might change its shape | |
| retrieved_df = retrieved_df.assign(similarities=distances[0]) | |
| if 'similarities' in retrieved_df.columns: | |
| retrieved_df = retrieved_df.sort_values(by='similarities', ascending=True) | |
| # Optionally, you might want to reset the index if the order matters or if you need to serialize the DataFrame without index issues | |
| retrieved_df = retrieved_df.reset_index(drop=True) | |
| return retrieved_df | |
| def rerank(self, query, df_top_100, cross_encoder, index): | |
| # Convert the top 5 records to a list of dictionaries for processing | |
| # print(df_top_100) | |
| config = self.config | |
| df_copy = df_top_100.copy().reset_index(drop=True) | |
| records = df_copy.to_dict(orient='records')[:100] | |
| # Assuming SchemaStringDataset can handle GPU data | |
| dataset_str = SchemaStringDataset(records, config) | |
| # Extract documents from dataset | |
| documents = [batch["inputs"][:256] for batch in dataset_str] | |
| # Rank documents based on the query | |
| # Ensure data processed by cross_encoder is moved to the correct device | |
| ids = [item["corpus_id"] for item in cross_encoder.rank(query, documents, top_k=10)] | |
| # Use the ids to filter and reorder the original DataFrame | |
| df_sorted_by_relevance = df_copy.loc[ids] | |
| return df_sorted_by_relevance | |
| def search(self, query, df, model, cross_encoder, index): | |
| sorted_df = self.retrieve(query, df, model, index) | |
| return self.rerank(query, sorted_df, cross_encoder, index) | |
| def top_10_common_values(df, column_name): | |
| """ | |
| This function takes a pandas dataframe and a column name, | |
| and returns the top 10 most common non-null values of that column as a list. | |
| """ | |
| # Drop null values from the specified column and count occurrences of each value | |
| # Convert the index of the resulting Series (which contains the values) to a list | |
| value_counts_list = df[column_name].dropna().value_counts().head(10).index.tolist() | |
| return value_counts_list | |
| def filter_dataframe(df, config, top_k_programmatic=100): | |
| """ | |
| Filters a DataFrame based on scalar and discrete column configurations, with type handling and null filtering. | |
| Parameters: | |
| - df: pandas.DataFrame to filter. | |
| - config: Dictionary containing 'scalar_columns' and 'discrete_columns' configurations. | |
| Returns: | |
| - Filtered pandas.DataFrame. | |
| """ | |
| scalar_columns = config.get('scalar_columns', []) | |
| discrete_columns = config.get('discrete_columns', []) | |
| # Combine all column names to check for nulls | |
| all_columns = [col["column_name"] for col in scalar_columns] + [col["column_name"] for col in discrete_columns] | |
| # Drop rows where any of the specified columns have null values | |
| df = df.dropna(subset=all_columns) | |
| # Filtering based on scalar columns | |
| for col in scalar_columns: | |
| column_name = col["column_name"] | |
| # Ensure min_value and max_value are of numeric type | |
| min_value = float(col["min_value"]) | |
| max_value = float(col["max_value"]) | |
| # Convert the DataFrame column to numeric type to avoid comparison issues | |
| df[column_name] = pd.to_numeric(df[column_name], errors='coerce') | |
| df = df[df[column_name].between(min_value, max_value)] | |
| # Filtering based on discrete columns | |
| for col in discrete_columns: | |
| column_name = col["column_name"] | |
| default_values = col["default_values"] | |
| if len(default_values) > 0: | |
| df = df[df[column_name].isin(default_values)] | |
| if 'similarities' in df.columns: | |
| df = df.sort_values(by='similarities', ascending=False) | |
| # Return the top 100 items with the highest similarity | |
| return df | |
| def drop_columns(df, config): | |
| columns_to_drop = config.get('columns_to_drop', []) | |
| df_dropped = df.drop(columns_to_drop, axis=1) | |
| return df_dropped | |