Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import re | |
| import numpy as np | |
| import importlib | |
| from pandas import json_normalize | |
| from transformers import AutoTokenizer, AutoModel | |
| import torch | |
| import torch.nn.functional as F | |
| from pandas import json_normalize | |
### Parameters not expected to be changed in every run
# Columns whose text is concatenated to build the embedding input for table 1
columns_embeddings_col1 = ['Indicator Name']
# Columns whose text is concatenated to build the embedding input for table 2
columns_embeddings_col2 = ['Indicator name (leonardo)']
| #### Functions | |
from numpy.linalg import norm
print("Functions loaded")


def cos_sim(a, b):
    """Row-wise cosine similarity between two batches of vectors.

    Parameters
    ----------
    a, b : array-like of shape (n, d) and (m, d)
        Batches of d-dimensional vectors (torch tensors are accepted and
        converted via ``np.asarray``). 1-D inputs are treated as single
        vectors.

    Returns
    -------
    numpy.ndarray of shape (n, m) where entry (i, j) is the cosine
    similarity between a[i] and b[j].

    Note: the previous lambda divided by ``norm(a) * norm(b)``; for 2-D
    inputs that is the *Frobenius* norm of the whole matrix, not per-row
    norms, so results were only proportional to true cosine similarity.
    Row-wise normalization fixes that. Downstream ranking and min-max
    normalization are unaffected because the old error was a uniform
    positive scale factor.
    """
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    # Clip tiny norms to avoid division by zero for all-zero vectors.
    a_norms = np.clip(norm(a, axis=-1, keepdims=True), 1e-12, None)
    b_norms = np.clip(norm(b, axis=-1, keepdims=True), 1e-12, None)
    return (a / a_norms) @ (b / b_norms).T
def concatenate_columns(df, columns):
    """Add a 'concatenated_input' column joining the given columns with '.'.

    Parameters
    ----------
    df : pandas.DataFrame
        Modified in place: a 'concatenated_input' column is added.
    columns : list of str
        Column names whose string representations are joined row-wise.

    Returns
    -------
    pandas.DataFrame
        The same DataFrame, with the new 'concatenated_input' column.

    Raises
    ------
    ValueError
        If any requested column is absent; the message now names the
        missing columns instead of a generic complaint.
    """
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise ValueError(f"Columns not found in the DataFrame: {missing}")
    # Join the string form of each selected column with '.' per row.
    df['concatenated_input'] = df[columns].astype(str).agg('.'.join, axis=1)
    return df
