alwaysgood commited on
Commit
902ce76
·
verified ·
1 Parent(s): fa82f2c

Update preprocessing.py

Browse files
Files changed (1) hide show
  1. preprocessing.py +182 -4
preprocessing.py CHANGED
@@ -99,7 +99,17 @@ def convert_tide_level_to_residual(df, station_id):
99
  if harmonic_level is None:
100
  harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5)
101
 
102
- if harmonic_level is not None:
 
 
 
 
 
 
 
 
 
 
103
  residual = tide_level - harmonic_level
104
  residual_values.append(residual)
105
  successful_conversions += 1
@@ -184,6 +194,158 @@ def create_mock_residual_data(df):
184
  print(f"✅ 가상 residual 데이터 {len(residual_values)}개 생성")
185
  return df
186
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
187
  def validate_input_data(df):
188
  """입력 데이터 유효성 검증"""
189
  print("🔍 입력 데이터 검증 중...")
@@ -535,7 +697,7 @@ def handle_missing_values(df, station_id=None):
535
  def preprocess_uploaded_file(file_path, station_id):
536
  """
537
  업로드된 파일의 전체 전처리 파이프라인
538
- tide_level → residual 변환 + 검증
539
  """
540
  try:
541
  print(f"\n🚀 {station_id} 관측소 데이터 전처리 시작")
@@ -550,10 +712,26 @@ def preprocess_uploaded_file(file_path, station_id):
550
  if not is_valid:
551
  return None, f"입력 데이터 오류:\n" + "\n".join(issues)
552
 
553
- # 3. 결측치 처리
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
554
  df_cleaned = handle_missing_values(df, station_id)
555
 
556
- # 4. tide_level → residual 변환
557
  converted_df = convert_tide_level_to_residual(df_cleaned, station_id)
558
 
559
  # 5. 변환된 데이터를 임시 파일로 저장
 
99
  if harmonic_level is None:
100
  harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5)
101
 
102
+ # 이상치 플래그 확인
103
+ is_outlier = False
104
+ if '_tide_outlier_flag' in df_last_144.columns:
105
+ is_outlier = df_last_144.at[idx, '_tide_outlier_flag'] if not pd.isna(df_last_144.at[idx, '_tide_outlier_flag']) else False
106
+
107
+ if is_outlier:
108
+ # 이상치로 탐지된 경우 residual = 0 (harmonic만 사용)
109
+ residual_values.append(0.0)
110
+ successful_conversions += 1
111
+ print(f" 🚨 이상치 탐지된 시점 {timestamp}: residual=0 적용")
112
+ elif harmonic_level is not None:
113
  residual = tide_level - harmonic_level
114
  residual_values.append(residual)
115
  successful_conversions += 1
 
194
  print(f"✅ 가상 residual 데이터 {len(residual_values)}개 생성")
195
  return df
196
 
