# Golden_ERS / HF_processor.py
# Author: Copopopopo
# Update HF_processor.py (commit af1260e, verified)
import pandas as pd
import numpy as np
import json
import re
from fuzzywuzzy import process
class FMEADataPipeline:
    """Pipeline that enriches a free-text FMEA table with SAP catalog codes.

    Workflow (call in order):
      1. ``read_catalog_profile`` — deserialize the SAP catalog tables.
      2. ``build_connector``      — link FMEA columns to catalog identifiers.
      3. ``split_columns_rows``   — explode numbered multi-point cells.
      4. ``bracket_remover``      — strip parenthesized annotations.
      5. ``column_matcher_21Jan`` — fuzzy-match text to SAP codes.
      6. ``column_arranger``      — reorder columns and export to Excel.
      7. ``process_and_split_excel`` / ``_2`` — explode task columns.
    """

    def __init__(self, catalog_profile, fmea, catalog_code, threshold):
        """
        Args:
            catalog_profile: dict of JSON strings (``orient='split'``) keyed
                'cp', 'object part', 'symptom', 'damage', 'cause'.
            fmea: raw FMEA worksheet as a pandas DataFrame.
            catalog_code: SAP catalog-profile code stamped onto every row.
            threshold: minimum fuzzywuzzy score (0-100) to accept a match.
        """
        self.catalog_profile = catalog_profile
        self.fmea = fmea
        self.catalog_code = catalog_code
        self.threshold = threshold
        self.json_cp = None
        self.cp = None            # catalog-profile master table
        self.object_part = None   # SAP 'B' catalog table
        self.symptom = None       # SAP 'D' catalog table
        self.damage = None        # SAP 'C' catalog table
        self.cause = None         # SAP '5' catalog table
        self.ERS_filename = None
        self.code_group = None    # rows of self.cp for self.catalog_code
        self.fmea_code = {}       # FMEA column -> catalog mapping (DataFrame)
        self.new_fmea = None      # output of the task-splitting steps

    def read_catalog_profile(self):
        """Deserialize the five catalog tables from their JSON payloads."""
        self.cp = pd.read_json(self.catalog_profile['cp'], orient='split')
        self.object_part = pd.read_json(self.catalog_profile['object part'], orient='split')
        self.symptom = pd.read_json(self.catalog_profile['symptom'], orient='split')
        self.damage = pd.read_json(self.catalog_profile['damage'], orient='split')
        self.cause = pd.read_json(self.catalog_profile['cause'], orient='split')

    def build_connector(self):
        """Attach FMEA-column names to the catalog rows of the chosen profile."""
        self.code_group = self.cp[self.cp['Catalog profile'] == self.catalog_code][['Catalog', 'Code group']]
        # Static mapping between FMEA columns and SAP catalog identifiers.
        self.fmea_code = pd.DataFrame({
            'fmea code': ['Component', 'Failure Mode', 'Failure Mechanism', 'Failure Cause'],
            'Catalog': ['B-Object Part', 'D-Symptom', 'C-Damage', '5-Cause'],
        })
        self.code_group = pd.merge(self.code_group, self.fmea_code, how='left', on='Catalog')
        self.fmea['Catalog Profile (SAP)'] = self.catalog_code

    def split_columns_rows(self):
        """Explode numbered multi-point cells into one row per point.

        Cells like ``"1. Leak 2. Crack"`` are split on numbering markers
        ('1.', '1)', '1-', 'a.', ...). Shorter columns are padded by
        repeating their last value so every column yields the same number
        of rows.

        Bug fix: a cell consisting only of markers (e.g. ``"1."``) used to
        split into an empty list and crash on ``values[-1]`` during
        padding; such cells now yield NaN instead.
        """
        columns_to_split = ['Failure Mode', 'Failure Mechanism', 'Failure Cause', 'Failure Effect']
        # Hoisted loop-invariant: numbering markers such as '1.', '2)', 'a-'.
        marker = re.compile(r'\d+[\)\.-]\s*|[a-zA-Z]+[\)\.-]\s*')
        separated_rows = []
        for _, row in self.fmea.iterrows():
            split_values = []
            for col in columns_to_split:
                if isinstance(row[col], str) and row[col]:
                    values = [item.strip() for item in marker.split(row[col]) if item.strip()]
                else:
                    values = [row[col]]  # keep non-string values (NaN etc.) as-is
                if not values:
                    # Marker-only cell: pad with NaN rather than raise IndexError.
                    values = [np.nan]
                split_values.append(values)
            max_length = max(len(values) for values in split_values)
            # Equalize lengths by repeating each column's last value.
            for i, values in enumerate(split_values):
                if len(values) < max_length:
                    split_values[i] = values + [values[-1]] * (max_length - len(values))
            # Emit one new row per split position.
            for i in range(max_length):
                new_row = row.copy()
                for col, values in zip(columns_to_split, split_values):
                    new_row[col] = values[i]
                separated_rows.append(new_row)
        # Rewrite the previous FMEA with the exploded rows.
        self.fmea = pd.DataFrame(separated_rows)

    def bracket_remover(self):
        """Strip parenthesized annotations, e.g. ``"Pump (P-101)"`` -> ``"Pump"``.

        Bug fix: the previous ``.str.replace`` accessor silently converted
        non-string cells to NaN; non-strings now pass through untouched.
        """
        columns = ['Component', 'Failure Mode', 'Failure Mechanism', 'Failure Cause', 'Failure Effect']
        for col in columns:
            if col in self.fmea.columns:
                self.fmea[col] = self.fmea[col].apply(
                    lambda v: re.sub(r"\s*\(.*?\)", "", v).strip() if isinstance(v, str) else v
                )

    def column_matcher_21Jan(self):
        """Fuzzy-match FMEA text against SAP short texts, by priority.

        Priority order per FMEA column:
          1. short texts of the catalog profile's own code group;
          2. other code groups sharing the catalog letter;
          3. the whole catalog — matches from here are flagged
             ``"_secondary"`` and described with their code group.

        Adds one code column and one ``*_description`` column per catalog.

        Returns:
            The enriched FMEA DataFrame.
        """
        def best_match(value, choices):
            # Guard: process.extract raises on NaN/None input and an empty
            # choice list yields nothing useful; short-circuit both cases.
            if not (isinstance(value, str) and value) or not choices:
                return []
            return process.extract(value, choices, limit=1)

        def accepted(value, choices):
            # Join match texts whose score meets the threshold ('' if none).
            return ', '.join(t for t, score in best_match(value, choices) if score >= self.threshold)

        for code, sap in zip(self.fmea_code['fmea code'],
                             [self.object_part, self.symptom, self.damage, self.cause]):
            matching_code_group = self.code_group[self.code_group['fmea code'] == code]['Code group']
            if matching_code_group.empty:
                continue  # no code group configured for this FMEA column
            # Catalog identifier (e.g. 'B-Object Part') and its grouping character.
            catalog_profile = self.code_group[self.code_group['fmea code'] == code]['Catalog'].values[0]
            catalog_group = catalog_profile[1]  # second character of the catalog code
            # 1. Catalog profile's own code group.
            s_profile = sap[sap['Code group'] == catalog_profile]['Short text'].tolist()
            m2_profile = self.fmea[code].apply(lambda x: accepted(x, s_profile))
            # 2. Same catalog letter, excluding the profile itself.
            group_sap = sap[(sap['Code group'].str[1] == catalog_group) & (sap['Code group'] != catalog_profile)]
            s_group = group_sap['Short text'].tolist()
            m2_group = self.fmea[code].apply(lambda x: accepted(x, s_group))
            # 3. Entire SAP catalog; flagged so the description shows its source.
            s_all = sap['Short text'].tolist()
            m2_all = self.fmea[code].apply(lambda x: accepted(x, s_all))
            m2_all_flagged = m2_all.apply(lambda x: f"{x}_secondary" if x else x)
            # Merge prioritized matches: profile > group > whole catalog.
            merged_m2 = m2_profile.combine(m2_group, lambda x, y: x if x else y)
            merged_m2 = merged_m2.combine(m2_all_flagged, lambda x, y: x if x else y)
            # Lookup tables: short text <-> code, short text -> code group.
            mapping_dict_code = sap.set_index('Short text')['Code'].to_dict()
            mapping_dict_short_text = sap.set_index('Code')['Short text'].to_dict()
            catalog_code_dict = sap.set_index('Short text')['Code group'].to_dict()
            name = catalog_profile
            # Code column: map matched short text back to its SAP code.
            self.fmea[name] = merged_m2.apply(
                lambda x: mapping_dict_code.get(x.replace("_secondary", "")) if x else None
            )
            # Description column: secondary matches carry their code group.
            self.fmea[f"{name}_description"] = merged_m2.apply(
                lambda x: (
                    f"{x.replace('_secondary', '')} ({catalog_code_dict.get(x.replace('_secondary', ''), 'Unknown')})"
                    if "_secondary" in x else
                    mapping_dict_short_text.get(mapping_dict_code.get(x), x)
                )
            )
        return self.fmea

    def column_arranger(self):
        """Reorder the SAP code/description columns and export to Excel.

        Returns:
            The rearranged FMEA DataFrame (also written to
            ``processed_fmea.xlsx``).
        """
        catalog_profile = self.fmea.pop('Catalog Profile (SAP)')
        b_object_part = self.fmea.pop('B-Object Part')
        d_symptom = self.fmea.pop('D-Symptom')
        c_damage = self.fmea.pop('C-Damage')
        cause_5 = self.fmea.pop('5-Cause')
        b_object_part_desc = self.fmea.pop('B-Object Part_description')
        d_symptom_desc = self.fmea.pop('D-Symptom_description')
        c_damage_desc = self.fmea.pop('C-Damage_description')
        cause_5_desc = self.fmea.pop('5-Cause_description')
        # Interleave each code column with its description column.
        self.fmea.insert(1, catalog_profile.name, catalog_profile)
        self.fmea.insert(3, b_object_part.name, b_object_part)
        self.fmea.insert(4, b_object_part_desc.name, b_object_part_desc)
        self.fmea.insert(6, d_symptom.name, d_symptom)
        self.fmea.insert(7, d_symptom_desc.name, d_symptom_desc)
        self.fmea.insert(9, c_damage.name, c_damage)
        self.fmea.insert(10, c_damage_desc.name, c_damage_desc)
        self.fmea.insert(12, cause_5.name, cause_5)
        self.fmea.insert(13, cause_5_desc.name, cause_5_desc)
        print('Completed column arranger function')
        self.fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.fmea

    def process_and_split_excel(self):
        """Explode newline-separated task cells into one row per task.

        Splits 'Proposed Task' on newlines and aligns the sibling task
        columns by position; bullets/numbering are stripped from the
        columns in ``clean_columns``. Misaligned positions become NaN.

        Returns:
            The exploded DataFrame (also written to ``processed_fmea.xlsx``).
        """
        new_rows = []
        columns = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        clean_columns = ['Frequency', 'Action Party', 'TA (Y/N)']  # columns to clean bullet points
        # Leading bullet/numbering marker, e.g. '1.', 'a)', '•'.
        bullet_pattern = r'^\s*[\da-zA-Z]+[)\.\-•]?\s*'
        for _, row in self.fmea.iterrows():
            cell_value = row.loc[columns[0]]
            if isinstance(cell_value, str):
                # 'Proposed Task' drives the number of output rows.
                points = [point.strip() for point in cell_value.split('\n') if point.strip()]
                for idx, point in enumerate(points):
                    new_row = row.copy()
                    for column in columns:
                        column_value = row[column]
                        if isinstance(column_value, str):
                            # Align sibling columns by newline position.
                            column_points = [p.strip() for p in column_value.split('\n') if p.strip()]
                            new_value = column_points[idx] if idx < len(column_points) else np.nan
                            if column in clean_columns:
                                new_value = re.sub(bullet_pattern, '', new_value).strip() if isinstance(new_value, str) else new_value
                            new_row[column] = new_value
                        else:
                            # Non-string values only appear on the first emitted row.
                            new_row[column] = np.nan if idx > 0 else column_value
                    new_rows.append(new_row)
            else:
                # Non-string task cell: keep the row unchanged.
                new_rows.append(row)
        self.new_fmea = pd.DataFrame(new_rows)
        self.new_fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.new_fmea

    def process_and_split_excel_2(self):
        """Explode newline-separated task cells, padding short columns.

        Unlike :meth:`process_and_split_excel`, every task column is split
        and shorter columns are padded by repeating their last value.

        Bug fix: a cell that was all whitespace used to split into an
        empty list and crash on ``values[-1]``; it now yields NaN.

        Returns:
            The exploded DataFrame (also written to ``processed_fmea.xlsx``).
        """
        columns_to_split = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']
        clean_columns = ['Proposed Task', 'Task Type', 'Frequency', 'Action Party', 'TA (Y/N)']  # columns to clean bullet points
        bullet_pattern = r'^\s*(?:\d+[\)\.\-•]\s*|[a-zA-Z]\))'  # leading bullet/numbering marker
        separated_rows = []
        for _, row in self.fmea.iterrows():
            split_values = []
            for col in columns_to_split:
                if isinstance(row[col], str) and row[col]:
                    values = [item.strip() for item in row[col].split('\n') if item.strip()]
                    if col in clean_columns:
                        values = [re.sub(bullet_pattern, '', v).strip() for v in values]
                else:
                    values = [row[col]]  # keep non-string values as-is
                if not values:
                    # Whitespace-only cell: pad with NaN rather than raise IndexError.
                    values = [np.nan]
                split_values.append(values)
            max_length = max(len(values) for values in split_values)
            # Equalize lengths by repeating each column's last value.
            for i, values in enumerate(split_values):
                if len(values) < max_length:
                    split_values[i] = values + [values[-1]] * (max_length - len(values))
            for i in range(max_length):
                new_row = row.copy()
                for col, values in zip(columns_to_split, split_values):
                    new_row[col] = values[i]
                separated_rows.append(new_row)
        self.new_fmea = pd.DataFrame(separated_rows)
        self.new_fmea.to_excel('processed_fmea.xlsx', index=False)
        return self.new_fmea