# Mean pooling over token embeddings, masked so padding does not contribute
def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sequence, ignoring padding.

    Parameters
    ----------
    model_output : sequence whose first element is the last hidden state,
        shape (batch, seq_len, hidden).
    attention_mask : tensor of shape (batch, seq_len), 1 for real tokens
        and 0 for padding.

    Returns
    -------
    Tensor of shape (batch, hidden): per-sequence mean of the unmasked
    token embeddings.
    """
    last_hidden = model_output[0]
    # Broadcast the mask across the hidden dimension.
    mask = attention_mask.unsqueeze(-1).expand(last_hidden.size()).float()
    summed = torch.sum(last_hidden * mask, 1)
    # Clamp avoids division by zero for fully-masked rows.
    counts = torch.clamp(mask.sum(1), min=1e-9)
    return summed / counts
# Define your get_embbedings function
def get_embbedings(table, colname):
    """Return L2-normalized sentence embeddings for a DataFrame column.

    Parameters
    ----------
    table : pandas.DataFrame
    colname : str
        Column whose values are embedded as sentences.

    Returns
    -------
    torch.Tensor of shape (len(table), hidden) with unit-norm rows.

    The tokenizer and model were previously re-loaded from the HuggingFace
    Hub on every call; they are now loaded once and cached on the function
    object, which makes repeated calls (table1 then table2) much cheaper.
    """
    if not hasattr(get_embbedings, "_hub_cache"):
        # Load model from HuggingFace Hub (first call only)
        tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
        model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
        get_embbedings._hub_cache = (tokenizer, model)
    tokenizer, model = get_embbedings._hub_cache
    # Tokenize sentences
    encoded_input = tokenizer(table[colname].tolist(), padding=True, truncation=True, return_tensors='pt')
    # Compute token embeddings (inference only, no gradients)
    with torch.no_grad():
        model_output = model(**encoded_input)
    # Perform masked mean pooling over tokens
    sentence_embeddings = mean_pooling(model_output, encoded_input['attention_mask'])
    # Normalize embeddings to unit length
    return F.normalize(sentence_embeddings, p=2, dim=1)
# Process similarity
def process_similarity_results(table1, table2,columns_embeddings_col1,columns_embeddings_col2, harmonization=True):
    """For each indicator in table1, find its 5 most similar indicators in table2.

    Embeds the configured text columns of both tables, computes pairwise
    cosine similarities, optionally masks pairs that already share a
    framework, and returns table1 augmented with normalized best-match
    scores plus the top-5 matching names/IDs/frameworks from table2.

    NOTE(review): both input DataFrames are mutated in place (an 'ID' and a
    'concatenated_input' column are added).

    Parameters
    ----------
    table1, table2 : pandas.DataFrame
        Must each contain an 'Indicator ID' or 'ID' column and a
        'Framework' column; table2 must also have
        'Indicator name (leonardo)' and table1 'Indicator Name'.
    columns_embeddings_col1, columns_embeddings_col2 : list of str
        Columns concatenated to build the embedding input of each table.
    harmonization : bool, default True
        When True, similarities between indicators that already share a
        framework are set to NaN so they cannot be proposed as matches.

    Returns
    -------
    pandas.DataFrame with one row per table1 indicator: ID, name,
    framework, 'max_sim_normalized', and top1..top5 name/id/framework.
    """
    # Normalize the identifier column of each table into a string 'ID'.
    if 'Indicator ID' in table1.columns:
        table1['ID'] = table1['Indicator ID'].astype(str)
    else:
        table1['ID'] = table1['ID'].astype(str)
    if 'Indicator ID' in table2.columns:
        table2['ID'] = table2['Indicator ID'].astype(str)
    else:
        table2['ID'] = table2['ID'].astype(str)
    print(columns_embeddings_col1)
    # Build the text that gets embedded for each row.
    table1 = concatenate_columns(table1, columns= columns_embeddings_col1)
    table2 = concatenate_columns(table2,columns= columns_embeddings_col2)
    embeddings1 = get_embbedings(table1, 'concatenated_input')
    embeddings2 = get_embbedings(table2,'concatenated_input')
    # Calculate cosine similarity between the embeddings
    similarities = cos_sim(embeddings1, embeddings2)
    # Similarity matrix: rows indexed by table1 IDs, columns by table2 IDs.
    result_df = pd.DataFrame(similarities,
                             columns=table2['ID'],
                             index=table1['ID'])
    if harmonization:
        # ID -> comma-separated framework string, for both tables.
        table1_sel_map = table1.set_index('ID')['Framework'].to_dict()
        table2_sel_map = table2.set_index('ID')['Framework'].to_dict()
        # True when the two ', '-separated framework lists share an element.
        def has_common_framework(table1_framework, table2_framework):
            table1_frameworks = set(table1_framework.split(', '))
            table2_frameworks = set(table2_framework.split(', '))
            return not table1_frameworks.isdisjoint(table2_frameworks)
        # Mask (set to NaN) similarities between indicators that already
        # share a framework, so they are never suggested as matches.
        for table1_id, table1_framework in table1_sel_map.items():
            for table2_id in result_df.columns:
                table2_framework = table2_sel_map.get(table2_id)
                if pd.notna(table2_framework) and pd.notna(table1_framework):
                    if has_common_framework(table1_framework, table2_framework):
                        result_df.loc[table1_id, table2_id] = np.nan
    # Column names (table2 IDs) of the 5 largest values in a row.
    # nlargest skips NaN, so heavily masked rows may yield fewer than 5.
    def top_5_column(row):
        top_5_values = row.nlargest(5)
        return top_5_values.index.tolist()
    result_df['Top 5 Column ID'] = result_df.apply(lambda row: top_5_column(row), axis=1)
    # Fast ID -> indicator-name lookup for table2.
    id_to_name = dict(zip(table2['ID'], table2['Indicator name (leonardo)']))
    # NOTE(review): unknown IDs fall back to the literal string "ID" —
    # looks like a placeholder default; confirm the intended fallback.
    def map_ids_to_names(id_list):
        return [id_to_name.get(id, "ID") for id in id_list]
    result_df['Top 5 Names'] = result_df['Top 5 Column ID'].apply(map_ids_to_names)
    # Pad every name list to exactly 5 entries with None so the fixed
    # top1name..top5name columns below always exist.
    result_df['Top 5 Names'] = result_df['Top 5 Names'].apply(lambda x: x if isinstance(x, list) and len(x) >= 5 else (x + [None] * (5 - len(x)) if isinstance(x, list) else [None]*5))
    # Split the padded name lists into five separate columns.
    new_cols = pd.DataFrame(result_df['Top 5 Names'].tolist(), index=result_df.index, columns=["top1name", "top2name", "top3name", "top4name", "top5name"])
    result_df = result_df.join(new_cols)
    # Same padding for the ID lists.
    result_df['Top 5 Column ID'] = result_df['Top 5 Column ID'].apply(
        lambda x: x if isinstance(x, list) and len(x) >= 5 else (x + [None] * (5 - len(x)) if isinstance(x, list) else [None]*5)
    )
    # Split the padded ID lists into five separate columns.
    new_ids_cols = pd.DataFrame(result_df['Top 5 Column ID'].tolist(), index=result_df.index, columns=["top1id", "top2id", "top3id", "top4id", "top5id"])
    result_df = result_df.join(new_ids_cols)
    # Best similarity per row over the original similarity columns only,
    # ignoring masked NaNs.
    result_df['max_sim'] = np.nanmax(result_df[table2['ID']], axis=1)
    # Min-max normalize 'max_sim' across all rows, ignoring NaN values.
    min_val = np.nanmin(result_df['max_sim'])
    max_val = np.nanmax(result_df['max_sim'])
    result_df['max_sim_normalized'] = (result_df['max_sim'] - min_val) / (max_val - min_val)
    result_final = result_df[['max_sim_normalized','top1name', 'top2name', 'top3name', 'top4name', 'top5name', 'top1id',
                              'top2id', 'top3id', 'top4id', 'top5id']]
    # NOTE(review): result_final carries 'ID' as its *index* name; this
    # merge relies on pandas matching merge keys against index level names
    # (supported since pandas 0.23) — confirm the pandas version in use.
    result_final = table1[['ID', 'Indicator Name', 'Framework']].merge(result_final, on='ID', how='left')
    # ID -> Framework lookup for table2, used to annotate the top matches.
    id_to_framework = table2.set_index('ID')['Framework'].to_dict()
    # Map a table2 ID to its framework; NaN when unknown (e.g. padded None).
    def map_framework(id):
        return id_to_framework.get(id, np.nan)
    # Add framework information for top1id to top5id
    result_final['top1framework'] = result_final['top1id'].apply(map_framework)
    result_final['top2framework'] = result_final['top2id'].apply(map_framework)
    result_final['top3framework'] = result_final['top3id'].apply(map_framework)
    result_final['top4framework'] = result_final['top4id'].apply(map_framework)
    result_final['top5framework'] = result_final['top5id'].apply(map_framework)
    return result_final