# kmi_dashboard / eda_functions.py
# delima1234-Sunbright
# KMI Dashboard
# 5e0490f
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class SprayDryerEDAPipeline:
    """
    Pipeline for Exploratory Data Analysis (EDA) and preprocessing of spray dryer data.
    """
    def __init__(self, data_path=None, dataframe=None):
        """
        Initialize the pipeline from either a file path or an existing DataFrame.

        Parameters:
        -----------
        data_path : str, optional
            Path to the data file (CSV or Excel); used only when no dataframe is given.
        dataframe : pd.DataFrame, optional
            An already-loaded DataFrame; takes precedence over data_path.

        Raises:
        -------
        ValueError
            If neither data_path nor dataframe is provided.
        """
        # Copy the input so the caller's dataframe is never mutated by the pipeline.
        if dataframe is not None:
            self.df_original = dataframe.copy()
        elif data_path:
            self.df_original = self.load_data(data_path)
        else:
            raise ValueError("Harus memberikan data_path atau dataframe")
        # Working copy mutated by the pipeline steps; df_original stays pristine.
        self.df = self.df_original.copy()
        # Per-product slices, filled later by separate_data_by_product().
        self.product_dataframes = {}
        self.setup_visualization()
def setup_visualization(self):
    """Apply global plotting defaults: matplotlib 'default' style, seaborn 'husl' palette."""
    plt.style.use('default')
    sns.set_palette("husl")
