| from __future__ import annotations
|
|
|
| import csv
|
| import math
|
| import re
|
| from dataclasses import dataclass
|
|
|
| from app.config import RAW_DIR
|
|
|
|
|
# Alternatives that mark a question as technical-analysis related: indicator
# names plus Vietnamese phrases written both with and without diacritics.
# Entries are regex fragments (see ``ma\d*``), not plain literals.
_TECHNICAL_KEYWORDS = (
    "ptkt",
    "kỹ thuật",
    "ky thuat",
    "rsi",
    "macd",
    "bollinger",
    r"ma\d*",
    "sma",
    "ema",
    "đường trung bình",
    "duong trung binh",
    "hỗ trợ",
    "ho tro",
    "kháng cự",
    "khang cu",
    "xu hướng",
    "xu huong",
    "chỉ báo",
    "chi bao",
)

# Case-insensitive search for any keyword delimited by word boundaries.
TECHNICAL_QUERY_PATTERN = re.compile(
    r"\b(" + "|".join(_TECHNICAL_KEYWORDS) + r")\b",
    flags=re.IGNORECASE,
)
|
|
|
|
|
@dataclass(frozen=True)
class OHLCVRow:
    """Immutable record for one row of open/high/low/close/volume data."""

    # Date label kept as text; this class does not parse or validate it.
    date: str
    # Price fields for the row.
    open: float
    high: float
    low: float
    close: float
    # Volume figure for the row.
    volume: float
|
|
|
|
|
def is_technical_query(question: str) -> bool:
    """Report whether ``question`` contains a technical-analysis keyword.

    Returns ``True`` when ``TECHNICAL_QUERY_PATTERN`` matches anywhere in the
    text and ``False`` otherwise.
    """
    match = TECHNICAL_QUERY_PATTERN.search(question)
    return match is not None
|
|
|
|
|
def parse_number(value: str) -> float | None:
    """Parse a number from text that may contain separators or stray symbols.

    Every character other than digits, ``,``, ``.`` and ``-`` is discarded
    first, so currency symbols, percent signs and whitespace are ignored.

    Separator handling:
    * ``"1.234,56"`` / ``"1.234.567,8"`` (dot-grouped thousands followed by a
      decimal comma) is read as 1234.56 / 1234567.8. Previously the comma was
      simply dropped, silently yielding 1.23456 for the first form.
    * In every other case commas are treated as thousands separators and
      removed, so ``"1,234.56"`` is 1234.56 and a lone ``"12,5"`` is 125.0.

    Args:
        value: Raw cell text; ``None`` or an empty string is accepted.

    Returns:
        The parsed float, or ``None`` when nothing numeric remains or the
        cleaned text is not a valid float.
    """
    cleaned = re.sub(r"[^\d,.\-]", "", value or "")
    if not cleaned:
        return None

    # Only the unambiguous shape "d.ddd(.ddd)*,d+" is read as decimal-comma;
    # anything looser keeps the comma-as-grouping behaviour.
    if re.fullmatch(r"-?\d{1,3}(?:\.\d{3})+,\d+", cleaned):
        cleaned = cleaned.replace(".", "").replace(",", ".")
    else:
        cleaned = cleaned.replace(",", "")

    try:
        return float(cleaned)
    except ValueError:
        return None
|
|
|
|
|
def normalize_header(header: str) -> str:
    """Translate a CSV column header into its canonical OHLCV field name.

    The header is stripped and lower-cased before lookup. Known English and
    Vietnamese aliases map to one of ``date``, ``open``, ``high``, ``low``,
    ``close`` or ``volume``; an unrecognised header is returned in its
    stripped, lower-cased form.
    """
    key = header.strip().lower()
    alias_groups = (
        ("date", ("ngày", "date", "time")),
        ("open", ("open", "mở cửa")),
        ("high", ("high", "cao nhất")),
        ("low", ("low", "thấp nhất")),
        ("close", ("close", "đóng cửa", "giá")),
        ("volume", ("volume", "kl", "klgd")),
    )
    for canonical, aliases in alias_groups:
        if key in aliases:
            return canonical
    return key
