import gradio as gr
import pandas as pd
import re
import numpy as np
import importlib
from pandas import json_normalize
from numpy.linalg import norm
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F

### Parameters not expected to be changed in every run
# columns to use for embeddings on table 1
columns_embeddings_col1 = ['Indicator Name']
# columns to use for embeddings on table 2
columns_embeddings_col2 = ['Indicator name (leonardo)']

#### Functions
print("Functions loaded")

# Sentence-embedding model used throughout. Loaded lazily and cached at
# module level so repeated calls to get_embbedings() do not re-download /
# re-initialize the tokenizer and model every time.
_MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2'
_tokenizer = None
_model = None


def cos_sim(a, b):
    """Row-wise cosine similarity between two 2-D arrays.

    Args:
        a: array-like of shape (n, d) — e.g. a torch tensor of embeddings.
        b: array-like of shape (m, d).

    Returns:
        np.ndarray of shape (n, m) where entry (i, j) is the cosine
        similarity between row i of `a` and row j of `b`.

    Bug fixed: the previous lambda divided by norm(a) * norm(b), which for
    2-D inputs is the *Frobenius* norm of each whole matrix — a single
    scalar — not per-row vector norms, so the result was not cosine
    similarity. We now normalize per row.
    """
    a = np.atleast_2d(np.asarray(a))
    b = np.atleast_2d(np.asarray(b))
    # keepdims so the norms broadcast against the (n, m) product matrix.
    a_norms = norm(a, axis=1, keepdims=True)
    b_norms = norm(b, axis=1, keepdims=True)
    return (a @ b.T) / (a_norms * b_norms.T)


def concatenate_columns(df, columns):
    """Add a 'concatenated_input' column joining `columns` with '.'.

    Args:
        df: DataFrame to extend (mutated in place and also returned).
        columns: list of column names that must all exist in `df`.

    Raises:
        ValueError: if any requested column is missing.
    """
    if not all(col in df.columns for col in columns):
        raise ValueError("One or more specified columns do not exist in the DataFrame")
    # Cast to str first so non-string columns can be joined too.
    df['concatenated_input'] = df[columns].astype(str).agg('.'.join, axis=1)
    return df


def mean_pooling(model_output, attention_mask):
    """Mean-pool token embeddings, ignoring padding tokens.

    Standard sentence-transformers pooling: average the last hidden states
    weighted by the attention mask so pad positions contribute nothing.
    """
    token_embeddings = model_output[0]  # first element = last hidden states
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    # Clamp avoids division by zero for all-padding rows.
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    return sum_embeddings / sum_mask


def get_embbedings(table, colname):
    """Return L2-normalized sentence embeddings for a DataFrame column.

    Args:
        table: DataFrame containing the text column.
        colname: name of the column to embed.

    Returns:
        torch.FloatTensor of shape (len(table), hidden_dim), unit-norm rows.
    """
    global _tokenizer, _model
    # Lazy one-time load; the original reloaded the model on every call.
    if _tokenizer is None or _model is None:
        _tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
        _model = AutoModel.from_pretrained(_MODEL_NAME)

    encoded_input = _tokenizer(table[colname].tolist(), padding=True,
                               truncation=True, return_tensors='pt')
    with torch.no_grad():
        model_output = _model(**encoded_input)

    sentence_embeddings = mean_pooling(model_output, encoded_input['attention_mask'])
    # Unit-normalize so dot products are cosine similarities.
    return F.normalize(sentence_embeddings, p=2, dim=1)


def _ensure_id_column(table):
    """Ensure `table` has a string 'ID' column (from 'Indicator ID' if present).

    Mutates `table` in place.
    """
    if 'Indicator ID' in table.columns:
        table['ID'] = table['Indicator ID'].astype(str)
    else:
        table['ID'] = table['ID'].astype(str)


def _mask_shared_frameworks(result_df, table1, table2):
    """Set similarity to NaN for pairs whose 'Framework' lists overlap.

    Frameworks are comma-separated strings; any shared element disqualifies
    the pair (used so harmonization never matches within the same framework).
    Mutates `result_df` in place.
    """
    table1_framework_map = table1.set_index('ID')['Framework'].to_dict()
    table2_framework_map = table2.set_index('ID')['Framework'].to_dict()

    def _has_common_framework(fw1, fw2):
        # True when the two ', '-separated framework lists share any element.
        return not set(fw1.split(', ')).isdisjoint(set(fw2.split(', ')))

    for row_id, row_framework in table1_framework_map.items():
        for col_id in result_df.columns:
            col_framework = table2_framework_map.get(col_id)
            if pd.notna(col_framework) and pd.notna(row_framework):
                if _has_common_framework(row_framework, col_framework):
                    result_df.loc[row_id, col_id] = np.nan


def _pad_to_five(value):
    """Return `value` as a list of at least 5 elements, padding with None.

    nlargest(5) can return fewer than 5 labels when a row has many NaNs,
    so the top-5 lists must be padded before being split into columns.
    """
    if isinstance(value, list):
        return value if len(value) >= 5 else value + [None] * (5 - len(value))
    return [None] * 5