def load_data(self, path):
"""Load data dari file"""
if path.endswith('.csv'):
return pd.read_csv(path)
elif path.endswith(('.xlsx', '.xls')):
return pd.read_excel(path)
else:
raise ValueError("Format file tidak didukung")
# ============= STEP 1: PEMERIKSAAN KOLOM AWAL =============
def check_and_fix_columns(self):
"""
Step 1: Pemeriksaan dan perbaikan nama kolom
"""
print("="*80)
print("STEP 1: PEMERIKSAAN KOLOM AWAL")
print("="*80)
# Daftar kolom standar
standard_columns = [
'Date_time', 'Drier_On_Product', 'D101330TT', 'D102260TIC_CV',
'D102265TIC_PV', 'D102265TIC_CV', 'D102266TIC', 'D101264FTSCL',
'Product', 'GAS_MMBTU', 'fixed_rounded_time'
]
print(f"Kolom yang ada di dataframe: {list(self.df.columns)}")
print(f"\nKolom standar yang diharapkan: {standard_columns}")
standard_lookup = {col.lower(): col for col in standard_columns}
column_mapping = {}
unmatched_column = []
for actual_col in self.df.columns:
actual_col_lower = actual_col.lower()
if actual_col_lower in standard_lookup:
standard_name = standard_lookup[actual_col_lower]
if actual_col != standard_name:
column_mapping[actual_col] = standard_name
else:
unmatched_column.append(actual_col)
# Rename kolom
self.df.rename(columns=column_mapping, inplace=True)
# Hapus kolom yang tidak ada dalam daftar standar
cols_to_keep = [col for col in self.df.columns if col in standard_columns]
cols_removed = [col for col in self.df.columns if col not in standard_columns]
if cols_removed:
print(f"\nKolom yang dihapus: {cols_removed}")
self.df = self.df[cols_to_keep]
# Cek apakah fixed_rounded_time ada
if 'fixed_rounded_time' not in self.df.columns:
print("\nKolom 'fixed_rounded_time' tidak ditemukan. Akan dibuat nanti.")
print(f"\nKolom final: {list(self.df.columns)}")
print(f"Shape dataframe: {self.df.shape}")
# ============= STEP 2: VALIDASI KOLOM PRODUCT =============
def validate_product_names(self):
"""
Step 2: Validasi dan standardisasi nama produk secara otomatis.
"""
print("\n" + "="*80)
print("STEP 2: VALIDASI DAN STANDARDISASI KOLOM PRODUCT")
print("="*80)
# Pastikan kolom 'Product' ada
if 'Product' not in self.df.columns:
print("PERINGATAN: Kolom 'Product' tidak ditemukan. Melewati langkah ini.")
print("="*80 + "\n")
return self.df
# 1. Daftar nama produk standar (sumber kebenaran)
standard_products = [
'CKP BASE', 'CMP BASE', 'BMP BASE', 'MORIGRO BASE', 'CKH BASE',
'CMH BASE', 'BMH BASE', 'CKR BASE', 'CMR BASE', 'BMR BASE',
'CGI BASE', 'NL33 BASE POWDER', 'CKS BASE', 'CHIL SCHOOL',
'CHIL MIL SOYA', 'CIP', 'CIP CHAMBER'
]
# 2. Mapping HANYA untuk kasus-kasus khusus/salah ketik yang tidak bisa ditebak
# Contoh: ada kata 'BASE' ganda, atau singkatan yang tidak standar.
special_product_mapping = {
'CMR BASE BASE': 'CMR BASE',
'CGI 6-12 BASE' : 'CGI BASE',
'CMH BASE': 'CMH BASE',
'BMH BASE': 'BMH BASE'
}
print(f"Produk unik sebelum standardisasi: {self.df['Product'].unique()}")
# 3. Buat kamus pencocokan (lookup map) utama secara otomatis
# Kunci: nama produk dalam format UPPERCASE dan tanpa spasi berlebih
# Nilai: nama produk standar yang benar
# Mulai dengan standard products
product_lookup = {prod.upper().strip(): prod for prod in standard_products}
# Timpa/tambahkan dengan special mapping. Ini memastikan kasus khusus diutamakan.
for key, value in special_product_mapping.items():
product_lookup[key.upper().strip()] = value
# 4. Gunakan metode .map() dari Pandas untuk efisiensi tinggi
# Ini jauh lebih cepat daripada .apply() untuk data besar
# Simpan kolom produk asli untuk perbandingan
original_products = self.df['Product'].copy()
# Buat series baru dengan nilai yang sudah dinormalisasi (uppercase, strip)
normalized_products = self.df['Product'].astype(str).str.upper().str.strip()
# Gunakan .map() untuk mengganti nilai. Nilai yang tidak ada di `product_lookup` akan menjadi NaN
self.df['Product'] = normalized_products.map(product_lookup)
# Isi kembali nilai yang menjadi NaN dengan nilai aslinya.
# Ini memastikan produk yang tidak dikenali tidak akan hilang/diubah.
self.df['Product'].fillna(original_products, inplace=True)
print(f"\nProduk unik setelah standardisasi: {self.df['Product'].unique()}")
print(f"\nJumlah setiap produk:\n{self.df['Product'].value_counts()}")
# 5. (Opsional tapi sangat direkomendasikan) Laporkan produk yang tidak berhasil distandardisasi
final_products_set = set(self.df['Product'].unique())
standard_products_set = set(standard_products)
unstandardized = final_products_set - standard_products_set
# Hapus None atau NaN jika ada dalam hasil
unstandardized = {item for item in unstandardized if pd.notna(item)}
if unstandardized:
print("\n" + "-"*40)
print(f"PERINGATAN: Ditemukan {len(unstandardized)} produk yang tidak sesuai standar:")
for item in unstandardized:
print(f" - '{item}'")
print("Pertimbangkan untuk menambahkannya ke `standard_products` atau `special_product_mapping`.")
print("-"*40)
print("\n" + "="*80)
print("STEP 2 SELESAI")
print("="*80 + "\n")
return self.df
# ============= STEP 3: PEMISAHAN DATA PER PRODUK =============
def separate_data_by_product(self):
"""
Step 3: Pemisahan data berdasarkan produk
"""
print("\n" + "="*80)
print("STEP 3: PEMISAHAN DATA PER PRODUK")
print("="*80)
unique_products = self.df['Product'].unique()
print(f"Memisahkan data untuk {len(unique_products)} produk...")
for product in unique_products:
self.product_dataframes[product] = self.df[self.df['Product'] == product].copy()
print(f"\n{product}: {len(self.product_dataframes[product])} baris")
# Tampilkan statistik deskriptif
print("\n" + "-"*50)
print("STATISTIK DESKRIPTIF - DATA KESELURUHAN")
print("-"*50)
print(self.df.describe())
print("\n" + "-"*50)
print("INFO DATA KESELURUHAN")
print("-"*50)
print(self.df.info())
# Statistik per produk
for product, df_product in self.product_dataframes.items():
print("\n" + "-"*50)
print(f"STATISTIK DESKRIPTIF - {product}")
print("-"*50)
print(df_product.describe())
print(f"\nINFO - {product}")
print(df_product.info())
# ============= STEP 4: IDENTIFIKASI ANOMALI DATA =============
def identify_anomalies(self):
"""
Step 4: Identifikasi anomali berdasarkan aturan teknis
"""
print("\n" + "="*80)
print("STEP 4: IDENTIFIKASI ANOMALI DATA")
print("="*80)
anomaly_rules = {
'D101330TT': {'min': 20, 'max': 130, 'zero_anomaly': True},
'D102265TIC_PV': {'min': 20, 'zero_anomaly': True},
'D102265TIC_CV': {'zero_allowed_products': ['CIP', 'CIP CHAMBER']},
'D102266TIC': {'zero_anomaly': True}
}
anomalies = []
for product, df_product in self.product_dataframes.items():
print(f"\nMemeriksa anomali untuk produk: {product}")
for column, rules in anomaly_rules.items():
if column not in df_product.columns:
continue
# Cek nilai 0
if 'zero_anomaly' in rules and rules['zero_anomaly']:
zero_count = (df_product[column] == 0).sum()
if zero_count > 0:
anomalies.append({
'Product': product,
'Column': column,
'Anomaly': 'Nilai 0',
'Count': zero_count
})
print(f" - {column}: Ditemukan {zero_count} nilai 0 (anomali)")
# Cek nilai 0 untuk D102265TIC_CV
if 'zero_allowed_products' in rules:
if product not in rules['zero_allowed_products']:
zero_count = (df_product[column] == 0).sum()
if zero_count > 0:
anomalies.append({
'Product': product,
'Column': column,
'Anomaly': 'Nilai 0 (tidak diizinkan untuk produk ini)',
'Count': zero_count
})
print(f" - {column}: Ditemukan {zero_count} nilai 0 (anomali untuk produk non-CIP)")
# Cek nilai minimum
if 'min' in rules:
below_min = (df_product[column] < rules['min']).sum()
if below_min > 0:
anomalies.append({
'Product': product,
'Column': column,
'Anomaly': f'Nilai < {rules["min"]}',
'Count': below_min
})
print(f" - {column}: Ditemukan {below_min} nilai < {rules['min']}")
# Cek nilai maksimum
if 'max' in rules:
above_max = (df_product[column] > rules['max']).sum()
if above_max > 0:
anomalies.append({
'Product': product,
'Column': column,
'Anomaly': f'Nilai > {rules["max"]}',
'Count': above_max
})
print(f" - {column}: Ditemukan {above_max} nilai > {rules['max']}")
if anomalies:
anomaly_df = pd.DataFrame(anomalies)
print("\n" + "-"*50)
print("RINGKASAN ANOMALI")
print("-"*50)
print(anomaly_df.to_string())
else:
print("\nTidak ditemukan anomali berdasarkan aturan yang ditetapkan.")
# ============= STEP 5: VALIDASI KOLOM DRIER_ON_PRODUCT =============
def validate_drier_on_product(self):
"""
Step 5: Validasi kolom Drier_On_Product
"""
print("\n" + "="*80)
print("STEP 5: VALIDASI KOLOM DRIER_ON_PRODUCT")
print("="*80)
production_products = [
'CKP BASE', 'CMP BASE', 'BMP BASE', 'MORIGRO BASE', 'CKH BASE',
'CMH BASE', 'BMH BASE', 'CKR BASE', 'CMR BASE', 'BMR BASE',
'CGI BASE', 'NL33 BASE POWDER', 'CKS BASE', 'CHIL SCHOOL',
'CHIL MIL SOYA'
]
cip_products = ['CIP', 'CIP CHAMBER']
validation_errors = []
for product in self.df['Product'].unique():
df_product = self.df[self.df['Product'] == product]
if product in production_products:
# Harus 1
wrong_values = df_product[df_product['Drier_On_Product'] != 1]
if len(wrong_values) > 0:
validation_errors.append({
'Product': product,
'Expected': 1,
'Wrong_Count': len(wrong_values)
})
print(f"ERROR: {product} memiliki {len(wrong_values)} baris dengan Drier_On_Product != 1")
elif product in cip_products:
# Harus 0
wrong_values = df_product[df_product['Drier_On_Product'] != 0]
if len(wrong_values) > 0:
validation_errors.append({
'Product': product,
'Expected': 0,
'Wrong_Count': len(wrong_values)
})
print(f"ERROR: {product} memiliki {len(wrong_values)} baris dengan Drier_On_Product != 0")
if not validation_errors:
print("✓ Semua nilai Drier_On_Product sesuai dengan ketentuan")
else:
error_df = pd.DataFrame(validation_errors)
print("\nRingkasan Error Validasi:")
print(error_df)
# ============= STEP 6: CEK MISSING VALUES DAN DUPLIKASI =============
def check_missing_and_duplicates(self):
"""
Step 6: Periksa missing values dan hapus duplikasi
"""
print("\n" + "="*80)
print("STEP 6: CEK MISSING VALUES DAN DUPLIKASI")
print("="*80)
# Cek missing values
print("Missing Values per Kolom:")
missing_counts = self.df.isnull().sum()
print(missing_counts[missing_counts > 0] if any(missing_counts > 0) else "Tidak ada missing values")
# Cek duplikasi berdasarkan Date_time
duplicates = self.df[self.df.duplicated(subset=['Date_time'], keep=False)]
print(f"\nJumlah baris duplikat berdasarkan Date_time: {len(duplicates)}")
if len(duplicates) > 0:
print("Menghapus duplikasi...")
self.df = self.df.drop_duplicates(subset=['Date_time'], keep='first')
print(f"Shape setelah menghapus duplikasi: {self.df.shape}")
# Update product dataframes
for product in self.product_dataframes.keys():
self.product_dataframes[product] = self.df[self.df['Product'] == product].copy()
# ============= STEP 7: PERHITUNGAN DURASI PRODUKSI =============
def calculate_production_duration(self):
    """
    Step 7: Compute total production duration per product from contiguous
    production segments.

    Logic:
    1. Rows are sorted by time.
    2. A production "segment" is a run of consecutive rows sharing the
       same product name.
    3. A change of product between consecutive rows starts a new segment.
    4. Duration is computed per segment (end_time - start_time).
    5. A product's total duration is the sum of its segment durations.
    """
    print("\n" + "="*80)
    print("STEP 7: PERHITUNGAN DURASI PRODUKSI (METODE SEGMENTASI)")
    print("="*80)
    # Make sure Date_time is a real datetime before sorting/grouping.
    try:
        self.df['Date_time'] = pd.to_datetime(self.df['Date_time'])
    except Exception as e:
        print(f"Error saat konversi 'Date_time': {e}")
        return
    if self.df.empty:
        print("DataFrame kosong, tidak ada durasi untuk dihitung.")
        return
    # Sorting by time is crucial for the segmentation logic below.
    df_sorted = self.df.sort_values('Date_time').copy()
    # --- Core logic: identify contiguous production segments ---
    # .shift() compares each row's product with the previous row;
    # .cumsum() turns every change point (True=1) into a new unique segment id.
    df_sorted['segment_id'] = (df_sorted['Product'] != df_sorted['Product'].shift()).cumsum()
    # Group by product and segment id to get each segment's start/end time.
    production_segments = df_sorted.groupby(['Product', 'segment_id']).agg(
        Start_Time=('Date_time', 'min'),
        End_Time=('Date_time', 'max'),
        Data_Points=('Date_time', 'count')
    ).reset_index()
    # Per-segment duration.
    production_segments['Duration'] = production_segments['End_Time'] - production_segments['Start_Time']
    # Keep only real production (exclude CIP cleaning runs).
    production_segments_filtered = production_segments[
        ~production_segments['Product'].isin(['CIP', 'CIP CHAMBER'])
    ].copy()
    if production_segments_filtered.empty:
        print("Tidak ada data produksi (non-CIP) untuk dihitung durasinya.")
        return
    # Total duration per product = sum of its segment durations.
    total_durations = production_segments_filtered.groupby('Product')['Duration'].sum().reset_index()
    # Convert to hours for the summary table.
    total_durations['Total_Duration_Hours'] = round(total_durations['Duration'].dt.total_seconds() / 3600, 2)
    # Attach total data-point counts per product.
    total_data_points = production_segments_filtered.groupby('Product')['Data_Points'].sum().reset_index()
    summary_df = pd.merge(total_durations, total_data_points, on='Product')
    print("--- RINGKASAN TOTAL DURASI PRODUKSI PER PRODUK ---")
    print(summary_df[['Product', 'Total_Duration_Hours', 'Data_Points']].to_string(index=False))
    print("\n" + "-"*80)
    print("--- DETAIL SEGMEN PRODUKSI ---")
    # Per-segment detail listing for each product.
    for product in summary_df['Product'].unique():
        print(f"\nProduk: {product}")
        product_segment_details = production_segments_filtered[production_segments_filtered['Product'] == product].copy()
        # Segment duration in minutes for readability.
        product_segment_details['Duration_Minutes'] = round(product_segment_details['Duration'].dt.total_seconds() / 60, 2)
        print(product_segment_details[[
            'Start_Time',
            'End_Time',
            'Duration_Minutes',
            'Data_Points'
        ]].to_string(index=False))
