# IndicatorHarmonizer / functions.py
# Author: holzhauerL
# Last commit: "Update in app.py to fix bug: used keywords in process_similarity_results()"
# Commit hash: c56bd4a
import gradio as gr
import pandas as pd
import re
import numpy as np
import importlib
from pandas import json_normalize
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F
from pandas import json_normalize
### Parameters not expected to be changed in every run
# Columns of table 1 that are concatenated to build the text that is embedded.
columns_embeddings_col1 = ['Indicator Name']
# Columns of table 2 that are concatenated to build the text that is embedded.
columns_embeddings_col2 = ['Indicator name (leonardo)']
#### Functions
from numpy.linalg import norm
print("Functions loaded")

# Cosine similarity, written as a proper function instead of a named lambda (PEP 8 E731).
def cos_sim(a, b):
    """Return (a @ b.T) / (norm(a) * norm(b)) for two arrays/tensors.

    Exact cosine similarity for 1-D vectors.

    NOTE(review): for 2-D inputs `norm` is the Frobenius norm of the whole
    matrix, not per-row norms, so the result is a constant rescaling of
    `a @ b.T` rather than per-pair cosine similarity. Downstream usage only
    ranks values and min-max normalizes them, so the ordering is unaffected —
    confirm before reusing elsewhere.
    """
    return (a @ b.T) / (norm(a) * norm(b))
def concatenate_columns(df, columns):
    """Add a 'concatenated_input' column to `df` joining `columns` with '.'.

    Mutates `df` in place (values are cast to str first) and returns it.

    Raises:
        ValueError: if any requested column is missing; the message names
            the missing columns so the caller can fix the input table.
    """
    # Report exactly which columns are absent instead of a generic message.
    missing = [col for col in columns if col not in df.columns]
    if missing:
        raise ValueError(f"Columns not found in the DataFrame: {missing}")
    # Concatenate the specified columns with a period as the separator
    df['concatenated_input'] = df[columns].astype(str).agg('.'.join, axis=1)
    return df
# Mask-aware mean pooling over the token dimension.
def mean_pooling(model_output, attention_mask):
    """Average token embeddings, counting only positions the mask marks as real.

    `model_output[0]` is the last hidden state, shape (batch, tokens, dim);
    `attention_mask` is (batch, tokens). Returns (batch, dim).
    """
    last_hidden = model_output[0]
    # Broadcast the mask across the embedding dimension.
    mask = attention_mask.unsqueeze(-1).expand(last_hidden.size()).float()
    summed = (last_hidden * mask).sum(dim=1)
    # Clamp avoids division by zero for fully-masked rows.
    counts = mask.sum(dim=1).clamp(min=1e-9)
    return summed / counts
# Define your get_embbedings function (name kept as-is: it is the public API).
def get_embbedings(table, colname):
    """Embed the strings in `table[colname]` with all-MiniLM-L6-v2.

    Returns a torch tensor of L2-normalized sentence embeddings, one row per
    table row. Requires network access on the first call (HuggingFace Hub).
    """
    # Load tokenizer and model once and cache them on the function object,
    # so repeated calls do not re-download / re-instantiate the model.
    if not hasattr(get_embbedings, "_cache"):
        get_embbedings._cache = (
            AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2'),
            AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2'),
        )
    tokenizer, model = get_embbedings._cache
    # Tokenize sentences
    encoded_input = tokenizer(table[colname].tolist(), padding=True, truncation=True, return_tensors='pt')
    # Compute token embeddings (inference only — no gradients needed)
    with torch.no_grad():
        model_output = model(**encoded_input)
    # Perform mask-aware mean pooling over tokens
    sentence_embeddings = mean_pooling(model_output, encoded_input['attention_mask'])
    # Normalize embeddings to unit length
    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
    return sentence_embeddings