def _add_top5_columns(result_df, table2):
    """Append top1..top5 name and ID columns derived from the similarity rows.

    Must be called while `result_df` still contains only similarity columns.
    Returns the extended DataFrame (join produces a new frame).
    """
    def _top5_ids(row):
        # Column labels (table2 IDs) of the 5 largest similarities; NaNs skipped.
        return row.nlargest(5).index.tolist()

    result_df['Top 5 Column ID'] = result_df.apply(_top5_ids, axis=1)

    # Fast ID -> indicator-name lookup for table2.
    id_to_name = dict(zip(table2['ID'], table2['Indicator name (leonardo)']))
    result_df['Top 5 Names'] = result_df['Top 5 Column ID'].apply(
        lambda ids: [id_to_name.get(col_id, "ID") for col_id in ids]
    )

    # Pad, then explode the name list into five scalar columns.
    result_df['Top 5 Names'] = result_df['Top 5 Names'].apply(_pad_to_five)
    name_cols = pd.DataFrame(
        result_df['Top 5 Names'].tolist(),
        index=result_df.index,
        columns=["top1name", "top2name", "top3name", "top4name", "top5name"],
    )
    result_df = result_df.join(name_cols)

    # Same treatment for the raw IDs.
    result_df['Top 5 Column ID'] = result_df['Top 5 Column ID'].apply(_pad_to_five)
    id_cols = pd.DataFrame(
        result_df['Top 5 Column ID'].tolist(),
        index=result_df.index,
        columns=["top1id", "top2id", "top3id", "top4id", "top5id"],
    )
    return result_df.join(id_cols)


def process_similarity_results(table1, table2, columns_embeddings_col1,
                               columns_embeddings_col2, harmonization=True):
    """Match each indicator in `table1` to its most similar ones in `table2`.

    Pipeline: build string IDs -> concatenate text columns -> embed ->
    cosine-similarity matrix -> (optionally) mask same-framework pairs ->
    extract top-5 matches per row -> min-max-normalize the best score ->
    merge back onto table1 with framework info for each match.

    Args:
        table1: DataFrame with 'Indicator Name', 'Framework' and an ID column.
        table2: DataFrame with 'Indicator name (leonardo)', 'Framework' and ID.
        columns_embeddings_col1: columns of table1 to concatenate and embed.
        columns_embeddings_col2: columns of table2 to concatenate and embed.
        harmonization: when True, pairs sharing a framework are excluded.

    Returns:
        DataFrame with one row per table1 indicator: normalized best score,
        top-5 match names, IDs, and frameworks.

    NOTE: mutates table1/table2 in place (adds 'ID' / 'concatenated_input').
    """
    _ensure_id_column(table1)
    _ensure_id_column(table2)

    print(columns_embeddings_col1)
    table1 = concatenate_columns(table1, columns=columns_embeddings_col1)
    table2 = concatenate_columns(table2, columns=columns_embeddings_col2)

    embeddings1 = get_embbedings(table1, 'concatenated_input')
    embeddings2 = get_embbedings(table2, 'concatenated_input')

    # (n1, n2) cosine-similarity matrix; rows indexed by table1 IDs,
    # columns by table2 IDs.
    similarities = cos_sim(embeddings1, embeddings2)
    result_df = pd.DataFrame(similarities, columns=table2['ID'], index=table1['ID'])

    if harmonization:
        _mask_shared_frameworks(result_df, table1, table2)

    result_df = _add_top5_columns(result_df, table2)

    # Best similarity per row, over the similarity columns only.
    result_df['max_sim'] = np.nanmax(result_df[table2['ID']], axis=1)

    # Min-max normalize the best scores across all rows.
    # NOTE(review): if every row has the same max_sim this divides by zero
    # (yields NaN/inf), matching the original behavior — confirm acceptable.
    min_val = np.nanmin(result_df['max_sim'])
    max_val = np.nanmax(result_df['max_sim'])
    result_df['max_sim_normalized'] = (result_df['max_sim'] - min_val) / (max_val - min_val)

    result_final = result_df[['max_sim_normalized',
                              'top1name', 'top2name', 'top3name', 'top4name', 'top5name',
                              'top1id', 'top2id', 'top3id', 'top4id', 'top5id']]
    # 'ID' is result_final's index name; merge matches it against table1's column.
    result_final = table1[['ID', 'Indicator Name', 'Framework']].merge(
        result_final, on='ID', how='left')

    # Framework of each top-k match; Series.map gives NaN for missing/None
    # IDs, identical to the original dict.get(id, np.nan) lookup.
    id_to_framework = table2.set_index('ID')['Framework'].to_dict()
    for k in range(1, 6):
        result_final[f'top{k}framework'] = result_final[f'top{k}id'].map(id_to_framework)

    return result_final