# ============= STEP 8: PEMBUATAN KOLOM FIXED_ROUNDED_TIME =============
def create_fixed_rounded_time(self):
"""
Step 8: Buat kolom fixed_rounded_time jika belum ada
"""
print("\n" + "="*80)
print("STEP 8: PEMBUATAN KOLOM FIXED_ROUNDED_TIME")
print("="*80)
if 'fixed_rounded_time' not in self.df.columns:
print("Membuat kolom fixed_rounded_time...")
self.df['Date_time'] = pd.to_datetime(self.df['Date_time'])
self.df['fixed_rounded_time'] = (self.df['Date_time'] + pd.Timedelta(hours=1)).dt.floor('H')
print("Sample hasil:")
print(self.df[['Date_time', 'fixed_rounded_time']].head(10))
# Update product dataframes
for product in self.product_dataframes.keys():
self.product_dataframes[product] = self.df[self.df['Product'] == product].copy()
else:
print("Kolom fixed_rounded_time sudah ada")
# ============= STEP 9: PERHITUNGAN JUMLAH MENIT =============
def calculate_minutes_per_hour(self):
"""
Step 9: Hitung jumlah data per jam berdasarkan fixed_rounded_time
"""
print("\n" + "="*80)
print("STEP 9: PERHITUNGAN JUMLAH DATA PER JAM")
print("="*80)
if 'fixed_rounded_time' not in self.df.columns:
print("ERROR: Kolom fixed_rounded_time tidak ditemukan!")
return
# Hitung jumlah data per jam
jumlah_data_per_jam = self.df.groupby('fixed_rounded_time').size()
jumlah_data_per_jam_df = jumlah_data_per_jam.reset_index(name='Jumlah Data Per Jam')
# Filter data dengan jumlah kurang dari 60
jumlah_data_kurang_60 = jumlah_data_per_jam_df[jumlah_data_per_jam_df['Jumlah Data Per Jam'] <= 60]
# Urutkan dari jumlah terkecil ke terbesar
jumlah_data_kurang_60 = jumlah_data_kurang_60.sort_values(by='Jumlah Data Per Jam', ascending=True)
print(f"Jam dengan data < 60 menit ({len(jumlah_data_kurang_60)} jam):")
pd.set_option("display.max_rows", None)
print(jumlah_data_kurang_60.to_string())
pd.set_option("display.max_rows", 10)
# ============= STEP 10: VISUALISASI DATA =============
def create_line_plots(self, show_all_products=True, show_overall=True):
    """
    Step 10: Line plots of every numeric parameter over time.

    Only rows with Drier_On_Product == 1 are plotted, and the
    Drier_On_Product flag itself is excluded from the parameter list.
    One matplotlib figure is shown per parameter (blocking plt.show()).

    Parameters
    ----------
    show_overall : bool
        Plot all products combined.
    show_all_products : bool
        Additionally plot each non-CIP product separately.
    """
    print("\n" + "="*80)
    print("STEP 10: VISUALISASI DATA (LINE PLOTS)")
    print("="*80)
    numeric_columns = self.df.select_dtypes(include=[np.number]).columns
    numeric_columns = [col for col in numeric_columns if col not in ['Drier_On_Product']]
    # One figure per parameter for the combined dataset.
    if show_overall:
        print("\nMembuat plot untuk keseluruhan produk...")
        df_plot = self.df[self.df['Drier_On_Product'] == 1].copy()
        if len(df_plot) > 0:
            df_plot = df_plot.sort_values('Date_time')
            for column in numeric_columns:
                if column in df_plot.columns:
                    plt.figure(figsize=(30, 5))
                    plt.plot(df_plot['Date_time'], df_plot[column], marker='o', markersize=2, label=column)
                    plt.title(f'Line Plot of {column} Over Time - All Products', fontsize=14)
                    plt.xlabel('Date_time')
                    plt.ylabel(column)
                    plt.xticks(rotation=45)
                    plt.legend()
                    plt.grid(True, alpha=0.3)
                    plt.tight_layout()
                    plt.show()
    # One figure per parameter per production product (CIP runs skipped).
    if show_all_products:
        for product, df_product in self.product_dataframes.items():
            if product not in ['CIP', 'CIP CHAMBER']:
                print(f"\nMembuat plot untuk produk: {product}")
                df_plot = df_product[df_product['Drier_On_Product'] == 1].copy()
                if len(df_plot) > 0:
                    df_plot = df_plot.sort_values('Date_time')
                    for column in numeric_columns:
                        if column in df_plot.columns:
                            plt.figure(figsize=(20, 4))
                            plt.plot(df_plot['Date_time'], df_plot[column], marker='o', markersize=3, label=column)
                            plt.title(f'{product} - {column} Over Time', fontsize=12)
                            plt.xlabel('Date_time')
                            plt.ylabel(column)
                            plt.xticks(rotation=45)
                            plt.legend()
                            plt.grid(True, alpha=0.3)
                            plt.tight_layout()
                            plt.show()