# Process similarity between two indicator tables and build the harmonization report.
def process_similarity_results(table1, table2,columns_embeddings_col1,columns_embeddings_col2, harmonization=True):
    """For each indicator in `table1`, find its top-5 most similar indicators
    in `table2` by sentence-embedding cosine similarity.

    Parameters
    ----------
    table1, table2 : pandas.DataFrame
        Each must have either an 'Indicator ID' or an 'ID' column and a
        'Framework' column; `table1` needs 'Indicator Name' and the columns
        in `columns_embeddings_col1`; `table2` needs
        'Indicator name (leonardo)' and the columns in `columns_embeddings_col2`.
    columns_embeddings_col1, columns_embeddings_col2 : list[str]
        Columns concatenated (period-separated) into the text that is embedded.
    harmonization : bool, default True
        When True, similarities between indicators sharing at least one
        framework (comma-separated 'Framework' values) are blanked to NaN,
        so only cross-framework matches are reported.

    Returns
    -------
    pandas.DataFrame
        One row per `table1` indicator: 'ID', 'Indicator Name', 'Framework',
        'max_sim_normalized', top1..top5 name/id/framework columns.

    Side effects: mutates `table1`/`table2` in place (adds 'ID' and
    'concatenated_input' columns) and prints `columns_embeddings_col1`.
    """
    # Normalize the identifier of both tables into a string 'ID' column.
    if 'Indicator ID' in table1.columns:
        table1['ID'] = table1['Indicator ID'].astype(str)
    else:
        table1['ID'] = table1['ID'].astype(str)
    if 'Indicator ID' in table2.columns:
        table2['ID'] = table2['Indicator ID'].astype(str)
    else:
        table2['ID'] = table2['ID'].astype(str)
    print(columns_embeddings_col1)
    # Build the 'concatenated_input' text column used for the embeddings.
    table1 = concatenate_columns(table1, columns= columns_embeddings_col1)
    table2 = concatenate_columns(table2,columns= columns_embeddings_col2)
    embeddings1 = get_embbedings(table1, 'concatenated_input')
    embeddings2 = get_embbedings(table2,'concatenated_input')
    # Calculate cosine similarity between the embeddings:
    # rows = table1 indicators, columns = table2 indicators.
    similarities = cos_sim(embeddings1, embeddings2)
    # Create a DataFrame for the similarities, labelled with both tables' IDs.
    result_df = pd.DataFrame(similarities,
                             columns=table2['ID'],
                             index=table1['ID'])
    if harmonization:
        # Mapping frameworks: ID -> comma-separated framework string.
        table1_sel_map = table1.set_index('ID')['Framework'].to_dict()
        table2_sel_map = table2.set_index('ID')['Framework'].to_dict()
        # Function to check if there is any common framework element
        # (framework strings are split on ', ').
        def has_common_framework(table1_framework, table2_framework):
            table1_frameworks = set(table1_framework.split(', '))
            table2_frameworks = set(table2_framework.split(', '))
            return not table1_frameworks.isdisjoint(table2_frameworks)
        # Replace similarity values with NaN where the frameworks match,
        # so indicators are never matched within their own framework.
        for table1_id, table1_framework in table1_sel_map.items():
            for table2_id in result_df.columns:
                table2_framework = table2_sel_map.get(table2_id)
                if pd.notna(table2_framework) and pd.notna(table1_framework):
                    if has_common_framework(table1_framework, table2_framework):
                        result_df.loc[table1_id, table2_id] = np.nan
    # Function to return the column names (table2 IDs) of the top 5 values
    # for each row; NaN entries are skipped by nlargest.
    def top_5_column(row):
        # Find the top 5 values in the row
        top_5_values = row.nlargest(5)
        # Return the column names corresponding to these values
        return top_5_values.index.tolist()
    # Convert all columns to numeric data types, coercing non-convertible values to NaN
    #result_df = result_df.iloc[:,1:].apply(pd.to_numeric, errors='coerce')
    # Get the list of non-numeric columns
    #non_numeric_columns = result_df.columns[result_df.dtypes == 'object']
    # Apply the function to each row of the DataFrame, excluding non-numeric columns
    result_df['Top 5 Column ID'] = result_df.apply(lambda row: top_5_column(row), axis=1)
    # Create a dictionary for fast lookup: table2 ID -> indicator name.
    id_to_name = dict(zip(table2['ID'], table2['Indicator name (leonardo)']))
    # Function to map IDs to names; unknown IDs fall back to the literal "ID".
    def map_ids_to_names(id_list):
        return [id_to_name.get(id, "ID") for id in id_list]
    # Apply the function to the 'Top 5 Column ID' column
    result_df['Top 5 Names'] = result_df['Top 5 Column ID'].apply(map_ids_to_names)
    # Ensure all entries are lists and have at least 5 elements, filling missing values with None
    result_df['Top 5 Names'] = result_df['Top 5 Names'].apply(lambda x: x if isinstance(x, list) and len(x) >= 5 else (x + [None] * (5 - len(x)) if isinstance(x, list) else [None]*5))
    # Convert list in 'Top 5 Names' to separate columns
    new_cols = pd.DataFrame(result_df['Top 5 Names'].tolist(), index=result_df.index, columns=["top1name", "top2name", "top3name", "top4name", "top5name"])
    result_df = result_df.join(new_cols)
    # Same padding for the ID lists before splitting them into columns.
    result_df['Top 5 Column ID'] = result_df['Top 5 Column ID'].apply(
        lambda x: x if isinstance(x, list) and len(x) >= 5 else (x + [None] * (5 - len(x)) if isinstance(x, list) else [None]*5)
    )
    # Convert list in 'Top 5 Column ID' to separate columns
    new_ids_cols = pd.DataFrame(result_df['Top 5 Column ID'].tolist(), index=result_df.index, columns=["top1id", "top2id", "top3id", "top4id", "top5id"])
    result_df = result_df.join(new_ids_cols)
    # Best similarity per table1 indicator across all table2 IDs (NaNs ignored).
    result_df['max_sim'] = np.nanmax(result_df[table2['ID']], axis=1)
    # Calculate min and max of the 'max_sim' column, ignoring NaN values
    min_val = np.nanmin(result_df['max_sim'])
    max_val = np.nanmax(result_df['max_sim'])
    # Min-max normalize 'max_sim' to [0, 1] across rows.
    # NOTE(review): divides by zero if every max_sim is identical — confirm inputs rule that out.
    result_df['max_sim_normalized'] = (result_df['max_sim'] - min_val) / (max_val - min_val)
    result_final = result_df[['max_sim_normalized','top1name', 'top2name', 'top3name', 'top4name', 'top5name', 'top1id',
                              'top2id', 'top3id', 'top4id', 'top5id']]
    # Merge the DataFrames; 'ID' is result_final's index level name, which
    # pandas `merge(on=...)` accepts alongside table1's 'ID' column.
    result_final = table1[['ID', 'Indicator Name', 'Framework']].merge(result_final, on='ID', how='left')
    # Create a mapping from ID to Framework
    id_to_framework = table2.set_index('ID')['Framework'].to_dict()
    # Function to map ID to Framework; returns NaN for unknown/None IDs.
    def map_framework(id):
        return id_to_framework.get(id, np.nan)
    # Add framework information for top1id to top5id
    result_final['top1framework'] = result_final['top1id'].apply(map_framework)
    result_final['top2framework'] = result_final['top2id'].apply(map_framework)
    result_final['top3framework'] = result_final['top3id'].apply(map_framework)
    result_final['top4framework'] = result_final['top4id'].apply(map_framework)
    result_final['top5framework'] = result_final['top5id'].apply(map_framework)
    return result_final