diff --git "a/functions.py" "b/functions.py" new file mode 100644--- /dev/null +++ "b/functions.py" @@ -0,0 +1,3099 @@ + +import numpy as np +import pandas as pd +from subprocess import check_output +#from pydantic_settings import BaseSettings +import ipywidgets as widgets +from IPython.display import display, Image, SVG, display_svg + +# Data visualization +import matplotlib.pyplot as plt +import seaborn as sns + +# Machine learning tools and metrics +from sklearn.cluster import KMeans +from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier +from sklearn.metrics import accuracy_score, r2_score, silhouette_samples, silhouette_score, confusion_matrix, f1_score +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder +from sklearn.metrics.pairwise import cosine_similarity +from sklearn.feature_extraction.text import TfidfVectorizer +from sklearn.tree import DecisionTreeRegressor + +# Deep learning and NLP +from fastai.tabular.all import * +from dtreeviz.trees import * +from deep_translator import GoogleTranslator +from sentence_transformers import SentenceTransformer +from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer +import language_tool_python +from langdetect import detect +from geopy.distance import geodesic + + + +### Parameters to run the code +# Parameters + +# file paths, to be changed by tables in mongo DB +#survey_path = '/Users/alanfortuny/Downloads/Copy of 20250415_Giscor_LMC_Survey_Customers_-_all_versions_-_False_-_2025-04-30-10-58-05 1.xlsx' +#indicator_path = '/Users/alanfortuny/Downloads/Indicators_Indicators_Default view 5.xlsx' +#questions_path = '/Users/alanfortuny/Downloads/Indicators_Questions_Default View 7.xlsx' +#choice_path = '/Users/alanfortuny/Downloads/Indicators_Choices_Default View 5.xlsx' + +# Define the columns and the corresponding points for integrity +columns_integrity = [ + 'payment_for_survey', + 'respondent_influenced', + 
def calculate_weighted_aggregate_with_max(data, survey_type, columns_integrity, survey_type_mapping, points):
    """
    Add the weighted integrity aggregate and the maximum attainable score to ``data``.

    Parameters:
    - data (DataFrame): Survey rows holding one column per integrity check. Modified in place.
    - survey_type (str): Key into ``survey_type_mapping`` selecting the weight set to apply.
    - columns_integrity (list): Names of the integrity columns to aggregate.
    - survey_type_mapping (dict): Maps a survey type to a ``{column: weight}`` dict.
    - points (list): Points per integrity column, positionally aligned with ``columns_integrity``.

    Returns:
    - DataFrame: The same ``data`` object, with 'weighted_aggregate' and 'max_possible_score' added.
    """
    # An unknown survey type yields an empty weight set, i.e. every weight is 0.
    type_weights = survey_type_mapping.get(survey_type, {})

    # Integrity columns absent from the data are created and filled with 0.
    for absent in set(columns_integrity).difference(data.columns):
        data[absent] = 0

    # One weight per integrity column, in the same order as columns_integrity.
    weight_vector = [type_weights.get(name, 0) for name in columns_integrity]

    # Scale each column by its weight, then add up across the columns of each row.
    data['weighted_aggregate'] = data[columns_integrity].mul(weight_vector, axis='columns').sum(axis=1)

    # Best attainable score for this survey type: weight times points per column.
    best_total = 0
    for position, name in enumerate(columns_integrity):
        best_total += type_weights.get(name, 0) * points[position]

    # The stored maximum is one less than the computed total.
    data['max_possible_score'] = best_total - 1

    return data
def process_themes_and_questions(indicator_df, theme_list):
    """
    Build the mapping between the selected themes, their questions and the indicator IDs.

    Rows of ``indicator_df`` whose 'Themes' value contains any entry of ``theme_list`` as a whole
    term are kept. 'Themes' and 'Question(s)' are split on ', ' and exploded, and the result is
    reduced to unique ('Question(s)', 'ID') pairs carrying the original, non-exploded 'Themes' text.

    Parameters:
    - indicator_df (DataFrame): Indicator table with 'ID', 'Themes' and 'Question(s)' columns.
      It is not modified.
    - theme_list (list): Themes to filter by. Entries are matched literally, so characters such
      as '(', '+' or '.' are safe. An empty list matches every row.

    Returns:
    - DataFrame: Columns 'Themes' (original text), 'Question(s)' and 'ID', one row per unique
      ('Question(s)', 'ID') pair. The index is the one left over after dropping duplicates.
    """
    import re  # local import so the function does not rely on import order in the module

    # Work on a private copy so the caller's 'Themes' column keeps its original values.
    indicators = indicator_df.copy()
    indicators['Themes'] = indicators['Themes'].astype(str)

    # Each theme is escaped so it is matched literally, and wrapped in look-arounds that
    # require a non-word character (or string edge) on both sides. For themes that start
    # and end with a word character this is the same as \b...\b, and unlike \b it still
    # works when a theme starts or ends with punctuation.
    pattern = '|'.join(
        rf'(?<!\w){re.escape(str(theme))}(?!\w)' for theme in theme_list
    )

    # Keep the rows that mention at least one requested theme.
    matches = indicators['Themes'].str.contains(pattern, regex=True)
    filtered_df = indicators[matches].copy()

    # Turn the comma-separated cells into lists and give each element its own row.
    filtered_df['Themes'] = filtered_df['Themes'].str.split(', ')
    filtered_df['Question(s)'] = filtered_df['Question(s)'].str.split(', ')
    exploded_df = filtered_df.explode('Themes').explode('Question(s)').reset_index(drop=True)

    # Bring back the original (non-exploded) Themes text for every ID.
    final_df = exploded_df.merge(
        indicators[['ID', 'Themes']], on='ID', how='inner', suffixes=('', '_original')
    )

    # Keep the original Themes text under the plain 'Themes' name.
    final_df = final_df[['Themes_original', 'Question(s)', 'ID']].rename(
        columns={'Themes_original': 'Themes'}
    )

    # One row per (question, indicator) pair.
    return final_df.drop_duplicates(subset=['Question(s)', 'ID'])
def get_missing_columns_without_model(data_validation_strategy_df, data_all):
    """
    List the columns of ``data_all`` that are not assigned a model-based validation method.

    Parameters:
    - data_validation_strategy_df (DataFrame): Strategy table with 'Field name' and
      'Prefered method' columns.
    - data_all (DataFrame): Survey data whose columns are checked.

    Returns:
    - list: Column names of ``data_all`` whose 'Prefered method' does not contain 'model'
      (or that do not appear in the strategy table at all). Order is not guaranteed.
    """
    # Rows whose preferred method mentions 'model'; missing methods count as "no".
    uses_model = data_validation_strategy_df['Prefered method'].str.contains('model', na=False)

    # Field names covered by a model-based method.
    modelled_fields = set(data_validation_strategy_df.loc[uses_model, 'Field name'])

    # Everything in the survey data that is not covered by a model.
    return list(set(data_all.columns) - modelled_fields)
pd.to_numeric(df_c[c], errors='coerce') + + # Drop rows where conversion failed (NaNs introduced) + df_c.dropna(subset=[c], inplace=True) + + # Ensure there are enough observations to proceed with the split + if len(df_c) < int(1 / test_size): + continue # Skip to the next column if there aren't enough observations + + # Assume cont_cat_split and other preprocessing steps are defined elsewhere + columns_to_fill = df_c.columns.difference([c]) + cont, cat = cont_cat_split(df_c[analysis_columns], 1, dep_var=c) + to = TabularPandas(df_c, procs=[Categorify, FillMissing, OneHotEncode], cat_names=cat, cont_names=cont, y_names=c) + + # Splitting the data into training and testing sets using the original indices + train_idx, test_idx = train_test_split(df_c.index, test_size=test_size, random_state=random_state) + + # Create the mapping from internal codes to original labels + label_mapping = {code: label for label, code in zip(df_c[c], to.y)} + + # Choose the model based on the type of data + model = RandomForestClassifier(max_features=max_features, random_state=random_state) if y_type in ['select_one', 'select_multiple'] else RandomForestRegressor(max_features=max_features, random_state=random_state) + # Proceed with your preprocessing and fit the model + try: + # Additional prints to help trace the column being processed + print(f"Processing column: {c} with data type {y_type}") + # Assume preprocessing steps here that create to with .xs and .y ready for training + model.fit(to.xs.loc[df_c.index], to.y.loc[df_c.index]) # Use correct indices and data slices + except ValueError as e: + print(f"Error fitting model on column {c}: {str(e)}") + continue + + # Predicting on both the training and test sets + y_pred_train = model.predict(to.xs.loc[train_idx]) + y_pred_test = model.predict(to.xs.loc[test_idx]) + + # Combine actual and predicted values for both sets + actual_train = to.y.loc[train_idx] + actual_test = to.y.loc[test_idx] + predicted_train = y_pred_train + 
predicted_test = y_pred_test + + combined_indices = list(train_idx) + list(test_idx) + combined_mapped_indexes = combined_indices # Use the existing DataFrame index instead of '_index' + combined_actual = pd.concat([actual_train, actual_test]) + combined_predicted = list(predicted_train) + list(predicted_test) + + # Check mapping consistency + assert len(combined_indices) == len(combined_mapped_indexes), "Index mapping error: mismatched lengths." + + acc = accuracy_score(combined_actual, combined_predicted) if y_type in ['select_one', 'select_multiple'] else r2_score(combined_actual, combined_predicted) + + if y_type in ['select_one', 'select_multiple']: + combined_predicted_mapped = [label_mapping[pred] for pred in combined_predicted] + combined_actual_mapped = [label_mapping[act] for act in combined_actual] + + if 1 > acc >= accuracy_threshold: + accurate_columns.append(c) + acc_levels.append(acc) + # Assume rf_feat_importance is defined elsewhere + top_features = rf_feat_importance(model, to.xs.loc[train_idx]).nlargest(num_top_features, 'imp')['cols'].tolist() + top_features = [feat for feat in top_features if feat in df_c.columns] + + pred_actual_tuples[c] = [] + for i, index in enumerate(combined_indices): + actual = combined_actual.iloc[i] + predicted = combined_predicted[i] + + # Check if the predicted value is different or has a relative difference greater than 75% + if y_type in ['select_one', 'select_multiple']: + predicted = combined_predicted_mapped[i] + actual = combined_actual_mapped[i] + condition = predicted != actual + else: + # Compute absolute relative percentage difference + condition = (actual != 0 and abs(predicted - actual) / abs(actual) > 0.75) + + if condition: + + features = {feat: df_c.at[index, feat] for feat in top_features if feat in df_c.columns} + + # Default cleansing_urgency to 'low' and update if conditions are met + cleansing_urgency = 'low' + if isinstance(actual, (int, float)) and isinstance(predicted, (int, float)): + 
def model_issues_to_data_frame(pred_actual_tuples):
    """
    Flatten the per-question model findings into a data-issues DataFrame.

    Parameters:
    - pred_actual_tuples (dict): Maps a question name to a list of finding dicts with the keys
      'index', 'method', 'model_accuracy', 'actual', 'predicted', 'explanatory questions'
      and 'cleansing_urgency'.

    Returns:
    - DataFrame: One row per finding with the columns '_index', 'question', 'check',
      'quality_dimension', 'actual', 'predicted', 'cleansing_urgency' and
      'relevant_context_data'. Empty when there are no findings.
    """

    def _as_record(question, finding):
        # "feature: value" pairs joined with '; ', followed by the model accuracy.
        context_parts = [f"{key}: {value}" for key, value in finding['explanatory questions'].items()]
        explanatory_questions = '; '.join(context_parts)
        return {
            '_index': finding['index'],
            'question': question,
            'check': finding['method'],
            'quality_dimension': 'consistency',
            'actual': finding['actual'],
            'predicted': finding['predicted'],
            'cleansing_urgency': finding['cleansing_urgency'],
            'relevant_context_data': f"{explanatory_questions}; model_accuracy: {finding['model_accuracy']}",
        }

    rows = [
        _as_record(question, finding)
        for question, findings in pred_actual_tuples.items()
        for finding in findings
    ]

    return pd.DataFrame(rows)
