# FMEA data pipeline: maps FMEA worksheet entries to SAP catalog codes.
import pandas as pd
import numpy as np
import json
import re
from fuzzywuzzy import process
class FMEADataPipeline:
    """Enrich an FMEA worksheet with SAP catalog codes.

    Intended call order:
      1. ``read_catalog_profile`` - load the SAP catalog tables from JSON.
      2. ``build_connector``      - link FMEA columns to SAP code groups.
      3. ``split_columns_rows``   - explode enumerated FMEA cells into rows.
      4. ``bracket_remover``      - strip parenthesised annotations.
      5. ``column_matcher_21Jan`` - fuzzy-match FMEA text to SAP short texts.
      6. ``column_arranger``      - reorder columns and export to Excel.
      7. ``process_and_split_excel`` / ``process_and_split_excel_2`` -
         explode the task-related columns and export the final table.
    """

    def __init__(self, catalog_profile, fmea, catalog_code, threshold):
        """
        Args:
            catalog_profile: mapping of JSON strings (``orient='split'``)
                keyed by 'cp', 'object part', 'symptom', 'damage', 'cause'.
            fmea: the FMEA worksheet as a pandas DataFrame.
            catalog_code: SAP catalog-profile code used to filter code groups.
            threshold: minimum fuzzywuzzy score (0-100) to accept a match.
        """
        self.catalog_profile = catalog_profile
        self.fmea = fmea
        self.catalog_code = catalog_code
        self.threshold = threshold
        # Populated by the pipeline stages below.
        self.json_cp = None
        self.cp = None
        self.object_part = None
        self.symptom = None
        self.damage = None
        self.cause = None
        self.ERS_filename = None
        self.code_group = None
        self.fmea_code = {}
        self.new_fmea = None

    @staticmethod
    def _pad(values, length):
        """Right-pad *values* to *length* by repeating its last element.

        An empty list is padded with NaN; previously ``values[-1]`` raised
        IndexError when a cell split down to nothing (e.g. a cell that
        contained only numbering such as '1.').
        """
        filler = values[-1] if values else np.nan
        return values + [filler] * (length - len(values))

    def read_catalog_profile(self):
        """Load the five SAP catalog tables from their JSON payloads."""
        self.cp = pd.read_json(self.catalog_profile['cp'], orient='split')
        self.object_part = pd.read_json(self.catalog_profile['object part'], orient='split')
        self.symptom = pd.read_json(self.catalog_profile['symptom'], orient='split')
        self.damage = pd.read_json(self.catalog_profile['damage'], orient='split')
        self.cause = pd.read_json(self.catalog_profile['cause'], orient='split')

    def build_connector(self):
        """Build the FMEA-column -> SAP-catalog connector table.

        Filters ``self.cp`` down to the configured catalog code, joins it
        with the fixed FMEA-column/catalog mapping, and stamps the catalog
        code onto every FMEA row as 'Catalog Profile (SAP)'.
        """
        self.code_group = self.cp[self.cp['Catalog profile'] == self.catalog_code][['Catalog', 'Code group']]
        self.fmea_code = {'fmea code': ['Component', 'Failure Mode', 'Failure Mechanism', 'Failure Cause'],
                          'Catalog': ['B-Object Part', 'D-Symptom', 'C-Damage', '5-Cause']}
        self.fmea_code = pd.DataFrame(self.fmea_code)
        self.code_group = pd.merge(self.code_group, self.fmea_code, how='left', on='Catalog')
        self.fmea['Catalog Profile (SAP)'] = self.catalog_code

    def split_columns_rows(self):
        """Explode FMEA cells that contain enumerated lists into rows.

        Cells in the four failure columns may hold several values separated
        by numbering such as '1.', '2)', 'a-'. Each value becomes its own
        row; shorter columns are right-padded (see ``_pad``) so all columns
        align. Rewrites ``self.fmea``.
        """
        columns_to_split = ['Failure Mode', 'Failure Mechanism', 'Failure Cause', 'Failure Effect']
        separated_rows = []
        for _, row in self.fmea.iterrows():
            split_values = []
            for col in columns_to_split:
                if isinstance(row[col], str) and row[col]:
                    # Split on numbering patterns and drop blank fragments.
                    values = [item.strip()
                              for item in re.split(r'\d+[\)\.-]\s*|[a-zA-Z]+[\)\.-]\s*', row[col])
                              if item.strip()]
                else:
                    values = [row[col]]  # keep non-string values (e.g. NaN) as-is
                split_values.append(values)
            # Align all columns to the longest split.
            max_length = max(len(values) for values in split_values)
            split_values = [self._pad(values, max_length) for values in split_values]
            for i in range(max_length):
                new_row = row.copy()
                for col, values in zip(columns_to_split, split_values):
                    new_row[col] = values[i]
                separated_rows.append(new_row)
        # Rewrite the previous FMEA table with the exploded rows.
        self.fmea = pd.DataFrame(separated_rows)

    def bracket_remover(self):
        """Strip trailing parenthesised annotations, e.g. 'Pump (P-101)' -> 'Pump'."""
        columns = ['Component', 'Failure Mode', 'Failure Mechanism', 'Failure Cause', 'Failure Effect']
        for col in columns:
            if col in self.fmea.columns:
                self.fmea[col] = self.fmea[col].str.replace(r"\s*\(.*?\)", "", regex=True).str.strip()

    def column_matcher_21Jan(self):
        """Fuzzy-match FMEA text to SAP short texts and add code columns.

        For each FMEA column, candidate SAP entries are tried in priority
        order: (1) the exact catalog profile, (2) the same catalog group
        excluding the profile, (3) the whole catalog (matches from (3) are
        flagged '_secondary' and rendered with their code group). Adds a
        code column and a '<catalog>_description' column per catalog.

        Returns:
            The enriched ``self.fmea`` DataFrame.
        """
        def best_match(text, choices):
            # Guard: fuzzywuzzy raises on non-string input (e.g. NaN cells).
            if not isinstance(text, str) or not choices:
                return []
            return process.extract(text, choices, limit=1)

        def join_accepted(matches):
            # Keep only matches at or above the configured score threshold.
            return ', '.join(m[0] for m in matches if m[1] >= self.threshold)

        for code, sap in zip(self.fmea_code['fmea code'],
                             [self.object_part, self.symptom, self.damage, self.cause]):
            rows_for_code = self.code_group[self.code_group['fmea code'] == code]
            if rows_for_code['Code group'].empty:
                continue  # no matching code group for this FMEA column
            # Catalog identifier, e.g. 'B-Object Part'; its second character
            # is used below to select the wider catalog group.
            catalog_profile = rows_for_code['Catalog'].values[0]
            catalog_group = catalog_profile[1]
            # Priority 1: short texts within the exact catalog profile.
            s_profile = sap[sap['Code group'] == catalog_profile]['Short text'].tolist()
            m2_profile = self.fmea[code].apply(lambda x: join_accepted(best_match(x, s_profile)))
            # Priority 2: same catalog group, excluding the exact profile.
            group_sap = sap[(sap['Code group'].str[1] == catalog_group)
                            & (sap['Code group'] != catalog_profile)]
            s_group = group_sap['Short text'].tolist()
            m2_group = self.fmea[code].apply(lambda x: join_accepted(best_match(x, s_group)))
            # Priority 3: the entire SAP catalog; flag these as secondary.
            s_all = sap['Short text'].tolist()
            m2_all = self.fmea[code].apply(lambda x: join_accepted(best_match(x, s_all)))
            m2_all_flagged = m2_all.apply(lambda x: f"{x}_secondary" if x else x)
            # Merge prioritized matches: Profile > Group > All.
            merged_m2 = m2_profile.combine(m2_group, lambda x, y: x if x else y)
            merged_m2 = merged_m2.combine(m2_all_flagged, lambda x, y: x if x else y)
            # Lookup tables from the SAP catalog.
            mapping_dict_code = sap.set_index('Short text')['Code'].to_dict()
            mapping_dict_short_text = sap.set_index('Code')['Short text'].to_dict()
            catalog_code_dict = sap.set_index('Short text')['Code group'].to_dict()
            name = catalog_profile
            # SAP code for the matched short text (None when no match).
            self.fmea[name] = merged_m2.apply(
                lambda x: mapping_dict_code.get(x.replace("_secondary", "")) if x else None
            )
            # Description column; secondary matches carry their code group.
            self.fmea[f"{name}_description"] = merged_m2.apply(
                lambda x: (
                    f"{x.replace('_secondary', '')} ({catalog_code_dict.get(x.replace('_secondary', ''), 'Unknown')})"
                    if "_secondary" in x else
                    mapping_dict_short_text.get(mapping_dict_code.get(x), x)
                )
            )
        return self.fmea

    def column_arranger(self):
        """Reorder the SAP columns into fixed positions and export to Excel.

        NOTE(review): the insert positions assume the original worksheet
        layout — confirm against the source FMEA template if columns move.
        """
        catalog_profile = self.fmea.pop('Catalog Profile (SAP)')
        b_object_part = self.fmea.pop('B-Object Part')
        d_symptom = self.fmea.pop('D-Symptom')
        c_damage = self.fmea.pop('C-Damage')
        cause_5 = self.fmea.pop('5-Cause')
        b_object_part_desc = self.fmea.pop('B-Object Part_description')
        d_symptom_desc = self.fmea.pop('D-Symptom_description')
        c_damage_desc = self.fmea.pop('C-Damage_description')
        cause_5_desc = self.fmea.pop('5-Cause_description')
        self.fmea.insert(1, catalog_profile.name, catalog_profile)
        self.fmea.insert(3, b_object_part.name, b_object_part)
        self.fmea.insert(4, b_object_part_desc.name, b_object_part_desc)
        self.fmea.insert(6, d_symptom.name, d_symptom)
        self.fmea.insert(7, d_symptom_desc.name, d_symptom_desc)
        self.fmea.insert(9, c_damage.name, c_damage)
        self.fmea.insert(10, c_damage_desc.name, c_damage_desc)
        self.fmea.insert(12, cause_5.name, cause_5)
        self.fmea.insert(13, cause_5_desc.name, cause_5_desc)
        print('Completed column arranger function')
        self.fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.fmea

    def process_and_split_excel(self):
        """Explode newline-separated task columns into rows, aligned by line.

        Splits 'Proposed Task' on newlines; sibling columns are split the
        same way and aligned by line index (missing lines become NaN).
        Bullet prefixes are stripped from the ``clean_columns``. Writes the
        result to 'processed_fmea.xlsx' and returns it.
        """
        new_rows = []
        columns = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        clean_columns = ['Frequency', 'Action Party', 'TA (Y/N)']  # columns to clean bullet points
        # Regex to remove bullet points or numbering in the clean columns.
        bullet_pattern = r'^\s*[\da-zA-Z]+[)\.\-•]?\s*'
        for _, row in self.fmea.iterrows():
            cell_value = row.loc[columns[0]]
            if isinstance(cell_value, str):
                points = [point.strip() for point in cell_value.split('\n') if point.strip()]
                for idx, point in enumerate(points):
                    new_row = row.copy()
                    for column in columns:
                        column_value = row[column]
                        if isinstance(column_value, str):
                            # Align this column's lines with the task lines.
                            column_points = [p.strip() for p in column_value.split('\n') if p.strip()]
                            new_value = column_points[idx] if idx < len(column_points) else np.nan
                            if column in clean_columns:
                                new_value = re.sub(bullet_pattern, '', new_value).strip() if isinstance(new_value, str) else new_value
                            new_row[column] = new_value
                        else:
                            # Non-string values stay on the first row only.
                            new_row[column] = np.nan if idx > 0 else column_value
                    new_rows.append(new_row)
            else:
                # Non-string task cell: keep the row unmodified.
                new_rows.append(row)
        self.new_fmea = pd.DataFrame(new_rows)
        self.new_fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.new_fmea

    def process_and_split_excel_2(self):
        """Explode newline-separated task columns, padding short columns.

        Variant of ``process_and_split_excel``: every column is split on
        newlines, bullet prefixes are stripped, and shorter columns are
        right-padded (see ``_pad``) instead of filled with NaN. Writes the
        result to 'processed_fmea.xlsx' and returns it.
        """
        columns_to_split = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        clean_columns = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        bullet_pattern = r'^\s*(?:\d+[\)\.\-•]\s*|[a-zA-Z]\))'  # regex to clean bullets
        separated_rows = []
        for _, row in self.fmea.iterrows():
            split_values = []
            for col in columns_to_split:
                if isinstance(row[col], str) and row[col]:
                    values = [item.strip() for item in row[col].split('\n') if item.strip()]
                    if col in clean_columns:
                        values = [re.sub(bullet_pattern, '', v).strip() for v in values]
                else:
                    values = [row[col]]  # keep non-string values as-is
                split_values.append(values)
            max_length = max(len(values) for values in split_values)
            split_values = [self._pad(values, max_length) for values in split_values]
            for i in range(max_length):
                new_row = row.copy()
                for col, values in zip(columns_to_split, split_values):
                    new_row[col] = values[i]
                separated_rows.append(new_row)
        self.new_fmea = pd.DataFrame(separated_rows)
        self.new_fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.new_fmea