|
|
|
|
|
def load_ohlcv_rows(ticker: str) -> list[OHLCVRow]:
    """Load all usable OHLCV rows from the ticker's raw CSV directory.

    Every ``*.csv`` file under ``RAW_DIR / "csv" / <TICKER>`` is read. A file
    is used only when its headers, after ``normalize_header``, include date,
    open, high, low, close and volume. Rows in which any numeric field fails
    ``parse_number`` are skipped.

    Files are visited in sorted path order so the result is deterministic;
    rows are NOT re-sorted by date, so chronological order depends on the
    file names and on the row order inside each file.

    Args:
        ticker: Ticker symbol; it is upper-cased to form the directory name.

    Returns:
        The parsed rows, or an empty list when the directory is missing or no
        file has the required columns.
    """
    rows: list[OHLCVRow] = []
    csv_dir = RAW_DIR / "csv" / ticker.upper()
    if not csv_dir.exists():
        return rows

    required = {"date", "open", "high", "low", "close", "volume"}
    numeric_fields = ("open", "high", "low", "close", "volume")

    # Directory iteration order is filesystem-dependent; sorting keeps the
    # concatenated rows (and therefore the "latest" row) stable across runs.
    for path in sorted(csv_dir.glob("*.csv")):
        with path.open("r", encoding="utf-8-sig", newline="") as handle:
            reader = csv.DictReader(handle)
            if not reader.fieldnames:
                continue
            field_map = {field: normalize_header(field) for field in reader.fieldnames}
            if not required.issubset(field_map.values()):
                continue

            for row in reader:
                # Surplus cells are stored by DictReader under a key that is
                # not a header; the membership test drops them.
                normalized = {
                    field_map[key]: value for key, value in row.items() if key in field_map
                }
                parsed = {
                    name: parse_number(normalized.get(name, "")) for name in numeric_fields
                }
                if any(number is None for number in parsed.values()):
                    continue
                rows.append(
                    OHLCVRow(
                        # A short row leaves the cell as None; store "" rather
                        # than the literal text "None".
                        date=str(normalized.get("date") or ""),
                        open=parsed["open"],
                        high=parsed["high"],
                        low=parsed["low"],
                        close=parsed["close"],
                        volume=parsed["volume"],
                    )
                )
    return rows
|
|
|
|
|
def simple_moving_average(values: list[float], window: int) -> float | None:
    """Return the arithmetic mean of the last ``window`` values.

    Args:
        values: Series of numbers, with the most recent value last.
        window: Number of trailing values to average; must be positive.

    Returns:
        The mean of the trailing window, or ``None`` when ``window`` is not
        positive or fewer than ``window`` values are available.
    """
    # A zero window would divide by zero and a negative one would slice from
    # the wrong end, so both count as "cannot compute".
    if window <= 0 or len(values) < window:
        return None
    return sum(values[-window:]) / window
|
|
|
|
|
def rsi(values: list[float], window: int = 14) -> float | None:
    """Compute a relative strength index over the last ``window`` changes.

    Gains and losses are averaged with a plain arithmetic mean over the
    trailing ``window`` value-to-value changes (no exponential smoothing).

    Args:
        values: Series of numbers, with the most recent value last.
        window: Number of trailing changes to use; must be positive.

    Returns:
        A value between 0 and 100, or ``None`` when ``window`` is not positive
        or there are not more than ``window`` values. When the average loss is
        zero (including a completely flat window) the result is 100.0.
    """
    # window + 1 values are needed to form ``window`` consecutive changes.
    if window <= 0 or len(values) <= window:
        return None

    recent = values[-(window + 1):]
    gain_total = 0
    loss_total = 0
    for previous, current in zip(recent, recent[1:]):
        delta = current - previous
        gain_total += max(delta, 0)
        loss_total += abs(min(delta, 0))

    average_gain = gain_total / window
    average_loss = loss_total / window
    if average_loss == 0:
        # No losses in the window: the ratio is undefined, so report the cap.
        return 100.0
    rs = average_gain / average_loss
    return 100 - (100 / (1 + rs))
|
|
|
|
|
def ema_series(values: list[float], window: int) -> list[float]:
    """Return the exponential moving average at every point of ``values``.

    The series is seeded with the first value and uses the smoothing factor
    ``2 / (window + 1)``. The result has the same length as ``values``; an
    empty input gives an empty list.
    """
    if not values:
        return []

    alpha = 2 / (window + 1)
    previous = values[0]
    smoothed = [previous]
    for price in values[1:]:
        # Move the running average toward the new value by a fraction alpha.
        previous = (price - previous) * alpha + previous
        smoothed.append(previous)
    return smoothed
|
|
|
|
|
def macd(values: list[float]) -> tuple[float, float, float] | None:
    """Return the latest MACD line, signal line and histogram values.

    The MACD line is the 12-period EMA minus the 26-period EMA, the signal
    line is the 9-period EMA of the MACD line, and the histogram is their
    difference. All three come from ``ema_series``.

    Returns:
        ``(macd, signal, histogram)`` for the last point, or ``None`` when
        fewer than 35 values are supplied.
    """
    if len(values) < 35:
        return None

    fast = ema_series(values, 12)
    slow = ema_series(values, 26)

    # Pair the tail of the fast series with the slow series point by point.
    aligned_fast = fast[-len(slow):]
    differences = []
    for short_value, long_value in zip(aligned_fast, slow):
        differences.append(short_value - long_value)

    signal = ema_series(differences, 9)
    latest_macd = differences[-1]
    latest_signal = signal[-1]
    return latest_macd, latest_signal, latest_macd - latest_signal