+ """ + # Check for NaN or empty strings and return pd.NaT + if pd.isna(value) or value == '': + return np.nan + + try: + # Attempt to convert the value to datetime + return pd.to_datetime(value, errors='raise') + except (ValueError, TypeError): + # Return pd.NaT if conversion fails + return np.nan + + +def format_checks(df, transformations_df): + + for _, row in transformations_df.iterrows(): + # Check if 'format_check' is part of the 'Preferred method' before executing + if isinstance(row['Prefered method'], str) and 'format_check' in row['Prefered method']: + if row['Field name'] == 'start': + # Apply the validate_phone_number function to the phone_number column + df['start'] = df['start'].apply(check_date_time) + if row['Field name'] == 'end': + # Apply the validate_phone_number function to the phone_number column + df['end'] = df['end'].apply(check_date_time) + +def create_nan_dict(data_all, raw_data, column_strategy_df): + + # Dictionary to store NaN records + nan_records = {} + + # Ensure that the index alignment between data_all and raw_data is correct + if not data_all.index.equals(raw_data.index): + print("Warning: Indices between data_all and raw_data do not match.") + # Reindex raw_data to align with data_all based on index + raw_data = raw_data.reindex(data_all.index) + + # Iterate over all columns that are present in both dataframes + common_columns = data_all.columns.intersection(raw_data.columns) + + for column in common_columns: + # Initialize an empty list for each column to store dictionaries of {index, raw_data_value} + if column not in nan_records: + nan_records[column] = [] + + # Check for NaNs in data_all's column + # Check for NaNs in data_all's column that are not NaNs in raw_data + nan_indices = data_all[column].isna() & ~raw_data[column].isna() + + # Gather data from both dataframes where NaNs are found in data_all + for index in data_all[nan_indices].index: + if index in raw_data.index: # Check if index exists in raw_data + 
def format_violations_to_df(format_violation_dictionary, data_issues_df):
    """
    Convert format violations into issue rows and put them in front of the existing issues.

    Parameters:
    - format_violation_dictionary (dict): Maps a question name to a list of violation dicts
      with the keys 'index', 'format_check' and 'actual'.
    - data_issues_df (DataFrame): Issues collected so far.

    Returns:
    - DataFrame: The new format-violation rows followed by ``data_issues_df``, re-indexed from 0.
    """
    violation_rows = [
        {
            '_index': violation['index'],
            'question': question,
            'check': 'format_check',
            'quality_dimension': 'consistency',
            'actual': violation['actual'],
            'predicted': np.nan,  # a format check has no predicted value
            'cleansing_urgency': 'high',
            'relevant_context_data': f"{violation['format_check']} | actual value: {violation['actual']}",
        }
        for question, violations in format_violation_dictionary.items()
        for violation in violations
    ]

    # New rows first, previously collected issues after them.
    return pd.concat([pd.DataFrame(violation_rows), data_issues_df], ignore_index=True)
def detect_outliers(df):
    """
    Flag distribution-based outliers in every numeric-looking column using the 1.5 * IQR rule.

    Values are coerced to numbers; non-numeric values, NaNs and the codes -1 / -2 are ignored.
    Columns with two or fewer distinct values are skipped. Outliers are only reported for a
    column when there is at least one and they make up less than 5% of the usable values.

    Parameters:
    - df (DataFrame): Data to scan.

    Returns:
    - dict: Maps a column name to a list of dicts with the keys 'index', 'method', 'actual',
      'Q1', 'Q3' and 'cleaning_urgency' ('high' when the value exceeds 10 * Q3, else 'low').
    """
    flagged = {}

    for column_name in df.columns:
        raw_column = df[column_name]

        # A duplicated column label makes df[...] return a DataFrame instead of a Series.
        if not isinstance(raw_column, pd.Series):
            print(f"Error: Data for column {column_name} is not a Series.")
            continue

        # Coerce to numbers, drop what could not be converted, drop the -1 / -2 codes.
        numeric = pd.to_numeric(raw_column, errors='coerce').dropna()
        numeric = numeric[~numeric.isin([-1, -2])]

        # Nothing to analyse, or at most a binary variable.
        if numeric.empty or numeric.nunique() <= 2:
            print(f"No non-binary numeric data available for analysis in column {column_name}.")
            continue

        # Quartiles and the fences 1.5 * IQR beyond them.
        Q1, Q3 = numeric.quantile(0.25), numeric.quantile(0.75)
        spread = Q3 - Q1
        low_fence = Q1 - 1.5 * spread
        high_fence = Q3 + 1.5 * spread

        beyond_fences = (numeric < low_fence) | (numeric > high_fence)
        outlier_indices = numeric[beyond_fences].index

        # Report only when outliers exist and are rare (fewer than 5% of the values).
        if not (0 < len(outlier_indices) < 0.05 * len(numeric)):
            continue

        details = []
        for idx in outlier_indices:
            value = numeric.loc[idx]
            details.append({
                'index': idx,
                'method': 'distribution based outlier',
                'actual': value,
                'Q1': Q1,
                'Q3': Q3,
                'cleaning_urgency': 'high' if value > 10 * Q3 else 'low',
            })
        flagged[column_name] = details

    return flagged
+ """ + # Initialize an empty list to store the transformed records + records = [] + + # Populate the records from the dictionary + for key, entries in dist_outliers_dict.items(): + for entry in entries: + records.append({ + '_index': entry['index'], + 'question': key, + 'check': 'dist_outlier_check', + 'quality_dimension': 'consistency', + 'actual': entry['actual'], + 'predicted': np.nan, # Set predicted to NaT as per the request + 'cleansing_urgency': entry['cleaning_urgency'], + 'relevant_context_data': f"Q1: {entry['Q1']} | Q3: {entry['Q3']}" + }) + + # Create a DataFrame from the records + df = pd.DataFrame(records) + + # Append data_issues_df to format_violations_df + combined_df = pd.concat([df, data_issues_df], ignore_index=True) + + return combined_df + + + + +## 1.4 Data Completeness check + + +import re + + +import pandas as pd +import numpy as np +import re + +def completeness_check(df, questions_df): + """ + Checks the completeness of the DataFrame, including: + - Handling missing values like `-1`, `-2`, or patterns like "No answer" and "Unknown". + - Handling free-text columns where the text is numeric or non-numeric, and applying specific checks. + + Parameters: + - df (DataFrame): The original DataFrame containing the survey data. + - questions_df (DataFrame): A DataFrame containing the question metadata with 'Field name' and 'Answer Type' columns. + + Returns: + - incomplete_records (dict): A dictionary with details of incomplete records in the DataFrame. 
+ """ + incomplete_records = {} + + # Compile regex pattern to match variations of "Unknown" or "No answer" embedded in strings + no_answer_pattern = re.compile(r".*(no[_-]?answer|unknown).*", re.IGNORECASE) + + # Check for columns that have 'Answer Type' == 'text' in questions_df + text_columns = questions_df[questions_df['Answer Type'] == 'text']['Field name'].tolist() + + # Check each column in the DataFrame + for column in df.columns: + # Skip NaN values + df_column = df[column].dropna() + + # Create a mask to find "No answer" or "Unknown" entries using regex + regex_mask = df_column.astype(str).apply(lambda x: bool(no_answer_pattern.search(x))) + + # Create a mask to find values exactly equal to -1 or -2 + missing_value_mask = df_column.isin([-1, -2]) + + # Combine both masks to identify all incomplete or missing records + combined_mask = regex_mask | missing_value_mask + filtered_indices = df_column.index[combined_mask] + + # Store details if there are any "No answer" or "Unknown" entries + if not filtered_indices.empty: + incomplete_records[column] = [{ + 'index': idx, + 'method': 'completeness check', + 'actual': df.at[idx, column], + 'cleansing_urgency': 'high' + } for idx in filtered_indices] + + # Only check free-text columns (those with 'Answer Type' == 'text') for numeric and non-numeric checks + if column in text_columns: + # Check if value is numeric, including floats + def is_numeric(value): + try: + # Try converting value to numeric (float or int) + pd.to_numeric(value) + return True + except ValueError: + return False + + # Mask to check if the value is numeric + is_numeric_mask = df_column.apply(is_numeric) + + # For non-numeric values, check if length is less than 3 or if it has exactly two words + text_filtered_indices = df_column.index[~is_numeric_mask] + + # Iterate over non-numeric entries + for idx in text_filtered_indices: + text = str(df.at[idx, column]).strip() + text_length = len(text) + + # Check if the length is less than 3 characters 
def completeness_check_dict_to_df(completeness_check_dict, data_issues_df):
    """
    Convert completeness-check findings into issue rows and put them in front of the existing issues.

    Parameters:
    - completeness_check_dict (dict): Maps a question name to a list of finding dicts with the
      keys 'index', 'method', 'actual' and, optionally, 'cleansing_urgency'.
    - data_issues_df (DataFrame): Issues collected so far.

    Returns:
    - DataFrame: The new completeness rows followed by ``data_issues_df``, re-indexed from 0.
    """
    records = [
        {
            '_index': entry['index'],
            'question': question,
            'check': entry['method'],  # the method name doubles as the check type
            'quality_dimension': 'consistency',
            'actual': entry['actual'],
            'predicted': np.nan,  # a completeness check has no predicted value
            # Findings carry their urgency under 'cleansing_urgency'. The old lookup used
            # 'cleaning_urgency', which never matched, so every row silently became 'low'.
            # The old spelling is still accepted as a fallback.
            'cleansing_urgency': entry.get(
                'cleansing_urgency', entry.get('cleaning_urgency', 'low')
            ),
            'relevant_context_data': None,
        }
        for question, entries in completeness_check_dict.items()
        for entry in entries
    ]

    # New rows first, previously collected issues after them.
    return pd.concat([pd.DataFrame(records), data_issues_df], ignore_index=True)
+ # Here, let's assume we are creating a list of values if not already a list + if not isinstance(dict1[key], list): + dict1[key] = [dict1[key]] + if not isinstance(value, list): + value = [value] + dict1[key].extend(value) + else: + dict1[key] = value + return dict1 + + +## 1.6 Calculating consistency score + +def calculate_consistency_scores(raw_data, column_strategy_df, data_issues_df): + # Step 1: Filter columns based on the column_strategy_df + preferred_columns = column_strategy_df[ + column_strategy_df['Prefered method'].notna() & + (column_strategy_df['Prefered method'] != "integrity_score")]['Field name'] + selected_columns = raw_data[preferred_columns] + + # Step 2: Calculate the number of non-empty responses per column + non_empty_counts = selected_columns.notna().sum() + + # Step 3: Calculate the number of unique _index per question from data_issues_df + unique_index_counts = data_issues_df.groupby('question')['_index'].nunique() + + # Step 4: Calculate the consistency score per question + consistency_scores = {} + for question in preferred_columns: + num_non_empty = non_empty_counts[question] if question in non_empty_counts else 0 + num_issues = unique_index_counts.get(question, 0) + + if num_non_empty > 0: + consistency_score = max(1 - (num_issues / num_non_empty),0) + else: + consistency_score = None # Handle case where there are no non-empty responses + + consistency_scores[question] = { + 'number_of_non_empty_responses': num_non_empty, + 'number_of_index_with_issues': num_issues, + 'consistency_score': consistency_score + } + + # Create a DataFrame from the consistency scores + results_df = pd.DataFrame.from_dict(consistency_scores, orient='index').reset_index() + results_df.rename(columns={'index': 'question'}, inplace=True) + + return results_df + + + + + +# 1.8 Representativity Score + +import math +import scipy.stats as stats +from scipy.stats import norm + +def calculate_representativity_scores(N, sample_size, confidence_level=0.90, e=5, 
def calculate_representativity_scores(N, sample_size, confidence_level=0.90, e=5, p_hat=0.5, overall_score=1):
    """
    Calculate the representativity scores of a survey before and after data cleansing.

    The function derives the sample size required to reach ``confidence_level``
    at margin of error ``e``, the confidence level actually reached by the raw
    and by the clean sample, and how many extra samples are missing.

    Parameters:
    - N (int): Total population size.
    - sample_size (int): Sample size used in the study.
    - confidence_level (float): Desired confidence level as a fraction (default 0.90).
    - e (float): Margin of error as a percentage (default 5). It is used both for
      the target sample size and for every confidence-level computation.
    - p_hat (float): Estimated proportion of the attribute in the population (default 0.5).
    - overall_score (float): Multiplier applied to ``sample_size`` to estimate the
      number of clean records (default 1).

    Returns:
    - dict: 'target_sample_size', 'confidence_target', 'confidence_actual',
      'confidence_clean' (percentages), 'representativity_actual',
      'representativity_clean' (percent of the target confidence) and
      'additional_samples'.
    """

    def calculate_confidence_level(N, sample_size, e=e):
        """
        Confidence level (in percent) reached by ``sample_size`` at margin ``e``.
        """
        if sample_size <= 0:
            return 0.0  # no data, no confidence
        if N <= 1 or sample_size >= N:
            return 100.0  # census: there is no sampling error left

        e_decimal = e / 100  # Convert margin of error to decimal

        # Finite population correction
        finite_population_correction = math.sqrt((N - sample_size) / (N - 1))

        # Standard error of the proportion
        standard_error = finite_population_correction * math.sqrt(p_hat * (1 - p_hat) / sample_size)
        if standard_error == 0:
            return 100.0  # degenerate proportion (p_hat of 0 or 1)

        # z-score implied by the margin of error, then two-sided coverage
        z_score = e_decimal / standard_error
        coverage = stats.norm.cdf(z_score) - stats.norm.cdf(-z_score)

        return coverage * 100  # Convert to percentage

    def z_value_from_confidence_level(confidence_level):
        """
        Two-sided z-value corresponding to a confidence level given as a fraction.
        """
        tail_probability = (1 - confidence_level) / 2
        return norm.ppf(1 - tail_probability)

    def calculate_sample_size(N, z_score, e, p_hat):
        """
        Required sample size (Cochran formula with finite population correction).
        """
        e_decimal = e / 100  # Convert margin of error to decimal
        n_unlimited = (z_score**2 * p_hat * (1 - p_hat)) / (e_decimal**2)
        n_finite = n_unlimited / (1 + ((n_unlimited - 1) / N))
        return math.ceil(n_finite)

    # Target sample size and the confidence level it actually delivers
    z_score = z_value_from_confidence_level(confidence_level)
    target_sample_size = calculate_sample_size(N, z_score, e, p_hat)
    confidence_target = calculate_confidence_level(N, target_sample_size)

    # Confidence level of the raw sample and of the estimated clean sample
    confidence_actual = calculate_confidence_level(N, sample_size)
    clean_data_size = max(round(overall_score * sample_size, 0), 1)
    confidence_clean = calculate_confidence_level(N, clean_data_size)

    # Representativity = reached confidence relative to the target confidence
    if confidence_target > 0:
        representativity_actual = 100 * (confidence_actual / confidence_target)
        representativity_clean = 100 * (confidence_clean / confidence_target)
    else:
        representativity_actual = 0
        representativity_clean = 0

    # Extra samples are only requested while the raw sample misses the target
    if confidence_target > confidence_actual:
        additional_samples = max(target_sample_size - clean_data_size, 0)
    else:
        additional_samples = 0

    return {
        'target_sample_size': target_sample_size,
        'confidence_target': confidence_target,
        'confidence_actual': confidence_actual,
        'confidence_clean': confidence_clean,
        'representativity_actual': representativity_actual,
        'representativity_clean': representativity_clean,
        'additional_samples': additional_samples
    }
def calculate_representativity_scores_per_question(consistency_df, N,score_column):
    """
    Calculate representativity scores for every question of a consistency DataFrame.

    Rows are visited in order regardless of the DataFrame's index labels, so a
    filtered or re-indexed ``consistency_df`` is handled correctly.

    Parameters:
    - consistency_df (pd.DataFrame): Needs 'question',
      'number_of_non_empty_responses' and the column named by ``score_column``.
    - N (int): Total population size.
    - score_column (str): Column whose value is passed as ``overall_score`` to
      ``calculate_representativity_scores``.

    Returns:
    - pd.DataFrame: One row per question with the keys returned by
      ``calculate_representativity_scores``; all zeros for questions that have
      no non-empty response.
    """
    score_keys = [
        'target_sample_size',
        'confidence_target',
        'confidence_actual',
        'confidence_clean',
        'representativity_actual',
        'representativity_clean',
        'additional_samples',
    ]
    results = []

    for _, row in consistency_df.iterrows():
        # The sample size of a question is its number of non-empty answers
        sample_size = row['number_of_non_empty_responses']

        if sample_size > 0:
            scores = calculate_representativity_scores(
                N, sample_size, overall_score=row[score_column]
            )
            record = {key: scores[key] for key in score_keys}
        else:
            # No valid entry for this question: report zeros everywhere
            record = dict.fromkeys(score_keys, 0)

        results.append({'question': row['question'], **record})

    # Explicit columns keep the schema stable even when there are no rows
    representativity_scores = pd.DataFrame(results, columns=['question', *score_keys])

    return representativity_scores


### INTEGRITY SCORE FUNCTIONS


# payment_for_survey
def payment_for_survey_integrity(data, integrity_score_per_respondent):
    """
    Score the 'payment_for_survey' answers and store them in integrity_score_per_respondent.

    Scoring: 'yes_no_forced_no' -> 1, 'yes_no_forced_yes' -> 0, anything else -> NaN.
    When the column is missing from ``data`` every respondent gets 0.
    The share of 'yes_no_forced_yes' among the yes/no answers is printed.

    Parameters:
    - data (pd.DataFrame): The DataFrame containing survey data.
    - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores.

    Returns:
    - pd.DataFrame: ``integrity_score_per_respondent`` with the
      'payment_for_survey' column set (the frame is also modified in place).
    """
    column = 'payment_for_survey'

    if column not in data.columns:
        print("The column 'payment_for_survey' is not present in the dataset.")
        integrity_score_per_respondent[column] = 0
        return integrity_score_per_respondent

    # Map the raw answer codes to integrity points; unknown answers become NaN
    answer_points = {'yes_no_forced_no': 1, 'yes_no_forced_yes': 0}
    scores = data[column].map(answer_points)
    integrity_score_per_respondent[column] = scores

    # Count on the mapped scores: the raw column holds labels, not 0/1
    total_yes_no = int(scores.notna().sum())
    yes_forced = int((scores == 0).sum())
    proportion_yes_forced = yes_forced / total_yes_no if total_yes_no > 0 else 0

    # Display the calculated proportion
    print(f"Proportion of 'yes_no_forced_yes': {proportion_yes_forced:.4f}")
    return integrity_score_per_respondent

# respondent influence
def respondent_influence_integrity(data, integrity_score_per_respondent):
    """
    Score the 'respondent_influenced' answers and store them in integrity_score_per_respondent.

    Scoring: 'yes_no_forced_no' -> 1, 'yes_no_forced_yes' -> 0, anything else -> NaN.
    When the column is missing from ``data`` the score column is left as NaN.
    The share of 'no' answers among the yes/no answers is printed.

    Parameters:
    - data (pd.DataFrame): The DataFrame containing survey data.
    - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores.

    Returns:
    - pd.DataFrame: ``integrity_score_per_respondent`` with the
      'respondent_influenced' column set (the frame is also modified in place).
    """
    column = 'respondent_influenced'

    # NaN by default so a missing column is distinguishable from a bad score
    integrity_score_per_respondent[column] = np.nan

    if column not in data.columns:
        print("The column 'respondent_influenced' is not present in the dataset.")
        return integrity_score_per_respondent

    # Map the raw answer codes to integrity points; unknown answers become NaN
    answer_points = {'yes_no_forced_no': 1, 'yes_no_forced_yes': 0}
    scores = data[column].map(answer_points)
    integrity_score_per_respondent[column] = scores

    # Count on the mapped scores: the raw column holds labels, not 0/1
    total_yes_no = int(scores.notna().sum())
    no_answers = int((scores == 1).sum())
    proportion_no = no_answers / total_yes_no if total_yes_no > 0 else 0

    # Display the calculated proportion
    print(f"Proportion of 'no' responses: {proportion_no:.4f}")

    return integrity_score_per_respondent


# Response duration integrity
def response_time_integrity(data, integrity_score_per_respondent,questions_df,column_strategy_df):
    """
    Score each interview on how plausible its duration is for the number of answers given.

    The expected duration window is 10 to 30 seconds per answered question.
    A duration inside the window scores 2, a duration outside decays
    exponentially with its distance from the middle of the window, and a
    missing/unparseable 'start' or 'end' scores 0.

    Parameters:
    - data (pd.DataFrame): Survey data with 'start' and 'end' columns. It is
      modified in place (parsed timestamps plus helper columns).
    - integrity_score_per_respondent (pd.DataFrame): DataFrame to store integrity scores.
    - questions_df: Not used by this function.
    - column_strategy_df (pd.DataFrame): Needs 'Field name' and 'Prefered method';
      fields with a non-null method are the questions that get counted.

    Returns:
    - data (pd.DataFrame): The modified survey data DataFrame.
    - integrity_score_per_respondent (pd.DataFrame): The modified integrity score DataFrame.
    """
    # Parse both timestamps as UTC; anything unparseable becomes NaT
    for stamp in ('start', 'end'):
        data[stamp] = pd.to_datetime(data[stamp], utc=True, errors='coerce')

    # Interview duration in seconds and in minutes
    elapsed = data['end'] - data['start']
    data['time_diff_seconds'] = elapsed.dt.total_seconds()
    data['time_diff_minutes'] = data['time_diff_seconds']/60

    # Questions = strategy fields with a method that are present in the data
    with_method = column_strategy_df['Prefered method'].notna()
    question_columns = [
        field for field in column_strategy_df[with_method]['Field name']
        if field in data.columns
    ]

    # Answered questions per respondent and the matching duration window
    data['num_questions'] = data[question_columns].notna().sum(axis=1)
    data['min_time'] = data['num_questions'] * 10  # 10 seconds per question
    data['max_time'] = data['num_questions'] * 30  # 30 seconds per question

    def calculate_score(row):
        # No valid timestamps, no score
        if pd.isna(row['start']) or pd.isna(row['end']):
            return 0

        duration = row['time_diff_seconds']
        lower, upper = row['min_time'], row['max_time']
        if lower <= duration <= upper:
            return 2

        # Outside the window: decay with the distance to the window's midpoint
        midpoint = (lower + upper) / 2
        return np.exp(-abs(duration - midpoint) / 1000)

    integrity_score_per_respondent['response_time_integrity'] = data.apply(calculate_score, axis=1)

    return data, integrity_score_per_respondent
def check_audio_verification(data, integrity_score_per_respondent):
    """
    Check for audio verification columns and create a corresponding integrity score.

    A respondent scores 1 when at least one of the recording columns
    ('audio_verification_name_self_supervised', 'audio_verification_name') is
    filled in and, if the 'audio_verification_name_URL' column exists, the URL
    is filled in as well; otherwise 0. When neither recording column exists the
    score is NaN for everybody.

    Parameters:
    - data (pd.DataFrame): The DataFrame containing survey data.
    - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores.

    Returns:
    - pd.DataFrame: The updated integrity_score_per_respondent DataFrame.
    """
    recording_columns = ['audio_verification_name_self_supervised', 'audio_verification_name']
    url_column = 'audio_verification_name_URL'

    def is_filled(value):
        # Works for any cell type: NaN/None and blank strings count as empty
        return bool(pd.notna(value)) and str(value).strip() != ''

    # NaN by default: kept when no recording column is available
    integrity_score_per_respondent['audio_verification'] = np.nan

    available = [col for col in recording_columns if col in data.columns]
    if not available:
        return integrity_score_per_respondent

    # A recording in ANY of the available columns counts (a later column must
    # not overwrite a recording found in an earlier one)
    has_recording = pd.Series(False, index=data.index)
    for col in available:
        has_recording = has_recording | data[col].apply(is_filled).astype(bool)

    # When URLs are exported, the recording must also have a usable URL
    if url_column in data.columns:
        has_recording = has_recording & data[url_column].apply(is_filled).astype(bool)

    integrity_score_per_respondent['audio_verification'] = has_recording.astype(int)

    return integrity_score_per_respondent
def questions_which_which_where_difficult_integrity(data, integrity_score_per_respondent):
    """
    Translate the 'questions_which_were_difficult' answers into integrity points.

    Points per answer: all -> 0, most -> 0.5, some -> 0.75, few -> 1.5, none -> 2.
    Unmapped answers become NaN; when the column is missing the whole score
    column is NaN and a message is printed.

    Parameters:
    - data (pd.DataFrame): The DataFrame containing survey data.
    - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores.

    Returns:
    - pd.DataFrame: The updated integrity_score_per_respondent DataFrame with the
      'questions_which_were_difficult' column.
    """
    column = 'questions_which_were_difficult'

    # Guard clause: nothing to score without the source column
    if column not in data.columns:
        integrity_score_per_respondent[column] = np.nan
        print("The column 'questions_which_were_difficult' is not present in the dataset.")
        return integrity_score_per_respondent

    # Fewer difficult questions means more points
    points_by_answer = {
        'questions_which_were_difficult_all': 0,
        'questions_which_were_difficult_most': 0.5,
        'questions_which_were_difficult_some': 0.75,
        'questions_which_were_difficult_few': 1.5,
        'questions_which_were_difficult_none': 2
    }
    integrity_score_per_respondent[column] = data[column].map(points_by_answer)

    return integrity_score_per_respondent
def respondent_suspicious_integrity(data, integrity_score_per_respondent):
    """
    Translate the 'respondent_suspicious' answers into integrity points.

    Points per answer: at_ease -> 2, in_between -> 1, suspicious -> 0.
    Unmapped answers become NaN; when the column is missing the whole score
    column is NaN and a message is printed.

    Parameters:
    - data (pd.DataFrame): The DataFrame containing survey data.
    - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores.

    Returns:
    - pd.DataFrame: The updated integrity_score_per_respondent DataFrame with the
      'respondent_suspicious' column.
    """
    column = 'respondent_suspicious'

    if column not in data.columns:
        print("The column 'respondent_suspicious' is not present in the dataset.")
        integrity_score_per_respondent[column] = np.nan
        return integrity_score_per_respondent

    suspicious_mapping = {
        'respondent_suspicious_at_ease': 2,
        'respondent_suspicious_in_between': 1,
        'respondent_suspicious_suspicious': 0
    }
    integrity_score_per_respondent[column] = data[column].map(suspicious_mapping)

    return integrity_score_per_respondent


def validate_phone_number_all_conditions(num_list):
    """
    Score every phone number of ``num_list`` for plausibility.

    Each entry is reduced to its digits. The result, in input order, is:
    - NaN when no digit is left (empty / missing value);
    - 0 when the number occurs more than once in the list, has fewer than 5 or
      more than 15 digits, or consists of one repeated digit;
    - 1 otherwise.

    Parameters:
    - num_list (iterable): Phone numbers as strings or numbers.

    Returns:
    - list: One score (1, 0 or NaN) per input entry.
    """
    from collections import Counter

    def digits_of(num):
        # Whole-number floats (typical for numeric columns that contain NaN)
        # would otherwise contribute the "0" of their ".0" suffix as a digit.
        if isinstance(num, float) and num.is_integer():
            num = int(num)
        return ''.join(filter(str.isdigit, str(num).strip()))

    cleaned_nums = [digits_of(num) for num in num_list]

    # One pass instead of calling list.count() for every element
    num_counts = Counter(cleaned_nums)

    def is_plausible(num):
        unique = num_counts[num] == 1
        valid_length = 5 <= len(num) <= 15
        all_digits_equal = len(set(num)) == 1
        return unique and valid_length and not all_digits_equal

    return [
        np.nan if num == '' else (1 if is_plausible(num) else 0)
        for num in cleaned_nums
    ]
def validate_names(data):
    """
    Validate the names found in the 'enumerator_name' and/or 'name' columns.

    A name is valid (1) when it is a string made of exactly two purely
    alphabetic words; anything else (single word, digits, empty, NaN or a
    non-string value) is invalid (0). When both columns are present the two
    scores are averaged.

    The result is also written to ``data['name_validation']``; no other helper
    column is left on ``data``.

    :param data: DataFrame containing 'enumerator_name' and/or 'name' columns
    :return: DataFrame with the single column 'name_validation'; if neither
             column exists, ``data`` itself is returned unchanged
    """
    name_columns = [col for col in ['enumerator_name', 'name'] if col in data.columns]

    if not name_columns:
        print("Neither 'enumerator_name' nor 'name' column is present in the dataset.")
        return data

    def validate_name(name):
        # NaN, None, numbers, ... can never be a valid name
        if not isinstance(name, str):
            return 0
        name_parts = name.split()
        if len(name_parts) == 2 and all(part.isalpha() for part in name_parts):
            return 1  # Valid name
        return 0  # Invalid name (also covers blank strings)

    # Score each available column separately, without touching ``data``
    per_column = pd.DataFrame(
        {col: data[col].apply(validate_name) for col in name_columns},
        index=data.index,
    )

    if len(name_columns) == 2:
        # Both columns present: average the two scores
        data['name_validation'] = per_column.mean(axis=1)
    else:
        data['name_validation'] = per_column[name_columns[0]]

    return data[['name_validation']]


# Function to encode data and assess uniqueness, reporting similar pairs
def assess_uniqueness_and_report_pairs(data, matching_columns, integrity_df):
    """
    Flag responses that are near-duplicates of another response.

    The answers in ``matching_columns`` are label-encoded per column (on a
    string copy, so the caller's answer columns are left untouched), rows are
    compared with cosine similarity and every pair above 0.9995 is reported.

    Parameters:
    - data (pd.DataFrame): Survey data with an '_index' column. A
      'response_uniqueness' column is added in place (0 = suspected duplicate,
      2 = unique).
    - matching_columns (list): Columns used to compare responses.
    - integrity_df (pd.DataFrame): Integrity scores with an '_index' column.

    Returns:
    - pd.DataFrame: ``integrity_df`` left-merged with 'response_uniqueness' on '_index'.
    - list: ``(_index, _index)`` tuples of the similar pairs.
    """
    # Encode a string COPY of the answers: converting the caller's columns in
    # place would turn every missing value into the literal string 'nan'.
    encoded_data = data[matching_columns].astype(str).apply(
        lambda col: LabelEncoder().fit_transform(col)
    )

    # Pairwise cosine similarity between respondents
    similarity_matrix = cosine_similarity(encoded_data)

    # Pairs above this similarity are treated as suspected duplicates
    threshold = 0.9995

    # Strict upper triangle: each pair once, never a row with itself
    index_mapping = data['_index'].tolist()
    pair_positions = np.argwhere(np.triu(similarity_matrix > threshold, k=1))
    similar_pairs = [(index_mapping[i], index_mapping[j]) for i, j in pair_positions]

    # Flag rows suspected of being duplicates based on _index values
    duplicate_indices = list(set([idx for pair in similar_pairs for idx in pair]))
    data['response_uniqueness'] = 2
    data.loc[data['_index'].isin(duplicate_indices), 'response_uniqueness'] = 0

    # Add the uniqueness results to the integrity score DataFrame
    integrity_df = integrity_df.merge(
        data[['_index', 'response_uniqueness']], on='_index', how='left'
    )

    # Report similar pairs
    print("Pairs of very similar responses:")
    for pair in similar_pairs:
        print(f"Similar response pair: Row with _index {pair[0]} and Row with _index {pair[1]}")

    # Output flagged indices for review
    flagged_indices = data[data['response_uniqueness'] == 0]['_index'].tolist()
    print(f"Flagged responses suspected of being duplicates: {flagged_indices}")

    return integrity_df, similar_pairs



# Initialize the model and tokenizer once to avoid reloading in each function call
model_name = "facebook/bart-large-mnli"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Initialize the classifier pipeline
classifier = pipeline(
    "zero-shot-classification",
    model=model,
    tokenizer=tokenizer,
    device=-1  # -1 ensures the pipeline runs on CPU
)

# LanguageTool setup for grammar checking
tool_en = language_tool_python.LanguageTool('en-US')
tool_fr = language_tool_python.LanguageTool('fr')
tool_de = language_tool_python.LanguageTool('de')




# impact feedback



# Function to automatically detect language and assess the text
def detect_language(text):
    """
    Return the language code detected for ``text``, or 'en' when detection fails.
    """
    try:
        return detect(text)
    except Exception:
        # Detection errors (empty or non-text input) fall back to English, but
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return 'en'  # Default to English if detection fails


# Function to assess individual text entries with specified language tool
def assess_text(text, column_name, tool):
    """
    Score one free-text answer on impact relevance and grammar.

    The zero-shot classifier rates the text against the labels
    "positive impact", "negative impact", "good grammar" and "poor grammar".
    The impact label used is the positive one for the column
    'positive_effects_client' and the negative one for any other column.

    Parameters:
    - text: The answer to assess; non-string or blank values score 0.
    - column_name (str): Name of the column the text comes from.
    - tool: LanguageTool instance used to count grammar issues.

    Returns:
    - dict: 'score' (2 = impact and grammar both reach 0.4, 1 = only one of
      them, 0 = neither) and 'grammar_issues' (a text summary, 'N/A' for
      empty input).
    """
    # Handle non-string or empty values
    if not isinstance(text, str) or text.strip() == '':
        return {'score': 0, 'grammar_issues': 'N/A'}

    # Labels for classification
    labels = ["positive impact", "negative impact", "good grammar", "poor grammar"]

    # Thresholds above which impact / grammar are considered acceptable
    impact_threshold = 0.4
    grammar_threshold = 0.4

    # Run zero-shot classification; labels come back sorted by score, so
    # re-key the scores by label before reading them
    result = classifier(text, labels, multi_label=True)
    score_by_label = dict(zip(result['labels'], result['scores']))

    if column_name == 'positive_effects_client':
        impact_score = score_by_label['positive impact']
    else:
        impact_score = score_by_label['negative impact']

    # One point for a relevant impact statement, one for acceptable grammar
    has_impact = impact_score >= impact_threshold
    has_good_grammar = score_by_label['good grammar'] >= grammar_threshold
    score = int(has_impact) + int(has_good_grammar)

    # Check grammar issues using the specified LanguageTool
    grammar_issues = len(tool.check(text))

    return {'score': score, 'grammar_issues': f"{grammar_issues} issues detected"}

# Main function to assess text column
def assess_text_column(data, column_name, integrity_df):
    """
    Assess every answer of a free-text column and return the scores.

    The grammar tool is chosen from the most frequent language detected among
    the non-empty text answers (French, English or German; English otherwise).

    Parameters:
    - data (pd.DataFrame): Survey data; an 'assessment' column holding the
      per-row result of ``assess_text`` is added in place.
    - column_name (str): The text column to assess.
    - integrity_df (pd.DataFrame): Integrity scores; it is copied, not modified.

    Returns:
    - pd.Series: The scores, named 'positive_impact_score',
      'negative_impact_score' or '<column_name>_score'.
    """
    # Detect the language on real text only: blank / missing cells cannot be
    # detected and would otherwise all vote for the English fallback
    is_text = data[column_name].apply(lambda x: isinstance(x, str) and x.strip() != '')
    languages = data.loc[is_text, column_name].apply(detect_language).mode()
    detected_language = languages.iloc[0] if len(languages) > 0 else 'en'

    # Select the appropriate LanguageTool based on the detected language
    tool = {'fr': tool_fr, 'en': tool_en, 'de': tool_de}.get(detected_language, tool_en)  # Default to English if not specified

    # Apply the assess_text function to the specified column in the DataFrame
    data['assessment'] = data[column_name].apply(lambda x: assess_text(x, column_name, tool))

    # Ensure we're working with a copy of the DataFrame
    integrity_df = integrity_df.copy()  # Prevent SettingWithCopyError

    # Name of the score column for this text column
    if column_name == 'positive_effects_client':
        score_column = 'positive_impact_score'
    elif column_name == 'negative_effects_client':
        score_column = 'negative_impact_score'
    else:
        score_column = f'{column_name}_score'

    integrity_df[score_column] = data['assessment'].apply(lambda x: x['score'])

    return integrity_df[score_column]



# enumerator bias

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np

# Function to assess enumerator bias based on response differences
def assess_enumerator_bias(data, matching_columns):
    """
    Flag the responses of enumerators whose answers deviate from the overall pattern.

    The answers in ``matching_columns`` are label-encoded per column (on a
    string copy, so the caller's answer columns are left untouched). For each
    enumerator the mean encoded answer vector is compared with the overall
    mean vector; when the cosine similarity is below 0.8 all responses of that
    enumerator are flagged.

    Parameters:
    - data (pd.DataFrame): Survey data. An 'enumerator_bias' column is added
      in place (2 = no bias detected, 0 = flagged).
    - matching_columns (list): Columns used to compare responses.

    Returns:
    - pd.DataFrame: ``data`` with the 'enumerator_bias' column, or ``data``
      unchanged when it has no 'enumerator_name' column.
    """
    # Check if the enumerator_name column exists
    if 'enumerator_name' not in data.columns:
        print("Column 'enumerator_name' does not exist in the data.")
        return data

    # Similarity below this value flags an enumerator as biased
    bias_threshold = 0.8  # Adjust this threshold as needed

    # Encode a string COPY of the answers: converting the caller's columns in
    # place would turn every missing value into the literal string 'nan'.
    encoded_data = data[matching_columns].astype(str).apply(
        lambda col: LabelEncoder().fit_transform(col)
    )

    # Every response starts as unbiased
    data['enumerator_bias'] = 2

    # Reference profile: mean encoded answer over all responses
    overall_mean = encoded_data.mean(axis=0)

    for enumerator, indices in data.groupby('enumerator_name').groups.items():
        enumerator_mean = encoded_data.loc[indices].mean(axis=0)
        similarity_to_others = cosine_similarity([enumerator_mean], [overall_mean])[0][0]

        # Flag enumerator responses if similarity to overall mean is below the threshold
        if similarity_to_others < bias_threshold:
            data.loc[indices, 'enumerator_bias'] = 0

    return data




# location check
def location_integrity(data, integrity_score_per_respondent):
    """
    Score the GPS 'location' of every response and store it as 'location_check'.

    A response scores 1 when its location yields a latitude and a longitude and
    it is not within 10 meters of another response; otherwise 0. Pairs where
    either interview took place at a market or community center
    ('setting_market', 'setting_community_center') are never treated as too
    close. When ``data`` has no 'location' column everybody scores 1.

    Side effects: 'lat', 'lon' and 'location_check' columns are added to
    ``data``; when too-close pairs exist they are written to 'gps_issues.xlsx'.

    Parameters:
    - data (pd.DataFrame): Survey data with 'location' ("lat lon ..." strings)
      and, optionally, '_index' and 'interview_setting'.
    - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store the
      location check integrity scores.

    Returns:
    - pd.DataFrame: The updated integrity_score_per_respondent DataFrame with the
      'location_check' column.
    """
    # Without a location column there is nothing to check
    if 'location' not in data.columns:
        integrity_score_per_respondent['location_check'] = 1
        return integrity_score_per_respondent

    from itertools import combinations

    # First two whitespace-separated tokens are latitude and longitude;
    # missing, blank or malformed locations simply become NaN
    parts = data['location'].astype(str).str.split(expand=True).reindex(columns=[0, 1])
    data['lat'] = pd.to_numeric(parts[0], errors='coerce')
    data['lon'] = pd.to_numeric(parts[1], errors='coerce')
    has_coordinates = data['lat'].notna() & data['lon'].notna()

    # Respondent identifiers: the existing '_index' column is used as-is (it
    # must not be overwritten), the row index is only a fallback
    if '_index' in data.columns:
        respondent_ids = data['_index']
    else:
        respondent_ids = pd.Series(data.index, index=data.index)

    if 'interview_setting' in data.columns:
        settings = data['interview_setting']
    else:
        settings = pd.Series([None] * len(data), index=data.index, dtype=object)

    located_coordinates = data.loc[has_coordinates, ['lat', 'lon']].to_numpy()
    located_settings = settings[has_coordinates].to_numpy()
    located_ids = respondent_ids[has_coordinates].to_numpy()

    # Interviews at shared public places are legitimately close to each other
    shared_places = ('setting_market', 'setting_community_center')

    # Collect every pair of responses that is less than 10 meters apart
    too_close_pairs = []
    for i, j in combinations(range(len(located_coordinates)), 2):
        if located_settings[i] in shared_places or located_settings[j] in shared_places:
            continue
        distance = geodesic(tuple(located_coordinates[i]), tuple(located_coordinates[j])).meters
        if distance < 10:
            too_close_pairs.append({
                'Index1': located_ids[i],
                'Index2': located_ids[j],
                'Distance (meters)': distance,
                'Coordinates1': f"{located_coordinates[i][0]},{located_coordinates[i][1]}",
                'Coordinates2': f"{located_coordinates[j][0]},{located_coordinates[j][1]}",
                'Setting1': located_settings[i],
                'Setting2': located_settings[j]
            })

    too_close_pairs_df = pd.DataFrame(too_close_pairs)

    # A single pair is already an issue
    if len(too_close_pairs_df) > 0:
        unique_indexes = pd.unique(too_close_pairs_df[['Index1', 'Index2']].values.ravel())

        # Save the too-close pairs to an Excel file for review
        too_close_pairs_df.to_excel('gps_issues.xlsx')
    else:
        unique_indexes = []

    # 1 = usable location that is not suspiciously close to another response
    is_too_close = respondent_ids.isin(list(unique_indexes))
    data['location_check'] = (has_coordinates & ~is_too_close).astype(int)

    # Update the integrity score DataFrame with the 'location_check' column
    integrity_score_per_respondent['location_check'] = data['location_check']

    return integrity_score_per_respondent


# Create the log with integrity issues


# Define the function to create the integrity issues DataFrame
def integrity_issues_df(integrity_score_values_filtered, data, columns_integrity, data_columns_integrity):
    """
    Build the log of integrity issues, one row per respondent and data column.

    ``columns_integrity`` and ``data_columns_integrity`` are parallel lists:
    entry *i* of ``data_columns_integrity`` names the data column (a string)
    or columns (a list of strings) inspected by check *i* of
    ``columns_integrity``. Entries whose columns are not all present in
    ``data`` are skipped; the remaining ones keep the name of their own check.

    Parameters:
    - integrity_score_values_filtered (pd.DataFrame): Respondents to report;
      needs '_index', 'cleansing_urgency' and the ``columns_integrity`` columns.
    - data (pd.DataFrame): Survey data with an '_index' column.
    - columns_integrity (list): Names of the integrity checks.
    - data_columns_integrity (list): Data column(s) behind every check.

    Returns:
    - pd.DataFrame: Columns '_index', 'question', 'check', 'quality_dimension',
      'actual', 'predicted', 'cleansing_urgency', 'relevant_context_data'.
    """
    output_columns = [
        '_index', 'question', 'check', 'quality_dimension',
        'actual', 'predicted', 'cleansing_urgency', 'relevant_context_data',
    ]

    # Pair every column group with ITS check before filtering, so that dropping
    # unavailable groups cannot shift the check names. Multi-column groups come
    # first, single columns second.
    paired = list(zip(columns_integrity, data_columns_integrity))
    valid_groups = [
        (check, list(group)) for check, group in paired
        if isinstance(group, list) and all(c in data.columns for c in group)
    ]
    valid_groups += [
        (check, [group]) for check, group in paired
        if isinstance(group, str) and group in data.columns
    ]

    # Position of the first data row of every respondent (one pass over data)
    first_position = {}
    for position, respondent in enumerate(data['_index']):
        first_position.setdefault(respondent, position)

    integrity_data = []
    for _, row in integrity_score_values_filtered.iterrows():
        respondent = row['_index']
        position = first_position.get(respondent)
        context = row[columns_integrity].to_dict()  # all check scores of this respondent

        for check, group in valid_groups:
            for column in group:
                if position is None:
                    actual_value = np.nan  # respondent not found in data
                else:
                    actual_value = data[column].iloc[position]

                integrity_data.append({
                    '_index': respondent,
                    'question': column,
                    'check': check,
                    'quality_dimension': 'integrity',
                    'actual': actual_value,
                    'predicted': np.nan,  # Set predicted as NaN
                    'cleansing_urgency': row['cleansing_urgency'],
                    'relevant_context_data': context
                })

    # Explicit columns keep the schema stable even when there is no issue
    return pd.DataFrame(integrity_data, columns=output_columns)

# Function to calculate 'cleansing_urgency' based on 'score_ratio'
def cleansing_integrity(df):
    """
    Add a 'cleansing_urgency' column derived from 'score_ratio'.

    'high' below 0.3, 'low' from 0.3 up to (but excluding) 0.5, None otherwise.

    Parameters:
    - df (pd.DataFrame): Needs a 'score_ratio' column; modified in place.

    Returns:
    - pd.DataFrame: ``df`` with the 'cleansing_urgency' column.
    """
    def urgency(score_ratio):
        if score_ratio < 0.3:
            return 'high'
        if score_ratio < 0.5:
            return 'low'
        return None

    df['cleansing_urgency'] = df['score_ratio'].apply(urgency)
    return df
def calculate_consistency_and_integrity_scores(raw_data, column_strategy_df, final_issues_df):
    """Summarise consistency and integrity issues per question.

    Parameters:
        raw_data (pd.DataFrame): Survey responses; one column per question.
        column_strategy_df (pd.DataFrame): Must contain 'Field name' and
            'Prefered method'. Fields with a non-null method other than
            "integrity_score" that also exist in ``raw_data`` are reported.
        final_issues_df (pd.DataFrame): Issue log with columns '_index',
            'question', 'quality_dimension' and 'cleansing_urgency'.

    Returns:
        pd.DataFrame: One row per reported question with the columns
        'question', 'number_of_non_empty_responses',
        'number_of_index_with_issues_consistency',
        'number_of_index_with_high_cleansing_urgency_consistency',
        'number_of_index_with_low_cleansing_urgency_consistency',
        'consistency_score', 'number_of_index_with_issues_integrity',
        'number_of_index_with_high_cleansing_urgency_integrity',
        'number_of_index_with_low_cleansing_urgency_integrity'.
        The integrity counts are computed over the whole issue log (not per
        question) and repeated on every row. 'consistency_score' is NaN for
        questions without any non-empty response.
    """
    output_columns = [
        'question',
        'number_of_non_empty_responses',
        'number_of_index_with_issues_consistency',
        'number_of_index_with_high_cleansing_urgency_consistency',
        'number_of_index_with_low_cleansing_urgency_consistency',
        'consistency_score',
        'number_of_index_with_issues_integrity',
        'number_of_index_with_high_cleansing_urgency_integrity',
        'number_of_index_with_low_cleansing_urgency_integrity',
    ]

    # Step 1: questions to report. dict.fromkeys de-duplicates while keeping
    # order; a field listed twice would otherwise select the column twice and
    # turn the per-question count into a Series.
    method = column_strategy_df['Prefered method']
    candidate_fields = column_strategy_df.loc[
        method.notna() & (method != "integrity_score"), 'Field name'
    ]
    preferred_columns = [
        col for col in dict.fromkeys(candidate_fields) if col in raw_data.columns
    ]

    # Step 2: number of non-empty responses per question.
    non_empty_counts = raw_data[preferred_columns].notna().sum()

    # Step 3: distinct respondents ('_index') with issues.
    is_consistency = final_issues_df['quality_dimension'] == 'consistency'
    is_integrity = final_issues_df['quality_dimension'] == 'integrity'
    urgency = final_issues_df['cleansing_urgency']

    def respondents_per_question(mask):
        return final_issues_df[mask].groupby('question')['_index'].nunique()

    def respondents_overall(mask):
        return final_issues_df.loc[mask, '_index'].nunique()

    issues_consistency = respondents_per_question(is_consistency)
    high_consistency = respondents_per_question(is_consistency & (urgency == 'high'))
    low_consistency = respondents_per_question(is_consistency & (urgency == 'low'))

    issues_integrity = respondents_overall(is_integrity & urgency.isin(['high', 'low']))
    high_integrity = respondents_overall(is_integrity & (urgency == 'high'))
    low_integrity = respondents_overall(is_integrity & (urgency == 'low'))

    # Step 4: one record per question.
    records = []
    for question in preferred_columns:
        num_non_empty = non_empty_counts.get(question, 0)
        num_issues = issues_consistency.get(question, 0)

        if num_non_empty > 0:
            consistency_score = 1 - (num_issues / num_non_empty)
        else:
            consistency_score = np.nan  # no responses, so no meaningful score

        records.append({
            'question': question,
            'number_of_non_empty_responses': num_non_empty,
            'number_of_index_with_issues_consistency': num_issues,
            'number_of_index_with_high_cleansing_urgency_consistency': high_consistency.get(question, 0),
            'number_of_index_with_low_cleansing_urgency_consistency': low_consistency.get(question, 0),
            'consistency_score': consistency_score,
            'number_of_index_with_issues_integrity': issues_integrity,
            'number_of_index_with_high_cleansing_urgency_integrity': high_integrity,
            'number_of_index_with_low_cleansing_urgency_integrity': low_integrity,
        })

    # Explicit columns: callers index these names even when nothing is reported.
    return pd.DataFrame(records, columns=output_columns)
def evaluate_quota_coverage(raw_data, segmentation_columns, mapping_segmentation_quotas):
    """
    Evaluates the extent to which raw_data covers specified quotas.

    Parameters:
        raw_data (pd.DataFrame): Input data to evaluate.
        segmentation_columns (list): List of columns used for segmentation.
            Columns without an entry in ``mapping_segmentation_quotas`` are
            skipped.
        mapping_segmentation_quotas (dict): Maps a segmentation column to a
            dict of ``{category: target share}``.

    Returns:
        pd.DataFrame: One row per (column, category) with 'Segmentation_Column',
        'Segment', 'Target_Quota', 'Actual_Quota' (observed share among
        non-null values), 'Relative_Coverage' (actual / target, or 1 when the
        target is not positive) and 'Achieved' (1 when the target is met or is
        0, otherwise the relative coverage). The columns are present even when
        no row is produced.
    """
    output_columns = [
        'Segmentation_Column', 'Segment', 'Target_Quota',
        'Actual_Quota', 'Relative_Coverage', 'Achieved',
    ]
    results = []

    for segment_column in segmentation_columns:
        if segment_column not in mapping_segmentation_quotas:
            continue

        quotas = mapping_segmentation_quotas[segment_column]

        # Observed share of every category. A column that is missing from the
        # data covers none of its quotas instead of raising a KeyError.
        if segment_column in raw_data.columns:
            actual_shares = raw_data[segment_column].value_counts(normalize=True).to_dict()
        else:
            actual_shares = {}

        for category, target_quota in quotas.items():
            actual_quota = actual_shares.get(category, 0)
            relative_coverage = actual_quota / target_quota if target_quota > 0 else 1
            if actual_quota >= target_quota or target_quota == 0:
                achieved = 1
            else:
                achieved = relative_coverage

            results.append({
                'Segmentation_Column': segment_column,  # The column being analyzed (e.g., 'gender')
                'Segment': category,  # The unique value within the column (e.g., 'gender_male')
                'Target_Quota': target_quota,
                'Actual_Quota': actual_quota,
                'Relative_Coverage': relative_coverage,
                'Achieved': achieved,
            })

    # Explicit columns so that an empty result can still be grouped downstream.
    return pd.DataFrame(results, columns=output_columns)
def calculate_coverage_scores_by_segment(quota_coverage_df, min_target_quota=0.05):
    """
    Calculate the coverage scores separately for each segmentation column and
    segment based on two criteria:
    A) Weighted average of 'Achieved', weighted by 'Target_Quota'.
    B) Simple average of 'Achieved' over rows whose 'Target_Quota' is at least
       ``min_target_quota``.

    Parameters:
        quota_coverage_df (pd.DataFrame): Quota coverage data with the columns
            'Segmentation_Column', 'Segment', 'Target_Quota' and 'Achieved'.
        min_target_quota (float): Minimum target quota for a row to count in
            the simple average (default 5%).

    Returns:
        pd.DataFrame: Columns 'Segmentation_Column', 'Segment',
        'Weighted_Avg_Coverage' and 'Simple_Avg_Coverage', one row per
        (column, segment) group. 'Weighted_Avg_Coverage' is NaN when the
        group's target quotas sum to 0; 'Simple_Avg_Coverage' is NaN when no
        row reaches ``min_target_quota``.
    """
    output_columns = [
        'Segmentation_Column', 'Segment', 'Weighted_Avg_Coverage', 'Simple_Avg_Coverage',
    ]

    # Nothing to group (this also covers a frame that has no columns at all).
    if quota_coverage_df.empty:
        return pd.DataFrame(columns=output_columns)

    coverage_scores_by_segment = []

    for (segmentation_column, segment), group in quota_coverage_df.groupby(
        ['Segmentation_Column', 'Segment']
    ):
        # A) Weighted average; guard the 0/0 case explicitly.
        total_target = group['Target_Quota'].sum()
        if total_target != 0:
            weighted_avg = (group['Achieved'] * group['Target_Quota']).sum() / total_target
        else:
            weighted_avg = np.nan

        # B) Simple average over sufficiently large target quotas.
        eligible = group.loc[group['Target_Quota'] >= min_target_quota, 'Achieved']
        simple_avg = eligible.mean()

        coverage_scores_by_segment.append({
            'Segmentation_Column': segmentation_column,
            'Segment': segment,
            'Weighted_Avg_Coverage': weighted_avg,
            'Simple_Avg_Coverage': simple_avg,
        })

    return pd.DataFrame(coverage_scores_by_segment, columns=output_columns)
def calculate_coverage_for_all(raw_data, final_issues_df, segmentation_columns, mapping_segmentation_quotas):
    """
    Compute segment coverage scores for three variants of the survey data and
    stack them into one table.

    Variants (value of 'raw_data_variant' in the result):
        '0': the data as given.
        'A': rows whose '_index' has an integrity issue with 'high' cleansing
             urgency are removed.
        'B': rows whose '_index' has an integrity issue with 'high' or 'low'
             cleansing urgency are removed.

    Parameters:
        raw_data (pd.DataFrame): Original data to process; needs '_index'.
        final_issues_df (pd.DataFrame): Issue log with '_index',
            'quality_dimension' and 'cleansing_urgency'.
        segmentation_columns (list): List of segmentation columns.
        mapping_segmentation_quotas (dict): Mapping of segmentation quotas.

    Returns:
        pd.DataFrame: Concatenated coverage scores of all variants.
    """
    # Respondents flagged by integrity checks, by urgency level.
    integrity_rows = final_issues_df['quality_dimension'] == 'integrity'
    urgency = final_issues_df['cleansing_urgency']

    urgent_ids = final_issues_df.loc[integrity_rows & (urgency == 'high'), '_index'].unique()
    recommended_ids = final_issues_df.loc[
        (integrity_rows & (urgency == 'high')) | (integrity_rows & (urgency == 'low')),
        '_index',
    ].unique()

    # Label -> data variant, in reporting order.
    variants = {
        '0': raw_data,
        'A': raw_data[~raw_data['_index'].isin(urgent_ids)],
        'B': raw_data[~raw_data['_index'].isin(recommended_ids)],
    }

    scored_variants = []
    for label, variant in variants.items():
        quota_coverage = evaluate_quota_coverage(
            variant, segmentation_columns, mapping_segmentation_quotas
        )
        scores = calculate_coverage_scores_by_segment(quota_coverage)
        scores['raw_data_variant'] = label  # tag rows with their data variant
        scored_variants.append(scores)

    return pd.concat(scored_variants, ignore_index=True)
def consistency_score_report(
    raw_data,
    indicator_df,
    questions_df,
    column_strategy_df,
    data_all,
    theme_list,
):
    """Run all consistency checks and build the consistency dashboard tables.

    The checks are, in order: model-based outliers, format checks,
    distribution outliers and completeness. Their findings are accumulated in
    one issue DataFrame, from which the per-question consistency scores are
    derived.

    Parameters:
        raw_data (pd.DataFrame): Raw survey responses. If it has no '_index'
            column, one is added IN PLACE (1..len(raw_data)).
        indicator_df (pd.DataFrame): Indicator definitions, passed to
            ``process_themes_and_questions``.
        questions_df (pd.DataFrame): Question definitions, passed to
            ``completeness_check``.
        column_strategy_df (pd.DataFrame): Must contain 'Field name' and
            'Prefered method'.
        data_all (pd.DataFrame): Data passed to the model and format checks.
        theme_list: Passed to ``process_themes_and_questions``.

    Returns:
        tuple: ``(table_1_1, table_1_2, table_1_3)``
            table_1_1: per question, the number of distinct '_index' values
                per check plus a 'total', sorted by 'total' descending.
            table_1_2: copy of the full issue log.
            table_1_3: per indicator 'ID': 'indicator_score' (mean question
                consistency score), 'question' (', '-joined question names)
                and 'questions_score' (each question with its score).
    """
    # Step 1: Ensure `_index` column exists in `raw_data`
    if '_index' not in raw_data.columns:
        raw_data['_index'] = range(1, len(raw_data) + 1)

    # Step 2: Create the question indicator mapping
    indicator_question_theme_mapping = process_themes_and_questions(indicator_df, theme_list)

    # Step 3: Generate model-based outlier report
    exclude_columns = get_missing_columns_without_model(column_strategy_df, data_all)
    accurate_columns, acc_levels, pred_actual_tuples = model_process(
        data_all=data_all,
        column_strategy_df=column_strategy_df,
        exclude_columns=exclude_columns
    )
    data_issues_df = model_issues_to_data_frame(pred_actual_tuples)

    # Step 4: Create format check violations
    columns_format_check = list(
        column_strategy_df[column_strategy_df['Prefered method'].str.contains('format_check', na=False)]['Field name']
    )
    valid_columns = [col for col in columns_format_check if col in data_all.columns]
    filtered_data_all = data_all[valid_columns].copy()

    # The return value of format_checks is not used; the call operates on the copy.
    format_checks(df=filtered_data_all, transformations_df=column_strategy_df)
    format_violation_dictionary = create_nan_dict(filtered_data_all, raw_data, column_strategy_df=column_strategy_df)
    data_issues_df = format_violations_to_df(format_violation_dictionary, data_issues_df)

    # Step 5: Distribution outlier approach
    outlier_columns = column_strategy_df[
        column_strategy_df['Prefered method'].str.contains('outlier', na=False)]['Field name']
    outlier_columns = outlier_columns[outlier_columns.isin(raw_data.columns)]
    outlier_data = raw_data[outlier_columns]

    dist_outliers_dict = detect_outliers(outlier_data)
    data_issues_df = dist_outliers_dict_to_df(dist_outliers_dict, data_issues_df)

    # Step 6: Data completeness check
    completeness_check_dict = completeness_check(raw_data, questions_df)
    data_issues_df = completeness_check_dict_to_df(completeness_check_dict, data_issues_df)

    # Step 7: Consolidate consistency log and calculate scores
    # NOTE(review): the merged log is not used below. The calls are kept
    # because merge_nested_dicts is defined elsewhere and may have side
    # effects - confirm before removing.
    full_dict = merge_nested_dicts(completeness_check_dict, dist_outliers_dict)
    full_dict = merge_nested_dicts(full_dict, pred_actual_tuples)
    full_dict = merge_nested_dicts(full_dict, format_violation_dictionary)

    consistency_df = calculate_consistency_scores(raw_data, column_strategy_df, data_issues_df)

    # Merge consistency data with the question indicator mapping
    merged_df = consistency_df.merge(
        indicator_question_theme_mapping,
        how='left',
        left_on='question',
        right_on='Question(s)'
    )

    # Keep only questions that belong to an indicator
    filtered_df = merged_df[merged_df['ID'].notna()]

    # Step 8: Create tables for consistency dashboard
    # Table 1.1: Question breakdown by data checks
    unique_counts = data_issues_df.groupby(['question', 'check'])['_index'].nunique().reset_index()
    pivoted_counts = unique_counts.pivot_table(
        index=['question'], columns='check', values='_index', aggfunc='sum', fill_value=0
    ).reset_index()
    pivoted_counts['total'] = pivoted_counts.iloc[:, 1:].sum(axis=1)
    table_1_1 = pivoted_counts.sort_values(by='total', ascending=False).reset_index(drop=True)
    check_columns = sorted(col for col in table_1_1.columns if col not in ['question', 'total'])
    table_1_1 = table_1_1[['question', 'total'] + check_columns]

    # Table 1.2: Data issues for each question
    table_1_2 = data_issues_df.copy()

    # Table 1.3: Consistency scores per question and indicator
    questions_by_id = filtered_df.groupby('ID')['question']
    aggregated_data = filtered_df.groupby('ID').agg({
        'question': lambda x: ', '.join(x),  # Concatenate all questions under the ID
        'consistency_score': 'mean'  # Average consistency score for the ID
    }).reset_index()

    # First score per question, looked up once instead of scanning per question.
    score_by_question = (
        consistency_df.drop_duplicates(subset='question')
        .set_index('question')['consistency_score']
    )

    def format_questions_with_scores(questions):
        # Works on the grouped question names directly. Splitting the joined
        # string on ', ' (as before) silently dropped any question whose name
        # itself contains ', '.
        return ', '.join(
            f"{question} (Score: {score_by_question[question]:.2f})"
            for question in questions
        )

    questions_score_by_id = questions_by_id.agg(format_questions_with_scores)
    aggregated_data['questions_score'] = aggregated_data['ID'].map(questions_score_by_id)

    # Renaming the consistency score column to 'indicator_score'
    aggregated_data.rename(columns={'consistency_score': 'indicator_score'}, inplace=True)
    table_1_3 = aggregated_data[['ID', 'indicator_score', 'question', 'questions_score']].copy()
    return table_1_1, table_1_2, table_1_3
def generate_integrity_issues_log(raw_data, table_2_1,column_strategy_df):
    """
    Generate a log of integrity issues.

    For every '_index' in ``table_2_1`` and every integrity check, one record
    is written per raw-data column that backs the check.

    Parameters:
    - raw_data (pd.DataFrame): The original dataset containing all raw survey responses.
    - table_2_1 (pd.DataFrame): Table with '_index', the integrity check scores
      and (optionally) 'cleansing_urgency'.
    - column_strategy_df (pd.DataFrame): Must contain 'Field name' and
      'Prefered method'. Fields with a non-null method that exist in
      ``raw_data`` back the 'response_uniqueness' and 'enumerator_bias' checks.

    Returns:
    - table_2_4 (pd.DataFrame): Columns '_index', 'question', 'check',
      'quality_dimension', 'actual' (``{question: raw value}``, NaN value when
      the column or respondent is missing from ``raw_data``), 'predicted'
      (the check score from ``table_2_1`` or NaN), 'cleansing_urgency' and
      'relevant_context_data' (always NaN).
    """
    output_columns = [
        '_index', 'question', 'check', 'quality_dimension', 'actual',
        'predicted', 'cleansing_urgency', 'relevant_context_data',
    ]

    # Columns with a configured method that are present in the raw data
    matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name']
    matching_columns = [col for col in matching_columns if col in raw_data.columns]

    # (integrity check, raw-data columns backing it)
    check_sources = [
        ('payment_for_survey', ['payment_for_survey']),
        ('respondent_influenced', ['respondent_influenced']),
        ('response_time_integrity', ['start', 'end']),
        ('audio_verification', ['audio_verification_name',
                                'audio_verification_name_self_supervised',
                                'audio_verification_name_URL']),
        ('questions_which_were_difficult', ['questions_which_were_difficult']),
        ('respondent_suspicious', ['respondent_suspicious']),
        ('phone_number_check', ['phone_number']),
        ('response_uniqueness', list(matching_columns)),
        ('name_check', ['name', 'enumerator_name']),
        ('impact_feedback_integrity', ['positive_effects_client', 'negative_effects_client']),
        # Was the string literal 'matching_columns', which logged a
        # non-existent question instead of the actual columns.
        ('enumerator_bias', list(matching_columns)),
        ('location_check', ['location']),
    ]

    has_raw_index = '_index' in raw_data.columns
    has_urgency = 'cleansing_urgency' in table_2_1.columns

    issue_data_list = []
    for _index in table_2_1['_index']:
        # Look each respondent up once instead of once per cell.
        score_rows = table_2_1.loc[table_2_1['_index'] == _index]
        if score_rows.empty:
            # Only happens for an '_index' that does not equal itself (NaN).
            continue
        raw_rows = raw_data.loc[raw_data['_index'] == _index] if has_raw_index else None

        if has_urgency:
            cleansing_urgency_value = score_rows['cleansing_urgency'].values[0]
        else:
            cleansing_urgency_value = np.nan

        for integrity_check, questions in check_sources:
            if integrity_check in table_2_1.columns:
                predicted_value = score_rows[integrity_check].values[0]
            else:
                predicted_value = np.nan

            for question in questions:
                if not isinstance(question, str):  # only string column names are logged
                    continue

                if raw_rows is not None and question in raw_data.columns and not raw_rows.empty:
                    actual_value = raw_rows[question].values[0]
                else:
                    actual_value = np.nan

                issue_data_list.append({
                    '_index': _index,
                    'question': question,
                    'check': integrity_check,
                    'quality_dimension': 'integrity',
                    'actual': {question: actual_value},
                    'predicted': predicted_value,
                    'cleansing_urgency': cleansing_urgency_value,
                    'relevant_context_data': np.nan
                })

    # Explicit columns keep the schema stable even when there are no records.
    table_2_4 = pd.DataFrame(issue_data_list, columns=output_columns)
    return table_2_4
def integrity_report(raw_data, questions_df, column_strategy_df, survey_type,table_1_2):
    """
    Generate the data integrity reports for a survey.

    NOTE: a later definition of ``integrity_report`` in this file rebinds the
    name, so at import time that later version is the one callers get.

    Parameters:
    - raw_data (pd.DataFrame): Survey data; must contain '_index'.
    - questions_df (pd.DataFrame): DataFrame containing question details.
    - column_strategy_df (pd.DataFrame): DataFrame with column strategies
      ('Field name', 'Prefered method').
    - survey_type (str): Type of survey ('Supervised (On Site)', 'Supervised (Telephone)', 'Unsupervised (Online)').
    - table_1_2 (pd.DataFrame): Consistency issue log; the integrity issues are
      appended to it for the combined score report.

    Returns:
    - table_2_1 (pd.DataFrame): Integrity score per respondent, sorted by 'score_ratio' ascending.
    - table_2_2 (pd.DataFrame): Raw columns that back the integrity checks.
    - table_2_3 (pd.DataFrame): Per-question consistency/integrity score report.
    - table_2_4 (pd.DataFrame): Integrity issue log for all respondents.
    """

    # ### 2.0.0 Create an empty data to report the issues per respondent
    data = raw_data.reset_index(drop=True).copy()
    integrity_score_per_respondent = pd.DataFrame()
    integrity_score_per_respondent['_index'] = data['_index']

    # ### 2.1 Who collected the data: enumerator_relationship
    if 'enumerator_relationship' in data.columns:
        integrity_score_per_respondent['enumerator_relationship'] = data['enumerator_relationship']

    # ### 2.2 Payment for answers check: payment_for_survey - affects the score
    integrity_score_per_respondent = payment_for_survey_integrity(data, integrity_score_per_respondent)

    # ### 2.3 Do you think anyone influenced the respondent's answers during the interview?
    integrity_score_per_respondent = respondent_influence_integrity(data, integrity_score_per_respondent)

    # ### 2.4 Time to conduct the survey
    data, integrity_score_per_respondent = response_time_integrity(data, integrity_score_per_respondent,questions_df,column_strategy_df)

    # ### 2.5 Voice Recorded as signature
    integrity_score_per_respondent = check_audio_verification(data, integrity_score_per_respondent)

    # ### 2.6 questions_which_were_difficult
    integrity_score_per_respondent = questions_which_which_where_difficult_integrity(data, integrity_score_per_respondent)

    # ### 2.7 respondent_suspicious
    integrity_score_per_respondent = respondent_suspicious_integrity(data, integrity_score_per_respondent)

    # ### 2.8 Phone_number
    if 'phone_number' in data.columns:
        integrity_score_per_respondent['phone_number_check'] = validate_phone_number_all_conditions(data['phone_number'])
    else:
        integrity_score_per_respondent['phone_number_check'] = 0

    # ### 2.9 Name check
    if 'name' in data.columns or 'enumerator_name' in data.columns:
        integrity_score_per_respondent['name_check'] = validate_names(data)
    else:
        integrity_score_per_respondent['name_check'] = 0

    # ### 2.10 Positive and negative impact client check
    if 'positive_effects_client' in data.columns:
        pos_score = assess_text_column(data, 'positive_effects_client', integrity_score_per_respondent)
        neg_score = assess_text_column(data, 'negative_effects_client', integrity_score_per_respondent)
        integrity_score_per_respondent['impact_feedback_integrity'] = pos_score.combine(neg_score, max)
    else:
        integrity_score_per_respondent['impact_feedback_integrity'] = 0

    # ### 2.11 Respondent uniqueness
    matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name']
    # Filter matching_columns to include only those present in data
    matching_columns = [col for col in matching_columns if col in data.columns]

    integrity_score_per_respondent, _ = assess_uniqueness_and_report_pairs(data, matching_columns, integrity_score_per_respondent)
    # With fewer than 10 columns to compare, uniqueness is not assessed and gets full points
    if len(matching_columns) < 10:
        integrity_score_per_respondent['response_uniqueness'] = 1

    # ### 2.12 Enumerator bias
    if 'enumerator_name' in data.columns:
        data = assess_enumerator_bias(data, matching_columns)
        integrity_score_per_respondent['enumerator_bias'] = data['enumerator_bias']
    else:
        integrity_score_per_respondent['enumerator_bias'] = 0

    # ### 2.13 Check GPS proximity
    integrity_score_per_respondent = location_integrity(data, integrity_score_per_respondent)

    # ### 2.14 Calculate the Integrity Score
    columns_integrity = [
        'payment_for_survey',
        'respondent_influenced',
        'response_time_integrity',
        'audio_verification',
        'questions_which_were_difficult',
        'respondent_suspicious',
        'phone_number_check',
        'response_uniqueness',
        'name_check',
        'impact_feedback_integrity',
        'enumerator_bias',
        'location_check'
    ]

    # Weight 1 = check applies to the survey type, 0 = check is ignored.
    all_checks = {col: 1 for col in columns_integrity}
    survey_type_mapping = {
        'Supervised (On Site)': dict(all_checks),
        'Supervised (Telephone)': {**all_checks, 'audio_verification': 0, 'location_check': 0},
        # 'name_check' (previously the key 'name', which matches no entry of
        # columns_integrity and therefore never disabled the name check).
        'Unsupervised (Online)': {**all_checks, 'respondent_influenced': 0, 'audio_verification': 0,
                                  'questions_which_were_difficult': 0, 'respondent_suspicious': 0,
                                  'name_check': 0, 'location_check': 0}
    }

    points = [1] * len(columns_integrity)
    integrity_score_values = calculate_weighted_aggregate_with_max(
        integrity_score_per_respondent, survey_type, columns_integrity, survey_type_mapping, points
    )
    integrity_score_values['score_ratio'] = integrity_score_values['weighted_aggregate'] / integrity_score_values['max_possible_score']
    integrity_score_values['score_ratio'] = np.minimum(integrity_score_values['score_ratio'], 1)

    # add cleansing urgency
    integrity_score_values = cleansing_integrity(integrity_score_values)

    # ### 2.15 Create necessary tables for navigation
    # NOTE(review): 'response_uniqueness' is scored above but not part of this
    # selection - confirm whether that is intended.
    table_2_1 = integrity_score_values[[
        '_index', 'score_ratio', 'cleansing_urgency', 'weighted_aggregate', 'max_possible_score',
        'payment_for_survey', 'respondent_influenced', 'response_time_integrity',
        'audio_verification', 'questions_which_were_difficult', 'respondent_suspicious',
        'phone_number_check', 'name_check', 'impact_feedback_integrity','enumerator_bias',
        'location_check'
    ]]
    table_2_1 = table_2_1.sort_values(by='score_ratio', ascending=True)

    # Raw-data columns that back the integrity checks
    data_columns_integrity = [
        'payment_for_survey', 'respondent_influenced', ['start', 'end'],
        ['audio_verification_name', 'audio_verification_name_self_supervised', 'audio_verification_name_URL'],
        'questions_which_were_difficult', 'respondent_suspicious', 'phone_number',
        ['name', 'enumerator_name'], ['positive_effects_client', 'negative_effects_client'], 'location'
    ]
    # Flatten nested lists into a single list of column names
    flattened_columns = []
    for col in data_columns_integrity:
        if isinstance(col, list):
            flattened_columns.extend(col)
        else:
            flattened_columns.append(col)

    # Ensure unique columns (order preserved)
    unique_columns = list(dict.fromkeys(flattened_columns))

    # '_index' plus the backing columns that exist in data
    columns_to_include = ['_index'] + [col for col in unique_columns if col in data.columns]

    # Add missing columns to data with NaN values.
    # NOTE(review): columns_to_include is computed before this loop, so the
    # columns created here are not part of table_2_2 - confirm intent.
    for col in unique_columns:
        if col not in data.columns:
            data[col] = np.nan

    table_2_2 = data[columns_to_include]

    # table_2_4: standardized issue log for all respondents;
    # integrity_issues: the same log restricted to respondents with score_ratio < 0.5
    integrity_score_values_filtered = table_2_1[table_2_1['score_ratio'] < 0.5]
    table_2_4 = generate_integrity_issues_log(raw_data, table_2_1,column_strategy_df)
    integrity_issues = generate_integrity_issues_log(raw_data, integrity_score_values_filtered,column_strategy_df)

    # store all integrity issues
    if integrity_score_values_filtered.shape[0] == 0:
        final_issues_df = table_1_2.copy()
    else:
        final_issues_df = pd.concat([table_1_2, integrity_issues], ignore_index=True)

    # table_2_3: per-question report used for the representativity scenarios
    # (computed once; the previous duplicate call produced the same result)
    report_df = calculate_consistency_and_integrity_scores(raw_data, column_strategy_df, final_issues_df)
    report_df['integrity_score'] = table_2_1['score_ratio'].mean()
    report_df['integrity_score_high'] = table_2_1[table_2_1['score_ratio'] >= 0.3]['score_ratio'].mean()
    report_df['integrity_score_low'] = table_2_1[table_2_1['score_ratio'] >= 0.5]['score_ratio'].mean()
    report_df['consistency_score_high'] = 1- report_df['number_of_index_with_high_cleansing_urgency_consistency']/report_df['number_of_non_empty_responses']
    report_df['number_of_index_with_issues_integrity'] = report_df['number_of_index_with_high_cleansing_urgency_integrity'] + report_df['number_of_index_with_low_cleansing_urgency_integrity']
    report_df['consistency_score_low'] = report_df['consistency_score']

    # high_score: share of responses left after removing high-urgency issues.
    # The issue share is capped at 1 so the score cannot drop below 0.
    report_df['high_score'] = 1 - (
        (report_df['number_of_index_with_high_cleansing_urgency_consistency'] +
         report_df['number_of_index_with_high_cleansing_urgency_integrity']) /
        report_df['number_of_non_empty_responses']
    ).clip(upper=1)

    # low_score: same for all issues. Capping the share at 1 is what keeps the
    # score from going below 0; the previous clip(lower=0) on the share had no
    # effect on that.
    report_df['low_score'] = 1 - (
        (report_df['number_of_index_with_issues_consistency'] +
         report_df['number_of_index_with_issues_integrity']) /
        report_df['number_of_non_empty_responses']
    ).clip(upper=1)

    table_2_3 = report_df.copy()

    # Change column order in table_2_1: summary columns first, checks after
    required_columns = ['_index', 'score_ratio', 'cleansing_urgency', 'weighted_aggregate', 'max_possible_score']
    other_columns = [col for col in table_2_1.columns if col not in required_columns]
    table_2_1 = table_2_1[required_columns + other_columns]

    return table_2_1, table_2_2,table_2_3,table_2_4
0.3]['score_ratio'].mean() + report_df['integrity_score_low'] = table_2_1[table_2_1['score_ratio'] >= 0.5]['score_ratio'].mean() + report_df['consistency_score_high'] = 1- report_df['number_of_index_with_high_cleansing_urgency_consistency']/report_df['number_of_non_empty_responses'] + report_df['number_of_index_with_issues_integrity'] = report_df['number_of_index_with_high_cleansing_urgency_integrity'] + report_df['number_of_index_with_low_cleansing_urgency_integrity'] + report_df['consistency_score_low'] = report_df['consistency_score'] + # Calculate high_score ensuring it does not exceed 1 + report_df['high_score'] = 1 - ( + (report_df['number_of_index_with_high_cleansing_urgency_consistency'] + + report_df['number_of_index_with_high_cleansing_urgency_integrity']) / + report_df['number_of_non_empty_responses'] + ).clip(upper=1) + + # Calculate low_score ensuring it does not go below 0 + report_df['low_score'] = 1 - ( + (report_df['number_of_index_with_issues_consistency'] + + report_df['number_of_index_with_issues_integrity']) / + report_df['number_of_non_empty_responses'] + ).clip(lower=0) + + table_2_3 = report_df.copy() + + # Change column order in table_2_1 + required_columns = ['_index', 'score_ratio', 'cleansing_urgency', 'weighted_aggregate', 'max_possible_score'] + other_columns = [col for col in table_2_1.columns if col not in required_columns] + ordered_columns = required_columns + other_columns + + # Reorder table_2_1 + table_2_1 = table_2_1[ordered_columns] + + return table_2_1, table_2_2,table_2_3,table_2_4 + + + +def integrity_report(raw_data, questions_df, column_strategy_df, survey_type, table_1_2): + """ + Generate data integrity reports (table_2_1 and table_2_2) based on the provided survey data. + + Parameters: + - raw_data (pd.DataFrame): DataFrame containing the survey data. + - questions_df (pd.DataFrame): DataFrame containing question details. + - column_strategy_df (pd.DataFrame): DataFrame with column strategies. 
+ - survey_type (str): Type of survey ('Supervised (On Site)', 'Supervised (Telephone)', 'Unsupervised (Online)'). + - table_1_2 (pd.DataFrame): Consistency table. + + Returns: + - table_2_1 (pd.DataFrame): Integrity score table. + - table_2_2 (pd.DataFrame): Detailed data integrity table. + - table_2_3 (pd.DataFrame): Table for representativity scores. + - table_2_4 (pd.DataFrame): Standardized integrity issues report. + """ + + try: + print("Step 1: Preparing data") + data = raw_data.reset_index(drop=True).copy() + integrity_score_per_respondent = pd.DataFrame() + integrity_score_per_respondent['_index'] = data['_index'] + except Exception as e: + print(f"Error in Step 1: {e}") + raise + + try: + print("Step 2: Enumerator relationship") + if 'enumerator_relationship' in data.columns: + integrity_score_per_respondent['enumerator_relationship'] = data['enumerator_relationship'] + except Exception as e: + print(f"Error in Step 2: {e}") + raise + + try: + print("Step 3: Payment for survey integrity") + integrity_score_per_respondent = payment_for_survey_integrity(data, integrity_score_per_respondent) + except Exception as e: + print(f"Error in Step 3: {e}") + raise + + try: + print("Step 4: Respondent influenced integrity") + integrity_score_per_respondent = respondent_influence_integrity(data, integrity_score_per_respondent) + except Exception as e: + print(f"Error in Step 4: {e}") + raise + + try: + print("Step 5: Response time integrity") + data, integrity_score_per_respondent = response_time_integrity(data, integrity_score_per_respondent, questions_df, column_strategy_df) + except Exception as e: + print(f"Error in Step 5: {e}") + raise + + try: + print("Step 6: Audio verification integrity") + integrity_score_per_respondent = check_audio_verification(data, integrity_score_per_respondent) + except Exception as e: + print(f"Error in Step 6: {e}") + raise + + try: + print("Step 7: Questions which were difficult") + integrity_score_per_respondent = 
questions_which_which_where_difficult_integrity(data, integrity_score_per_respondent) + except Exception as e: + print(f"Error in Step 7: {e}") + raise + + try: + print("Step 8: Respondent suspicious integrity") + integrity_score_per_respondent = respondent_suspicious_integrity(data, integrity_score_per_respondent) + except Exception as e: + print(f"Error in Step 8: {e}") + raise + + try: + print("Step 9: Phone number validation") + if 'phone_number' in data.columns: + integrity_score_per_respondent['phone_number_check'] = validate_phone_number_all_conditions(data['phone_number']) + else: + integrity_score_per_respondent['phone_number_check'] = 0 + except Exception as e: + print(f"Error in Step 9: {e}") + raise + + try: + print("Step 10: Name validation") + if 'name' in data.columns or 'enumerator_name' in data.columns: + integrity_score_per_respondent['name_check'] = validate_names(data) + else: + integrity_score_per_respondent['name_check'] = 0 + except Exception as e: + print(f"Error in Step 10: {e}") + raise + + try: + print("Step 11: Positive and negative impact client") + integrity_score_per_respondent['impact_feedback_integrity'] = 2 + #if 'positive_effects_client' in data.columns: + # pos_score = assess_text_column(data, 'positive_effects_client', integrity_score_per_respondent) + # neg_score = assess_text_column(data, 'negative_effects_client', integrity_score_per_respondent) + # integrity_score_per_respondent['impact_feedback_integrity'] = pos_score.combine(neg_score, max) + #else: + # integrity_score_per_respondent['impact_feedback_integrity'] = 0 + except Exception as e: + print(f"Error in Step 11: {e}") + raise + + try: + print("Step 12: Respondent uniqueness") + matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name'] + matching_columns = [col for col in matching_columns if col in data.columns] + integrity_score_per_respondent, _ = assess_uniqueness_and_report_pairs(data, matching_columns, 
integrity_score_per_respondent) + if len(matching_columns) < 10: + integrity_score_per_respondent['response_uniqueness'] = 1 + except Exception as e: + print(f"Error in Step 12: {e}") + raise + + try: + print("Step 13: Enumerator bias") + if 'enumerator_name' in data.columns: + data = assess_enumerator_bias(data, matching_columns) + integrity_score_per_respondent['enumerator_bias'] = data['enumerator_bias'] + else: + integrity_score_per_respondent['enumerator_bias'] = 0 + except Exception as e: + print(f"Error in Step 13: {e}") + raise + + try: + print("Step 14: Location integrity") + integrity_score_per_respondent = location_integrity(data, integrity_score_per_respondent) + except Exception as e: + print(f"Error in Step 14: {e}") + raise + + try: + print("Step 15: Calculate integrity scores") + columns_integrity = [ + 'payment_for_survey', 'respondent_influenced', 'response_time_integrity', 'audio_verification', + 'questions_which_were_difficult', 'respondent_suspicious', 'phone_number_check', 'response_uniqueness', + 'name_check', 'impact_feedback_integrity', 'enumerator_bias', 'location_check' + ] + points = [1] * len(columns_integrity) + survey_type_mapping = { + 'Supervised (On Site)': {col: 1 for col in columns_integrity}, + 'Supervised (Telephone)': {**{col: 1 for col in columns_integrity}, 'audio_verification': 0, 'location_check': 0}, + 'Unsupervised (Online)': {**{col: 1 for col in columns_integrity}, 'respondent_influenced': 0, 'audio_verification': 0, + 'questions_which_were_difficult': 0, 'respondent_suspicious': 0, 'name_check': 0, 'location_check': 0} + } + integrity_score_values = calculate_weighted_aggregate_with_max( + integrity_score_per_respondent, survey_type, columns_integrity, survey_type_mapping, points + ) + integrity_score_values['score_ratio'] = integrity_score_values['weighted_aggregate'] / integrity_score_values['max_possible_score'] + integrity_score_values['score_ratio'] = np.minimum(integrity_score_values['score_ratio'], 1) + except 
Exception as e: + print(f"Error in Step 15: {e}") + raise + + try: + print("Step 16: Add cleansing urgency") + integrity_score_values = cleansing_integrity(integrity_score_values) + except Exception as e: + print(f"Error in Step 16: {e}") + raise + + try: + print("Step 17: Prepare table_2_1 and table_2_2") + table_2_1 = integrity_score_values.sort_values(by='score_ratio', ascending=True) + + # Update the column 'cleansing_urgency' based on 'score_ratio' values + table_2_1['cleansing_urgency'] = table_2_1['score_ratio'].apply( + lambda x: 'high' if x < 0.3 else ('low' if x < 0.5 else None) + ) + + columns_to_include = ['_index'] + [col for col in data.columns if col in matching_columns] + table_2_2 = data[columns_to_include] + except Exception as e: + print(f"Error in Step 17: {e}") + raise + + try: + print("Step 18: Generate integrity issues log (table_2_4)") + integrity_score_values_filtered = table_2_1[table_2_1['score_ratio'] < 0.5] + table_2_4 = generate_integrity_issues_log(data, table_2_1, column_strategy_df) + integrity_issues_df = generate_integrity_issues_log(data, integrity_score_values_filtered, column_strategy_df) + except Exception as e: + print(f"Error in Step 18: {e}") + raise + try: + print("Step 19: Final issues and report generation") + if table_2_1.shape[0] == 0: + final_issues_df = table_1_2.copy() + else: + final_issues_df = pd.concat([table_1_2, table_2_4], ignore_index=True) + table_2_5 = final_issues_df.copy() + report_df = calculate_consistency_and_integrity_scores(data, column_strategy_df, final_issues_df) + report_df['integrity_score'] = table_2_1['score_ratio'].mean() + report_df['integrity_score_high'] = table_2_1[table_2_1['score_ratio'] >= 0.3]['score_ratio'].mean() + report_df['integrity_score_low'] = table_2_1[table_2_1['score_ratio'] >= 0.5]['score_ratio'].mean() + report_df['consistency_score_high'] = 1 - ( + report_df['number_of_index_with_high_cleansing_urgency_consistency'] / + report_df['number_of_non_empty_responses'] + ) + 
def _variant_coverage(table_3_2, variant):
    """Return the mean weighted coverage stored for one data variant, or NaN if it is absent."""
    matches = table_3_2.loc[
        table_3_2['raw_data_variant'].astype(str) == variant, 'Weighted_Avg_Coverage'
    ]
    return float(matches.iloc[0]) if not matches.empty else float('nan')


def representativity_report(segmentation, raw_data, table_2_4, segmentation_columns, mapping_segmentation_quotas,
                            table_2_3, N, table_1_3):
    """
    Build the representativity tables and the overall data quality report.

    Parameters:
    - segmentation (str): 'yes' to include segment coverage in the representativity score.
    - raw_data (pd.DataFrame): Survey responses, forwarded to calculate_coverage_for_all.
    - table_2_4 (pd.DataFrame): Integrity issues log, forwarded to calculate_coverage_for_all.
    - segmentation_columns: Forwarded to calculate_coverage_for_all.
    - mapping_segmentation_quotas: Forwarded to calculate_coverage_for_all.
    - table_2_3 (pd.DataFrame): Per-question report; must contain 'consistency_score_low',
      'integrity_score' and 'integrity_score_low'.
    - N: Forwarded to calculate_representativity_scores_per_question.
    - table_1_3 (pd.DataFrame): Must contain 'indicator_score'.

    Returns:
    - (table_3_1, table_3_2, table_3_3, table_3_4) when segmentation == 'yes'.
    - (table_3_3, table_3_4) otherwise.
      table_3_3 is the data quality report with one row per scenario ('0', 'A', 'B');
      table_3_4 is the per-question representativity computed from 'high_score'.
    """
    use_segmentation = segmentation == 'yes'

    # Step 1: coverage per question and its average per data variant (segmented surveys only)
    if use_segmentation:
        table_3_1 = calculate_coverage_for_all(raw_data, table_2_4, segmentation_columns, mapping_segmentation_quotas)
        table_3_2 = table_3_1.groupby('raw_data_variant')['Weighted_Avg_Coverage'].mean().reset_index()
    else:
        table_3_1 = pd.DataFrame()
        table_3_2 = pd.DataFrame()

    # Step 2: representativity per question for both cleansing scenarios
    representativity_scores_df_high = calculate_representativity_scores_per_question(
        table_2_3, N, score_column='high_score'
    )
    representativity_scores_df_low = calculate_representativity_scores_per_question(
        table_2_3, N, score_column='low_score'
    )
    sample_scores = {
        '0': representativity_scores_df_high['confidence_actual'].mean(),
        'A': representativity_scores_df_high['confidence_clean'].mean(),
        'B': representativity_scores_df_low['confidence_clean'].mean(),
    }

    # Step 3: overall representativity per scenario.
    # The confidence means are divided by 100 before being averaged with the coverage, so they
    # are presumably percentages while coverage is a 0-1 fraction -- TODO confirm.
    if use_segmentation:
        # Look the coverage up by variant name instead of by row label: the previous [0]/[1]/[2]
        # indexing only worked when table_3_2 held exactly the rows '0', 'A', 'B' in that order.
        overall_scores = {
            variant: ((_variant_coverage(table_3_2, variant) + score / 100) / 2) * 100
            for variant, score in sample_scores.items()
        }
    else:
        overall_scores = dict(sample_scores)

    # Step 4: data quality report, one row per scenario
    scenarios = ['0', 'A', 'B']
    consistency_score = table_1_3['indicator_score'].median()

    # NOTE(review): scenario 'A' reuses the *_low columns for consistency and integrity, and
    # scenario 'B' hard-codes a consistency of 1. Kept as in the original; confirm the intent.
    data_quality_report_df = pd.DataFrame({
        'scenario': scenarios,
        'consistency_score': [consistency_score, table_2_3['consistency_score_low'].median(), 1],
        'overall_representativity_score': [overall_scores[variant] / 100 for variant in scenarios],
        'integrity_score': [
            table_2_3['integrity_score'].median(),
            table_2_3['integrity_score_low'].median(),
            table_2_3['integrity_score_low'].median(),
        ],
    })

    # The data quality score is the plain average of the three dimensions
    data_quality_report_df['data_quality_score'] = (
        data_quality_report_df['consistency_score'] +
        data_quality_report_df['overall_representativity_score'] +
        data_quality_report_df['integrity_score']
    ) / 3

    # Step 5: final data quality report
    table_3_3 = data_quality_report_df.copy()

    # Step 6: representativity per question after urgent cleaning is in place
    table_3_4 = representativity_scores_df_high.copy()

    if use_segmentation:
        return table_3_1, table_3_2, table_3_3, table_3_4
    return table_3_3, table_3_4



### ENUMERATOR BIAS REPORT
def enumerator_urgent_issues_report(raw_data, table_2_5):
    """
    Summarise cleansing issues per enumerator after merging near-duplicate enumerator names.

    Names are normalised (trimmed, lower-cased, whitespace collapsed), embedded with the
    'all-MiniLM-L6-v2' sentence-transformers model and grouped when their cosine similarity
    exceeds 0.85.

    Side effects on raw_data (kept from the original implementation):
    - adds the columns 'numerator_name_clean' and 'enumerator_name_corrected';
    - resets the index in place and overwrites '_index' with the 1-based row position.

    Parameters:
    - raw_data (pd.DataFrame): Survey responses with an 'enumerator_name' column.
    - table_2_5 (pd.DataFrame): Issues log with '_index', 'quality_dimension' and 'cleansing_urgency'.

    Returns:
    - summary_sorted (pd.DataFrame): Per corrected enumerator name: number of responses with a
      high-urgency issue, number of responses with any logged issue, and their ratio,
      sorted by the ratio in descending order.
    - high_urgency_issues (pd.DataFrame): table_2_5 joined with the corrected enumerator name.
    """
    # Normalise the names; missing names become '' so the string methods and the encoder
    # never receive NaN.
    raw_data['numerator_name_clean'] = (
        raw_data['enumerator_name']
        .fillna('')
        .astype(str)
        .str.strip()
        .str.lower()
        .str.replace(r'\s+', ' ', regex=True)
    )

    # Identical names always receive the same group, so only distinct names (in order of first
    # appearance) are embedded and compared. This keeps the similarity matrix at
    # (#distinct names)^2 instead of (#rows)^2.
    unique_names = raw_data['numerator_name_clean'].unique().tolist()

    threshold = 0.85  # Similarity threshold (adjust based on your data)
    name_to_group = {}

    if unique_names:
        model = SentenceTransformer('all-MiniLM-L6-v2')
        embeddings = model.encode(unique_names)
        cosine_sim = cosine_similarity(embeddings)

        group_counter = 0
        for i, name in enumerate(unique_names):
            # A name that was already pulled into a group does not open a new one
            if name in name_to_group:
                continue
            # Every name close enough to this one joins (or is moved to) the new group
            for idx in np.where(cosine_sim[i] > threshold)[0]:
                name_to_group[unique_names[idx]] = group_counter
            group_counter += 1

    # The canonical spelling of a group is the first of its names that was registered
    group_to_name = {}
    for name, group_id in name_to_group.items():
        group_to_name.setdefault(group_id, name)
    canonical_name = {name: group_to_name[group_id] for name, group_id in name_to_group.items()}

    raw_data['enumerator_name_corrected'] = raw_data['numerator_name_clean'].map(
        lambda name: canonical_name.get(name, name)
    )

    # One 0/1 flag per (quality dimension, urgency) combination
    is_consistency = table_2_5['quality_dimension'] == 'consistency'
    is_integrity = table_2_5['quality_dimension'] == 'integrity'
    is_high = table_2_5['cleansing_urgency'] == 'high'
    is_low = table_2_5['cleansing_urgency'] == 'low'

    table_2_6 = pd.DataFrame({
        '_index': table_2_5['_index'],
        'consistency_low': (is_consistency & is_low).astype(int),
        'consistency_high': (is_consistency & is_high).astype(int),
        'integrity_high': (is_integrity & is_high).astype(int),
        'integrity_low': (is_integrity & is_low).astype(int),
        'cleansing_urgency_high': ((is_consistency | is_integrity) & is_high).astype(int),
        'cleansing_urgency_low': ((is_consistency | is_integrity) & is_low).astype(int),
    })
    flag_columns = [col for col in table_2_6.columns if col != '_index']

    # Reset index to avoid ambiguity when using _index as a column.
    # NOTE(review): this replaces any existing '_index' values with the row position + 1,
    # which only matches table_2_5 if the survey '_index' is already 1..n in row order -- confirm.
    raw_data.reset_index(drop=True, inplace=True)
    raw_data['_index'] = raw_data.index + 1

    # Collapse to one row per response: a flag is set if any of its issues set it
    table_2_6_grouped = table_2_6.groupby('_index')[flag_columns].max().reset_index()

    # Attach the corrected enumerator name to every response that has an issue
    table_2_6_grouped = table_2_6_grouped.merge(raw_data[['_index', 'enumerator_name_corrected']], on='_index')

    # Number of responses with a high-urgency issue per enumerator
    summary = table_2_6_grouped.groupby('enumerator_name_corrected').agg(
        high_urgency_issues=('cleansing_urgency_high', 'sum'),
        total_indices=('_index', 'count')
    ).reset_index()

    summary['high_urgency_proportion'] = summary['high_urgency_issues'] / summary['total_indices']
    summary_sorted = summary.sort_values('high_urgency_proportion', ascending=False)

    # NOTE(review): despite its name, this table holds every logged issue, not only the
    # high-urgency ones (the original never filtered either) -- confirm the intent.
    high_urgency_issues = table_2_5.merge(
        raw_data[['_index', 'enumerator_name_corrected']], on='_index', how='inner'
    )
    leading_columns = ['enumerator_name_corrected', '_index', 'quality_dimension', 'cleansing_urgency']
    high_urgency_issues = high_urgency_issues[
        leading_columns + [col for col in high_urgency_issues.columns if col not in leading_columns]
    ]

    return summary_sorted, high_urgency_issues


### REPORT CLEANING DATA SET

def data_cleansing(raw_data, table_2_5, customer, theme, site, population_size, interview_type):
    """
    Apply the high-urgency cleansing actions to the survey data.

    - consistency & high urgency: the affected answer is blanked out (set to None).
    - integrity & high urgency: the whole response row is removed.

    Issues that point at an '_index' or question column that does not exist in raw_data are
    skipped for the consistency scenario instead of raising.

    Parameters:
    - raw_data (pd.DataFrame): Survey responses with '_index' and 'end' columns. Not modified.
    - table_2_5 (pd.DataFrame): Issues log with '_index', 'question', 'quality_dimension'
      and 'cleansing_urgency'.
    - customer, theme, site, population_size, interview_type: Stored verbatim in the readme table.

    Returns:
    - raw_data (pd.DataFrame): The input frame, returned unchanged.
    - cleansed_data (pd.DataFrame): Copy of raw_data with a 'date' column and the cleansing applied.
    - cleansed_summary_df (pd.DataFrame): One row per applied action ('criteria', '_index',
      'affected_columns', 'original_value', 'new_value').
    - readme (pd.DataFrame): The run parameters as 'parameter' / 'value' pairs.
    """
    cleansed_data = raw_data.copy()

    # Every row carries the latest 'end' value found in the raw data
    cleansed_data['date'] = raw_data['end'].max()

    summary_columns = ['criteria', '_index', 'affected_columns', 'original_value', 'new_value']
    cleansed_summary = []
    issues = table_2_5.to_dict('records')

    # Scenario 1: consistency & high urgency -> blank out the single affected answer
    for issue in issues:
        if not (issue['quality_dimension'] == 'consistency' and issue['cleansing_urgency'] == 'high'):
            continue

        question_column = issue['question']
        source_rows = raw_data['_index'] == issue['_index']
        if question_column not in raw_data.columns or not source_rows.any():
            continue  # nothing to cleanse: the issue does not map onto the data

        original_value = raw_data.loc[source_rows, question_column].iloc[0]
        cleansed_data.loc[cleansed_data['_index'] == issue['_index'], question_column] = None

        cleansed_summary.append({
            'criteria': 'consistency & high cleansing_urgency',
            '_index': issue['_index'],
            'affected_columns': [question_column],
            'original_value': original_value,
            'new_value': None
        })

    # Scenario 2: integrity & high urgency -> remove the whole response
    removed_indices = []
    for issue in issues:
        if not (issue['quality_dimension'] == 'integrity' and issue['cleansing_urgency'] == 'high'):
            continue

        original_row = raw_data[raw_data['_index'] == issue['_index']]
        if pd.notna(issue['_index']):
            removed_indices.append(issue['_index'])

        cleansed_summary.append({
            'criteria': 'integrity & high cleansing_urgency',
            '_index': issue['_index'],
            'affected_columns': ['entire row removed'],
            'original_value': original_row.to_dict(orient='records')[0] if not original_row.empty else None,
            'new_value': 'row removed'
        })

    # Drop all flagged responses in one pass instead of re-filtering the frame per issue
    if removed_indices:
        cleansed_data = cleansed_data[~cleansed_data['_index'].isin(removed_indices)]

    readme = pd.DataFrame({
        'parameter': ['customer', 'theme', 'site', 'population size', 'interview type'],
        'value': [customer, theme, site, population_size, interview_type]
    })

    # Explicit columns keep the summary schema stable even when nothing was cleansed
    cleansed_summary_df = pd.DataFrame(cleansed_summary, columns=summary_columns)

    return raw_data, cleansed_data, cleansed_summary_df, readme




### ACTIONS REPORT

## CONSISTENCY
import random


def consistency_issues_action(table_1_1, table_2_3, issue_threshold=0.20, score_threshold=0.80):
    """
    Write a short text report listing the questions with significant consistency issues.

    A question is reported when its 'consistency_score_low' is below score_threshold and its
    number of issues divided by 'total' exceeds issue_threshold.

    Parameters:
    - table_1_1 (pd.DataFrame): Per-question issue counts (one column per check) and 'total'.
    - table_2_3 (pd.DataFrame): Per-question report containing 'consistency_score_low'.
    - issue_threshold (float): Minimum share of issues for a question to be reported.
    - score_threshold (float): Consistency score below which a question can be reported.

    Returns:
    - str: The report wrapped in a randomly chosen ANSI text style, or an error message
      if the two tables cannot be merged on 'question'.
    """
    try:
        # Merge the two tables on the 'question' column
        df = table_1_1.merge(table_2_3, on='question', how='inner')
    except Exception as e:
        return f"Error during merging tables: {e}"

    # Columns whose counts add up to the number of issues of a question
    issues_columns = [
        'completeness check',
        'dist_outlier_check',
        'free-text check (more than 3 characters but less than two words)',
        'model based outlier'
    ]

    intro_statements = [
        "After reviewing the data, we found several questions that require attention due to significant consistency issues.",
        "Based on our analysis, the following questions have been flagged due to their consistency concerns.",
        "Our analysis identified several questions where consistency issues need to be addressed, as detailed below.",
        "The following questions exhibit high levels of inconsistency, and we recommend taking a closer look at them.",
        "In reviewing the dataset, we've identified certain questions with notable consistency problems that need your attention.",
        "We have analyzed the data and found that several questions may require additional review due to potential consistency issues.",
        "Our investigation has revealed some questions where the data shows inconsistencies that should be addressed promptly.",
        "Based on recent checks, we identified a set of questions with significant consistency challenges that require action.",
        "After a thorough review, we observed that the following questions exhibit consistency problems that need to be addressed.",
        "Our data analysis indicates that several questions are affected by consistency issues that need further scrutiny."
    ]
    intro = random.choice(intro_statements)

    # Missing columns count as 0 so a partially filled table still produces a report
    for col in issues_columns + ['consistency_score_low', 'total']:
        if col not in df.columns:
            df[col] = 0
        else:
            df[col] = df[col].fillna(0)

    questions_with_issues = []
    for _, row in df.iterrows():
        try:
            issues_count = row[issues_columns]
            total_issues = sum(issues_count)
            total_responses = row['total']

            if total_responses == 0:
                continue
            if not (row['consistency_score_low'] < score_threshold
                    and total_issues / total_responses > issue_threshold):
                continue

            question = row['question']
            sorted_issues = issues_count.sort_values(ascending=False)
            max_issue_dimension = sorted_issues.index[0]
            max_issue_value = sorted_issues.iloc[0]

            # A runner-up dimension is only mentioned when it has at least 5 issues
            if len(sorted_issues) > 1 and sorted_issues.iloc[1] >= 5:
                second_max_issue_dimension = sorted_issues.index[1]
                second_max_issue_value = sorted_issues.iloc[1]
                issue_details = (
                    f"Question: '{question}' has {total_issues} issues.\n"
                    f"  - The dimension(s) with the most issues: {max_issue_dimension} with {max_issue_value} issues.\n"
                    f"  - The second dimension with issues: {second_max_issue_dimension} with {second_max_issue_value} issues.\n"
                )
            else:
                issue_details = (
                    f"Question: '{question}' has {total_issues} issues.\n"
                    f"  - The dimension with the most issues: {max_issue_dimension} with {max_issue_value} issues.\n"
                )
            questions_with_issues.append(issue_details)
        except (TypeError, ValueError):
            # Best effort: a row with non-numeric counts is left out of the report
            continue

    explanation_text = (
        "\nThe following dimensions are evaluated for consistency:\n"
        "- Completeness check: An answer was expected but not provided.\n"
        "- Dist outlier check: A value outside the range of reasonable values.\n"
        "- Free-text check (more than 3 characters but less than two words): Ensures minimal content for free-text responses.\n"
        "- Model-based outlier: An inconsistent or extreme value compared to typical responses.\n\n"
    )

    if not questions_with_issues:
        report = ["All questions show good consistency health. No immediate actions are required.\n"]
    else:
        if len(questions_with_issues) == 1:
            heading = "The following question requires attention:\n"
        else:
            heading = "The following questions require attention:\n"
        # Same order as before: introduction, heading, explanation of the checks, questions
        report = [intro, heading, explanation_text] + questions_with_issues

    report.append("\nFor a detailed view of each question's consistency issues, please refer to the 'Data Consistency Issues Deep Dive' tab.\n")

    # The whole report is wrapped in one randomly chosen ANSI text style
    styles = ["\033[1m", "\033[3m", "\033[4m", "\033[7m"]
    styled_report = random.choice(styles) + "\n".join(report) + "\033[0m"

    return styled_report


def _score_as_number(score):
    """Return score as a float, or None when it is missing, empty or not numeric."""
    try:
        number = float(score)
    except (TypeError, ValueError):
        return None
    return None if number != number else number  # NaN is the only value unequal to itself


def integrity_issues_action(table_2_1, threshold=0.7):
    """
    Write a short text report listing the respondents with a low integrity score.

    Parameters:
    - table_2_1 (pd.DataFrame): Integrity score table with '_index', 'score_ratio' and,
      optionally, one column per integrity check.
    - threshold (float): Respondents with a 'score_ratio' below this value are reported.

    Returns:
    - str: The report. When fewer than 3 respondents are reported, each section also lists
      the checks that scored below their maximum value.
    """
    # Maximum possible value of each integrity check
    column_max_values = {
        'payment_for_survey': 1,
        'respondent_influenced': 1,
        'response_time_integrity': 1,
        'audio_verification': 1,
        'questions_which_were_difficult': 2,
        'respondent_suspicious': 2,
        'phone_number_check': 1,
        'response_uniqueness': 1,
        'name_check': 1,
        'impact_feedback_integrity': 2,
        'enumerator_bias': 2,
        'location_check': 1
    }

    urgent_rows = table_2_1[table_2_1['score_ratio'] < threshold]

    if urgent_rows.empty:
        # No respondents with low integrity scores
        return "All respondents have sufficient integrity to be considered for impact measurement."

    intro_statements = [
        "After reviewing the data, we found several respondents that require attention due to significant integrity issues.",
        "Based on our analysis, the following respondents have been flagged due to integrity concerns.",
        "Our analysis identified several respondents where integrity issues need to be addressed, as detailed below.",
        "The following respondents exhibit low integrity scores, and we recommend taking a closer look at them.",
        "In reviewing the dataset, we've identified certain respondents with notable integrity problems that need your attention.",
        "We have analyzed the data and found that several respondents may require additional review due to integrity issues.",
        "Our investigation has revealed some respondents where integrity issues should be addressed promptly.",
        "Based on recent checks, we identified a set of respondents with significant integrity challenges that require action.",
        "After a thorough review, we observed that the following respondents exhibit integrity issues that need to be addressed.",
        "Our data analysis indicates that several respondents are affected by integrity issues that need further scrutiny."
    ]
    intro = random.choice(intro_statements)

    # Per-check details are only listed for short reports
    show_details = len(urgent_rows) < 3

    issues_report = []
    # Records keep each column's own type, so an integer _index is not printed as '1.0'
    for record in urgent_rows.to_dict('records'):
        section = [f"**Respondent with _index: {record['_index']}**"]

        if show_details:
            far_from_max_columns = []
            for col, max_value in column_max_values.items():
                score = record.get(col)  # a check that was not computed is simply not listed
                numeric_score = _score_as_number(score)
                if numeric_score is not None and numeric_score < max_value:
                    far_from_max_columns.append(f'{col.replace("_", " ").title()} (score: {score}/{max_value})')

            if far_from_max_columns:
                section.append("\n The following checks scored below the maximum value:")
                section.extend(far_from_max_columns)

        issues_report.append("\n".join(section))

    # Combine the intro and the detailed sections
    final_report = intro + "\n\n" + "\n\n".join(issues_report)

    # Add an explanation of what each check means
    final_report += """
\n The following checks are evaluated for integrity:
- **Payment for Survey:** Less integrity if the respondent was paid to do it.
- **Respondent Influenced:** Less integrity score if the respondent seemed influenced.
- **Response Time Integrity:** Less integrity if the respondent took too long or too short to respond.
- **Audio Verification:** More integrity if audio verification is in place.
- **Questions Were Difficult:** Less integrity if more questions were hard to respond to.
- **Respondent Suspicious:** Less integrity the more suspicious the respondent is.
- **Phone Number Check:** More integrity if a realistic phone number is provided.
- **Response Uniqueness:** More integrity if the response is truly unique.
- **Name Check:** More integrity if the name is realistic.
- **Impact Feedback Integrity:** More integrity if relevant and well-articulated feedback is provided.
- **Enumerator Bias:** Less integrity if enumerator responses are biased.
- **Location Check:** Less integrity if responses' locations are too close to each other in certain contexts.

For a detailed view of each respondent's integrity issues, please refer to the 'Integrity Issues Deep Dive' tab.
"""

    return final_report