File size: 5,448 Bytes
4a3a3c3
 
 
9a8390a
4a3a3c3
 
9a8390a
4a3a3c3
 
9a8390a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a3a3c3
 
9a8390a
4a3a3c3
9a8390a
 
 
 
4a3a3c3
9a8390a
 
 
 
 
 
 
4a3a3c3
 
 
9a8390a
 
 
 
 
 
 
4a3a3c3
 
 
 
 
9a8390a
4a3a3c3
 
 
9a8390a
 
 
 
 
 
 
4a3a3c3
 
 
 
 
 
 
 
 
9a8390a
 
4a3a3c3
 
 
 
9a8390a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a3a3c3
9a8390a
 
4a3a3c3
9a8390a
 
 
 
4a3a3c3
 
 
 
9a8390a
 
 
4a3a3c3
9a8390a
 
 
 
 
4a3a3c3
9a8390a
 
 
4a3a3c3
 
 
9a8390a
 
 
 
4a3a3c3
9a8390a
 
 
4a3a3c3
 
 
 
9a8390a
 
4a3a3c3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# components/statistical.py

import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Union
from datetime import datetime

class StatisticalAnalyzer:
    """Statistical analysis component with datetime handling."""
    
    @staticmethod
    def preprocess_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Preprocess a dataframe so it is suitable for numeric analysis.

        Datetime columns (native datetime64 dtype, or string columns whose
        first value parses as '%Y-%m-%d') are converted to Unix epoch
        seconds. Columns that look datetime-like but fail full conversion
        are dropped. All other columns are left untouched.

        Args:
            df: Input dataframe; not modified (a copy is returned).

        Returns:
            A copy of ``df`` with datetime-like columns converted to int64
            epoch seconds or dropped.
        """
        df_numeric = df.copy()
        
        for column in df.columns:
            series = df[column]
            looks_like_datetime = pd.api.types.is_datetime64_any_dtype(series)
            
            # String columns: probe only the first value against the expected
            # format. strptime raises ValueError on non-dates, so the probe
            # must live inside try/except (the original crashed here on any
            # plain string column).
            if not looks_like_datetime and len(series) > 0 and isinstance(series.iloc[0], str):
                try:
                    datetime.strptime(series.iloc[0], '%Y-%m-%d')
                    looks_like_datetime = True
                except ValueError:
                    looks_like_datetime = False
                    
            if looks_like_datetime:
                try:
                    # int64 view of datetime64[ns] is nanoseconds; // 10**9 -> seconds.
                    df_numeric[column] = pd.to_datetime(series).astype(np.int64) // 10**9
                except (ValueError, TypeError):
                    # First value parsed but the full column did not; exclude it.
                    df_numeric = df_numeric.drop(columns=[column])
                    
        return df_numeric
    
    @staticmethod
    def analyze_distribution(values: Union[List[float], np.ndarray]) -> Dict:
        """Analyze the distribution of a numeric sample.

        Args:
            values: Numeric sample (list or ndarray).

        Returns:
            Dict with sample size, mean, std, median, quartiles, skewness,
            kurtosis, and (for n >= 8) a D'Agostino-Pearson normality test.

        Raises:
            ValueError: If ``values`` is not numeric.
        """
        values = np.array(values)
        if not np.issubdtype(values.dtype, np.number):
            raise ValueError("Values must be numeric for distribution analysis")
            
        result = {
            "n_samples": len(values),
            "mean": float(np.mean(values)),
            "std": float(np.std(values)),
            "median": float(np.median(values)),
            "quartiles": [float(np.percentile(values, q)) for q in [25, 50, 75]],
            "skewness": float(stats.skew(values)),
            "kurtosis": float(stats.kurtosis(values))
        }
        
        # Test for normality. scipy.stats.normaltest uses skewtest internally,
        # which requires at least 8 observations (the previous >= 3 guard let
        # small samples crash with ValueError).
        if len(values) >= 8:
            statistic, p_value = stats.normaltest(values)
            result["normality_test"] = {
                "statistic": float(statistic),
                "p_value": float(p_value),
                # Cast: comparison yields numpy.bool_, which is not JSON-safe.
                "is_normal": bool(p_value > 0.05)
            }
        
        return result
    
    @staticmethod
    def calculate_confidence_interval(
        values: Union[List[float], np.ndarray],
        confidence: float = 0.95
    ) -> Dict:
        """Calculate a t-based confidence interval for the sample mean.

        Args:
            values: Numeric sample; needs at least 2 observations for a
                finite interval (with 1 the bounds are NaN).
            confidence: Confidence level in (0, 1); defaults to 0.95.

        Returns:
            Dict with ``mean``, ``ci_lower``, ``ci_upper`` and ``confidence``.

        Raises:
            ValueError: If ``values`` is not numeric.
        """
        values = np.array(values)
        if not np.issubdtype(values.dtype, np.number):
            raise ValueError("Values must be numeric for confidence interval calculation")
            
        mean = np.mean(values)
        std_err = stats.sem(values)
        # Student-t interval with n-1 degrees of freedom.
        ci = stats.t.interval(confidence, len(values)-1, loc=mean, scale=std_err)
        
        return {
            "mean": float(mean),
            "ci_lower": float(ci[0]),
            "ci_upper": float(ci[1]),
            "confidence": confidence
        }
    
    def forecast_probability_cone(
        self,
        values: Union[List[float], np.ndarray],
        steps: int = 10,
        confidence: float = 0.95
    ) -> Dict:
        """Generate a flat-mean probability-cone forecast.

        The level is estimated with simple exponential smoothing
        (alpha=0.3); the forecast mean is the last smoothed value repeated,
        with t-based confidence bands that widen with the horizon.

        Args:
            values: Numeric history; must be non-empty.
            steps: Number of forecast steps.
            confidence: Confidence level for the cone bounds.

        Returns:
            Dict with ``time``, ``mean``, ``lower`` and ``upper`` lists of
            length ``steps``.

        Raises:
            ValueError: If ``values`` is empty or not numeric.
        """
        values = np.array(values)
        if not np.issubdtype(values.dtype, np.number):
            raise ValueError("Values must be numeric for forecasting")
        if len(values) == 0:
            # Previously raised a bare IndexError on values[0].
            raise ValueError("Values must be non-empty for forecasting")
            
        # Simple exponential smoothing to estimate the current level.
        alpha = 0.3
        smoothed = []
        s = values[0]
        for value in values:
            s = alpha * value + (1 - alpha) * s
            smoothed.append(s)
            
        # In-sample residuals drive the width of the cone.
        residuals = values - np.array(smoothed)
        std_err = np.std(residuals)
        t_value = stats.t.ppf((1 + confidence) / 2, len(values) - 1)
        
        # Flat forecast at the last smoothed level.
        last_smoothed = smoothed[-1]
        time_points = list(range(steps))
        forecast = [last_smoothed] * steps
        
        # Half-widths grow with sqrt(1 + i/n) to widen the cone over time.
        # (Renamed from the original's second use of `errors`, which shadowed
        # the residuals above.)
        half_widths = [t_value * std_err * np.sqrt(1 + i / len(values))
                       for i in range(steps)]
        
        return {
            "time": time_points,
            "mean": [float(x) for x in forecast],
            "lower": [float(f - e) for f, e in zip(forecast, half_widths)],
            "upper": [float(f + e) for f, e in zip(forecast, half_widths)]
        }
    
    def analyze_correlations(self, df: pd.DataFrame) -> Dict:
        """Analyze pairwise correlations between numeric variables.

        Datetime-like columns are first converted to epoch seconds via
        :meth:`preprocess_dataframe`, then a Pearson correlation matrix is
        computed over the numeric columns. Pairs with |r| > 0.5 are
        reported as significant.

        Args:
            df: Input dataframe.

        Returns:
            Dict with ``correlation_matrix`` (nested dict),
            ``significant_correlations`` (list of pair records), and
            ``numeric_columns``.
        """
        # Preprocess to handle datetime columns.
        df_numeric = self.preprocess_dataframe(df)
        
        # Correlations are only defined for numeric columns.
        numeric_cols = df_numeric.select_dtypes(include=[np.number]).columns
        corr_matrix = df_numeric[numeric_cols].corr()
        
        # Collect upper-triangle pairs above the significance threshold.
        # NaN correlations (e.g. constant columns) fail the > 0.5 test and
        # are skipped naturally.
        significant = []
        for i in range(len(numeric_cols)):
            for j in range(i + 1, len(numeric_cols)):
                corr = corr_matrix.iloc[i, j]
                if abs(corr) > 0.5:  # Threshold for significant correlation
                    significant.append({
                        "var1": numeric_cols[i],
                        "var2": numeric_cols[j],
                        "correlation": float(corr)
                    })
        
        return {
            "correlation_matrix": corr_matrix.to_dict(),
            "significant_correlations": significant,
            "numeric_columns": list(numeric_cols)
        }