# ============= STEP 11: IDENTIFIKASI OUTLIERS =============
def identify_outliers(self, show_plots=True):
    """
    Step 11: Detect outliers per parameter using the 1.5*IQR rule.

    Runs the analysis first on the full dataset, then once per non-CIP
    product. Only rows with Drier_On_Product == 1 are analysed. When a
    parameter has outliers, the median is preferred over the mean as the
    representative ("chosen") value.

    Parameters
    ----------
    show_plots : bool
        If True, show a histogram + time-series figure for every
        parameter that has outliers.
    """
    print("\n" + "="*80)
    print("STEP 11: IDENTIFIKASI OUTLIERS")
    print("="*80)
    def analyze_outliers(dataframe, product_name="Overall"):
        """Run the IQR analysis for one dataframe; returns (stats_df, outliers_df), either may be None."""
        df_copy = dataframe.copy()
        df_copy['Date_time'] = pd.to_datetime(df_copy['Date_time'])
        # Only analyse rows where the drier was actually producing.
        drier_on_data = df_copy[df_copy['Drier_On_Product'] == 1].copy()
        if drier_on_data.empty:
            print(f"Tidak ada data dengan Drier_On_Product == 1 untuk {product_name}")
            return None, None
        print(f"\n{'='*60}")
        print(f"Analisis Outliers - {product_name}")
        print(f"Total data yang dianalisis: {len(drier_on_data)} baris")
        print(f"{'='*60}")
        all_stats_data = []
        list_of_outliers = []
        # Every numeric column except the on/off flag.
        numeric_columns = drier_on_data.select_dtypes(include=np.number).columns.drop('Drier_On_Product', errors='ignore')
        for column in numeric_columns:
            if column in drier_on_data.columns:
                param_data = drier_on_data[column].dropna()
                if len(param_data) > 0:
                    # Tukey fences: [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
                    Q1 = param_data.quantile(0.25)
                    Q3 = param_data.quantile(0.75)
                    IQR = Q3 - Q1
                    lower_bound = Q1 - 1.5 * IQR
                    upper_bound = Q3 + 1.5 * IQR
                    outliers = param_data[(param_data < lower_bound) | (param_data > upper_bound)]
                    has_outliers = not outliers.empty
                    mean_val = param_data.mean()
                    median_val = param_data.median()
                    std_val = param_data.std()
                    # Median is robust to outliers, so prefer it when any exist.
                    chosen_val = median_val if has_outliers else mean_val
                    all_stats_data.append({
                        'Parameter': column,
                        'Mean': mean_val,
                        'Median': median_val,
                        'Std_Dev': std_val,
                        'Batas_Bawah': lower_bound,
                        'Batas_Atas': upper_bound,
                        'Has_Outliers': has_outliers,
                        'Outliers_Count': len(outliers),
                        'Chosen_Value': chosen_val
                    })
                    if has_outliers:
                        outlier_mask = (drier_on_data[column] < lower_bound) | (drier_on_data[column] > upper_bound)
                        outlier_rows = drier_on_data[outlier_mask]
                        # Record every outlier with its timestamp for the report.
                        for index, row in outlier_rows.iterrows():
                            list_of_outliers.append({
                                'Tanggal dan Jam': row['Date_time'],
                                'Kolom Outliers': column,
                                'Nilai Outliers': row[column],
                                'Produk': row['Product'] if 'Product' in row else product_name
                            })
                    # Optional diagnostics: distribution + time series per parameter.
                    if show_plots and has_outliers:
                        fig, axes = plt.subplots(1, 2, figsize=(18, 5))
                        # Histogram with mean/median markers.
                        axes[0].hist(param_data, bins=30, edgecolor='black', alpha=0.7)
                        axes[0].axvline(mean_val, color='green', linestyle='--', label=f'Mean: {mean_val:.2f}')
                        axes[0].axvline(median_val, color='red', linestyle='--', label=f'Median: {median_val:.2f}')
                        axes[0].set_title(f'Distribution - {column}')
                        axes[0].legend()
                        # Time series with the IQR fences drawn as horizontal lines.
                        axes[1].plot(range(len(param_data)), param_data.values, 'b-', alpha=0.5)
                        axes[1].axhline(upper_bound, color='purple', linestyle='--', label=f'Upper: {upper_bound:.2f}')
                        axes[1].axhline(lower_bound, color='orange', linestyle='--', label=f'Lower: {lower_bound:.2f}')
                        if has_outliers:
                            outlier_indices = []
                            outlier_values = []
                            # Positional indices so the scatter aligns with the range() x-axis above.
                            for i, (idx, val) in enumerate(param_data.items()):
                                if val < lower_bound or val > upper_bound:
                                    outlier_indices.append(i)
                                    outlier_values.append(val)
                            axes[1].scatter(outlier_indices, outlier_values, color='red', s=50, zorder=5, label='Outliers')
                        axes[1].set_title(f'Time Series - {column}')
                        axes[1].legend()
                        plt.suptitle(f'{product_name}: {column}', fontsize=14)
                        plt.tight_layout()
                        plt.show()
        result_df = pd.DataFrame(all_stats_data) if all_stats_data else None
        outliers_df = pd.DataFrame(list_of_outliers) if list_of_outliers else None
        if outliers_df is not None and not outliers_df.empty:
            outliers_df = outliers_df.sort_values(by='Tanggal dan Jam').reset_index(drop=True)
        return result_df, outliers_df
    # --- Whole dataset first ---
    print("\n" + "="*70)
    print("ANALISIS OUTLIERS - KESELURUHAN DATA")
    print("="*70)
    overall_stats, overall_outliers = analyze_outliers(self.df, "OVERALL")
    if overall_stats is not None:
        print("\nRingkasan Statistik - Keseluruhan:")
        print(overall_stats.to_string())
    if overall_outliers is not None and not overall_outliers.empty:
        print(f"\nTotal Outliers Keseluruhan: {len(overall_outliers)}")
        print("\nSample Outliers (10 pertama):")
        print(overall_outliers.head(10).to_string())
    # --- Then each production product (CIP runs skipped) ---
    for product, df_product in self.product_dataframes.items():
        if product not in ['CIP', 'CIP CHAMBER']:
            stats, outliers = analyze_outliers(df_product, product)
            if stats is not None:
                print(f"\n{'='*50}")
                print(f"Ringkasan Statistik - {product}:")
                print(stats.to_string())
            if outliers is not None and not outliers.empty:
                print(f"\nTotal Outliers {product}: {len(outliers)}")