|
|
|
|
|
def bollinger(
    values: list[float],
    window: int = 20,
    num_std: float = 2.0,
) -> tuple[float, float, float] | None:
    """Compute Bollinger Bands over the last ``window`` values.

    The middle band is the mean of the trailing window; the outer bands sit
    ``num_std`` population standard deviations (divisor ``window``) below and
    above it.

    Args:
        values: Series of numbers, with the most recent value last.
        window: Number of trailing values to use; must be positive.
        num_std: Band width in standard deviations. The default of 2 matches
            the previously hard-coded width.

    Returns:
        ``(lower, middle, upper)``, or ``None`` when ``window`` is not
        positive or fewer than ``window`` values are available.
    """
    # A non-positive window would divide by zero or slice from the wrong end.
    if window <= 0 or len(values) < window:
        return None

    recent = values[-window:]
    middle = sum(recent) / window
    variance = sum((value - middle) ** 2 for value in recent) / window
    spread = num_std * math.sqrt(variance)
    return middle - spread, middle, middle + spread
|
|
|
|
|
def latest_snapshot_context(ticker: str) -> str:
    """Format the last row of the ticker's overview time-series CSV.

    Reads ``RAW_DIR / "csv" / <TICKER> / "stock_overview_timeseries.csv"`` and
    renders a fixed list of fields from its final row as ``name: value``
    lines, omitting fields that are missing or empty.

    Returns:
        The newline-joined lines, or ``""`` when the file does not exist or
        holds no data rows.
    """
    snapshot_path = RAW_DIR / "csv" / ticker.upper() / "stock_overview_timeseries.csv"
    if not snapshot_path.exists():
        return ""

    # Only the final row is needed, so keep the last one seen while streaming.
    last_row = None
    with snapshot_path.open("r", encoding="utf-8-sig", newline="") as handle:
        for last_row in csv.DictReader(handle):
            pass
    if last_row is None:
        return ""

    wanted = (
        "date",
        "price",
        "change",
        "change_percent",
        "volume",
        "day_high",
        "day_low",
        "reference_price",
        "foreign_buy_volume",
        "foreign_sell_volume",
        "bid_1_price",
        "offer_1_price",
    )
    parts = []
    for name in wanted:
        value = last_row.get(name)
        if value:
            parts.append(f"{name}: {value}")
    return "\n".join(parts)
|
|
|
|
|
def build_technical_context(ticker: str | None) -> str:
    """Assemble a plain-text technical-analysis summary for ``ticker``.

    The text starts with a header line, followed by the snapshot from
    ``latest_snapshot_context`` when one exists. If ``load_ohlcv_rows``
    returns rows, the last row and whichever of SMA20/50/200, RSI14, MACD and
    Bollinger(20,2) can be computed from the close prices are appended;
    otherwise two explanatory sentences are appended instead.

    Returns:
        The newline-joined report, or ``""`` when ``ticker`` is empty or
        ``None``.
    """
    if not ticker:
        return ""

    ticker = ticker.upper()
    rows = load_ohlcv_rows(ticker)
    snapshot = latest_snapshot_context(ticker)

    report = [f"Technical analysis data for {ticker}:"]
    if snapshot:
        report.extend(["Current intraday snapshot:", snapshot])

    if not rows:
        missing_history = (
            "No historical OHLCV file with date/open/high/low/close/volume columns was found. "
            "RSI, MACD, moving averages and Bollinger Bands cannot be computed reliably from the current raw data."
        )
        locked_source = (
            "The crawled 24HMoney technical page appears to expose only locked/summary content, "
            "not concrete indicator values."
        )
        report.extend([missing_history, locked_source])
        return "\n".join(report)

    closes = [row.close for row in rows]
    # The final loaded row is reported as the latest one.
    report.append(f"Latest OHLCV: {rows[-1]}")

    for window in (20, 50, 200):
        average = simple_moving_average(closes, window)
        if average is not None:
            report.append(f"SMA{window}: {average:.2f}")

    rsi14 = rsi(closes)
    if rsi14 is not None:
        report.append(f"RSI14: {rsi14:.2f}")

    macd_values = macd(closes)
    if macd_values is not None:
        macd_line, signal_line, histogram = macd_values
        report.append(
            f"MACD: line={macd_line:.2f}, signal={signal_line:.2f}, histogram={histogram:.2f}"
        )

    bands = bollinger(closes)
    if bands is not None:
        lower, middle, upper = bands
        report.append(
            f"Bollinger(20,2): lower={lower:.2f}, middle={middle:.2f}, upper={upper:.2f}"
        )

    return "\n".join(report)
|
|
|