File size: 7,601 Bytes
34b531b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from __future__ import annotations

import csv
import math
import re
from dataclasses import dataclass

from app.config import RAW_DIR


# Keywords that mark a question as being about technical analysis.
# Vietnamese terms are listed both with and without diacritics, next to the
# usual indicator abbreviations. Matching is case-insensitive and every
# alternative has to sit between word boundaries (\b).
# `ma\d*` matches "ma" on its own as well as "ma20", "ma200", ...
# NOTE(review): bare "ma" may also hit unrelated unaccented words typed as
# "ma" (e.g. "mã" written without diacritics) — confirm this is intended.
TECHNICAL_QUERY_PATTERN = re.compile(
    r"\b(ptkt|kỹ thuật|ky thuat|rsi|macd|bollinger|ma\d*|sma|ema|"
    r"đường trung bình|duong trung binh|hỗ trợ|ho tro|kháng cự|khang cu|"
    r"xu hướng|xu huong|chỉ báo|chi bao)\b",
    flags=re.IGNORECASE,
)


@dataclass(frozen=True)
class OHLCVRow:
    """One immutable open/high/low/close/volume record for a single date."""

    # Date label kept as plain text; this class does not parse or validate it.
    date: str
    # Opening price.
    open: float
    # Highest price.
    high: float
    # Lowest price.
    low: float
    # Closing price.
    close: float
    # Traded volume.
    volume: float


def is_technical_query(question: str) -> bool:
    """Tell whether ``question`` mentions any technical-analysis keyword.

    Args:
        question: The user's question text.

    Returns:
        ``True`` when ``TECHNICAL_QUERY_PATTERN`` matches anywhere in the
        text, ``False`` otherwise.
    """
    return TECHNICAL_QUERY_PATTERN.search(question) is not None


def parse_number(value: str | None) -> float | None:
    """Parse a numeric CSV cell into a float.

    Every character other than digits, ``,``, ``.`` and ``-`` is stripped
    first, so surrounding text such as currency labels or spaces is ignored.

    Separator handling:

    * both ``,`` and ``.`` present: whichever appears last is taken as the
      decimal mark and the other as digit grouping, so ``"1,234.56"`` and
      ``"1.234,56"`` both give ``1234.56``;
    * only ``,`` present: treated as digit grouping (``"1,234"`` -> ``1234.0``);
    * only ``.`` present: handed to ``float`` unchanged.

    Args:
        value: Raw cell text. ``None`` and the empty string are accepted.

    Returns:
        The parsed number, or ``None`` when nothing numeric remains or the
        cleaned text is not a valid float literal.
    """
    cleaned = re.sub(r"[^\d,.\-]", "", value or "")
    if not cleaned:
        return None

    last_comma = cleaned.rfind(",")
    last_dot = cleaned.rfind(".")
    if last_comma != -1 and last_dot != -1 and last_comma > last_dot:
        # Decimal comma with dot grouping ("1.234,56"): removing the commas
        # here would silently shift the decimal point, so drop the dots and
        # turn the comma into the decimal point instead.
        cleaned = cleaned.replace(".", "").replace(",", ".")
    else:
        # Commas are digit grouping (or absent).
        cleaned = cleaned.replace(",", "")

    try:
        return float(cleaned)
    except ValueError:
        return None


# Lower-cased header spelling -> canonical column name, built from one group
# of English/Vietnamese spellings per canonical name.
_HEADER_ALIASES = {
    alias: canonical
    for canonical, aliases in (
        ("date", ("ngày", "date", "time")),
        ("open", ("open", "mở cửa")),
        ("high", ("high", "cao nhất")),
        ("low", ("low", "thấp nhất")),
        ("close", ("close", "đóng cửa", "giá")),
        ("volume", ("volume", "kl", "klgd")),
    )
    for alias in aliases
}


def normalize_header(header: str) -> str:
    """Map a raw CSV header onto its canonical column name.

    The header is stripped of surrounding whitespace and lower-cased before
    the lookup.

    Args:
        header: Header text as it appears in the CSV file.

    Returns:
        The canonical name (``date``, ``open``, ``high``, ``low``, ``close``
        or ``volume``) for a known spelling; otherwise the stripped,
        lower-cased header itself.
    """
    key = header.strip().lower()
    return _HEADER_ALIASES.get(key, key)