# ============= MAIN PIPELINE EXECUTION =============
def run_full_pipeline(self, show_visualizations=True):
    """
    Run all 11 EDA steps in order.

    Parameters
    ----------
    show_visualizations : bool
        If False, skip the line plots (step 10); step 11 still runs but
        without its figures.

    Returns
    -------
    tuple
        (processed dataframe, dict of per-product dataframes).

    Raises
    ------
    Exception
        Re-raises whatever a step failed with, after printing it.
    """
    print("\n" + "="*80)
    print(" " * 20 + "SPRAY DRYER EDA PIPELINE")
    print(" " * 25 + "STARTING ANALYSIS")
    print("="*80)
    try:
        # Step 1: normalise/drop columns
        self.check_and_fix_columns()
        # Step 2: standardise product names
        self.validate_product_names()
        # Step 3: split per product
        self.separate_data_by_product()
        # Step 4: rule-based anomaly report
        self.identify_anomalies()
        # Step 5: Drier_On_Product consistency check
        self.validate_drier_on_product()
        # Step 6: missing values + duplicate removal
        self.check_missing_and_duplicates()
        # Step 7: production duration per product
        self.calculate_production_duration()
        # Step 8: hourly bucket column
        self.create_fixed_rounded_time()
        # Step 9: rows per hour
        self.calculate_minutes_per_hour()
        # Step 10: plots (optional)
        if show_visualizations:
            self.create_line_plots(show_all_products=False, show_overall=True)
        # Step 11: IQR outlier report
        self.identify_outliers(show_plots=show_visualizations)
        print("\n" + "="*80)
        print(" " * 25 + "PIPELINE COMPLETED SUCCESSFULLY")
        print("="*80)
        return self.df, self.product_dataframes
    except Exception as e:
        print(f"\nERROR dalam pipeline: {str(e)}")
        raise
