| |
|
| | import numpy as np |
| | import pandas as pd |
| | from subprocess import check_output |
| | |
| | import ipywidgets as widgets |
| | from IPython.display import display, Image, SVG, display_svg |
| |
|
| | |
| | import matplotlib.pyplot as plt |
| | import seaborn as sns |
| |
|
| | |
| | from sklearn.cluster import KMeans |
| | from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier |
| | from sklearn.metrics import accuracy_score, r2_score, silhouette_samples, silhouette_score, confusion_matrix, f1_score |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.preprocessing import LabelEncoder |
| | from sklearn.metrics.pairwise import cosine_similarity |
| | from sklearn.feature_extraction.text import TfidfVectorizer |
| | from sklearn.tree import DecisionTreeRegressor |
| |
|
| | |
| | from fastai.tabular.all import * |
| | from dtreeviz.trees import * |
| | from deep_translator import GoogleTranslator |
| | from sentence_transformers import SentenceTransformer |
| | from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer |
| | import language_tool_python |
| | from langdetect import detect |
| | from geopy.distance import geodesic |
| |
|
| |
|
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | columns_integrity = [ |
| | 'payment_for_survey', |
| | 'respondent_influenced', |
| | 'response_time_integrity', |
| | 'audio_verification', |
| | 'questions_which_were_difficult', |
| | 'respondent_suspicious', |
| | 'phone_number', |
| | 'response_uniqueness', |
| | 'name', |
| | 'impact_feedback_integrity', |
| | 'enumerator_bias', |
| | 'location_check' |
| | ] |
| |
|
| |
|
| |
|
| | |
| | survey_type_mapping = { |
| | 'Supervised (On Site)': { |
| | 'payment_for_survey':1, |
| | 'respondent_influenced':1, |
| | 'response_time_integrity':1, |
| | 'audio_verification':1, |
| | 'questions_which_were_difficult':1, |
| | 'respondent_suspicious':1, |
| | 'phone_number':1, |
| | 'response_uniqueness':1, |
| | 'name':1, |
| | 'impact_feedback_integrity':1, |
| | 'enumerator_bias':1, |
| | 'location_check':1 |
| | }, |
| | 'Supervised (Telephone)': { |
| | 'payment_for_survey':1, |
| | 'respondent_influenced':1, |
| | 'response_time_integrity':1, |
| | 'audio_verification':0, |
| | 'questions_which_were_difficult':1, |
| | 'respondent_suspicious':1, |
| | 'phone_number':1, |
| | 'response_uniqueness':1, |
| | 'name':1, |
| | 'impact_feedback_integrity':1, |
| | 'enumerator_bias':1, |
| | 'location_check':0 |
| | }, |
| | 'Unsupervised (Online)': { |
| | 'payment_for_survey':1, |
| | 'respondent_influenced':0, |
| | 'response_time_integrity':1, |
| | 'audio_verification':0, |
| | 'questions_which_were_difficult':0, |
| | 'respondent_suspicious':0, |
| | 'phone_number':1, |
| | 'response_uniqueness':1, |
| | 'name':0, |
| | 'impact_feedback_integrity':1, |
| | 'enumerator_bias':0, |
| | 'location_check':0 |
| | } |
| | } |
| |
|
| | |
| | points = [1, 1, 1, 1, 2, 2, 1, 1, 1, 2, 2, 1] |
| |
|
| |
|
| | |
| | def calculate_weighted_aggregate_with_max(data, survey_type, columns_integrity, survey_type_mapping, points): |
| | |
| | weights = survey_type_mapping.get(survey_type, {}) |
| |
|
| | |
| | missing_columns = set(columns_integrity) - set(data.columns) |
| | for col in missing_columns: |
| | data[col] = 0 |
| | |
| | |
| | weighted_values = data[columns_integrity].multiply([weights.get(col, 0) for col in columns_integrity], axis=1) |
| |
|
| | |
| | data['weighted_aggregate'] = weighted_values.sum(axis=1) |
| |
|
| | |
| | max_possible_score = sum([weights.get(col, 0) * points[i] for i, col in enumerate(columns_integrity)]) |
| | data['max_possible_score'] = max_possible_score-1 |
| |
|
| | return data |
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| | def load_parameters(parameters_file): |
| | """ |
| | Load and parse the parameters file. |
| | """ |
| | with open(parameters_file, "r") as f: |
| | parameters = json.load(f) |
| |
|
| | return parameters |
| |
|
| | def parse_parameters(parameters_path, uuid): |
| | |
| | if parameters_path.endswith('.xlsx') or parameters_path.endswith('.xls'): |
| | df = pd.read_excel(parameters_path) |
| | else: |
| | df = pd.read_csv(parameters_path) |
| |
|
| | |
| | row = df[df['leonardoDataCollectionId'] == uuid] |
| |
|
| | if row.empty: |
| | raise ValueError(f"No entry found for UUID: {uuid}") |
| |
|
| | |
| | def get_value(column): |
| | return row[column].values[0] if pd.notnull(row[column].values[0]) else np.nan |
| |
|
| | N = int(row['expectedRespondents']) |
| |
|
| | survey_type = get_value('supervisionType') |
| |
|
| | assessment = get_value('assessment') |
| | theme_list = [theme.strip() for theme in str(assessment).split(',')] if pd.notnull(assessment) else [] |
| |
|
| | customer = get_value('company') |
| | site = get_value('site') |
| |
|
| | |
| | mapping_segmentation_quotas = None |
| | raw_mapping = get_value('mappingSegmentationQuotas') |
| | if pd.isnull(raw_mapping): |
| | segmentation = 'no' |
| | segmentation_columns = [] |
| | else: |
| | segmentation = 'yes' |
| | cleaned = raw_mapping.replace("\\_", "_") |
| | mapping_segmentation_quotas = ast.literal_eval(cleaned) |
| | segmentation_columns = list(mapping_segmentation_quotas.keys()) |
| |
|
| |
|
| | environment = 'local' |
| |
|
| | return N, survey_type, theme_list, segmentation_columns, mapping_segmentation_quotas, customer, site, environment, segmentation |
| |
|
| |
|
| |
|
| | def load_dataframes(indicator_path, questions_path, choice_path, survey_path): |
| | |
| | indicator_df = pd.read_excel(indicator_path) |
| | questions_df = pd.read_excel(questions_path) |
| | choice_df = pd.read_excel(choice_path) |
| | data_all = pd.read_excel(survey_path) |
| |
|
| | |
| | if '_index' in data_all.columns: |
| | data_all.reset_index(drop=True, inplace=True) |
| | data_all.set_index('_index', inplace=True) |
| | else: |
| | print("Warning: '_index' column not found in the dataframe.") |
| |
|
| | |
| | raw_data = data_all.copy() |
| |
|
| | |
| | column_strategy_df = questions_df.copy() |
| |
|
| | |
| | column_strategy_df = column_strategy_df[['Field name', 'Answer Type', 'Format check text', |
| | 'Expected behaviour', 'Preferred method']] |
| |
|
| | |
| | column_strategy_df.columns = ['Field name', 'Answer Type', 'Format_check_text', |
| | 'Expected behaviour', 'Prefered method'] |
| |
|
| | |
| | column_strategy_df = column_strategy_df[column_strategy_df['Field name'].isin(data_all.columns)] |
| |
|
| | return indicator_df, questions_df, choice_df, data_all, raw_data, column_strategy_df |
| |
|
| |
|
| | |
| |
|
| | def process_themes_and_questions(indicator_df, theme_list): |
| | """ |
| | Processes the indicator DataFrame, filters rows based on the provided theme list, splits and explodes |
| | the 'Themes' and 'Question(s)' columns, and returns a DataFrame with unique combinations of 'Themes', |
| | 'Question(s)', and 'ID'. |
| | |
| | Parameters: |
| | - indicator_df (DataFrame): The original DataFrame containing the survey data. |
| | - theme_list (list): A list of themes to filter the rows by. |
| | |
| | Returns: |
| | - DataFrame: A DataFrame containing unique combinations of 'Themes', 'Question(s)', and 'ID'. |
| | """ |
| | |
| | indicator_df['Themes'] = indicator_df['Themes'].astype(str) |
| |
|
| | |
| | theme_id_with_boundaries = [f'\\b{theme}\\b' for theme in theme_list] |
| | pattern = '|'.join(theme_id_with_boundaries) |
| |
|
| | |
| | filtered_df = indicator_df[indicator_df['Themes'].str.contains(pattern)].copy() |
| |
|
| | |
| | filtered_df['Themes'] = filtered_df['Themes'].str.split(', ') |
| | filtered_df['Question(s)'] = filtered_df['Question(s)'].str.split(', ') |
| |
|
| | |
| | exploded_df = filtered_df.explode('Themes').explode('Question(s)').reset_index(drop=True) |
| |
|
| | |
| | final_df = exploded_df.merge(indicator_df[['ID', 'Themes']], on='ID', how='inner', suffixes=('', '_original')) |
| |
|
| | |
| | final_df = final_df[['Themes_original', 'Question(s)', 'ID']] |
| |
|
| | |
| | final_df = final_df.rename(columns={'Themes_original': 'Themes'}) |
| |
|
| | |
| | final_df = final_df.drop_duplicates(subset=['Question(s)', 'ID']) |
| |
|
| | return final_df |
| |
|
| | |
| |
|
| |
|
| | def get_missing_columns_without_model(data_validation_strategy_df, data_all): |
| | |
| | model_filtered_df = data_validation_strategy_df[data_validation_strategy_df['Prefered method'].str.contains('model', na=False)] |
| |
|
| | |
| | model_field_names = set(model_filtered_df['Field name']) |
| |
|
| | |
| | data_all_columns = set(data_all.columns) |
| |
|
| | |
| | missing_columns_without_model = data_all_columns.difference(model_field_names) |
| |
|
| | |
| | return list(missing_columns_without_model) |
| |
|
| |
|
| | def rf_feat_importance(m, df): |
| | return pd.DataFrame({'cols':df.columns, 'imp':m.feature_importances_} |
| | ).sort_values('imp', ascending=False) |
| |
|
| | def model_process(data_all, column_strategy_df, exclude_columns, |
| | test_size=0.4, random_state=42, max_features=0.5, |
| | accuracy_threshold=0.85, num_top_features=10): |
| | |
| | accurate_columns = [] |
| | acc_levels = [] |
| | pred_actual_tuples = {} |
| | |
| | |
| | analysis_columns = [col for col in data_all.columns if col not in exclude_columns] |
| | |
| | for c in analysis_columns: |
| | if column_strategy_df[column_strategy_df['Field name'] == c].empty: |
| | y_type = 'decimal' |
| | else: |
| | y_type = column_strategy_df.loc[column_strategy_df['Field name'] == c, 'Answer Type'].values[0] |
| |
|
| | |
| | df_c = data_all[data_all[c].notna()].copy() |
| | if df_c.empty or df_c[c].nunique() == 1: |
| | continue |
| | |
| | |
| | if y_type not in ['select_one', 'select_multiple']: |
| | df_c[c] = pd.to_numeric(df_c[c], errors='coerce') |
| | |
| | |
| | df_c.dropna(subset=[c], inplace=True) |
| | |
| | |
| | if len(df_c) < int(1 / test_size): |
| | continue |
| |
|
| | |
| | columns_to_fill = df_c.columns.difference([c]) |
| | cont, cat = cont_cat_split(df_c[analysis_columns], 1, dep_var=c) |
| | to = TabularPandas(df_c, procs=[Categorify, FillMissing, OneHotEncode], cat_names=cat, cont_names=cont, y_names=c) |
| | |
| | |
| | train_idx, test_idx = train_test_split(df_c.index, test_size=test_size, random_state=random_state) |
| | |
| | |
| | label_mapping = {code: label for label, code in zip(df_c[c], to.y)} |
| | |
| | |
| | model = RandomForestClassifier(max_features=max_features, random_state=random_state) if y_type in ['select_one', 'select_multiple'] else RandomForestRegressor(max_features=max_features, random_state=random_state) |
| | |
| | try: |
| | |
| | print(f"Processing column: {c} with data type {y_type}") |
| | |
| | model.fit(to.xs.loc[df_c.index], to.y.loc[df_c.index]) |
| | except ValueError as e: |
| | print(f"Error fitting model on column {c}: {str(e)}") |
| | continue |
| | |
| | |
| | y_pred_train = model.predict(to.xs.loc[train_idx]) |
| | y_pred_test = model.predict(to.xs.loc[test_idx]) |
| | |
| | |
| | actual_train = to.y.loc[train_idx] |
| | actual_test = to.y.loc[test_idx] |
| | predicted_train = y_pred_train |
| | predicted_test = y_pred_test |
| | |
| | combined_indices = list(train_idx) + list(test_idx) |
| | combined_mapped_indexes = combined_indices |
| | combined_actual = pd.concat([actual_train, actual_test]) |
| | combined_predicted = list(predicted_train) + list(predicted_test) |
| | |
| | |
| | assert len(combined_indices) == len(combined_mapped_indexes), "Index mapping error: mismatched lengths." |
| |
|
| | acc = accuracy_score(combined_actual, combined_predicted) if y_type in ['select_one', 'select_multiple'] else r2_score(combined_actual, combined_predicted) |
| | |
| | if y_type in ['select_one', 'select_multiple']: |
| | combined_predicted_mapped = [label_mapping[pred] for pred in combined_predicted] |
| | combined_actual_mapped = [label_mapping[act] for act in combined_actual] |
| | |
| | if 1 > acc >= accuracy_threshold: |
| | accurate_columns.append(c) |
| | acc_levels.append(acc) |
| | |
| | top_features = rf_feat_importance(model, to.xs.loc[train_idx]).nlargest(num_top_features, 'imp')['cols'].tolist() |
| | top_features = [feat for feat in top_features if feat in df_c.columns] |
| |
|
| | pred_actual_tuples[c] = [] |
| | for i, index in enumerate(combined_indices): |
| | actual = combined_actual.iloc[i] |
| | predicted = combined_predicted[i] |
| | |
| | |
| | if y_type in ['select_one', 'select_multiple']: |
| | predicted = combined_predicted_mapped[i] |
| | actual = combined_actual_mapped[i] |
| | condition = predicted != actual |
| | else: |
| | |
| | condition = (actual != 0 and abs(predicted - actual) / abs(actual) > 0.75) |
| | |
| | if condition: |
| | |
| | features = {feat: df_c.at[index, feat] for feat in top_features if feat in df_c.columns} |
| | |
| | |
| | cleansing_urgency = 'low' |
| | if isinstance(actual, (int, float)) and isinstance(predicted, (int, float)): |
| | relative_difference = abs(predicted - actual) / abs(actual) |
| | print(relative_difference) |
| | if relative_difference > 2 and abs(actual) > 1: |
| | cleansing_urgency = 'high' |
| | |
| | pred_actual_tuples[c].append({ |
| | 'index': index, |
| | 'method': 'model based outlier', |
| | 'model_accuracy': acc, |
| | 'actual': actual, |
| | 'predicted': predicted, |
| | 'explanatory questions': features, |
| | 'cleansing_urgency': cleansing_urgency |
| | }) |
| |
|
| | return accurate_columns, acc_levels, pred_actual_tuples |
| |
|
| |
|
| | def model_issues_to_data_frame(pred_actual_tuples): |
| | |
| | records = [] |
| |
|
| | |
| | for question, entries in pred_actual_tuples.items(): |
| | for entry in entries: |
| | |
| | record = { |
| | '_index': entry['index'], |
| | 'question': question, |
| | 'check': entry['method'], |
| | 'quality_dimension': 'consistency', |
| | 'actual': entry['actual'], |
| | 'predicted': entry['predicted'], |
| | 'cleansing_urgency': entry['cleansing_urgency'], |
| | } |
| |
|
| | |
| | explanatory_questions = '; '.join(f"{key}: {value}" for key, value in entry['explanatory questions'].items()) |
| | |
| | relevant_context_data = f"{explanatory_questions}; model_accuracy: {entry['model_accuracy']}" |
| | record['relevant_context_data'] = relevant_context_data |
| | |
| | records.append(record) |
| |
|
| | |
| | result_df = pd.DataFrame(records) |
| |
|
| | return result_df |
| |
|
| |
|
| | |
| | def check_date_time(value): |
| | """ |
| | Checks if the specified value can be converted to datetime. |
| | If conversion is successful, returns the datetime value; |
| | otherwise, returns pd.NaT. |
| | |
| | Parameters: |
| | - value: The value to be checked for datetime conversion. |
| | |
| | Returns: |
| | - pd.Timestamp or pd.NaT: A datetime value if conversion is successful; |
| | otherwise, pd.NaT. |
| | """ |
| | |
| | if pd.isna(value) or value == '': |
| | return np.nan |
| | |
| | try: |
| | |
| | return pd.to_datetime(value, errors='raise') |
| | except (ValueError, TypeError): |
| | |
| | return np.nan |
| |
|
| |
|
| | def format_checks(df, transformations_df): |
| |
|
| | for _, row in transformations_df.iterrows(): |
| | |
| | if isinstance(row['Prefered method'], str) and 'format_check' in row['Prefered method']: |
| | if row['Field name'] == 'start': |
| | |
| | df['start'] = df['start'].apply(check_date_time) |
| | if row['Field name'] == 'end': |
| | |
| | df['end'] = df['end'].apply(check_date_time) |
| | |
| | def create_nan_dict(data_all, raw_data, column_strategy_df): |
| | |
| | |
| | nan_records = {} |
| |
|
| | |
| | if not data_all.index.equals(raw_data.index): |
| | print("Warning: Indices between data_all and raw_data do not match.") |
| | |
| | raw_data = raw_data.reindex(data_all.index) |
| |
|
| | |
| | common_columns = data_all.columns.intersection(raw_data.columns) |
| |
|
| | for column in common_columns: |
| | |
| | if column not in nan_records: |
| | nan_records[column] = [] |
| |
|
| | |
| | |
| | nan_indices = data_all[column].isna() & ~raw_data[column].isna() |
| |
|
| | |
| | for index in data_all[nan_indices].index: |
| | if index in raw_data.index: |
| | nan_records[column].append({ |
| | 'index': index, |
| | 'method': 'format_check', |
| | 'format_check': column_strategy_df.loc[column_strategy_df['Field name'] == column, 'Format_check_text'].item() |
| | if not column_strategy_df.loc[column_strategy_df['Field name'] == column, 'Format_check_text'].isna().all() else None, |
| | 'actual': raw_data.at[index, column] |
| | }) |
| |
|
| | return nan_records |
| |
|
| |
|
| | def format_violations_to_df(format_violation_dictionary, data_issues_df): |
| | """ |
| | Transforms a format violation dictionary into a DataFrame and appends it to an existing DataFrame. |
| | |
| | Parameters: |
| | - format_violation_dictionary: A dictionary containing format violations. |
| | - data_issues_df: A DataFrame to which the new format violations will be appended. |
| | |
| | Returns: |
| | - A combined DataFrame with format violations and existing data issues. |
| | """ |
| | |
| | records = [] |
| |
|
| | |
| | for key, entries in format_violation_dictionary.items(): |
| | for entry in entries: |
| | records.append({ |
| | '_index': entry['index'], |
| | 'question': key, |
| | 'check': 'format_check', |
| | 'quality_dimension': 'consistency', |
| | 'actual': entry['actual'], |
| | 'predicted': np.nan, |
| | 'cleansing_urgency': 'high', |
| | 'relevant_context_data': f"{entry['format_check']} | actual value: {entry['actual']}" |
| | }) |
| |
|
| | |
| | format_violations_df = pd.DataFrame(records) |
| |
|
| | |
| | combined_df = pd.concat([format_violations_df, data_issues_df], ignore_index=True) |
| |
|
| | return combined_df |
| |
|
| |
|
| |
|
| |
|
| | def summarize_log_issues(issue_dict): |
| | import pandas as pd |
| |
|
| | results = [] |
| | unique_values_dict = {} |
| |
|
| | for key, issues in issue_dict.items(): |
| | total_issues = len(issues) |
| | |
| | unique_issues = set(issue['actual'] for issue in issues if 'actual' in issue) |
| | results.append((key, total_issues, len(unique_issues))) |
| | unique_values_dict[key] = list(unique_issues) |
| |
|
| | |
| | df = pd.DataFrame(results, columns=['Key', 'Total Issues', 'Unique Issues']) |
| | df_sorted = df.sort_values(by='Total Issues', ascending=False) |
| |
|
| | |
| | for key, unique_values in unique_values_dict.items(): |
| | print(f"Key: {key}, Unique Actual Values: {unique_values}") |
| |
|
| | return df_sorted |
| |
|
| |
|
| | |
| |
|
| |
|
| | def detect_outliers(df): |
| | outliers_dict = {} |
| |
|
| | |
| | for column_name in df.columns: |
| | |
| | column_data = df[column_name] |
| | if not isinstance(column_data, pd.Series): |
| | print(f"Error: Data for column {column_name} is not a Series.") |
| | continue |
| |
|
| | |
| | data = pd.to_numeric(column_data, errors='coerce') |
| |
|
| | |
| | data = data.dropna() |
| |
|
| | |
| | data = data[~data.isin([-1, -2])] |
| |
|
| | |
| | if data.empty or data.nunique() <= 2: |
| | print(f"No non-binary numeric data available for analysis in column {column_name}.") |
| | continue |
| |
|
| | |
| | Q1 = data.quantile(0.25) |
| | Q3 = data.quantile(0.75) |
| | IQR = Q3 - Q1 |
| |
|
| | |
| | lower_bound = Q1 - 1.5 * IQR |
| | upper_bound = Q3 + 1.5 * IQR |
| | outlier_indices = data[(data < lower_bound) | (data > upper_bound)].index |
| |
|
| | |
| | if len(outlier_indices) > 0 and len(outlier_indices) < 0.05 * len(data): |
| | outlier_details = [{ |
| | 'index': idx, |
| | 'method': 'distribution based outlier', |
| | 'actual': data.loc[idx], |
| | 'Q1': Q1, |
| | 'Q3': Q3, |
| | 'cleaning_urgency': 'high' if data.loc[idx] > 10 * Q3 else 'low' |
| | } for idx in outlier_indices] |
| | outliers_dict[column_name] = outlier_details |
| |
|
| | return outliers_dict |
| |
|
| |
|
| | def print_sorted_outliers(dist_outliers_dict): |
| | """ |
| | This function processes a dictionary of outliers, extracting and sorting the 'actual' values |
| | from the issues reported for each key. It prints the number of issues, Q1, Q3, and the sorted |
| | outliers in descending order. |
| | |
| | :param dist_outliers_dict: Dictionary containing keys with lists of issues, where each issue |
| | is expected to be a dictionary with 'actual', 'Q1', and 'Q3' keys. |
| | """ |
| | for key, issues in dist_outliers_dict.items(): |
| | |
| | outlier_values = sorted([issue['actual'] for issue in issues], reverse=True) |
| | |
| | if issues: |
| | Q1 = issues[0]['Q1'] |
| | Q3 = issues[0]['Q3'] |
| | print(f"{key}:") |
| | print(f" Number of Issues: {len(issues)}") |
| | print(f" Q1: {Q1}, Q3: {Q3}") |
| | print(f" Outliers: {outlier_values}") |
| | print("\n") |
| |
|
| |
|
| | def dist_outliers_dict_to_df(dist_outliers_dict, data_issues_df): |
| | """ |
| | Transforms a format violation dictionary into a DataFrame and appends it to an existing DataFrame. |
| | |
| | Parameters: |
| | - format_violation_dictionary: A dictionary containing format violations. |
| | - data_issues_df: A DataFrame to which the new format violations will be appended. |
| | |
| | Returns: |
| | - A combined DataFrame with format violations and existing data issues. |
| | """ |
| | |
| | records = [] |
| |
|
| | |
| | for key, entries in dist_outliers_dict.items(): |
| | for entry in entries: |
| | records.append({ |
| | '_index': entry['index'], |
| | 'question': key, |
| | 'check': 'dist_outlier_check', |
| | 'quality_dimension': 'consistency', |
| | 'actual': entry['actual'], |
| | 'predicted': np.nan, |
| | 'cleansing_urgency': entry['cleaning_urgency'], |
| | 'relevant_context_data': f"Q1: {entry['Q1']} | Q3: {entry['Q3']}" |
| | }) |
| |
|
| | |
| | df = pd.DataFrame(records) |
| |
|
| | |
| | combined_df = pd.concat([df, data_issues_df], ignore_index=True) |
| |
|
| | return combined_df |
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| |
|
| | import re |
| |
|
| |
|
| | import pandas as pd |
| | import numpy as np |
| | import re |
| |
|
| | def completeness_check(df, questions_df): |
| | """ |
| | Checks the completeness of the DataFrame, including: |
| | - Handling missing values like `-1`, `-2`, or patterns like "No answer" and "Unknown". |
| | - Handling free-text columns where the text is numeric or non-numeric, and applying specific checks. |
| | |
| | Parameters: |
| | - df (DataFrame): The original DataFrame containing the survey data. |
| | - questions_df (DataFrame): A DataFrame containing the question metadata with 'Field name' and 'Answer Type' columns. |
| | |
| | Returns: |
| | - incomplete_records (dict): A dictionary with details of incomplete records in the DataFrame. |
| | """ |
| | incomplete_records = {} |
| | |
| | |
| | no_answer_pattern = re.compile(r".*(no[_-]?answer|unknown).*", re.IGNORECASE) |
| | |
| | |
| | text_columns = questions_df[questions_df['Answer Type'] == 'text']['Field name'].tolist() |
| |
|
| | |
| | for column in df.columns: |
| | |
| | df_column = df[column].dropna() |
| |
|
| | |
| | regex_mask = df_column.astype(str).apply(lambda x: bool(no_answer_pattern.search(x))) |
| | |
| | |
| | missing_value_mask = df_column.isin([-1, -2]) |
| | |
| | |
| | combined_mask = regex_mask | missing_value_mask |
| | filtered_indices = df_column.index[combined_mask] |
| | |
| | |
| | if not filtered_indices.empty: |
| | incomplete_records[column] = [{ |
| | 'index': idx, |
| | 'method': 'completeness check', |
| | 'actual': df.at[idx, column], |
| | 'cleansing_urgency': 'high' |
| | } for idx in filtered_indices] |
| |
|
| | |
| | if column in text_columns: |
| | |
| | def is_numeric(value): |
| | try: |
| | |
| | pd.to_numeric(value) |
| | return True |
| | except ValueError: |
| | return False |
| |
|
| | |
| | is_numeric_mask = df_column.apply(is_numeric) |
| |
|
| | |
| | text_filtered_indices = df_column.index[~is_numeric_mask] |
| | |
| | |
| | for idx in text_filtered_indices: |
| | text = str(df.at[idx, column]).strip() |
| | text_length = len(text) |
| | |
| | |
| | if text_length < 3: |
| | if column not in incomplete_records: |
| | incomplete_records[column] = [] |
| | incomplete_records[column].append({ |
| | 'index': idx, |
| | 'method': 'free-text check (less than 3 characters)', |
| | 'actual': text, |
| | 'cleansing_urgency': 'low' |
| | }) |
| | |
| | elif text_length >= 3 and len(text.split()) < 2: |
| | if column not in incomplete_records: |
| | incomplete_records[column] = [] |
| | incomplete_records[column].append({ |
| | 'index': idx, |
| | 'method': 'free-text check (more than 3 characters but less than two words)', |
| | 'actual': text, |
| | 'cleansing_urgency': 'low' |
| | }) |
| |
|
| | return incomplete_records |
| |
|
| |
|
| | def completeness_check_dict_to_df(completeness_check_dict, data_issues_df): |
| | """ |
| | Transforms a completeness check dictionary into a DataFrame and appends it to an existing DataFrame. |
| | |
| | Parameters: |
| | - completeness_check_dict: A dictionary containing completeness check results. |
| | - data_issues_df: A DataFrame to which the new completeness check results will be appended. |
| | |
| | Returns: |
| | - A combined DataFrame with completeness checks and existing data issues. |
| | """ |
| | |
| | records = [] |
| |
|
| | |
| | for key, entries in completeness_check_dict.items(): |
| | for entry in entries: |
| | records.append({ |
| | '_index': entry['index'], |
| | 'question': key, |
| | 'check': entry['method'], |
| | 'quality_dimension': 'consistency', |
| | 'actual': entry['actual'], |
| | 'predicted': np.nan, |
| | 'cleansing_urgency': entry.get('cleaning_urgency', 'low'), |
| | 'relevant_context_data': None |
| | }) |
| |
|
| | |
| | df = pd.DataFrame(records) |
| |
|
| | |
| | combined_df = pd.concat([df, data_issues_df], ignore_index=True) |
| |
|
| | return combined_df |
| |
|
| |
|
| |
|
| | |
| |
|
| | def merge_nested_dicts(dict1, dict2): |
| | for key, value in dict2.items(): |
| | if key in dict1: |
| | if isinstance(dict1[key], dict) and isinstance(value, dict): |
| | merge_nested_dicts(dict1[key], value) |
| | else: |
| | |
| | |
| | |
| | if not isinstance(dict1[key], list): |
| | dict1[key] = [dict1[key]] |
| | if not isinstance(value, list): |
| | value = [value] |
| | dict1[key].extend(value) |
| | else: |
| | dict1[key] = value |
| | return dict1 |
| |
|
| |
|
| | |
| |
|
| | def calculate_consistency_scores(raw_data, column_strategy_df, data_issues_df): |
| | |
| | preferred_columns = column_strategy_df[ |
| | column_strategy_df['Prefered method'].notna() & |
| | (column_strategy_df['Prefered method'] != "integrity_score")]['Field name'] |
| | selected_columns = raw_data[preferred_columns] |
| |
|
| | |
| | non_empty_counts = selected_columns.notna().sum() |
| |
|
| | |
| | unique_index_counts = data_issues_df.groupby('question')['_index'].nunique() |
| |
|
| | |
| | consistency_scores = {} |
| | for question in preferred_columns: |
| | num_non_empty = non_empty_counts[question] if question in non_empty_counts else 0 |
| | num_issues = unique_index_counts.get(question, 0) |
| | |
| | if num_non_empty > 0: |
| | consistency_score = max(1 - (num_issues / num_non_empty),0) |
| | else: |
| | consistency_score = None |
| |
|
| | consistency_scores[question] = { |
| | 'number_of_non_empty_responses': num_non_empty, |
| | 'number_of_index_with_issues': num_issues, |
| | 'consistency_score': consistency_score |
| | } |
| |
|
| | |
| | results_df = pd.DataFrame.from_dict(consistency_scores, orient='index').reset_index() |
| | results_df.rename(columns={'index': 'question'}, inplace=True) |
| |
|
| | return results_df |
| |
|
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| | import math |
| | import scipy.stats as stats |
| | from scipy.stats import norm |
| |
|
| | def calculate_representativity_scores(N, sample_size, confidence_level=0.90, e=5, p_hat=0.5, overall_score=1): |
| | """ |
| | This function calculates the representativity scores of a survey before and after data cleansing. |
| | It computes the required sample size, confidence level, and necessary additional samples to meet target levels. |
| | |
| | Parameters: |
| | - N (int): Total population size. |
| | - sample_size (int): Sample size used in the study. |
| | - confidence_level (float): Desired confidence level (default 0.90). |
| | - e (float): Margin of error as a percentage (default 5). |
| | - p_hat (float): Estimated proportion of the attribute present in the population (default 0.5). |
| | - overall_score (float): Overall score used to estimate clean data (default 100). |
| | |
| | Returns: |
| | - dict: A dictionary with calculated representativity scores and required additional samples. |
| | """ |
| | |
| | def calculate_confidence_level(N, sample_size, e=5): |
| | """ |
| | Calculate the confidence level for a given population size, sample size, and margin of error. |
| | """ |
| | e_decimal = e / 100 |
| |
|
| | |
| | finite_population_correction = math.sqrt((N - sample_size) / (N - 1)) |
| | |
| | |
| | standard_error = finite_population_correction * math.sqrt(p_hat * (1 - p_hat) / sample_size) |
| | |
| | |
| | z_score = e_decimal / standard_error |
| | |
| | |
| | confidence_level = stats.norm.cdf(z_score) - stats.norm.cdf(-z_score) |
| | |
| | return confidence_level * 100 |
| |
|
| | def z_value_from_confidence_level(confidence_level): |
| | """ |
| | Calculate the z-value corresponding to a given confidence level. |
| | """ |
| | alpha = 1 - confidence_level |
| | tail_probability = alpha / 2 |
| | z_value = norm.ppf(1 - tail_probability) |
| | return z_value |
| |
|
| | def calculate_sample_size(N, z_score, e, p_hat): |
| | """ |
| | Calculate the required sample size. |
| | """ |
| | e_decimal = e / 100 |
| | n_unlimited = (z_score**2 * p_hat * (1 - p_hat)) / (e_decimal**2) |
| | n_finite = n_unlimited / (1 + ((n_unlimited - 1) / N)) |
| | return math.ceil(n_finite) |
| |
|
| | |
| | z_score = z_value_from_confidence_level(confidence_level) |
| | target_sample_size = calculate_sample_size(N, z_score, e, p_hat) |
| | confidence_target = calculate_confidence_level(N, target_sample_size) |
| | |
| | |
| | confidence_actual = calculate_confidence_level(N, sample_size) |
| | clean_data_size = max(round(overall_score * sample_size,0),1) |
| | confidence_clean = calculate_confidence_level(N, clean_data_size) |
| | |
| | |
| | representativity_actual = 100 * (confidence_actual / confidence_target) |
| | representativity_clean = 100 * (confidence_clean / confidence_target) |
| | |
| | |
| | additional_samples = target_sample_size - clean_data_size if confidence_target > confidence_actual else 0 |
| |
|
| | |
| | return { |
| | 'target_sample_size': target_sample_size, |
| | 'confidence_target': confidence_target, |
| | 'confidence_actual': confidence_actual, |
| | 'confidence_clean': confidence_clean, |
| | 'representativity_actual': representativity_actual, |
| | 'representativity_clean': representativity_clean, |
| | 'additional_samples': additional_samples |
| | } |
| |
|
| |
|
| |
|
| |
|
| | def calculate_representativity_scores_per_question(consistency_df, N,score_column): |
| | """ |
| | Calculate representativity scores based on the provided consistency DataFrame. |
| | |
| | Parameters: |
| | - consistency_df (pd.DataFrame): DataFrame containing consistency data, including |
| | 'number_of_non_empty_responses' and 'consistency_score'. |
| | - N (int): Total population size. |
| | |
| | Returns: |
| | - pd.DataFrame: A DataFrame with calculated representativity scores. |
| | """ |
| | |
| | results = [] |
| |
|
| | |
| | for i in range(len(consistency_df)): |
| | |
| | sample_size = consistency_df.loc[i, 'number_of_non_empty_responses'] |
| |
|
| | |
| | if sample_size > 0: |
| | |
| | consistency_score_question = consistency_df.loc[i, score_column] |
| | |
| | scores = calculate_representativity_scores(N, sample_size, overall_score=consistency_score_question) |
| | |
| | |
| | results.append({ |
| | 'question': consistency_df.loc[i, 'question'], |
| | 'target_sample_size': scores['target_sample_size'], |
| | 'confidence_target': scores['confidence_target'], |
| | 'confidence_actual': scores['confidence_actual'], |
| | 'confidence_clean': scores['confidence_clean'], |
| | 'representativity_actual': scores['representativity_actual'], |
| | 'representativity_clean': scores['representativity_clean'], |
| | 'additional_samples': scores['additional_samples'] |
| | }) |
| | else: |
| | |
| | results.append({ |
| | 'question': consistency_df.loc[i, 'question'], |
| | 'target_sample_size': 0, |
| | 'confidence_target': 0, |
| | 'confidence_actual': 0, |
| | 'confidence_clean': 0, |
| | 'representativity_actual': 0, |
| | 'representativity_clean': 0, |
| | 'additional_samples': 0 |
| | }) |
| |
|
| | |
| | representativity_scores = pd.DataFrame(results) |
| |
|
| | return representativity_scores |
| |
|
| |
|
| | |
| |
|
| |
|
| | |
| |
|
| | def payment_for_survey_integrity(data, integrity_score_per_respondent): |
| | """ |
| | Calculate the integrity score for the 'payment_for_survey' column and append the result to the integrity_score_per_respondent DataFrame. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data. |
| | - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores. |
| | |
| | Returns: |
| | - None: The function modifies the integrity_score_per_respondent DataFrame in place. |
| | """ |
| | |
| | integrity_score_per_respondent['payment_for_survey'] = np.nan |
| |
|
| | |
| | if 'payment_for_survey' in data.columns: |
| | |
| | integrity_score_per_respondent['payment_for_survey'] = data['payment_for_survey'].apply( |
| | lambda x: 1 if x == 'yes_no_forced_no' else (0 if x == 'yes_no_forced_yes' else np.nan) |
| | ) |
| | |
| | |
| | filtered_responses = data['payment_for_survey'].value_counts() |
| |
|
| | |
| | total_yes_no = filtered_responses.get(1, 0) + filtered_responses.get(0, 0) |
| | proportion_yes_forced = filtered_responses.get(1, 0) / total_yes_no if total_yes_no > 0 else 0 |
| | |
| | |
| | print(f"Proportion of 'yes_no_forced_yes': {proportion_yes_forced:.4f}") |
| | else: |
| | print("The column 'payment_for_survey' is not present in the dataset.") |
| | integrity_score_per_respondent['payment_for_survey'] = 0 |
| | return integrity_score_per_respondent |
| |
|
| | |
| |
|
| | def respondent_influence_integrity(data, integrity_score_per_respondent): |
| | """ |
| | Calculate the integrity score for the 'respondent_influenced' column and append the result to the integrity_score_per_respondent DataFrame. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data. |
| | - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores. |
| | |
| | Returns: |
| | - None: The function modifies the integrity_score_per_respondent DataFrame in place. |
| | """ |
| | |
| | integrity_score_per_respondent['respondent_influenced'] = np.nan |
| |
|
| | |
| | if 'respondent_influenced' in data.columns: |
| | |
| | |
| | integrity_score_per_respondent['respondent_influenced'] = data['respondent_influenced'].apply( |
| | lambda x: 1 if x == 'yes_no_forced_no' else (0 if x == 'yes_no_forced_yes' else np.nan) |
| | ) |
| | |
| | |
| | filtered_responses = data['respondent_influenced'].value_counts() |
| |
|
| | |
| | total_yes_no = filtered_responses.get(1, 0) + filtered_responses.get(0, 0) |
| | proportion_no = filtered_responses.get(0, 0) / total_yes_no if total_yes_no > 0 else 0 |
| | |
| | |
| | print(f"Proportion of 'no' responses: {proportion_no:.4f}") |
| | else: |
| | print("The column 'respondent_influenced' is not present in the dataset.") |
| | |
| | return integrity_score_per_respondent |
| |
|
| |
|
| | |
| |
|
| | def response_time_integrity(data, integrity_score_per_respondent,questions_df,column_strategy_df): |
| | """ |
| | Process the survey data to calculate time differences, scoring, and integrate scores into the integrity DataFrame. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data with 'start' and 'end' columns. |
| | - integrity_score_per_respondent (pd.DataFrame): DataFrame to store integrity scores. |
| | |
| | Returns: |
| | - data (pd.DataFrame): The modified survey data DataFrame. |
| | - integrity_score_per_respondent (pd.DataFrame): The modified integrity score DataFrame. |
| | """ |
| | |
| | data['start'] = pd.to_datetime(data['start'], utc=True, errors='coerce') |
| | data['end'] = pd.to_datetime(data['end'], utc=True, errors='coerce') |
| |
|
| | |
| | data['time_diff_seconds'] = (data['end'] - data['start']).dt.total_seconds() |
| | data['time_diff_minutes'] = data['time_diff_seconds']/60 |
| | |
| | |
| | matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name'] |
| |
|
| | |
| | matching_columns = [col for col in matching_columns if col in data.columns] |
| | |
| | data['num_questions'] = data[matching_columns].notna().sum(axis=1) |
| |
|
| | |
| | data['min_time'] = data['num_questions'].apply(lambda x: 10 * x) |
| | data['max_time'] = data['num_questions'].apply(lambda x: 30 * x) |
| |
|
| | |
| | def calculate_score(row): |
| | time_diff = row['time_diff_seconds'] |
| | min_time = row['min_time'] |
| | max_time = row['max_time'] |
| | |
| | |
| | if pd.isna(row['start']) or pd.isna(row['end']): |
| | return 0 |
| | |
| | if time_diff < min_time or time_diff > max_time: |
| | |
| | return np.exp(-abs(time_diff - ((min_time + max_time) / 2)) / 1000) |
| | else: |
| | return 2 |
| |
|
| | |
| | integrity_score_per_respondent['response_time_integrity'] = data.apply(calculate_score, axis=1) |
| |
|
| | return data, integrity_score_per_respondent |
| |
|
| |
|
| |
|
| | def check_audio_verification(data, integrity_score_per_respondent): |
| | """ |
| | Check for audio verification columns and create a corresponding integrity score. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data. |
| | - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores. |
| | |
| | Returns: |
| | - pd.DataFrame: The updated integrity_score_per_respondent DataFrame. |
| | """ |
| | |
| | columns_to_check = ['audio_verification_name_self_supervised', 'audio_verification_name', 'audio_verification_name_URL'] |
| |
|
| | |
| | columns_exist = False |
| |
|
| | |
| | integrity_score_per_respondent['audio_verification'] = np.nan |
| |
|
| | |
| | for col in columns_to_check[:-1]: |
| | if col in data.columns: |
| | columns_exist = True |
| | |
| | integrity_score_per_respondent['audio_verification'] = data[col].apply( |
| | lambda x: 1 if pd.notna(x) and x != '' else 0 |
| | ) |
| |
|
| | |
| | if 'audio_verification_name_URL' in data.columns: |
| | |
| | url_present = data['audio_verification_name_URL'].notna() & (data['audio_verification_name_URL'].str.strip() != '') |
| | integrity_score_per_respondent['audio_verification'] = integrity_score_per_respondent['audio_verification'] & url_present.astype(int) |
| |
|
| | |
| | if not columns_exist: |
| | integrity_score_per_respondent['audio_verification'] = np.nan |
| | |
| | return integrity_score_per_respondent |
| |
|
| |
|
| | def questions_which_which_where_difficult_integrity(data, integrity_score_per_respondent): |
| | """ |
| | Check if the column 'questions_which_were_difficult' exists and map the responses to a new 'difficulty_score' column. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data. |
| | - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores. |
| | |
| | Returns: |
| | - pd.DataFrame: The updated integrity_score_per_respondent DataFrame with the 'difficulty_score' column. |
| | """ |
| | |
| | if 'questions_which_were_difficult' in data.columns: |
| | |
| | difficulty_mapping = { |
| | 'questions_which_were_difficult_all': 0, |
| | 'questions_which_were_difficult_most': 0.5, |
| | 'questions_which_were_difficult_some': 0.75, |
| | 'questions_which_were_difficult_few': 1.5, |
| | 'questions_which_were_difficult_none': 2 |
| | } |
| | |
| | |
| | integrity_score_per_respondent['questions_which_were_difficult'] = data['questions_which_were_difficult'].map(difficulty_mapping) |
| | else: |
| | integrity_score_per_respondent['questions_which_were_difficult'] = np.nan |
| | print("The column 'questions_which_were_difficult' is not present in the dataset.") |
| | |
| | return integrity_score_per_respondent |
| |
|
| |
|
| | def respondent_suspicious_integrity(data, integrity_score_per_respondent): |
| | """ |
| | Check if the column 'respondent_suspicious' exists and map the responses to a new 'suspicious_score' column. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data. |
| | - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store integrity scores. |
| | |
| | Returns: |
| | - pd.DataFrame: The updated integrity_score_per_respondent DataFrame with the 'suspicious_score' column. |
| | """ |
| | |
| | if 'respondent_suspicious' in data.columns: |
| | |
| | suspicious_mapping = { |
| | 'respondent_suspicious_at_ease': 2, |
| | 'respondent_suspicious_in_between': 1, |
| | 'respondent_suspicious_suspicious': 0 |
| | } |
| | |
| | |
| | integrity_score_per_respondent['respondent_suspicious'] = data['respondent_suspicious'].map(suspicious_mapping) |
| | else: |
| | print("The column 'respondent_suspicious' is not present in the dataset.") |
| | integrity_score_per_respondent['respondent_suspicious'] = np.nan |
| | |
| | return integrity_score_per_respondent |
| |
|
| |
|
| | def validate_phone_number_all_conditions(num_list): |
| | |
| | cleaned_nums = [''.join(filter(str.isdigit, str(num).strip())) for num in num_list] |
| | num_counts = {num: cleaned_nums.count(num) for num in cleaned_nums} |
| |
|
| | |
| | def all_digits_equal(num): |
| | return len(set(num)) == 1 |
| |
|
| | |
| | results = [ |
| | np.nan if num == '' else |
| | 0 if num_counts[num] > 1 or not (5 <= len(num) <= 15) or all_digits_equal(num) else 1 |
| | for num in cleaned_nums |
| | ] |
| | return results |
| |
|
| |
|
| | def validate_names(data): |
| | """ |
| | This function validates names in the 'enumerator_name' and 'name' columns of the provided DataFrame. |
| | It checks if the names contain exactly two words without numbers, and assigns scores based on validity: |
| | - 1 for valid names |
| | - 0 for invalid names (single word, names with numbers, empty, or NaN) |
| | If both columns are present, the function averages the validation scores. |
| | |
| | :param data: DataFrame containing 'enumerator_name' and/or 'name' columns |
| | :return: DataFrame with a 'name_validation' column indicating the validation results |
| | """ |
| | |
| | |
| | columns_present = [col for col in ['enumerator_name', 'name'] if col in data.columns] |
| | |
| | if not columns_present: |
| | print("Neither 'enumerator_name' nor 'name' column is present in the dataset.") |
| | return data |
| |
|
| | |
| | def validate_name(name): |
| | if pd.isna(name) or name.strip() == '': |
| | return 0 |
| | name_parts = name.split() |
| | |
| | if len(name_parts) == 2 and all(part.isalpha() for part in name_parts): |
| | return 1 |
| | else: |
| | return 0 |
| |
|
| | |
| | data['name_validation'] = 0 |
| | |
| | |
| | if 'enumerator_name' in data.columns: |
| | data['enumerator_name_validation'] = data['enumerator_name'].apply(validate_name) |
| | data['name_validation'] += data['enumerator_name_validation'] |
| | |
| | |
| | if 'name' in data.columns: |
| | data['name_validation_name'] = data['name'].apply(validate_name) |
| | data['name_validation'] += data['name_validation_name'] |
| |
|
| | |
| | if 'enumerator_name' in data.columns and 'name' in data.columns: |
| | data['name_validation'] = data[['enumerator_name_validation', 'name_validation_name']].mean(axis=1) |
| | else: |
| | data['name_validation'] = data['name_validation'] |
| |
|
| | |
| | data = data.drop(columns=['enumerator_name_validation', 'name_validation_name'], errors='ignore') |
| |
|
| | return data[['name_validation']] |
| |
|
| |
|
| | |
| | def assess_uniqueness_and_report_pairs(data, matching_columns, integrity_df): |
| | |
| | data[matching_columns] = data[matching_columns].astype(str) |
| |
|
| | |
| | label_encoders = {col: LabelEncoder().fit(data[col]) for col in matching_columns} |
| | |
| | |
| | encoded_data = data[matching_columns].apply(lambda col: label_encoders[col.name].transform(col)) |
| |
|
| | |
| | similarity_matrix = cosine_similarity(encoded_data) |
| | np.fill_diagonal(similarity_matrix, 0) |
| |
|
| | |
| | threshold = 0.9995 |
| |
|
| | |
| | similar_pairs = [] |
| | index_mapping = data['_index'].tolist() |
| | for i in range(similarity_matrix.shape[0]): |
| | for j in range(i + 1, similarity_matrix.shape[1]): |
| | if similarity_matrix[i, j] > threshold: |
| | similar_pairs.append((index_mapping[i], index_mapping[j])) |
| |
|
| | |
| | duplicate_indices = list(set([idx for pair in similar_pairs for idx in pair])) |
| | data['response_uniqueness'] = 2 |
| | data.loc[data['_index'].isin(duplicate_indices), 'response_uniqueness'] = 0 |
| |
|
| | |
| | integrity_df = integrity_df.merge( |
| | data[['_index', 'response_uniqueness']], on='_index', how='left' |
| | ) |
| |
|
| | |
| | print("Pairs of very similar responses:") |
| | for pair in similar_pairs: |
| | print(f"Similar response pair: Row with _index {pair[0]} and Row with _index {pair[1]}") |
| |
|
| | |
| | flagged_indices = data[data['response_uniqueness'] == 0]['_index'].tolist() |
| | print(f"Flagged responses suspected of being duplicates: {flagged_indices}") |
| |
|
| | return integrity_df, similar_pairs |
| |
|
| |
|
| |
|
| | |
| | model_name = "facebook/bart-large-mnli" |
| | model = AutoModelForSequenceClassification.from_pretrained(model_name) |
| | tokenizer = AutoTokenizer.from_pretrained(model_name) |
| |
|
| | |
| | classifier = pipeline( |
| | "zero-shot-classification", |
| | model=model, |
| | tokenizer=tokenizer, |
| | device=-1 |
| | ) |
| |
|
| | |
| | tool_en = language_tool_python.LanguageTool('en-US') |
| | tool_fr = language_tool_python.LanguageTool('fr') |
| | tool_de = language_tool_python.LanguageTool('de') |
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| |
|
| |
|
| | |
| | def detect_language(text): |
| | try: |
| | return detect(text) |
| | except: |
| | return 'en' |
| |
|
| |
|
| | |
| | def assess_text(text, column_name, tool): |
| | |
| | if not isinstance(text, str) or text.strip() == '': |
| | return {'score': 0, 'grammar_issues': 'N/A'} |
| |
|
| | |
| | labels = ["positive impact", "negative impact", "good grammar", "poor grammar"] |
| |
|
| | |
| | result = classifier(text, labels, multi_label=True) |
| |
|
| | |
| | positive_score = result['scores'][result['labels'].index('positive impact')] |
| | negative_score = result['scores'][result['labels'].index('negative impact')] |
| | good_grammar_score = result['scores'][result['labels'].index('good grammar')] |
| |
|
| | |
| | impact_threshold = 0.4 |
| | grammar_threshold = 0.4 |
| |
|
| | |
| | impact_score = positive_score if column_name == 'positive_effects_client' else negative_score |
| |
|
| | |
| | if impact_score >= impact_threshold and good_grammar_score >= grammar_threshold: |
| | score = 2 |
| | elif impact_score >= impact_threshold or good_grammar_score >= grammar_threshold: |
| | score = 1 |
| | else: |
| | score = 0 |
| |
|
| | |
| | grammar_issues = len(tool.check(text)) |
| |
|
| | return {'score': score, 'grammar_issues': f"{grammar_issues} issues detected"} |
| |
|
| | |
| | def assess_text_column(data, column_name, integrity_df): |
| | |
| | detected_language = data[column_name].apply(detect_language).mode()[0] |
| | |
| | |
| | tool = {'fr': tool_fr, 'en': tool_en, 'de': tool_de}.get(detected_language, tool_en) |
| |
|
| | |
| | data['assessment'] = data[column_name].apply(lambda x: assess_text(x, column_name, tool)) |
| |
|
| | |
| | integrity_df = integrity_df.copy() |
| |
|
| | |
| | if column_name == 'positive_effects_client': |
| | score_column = 'positive_impact_score' |
| | elif column_name == 'negative_effects_client': |
| | score_column = 'negative_impact_score' |
| | else: |
| | score_column = f'{column_name}_score' |
| |
|
| | integrity_df[score_column] = data['assessment'].apply(lambda x: x['score']) |
| |
|
| | return integrity_df[score_column] |
| |
|
| |
|
| |
|
| | |
| |
|
| | from sklearn.preprocessing import LabelEncoder |
| | from sklearn.metrics.pairwise import cosine_similarity |
| | import pandas as pd |
| | import numpy as np |
| |
|
| | |
| | def assess_enumerator_bias(data, matching_columns): |
| | |
| | if 'enumerator_name' not in data.columns: |
| | print("Column 'enumerator_name' does not exist in the data.") |
| | return data |
| |
|
| | |
| | data[matching_columns] = data[matching_columns].astype(str) |
| |
|
| | |
| | label_encoders = {col: LabelEncoder().fit(data[col]) for col in matching_columns} |
| |
|
| | |
| | encoded_data = data[matching_columns].apply(lambda col: label_encoders[col.name].transform(col)) |
| |
|
| | |
| | similarity_matrix = cosine_similarity(encoded_data) |
| |
|
| | |
| | data['enumerator_bias'] = 2 |
| |
|
| | |
| | enumerator_groups = data.groupby('enumerator_name').groups |
| | enumerator_means = {name: encoded_data.loc[indices].mean(axis=0) for name, indices in enumerator_groups.items()} |
| |
|
| | |
| | overall_mean = encoded_data.mean(axis=0) |
| |
|
| | for enumerator, indices in enumerator_groups.items(): |
| | enumerator_mean = enumerator_means[enumerator] |
| | similarity_to_others = cosine_similarity([enumerator_mean], [overall_mean])[0][0] |
| |
|
| | |
| | bias_threshold = 0.8 |
| |
|
| | |
| | if similarity_to_others < bias_threshold: |
| | data.loc[indices, 'enumerator_bias'] = 0 |
| |
|
| | return data |
| |
|
| |
|
| |
|
| |
|
| | |
| |
|
| | def location_integrity(data, integrity_score_per_respondent): |
| | """ |
| | This function processes the 'location' column in the provided 'data' DataFrame, checks for proximity of coordinates |
| | and updates a new 'location_check' column in 'integrity_score_per_respondent'. |
| | |
| | Parameters: |
| | - data (pd.DataFrame): The DataFrame containing survey data with columns '_index', 'location', and 'interview_setting'. |
| | - integrity_score_per_respondent (pd.DataFrame): The DataFrame to store the location check integrity scores. |
| | |
| | Returns: |
| | - pd.DataFrame: The updated integrity_score_per_respondent DataFrame with the 'location_check' column. |
| | """ |
| | |
| | if 'location' not in data.columns: |
| | integrity_score_per_respondent['location_check'] = 1 |
| | else: |
| | |
| | data[['lat', 'lon']] = data['location'].str.split(' ', n=2, expand=True)[[0, 1]].astype(float) |
| |
|
| | |
| | filtered_data = data.dropna(subset=['lat', 'lon']) |
| |
|
| | |
| | filtered_settings = filtered_data['interview_setting'].values |
| | filtered_coordinates = filtered_data[['lat', 'lon']].values |
| | filtered_indexes = filtered_data['_index'].values |
| |
|
| | |
| | too_close_pairs_df = [] |
| |
|
| | |
| | for i in range(len(filtered_coordinates)): |
| | for j in range(i + 1, len(filtered_coordinates)): |
| | if not ('setting_market' in [filtered_settings[i], filtered_settings[j]] or |
| | 'setting_community_center' in [filtered_settings[i], filtered_settings[j]]): |
| | distance = geodesic(filtered_coordinates[i], filtered_coordinates[j]).meters |
| | if distance < 10: |
| | too_close_pairs_df.append({ |
| | 'Index1': filtered_indexes[i], |
| | 'Index2': filtered_indexes[j], |
| | 'Distance (meters)': distance, |
| | 'Coordinates1': f"{filtered_coordinates[i][0]},{filtered_coordinates[i][1]}", |
| | 'Coordinates2': f"{filtered_coordinates[j][0]},{filtered_coordinates[j][1]}", |
| | 'Setting1': filtered_settings[i], |
| | 'Setting2': filtered_settings[j] |
| | }) |
| |
|
| | |
| | too_close_pairs_df = pd.DataFrame(too_close_pairs_df) |
| |
|
| | if len(too_close_pairs_df) > 1: |
| | |
| | unique_indexes = pd.unique(too_close_pairs_df[['Index1', 'Index2']].values.ravel()) |
| |
|
| | |
| | nan_detected = data['location'].isna() |
| | data['location'] = data['location'].fillna(0) |
| | data['location'] = data['location'].replace('nan', 0) |
| | |
| | |
| | too_close_pairs_df.to_excel('gps_issues.xlsx') |
| |
|
| | else: |
| | unique_indexes = [] |
| |
|
| | |
| | data['_index'] = data.index |
| | data['location_check'] = data.apply( |
| | lambda row: 0 if (pd.isna(row['location']) or row['location'] == '' or row['location'] == 0 or row['_index'] in unique_indexes) else 1, axis=1 |
| | ) |
| |
|
| | |
| | integrity_score_per_respondent['location_check'] = data['location_check'] |
| | |
| | return integrity_score_per_respondent |
| |
|
| |
|
| | |
| |
|
| |
|
| | |
| | def integrity_issues_df(integrity_score_values_filtered, data, columns_integrity, data_columns_integrity): |
| | |
| | |
| | valid_columns = [col for col in data_columns_integrity if isinstance(col, list) and all(c in data.columns for c in col)] |
| | valid_columns += [col for col in data_columns_integrity if isinstance(col, str) and col in data.columns] |
| | |
| | |
| | integrity_data = [] |
| |
|
| | |
| | for _, row in integrity_score_values_filtered.iterrows(): |
| | for idx, column_group in enumerate(valid_columns): |
| | |
| | if isinstance(column_group, list): |
| | for column in column_group: |
| | |
| | if column in data.columns: |
| | actual_value = data.loc[data['_index'] == row['_index'], column].values[0] |
| | else: |
| | actual_value = np.nan |
| |
|
| | |
| | issue_data = { |
| | '_index': row['_index'], |
| | 'question': column, |
| | 'check': columns_integrity[idx], |
| | 'quality_dimension': 'integrity', |
| | 'actual': actual_value, |
| | 'predicted': np.nan, |
| | 'cleansing_urgency': row['cleansing_urgency'], |
| | 'relevant_context_data': row[columns_integrity].to_dict() |
| | } |
| | integrity_data.append(issue_data) |
| | else: |
| | |
| | |
| | if column_group in data.columns: |
| | actual_value = data.loc[data['_index'] == row['_index'], column_group].values[0] |
| | else: |
| | actual_value = np.nan |
| |
|
| | |
| | issue_data = { |
| | '_index': row['_index'], |
| | 'question': column_group, |
| | 'check': columns_integrity[idx], |
| | 'quality_dimension': 'integrity', |
| | 'actual': actual_value, |
| | 'predicted': np.nan, |
| | 'cleansing_urgency': row['cleansing_urgency'], |
| | 'relevant_context_data': row[columns_integrity].to_dict() |
| | } |
| | integrity_data.append(issue_data) |
| |
|
| | |
| | integrity_issues_df = pd.DataFrame(integrity_data) |
| | |
| | return integrity_issues_df |
| |
|
| | |
| | def cleansing_integrity(df): |
| | df['cleansing_urgency'] = df['score_ratio'].apply( |
| | lambda x: 'high' if x < 0.3 else ('low' if x < 0.5 else None) |
| | ) |
| | return df |
| |
|
| |
|
| | def calculate_consistency_and_integrity_scores(raw_data, column_strategy_df, final_issues_df): |
| | |
| | |
| | preferred_columns = column_strategy_df[ |
| | column_strategy_df['Prefered method'].notna() & |
| | (column_strategy_df['Prefered method'] != "integrity_score") |
| | ]['Field name'] |
| |
|
| | |
| | preferred_columns = [col for col in preferred_columns if col in raw_data.columns] |
| |
|
| | |
| | selected_columns = raw_data[preferred_columns] |
| |
|
| | |
| | non_empty_counts = selected_columns.notna().sum() |
| |
|
| | |
| | unique_index_counts_consistency = final_issues_df[final_issues_df['quality_dimension'] == 'consistency']\ |
| | .groupby('question')['_index'].nunique() |
| |
|
| | unique_index_counts_integrity = final_issues_df[ |
| | (final_issues_df['quality_dimension'] == 'integrity') & |
| | (final_issues_df['cleansing_urgency'].isin(['high', 'low'])) |
| | ]['_index'].nunique() |
| |
|
| | |
| | high_cleansing_urgency_counts_consistency = final_issues_df[ |
| | (final_issues_df['cleansing_urgency'] == 'high') & |
| | (final_issues_df['quality_dimension'] == 'consistency') |
| | ].groupby('question')['_index'].nunique() |
| |
|
| | low_cleansing_urgency_counts_consistency = final_issues_df[ |
| | (final_issues_df['cleansing_urgency'] == 'low') & |
| | (final_issues_df['quality_dimension'] == 'consistency') |
| | ].groupby('question')['_index'].nunique() |
| |
|
| | high_cleansing_urgency_counts_integrity = final_issues_df[ |
| | (final_issues_df['cleansing_urgency'] == 'high') & |
| | (final_issues_df['quality_dimension'] == 'integrity') |
| | ]['_index'].nunique() |
| |
|
| | low_cleansing_urgency_counts_integrity = final_issues_df[ |
| | (final_issues_df['cleansing_urgency'] == 'low') & |
| | (final_issues_df['quality_dimension'] == 'integrity') |
| | ]['_index'].nunique() |
| |
|
| | |
| | consistency_scores = {} |
| | integrity_scores = {} |
| |
|
| | |
| | for question in preferred_columns: |
| | num_non_empty = non_empty_counts[question] if question in non_empty_counts else 0 |
| | |
| | |
| | num_issues_consistency = unique_index_counts_consistency.get(question,0) |
| | num_high_urgency_consistency = high_cleansing_urgency_counts_consistency.get(question, 0) |
| | num_low_urgency_consistency = low_cleansing_urgency_counts_consistency.get(question, 0) |
| | |
| | if num_non_empty > 0: |
| | consistency_score = 1 - (num_issues_consistency / num_non_empty) |
| | else: |
| | consistency_score = None |
| |
|
| | consistency_scores[question] = { |
| | 'number_of_non_empty_responses': num_non_empty, |
| | 'number_of_index_with_issues': num_issues_consistency, |
| | 'number_of_index_with_high_cleansing_urgency': num_high_urgency_consistency, |
| | 'number_of_index_with_low_cleansing_urgency': num_low_urgency_consistency, |
| | 'consistency_score': consistency_score |
| | } |
| |
|
| | |
| | num_issues_integrity = unique_index_counts_integrity |
| | num_high_urgency_integrity = high_cleansing_urgency_counts_integrity |
| | num_low_urgency_integrity = low_cleansing_urgency_counts_integrity |
| |
|
| | integrity_scores[question] = { |
| | 'number_of_index_with_issues': num_issues_integrity, |
| | 'number_of_index_with_high_cleansing_urgency': num_high_urgency_integrity, |
| | 'number_of_index_with_low_cleansing_urgency': num_low_urgency_integrity, |
| | } |
| |
|
| | |
| | consistency_df = pd.DataFrame.from_dict(consistency_scores, orient='index').reset_index() |
| | consistency_df.rename(columns={'index': 'question'}, inplace=True) |
| |
|
| | integrity_df = pd.DataFrame.from_dict(integrity_scores, orient='index').reset_index() |
| | integrity_df.rename(columns={'index': 'question'}, inplace=True) |
| |
|
| | |
| | final_df = pd.merge(consistency_df, integrity_df, on='question', suffixes=('_consistency', '_integrity')) |
| |
|
| | |
| | return final_df |
| |
|
| |
|
| | |
| | import pandas as pd |
| |
|
| | def evaluate_quota_coverage(raw_data, segmentation_columns, mapping_segmentation_quotas): |
| | """ |
| | Evaluates the extent to which raw_data covers specified quotas. |
| | |
| | Parameters: |
| | raw_data (pd.DataFrame): Input data to evaluate. |
| | segmentation_columns (list): List of columns used for segmentation. |
| | mapping_segmentation_quotas (dict): Dictionary specifying quotas for each segmentation. |
| | |
| | Returns: |
| | pd.DataFrame: DataFrame containing actual quotas, target quotas, |
| | relative coverage, and whether the quota is achieved. |
| | """ |
| | results = [] |
| |
|
| | for segment_column in segmentation_columns: |
| | if segment_column not in mapping_segmentation_quotas: |
| | continue |
| |
|
| | quotas = mapping_segmentation_quotas[segment_column] |
| |
|
| | |
| | total_count = raw_data[segment_column].value_counts(normalize=True).to_dict() |
| |
|
| | for category, target_quota in quotas.items(): |
| | actual_quota = total_count.get(category, 0) |
| | relative_coverage = actual_quota / target_quota if target_quota > 0 else 1 |
| | achieved = 1 if actual_quota >= target_quota or target_quota == 0 else relative_coverage |
| |
|
| | results.append({ |
| | 'Segmentation_Column': segment_column, |
| | 'Segment': category, |
| | 'Target_Quota': target_quota, |
| | 'Actual_Quota': actual_quota, |
| | 'Relative_Coverage': relative_coverage, |
| | 'Achieved': achieved |
| | }) |
| |
|
| | return pd.DataFrame(results) |
| |
|
| |
|
| | def calculate_coverage_scores_by_segment(quota_coverage_df): |
| | """ |
| | Calculate the coverage scores separately for each segmentation column and segment |
| | based on two criteria: |
| | A) Weighted average based on target quota per segment. |
| | B) Simple average for groups with at least 5% target quota per segment. |
| | |
| | Parameters: |
| | quota_coverage_df (pd.DataFrame): DataFrame with the quota coverage data. |
| | |
| | Returns: |
| | pd.DataFrame: DataFrame containing the coverage scores for each segmentation group. |
| | """ |
| | coverage_scores_by_segment = [] |
| |
|
| | |
| | for (segmentation_column, segment), group in quota_coverage_df.groupby(['Segmentation_Column', 'Segment']): |
| | |
| | weighted_avg = (group['Achieved'] * group['Target_Quota']).sum() / group['Target_Quota'].sum() |
| |
|
| | |
| | valid_groups = group[group['Target_Quota'] >= 0.05] |
| | simple_avg = valid_groups['Achieved'].mean() |
| |
|
| | |
| | coverage_scores_by_segment.append({ |
| | 'Segmentation_Column': segmentation_column, |
| | 'Segment': segment, |
| | 'Weighted_Avg_Coverage': weighted_avg, |
| | 'Simple_Avg_Coverage': simple_avg |
| | }) |
| |
|
| | |
| | return pd.DataFrame(coverage_scores_by_segment) |
| |
|
| |
|
| | def calculate_coverage_for_all(raw_data, final_issues_df, segmentation_columns, mapping_segmentation_quotas): |
| | """ |
| | This function calculates the coverage scores for three variants of raw_data: |
| | 1. raw_data (original data) |
| | 2. raw_data_wurgent (excludes rows with urgent integrity issues) |
| | 3. raw_data_wrecm (excludes rows with recommended integrity issues) |
| | |
| | Parameters: |
| | raw_data (pd.DataFrame): Original data to process. |
| | final_issues_df (pd.DataFrame): Data containing integrity issues. |
| | segmentation_columns (list): List of segmentation columns. |
| | mapping_segmentation_quotas (dict): Mapping of segmentation quotas. |
| | |
| | Returns: |
| | pd.DataFrame: Concatenated DataFrame with coverage scores for all variants. |
| | """ |
| | |
| | urgent_integrity_index = final_issues_df[ |
| | (final_issues_df['quality_dimension'] == 'integrity') & |
| | (final_issues_df['cleansing_urgency'] == 'high') |
| | ]['_index'].unique() |
| |
|
| | recommended_integrity_index = final_issues_df[ |
| | ( |
| | (final_issues_df['quality_dimension'] == 'integrity') & |
| | (final_issues_df['cleansing_urgency'] == 'high') |
| | ) | |
| | ( |
| | (final_issues_df['quality_dimension'] == 'integrity') & |
| | (final_issues_df['cleansing_urgency'] == 'low') |
| | ) |
| | ]['_index'].unique() |
| |
|
| | |
| | raw_data_wurgent = raw_data[~raw_data['_index'].isin(urgent_integrity_index)] |
| | raw_data_wrecm = raw_data[~raw_data['_index'].isin(recommended_integrity_index)] |
| |
|
| | |
| | all_coverage_scores = [] |
| | raw_data_list = [raw_data, raw_data_wurgent, raw_data_wrecm] |
| | raw_data_names = ['0', 'A', 'B'] |
| |
|
| | |
| | for raw_data_variant, name in zip(raw_data_list, raw_data_names): |
| | |
| | result_df = evaluate_quota_coverage(raw_data_variant, segmentation_columns, mapping_segmentation_quotas) |
| |
|
| | |
| | coverage_scores = calculate_coverage_scores_by_segment(result_df) |
| |
|
| | |
| | coverage_scores['raw_data_variant'] = name |
| |
|
| | |
| | all_coverage_scores.append(coverage_scores) |
| |
|
| | |
| | final_coverage_scores_df = pd.concat(all_coverage_scores, ignore_index=True) |
| |
|
| | return final_coverage_scores_df |
| |
|
| |
|
| | |
| |
|
| | def consistency_score_report( |
| | raw_data, |
| | indicator_df, |
| | questions_df, |
| | column_strategy_df, |
| | data_all, |
| | theme_list, |
| | ): |
| | |
| | if '_index' not in raw_data.columns: |
| | raw_data['_index'] = range(1, len(raw_data) + 1) |
| |
|
| | |
| | indicator_question_theme_mapping = process_themes_and_questions(indicator_df, theme_list) |
| |
|
| | |
| | exclude_columns = get_missing_columns_without_model(column_strategy_df, data_all) |
| | accurate_columns, acc_levels, pred_actual_tuples = model_process( |
| | data_all=data_all, |
| | column_strategy_df=column_strategy_df, |
| | exclude_columns=exclude_columns |
| | ) |
| | data_issues_df = model_issues_to_data_frame(pred_actual_tuples) |
| |
|
| | |
| | columns_format_check = list( |
| | column_strategy_df[column_strategy_df['Prefered method'].str.contains('format_check', na=False)]['Field name'] |
| | ) |
| | valid_columns = [col for col in columns_format_check if col in data_all.columns] |
| | filtered_data_all = data_all[valid_columns].copy() |
| |
|
| | format_checks(df=filtered_data_all, transformations_df=column_strategy_df) |
| | format_violation_dictionary = create_nan_dict(filtered_data_all, raw_data, column_strategy_df=column_strategy_df) |
| | data_issues_df = format_violations_to_df(format_violation_dictionary, data_issues_df) |
| |
|
| | |
| | outlier_columns = column_strategy_df[ |
| | column_strategy_df['Prefered method'].str.contains('outlier', na=False)]['Field name'] |
| | outlier_columns = outlier_columns[outlier_columns.isin(raw_data.columns)] |
| | outlier_data = raw_data[outlier_columns] |
| |
|
| | dist_outliers_dict = detect_outliers(outlier_data) |
| | data_issues_df = dist_outliers_dict_to_df(dist_outliers_dict, data_issues_df) |
| |
|
| | |
| | completeness_check_dict = completeness_check(raw_data, questions_df) |
| | data_issues_df = completeness_check_dict_to_df(completeness_check_dict, data_issues_df) |
| |
|
| | |
| | full_dict = {} |
| | full_dict = merge_nested_dicts(completeness_check_dict, dist_outliers_dict) |
| | full_dict = merge_nested_dicts(full_dict, pred_actual_tuples) |
| | full_dict = merge_nested_dicts(full_dict, format_violation_dictionary) |
| |
|
| | consistency_df = calculate_consistency_scores(raw_data, column_strategy_df, data_issues_df) |
| | consistency_score = round(consistency_df['consistency_score'].mean(), 4) * 100 |
| |
|
| | |
| | merged_df = consistency_df.merge( |
| | indicator_question_theme_mapping, |
| | how='left', |
| | left_on='question', |
| | right_on='Question(s)' |
| | ) |
| |
|
| | |
| | filtered_df = merged_df[merged_df['ID'].notna()] |
| |
|
| | |
| | |
| | unique_counts = data_issues_df.groupby(['question', 'check'])['_index'].nunique().reset_index() |
| | pivoted_counts = unique_counts.pivot_table( |
| | index=['question'], columns='check', values='_index', aggfunc='sum', fill_value=0 |
| | ).reset_index() |
| | pivoted_counts['total'] = pivoted_counts.iloc[:, 1:].sum(axis=1) |
| | sorted_counts = pivoted_counts.sort_values(by='total', ascending=False).reset_index(drop=True) |
| | table_1_1 = sorted_counts.copy() |
| | columns = ['question', 'total'] + sorted([col for col in table_1_1.columns if col not in ['question', 'total']]) |
| | table_1_1 = table_1_1[columns] |
| | |
| | table_1_2 = data_issues_df.copy() |
| |
|
| | |
| | |
| | question_consistency = consistency_df[['question', 'consistency_score']].rename( |
| | columns={'consistency_score': 'question_consistency_score'} |
| | ) |
| |
|
| | |
| | aggregated_data = filtered_df.groupby('ID').agg({ |
| | 'question': lambda x: ', '.join(x), |
| | 'consistency_score': 'mean' |
| | }).reset_index() |
| |
|
| | |
| | def format_questions_with_scores(questions, consistency_df): |
| | question_scores = [ |
| | f"{question} (Score: {consistency_df.loc[consistency_df['question'] == question, 'consistency_score'].values[0]:.2f})" |
| | for question in questions.split(', ') |
| | if question in consistency_df['question'].values |
| | ] |
| | return ', '.join(question_scores) |
| |
|
| | aggregated_data['questions_score'] = aggregated_data['question'].apply( |
| | lambda x: format_questions_with_scores(x, consistency_df) |
| | ) |
| |
|
| | |
| | aggregated_data.rename(columns={'consistency_score': 'indicator_score'}, inplace=True) |
| | table_1_3 = aggregated_data.copy() |
| | table_1_3 = table_1_3[['ID', 'indicator_score','question', 'questions_score']] |
| | return table_1_1, table_1_2, table_1_3 |
| |
|
| |
|
| |
|
| | |
| |
|
| | def generate_integrity_issues_log(raw_data, table_2_1,column_strategy_df): |
| | """ |
| | Generate a log of integrity issues. |
| | |
| | Parameters: |
| | - raw_data (pd.DataFrame): The original dataset containing all raw survey responses. |
| | - table_2_1 (pd.DataFrame): Table with integrity scores and cleansing urgency. |
| | - mapping_table (pd.DataFrame): Mapping of columns_integrity to data_columns_integrity. |
| | |
| | Returns: |
| | - table_2_4 (pd.DataFrame): DataFrame with detailed integrity issues log. |
| | """ |
| | issue_data_list = [] |
| | |
| | |
| | matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name'] |
| |
|
| | |
| | matching_columns = [col for col in matching_columns if col in raw_data.columns] |
| |
|
| |
|
| | |
| | columns_integrity = [ |
| | 'payment_for_survey', |
| | 'respondent_influenced', |
| | 'response_time_integrity', |
| | 'audio_verification', |
| | 'questions_which_were_difficult', |
| | 'respondent_suspicious', |
| | 'phone_number_check', |
| | 'response_uniqueness', |
| | 'name_check', |
| | 'impact_feedback_integrity', |
| | 'enumerator_bias', |
| | 'location_check' |
| | ] |
| |
|
| | data_columns_integrity = [ |
| | 'payment_for_survey', |
| | 'respondent_influenced', |
| | ['start', 'end'], |
| | ['audio_verification_name', 'audio_verification_name_self_supervised', 'audio_verification_name_URL'], |
| | 'questions_which_were_difficult', |
| | 'respondent_suspicious', |
| | 'phone_number', |
| | matching_columns, |
| | ['name', 'enumerator_name'], |
| | ['positive_effects_client', 'negative_effects_client'], |
| | 'matching_columns', |
| | 'location' |
| | ] |
| |
|
| |
|
| |
|
| |
|
| | |
| | mapping_table = pd.DataFrame({ |
| | 'columns_integrity': columns_integrity, |
| | 'data_columns_integrity': data_columns_integrity |
| | }) |
| | |
| | |
| | |
| | for _index in table_2_1['_index']: |
| | for _, row in mapping_table.iterrows(): |
| | |
| | integrity_check = row['columns_integrity'] |
| | data_cols = row['data_columns_integrity'] |
| | |
| | |
| | if not isinstance(data_cols, list): |
| | data_cols = [data_cols] |
| | |
| | for question in data_cols: |
| | try: |
| | if isinstance(question, str): |
| | actual_value = ( |
| | raw_data.loc[raw_data['_index'] == _index, question].values[0] |
| | if question in raw_data.columns and not raw_data.loc[raw_data['_index'] == _index, question].empty |
| | else np.nan |
| | ) |
| | predicted_value = ( |
| | table_2_1.loc[table_2_1['_index'] == _index, integrity_check].values[0] |
| | if integrity_check in table_2_1.columns |
| | else np.nan |
| | ) |
| | cleansing_urgency_value = ( |
| | table_2_1.loc[table_2_1['_index'] == _index, 'cleansing_urgency'].values[0] |
| | if 'cleansing_urgency' in table_2_1.columns |
| | else np.nan |
| | ) |
| | issue_data_list.append({ |
| | '_index': _index, |
| | 'question': question, |
| | 'check': integrity_check, |
| | 'quality_dimension': 'integrity', |
| | 'actual': {question: actual_value}, |
| | 'predicted': predicted_value, |
| | 'cleansing_urgency': cleansing_urgency_value, |
| | 'relevant_context_data': np.nan |
| | }) |
| | except Exception as e: |
| | print(f"Error processing index {_index}, question {question}: {e}") |
| | |
| | |
| | table_2_4 = pd.DataFrame(issue_data_list) |
| | return table_2_4 |
| |
|
| |
|
| |
|
| | def integrity_report(raw_data, questions_df, column_strategy_df, survey_type,table_1_2): |
| | """ |
| | Generate data integrity reports (table_2_1 and table_2_2) based on the provided survey data. |
| | |
| | Parameters: |
| | - survey_path (str): Path to the survey data file. |
| | - questions_df (pd.DataFrame): DataFrame containing question details. |
| | - column_strategy_df (pd.DataFrame): DataFrame with column strategies. |
| | - survey_type (str): Type of survey ('Supervised (On Site)', 'Supervised (Telephone)', 'Unsupervised (Online)'). |
| | |
| | Returns: |
| | - table_2_1 (pd.DataFrame): Integrity score table. |
| | - table_2_2 (pd.DataFrame): Detailed data integrity table. |
| | """ |
| |
|
| | |
| | data = raw_data.reset_index(drop=True).copy() |
| | integrity_score_per_respondent = pd.DataFrame() |
| | integrity_score_per_respondent['_index'] = data['_index'] |
| |
|
| | |
| | if 'enumerator_relationship' in data.columns: |
| | integrity_score_per_respondent['enumerator_relationship'] = data['enumerator_relationship'] |
| |
|
| | |
| | integrity_score_per_respondent = payment_for_survey_integrity(data, integrity_score_per_respondent) |
| |
|
| | |
| | integrity_score_per_respondent = respondent_influence_integrity(data, integrity_score_per_respondent) |
| |
|
| | |
| | data, integrity_score_per_respondent = response_time_integrity(data, integrity_score_per_respondent,questions_df,column_strategy_df) |
| |
|
| | |
| | integrity_score_per_respondent = check_audio_verification(data, integrity_score_per_respondent) |
| |
|
| | |
| | integrity_score_per_respondent = questions_which_which_where_difficult_integrity(data, integrity_score_per_respondent) |
| |
|
| | |
| | integrity_score_per_respondent = respondent_suspicious_integrity(data, integrity_score_per_respondent) |
| |
|
| | |
| | if 'phone_number' in data.columns: |
| | integrity_score_per_respondent['phone_number_check'] = validate_phone_number_all_conditions(data['phone_number']) |
| | else: |
| | integrity_score_per_respondent['phone_number_check'] = 0 |
| |
|
| | |
| | if 'name' in data.columns or 'enumerator_name' in data.columns: |
| | integrity_score_per_respondent['name_check'] = validate_names(data) |
| | else: |
| | integrity_score_per_respondent['name_check'] = 0 |
| |
|
| | |
| | if 'positive_effects_client' in data.columns: |
| | pos_score = assess_text_column(data, 'positive_effects_client', integrity_score_per_respondent) |
| | neg_score = assess_text_column(data, 'negative_effects_client', integrity_score_per_respondent) |
| | integrity_score_per_respondent['impact_feedback_integrity'] = pos_score.combine(neg_score, max) |
| | else: |
| | integrity_score_per_respondent['impact_feedback_integrity'] = 0 |
| |
|
| | |
| | matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name'] |
| | |
| | matching_columns = [col for col in matching_columns if col in data.columns] |
| |
|
| | integrity_score_per_respondent, _ = assess_uniqueness_and_report_pairs(data, matching_columns, integrity_score_per_respondent) |
| | |
| | if len(matching_columns)< 10: |
| | integrity_score_per_respondent['response_uniqueness'] = 1 |
| | |
| | |
| | if 'enumerator_name' in data.columns: |
| | data = assess_enumerator_bias(data, matching_columns) |
| | integrity_score_per_respondent['enumerator_bias'] = data['enumerator_bias'] |
| | else: |
| | integrity_score_per_respondent['enumerator_bias'] = 0 |
| |
|
| | |
| | integrity_score_per_respondent = location_integrity(data, integrity_score_per_respondent) |
| |
|
| | |
| | columns_integrity = [ |
| | 'payment_for_survey', |
| | 'respondent_influenced', |
| | 'response_time_integrity', |
| | 'audio_verification', |
| | 'questions_which_were_difficult', |
| | 'respondent_suspicious', |
| | 'phone_number_check', |
| | 'response_uniqueness', |
| | 'name_check', |
| | 'impact_feedback_integrity', |
| | 'enumerator_bias', |
| | 'location_check' |
| | ] |
| |
|
| | survey_type_mapping = { |
| | 'Supervised (On Site)': {col: 1 for col in columns_integrity}, |
| | 'Supervised (Telephone)': {**{col: 1 for col in columns_integrity}, 'audio_verification': 0, 'location_check': 0}, |
| | 'Unsupervised (Online)': {**{col: 1 for col in columns_integrity}, 'respondent_influenced': 0, 'audio_verification': 0, |
| | 'questions_which_were_difficult': 0, 'respondent_suspicious': 0, 'name': 0, 'location_check': 0} |
| | } |
| |
|
| | data_columns_integrity = [ |
| | 'payment_for_survey', |
| | 'respondent_influenced', |
| | ['start ', 'end'], |
| | ['audio_verification_name','audio_verification_name_self_supervised','audio_verification_name_URL'], |
| | 'questions_which_were_difficult', |
| | 'respondent_suspicious', |
| | 'phone_number', |
| | matching_columns, |
| | ['name', 'enumerator_name'], |
| | ['positive_effects_client','negative_effects_client'], |
| | matching_columns, |
| | 'location' |
| | ] |
| | |
| | |
| | points = [1] * len(columns_integrity) |
| | integrity_score_values = calculate_weighted_aggregate_with_max( |
| | integrity_score_per_respondent, survey_type, columns_integrity, survey_type_mapping, points |
| | ) |
| | integrity_score_values['score_ratio'] = integrity_score_values['weighted_aggregate'] / integrity_score_values['max_possible_score'] |
| | integrity_score_values['score_ratio'] = np.minimum(integrity_score_values['score_ratio'], 1) |
| |
|
| | |
| | integrity_score_values = cleansing_integrity(integrity_score_values) |
| |
|
| | |
| | table_2_1 = integrity_score_values[[ |
| | '_index', 'score_ratio', 'cleansing_urgency', 'weighted_aggregate', 'max_possible_score', |
| | 'payment_for_survey', 'respondent_influenced', 'response_time_integrity', |
| | 'audio_verification', 'questions_which_were_difficult', 'respondent_suspicious', |
| | 'phone_number_check', 'name_check', 'impact_feedback_integrity','enumerator_bias', |
| | 'location_check' |
| | ]] |
| | table_2_1 = table_2_1.sort_values(by='score_ratio', ascending=True) |
| | |
| | data_columns_integrity = [ |
| | 'payment_for_survey', 'respondent_influenced', ['start', 'end'], |
| | ['audio_verification_name', 'audio_verification_name_self_supervised', 'audio_verification_name_URL'], |
| | 'questions_which_were_difficult', 'respondent_suspicious', 'phone_number', |
| | ['name', 'enumerator_name'], ['positive_effects_client', 'negative_effects_client'], 'location' |
| | ] |
| | |
| | flattened_columns = [] |
| | for col in data_columns_integrity: |
| | if isinstance(col, list): |
| | flattened_columns.extend(col) |
| | else: |
| | flattened_columns.append(col) |
| |
|
| | |
| | unique_columns = list(dict.fromkeys(flattened_columns)) |
| |
|
| | |
| | columns_to_include = ['_index'] |
| |
|
| | |
| | columns_to_include += [col for col in unique_columns if col in data.columns] |
| |
|
| | |
| | for col in unique_columns: |
| | if col not in data.columns: |
| | data[col] = np.nan |
| |
|
| | |
| | table_2_2 = data[columns_to_include] |
| | |
| | |
| | integrity_score_values_filtered = table_2_1[table_2_1['score_ratio']<0.5] |
| | table_2_4 = generate_integrity_issues_log(raw_data, table_2_1,column_strategy_df) |
| | integrity_issues = generate_integrity_issues_log(raw_data, integrity_score_values_filtered,column_strategy_df) |
| | |
| | |
| | |
| | |
| | if integrity_score_values_filtered.shape[0] == 0: |
| | final_issues_df = table_1_2.copy() |
| | else: |
| | final_issues_df = pd.concat([table_1_2, integrity_issues], ignore_index=True) |
| | |
| | report_df = calculate_consistency_and_integrity_scores( raw_data, column_strategy_df, final_issues_df) |
| | |
| | report_df = calculate_consistency_and_integrity_scores( raw_data, column_strategy_df, final_issues_df) |
| | report_df['integrity_score'] = table_2_1['score_ratio'].mean() |
| | report_df['integrity_score_high'] = table_2_1[table_2_1['score_ratio'] >= 0.3]['score_ratio'].mean() |
| | report_df['integrity_score_low'] = table_2_1[table_2_1['score_ratio'] >= 0.5]['score_ratio'].mean() |
| | report_df['consistency_score_high'] = 1- report_df['number_of_index_with_high_cleansing_urgency_consistency']/report_df['number_of_non_empty_responses'] |
| | report_df['number_of_index_with_issues_integrity'] = report_df['number_of_index_with_high_cleansing_urgency_integrity'] + report_df['number_of_index_with_low_cleansing_urgency_integrity'] |
| | report_df['consistency_score_low'] = report_df['consistency_score'] |
| | |
| | report_df['high_score'] = 1 - ( |
| | (report_df['number_of_index_with_high_cleansing_urgency_consistency'] + |
| | report_df['number_of_index_with_high_cleansing_urgency_integrity']) / |
| | report_df['number_of_non_empty_responses'] |
| | ).clip(upper=1) |
| |
|
| | |
| | report_df['low_score'] = 1 - ( |
| | (report_df['number_of_index_with_issues_consistency'] + |
| | report_df['number_of_index_with_issues_integrity']) / |
| | report_df['number_of_non_empty_responses'] |
| | ).clip(lower=0) |
| | |
| | table_2_3 = report_df.copy() |
| | |
| | |
| | required_columns = ['_index', 'score_ratio', 'cleansing_urgency', 'weighted_aggregate', 'max_possible_score'] |
| | other_columns = [col for col in table_2_1.columns if col not in required_columns] |
| | ordered_columns = required_columns + other_columns |
| |
|
| | |
| | table_2_1 = table_2_1[ordered_columns] |
| | |
| | return table_2_1, table_2_2,table_2_3,table_2_4 |
| |
|
| |
|
| |
|
| | def integrity_report(raw_data, questions_df, column_strategy_df, survey_type, table_1_2): |
| | """ |
| | Generate data integrity reports (table_2_1 and table_2_2) based on the provided survey data. |
| | |
| | Parameters: |
| | - raw_data (pd.DataFrame): DataFrame containing the survey data. |
| | - questions_df (pd.DataFrame): DataFrame containing question details. |
| | - column_strategy_df (pd.DataFrame): DataFrame with column strategies. |
| | - survey_type (str): Type of survey ('Supervised (On Site)', 'Supervised (Telephone)', 'Unsupervised (Online)'). |
| | - table_1_2 (pd.DataFrame): Consistency table. |
| | |
| | Returns: |
| | - table_2_1 (pd.DataFrame): Integrity score table. |
| | - table_2_2 (pd.DataFrame): Detailed data integrity table. |
| | - table_2_3 (pd.DataFrame): Table for representativity scores. |
| | - table_2_4 (pd.DataFrame): Standardized integrity issues report. |
| | """ |
| |
|
| | try: |
| | print("Step 1: Preparing data") |
| | data = raw_data.reset_index(drop=True).copy() |
| | integrity_score_per_respondent = pd.DataFrame() |
| | integrity_score_per_respondent['_index'] = data['_index'] |
| | except Exception as e: |
| | print(f"Error in Step 1: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 2: Enumerator relationship") |
| | if 'enumerator_relationship' in data.columns: |
| | integrity_score_per_respondent['enumerator_relationship'] = data['enumerator_relationship'] |
| | except Exception as e: |
| | print(f"Error in Step 2: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 3: Payment for survey integrity") |
| | integrity_score_per_respondent = payment_for_survey_integrity(data, integrity_score_per_respondent) |
| | except Exception as e: |
| | print(f"Error in Step 3: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 4: Respondent influenced integrity") |
| | integrity_score_per_respondent = respondent_influence_integrity(data, integrity_score_per_respondent) |
| | except Exception as e: |
| | print(f"Error in Step 4: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 5: Response time integrity") |
| | data, integrity_score_per_respondent = response_time_integrity(data, integrity_score_per_respondent, questions_df, column_strategy_df) |
| | except Exception as e: |
| | print(f"Error in Step 5: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 6: Audio verification integrity") |
| | integrity_score_per_respondent = check_audio_verification(data, integrity_score_per_respondent) |
| | except Exception as e: |
| | print(f"Error in Step 6: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 7: Questions which were difficult") |
| | integrity_score_per_respondent = questions_which_which_where_difficult_integrity(data, integrity_score_per_respondent) |
| | except Exception as e: |
| | print(f"Error in Step 7: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 8: Respondent suspicious integrity") |
| | integrity_score_per_respondent = respondent_suspicious_integrity(data, integrity_score_per_respondent) |
| | except Exception as e: |
| | print(f"Error in Step 8: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 9: Phone number validation") |
| | if 'phone_number' in data.columns: |
| | integrity_score_per_respondent['phone_number_check'] = validate_phone_number_all_conditions(data['phone_number']) |
| | else: |
| | integrity_score_per_respondent['phone_number_check'] = 0 |
| | except Exception as e: |
| | print(f"Error in Step 9: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 10: Name validation") |
| | if 'name' in data.columns or 'enumerator_name' in data.columns: |
| | integrity_score_per_respondent['name_check'] = validate_names(data) |
| | else: |
| | integrity_score_per_respondent['name_check'] = 0 |
| | except Exception as e: |
| | print(f"Error in Step 10: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 11: Positive and negative impact client") |
| | integrity_score_per_respondent['impact_feedback_integrity'] = 2 |
| | |
| | |
| | |
| | |
| | |
| | |
| | except Exception as e: |
| | print(f"Error in Step 11: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 12: Respondent uniqueness") |
| | matching_columns = column_strategy_df[column_strategy_df['Prefered method'].notna()]['Field name'] |
| | matching_columns = [col for col in matching_columns if col in data.columns] |
| | integrity_score_per_respondent, _ = assess_uniqueness_and_report_pairs(data, matching_columns, integrity_score_per_respondent) |
| | if len(matching_columns) < 10: |
| | integrity_score_per_respondent['response_uniqueness'] = 1 |
| | except Exception as e: |
| | print(f"Error in Step 12: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 13: Enumerator bias") |
| | if 'enumerator_name' in data.columns: |
| | data = assess_enumerator_bias(data, matching_columns) |
| | integrity_score_per_respondent['enumerator_bias'] = data['enumerator_bias'] |
| | else: |
| | integrity_score_per_respondent['enumerator_bias'] = 0 |
| | except Exception as e: |
| | print(f"Error in Step 13: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 14: Location integrity") |
| | integrity_score_per_respondent = location_integrity(data, integrity_score_per_respondent) |
| | except Exception as e: |
| | print(f"Error in Step 14: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 15: Calculate integrity scores") |
| | columns_integrity = [ |
| | 'payment_for_survey', 'respondent_influenced', 'response_time_integrity', 'audio_verification', |
| | 'questions_which_were_difficult', 'respondent_suspicious', 'phone_number_check', 'response_uniqueness', |
| | 'name_check', 'impact_feedback_integrity', 'enumerator_bias', 'location_check' |
| | ] |
| | points = [1] * len(columns_integrity) |
| | survey_type_mapping = { |
| | 'Supervised (On Site)': {col: 1 for col in columns_integrity}, |
| | 'Supervised (Telephone)': {**{col: 1 for col in columns_integrity}, 'audio_verification': 0, 'location_check': 0}, |
| | 'Unsupervised (Online)': {**{col: 1 for col in columns_integrity}, 'respondent_influenced': 0, 'audio_verification': 0, |
| | 'questions_which_were_difficult': 0, 'respondent_suspicious': 0, 'name_check': 0, 'location_check': 0} |
| | } |
| | integrity_score_values = calculate_weighted_aggregate_with_max( |
| | integrity_score_per_respondent, survey_type, columns_integrity, survey_type_mapping, points |
| | ) |
| | integrity_score_values['score_ratio'] = integrity_score_values['weighted_aggregate'] / integrity_score_values['max_possible_score'] |
| | integrity_score_values['score_ratio'] = np.minimum(integrity_score_values['score_ratio'], 1) |
| | except Exception as e: |
| | print(f"Error in Step 15: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 16: Add cleansing urgency") |
| | integrity_score_values = cleansing_integrity(integrity_score_values) |
| | except Exception as e: |
| | print(f"Error in Step 16: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 17: Prepare table_2_1 and table_2_2") |
| | table_2_1 = integrity_score_values.sort_values(by='score_ratio', ascending=True) |
| | |
| | |
| | table_2_1['cleansing_urgency'] = table_2_1['score_ratio'].apply( |
| | lambda x: 'high' if x < 0.3 else ('low' if x < 0.5 else None) |
| | ) |
| | |
| | columns_to_include = ['_index'] + [col for col in data.columns if col in matching_columns] |
| | table_2_2 = data[columns_to_include] |
| | except Exception as e: |
| | print(f"Error in Step 17: {e}") |
| | raise |
| |
|
| | try: |
| | print("Step 18: Generate integrity issues log (table_2_4)") |
| | integrity_score_values_filtered = table_2_1[table_2_1['score_ratio'] < 0.5] |
| | table_2_4 = generate_integrity_issues_log(data, table_2_1, column_strategy_df) |
| | integrity_issues_df = generate_integrity_issues_log(data, integrity_score_values_filtered, column_strategy_df) |
| | except Exception as e: |
| | print(f"Error in Step 18: {e}") |
| | raise |
| | try: |
| | print("Step 19: Final issues and report generation") |
| | if table_2_1.shape[0] == 0: |
| | final_issues_df = table_1_2.copy() |
| | else: |
| | final_issues_df = pd.concat([table_1_2, table_2_4], ignore_index=True) |
| | table_2_5 = final_issues_df.copy() |
| | report_df = calculate_consistency_and_integrity_scores(data, column_strategy_df, final_issues_df) |
| | report_df['integrity_score'] = table_2_1['score_ratio'].mean() |
| | report_df['integrity_score_high'] = table_2_1[table_2_1['score_ratio'] >= 0.3]['score_ratio'].mean() |
| | report_df['integrity_score_low'] = table_2_1[table_2_1['score_ratio'] >= 0.5]['score_ratio'].mean() |
| | report_df['consistency_score_high'] = 1 - ( |
| | report_df['number_of_index_with_high_cleansing_urgency_consistency'] / |
| | report_df['number_of_non_empty_responses'] |
| | ) |
| | report_df['consistency_score_low'] = report_df['consistency_score'] |
| |
|
| | |
| | report_df['high_score'] = 1 - ( |
| | (report_df['number_of_index_with_high_cleansing_urgency_consistency'] + |
| | report_df['number_of_index_with_high_cleansing_urgency_integrity']) / |
| | report_df['number_of_non_empty_responses'] |
| | ).clip(upper=1) |
| |
|
| | |
| | report_df['low_score'] = 1 - ( |
| | (report_df['number_of_index_with_issues_consistency'] + |
| | report_df['number_of_index_with_issues_integrity']) / |
| | report_df['number_of_non_empty_responses'] |
| | ).clip(lower=0) |
| |
|
| | table_2_3 = report_df.copy() |
| | except Exception as e: |
| | print(f"Error in Step 19: {e}") |
| | raise |
| |
|
| | |
| | required_columns = ['_index', 'score_ratio', 'cleansing_urgency', 'weighted_aggregate', 'max_possible_score'] |
| | other_columns = [col for col in table_2_1.columns if col not in required_columns] |
| | ordered_columns = required_columns + other_columns |
| |
|
| | |
| | table_2_1 = table_2_1[ordered_columns] |
| | |
| | try: |
| | print("Returning tables: table_2_1, table_2_2, table_2_3, table_2_4,table_2_5") |
| | return table_2_1, table_2_2, table_2_3, table_2_4, table_2_5 |
| | except Exception as e: |
| | print(f"Error during return: {e}") |
| | raise |
| |
|
| |
|
| | def representativity_report(segmentation, raw_data, table_2_4, segmentation_columns, mapping_segmentation_quotas, |
| | table_2_3, N, table_1_3): |
| | |
| | |
| | if segmentation == 'yes': |
| | |
| | table_3_1 = calculate_coverage_for_all(raw_data, table_2_4, segmentation_columns, mapping_segmentation_quotas) |
| | |
| | |
| | table_3_2 = table_3_1.groupby('raw_data_variant')['Weighted_Avg_Coverage'].mean().reset_index() |
| | else: |
| | table_3_1 = pd.DataFrame() |
| | table_3_2 = pd.DataFrame() |
| | |
| | |
| | representativity_scores_df_high = calculate_representativity_scores_per_question(table_2_3, N, score_column='high_score') |
| | representativity_score_actual_0, representativity_score_clean_A = representativity_scores_df_high['confidence_actual'].mean(), representativity_scores_df_high['confidence_clean'].mean() |
| |
|
| | representativity_scores_df_low = calculate_representativity_scores_per_question(table_2_3, N, score_column='low_score') |
| | representativity_score_actual_0_low, representativity_score_clean_B = representativity_scores_df_low['confidence_actual'].mean(), representativity_scores_df_low['confidence_clean'].mean() |
| |
|
| | |
| | if segmentation == 'yes': |
| | overall_representativity_score_0 = (((table_3_2[table_3_2['raw_data_variant']=='0']['Weighted_Avg_Coverage'] + representativity_score_actual_0/100)/2)*100) |
| | overall_representativity_score_A = (((table_3_2[table_3_2['raw_data_variant']=='A']['Weighted_Avg_Coverage'] + representativity_score_clean_A/100)/2)*100) |
| | overall_representativity_score_B = (((table_3_2[table_3_2['raw_data_variant']=='B']['Weighted_Avg_Coverage'] + representativity_score_clean_B/100)/2)*100) |
| | else: |
| | overall_representativity_score_0 = representativity_score_actual_0 |
| | overall_representativity_score_A = representativity_score_clean_A |
| | overall_representativity_score_B = representativity_score_clean_B |
| | |
| | |
| | |
| | consistency_score = table_1_3['indicator_score'].median() |
| |
|
| | |
| | data_quality_report_df = pd.DataFrame() |
| |
|
| | |
| | data_quality_report_df['scenario'] = ['0','A','B'] |
| | data_quality_report_df['consistency_score'] = [consistency_score, table_2_3['consistency_score_low'].median(), 1] |
| | |
| | |
| | if segmentation == 'yes': |
| | data_quality_report_df['overall_representativity_score'] = [ |
| | overall_representativity_score_0[0]/100, |
| | overall_representativity_score_A[1]/100, |
| | overall_representativity_score_B[2]/100 |
| | ] |
| | else: |
| | data_quality_report_df['overall_representativity_score'] = [ |
| | overall_representativity_score_0/100, |
| | overall_representativity_score_A/100, |
| | overall_representativity_score_B/100 |
| | ] |
| | |
| | |
| | data_quality_report_df['integrity_score'] = [ |
| | table_2_3['integrity_score'].median(), |
| | table_2_3['integrity_score_low'].median(), |
| | table_2_3['integrity_score_low'].median() |
| | ] |
| | |
| | |
| | data_quality_report_df['data_quality_score'] = ( |
| | data_quality_report_df['consistency_score'] + |
| | data_quality_report_df['overall_representativity_score'] + |
| | data_quality_report_df['integrity_score'] |
| | ) / 3 |
| |
|
| | |
| | table_3_3 = data_quality_report_df.copy() |
| |
|
| | |
| | table_3_4 = representativity_scores_df_high.copy() |
| | |
| | if segmentation == 'yes': |
| | return table_3_1, table_3_2, table_3_3, table_3_4 |
| | else: |
| | return table_3_3, table_3_4 |
| | |
| |
|
| |
|
| | |
| | def enumerator_urgent_issues_report(raw_data, table_2_5): |
| | |
| | |
| | raw_data['numerator_name_clean'] = raw_data['enumerator_name'].str.strip().str.lower().str.replace(r'\s+', ' ', regex=True) |
| |
|
| | |
| | model = SentenceTransformer('all-MiniLM-L6-v2') |
| |
|
| | |
| | embeddings = model.encode(raw_data['numerator_name_clean'].tolist()) |
| |
|
| | |
| | cosine_sim = cosine_similarity(embeddings) |
| |
|
| | |
| | threshold = 0.85 |
| |
|
| | |
| | name_to_group = {} |
| | group_counter = 0 |
| |
|
| | for i, name in enumerate(raw_data['numerator_name_clean']): |
| | |
| | if name in name_to_group: |
| | continue |
| | |
| | similar_names = np.where(cosine_sim[i] > threshold)[0] |
| | |
| | for idx in similar_names: |
| | name_to_group[raw_data['numerator_name_clean'].iloc[idx]] = group_counter |
| | group_counter += 1 |
| |
|
| | |
| | group_to_name = {} |
| | for name, group_id in name_to_group.items(): |
| | if group_id not in group_to_name: |
| | group_to_name[group_id] = name |
| |
|
| | |
| | raw_data['enumerator_name_corrected'] = raw_data['numerator_name_clean'].map(lambda x: group_to_name[name_to_group[x]]) |
| |
|
| | |
| | raw_data[['enumerator_name', 'enumerator_name_corrected']] |
| |
|
| | |
| | table_2_6 = table_2_5.copy() |
| | table_2_6['consistency_low'] = table_2_5.apply(lambda row: 1 if row['quality_dimension'] == 'consistency' and row['cleansing_urgency'] == 'low' else 0, axis=1) |
| | table_2_6['consistency_high'] = table_2_5.apply(lambda row: 1 if row['quality_dimension'] == 'consistency' and row['cleansing_urgency'] == 'high' else 0, axis=1) |
| | table_2_6['integrity_high'] = table_2_5.apply(lambda row: 1 if row['quality_dimension'] == 'integrity' and row['cleansing_urgency'] == 'high' else 0, axis=1) |
| | table_2_6['integrity_low'] = table_2_5.apply(lambda row: 1 if row['quality_dimension'] == 'integrity' and row['cleansing_urgency'] == 'low' else 0, axis=1) |
| | table_2_6['cleansing_urgency_high'] = table_2_5.apply(lambda row: 1 if (row['quality_dimension'] in ['consistency', 'integrity']) and row['cleansing_urgency'] == 'high' else 0, axis=1) |
| | table_2_6['cleansing_urgency_low'] = table_2_5.apply(lambda row: 1 if (row['quality_dimension'] in ['consistency', 'integrity']) and row['cleansing_urgency'] == 'low' else 0, axis=1) |
| |
|
| |
|
| | table_2_6 = table_2_6[['_index','consistency_low', 'consistency_high', 'integrity_high', 'integrity_low', 'cleansing_urgency_high', 'cleansing_urgency_low']] |
| | |
| | |
| | raw_data.reset_index(drop=True, inplace=True) |
| | raw_data['_index'] = raw_data.index + 1 |
| | |
| | |
| | table_2_6_grouped = table_2_6.groupby('_index').agg({ |
| | 'consistency_low': 'max', |
| | 'consistency_high': 'max', |
| | 'integrity_high': 'max', |
| | 'integrity_low': 'max', |
| | 'cleansing_urgency_high': 'max', |
| | 'cleansing_urgency_low': 'max' |
| | }).reset_index() |
| |
|
| | |
| | table_2_6_grouped = table_2_6_grouped.merge(raw_data[['_index','enumerator_name_corrected']], on='_index') |
| |
|
| | |
| | summary = table_2_6_grouped.groupby('enumerator_name_corrected').agg( |
| | high_urgency_issues=('cleansing_urgency_high', 'sum'), |
| | total_indices=('_index', 'count') |
| | ).reset_index() |
| |
|
| | |
| | summary['high_urgency_proportion'] = summary['high_urgency_issues'] / summary['total_indices'] |
| |
|
| | |
| | summary_sorted = summary.sort_values('high_urgency_proportion', ascending=False) |
| |
|
| | |
| | high_urgency_issues = table_2_5.merge(raw_data[['_index','enumerator_name_corrected']], on='_index', how='inner') |
| | high_urgency_issues = high_urgency_issues[['enumerator_name_corrected', '_index', 'quality_dimension', 'cleansing_urgency'] + |
| | [col for col in high_urgency_issues.columns if col not in ['enumerator_name_corrected', '_index', 'quality_dimension', 'cleansing_urgency']]] |
| |
|
| | return summary_sorted, high_urgency_issues |
| |
|
| |
|
| | |
| |
|
| | def data_cleansing(raw_data, table_2_5, customer, theme, site, population_size, interview_type): |
| | |
| | cleansed_data = raw_data.copy() |
| |
|
| | |
| | latest_end_date = raw_data['end'].max() |
| | cleansed_data['date'] = latest_end_date |
| |
|
| | |
| | cleansed_summary = [] |
| |
|
| | |
| | for i, row in table_2_5.iterrows(): |
| | if row['quality_dimension'] == 'consistency' and row['cleansing_urgency'] == 'high': |
| | |
| | question_column = row['question'] |
| |
|
| | |
| | original_value = raw_data.loc[raw_data['_index'] == row['_index'], question_column].iloc[0] |
| |
|
| | |
| | cleansed_data.loc[cleansed_data['_index'] == row['_index'], question_column] = None |
| |
|
| | |
| | cleansed_summary.append({ |
| | 'criteria': 'consistency & high cleansing_urgency', |
| | '_index': row['_index'], |
| | 'affected_columns': [question_column], |
| | 'original_value': original_value, |
| | 'new_value': None |
| | }) |
| |
|
| | |
| | for i, row in table_2_5.iterrows(): |
| | if row['quality_dimension'] == 'integrity' and row['cleansing_urgency'] == 'high': |
| | |
| | original_row = raw_data[raw_data['_index'] == row['_index']] |
| |
|
| | |
| | cleansed_data = cleansed_data[cleansed_data['_index'] != row['_index']] |
| |
|
| | |
| | cleansed_summary.append({ |
| | 'criteria': 'integrity & high cleansing_urgency', |
| | '_index': row['_index'], |
| | 'affected_columns': ['entire row removed'], |
| | 'original_value': original_row.to_dict(orient='records')[0] if not original_row.empty else None, |
| | 'new_value': 'row removed' |
| | }) |
| |
|
| | |
| | readme_data = { |
| | 'parameter': ['customer', 'theme', 'site', 'population size', 'interview type'], |
| | 'value': [customer, theme, site, population_size, interview_type] |
| | } |
| | readme = pd.DataFrame(readme_data) |
| |
|
| | |
| | cleansed_summary_df = pd.DataFrame(cleansed_summary) |
| |
|
| | return raw_data, cleansed_data, cleansed_summary_df, readme |
| | |
| |
|
| |
|
| |
|
| | |
| |
|
| | |
| | import random |
| | import random |
| |
|
| | def consistency_issues_action(table_1_1, table_2_3): |
| | try: |
| | |
| | df = table_1_1.merge(table_2_3, on='question', how='inner') |
| | except Exception as e: |
| | return f"Error during merging tables: {e}" |
| |
|
| | |
| | issues_columns = [ |
| | 'completeness check', |
| | 'dist_outlier_check', |
| | 'free-text check (more than 3 characters but less than two words)', |
| | 'model based outlier' |
| | ] |
| | issue_threshold = 0.20 |
| |
|
| | |
| | intro_statements = [ |
| | "After reviewing the data, we found several questions that require attention due to significant consistency issues.", |
| | "Based on our analysis, the following questions have been flagged due to their consistency concerns.", |
| | "Our analysis identified several questions where consistency issues need to be addressed, as detailed below.", |
| | "The following questions exhibit high levels of inconsistency, and we recommend taking a closer look at them.", |
| | "In reviewing the dataset, we've identified certain questions with notable consistency problems that need your attention.", |
| | "We have analyzed the data and found that several questions may require additional review due to potential consistency issues.", |
| | "Our investigation has revealed some questions where the data shows inconsistencies that should be addressed promptly.", |
| | "Based on recent checks, we identified a set of questions with significant consistency challenges that require action.", |
| | "After a thorough review, we observed that the following questions exhibit consistency problems that need to be addressed.", |
| | "Our data analysis indicates that several questions are affected by consistency issues that need further scrutiny." |
| | ] |
| |
|
| | report = [] |
| | intro = random.choice(intro_statements) |
| | questions_with_issues = [] |
| |
|
| | |
| | for col in issues_columns + ['consistency_score_low', 'total']: |
| | if col not in df.columns: |
| | df[col] = 0 |
| | else: |
| | df[col] = df[col].fillna(0) |
| |
|
| | for _, row in df.iterrows(): |
| | try: |
| | total_issues = sum(row[issues_columns]) |
| | total_responses = row['total'] |
| |
|
| | if total_responses == 0: |
| | continue |
| |
|
| | if row['consistency_score_low'] < 0.80 and total_issues / total_responses > issue_threshold: |
| | question = row['question'] |
| | issues_count = row[issues_columns] |
| | sorted_issues = issues_count.sort_values(ascending=False) |
| |
|
| | max_issue_dimension = sorted_issues.index[0] |
| | max_issue_value = sorted_issues.iloc[0] |
| |
|
| | if len(sorted_issues) > 1 and sorted_issues.iloc[1] >= 5: |
| | second_max_issue_dimension = sorted_issues.index[1] |
| | second_max_issue_value = sorted_issues.iloc[1] |
| | issue_details = ( |
| | f"Question: '{question}' has {total_issues} issues.\n" |
| | f" - The dimension(s) with the most issues: {max_issue_dimension} with {max_issue_value} issues.\n" |
| | f" - The second dimension with issues: {second_max_issue_dimension} with {second_max_issue_value} issues.\n" |
| | ) |
| | else: |
| | issue_details = ( |
| | f"Question: '{question}' has {total_issues} issues.\n" |
| | f" - The dimension with the most issues: {max_issue_dimension} with {max_issue_value} issues.\n" |
| | ) |
| |
|
| | questions_with_issues.append(issue_details) |
| | except Exception as e: |
| | continue |
| |
|
| | explanation_text = ( |
| | "\nThe following dimensions are evaluated for consistency:\n" |
| | "- Completeness check: An answer was expected but not provided.\n" |
| | "- Dist outlier check: A value outside the range of reasonable values.\n" |
| | "- Free-text check (more than 3 characters but less than two words): Ensures minimal content for free-text responses.\n" |
| | "- Model-based outlier: An inconsistent or extreme value compared to typical responses.\n\n" |
| | ) |
| |
|
| | if not questions_with_issues: |
| | report.append("All questions show good consistency health. No immediate actions are required.\n") |
| | else: |
| | report.append(intro) |
| | report.append(explanation_text) |
| | if len(questions_with_issues) == 1: |
| | report.insert(-1, "The following question requires attention:\n") |
| | else: |
| | report.insert(-1, "The following questions require attention:\n") |
| | report.extend(questions_with_issues) |
| |
|
| | report.append("\nFor a detailed view of each question's consistency issues, please refer to the 'Data Consistency Issues Deep Dive' tab.\n") |
| |
|
| | styles = ["\033[1m", "\033[3m", "\033[4m", "\033[7m"] |
| | styled_report = random.choice(styles) + "\n".join(report) + "\033[0m" |
| |
|
| | return styled_report |
| |
|
| |
|
| | import random |
| | import pandas as pd |
| |
|
| | def integrity_issues_action(table_2_1, threshold=0.7): |
| | |
| | column_max_values = { |
| | 'payment_for_survey': 1, |
| | 'respondent_influenced': 1, |
| | 'response_time_integrity': 1, |
| | 'audio_verification': 1, |
| | 'questions_which_were_difficult': 2, |
| | 'respondent_suspicious': 2, |
| | 'phone_number_check': 1, |
| | 'response_uniqueness': 1, |
| | 'name_check': 1, |
| | 'impact_feedback_integrity': 2, |
| | 'enumerator_bias': 2, |
| | 'location_check': 1 |
| | } |
| |
|
| | |
| | urgent_rows = table_2_1[table_2_1['score_ratio'] < threshold] |
| | |
| | if urgent_rows.empty: |
| | |
| | return "All respondents have sufficient integrity to be considered for impact measurement." |
| | |
| | |
| | intro_statements = [ |
| | "After reviewing the data, we found several respondents that require attention due to significant integrity issues.", |
| | "Based on our analysis, the following respondents have been flagged due to integrity concerns.", |
| | "Our analysis identified several respondents where integrity issues need to be addressed, as detailed below.", |
| | "The following respondents exhibit low integrity scores, and we recommend taking a closer look at them.", |
| | "In reviewing the dataset, we've identified certain respondents with notable integrity problems that need your attention.", |
| | "We have analyzed the data and found that several respondents may require additional review due to integrity issues.", |
| | "Our investigation has revealed some respondents where integrity issues should be addressed promptly.", |
| | "Based on recent checks, we identified a set of respondents with significant integrity challenges that require action.", |
| | "After a thorough review, we observed that the following respondents exhibit integrity issues that need to be addressed.", |
| | "Our data analysis indicates that several respondents are affected by integrity issues that need further scrutiny." |
| | ] |
| | |
| | |
| | intro = random.choice(intro_statements) |
| | |
| | |
| | issues_report = [] |
| | |
| | for index, row in urgent_rows.iterrows(): |
| | |
| | section = [f"**Respondent with _index: {row['_index']}**"] |
| |
|
| | |
| | if len(urgent_rows) < 3: |
| | far_from_max_columns = [] |
| | for col in column_max_values: |
| | max_value = column_max_values[col] |
| | score = row[col] |
| | if pd.notna(score) and score != '' and score < max_value: |
| | far_from_max_columns.append(f'{col.replace("_", " ").title()} (score: {score}/{max_value})') |
| | |
| | if far_from_max_columns: |
| | section.append("\n The following checks scored below the maximum value:") |
| | section.extend(far_from_max_columns) |
| |
|
| | |
| | issues_report.append("\n".join(section)) |
| | |
| | |
| | final_report = intro + "\n\n" + "\n\n".join(issues_report) |
| | |
| | |
| | final_report += """ |
| | \n The following checks are evaluated for integrity: |
| | - **Payment for Survey:** Less integrity if the respondent was paid to do it. |
| | - **Respondent Influenced:** Less integrity score if the respondent seemed influenced. |
| | - **Response Time Integrity:** Less integrity if the respondent took too long or too short to respond. |
| | - **Audio Verification:** More integrity if audio verification is in place. |
| | - **Questions Were Difficult:** Less integrity if more questions were hard to respond to. |
| | - **Respondent Suspicious:** Less integrity the more suspicious the respondent is. |
| | - **Phone Number Check:** More integrity if a realistic phone number is provided. |
| | - **Response Uniqueness:** More integrity if the response is truly unique. |
| | - **Name Check:** More integrity if the name is realistic. |
| | - **Impact Feedback Integrity:** More integrity if relevant and well-articulated feedback is provided. |
| | - **Enumerator Bias:** Less integrity if enumerator responses are biased. |
| | - **Location Check:** Less integrity if responses' locations are too close to each other in certain contexts. |
| | |
| | For a detailed view of each respondent's integrity issues, please refer to the 'Integrity Issues Deep Dive' tab. |
| | """ |
| | |
| | return final_report |
| |
|
| |
|
| |
|
| | def representativity_issues_action(segmentation, table_3_1=None, table_3_2=None, table_3_3=None, table_3_4=None): |
| | report = [] |
| |
|
| | |
| | if table_3_1 is not None and not table_3_1.empty: |
| | if 'Weighted_Avg_Coverage' in table_3_1.columns and 'raw_data_variant' in table_3_1.columns: |
| | low_coverage_segments = table_3_1[ |
| | (table_3_1['Weighted_Avg_Coverage'] < 0.75) & (table_3_1['raw_data_variant'] == 'A') |
| | ][['Segmentation_Column', 'Segment', 'Weighted_Avg_Coverage']].drop_duplicates() |
| |
|
| | if not low_coverage_segments.empty: |
| | report.append("After urgent cleansing is applied, the following segments have coverage below 0.75:") |
| | for _, row in low_coverage_segments.iterrows(): |
| | report.append( |
| | f"- {row['Segmentation_Column']} ({row['Segment']}) with coverage {row['Weighted_Avg_Coverage']:.2f}" |
| | ) |
| | else: |
| | report.append("table_3_1 is missing required columns: 'Weighted_Avg_Coverage' or 'raw_data_variant'.") |
| |
|
| | |
| | if table_3_3 is not None and not table_3_3.empty: |
| | scenario_a = table_3_3[table_3_3['scenario'] == 'A'] |
| | if not scenario_a.empty: |
| | overall_score = scenario_a['overall_representativity_score'].iloc[0] |
| | if overall_score < 0.80: |
| | if table_3_4 is not None and not table_3_4.empty and 'representativity_clean' in table_3_4.columns: |
| | low_questions = table_3_4[ |
| | table_3_4['representativity_clean'] < 0.80 |
| | ]['question'].drop_duplicates() |
| | if not low_questions.empty: |
| | report.append("\nAdditionally, the following questions have representativity below 0.80:") |
| | for question in low_questions: |
| | report.append(f"- {question}") |
| | else: |
| | report.append("\nQuestions representativity data is unavailable.") |
| | else: |
| | report.append(f"\nThe overall representativity score after urgent cleansing is {overall_score:.2f}.") |
| | report.append( |
| | "The survey is able to assess the target confidence level of 90% with a margin of error of 5%." |
| | ) |
| | else: |
| | report.append("\nThe data quality report for the urgent cleansing scenario is unavailable.") |
| | |
| | if not report: |
| | report.append("No data available for representativity analysis.") |
| |
|
| | return "\n".join(report) |
| |
|
| |
|
| |
|
| | def enumerator_issue_action(table_4_1): |
| | """ |
| | Analyzes enumerator issues and generates a natural text report. |
| | |
| | Parameters: |
| | table_4_1 (pd.DataFrame): DataFrame containing columns 'enumerator_name_corrected', |
| | 'total_indices', and 'high_urgency_proportion'. |
| | |
| | Returns: |
| | str: A natural text report with recommendations or a message indicating no bias found. |
| | """ |
| | |
| | enumerators_with_issues = table_4_1[table_4_1['total_indices'] > 5] |
| |
|
| | if enumerators_with_issues.empty: |
| | return "No enumerator bias has been found." |
| |
|
| | |
| | average_high_urgency = enumerators_with_issues['high_urgency_proportion'].mean() |
| |
|
| | |
| | problematic_enumerators = enumerators_with_issues[ |
| | enumerators_with_issues['high_urgency_proportion'] > 2 * average_high_urgency |
| | ] |
| |
|
| | if problematic_enumerators.empty: |
| | return "No enumerator bias has been found." |
| |
|
| | |
| | report = [ |
| | "After analyzing the number of urgent issues per enumerator name, we recommend a deep dive into an analysis of the responses provided by the following enumerators:" |
| | ] |
| |
|
| | for _, row in problematic_enumerators.iterrows(): |
| | report.append(f"- {row['enumerator_name_corrected']} (Total Issues: {row['total_indices']}, High Urgency Proportion: {row['high_urgency_proportion']:.2f})") |
| |
|
| | report.append("\nWe recommend going to the tab 'Enumerator Bias Deep Dive' for further investigation.") |
| |
|
| | return "\n".join(report) |
| |
|
| |
|
| | def generate_data_quality_report(segmentation, table_1_1, table_2_1, table_2_3, table_3_1, table_3_2, table_3_3, table_3_4, table_4_1): |
| | |
| | consistency_action = f.consistency_issues_action(table_1_1, table_2_3) |
| | integrity_action = f.integrity_issues_action(table_2_1) |
| | representativity_action = representativity_issues_action(segmentation, table_3_1=None, table_3_2=None, table_3_3=table_3_3, table_3_4=table_3_4) |
| | enumerator_action = enumerator_issue_action(table_4_1) |
| |
|
| | |
| | scenario_a = table_3_3[table_3_3['scenario'] == 'A'].iloc[0] |
| | consistency_score_a = scenario_a['consistency_score'] |
| | representativity_score_a = scenario_a['overall_representativity_score'] |
| | integrity_score_a = scenario_a['integrity_score'] |
| | data_quality_score_a = scenario_a['data_quality_score'] |
| |
|
| | |
| | if data_quality_score_a > 0.85 and all(score > 0.85 for score in [consistency_score_a, representativity_score_a, integrity_score_a]): |
| | quality_summary = ( |
| | "The overall data quality of the dataset is very strong. All dimensions meet the desired thresholds, " |
| | "indicating the data is well-suited for analysis." |
| | ) |
| | elif data_quality_score_a > 0.80: |
| | underperforming = [ |
| | name for score, name in zip( |
| | [consistency_score_a, representativity_score_a, integrity_score_a], |
| | ['Consistency', 'Overall Representativity', 'Integrity'] |
| | ) if score < 0.80 |
| | ] |
| | quality_summary = ( |
| | "The overall data quality score is satisfactory, but the following dimensions require further investigation: " |
| | + ", ".join(underperforming) + ". Please refer to the suggestions below for detailed actions." |
| | ) |
| | else: |
| | quality_summary = ( |
| | "The overall data quality score is below acceptable thresholds. Please take the suggested actions for the dimensions " |
| | "with underperforming scores (< 0.80) to improve data quality." |
| | ) |
| |
|
| | |
| | report = f""" |
| | ### Overall Data Quality Analysis |
| | |
| | After analyzing the data quality score breakdown for the scenario where only urgent cleansing has been applied, the following observations are made: |
| | |
| | - **Consistency Score** : {consistency_score_a:.3f} |
| | - **Overall Representativity Score** : {representativity_score_a:.3f} |
| | - **Integrity Score** : {integrity_score_a:.3f} |
| | - **Overall Data Quality Score** : {data_quality_score_a:.3f} |
| | |
| | #### Summary |
| | {quality_summary} |
| | |
| | --- |
| | |
| | ### Consistency Action Suggestions |
| | {consistency_action} |
| | |
| | --- |
| | |
| | ### Integrity Action Suggestions |
| | {integrity_action} |
| | |
| | --- |
| | |
| | ### Representativity Action Suggestions |
| | {representativity_action} |
| | |
| | --- |
| | |
| | ### Enumerator Action Suggestions |
| | {enumerator_action} |
| | """ |
| | |
| | return report.strip() |
| |
|
| |
|
| |
|
| | if segmentation =='yes': |
| |
|
| | |
| | report = generate_data_quality_report( |
| | segmentation='yes', |
| | table_1_1=table_1_1, |
| | table_2_1=table_2_1, |
| | table_2_3=table_2_3, |
| | table_3_1=table_3_1, |
| | table_3_2=table_3_2, |
| | table_3_3=table_3_3, |
| | table_3_4=table_3_4, |
| | table_4_1=table_4_1 |
| | ) |
| |
|
| | print(report) |
| |
|
| | else: |
| | |
| | report = generate_data_quality_report( |
| | segmentation='no', |
| | table_1_1=table_1_1, |
| | table_2_1=table_2_1, |
| | table_2_3=table_2_3, |
| | table_3_1=None, |
| | table_3_2=None, |
| | table_3_3=table_3_3, |
| | table_3_4=table_3_4, |
| | table_4_1=table_4_1 |
| | ) |
| |
|
| | print(report) |
| |
|
| |
|