def load_ohlcv_rows(ticker: str) -> list[OHLCVRow]:
    """Load every usable OHLCV row stored for ``ticker``.

    Looks for ``*.csv`` files in ``RAW_DIR / "csv" / <TICKER>``. A file is
    used only when its headers, after ``normalize_header``, include all of
    date, open, high, low, close and volume. Within a usable file, a row is
    skipped when any of its five numeric cells cannot be parsed by
    ``parse_number``.

    Files are read in sorted path order so the result is the same on every
    run. Rows keep the order they have inside each file; no date-based
    sorting or de-duplication is performed here.

    Args:
        ticker: Stock symbol; upper-cased to find its directory.

    Returns:
        The parsed rows, or an empty list when the directory is missing or
        no file has the required columns.
    """
    rows: list[OHLCVRow] = []
    csv_dir = RAW_DIR / "csv" / ticker.upper()
    if not csv_dir.exists():
        return rows

    required = {"date", "open", "high", "low", "close", "volume"}
    numeric_columns = ("open", "high", "low", "close", "volume")

    # Path.glob yields entries in arbitrary order; the indicators computed
    # from these rows depend on row order, so fix the file order.
    for path in sorted(csv_dir.glob("*.csv")):
        with path.open("r", encoding="utf-8-sig", newline="") as handle:
            reader = csv.DictReader(handle)
            if not reader.fieldnames:
                continue
            field_map = {field: normalize_header(field) for field in reader.fieldnames}
            if not required.issubset(field_map.values()):
                continue

            for row in reader:
                # DictReader stores surplus cells under a None key; the
                # membership test drops them.
                normalized = {field_map[key]: value for key, value in row.items() if key in field_map}
                # Short rows give None for missing cells; treat as empty.
                parsed = {key: parse_number(normalized.get(key) or "") for key in numeric_columns}
                if any(value is None for value in parsed.values()):
                    continue
                rows.append(
                    OHLCVRow(
                        # A missing date cell becomes "" rather than "None".
                        date=str(normalized.get("date") or ""),
                        open=parsed["open"],
                        high=parsed["high"],
                        low=parsed["low"],
                        close=parsed["close"],
                        volume=parsed["volume"],
                    )
                )
    return rows


def simple_moving_average(values: list[float], window: int) -> float | None:
    """Return the arithmetic mean of the last ``window`` values.

    Args:
        values: Series of numbers; the most recent value is last.
        window: Number of trailing values to average.

    Returns:
        The mean of the trailing ``window`` values, or ``None`` when
        ``window`` is smaller than 1 or the series is shorter than
        ``window``.
    """
    # window == 0 would divide by zero, and a negative window would slice
    # from the front of the list and return a meaningless number.
    if window < 1 or len(values) < window:
        return None
    return sum(values[-window:]) / window


def rsi(values: list[float], window: int = 14) -> float | None:
    """Compute a relative strength index over the latest ``window`` changes.

    Gains and losses are plain (unsmoothed) averages of the last ``window``
    differences between consecutive values.

    Args:
        values: Series of numbers; the most recent value is last.
        window: Number of consecutive differences to average.

    Returns:
        The RSI value, ``100.0`` when the averaged loss is zero, or ``None``
        when the series has ``window`` values or fewer.
    """
    if len(values) <= window:
        return None

    # window + 1 trailing values give exactly `window` consecutive changes.
    tail = values[-(window + 1):]
    deltas = [later - earlier for earlier, later in zip(tail, tail[1:])]

    mean_gain = sum(max(step, 0) for step in deltas) / window
    mean_loss = sum(abs(min(step, 0)) for step in deltas) / window
    if mean_loss == 0:
        return 100.0
    return 100 - (100 / (1 + mean_gain / mean_loss))


def ema_series(values: list[float], window: int) -> list[float]:
    """Build the exponential moving average of ``values``, point by point.

    The first output equals the first input; every later output moves from
    the previous output toward the new value by the factor
    ``2 / (window + 1)``.

    Args:
        values: Series of numbers in order.
        window: Period used to derive the smoothing factor.

    Returns:
        A list with one EMA value per input value; empty for empty input.
    """
    if not values:
        return []

    alpha = 2 / (window + 1)
    current = values[0]
    smoothed = [current]
    for price in values[1:]:
        current = (price - current) * alpha + current
        smoothed.append(current)
    return smoothed


def macd(values: list[float]) -> tuple[float, float, float] | None:
    """Compute the latest MACD line, signal line and histogram.

    The MACD line is the 12-period EMA minus the 26-period EMA, the signal
    line is the 9-period EMA of the MACD line, and the histogram is their
    difference.

    Args:
        values: Series of numbers; the most recent value is last.

    Returns:
        ``(macd, signal, histogram)`` for the last point, or ``None`` when
        fewer than 35 values are supplied.
    """
    if len(values) < 35:
        return None

    fast = ema_series(values, 12)
    slow = ema_series(values, 26)
    # Both EMA series carry one value per input value, so they pair up
    # index by index.
    spread = [quick - lagging for quick, lagging in zip(fast, slow)]
    signal = ema_series(spread, 9)

    latest_macd = spread[-1]
    latest_signal = signal[-1]
    return latest_macd, latest_signal, latest_macd - latest_signal