def get_summary(self):
"""
Mendapatkan ringkasan hasil analisis
"""
summary = {
'total_rows': len(self.df),
'total_columns': len(self.df.columns),
'unique_products': self.df['Product'].nunique(),
'date_range': {
'start': self.df['Date_time'].min(),
'end': self.df['Date_time'].max()
},
'missing_values': self.df.isnull().sum().to_dict(),
'product_counts': self.df['Product'].value_counts().to_dict()
}
print("\n" + "="*50)
print("📊 DATA SUMMARY")
print("="*50)
print(f"{'Total rows':20}: {summary['total_rows']:,}")
print(f"{'Total columns':20}: {summary['total_columns']}")
print(f"{'Unique products':20}: {summary['unique_products']}")
print(f"{'Date range':20}: {summary['date_range']['start']}{summary['date_range']['end']}")
print("\n🔍 Missing values per column")
print("-"*50)
for col, val in summary['missing_values'].items():
print(f"{col:25} : {val}")
print("\n📦 Product counts")
print("-"*50)
for prod, count in summary['product_counts'].items():
print(f"{prod:25} : {count:,}")
return summary
# ======================================================================
# HELPER FUNCTIONS UNTUK DASHBOARD STREAMLIT (EDA)
# ======================================================================
def compute_eda_summary(df: pd.DataFrame,
                        date_col: str = "Date_time",
                        product_col: str = "Product") -> dict:
    """General dataset summary used for metric cards on the dashboard."""
    data = df.copy()
    # Date range (NaT when the timestamp column is absent).
    if date_col in data.columns:
        # errors="coerce" turns unparsable timestamps into NaT instead of raising.
        data[date_col] = pd.to_datetime(data[date_col], errors="coerce")
        start, end = data[date_col].min(), data[date_col].max()
    else:
        start, end = pd.NaT, pd.NaT
    has_products = product_col in data.columns
    return {
        "total_rows": len(data),
        "total_columns": data.shape[1],
        "date_min": start,
        "date_max": end,
        "total_missing": int(data.isna().sum().sum()),
        "duplicate_rows": int(data.duplicated().sum()),
        "unique_products": int(data[product_col].nunique()) if has_products else 0,
        "product_counts": data[product_col].value_counts().to_dict() if has_products else {},
    }