197
+ def detect_harmonic_based_outliers(df, station_id):
198
+ """조화 예측 기반 tide_level 이상치 탐지"""
199
+ print("🌊 Harmonic 기반 tide_level 이상치 탐지 시작...")
200
+
201
+ if 'tide_level' not in df.columns:
202
+ print("⚠️ tide_level 컬럼이 없어서 이상치 탐지를 건너뜁니다.")
203
+ return pd.Series(False, index=df.index)
204
+
205
+ try:
206
+ # 1. 해당 시간대 harmonic_level 조회
207
+ df_copy = df.copy()
208
+ df_copy['date'] = pd.to_datetime(df_copy['date'])
209
+
210
+ # KST 시간대 설정
211
+ kst = pytz.timezone('Asia/Seoul')
212
+ if df_copy['date'].dt.tz is None:
213
+ df_copy['date'] = df_copy['date'].dt.tz_localize(kst)
214
+ else:
215
+ df_copy['date'] = df_copy['date'].dt.tz_convert(kst)
216
+
217
+ start_time = df_copy['date'].min()
218
+ end_time = df_copy['date'].max()
219
+
220
+ print(f"📅 이상치 탐지 시간 범위: {start_time} ~ {end_time}")
221
+
222
+ # 2. Harmonic 데이터 조회
223
+ harmonic_data = get_harmonic_predictions(station_id, start_time, end_time)
224
+
225
+ if not harmonic_data:
226
+ print("❌ 조화 예측 데이터가 없어서 물리적 한계로 대체합니다.")
227
+ # 물리적 한계로 폴백
228
+ physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000)
229
+ return physical_outliers
230
+
231
+ print(f"📊 조화 예측 데이터: {len(harmonic_data)}개 조회")
232
+
233
+ # 3. Harmonic 딕셔너리 생성
234
+ harmonic_dict = {}
235
+ for h_data in harmonic_data:
236
+ try:
237
+ h_time_str = h_data['predicted_at']
238
+ h_time = parse_time_string(h_time_str)
239
+
240
+ if h_time is None:
241
+ continue
242
+
243
+ # KST로 변환
244
+ if h_time.tzinfo is None:
245
+ h_time = pytz.UTC.localize(h_time)
246
+ h_time = h_time.astimezone(kst)
247
+
248
+ # 5분 단위로 정규화
249
+ minutes = (h_time.minute // 5) * 5
250
+ h_time = h_time.replace(minute=minutes, second=0, microsecond=0)
251
+
252
+ harmonic_dict[h_time] = float(h_data['harmonic_level'])
253
+
254
+ except Exception as e:
255
+ print(f"⚠️ 조화 데이터 파싱 오류: {h_data}, {e}")
256
+ continue
257
+
258
+ print(f"📊 사용 가능한 조화 예측: {len(harmonic_dict)}개")
259
+
260
+ # 4. Residual 계산
261
+ residuals = []
262
+ outlier_flags = []
263
+
264
+ for idx, row in df_copy.iterrows():
265
+ tide_level = row['tide_level']
266
+ timestamp = row['date']
267
+
268
+ # 5분 단위로 정규화
269
+ minutes = (timestamp.minute // 5) * 5
270
+ normalized_time = timestamp.replace(minute=minutes, second=0, microsecond=0)
271
+
272
+ # 정확히 매칭되는 harmonic_level 찾기
273
+ harmonic_level = harmonic_dict.get(normalized_time)
274
+
275
+ # 매칭되지 않으면 가장 가까운 시간 찾기 (5분 이내)
276
+ if harmonic_level is None:
277
+ harmonic_level = find_closest_harmonic(normalized_time, harmonic_dict, max_diff_minutes=5)
278
+
279
+ if harmonic_level is not None:
280
+ residual = tide_level - harmonic_level
281
+ residuals.append(residual)
282
+ outlier_flags.append(False) # 일단 정상으로 표시
283
+ else:
284
+ # Harmonic 데이터가 없으면 물리적 한계로 판정
285
+ outlier_flags.append(tide_level < -300 or tide_level > 2000)
286
+ residuals.append(0.0)
287
+
288
+ # 5. Residual 기반 3σ 이상치 탐지
289
+ if len(residuals) > 0:
290
+ residuals_array = np.array(residuals)
291
+ # NaN이 아닌 residual만으로 통계 계산
292
+ valid_residuals = residuals_array[~np.isnan(residuals_array)]
293
+
294
+ if len(valid_residuals) > 3: # 최소 3개 이상 필요
295
+ residual_mean = np.mean(valid_residuals)
296
+ residual_std = np.std(valid_residuals)
297
+
298
+ print(f"📈 Residual 통계: 평균={residual_mean:.1f}cm, 표준편차={residual_std:.1f}cm")
299
+
300
+ if residual_std > 0: # 표준편차가 0이 아닌 경우만
301
+ # 3σ 기준 이상치 탐지
302
+ threshold = 3 * residual_std
303
+ for i, residual in enumerate(residuals):
304
+ if not np.isnan(residual):
305
+ if abs(residual - residual_mean) > threshold:
306
+ outlier_flags[i] = True
307
+
308
+ outlier_count = sum(outlier_flags)
309
+ print(f"🚨 Harmonic 기반 이상치 탐지: {outlier_count}개 (3σ={threshold:.1f}cm 기준)")
310
+ else:
311
+ print("📊 Residual 표준편차가 0이므로 물리적 한계만 적용")
312
+ else:
313
+ print("📊 유효한 residual이 부족하여 물리적 한계만 적용")
314
+
315
+ return pd.Series(outlier_flags, index=df.index)
316
+
317
+ except Exception as e:
318
+ print(f"❌ Harmonic 기반 이상치 탐지 실패: {e}")
319
+ traceback.print_exc()
320
+ # 폴백: 물리적 한계로 탐지
321
+ physical_outliers = (df['tide_level'] < -300) | (df['tide_level'] > 2000)
322
+ return physical_outliers
323
+
324
+ def detect_weather_outliers(df):
325
+ """기상 데이터 물리적 한계 기반 이상치 탐지"""
326
+ print("🌡️ 기상 데이터 물리적 한계 기반 이상치 탐지 시작...")
327
+
328
+ # 물리적 한계 정의
329
+ PHYSICAL_LIMITS = {
330
+ 'air_pres': (850, 1100), # hPa (극한 기상 포함)
331
+ 'wind_speed': (0, 80), # m/s (한국 최대풍속 고려)
332
+ 'air_temp': (-35, 45), # °C (한국 기록 극값)
333
+ 'wind_dir': (0, 360) # degree
334
+ }
335
+
336
+ outliers = pd.DataFrame(False, index=df.index, columns=df.columns)
337
+
338
+ for col, (min_val, max_val) in PHYSICAL_LIMITS.items():
339
+ if col in df.columns:
340
+ col_outliers = (df[col] < min_val) | (df[col] > max_val)
341
+ outlier_count = col_outliers.sum()
342
+
343
+ if outlier_count > 0:
344
+ print(f"🌡️ {col} 물리적 한계 이상치: {outlier_count}개 (범위: {min_val}~{max_val})")
345
+ outliers[col] = col_outliers
346
+
347
+ return outliers
348
+
349
  def validate_input_data(df):
350
  """입력 데이터 유효성 검증"""
351
  print("🔍 입력 데이터 검증 중...")
 
697
  def preprocess_uploaded_file(file_path, station_id):
698
  """
699
  업로드된 파일의 전체 전처리 파이프라인
700
+ 이상치 탐지 → 결측치 처리 → tide_level → residual 변환 + 검증
701
  """
702
  try:
703
  print(f"\n🚀 {station_id} 관측소 데이터 전처리 시작")
 
712
  if not is_valid:
713
  return None, f"입력 데이터 오류:\n" + "\n".join(issues)
714
 
715
+ # 3. 이상탐지 및 처리
716
+ print("\n🔍 이상치 탐지 및 처리 단계")
717
+
718
+ # 3-1. Harmonic 기반 tide_level 이상치 탐지
719
+ tide_outliers = detect_harmonic_based_outliers(df, station_id)
720
+ if tide_outliers.any():
721
+ print(f"🌊 tide_level 이상치 {tide_outliers.sum()}개 → residual=0 처리 예정")
722
+ df.loc[tide_outliers, '_tide_outlier_flag'] = True
723
+
724
+ # 3-2. 기상 데이터 물리적 한계 기반 이상치 탐지
725
+ weather_outliers = detect_weather_outliers(df)
726
+ for col in weather_outliers.columns:
727
+ if weather_outliers[col].any():
728
+ print(f"🌡️ {col} 이상치 {weather_outliers[col].sum()}개 → NaN 변환")
729
+ df.loc[weather_outliers[col], col] = np.nan
730
+
731
+ # 4. 결측치 처리
732
  df_cleaned = handle_missing_values(df, station_id)
733
 
734
+ # 5. tide_level → residual 변환 (이상치 플래그 반영)
735
  converted_df = convert_tide_level_to_residual(df_cleaned, station_id)
736
 
737
  # 5. 변환된 데이터를 임시 파일로 저장