def bollinger(
    values: list[float],
    window: int = 20,
    num_std: float = 2.0,
) -> tuple[float, float, float] | None:
    """Compute Bollinger Bands over the last ``window`` values.

    The middle band is the mean of the trailing window. The outer bands sit
    ``num_std`` population standard deviations (variance divided by
    ``window``) below and above it.

    Args:
        values: Series of numbers; the most recent value is last.
        window: Number of trailing values to use.
        num_std: Band width in standard deviations. The default of 2 keeps
            the previous fixed behaviour.

    Returns:
        ``(lower, middle, upper)``, or ``None`` when ``window`` is smaller
        than 1 or the series is shorter than ``window``.
    """
    # A non-positive window would divide by zero or slice the wrong end.
    if window < 1 or len(values) < window:
        return None

    recent = values[-window:]
    middle = sum(recent) / window
    variance = sum((value - middle) ** 2 for value in recent) / window
    offset = num_std * math.sqrt(variance)
    return middle - offset, middle, middle + offset


def latest_snapshot_context(ticker: str) -> str:
    """Format the last row of the ticker's overview time-series CSV.

    Reads ``RAW_DIR / "csv" / <TICKER> / "stock_overview_timeseries.csv"``
    and renders a fixed set of columns from its final row as
    ``name: value`` lines. Columns that are missing or empty are left out.

    Args:
        ticker: Stock symbol; upper-cased to find its directory.

    Returns:
        The newline-joined lines, or ``""`` when the file does not exist or
        holds no data rows.
    """
    snapshot_file = RAW_DIR / "csv" / ticker.upper() / "stock_overview_timeseries.csv"
    if not snapshot_file.exists():
        return ""

    # Walk the reader and keep only the final record.
    last_record = None
    with snapshot_file.open("r", encoding="utf-8-sig", newline="") as handle:
        for last_record in csv.DictReader(handle):
            pass
    if last_record is None:
        return ""

    wanted = (
        "date",
        "price",
        "change",
        "change_percent",
        "volume",
        "day_high",
        "day_low",
        "reference_price",
        "foreign_buy_volume",
        "foreign_sell_volume",
        "bid_1_price",
        "offer_1_price",
    )
    entries = []
    for name in wanted:
        cell = last_record.get(name)
        if cell:
            entries.append(f"{name}: {cell}")
    return "\n".join(entries)


def build_technical_context(ticker: str | None) -> str:
    """Assemble a plain-text technical-analysis summary for ``ticker``.

    The summary starts with a heading, followed by the latest snapshot block
    when one is available. If historical OHLCV rows exist, it then lists the
    last row and whichever of SMA20/50/200, RSI14, MACD and Bollinger Bands
    can be computed from the closing prices. If no rows exist, it explains
    that the indicators cannot be computed.

    Args:
        ticker: Stock symbol, or ``None``/empty for "no ticker".

    Returns:
        The newline-joined summary, or ``""`` when no ticker is given.
    """
    if not ticker:
        return ""

    symbol = ticker.upper()
    history = load_ohlcv_rows(symbol)
    snapshot_text = latest_snapshot_context(symbol)

    report = [f"Technical analysis data for {symbol}:"]
    if snapshot_text:
        report.extend(["Current intraday snapshot:", snapshot_text])

    if not history:
        missing_data_note = (
            "No historical OHLCV file with date/open/high/low/close/volume columns was found. "
            "RSI, MACD, moving averages and Bollinger Bands cannot be computed reliably from the current raw data."
        )
        source_note = (
            "The crawled 24HMoney technical page appears to expose only locked/summary content, "
            "not concrete indicator values."
        )
        report.extend([missing_data_note, source_note])
        return "\n".join(report)

    closing_prices = [bar.close for bar in history]
    report.append(f"Latest OHLCV: {history[-1]}")

    # Moving averages: only the periods the history is long enough for.
    for period in (20, 50, 200):
        average = simple_moving_average(closing_prices, period)
        if average is not None:
            report.append(f"SMA{period}: {average:.2f}")

    strength = rsi(closing_prices)
    if strength is not None:
        report.append(f"RSI14: {strength:.2f}")

    macd_result = macd(closing_prices)
    if macd_result is not None:
        line_value, signal_value, hist_value = macd_result
        report.append(
            f"MACD: line={line_value:.2f}, signal={signal_value:.2f}, histogram={hist_value:.2f}"
        )

    band_result = bollinger(closing_prices)
    if band_result is not None:
        band_low, band_mid, band_high = band_result
        report.append(
            f"Bollinger(20,2): lower={band_low:.2f}, middle={band_mid:.2f}, upper={band_high:.2f}"
        )

    return "\n".join(report)