def compute_anomaly_table(df: pd.DataFrame,
                          product_col: str = "Product") -> pd.DataFrame:
    """
    Rule-based anomaly counts (validation errors) per product/column.
    Output columns: Product, Column, Anomaly, Count.
    """
    empty = pd.DataFrame(columns=["Product", "Column", "Anomaly", "Count"])
    if product_col not in df.columns:
        return empty
    rules_by_column = {
        'D101330TT': {'min': 20, 'max': 130, 'zero_anomaly': True},
        'D102265TIC_PV': {'min': 20, 'zero_anomaly': True},
        'D102265TIC_CV': {'zero_allowed_products': ['CIP', 'CIP CHAMBER']},
        'D102266TIC': {'zero_anomaly': True}
    }
    rows = []

    def add(product, column, label, count):
        # Only record actual findings.
        if count > 0:
            rows.append({"Product": product, "Column": column,
                         "Anomaly": label, "Count": int(count)})

    for product, group in df.groupby(product_col):
        for column, rules in rules_by_column.items():
            if column not in group.columns:
                continue
            values = group[column]
            # Zero values that are always anomalous for this column.
            if rules.get("zero_anomaly", False):
                add(product, column, "Nilai 0", (values == 0).sum())
            # Zero values only allowed for specific (CIP) products.
            allowed = rules.get("zero_allowed_products")
            if allowed is not None and product not in allowed:
                add(product, column,
                    "Nilai 0 (tidak diizinkan untuk produk ini)",
                    (values == 0).sum())
            # Values outside the technical range.
            if "min" in rules:
                add(product, column, f"Nilai < {rules['min']}",
                    (values < rules["min"]).sum())
            if "max" in rules:
                add(product, column, f"Nilai > {rules['max']}",
                    (values > rules["max"]).sum())
    if not rows:
        return empty
    # Collapse duplicate findings into summed counts.
    return (pd.DataFrame(rows)
            .groupby(["Product", "Column", "Anomaly"], as_index=False)["Count"]
            .sum())
def compute_production_segments(df: pd.DataFrame,
                                product_col: str = "Product",
                                time_col: str = "Date_time") -> pd.DataFrame:
    """
    Contiguous production segments per product (CIP runs excluded).

    Output columns:
        Product | Start_Time | End_Time | Duration_Minutes | Data_Points
    """
    out_cols = ["Product", "Start_Time", "End_Time", "Duration_Minutes", "Data_Points"]
    if product_col not in df.columns or time_col not in df.columns:
        return pd.DataFrame(columns=out_cols)
    work = df[[product_col, time_col]].copy()
    work[time_col] = pd.to_datetime(work[time_col], errors="coerce")
    work = work.dropna(subset=[time_col]).sort_values(time_col)
    # A new segment starts whenever the product differs from the previous row.
    change_points = work[product_col] != work[product_col].shift()
    work["segment_id"] = change_points.cumsum()
    segments = work.groupby([product_col, "segment_id"]).agg(
        Start_Time=(time_col, "min"),
        End_Time=(time_col, "max"),
        Data_Points=(time_col, "count"),
    ).reset_index()
    # Drop CIP cleaning runs; they are not production time.
    segments = segments[~segments[product_col].isin(["CIP", "CIP CHAMBER"])]
    if segments.empty:
        return pd.DataFrame(columns=out_cols)
    elapsed = segments["End_Time"] - segments["Start_Time"]
    segments["Duration_Minutes"] = elapsed.dt.total_seconds() / 60.0
    result = segments[[product_col, "Start_Time", "End_Time", "Duration_Minutes", "Data_Points"]].copy()
    return result.rename(columns={product_col: "Product"})
