"""Generate synthetic RPC-node metrics with injected failure ramps.

For each node an AR(1)-correlated baseline is produced per metric, linear
degradation ramps are overlaid (labelled ``failure_imminent=1``), and one CSV
per node plus a combined CSV are written under ``data/raw/``.
"""
from pathlib import Path
from typing import Dict, List

import numpy as np
import pandas as pd


def ar1_process(n: int, mean: float, std: float, phi: float = 0.8, seed_offset: int = 0) -> np.ndarray:
    """Generate a stationary AR(1) time-series centred on ``mean``.

    Args:
        n (int): Number of samples.
        mean (float): Stationary mean of the process.
        std (float): Stationary standard deviation.
        phi (float): Autocorrelation coefficient (|phi| < 1).
        seed_offset (int): Seed offset for per-series reproducibility.

    Returns:
        np.ndarray: Generated series of length ``n``, clipped below at 0.
    """
    np.random.seed(42 + seed_offset)
    x = np.zeros(n)
    x[0] = np.random.normal(mean, std)
    # Innovation std of std*sqrt(1-phi^2) keeps the stationary std at `std`.
    innovation_std = std * np.sqrt(1 - phi ** 2)
    for t in range(1, n):
        # BUG FIX: revert toward `mean`, not toward 0. The original recursion
        # (x[t] = phi*x[t-1] + eps) has a stationary mean of 0, so the `mean`
        # argument only affected x[0] and every baseline decayed to ~0.
        x[t] = mean + phi * (x[t - 1] - mean) + np.random.normal(0, innovation_std)
    return np.clip(x, 0, None)


def _make_node_frame(node: str, params: Dict[str, float], node_idx: int,
                     timestamps: pd.DatetimeIndex, phi: float,
                     n_ramps: int, ramp_length: int) -> pd.DataFrame:
    """Build the labelled metrics DataFrame for a single node.

    Args:
        node: Node name (written into the ``node`` column).
        params: Per-node baseline parameters (``cpu_mean``, ``cpu_var``,
            ``error_var``; note the ``*_var`` keys are used as standard
            deviations, matching the original generator).
        node_idx: Position of the node, used to derive distinct seed offsets.
        timestamps: One timestamp per sample.
        phi: AR(1) autocorrelation coefficient.
        n_ramps: Number of failure ramps to inject.
        ramp_length: Length of each ramp in samples.

    Returns:
        pd.DataFrame: One row per timestamp with rounded metric columns and
        the binary ``failure_imminent`` label.
    """
    n = len(timestamps)
    cpu = ar1_process(n, params["cpu_mean"], params["cpu_var"], phi, node_idx * 10)
    error_rate = np.abs(ar1_process(n, 0.02, params["error_var"], phi, node_idx * 30))
    mem = ar1_process(n, 50, 10, phi, node_idx * 40)
    disk = ar1_process(n, 40, 12, phi, node_idx * 50)
    # Baseline block gap: occasionally 1 slot behind, mostly caught up.
    block_gap = np.random.choice([0, 1], size=n, p=[0.75, 0.25])
    labels = np.zeros(n, dtype=int)

    # NOTE: the original also generated an AR(1) latency baseline (seed offset
    # node_idx*20) and a latency ramp, but both were unconditionally discarded
    # by the derived-latency formula below; that dead computation was removed.
    ramp_starts = sorted(np.random.choice(n - ramp_length, n_ramps, replace=False))
    for start in ramp_starts:
        end = start + ramp_length
        cpu[start:end] += np.linspace(0, 45, ramp_length)
        error_rate[start:end] += np.linspace(0, 0.4, ramp_length)
        mem[start:end] += np.linspace(0, 30, ramp_length)
        disk[start:end] += np.linspace(0, 150, ramp_length)
        # During a failure window the node falls a fixed 5 slots behind.
        block_gap[start:end] = np.maximum(block_gap[start:end], 5)
        labels[start:end] = 1

    # Latency is derived from load so it stays correlated with CPU and errors.
    latency = 40 + cpu * 1.5 + error_rate * 200 + np.random.normal(0, 5, n)
    latency = np.clip(latency, 20, 1000)

    return pd.DataFrame({
        "timestamp": timestamps,
        "node": node,
        "cpu_usage": np.round(cpu, 2),
        "memory_usage": np.round(mem, 2),
        "disk_io": np.round(disk, 2),
        "rpc_latency_ms": np.round(latency, 2),
        "rpc_error_rate": np.round(error_rate, 3),
        "block_height_gap": block_gap.astype(int),
        "failure_imminent": labels,
    })


def generate_data() -> None:
    """Generate synthetic RPC metrics for all nodes and write CSVs to data/raw/.

    Raises:
        Exception: Re-raised after logging if any generation/IO step fails.
    """
    # Imported lazily so the numeric helpers above remain usable without the
    # project package being installed.
    from src.utils import logger

    try:
        np.random.seed(42)
        n_samples = 1000
        timestamps = pd.date_range(pd.Timestamp("2025-10-01 00:00:00"),
                                   periods=n_samples, freq="1min")
        node_params = {
            "agave1": {"cpu_mean": 45, "cpu_var": 8, "latency_mean": 60, "latency_var": 10, "error_var": 0.005},
            "agave2": {"cpu_mean": 48, "cpu_var": 9, "latency_mean": 65, "latency_var": 12, "error_var": 0.006},
            "firedancer1": {"cpu_mean": 55, "cpu_var": 15, "latency_mean": 50, "latency_var": 20, "error_var": 0.01},
            "firedancer2": {"cpu_mean": 52, "cpu_var": 12, "latency_mean": 55, "latency_var": 18, "error_var": 0.009},
        }
        phi = 0.8
        n_ramps_per_node = 8
        ramp_length = 20

        # ROBUSTNESS FIX: the original crashed with FileNotFoundError when
        # data/raw/ did not already exist.
        out_dir = Path("data/raw")
        out_dir.mkdir(parents=True, exist_ok=True)

        all_data: List[pd.DataFrame] = []
        for node_idx, (node, params) in enumerate(node_params.items()):
            node_df = _make_node_frame(node, params, node_idx, timestamps,
                                       phi, n_ramps_per_node, ramp_length)
            node_df.to_csv(out_dir / f"{node}_metrics.csv", index=False)
            all_data.append(node_df)

        combined_df = pd.concat(all_data, ignore_index=True)
        combined_df = combined_df.sort_values(["timestamp", "node"]).reset_index(drop=True)
        combined_df.to_csv(out_dir / "synthetic_rpc_metrics_realistic.csv", index=False)
        logger.info("Synthetic data generated.")
    except Exception as e:
        logger.error(f"Error generating data: {e}")
        raise


if __name__ == "__main__":
    generate_data()