File size: 5,156 Bytes
01ca3ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from datetime import timedelta
from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

from src.utils import logger

def ar1_process(n: int, mean: float, std: float, phi: float = 0.8, seed_offset: int = 0) -> np.ndarray:
    """Generate a mean-reverting AR(1) time-series, clipped at zero.

    The recurrence is ``x[t] = mean + phi * (x[t-1] - mean) + eps`` with
    ``eps ~ N(0, std * sqrt(1 - phi**2))``, which keeps the stationary mean at
    ``mean`` and the stationary standard deviation at ``std``.  (The previous
    form ``x[t] = phi * x[t-1] + eps`` dropped the mean term, so the series
    decayed toward 0 regardless of ``mean``.)

    Args:
        n (int): Number of samples. ``n <= 0`` yields an empty array.
        mean (float): Stationary mean of the process.
        std (float): Stationary standard deviation of the process.
        phi (float): Autocorrelation coefficient in (-1, 1).
        seed_offset (int): Added to the base seed 42 for per-series reproducibility.

    Returns:
        np.ndarray: Generated series of length ``n``, clipped to be >= 0.
    """
    if n <= 0:
        # Avoid IndexError on x[0] for degenerate requests.
        return np.zeros(0)
    np.random.seed(42 + seed_offset)
    x = np.zeros(n)
    x[0] = np.random.normal(mean, std)
    # Innovation variance scaled so the stationary std of the process is `std`.
    noise_std = std * np.sqrt(1 - phi**2)
    for t in range(1, n):
        x[t] = mean + phi * (x[t - 1] - mean) + np.random.normal(0, noise_std)
    # Metrics (CPU %, latency, error rate, ...) cannot be negative.
    return np.clip(x, 0, None)

def generate_data(n_samples: int = 1000, output_dir: str = "data/raw") -> None:
    """Generate synthetic per-node RPC metrics with injected failure ramps.

    For each node, baseline metrics are drawn as AR(1) series, then
    ``n_ramps_per_node`` linear degradation ramps are superimposed and labeled
    ``failure_imminent = 1``.  Latency is derived from CPU and error rate (plus
    noise) so the features are correlated.  One CSV is written per node, plus a
    combined CSV sorted by timestamp and node.

    Args:
        n_samples (int): Number of 1-minute samples per node.
        output_dir (str): Directory for the CSV outputs (created if missing).

    Raises:
        Exception: Re-raised after logging if generation or writing fails.
    """
    try:
        np.random.seed(42)

        out_path = Path(output_dir)
        # Make the run robust on a fresh checkout where data/raw does not exist.
        out_path.mkdir(parents=True, exist_ok=True)

        start_time = pd.Timestamp("2025-10-01 00:00:00")
        timestamps = pd.date_range(start_time, periods=n_samples, freq="1min")
        nodes = ["agave1", "agave2", "firedancer1", "firedancer2"]

        # Per-node baseline distribution parameters. latency_* are unused:
        # latency is derived from cpu/error_rate below; kept for reference.
        node_params = {
            "agave1": {"cpu_mean": 45, "cpu_var": 8, "latency_mean": 60, "latency_var": 10, "error_var": 0.005},
            "agave2": {"cpu_mean": 48, "cpu_var": 9, "latency_mean": 65, "latency_var": 12, "error_var": 0.006},
            "firedancer1": {"cpu_mean": 55, "cpu_var": 15, "latency_mean": 50, "latency_var": 20, "error_var": 0.01},
            "firedancer2": {"cpu_mean": 52, "cpu_var": 12, "latency_mean": 55, "latency_var": 18, "error_var": 0.009}
        }

        phi = 0.8
        n_ramps_per_node = 8
        ramp_length = 20

        # Ramp shapes are loop-invariant: build them once, outside all loops.
        ramp_cpu = np.linspace(0, 45, ramp_length)
        ramp_error = np.linspace(0, 0.4, ramp_length)
        ramp_mem = np.linspace(0, 30, ramp_length)
        ramp_disk = np.linspace(0, 150, ramp_length)
        ramp_gap = np.full(ramp_length, 5)

        all_data: List[pd.DataFrame] = []

        for node in nodes:
            params = node_params[node]
            idx = nodes.index(node)
            # Distinct seed offsets per node/metric keep series reproducible
            # and mutually independent.  NOTE: a latency baseline was previously
            # generated here too, but it was dead code — latency is fully
            # recomputed from cpu/error_rate below — so it has been removed
            # (safe: every ar1_process call reseeds, leaving later draws
            # from the global RNG stream unchanged).
            cpu = ar1_process(n_samples, params["cpu_mean"], params["cpu_var"], phi, idx * 10)
            error_rate = np.abs(ar1_process(n_samples, 0.02, params["error_var"], phi, idx * 30))
            mem = ar1_process(n_samples, 50, 10, phi, idx * 40)
            disk = ar1_process(n_samples, 40, 12, phi, idx * 50)
            block_gap = np.random.choice([0, 1], size=n_samples, p=[0.75, 0.25])
            labels = np.zeros(n_samples, dtype=int)

            ramp_starts = sorted(np.random.choice(n_samples - ramp_length, n_ramps_per_node, replace=False))
            for start in ramp_starts:
                end = start + ramp_length
                cpu[start:end] += ramp_cpu
                error_rate[start:end] += ramp_error
                mem[start:end] += ramp_mem
                disk[start:end] += ramp_disk
                # Ramps force a sustained block gap; never reduce an existing one.
                block_gap[start:end] = np.maximum(block_gap[start:end], ramp_gap)
                labels[start:end] = 1

            # Latency is derived from CPU load and error rate so the features
            # are realistically correlated, then bounded to a plausible range.
            latency = 40 + cpu * 1.5 + error_rate * 200 + np.random.normal(0, 5, n_samples)
            latency = np.clip(latency, 20, 1000)

            # Vectorized frame construction (replaces the old per-row Python loop).
            node_df = pd.DataFrame({
                "timestamp": timestamps,
                "node": node,
                "cpu_usage": np.round(cpu, 2),
                "memory_usage": np.round(mem, 2),
                "disk_io": np.round(disk, 2),
                "rpc_latency_ms": np.round(latency, 2),
                "rpc_error_rate": np.round(error_rate, 3),
                "block_height_gap": block_gap.astype(int),
                "failure_imminent": labels,
            })

            node_df.to_csv(out_path / f"{node}_metrics.csv", index=False)
            all_data.append(node_df)

        combined_df = pd.concat(all_data, ignore_index=True)
        combined_df = combined_df.sort_values(['timestamp', 'node']).reset_index(drop=True)
        combined_df.to_csv(out_path / "synthetic_rpc_metrics_realistic.csv", index=False)
        logger.info("Synthetic data generated.")
    except Exception as e:
        logger.error(f"Error generating data: {e}")
        raise

# Script entry point: regenerate the synthetic dataset when run directly.
if __name__ == "__main__":
    generate_data()