def create_line_plots(df: pd.DataFrame,
                      params: list,
                      product_label: str = "All Data",
                      time_col: str = "Date_time"):
    """
    Build a 2x3 grid of parameter-vs-time plots for the Streamlit dashboard.

    Returns one matplotlib Figure. Only the first 6 parameters are
    plotted (the grid has 6 axes); passing more than 6 used to raise
    IndexError and extra entries are now ignored.
    """
    df_plot = df.copy()
    if time_col in df_plot.columns:
        df_plot[time_col] = pd.to_datetime(df_plot[time_col], errors="coerce")
        df_plot = df_plot.dropna(subset=[time_col]).sort_values(time_col)
    fig, axes = plt.subplots(2, 3, figsize=(18, 8), sharex=True)
    axes = axes.flatten()
    # Guard against more parameters than axes.
    shown = params[:len(axes)]
    for i, param in enumerate(shown):
        ax = axes[i]
        if param in df_plot.columns:
            ax.plot(df_plot[time_col], df_plot[param], marker=".", linewidth=0.7)
            ax.set_title(param)
            ax.grid(True, alpha=0.3)
        else:
            ax.set_title(f"{param} (not found)")
            ax.axis("off")
    # Switch off any unused axes.
    for j in range(len(shown), len(axes)):
        axes[j].axis("off")
    fig.suptitle(f"Distribusi Parameter Proses – {product_label}", fontsize=14)
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    return fig
def identify_outliers(df: pd.DataFrame,
                      params: list,
                      product_label: str = "All Data",
                      time_col: str = "Date_time"):
    """
    IQR-based outlier detection for each parameter in `params`.

    Returns:
        - fig             : 2x3 Figure, time series with outliers highlighted
        - total_outliers  : total outlier count over all parameters
        - outlier_stats_df: per-parameter summary table (Q1/Q3/bounds/count)

    Only the first 6 parameters are processed (the grid has 6 axes);
    passing more than 6 used to raise IndexError and extra entries are
    now ignored.
    """
    df_proc = df.copy()
    if time_col in df_proc.columns:
        df_proc[time_col] = pd.to_datetime(df_proc[time_col], errors="coerce")
        df_proc = df_proc.dropna(subset=[time_col]).sort_values(time_col)
    stats_rows = []
    total_outliers = 0
    fig, axes = plt.subplots(2, 3, figsize=(18, 8), sharex=True)
    axes = axes.flatten()
    # Guard against more parameters than axes.
    shown = params[:len(axes)]
    for i, param in enumerate(shown):
        ax = axes[i]
        if param not in df_proc.columns:
            ax.set_title(f"{param} (not found)")
            ax.axis("off")
            continue
        series = df_proc[param].astype(float)
        series_no_na = series.dropna()
        if series_no_na.empty:
            ax.set_title(f"{param} (no data)")
            ax.axis("off")
            continue
        # Tukey fences: [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
        Q1 = series_no_na.quantile(0.25)
        Q3 = series_no_na.quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - 1.5 * IQR
        upper = Q3 + 1.5 * IQR
        outlier_mask = (series < lower) | (series > upper)
        outlier_idx = df_proc.index[outlier_mask]
        outlier_vals = series[outlier_mask]
        count_out = int(outlier_mask.sum())
        total_outliers += count_out
        stats_rows.append({
            "Parameter": param,
            "Q1": Q1,
            "Q3": Q3,
            "IQR": IQR,
            "Lower_Bound": lower,
            "Upper_Bound": upper,
            "Outliers_Count": count_out,
        })
        # Time series with fences drawn and outliers highlighted.
        ax.plot(df_proc[time_col], series, linewidth=0.7)
        if count_out > 0:
            ax.scatter(df_proc.loc[outlier_idx, time_col], outlier_vals, s=15)
        ax.axhline(lower, linestyle="--")
        ax.axhline(upper, linestyle="--")
        ax.set_title(f"{param} (outliers: {count_out})")
        ax.grid(True, alpha=0.3)
    # Switch off any unused axes.
    for j in range(len(shown), len(axes)):
        axes[j].axis("off")
    fig.suptitle(f"Outlier Detection – {product_label}", fontsize=14)
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    outlier_stats_df = pd.DataFrame(stats_rows)
    return fig, total_outliers, outlier_stats_df
def compute_stats_table(df: pd.DataFrame,
                        params: list,
                        target_col: str = None) -> pd.DataFrame:
    """
    Descriptive-statistics table (one row per parameter) for the process
    parameters plus the gas target column, when present.
    """
    selected = [name for name in params if name in df.columns]
    # Truthiness check mirrors the original: an empty/None target_col is skipped.
    if target_col and target_col in df.columns:
        selected.append(target_col)
    if not selected:
        return pd.DataFrame()
    # .T puts parameters on the index and the statistics on the columns.
    return df[selected].describe().T
# ============= USAGE EXAMPLE =============
if __name__ == "__main__":
    # NOTE(review): hard-coded absolute Windows path — adjust before running elsewhere.
    df = pd.read_csv(r"C:\Dokumen\One To Many_17_10_2025\MMBTU\DASHBOARD\One To Many\disagregasi_data_spraydryer_terbaru_10_17_2025.csv")
    pipeline = SprayDryerEDAPipeline(dataframe=df)
    # Runs all 11 EDA steps; returns the processed frame and the per-product frames.
    processed_df, product_dfs = pipeline.run_full_pipeline(show_visualizations=True)
    # Output lands in the current working directory.
    processed_df.to_csv(r"Processed Data Pipeline EDA_21_10_2025.csv", index=False)
    summary = pipeline.get_summary()
    print(summary)