import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC # smote와 ctgan을 이용한 oversampling 진행 # 파일 경로와 지역 이름 리스트 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] input_paths = [f'../data/data_for_modeling/{region}_train.csv' for region in regions] # 반복적으로 각 지역 데이터 처리 for region, input_path in zip(regions, input_paths): # 데이터 읽기 data = pd.read_csv(input_path, index_col=0) data.drop(['Unnamed: 0'], axis=1, inplace=True) print("\n######",region,"#######") print(len(data[data['multi_class']==0]),'|',len(data[data['multi_class']==1]),'|',len(data[data['multi_class']==2])) print(len(data.columns)) import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC # 파일 경로와 지역 이름 리스트 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] input_paths = [f'../data/data_for_modeling/{region}_train.csv' for region in regions] output_paths = [f'../data/data_oversampled/smote_{region}.csv' for region in regions] # 반복적으로 각 지역 데이터 처리 for region, input_path, output_path in zip(regions, input_paths, output_paths): # 데이터 읽기 data = pd.read_csv(input_path, index_col=0) data.drop(['Unnamed: 0'], axis=1, inplace=True) # X와 y 분리 X = data.drop(columns=['multi_class', 'binary_class']) y = data['multi_class'] # 불필요한 열 제거 X.drop(columns=['ground_temp - temp_C', 'hour_sin', 'hour_cos', 'month_sin', 'month_cos'], inplace=True) # 범주형 변수 식별 categorical_features = [i for i, dtype in enumerate(X.dtypes) if dtype != 'float64'] # 각 지역의 multi_class 값이 2인 데이터 개수 계산 count_class_2 = (y == 2).sum() # SMOTENC 적용 smotenc = SMOTENC( categorical_features=categorical_features, sampling_strategy={0: 10000, 1: 10000, 2: count_class_2}, random_state=42 ) X_resampled, y_resampled = smotenc.fit_resample(X, y) # 추가 변수 생성 X_resampled['multi_class'] = y_resampled X_resampled['binary_class'] = X_resampled['multi_class'].apply(lambda x: 0 if x == 2 else 1) X_resampled['hour_sin'] = np.sin(2 * np.pi * X_resampled['hour'] / 24) X_resampled['hour_cos'] = np.cos(2 * np.pi * X_resampled['hour'] / 24) X_resampled['month_sin'] = np.sin(2 * np.pi * X_resampled['month'] / 12) X_resampled['month_cos'] = np.cos(2 * np.pi * X_resampled['month'] / 12) X_resampled['ground_temp - temp_C'] = X_resampled['groundtemp'] - X_resampled['temp_C'] # 결과 저장 X_resampled.to_csv(output_path) print(f"Processed and saved: {region} -> {output_path}") smote_seoul = pd.read_csv('../data/data_oversampled/smote_seoul.csv') print(smote_seoul[smote_seoul['multi_class']==0]['visi'].describe()) print(smote_seoul[smote_seoul['multi_class']==1]['visi'].describe()) import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC # 파일 경로와 지역 이름 리스트 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] input_paths = [f'../data/data_oversampled/smote_{region}.csv' for region in regions] # 반복적으로 각 지역 데이터 처리 for region, input_path in zip(regions, input_paths): # 데이터 읽기 data = pd.read_csv(input_path, index_col=0) data.drop(['Unnamed: 0'], axis=1, inplace=True) print("\n######",region,"#######") print(len(data[data['multi_class']==0]),'|',len(data[data['multi_class']==1]),'|',len(data[data['multi_class']==2])) print(len(data.columns)) import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC # 파일 경로와 지역 이름 리스트 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] input_paths = [f'../data/data_for_modeling/{region}_train.csv' for region in regions] # 반복적으로 각 지역 데이터 처리 for region, input_path in zip(regions, input_paths): # 데이터 읽기 data = pd.read_csv(input_path, index_col=0) data.drop(['Unnamed: 0'], axis=1, inplace=True) print("\n######",region,"#######") print(len(data[data['multi_class']==0]),'|',len(data[data['multi_class']==1]),'|',len(data[data['multi_class']==2])) print(len(data.columns)) import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC import optuna from ctgan import CTGAN import torch import warnings # 지역별 데이터 파일 경로 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] file_paths = [f'../data/data_for_modeling/df_{region}.feather' for region in regions] output_paths = [f'../data/data_oversampled/ctgan_{region}.csv' for region in regions] # GPU 사용 설정 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Using device: {device}") # 경고 무시 warnings.filterwarnings("ignore", category=UserWarning, module="optuna.distributions") # 지역별 처리 for file_path, output_path in zip(file_paths, output_paths): # 데이터 로드 data = pd.read_feather(file_path) data.drop(['Unnamed: 0'], axis=1, inplace=True) X = data.drop(columns=['multi_class', 'binary_class']) y = data['multi_class'] # 불필요한 열 제거 X.drop(columns=['ground_temp - temp_C', 'hour_sin', 'hour_cos', 'month_sin', 'month_cos'], inplace=True) # SMOTENC에서 사용할 범주형 변수 열 번호 설정 categorical_features_indices = [i for i, dtype in enumerate(X.dtypes) if dtype != 'float64'] # sampling_strategy 설정 count_class_0 = (y == 0).sum() count_class_1 = (y == 1).sum() count_class_2 = (y == 2).sum() sampling_strategy = { 0: 500 if count_class_0 <= 500 else 1000, 1: int(np.ceil(count_class_1 / 100) * 100), # 백의 자리로 올림 2: count_class_2 } # SMOTENC 적용 smotenc = SMOTENC(categorical_features=categorical_features_indices, sampling_strategy=sampling_strategy, random_state=42) X_resampled, y_resampled = smotenc.fit_resample(X, y) # Resampled 데이터 생성 lerp_data = X_resampled.copy() lerp_data['multi_class'] = y_resampled # CTGAN에서 사용할 범주형 변수 열 이름 설정 categorical_features = [ col for col, dtype in zip(lerp_data.columns, lerp_data.dtypes) if dtype != 'float64' ] # Optuna 목적 함수 정의 def objective(trial): # 하이퍼파라미터 탐색 범위 설정 embedding_dim = trial.suggest_int("embedding_dim", 64, 128) generator_dim = trial.suggest_categorical("generator_dim", [(64, 64), (128, 128)]) discriminator_dim = trial.suggest_categorical("discriminator_dim", [(64, 64), (128, 128)]) pac = trial.suggest_categorical("pac", [4, 8]) batch_size = trial.suggest_categorical("batch_size", [64, 128, 256]) discriminator_steps = trial.suggest_int("discriminator_steps", 1, 3) # CTGAN 모델 생성 ctgan = CTGAN( embedding_dim=embedding_dim, generator_dim=generator_dim, discriminator_dim=discriminator_dim, batch_size=batch_size, discriminator_steps=discriminator_steps, pac=pac ) # 범주 0 데이터 필터링 data_0 = lerp_data[lerp_data['multi_class'] == 0] # 모델 학습 ctgan.fit(data_0, discrete_columns=categorical_features) # 샘플 생성 generated_data = ctgan.sample(len(data_0) * 2) # 평가: 샘플의 연속형 변수 분포 비교 real_visi = data_0['visi'] generated_visi = generated_data['visi'] # 분포 간 차이(MSE) 계산 mse = ((real_visi.mean() - generated_visi.mean())**2 + (real_visi.std() - generated_visi.std())**2) return -mse # Optuna로 최적화 수행 study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=50) # 최적 하이퍼파라미터 출력 best_params = study.best_params # 최적 하이퍼파라미터로 CTGAN 학습 및 샘플 생성 ctgan = CTGAN( embedding_dim=best_params["embedding_dim"], generator_dim=best_params["generator_dim"], discriminator_dim=best_params["discriminator_dim"], batch_size=best_params["batch_size"], discriminator_steps=best_params["discriminator_steps"], pac=best_params["pac"] ) # 범주 0 데이터로 최종 학습 ctgan.fit(lerp_data[lerp_data['multi_class'] == 0], discrete_columns=categorical_features) generated_0 = ctgan.sample(19500 if count_class_0 <= 500 else 19000) # 범주 1 데이터 최적화 및 생성 def objective_class1(trial): embedding_dim = trial.suggest_int("embedding_dim", 128, 512) generator_dim = trial.suggest_categorical("generator_dim", [(128, 128), (256, 256)]) discriminator_dim = trial.suggest_categorical("discriminator_dim", [(128, 128), (256, 256)]) pac = trial.suggest_categorical("pac", [4, 8]) batch_size = trial.suggest_categorical("batch_size", [256, 512, 1024]) discriminator_steps = trial.suggest_int("discriminator_steps", 1, 5) ctgan = CTGAN( embedding_dim=embedding_dim, generator_dim=generator_dim, discriminator_dim=discriminator_dim, batch_size=batch_size, discriminator_steps=discriminator_steps, pac=pac ) data_1 = lerp_data[lerp_data['multi_class'] == 1] ctgan.fit(data_1, discrete_columns=categorical_features) generated_data = ctgan.sample(len(data_1) * 2) real_visi = data_1['visi'] generated_visi = generated_data['visi'] mse = ((real_visi.mean() - generated_visi.mean())**2 + (real_visi.std() - generated_visi.std())**2) return -mse study_class1 = optuna.create_study(direction="maximize") study_class1.optimize(objective_class1, n_trials=30) best_params_class1 = study_class1.best_params ctgan = CTGAN( embedding_dim=best_params_class1["embedding_dim"], generator_dim=best_params_class1["generator_dim"], discriminator_dim=best_params_class1["discriminator_dim"], batch_size=best_params_class1["batch_size"], discriminator_steps=best_params_class1["discriminator_steps"], pac=best_params_class1["pac"] ) ctgan.fit(lerp_data[lerp_data['multi_class'] == 1], discrete_columns=categorical_features) generated_1 = ctgan.sample(20000 - int(np.ceil(count_class_1 / 100) * 100)) # 데이터 병합 및 저장 well_generated0 = generated_0[(generated_0['visi'] >= 0) & (generated_0['visi'] < 100)] well_generated1 = generated_1[(generated_1['visi'] >= 100) & (generated_1['visi'] < 500)] smote_gan_data = pd.concat([lerp_data, well_generated0, well_generated1], axis=0) # 제거변수 복구 smote_gan_data['binary_class'] = smote_gan_data['multi_class'].apply(lambda x: 0 if x == 2 else 1) smote_gan_data['hour_sin'] = np.sin(2 * np.pi * smote_gan_data['hour'] / 24) smote_gan_data['hour_cos'] = np.cos(2 * np.pi * smote_gan_data['hour'] / 24) smote_gan_data['month_sin'] = np.sin(2 * np.pi * smote_gan_data['month'] / 12) smote_gan_data['month_cos'] = np.cos(2 * np.pi * smote_gan_data['month'] / 12) smote_gan_data['ground_temp - temp_C'] = smote_gan_data['groundtemp'] - smote_gan_data['temp_C'] # 결과 저장 smote_gan_data.to_csv(output_path, index = False) print(f"Processed and saved: {region} -> {output_path}") import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC # 파일 경로와 지역 이름 리스트 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] input_paths = [f'../data/data_oversampled/ctgan_{region}.csv' for region in regions] # 반복적으로 각 지역 데이터 처리 for region, input_path in zip(regions, input_paths): # 데이터 읽기 data = pd.read_csv(input_path) print("\n######",region,"#######") print(len(data[data['multi_class']==0]),'|',len(data[data['multi_class']==1]),'|',len(data[data['multi_class']==2])) print(len(data.columns)) busan_check = pd.read_csv('../data/data_oversampled/ctgan_busan.csv') print(busan_check[busan_check['multi_class']==0]['visi'].describe()) print(busan_check[busan_check['multi_class']==1]['visi'].describe()) print(busan_check[busan_check['multi_class']==2]['visi'].describe()) import pandas as pd import numpy as np from imblearn.over_sampling import SMOTENC # 파일 경로와 지역 이름 리스트 regions = ['busan', 'daegu', 'daejeon', 'incheon', 'seoul','gwangju'] origin_paths = [f'../data/data_for_modeling/{region}_train.csv' for region in regions] augment_paths = [f'../data/data_oversampled/ctgan_{region}.csv' for region in regions] # 반복적으로 각 지역 데이터 처리 for region, origin_path, augment_path in zip(regions, origin_paths, augment_paths): # 데이터 읽기 origin = pd.read_csv(origin_path, index_col=0) augment = pd.read_csv(augment_path) # 증강된 데이터에서 범주 2 데이터 제거 filtered_data = augment[augment['multi_class'] != 2] # 원본 데이터에서 범주 2 데이터 추출 original_class2 = origin[origin['multi_class'] == 2] # 제거된 데이터에 원본 범주 2 데이터를 추가 final_data = pd.concat([filtered_data, original_class2], axis=0) # 인덱스 재설정 final_data.reset_index(drop=True, inplace=True) # 결과 저장 final_data.to_csv(augment_path, index = False) print("\n######",region,"#######") print(len(final_data[final_data['multi_class']==0]),'|',len(final_data[final_data['multi_class']==1]),'|',len(final_data[final_data['multi_class']==2])) print(len(data.columns))