import pandas as pd
import numpy as np
import json
import re
from fuzzywuzzy import process


class FMEADataPipeline:
    """Pipeline that maps free-text FMEA entries onto SAP catalog codes.

    The pipeline reads SAP catalog tables (catalog profile, object part,
    symptom, damage, cause) from JSON, connects them to the FMEA columns,
    splits multi-point cells into one row per point, fuzzy-matches the FMEA
    text against SAP short texts, and writes the enriched table to Excel.
    """

    def __init__(self, catalog_profile, fmea, catalog_code, threshold):
        """Store inputs and initialise working state.

        catalog_profile: dict of JSON strings (pandas orient='split') keyed by
            'cp', 'object part', 'symptom', 'damage', 'cause'.
        fmea: FMEA DataFrame to enrich.
        catalog_code: SAP catalog profile code to work against.
        threshold: fuzzy-match score cutoff (fuzzywuzzy scores are 0-100).
        """
        self.catalog_profile = catalog_profile
        self.fmea = fmea
        self.catalog_code = catalog_code
        self.threshold = threshold
        self.json_cp = None          # not used in this file; kept for external callers
        self.cp = None               # catalog profile table (set by read_catalog_profile)
        self.object_part = None      # SAP object-part catalog
        self.symptom = None          # SAP symptom catalog
        self.damage = None           # SAP damage catalog
        self.cause = None            # SAP cause catalog
        self.ERS_filename = None     # not used in this file; kept for external callers
        self.code_group = None       # connector table (set by build_connector)
        self.fmea_code = {}          # FMEA column -> catalog mapping (set by build_connector)
        self.new_fmea = None         # output of the task-splitting steps

    def read_catalog_profile(self):
        """Load the five SAP catalog tables from their JSON representations."""
        self.cp = pd.read_json(self.catalog_profile['cp'], orient='split')
        self.object_part = pd.read_json(self.catalog_profile['object part'], orient='split')
        self.symptom = pd.read_json(self.catalog_profile['symptom'], orient='split')
        self.damage = pd.read_json(self.catalog_profile['damage'], orient='split')
        self.cause = pd.read_json(self.catalog_profile['cause'], orient='split')

    def build_connector(self):
        """Build the connector between FMEA columns and SAP code groups.

        Filters the catalog-profile table down to `self.catalog_code`, attaches
        the FMEA column that feeds each catalog, and stamps the profile code
        onto the FMEA table.
        """
        self.code_group = self.cp[self.cp['Catalog profile'] == self.catalog_code][['Catalog', 'Code group']]
        # Fixed wiring: which FMEA text column feeds which SAP catalog.
        self.fmea_code = {'fmea code': ['Component', 'Failure Mode', 'Failure Mechanism', 'Failure Cause'],
                          'Catalog': ['B-Object Part', 'D-Symptom', 'C-Damage', '5-Cause']}
        self.fmea_code = pd.DataFrame(self.fmea_code)
        self.code_group = pd.merge(self.code_group, self.fmea_code, how='left', on='Catalog')
        self.fmea['Catalog Profile (SAP)'] = self.catalog_code

    def split_columns_rows(self):
        """Explode numbered multi-point cells into one row per point.

        Cells such as '1. foo 2. bar' in the failure columns are split on
        numbering prefixes; shorter columns are padded by repeating their last
        value so every column yields the same number of rows. Rewrites
        self.fmea in place.
        """
        columns_to_split = ['Failure Mode', 'Failure Mechanism', 'Failure Cause', 'Failure Effect']
        separated_rows = []
        # Numbering prefixes such as '1.', '2)', '3-' or 'a)', 'b.' (hoisted
        # out of the row loop).
        numbering = re.compile(r'\d+[\)\.-]\s*|[a-zA-Z]+[\)\.-]\s*')
        for _, row in self.fmea.iterrows():
            split_values = []
            for col in columns_to_split:
                if isinstance(row[col], str) and row[col]:
                    values = [item.strip() for item in numbering.split(row[col]) if item.strip()]
                else:
                    values = [row[col]]  # keep non-string values as is
                split_values.append(values)
            # All split columns must yield the same number of rows.
            max_length = max(len(values) for values in split_values)
            for i, values in enumerate(split_values):
                if len(values) < max_length:
                    # FIX: a cell consisting only of numbering splits to an
                    # empty list; previously `values[-1]` raised IndexError.
                    filler = values[-1] if values else np.nan
                    split_values[i] = values + [filler] * (max_length - len(values))
            for i in range(max_length):
                new_row = row.copy()
                for col, values in zip(columns_to_split, split_values):
                    new_row[col] = values[i]
                separated_rows.append(new_row)
        # Rewrite the previous FMEA table with the exploded rows.
        self.fmea = pd.DataFrame(separated_rows)

    def bracket_remover(self):
        """Strip parenthesised text, e.g. 'Pump (P-101)' -> 'Pump'."""
        columns = ['Component', 'Failure Mode', 'Failure Mechanism', 'Failure Cause', 'Failure Effect']
        for col in columns:
            if col in self.fmea.columns:
                self.fmea[col] = self.fmea[col].str.replace(r"\s*\(.*?\)", "", regex=True).str.strip()

    def column_matcher_21Jan(self):
        """Fuzzy-match FMEA text columns against SAP catalog short texts.

        For each wired FMEA column, matches are tried in priority order:
        (1) the catalog profile's own code group, (2) the same catalog group
        (excluding the profile), (3) the entire SAP catalog — matches from the
        last tier are flagged with a '_secondary' suffix. Adds a code column
        named after the catalog (e.g. 'B-Object Part') and a matching
        '<name>_description' column to self.fmea, and returns self.fmea.
        """
        for code, sap in zip(self.fmea_code['fmea code'],
                             [self.object_part, self.symptom, self.damage, self.cause]):
            # Find the matching code group for the current FMEA code.
            matching_code_group = self.code_group[self.code_group['fmea code'] == code]['Code group']
            if matching_code_group.empty:
                continue  # skip if no matching code group is found
            matching_code_group_value = matching_code_group.values[0]

            # Catalog label (e.g. 'B-Object Part') used for prioritisation.
            catalog_profile = self.code_group[self.code_group['fmea code'] == code]['Catalog'].values[0]
            catalog_group = catalog_profile[1]  # second character of the catalog code
            # NOTE(review): `catalog_profile` holds the Catalog *label* yet is
            # compared against the 'Code group' column below, while
            # `matching_code_group_value` is computed and never used. This looks
            # like the two may have been swapped — confirm against the catalog
            # data before changing; behaviour is preserved as-is here.

            # Priority 1: the catalog profile itself.
            profile_sap = sap[sap['Code group'] == catalog_profile]
            s_profile = profile_sap['Short text'].tolist()
            m_profile = self.fmea[code].apply(lambda x: process.extract(x, s_profile, limit=1))
            m2_profile = m_profile.apply(lambda x: ', '.join([i[0] for i in x if i[1] >= self.threshold]))

            # Priority 2: same catalog group, excluding the catalog profile.
            group_sap = sap[(sap['Code group'].str[1] == catalog_group) & (sap['Code group'] != catalog_profile)]
            s_group = group_sap['Short text'].tolist()
            m_group = self.fmea[code].apply(lambda x: process.extract(x, s_group, limit=1))
            m2_group = m_group.apply(lambda x: ', '.join([i[0] for i in x if i[1] >= self.threshold]))

            # Priority 3: the entire SAP catalog.
            s_all = sap['Short text'].tolist()
            m_all = self.fmea[code].apply(lambda x: process.extract(x, s_all, limit=1))
            # FIX: was ''.join — harmless with limit=1 but inconsistent with
            # the other two priorities; now ', '.join everywhere.
            m2_all = m_all.apply(lambda x: ', '.join([i[0] for i in x if i[1] >= self.threshold]))
            # Flag whole-catalog matches so they can be recognised downstream.
            m2_all_flagged = m2_all.apply(lambda x: f"{x}_secondary" if x else x)

            # Merge prioritized matches: Profile > Group > All.
            merged_m2 = m2_profile.combine(m2_group, lambda x, y: x if x else y)
            merged_m2 = merged_m2.combine(m2_all_flagged, lambda x, y: x if x else y)

            # Lookup tables between short texts, codes and code groups.
            mapping_dict_code = sap.set_index('Short text')['Code'].to_dict()
            mapping_dict_short_text = sap.set_index('Code')['Short text'].to_dict()
            catalog_code_dict = sap.set_index('Short text')['Code group'].to_dict()

            # New columns are named after the catalog label.
            name = catalog_profile

            # SAP code of the matched short text (None when no match).
            self.fmea[name] = merged_m2.apply(
                lambda x: mapping_dict_code.get(x.replace("_secondary", "")) if x else None
            )

            # Description column; secondary matches carry their code group.
            self.fmea[f"{name}_description"] = merged_m2.apply(
                lambda x: (
                    f"{x.replace('_secondary', '')} ({catalog_code_dict.get(x.replace('_secondary', ''), 'Unknown')})"
                    if "_secondary" in x
                    else mapping_dict_short_text.get(mapping_dict_code.get(x), x)
                )
            )
        return self.fmea

    def column_arranger(self):
        """Move the SAP code/description columns next to their FMEA sources.

        The insert positions assume the original FMEA column layout. The
        result is also written to 'processed_fmea.xlsx'. Returns self.fmea.
        """
        catalog_profile = self.fmea.pop('Catalog Profile (SAP)')
        b_object_part = self.fmea.pop('B-Object Part')
        d_symptom = self.fmea.pop('D-Symptom')
        c_damage = self.fmea.pop('C-Damage')
        cause_5 = self.fmea.pop('5-Cause')
        b_object_part_desc = self.fmea.pop('B-Object Part_description')
        d_symptom_desc = self.fmea.pop('D-Symptom_description')
        c_damage_desc = self.fmea.pop('C-Damage_description')
        cause_5_desc = self.fmea.pop('5-Cause_description')
        self.fmea.insert(1, catalog_profile.name, catalog_profile)
        self.fmea.insert(3, b_object_part.name, b_object_part)
        self.fmea.insert(4, b_object_part_desc.name, b_object_part_desc)
        self.fmea.insert(6, d_symptom.name, d_symptom)
        self.fmea.insert(7, d_symptom_desc.name, d_symptom_desc)
        self.fmea.insert(9, c_damage.name, c_damage)
        self.fmea.insert(10, c_damage_desc.name, c_damage_desc)
        self.fmea.insert(12, cause_5.name, cause_5)
        self.fmea.insert(13, cause_5_desc.name, cause_5_desc)
        print('Completed column arranger function')
        self.fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.fmea

    def process_and_split_excel(self):
        """Split newline-separated task cells into one row per task.

        Rows are multiplied by the number of points in 'Proposed Task'; the
        companion columns are aligned point-by-point (NaN where a column has
        fewer points). Bullet prefixes are stripped from the clean columns.
        Stores and returns the result as self.new_fmea; also writes
        'processed_fmea.xlsx'.
        """
        new_rows = []
        columns = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        clean_columns = ['Frequency', 'Action Party', 'TA (Y/N)']  # columns to clean bullet points
        # Leading bullet/numbering prefix, e.g. '1)', 'a.', '•' (hoisted out
        # of the row loop).
        bullet_pattern = re.compile(r'^\s*[\da-zA-Z]+[)\.\-•]?\s*')
        for _, row in self.fmea.iterrows():
            cell_value = row.loc[columns[0]]
            if isinstance(cell_value, str):
                # One output row per newline-separated point in 'Proposed Task'.
                points = [point.strip() for point in cell_value.split('\n') if point.strip()]
                for idx, point in enumerate(points):
                    new_row = row.copy()
                    for column in columns:
                        column_value = row[column]
                        if isinstance(column_value, str):
                            # Align the idx-th point of every column; NaN when
                            # this column has fewer points than 'Proposed Task'.
                            column_points = [p.strip() for p in column_value.split('\n') if p.strip()]
                            new_value = column_points[idx] if idx < len(column_points) else np.nan
                            if column in clean_columns:
                                new_value = bullet_pattern.sub('', new_value).strip() if isinstance(new_value, str) else new_value
                            new_row[column] = new_value
                        else:
                            # Non-string cells survive on the first split row only.
                            new_row[column] = np.nan if idx > 0 else column_value
                    new_rows.append(new_row)
            else:
                # If the value is not a string, add the row without modification.
                new_rows.append(row)
        self.new_fmea = pd.DataFrame(new_rows)
        self.new_fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.new_fmea

    def process_and_split_excel_2(self):
        """Variant of process_and_split_excel that pads short columns.

        Instead of filling missing points with NaN, shorter columns repeat
        their last value so every split row is fully populated. Stores and
        returns self.new_fmea; also writes 'processed_fmea.xlsx'.
        """
        columns_to_split = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        clean_columns = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']  # columns to clean bullet points
        # Leading bullets/numbering, e.g. '1)', '2.', 'a)' (hoisted out of the
        # row loop).
        bullet_pattern = re.compile(r'^\s*(?:\d+[\)\.\-•]\s*|[a-zA-Z]\))')
        separated_rows = []
        for _, row in self.fmea.iterrows():
            split_values = []
            for col in columns_to_split:
                if isinstance(row[col], str) and row[col]:
                    values = [item.strip() for item in row[col].split('\n') if item.strip()]
                    if col in clean_columns:
                        values = [bullet_pattern.sub('', v).strip() for v in values]
                else:
                    values = [row[col]]  # keep non-string values as is
                split_values.append(values)
            # Determine the maximum number of splits across all columns.
            max_length = max(len(values) for values in split_values)
            # Ensure all columns have the same number of values by repeating
            # the last value.
            for i, values in enumerate(split_values):
                if len(values) < max_length:
                    # FIX: a whitespace-only cell splits to an empty list;
                    # previously `values[-1]` raised IndexError.
                    filler = values[-1] if values else np.nan
                    split_values[i] = values + [filler] * (max_length - len(values))
            for i in range(max_length):
                new_row = row.copy()
                for col, values in zip(columns_to_split, split_values):
                    new_row[col] = values[i]
                separated_rows.append(new_row)
        self.new_fmea = pd.DataFrame(separated_rows)
        self.new_fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.new_fmea