Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from typing import Tuple | |
| def generate_sudden_drift(n_samples: int = 1000, drift_point: int = 500) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """κΈκ²©ν λ리ννΈ: t μμ μμ κ°μκΈ° λ°μ΄ν° λΆν¬ λ³κ²½""" | |
| X = np.arange(n_samples) | |
| y = np.zeros(n_samples) | |
| # Before drift: y = 2 + sin(X/50) + noise | |
| y[:drift_point] = 2 + np.sin(X[:drift_point] / 50) + np.random.normal(0, 0.3, drift_point) | |
| # After drift: y = 5 - sin(X/50) + noise (μμ ν λ€λ₯Έ ν¨ν΄) | |
| y[drift_point:] = 5 - np.sin(X[drift_point:] / 50) + np.random.normal(0, 0.3, n_samples - drift_point) | |
| drift_points = np.array([drift_point]) | |
| return X, y, drift_points | |
| def generate_gradual_drift(n_samples: int = 1000, drift_start: int = 300, drift_end: int = 700) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """μ μ§μ λ리ννΈ: λ λΆν¬κ° μμ΄λ©° μ²μ²ν μ ν""" | |
| X = np.arange(n_samples) | |
| y = np.zeros(n_samples) | |
| # Before drift: y = 2 + sin(X/50) + noise | |
| y[:drift_start] = 2 + np.sin(X[:drift_start] / 50) + np.random.normal(0, 0.3, drift_start) | |
| # Gradual transition: μ μ§μ μΌλ‘ λ³ν | |
| transition_length = drift_end - drift_start | |
| for i in range(drift_start, drift_end): | |
| weight = (i - drift_start) / transition_length | |
| old_concept = 2 + np.sin(X[i] / 50) + np.random.normal(0, 0.3) | |
| new_concept = 5 - np.sin(X[i] / 50) + np.random.normal(0, 0.3) | |
| y[i] = (1 - weight) * old_concept + weight * new_concept | |
| # After drift: y = 5 - sin(X/50) + noise | |
| y[drift_end:] = 5 - np.sin(X[drift_end:] / 50) + np.random.normal(0, 0.3, n_samples - drift_end) | |
| drift_points = np.array([drift_start, drift_end]) | |
| return X, y, drift_points | |
| def generate_incremental_drift(n_samples: int = 1000, n_steps: int = 5) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """μ¦λΆμ λ리ννΈ: κ³λ¨μμΌλ‘ μμ λ³νκ° λμ """ | |
| X = np.arange(n_samples) | |
| y = np.zeros(n_samples) | |
| step_size = n_samples // (n_steps + 1) | |
| drift_points = [] | |
| for step in range(n_steps + 1): | |
| start_idx = step * step_size | |
| end_idx = (step + 1) * step_size if step < n_steps else n_samples | |
| # κ° λ¨κ³λ§λ€ νκ· μ΄ μ‘°κΈμ© λ³ν | |
| mean_shift = 2 + (step / n_steps) * 3 # 2μμ 5λ‘ μ μ§μ λ³ν | |
| y[start_idx:end_idx] = mean_shift + np.sin(X[start_idx:end_idx] / 50) + np.random.normal(0, 0.3, end_idx - start_idx) | |
| if step > 0: | |
| drift_points.append(start_idx) | |
| return X, y, np.array(drift_points) | |
| def generate_recurring_drift(n_samples: int = 1000, cycle_length: int = 250) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """λ°λ³΅μ λ리ννΈ: μ΄μ λΆν¬κ° μ£ΌκΈ°μ μΌλ‘ μ¬λ±μ₯""" | |
| X = np.arange(n_samples) | |
| y = np.zeros(n_samples) | |
| drift_points = [] | |
| for i in range(n_samples): | |
| cycle_pos = i % cycle_length | |
| if cycle_pos < cycle_length // 2: | |
| # Concept A: y = 2 + sin(X/50) + noise | |
| y[i] = 2 + np.sin(X[i] / 50) + np.random.normal(0, 0.3) | |
| else: | |
| # Concept B: y = 5 - sin(X/50) + noise | |
| y[i] = 5 - np.sin(X[i] / 50) + np.random.normal(0, 0.3) | |
| if cycle_pos == cycle_length // 2: | |
| drift_points.append(i) | |
| return X, y, np.array(drift_points) | |
| def get_drift_description(drift_type: str) -> str: | |
| """λ리ννΈ μ νλ³ μ€λͺ λ°ν""" | |
| descriptions = { | |
| "sudden": "κΈκ²©ν λ리ννΈ: νΉμ μμ μμ λ°μ΄ν° λΆν¬κ° κ°μκΈ° λ³κ²½λ©λλ€. μ: ν¬λ°λ―Ή, μ μ± λ³κ²½ λ±", | |
| "gradual": "μ μ§μ λ리ννΈ: μ΄μ λΆν¬μ μ λΆν¬κ° μμ΄λ©° μ²μ²ν μ νλ©λλ€. μ ν κΈ°κ° λμ λ 컨μ μ΄ κ³΅μ‘΄ν©λλ€.", | |
| "incremental": "μ¦λΆμ λ리ννΈ: μμ λ¨κ³λ‘ λ³νκ° λ°μνμ¬ κ³λ¨μ ν¨ν΄μ νμ±ν©λλ€.", | |
| "recurring": "λ°λ³΅μ λ리ννΈ: μ΄μ λΆν¬κ° μ£ΌκΈ°μ μΌλ‘ λ€μ λνλ©λλ€. κ³μ μ±μ΄λ μ£ΌκΈ°μ ν¨ν΄μμ λ°μν©λλ€." | |
| } | |
| return descriptions.get(drift_type, "μ μ μλ λ리ννΈ μ ν") | |