File size: 2,492 Bytes
3e3dc68
 
 
 
 
 
 
 
 
 
 
 
 
befeb85
 
 
 
 
3e3dc68
 
 
 
 
 
 
 
 
 
befeb85
3e3dc68
befeb85
 
 
 
 
3e3dc68
befeb85
 
 
 
 
 
 
 
 
 
 
 
3e3dc68
 
 
 
 
 
 
 
 
7ab1194
 
3e3dc68
 
 
7ab1194
3e3dc68
 
 
 
befeb85
3e3dc68
 
 
befeb85
 
7ab1194
3e3dc68
befeb85
3e3dc68
 
 
befeb85
3e3dc68
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from typing import Any, Dict, Tuple

import numpy as np

def _fit_line(segment_X: np.ndarray, segment_y: np.ndarray) -> Tuple[float, float]:
    """Return ``(slope, intercept)`` of a degree-1 least-squares fit.

    A line cannot be fitted to fewer than two samples, so in that case the
    slope is 0.0 and the intercept is the single ``y`` value (or 0.0 when the
    segment is empty).
    """
    if len(segment_X) > 1:
        slope, intercept = np.polyfit(segment_X, segment_y, 1)
        return float(slope), float(intercept)
    return 0.0, (float(segment_y[0]) if len(segment_y) > 0 else 0.0)


def analyze_drift(X: np.ndarray, y: np.ndarray, drift_points: np.ndarray, drift_type: str) -> Dict[str, Any]:
    """Analyze drift data.

    Computes global statistics of ``y`` and, for every segment delimited by
    ``drift_points``, the mean, standard deviation and a linear trend of
    ``y`` against ``X``.

    Args:
        X: Sample positions. Slices of it are passed to ``np.polyfit`` as the
            x-coordinates, so a 1-D array is expected.
        y: Observed values, sliced with the same indices as ``X``.
        drift_points: Indices at which a new segment starts. Any array-like
            (ndarray, list, tuple) is accepted; an empty one means a single
            segment covering all samples.
        drift_type: Label of the drift kind; stored verbatim in the result.

    Returns:
        A dict with the keys ``drift_type``, ``total_samples``,
        ``num_drift_points``, ``drift_locations``, ``mean_y``, ``std_y``,
        ``min_y``, ``max_y`` and ``segments``. ``segments`` is a list of
        dicts with ``segment_id``, ``start_idx``, ``end_idx``, ``mean``,
        ``std``, ``slope`` and ``intercept``.

    Raises:
        ValueError: If ``y`` is empty, because no statistics can be computed.
    """
    if len(y) == 0:
        # np.min would raise ValueError here anyway, but only after np.mean
        # and np.std had emitted NaN warnings; fail early with a clear message.
        raise ValueError("y must contain at least one sample")

    # Coerce once so that plain lists/tuples work as well as ndarrays.
    drift_locations = np.asarray(drift_points).tolist()

    analysis: Dict[str, Any] = {
        "drift_type": drift_type,
        "total_samples": len(X),
        "num_drift_points": len(drift_locations),
        "drift_locations": drift_locations,
    }

    # Overall statistics (every drift type is treated as continuous values).
    analysis["mean_y"] = float(np.mean(y))
    analysis["std_y"] = float(np.std(y))
    analysis["min_y"] = float(np.min(y))
    analysis["max_y"] = float(np.max(y))

    # Per-segment analysis: consecutive boundary pairs delimit the segments.
    boundaries = [0] + drift_locations + [len(X)]
    segments = []
    for segment_id, (start, end) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        segment_y = y[start:end]

        # Linear-regression coefficients of y against X within the segment.
        slope, intercept = _fit_line(X[start:end], segment_y)

        # An empty segment (e.g. a drift point at index 0 or a duplicated
        # drift point) has no samples to average; report 0.0, matching the
        # intercept fallback, instead of NaN plus a RuntimeWarning.
        has_samples = len(segment_y) > 0

        segments.append({
            "segment_id": segment_id,
            "start_idx": int(start),
            "end_idx": int(end),
            "mean": float(np.mean(segment_y)) if has_samples else 0.0,
            "std": float(np.std(segment_y)) if has_samples else 0.0,
            "slope": slope,
            "intercept": intercept,
        })

    analysis["segments"] = segments

    return analysis


def format_analysis_summary(analysis: Dict) -> str:
    """Format the analysis results as a human-readable Markdown summary.

    Args:
        analysis: Mapping that provides ``drift_type``, ``total_samples``,
            ``num_drift_points``, ``mean_y``, ``std_y``, ``min_y``, ``max_y``
            and ``segments`` (an iterable of mappings with ``segment_id``,
            ``start_idx``, ``end_idx``, ``mean``, ``std``, ``slope`` and
            ``intercept``).

    Returns:
        The header section followed by one block per segment, in order.
    """

    def _render_segment(segment: Dict) -> str:
        # Segment ids are stored zero-based; the summary shows them one-based.
        return f"""
**์„ธ๊ทธ๋จผํŠธ {segment['segment_id'] + 1}** (์ƒ˜ํ”Œ {segment['start_idx']}-{segment['end_idx']})
- ํ‰๊ท : {segment['mean']:.2f}
- ํ‘œ์ค€ํŽธ์ฐจ: {segment['std']:.2f}
- ํŠธ๋ Œ๋“œ: y = {segment['slope']:.4f}x + {segment['intercept']:.2f}
"""

    drift_label = analysis['drift_type'].upper()

    header = f"""
## ๋“œ๋ฆฌํ”„ํŠธ ๋ถ„์„ ๊ฒฐ๊ณผ

**๋“œ๋ฆฌํ”„ํŠธ ์œ ํ˜•:** {drift_label}

**์ „์ฒด ๋ฐ์ดํ„ฐ:**
- ์ด ์ƒ˜ํ”Œ ์ˆ˜: {analysis['total_samples']}
- ๋“œ๋ฆฌํ”„ํŠธ ๋ฐœ์ƒ ํšŸ์ˆ˜: {analysis['num_drift_points']}
- ํ‰๊ท  ๊ฐ’: {analysis['mean_y']:.2f}
- ํ‘œ์ค€ํŽธ์ฐจ: {analysis['std_y']:.2f}
- ๋ฒ”์œ„: [{analysis['min_y']:.2f}, {analysis['max_y']:.2f}]

**์„ธ๊ทธ๋จผํŠธ๋ณ„ ๋ถ„์„:**
"""

    return header + "".join(_render_segment(segment) for segment in analysis['segments'])