File size: 15,222 Bytes
9e5fa5b
44c2214
 
 
 
 
 
 
9e5fa5b
 
 
44c2214
 
9e5fa5b
 
1f55553
9e5fa5b
 
 
1f55553
 
 
375c179
1f55553
 
44c2214
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e5fa5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1f55553
9e5fa5b
 
 
 
 
 
 
1f55553
9e5fa5b
 
1f55553
9e5fa5b
 
 
 
 
 
 
 
 
 
 
 
 
1f55553
44c2214
 
9e5fa5b
 
 
 
 
 
 
44c2214
 
 
 
 
 
 
 
 
 
 
9e5fa5b
 
1f55553
 
375c179
44c2214
1f55553
 
 
 
 
375c179
 
1f55553
 
 
 
 
44c2214
 
 
 
1f55553
 
 
375c179
1f55553
375c179
 
 
 
 
 
 
1f55553
 
9e5fa5b
 
 
44c2214
1f55553
44c2214
 
 
1f55553
 
 
 
 
 
 
 
 
 
 
 
 
 
9e5fa5b
 
 
375c179
1f55553
 
 
 
375c179
1f55553
 
44c2214
 
 
 
 
 
 
04e24fd
44c2214
 
 
 
 
 
 
04e24fd
44c2214
 
 
 
 
 
 
 
 
 
 
 
 
04e24fd
 
 
 
44c2214
 
 
 
 
 
 
 
04e24fd
 
 
 
 
 
44c2214
 
 
 
9e5fa5b
04e24fd
9e5fa5b
04e24fd
44c2214
04e24fd
 
9e5fa5b
44c2214
 
04e24fd
 
44c2214
 
 
9e5fa5b
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#!/usr/bin/env python3
"""Market Data Feeder β€” generates synthetic orders with realistic price dynamics.

Price evolution uses regime-based simulation (trending / mean-reverting / volatile)
so that technical indicators (SMA, EMA, MACD, RSI, Bollinger Bands) produce
meaningful, non-random signals for the RL and LLM trading strategies.
"""

import sys
sys.path.insert(0, "/app")

import json, math, time, random, os, threading
from collections import deque

from shared.config import Config
from shared.kafka_utils import create_producer, create_consumer

ORDER_INTERVAL = 60.0 / Config.ORDERS_PER_MIN

# Module-level state (shared with control listener thread)
_securities = {}
_running = True
_suspended = False


# ── Price dynamics engine ─────────────────────────────────────────────────────

class PriceDynamics:
    """Per-symbol state for regime-driven price simulation."""

    REGIMES = ("trending_up", "trending_down", "mean_revert", "volatile", "calm")

    def __init__(self, symbol: str, start_price: float):
        self.symbol = symbol
        self.start_price = start_price
        self.price = start_price
        self.regime = random.choice(self.REGIMES)
        self.regime_ticks = 0
        self.regime_duration = random.randint(20, 60)
        # Rolling history for indicator-aware order generation
        self.history = deque(maxlen=60)
        self.history.append(start_price)
        # Momentum / mean-reversion state
        self.momentum = 0.0
        self.volatility = start_price * 0.005  # ~0.5% base volatility

    def tick(self) -> float:
        """Advance one tick and return the new mid price."""
        tick_size = Config.TICK_SIZE
        self.regime_ticks += 1

        # Switch regime when duration expires
        if self.regime_ticks >= self.regime_duration:
            self._switch_regime()

        # Compute drift + noise based on regime
        if self.regime == "trending_up":
            drift = self.volatility * random.uniform(0.2, 0.8)
            noise = random.gauss(0, self.volatility * 0.5)
            self.momentum = max(self.momentum * 0.95 + drift * 0.05, 0)
        elif self.regime == "trending_down":
            drift = -self.volatility * random.uniform(0.2, 0.8)
            noise = random.gauss(0, self.volatility * 0.5)
            self.momentum = min(self.momentum * 0.95 + drift * 0.05, 0)
        elif self.regime == "mean_revert":
            # Pull toward start price
            pull = (self.start_price - self.price) * 0.03
            drift = pull
            noise = random.gauss(0, self.volatility * 0.3)
            self.momentum *= 0.8
        elif self.regime == "volatile":
            drift = random.gauss(0, self.volatility * 0.5)
            noise = random.gauss(0, self.volatility * 1.5)
            self.momentum *= 0.9
        else:  # calm
            drift = random.gauss(0, self.volatility * 0.1)
            noise = random.gauss(0, self.volatility * 0.2)
            self.momentum *= 0.95

        delta = drift + noise + self.momentum
        # Quantize to tick size
        delta = round(delta / tick_size) * tick_size
        new_price = self.price + delta

        # Floor at 1.00
        new_price = max(1.00, round(new_price, 2))

        # Adaptive volatility: expand in volatile regime, contract in calm
        if self.regime == "volatile":
            self.volatility = min(self.volatility * 1.002, self.price * 0.02)
        elif self.regime == "calm":
            self.volatility = max(self.volatility * 0.998, self.price * 0.002)

        self.price = new_price
        self.history.append(new_price)
        return new_price

    def _switch_regime(self):
        """Transition to a new regime with weighted probabilities."""
        # Bias: if price far from start, favor mean reversion
        deviation = (self.price - self.start_price) / self.start_price
        if abs(deviation) > 0.15:
            weights = [0.05, 0.05, 0.50, 0.20, 0.20]
        elif abs(deviation) > 0.08:
            weights = [0.15, 0.15, 0.30, 0.20, 0.20]
        else:
            weights = [0.25, 0.25, 0.10, 0.15, 0.25]

        self.regime = random.choices(self.REGIMES, weights=weights)[0]
        self.regime_ticks = 0
        self.regime_duration = random.randint(15, 50)

    def sma(self, period: int) -> float:
        h = list(self.history)
        if len(h) < period:
            return sum(h) / len(h)
        return sum(h[-period:]) / period

    def ema(self, period: int) -> float:
        h = list(self.history)
        alpha = 2.0 / (period + 1)
        ema_val = h[0]
        for p in h[1:]:
            ema_val = alpha * p + (1 - alpha) * ema_val
        return ema_val

    def rsi(self, period: int = 14) -> float:
        h = list(self.history)
        if len(h) < 2:
            return 50.0
        gains, losses = [], []
        for i in range(1, len(h)):
            d = h[i] - h[i - 1]
            gains.append(max(d, 0))
            losses.append(max(-d, 0))
        n = min(period, len(gains))
        avg_gain = sum(gains[-n:]) / n if n > 0 else 0
        avg_loss = sum(losses[-n:]) / n if n > 0 else 0.001
        rs = avg_gain / avg_loss if avg_loss > 0 else 100
        return 100 - 100 / (1 + rs)

    def bollinger_position(self, period: int = 20) -> float:
        """Returns 0..1 position within Bollinger Bands (0.5 = at SMA)."""
        h = list(self.history)
        n = min(period, len(h))
        window = h[-n:]
        mean = sum(window) / n
        std = max((sum((x - mean) ** 2 for x in window) / n) ** 0.5, 0.01)
        upper = mean + 2 * std
        lower = mean - 2 * std
        if upper == lower:
            return 0.5
        return max(0.0, min(1.0, (self.price - lower) / (upper - lower)))

    def spread_and_depth(self) -> tuple:
        """Compute adaptive spread and depth quantities based on regime."""
        base_spread = 0.10
        if self.regime == "volatile":
            spread = base_spread * random.uniform(1.5, 2.5)
            depth_qty_range = (30, 100)
        elif self.regime == "calm":
            spread = base_spread * random.uniform(0.6, 1.0)
            depth_qty_range = (100, 300)
        elif self.regime in ("trending_up", "trending_down"):
            spread = base_spread * random.uniform(0.8, 1.5)
            depth_qty_range = (50, 200)
        else:
            spread = base_spread
            depth_qty_range = (80, 250)
        return round(spread, 2), depth_qty_range


_dynamics: dict[str, PriceDynamics] = {}


# ── Securities I/O ────────────────────────────────────────────────────────────

def load_securities():
    """Load securities from file: SYMBOL start_price current_price"""
    securities = {}
    if not os.path.exists(Config.SECURITIES_FILE):
        raise FileNotFoundError(f"{Config.SECURITIES_FILE} not found")
    with open(Config.SECURITIES_FILE) as f:
        for line in f:
            if not line.strip() or line.startswith("#"):
                continue
            parts = line.split()
            if len(parts) >= 3:
                symbol, start, current = parts[0], float(parts[1]), float(parts[2])
                securities[symbol] = {"start": start, "current": current}
    return securities


def save_securities(securities):
    """Persist securities with header line"""
    with open(Config.SECURITIES_FILE, "w") as f:
        f.write("#SYMBOL\t<start_price>\t<current_price>\n")
        for sym, vals in securities.items():
            f.write(f"{sym}\t{vals['start']:.2f}\t{vals['current']:.2f}\n")


_order_counter = 0


def make_order(symbol, side, price, qty):
    global _order_counter
    _order_counter += 1
    return {
        "symbol": symbol,
        "side": side,
        "price": round(price, 2),
        "quantity": qty,
        "cl_ord_id": f"MDF-{int(time.time()*1000)}-{_order_counter}",
        "timestamp": time.time(),
        "source": "MDF"
    }


def make_snapshot(symbol, best_bid, best_ask, bid_size, ask_size, dyn: PriceDynamics):
    """Build snapshot with embedded technical indicators."""
    return {
        "symbol": symbol,
        "best_bid": round(best_bid, 2),
        "best_ask": round(best_ask, 2),
        "bid_size": bid_size,
        "ask_size": ask_size,
        "timestamp": time.time(),
        "source": "MDF",
        "indicators": {
            "sma_5":  round(dyn.sma(5), 4),
            "sma_20": round(dyn.sma(20), 4),
            "ema_12": round(dyn.ema(12), 4),
            "ema_26": round(dyn.ema(26), 4),
            "macd":   round(dyn.ema(12) - dyn.ema(26), 4),
            "rsi_14": round(dyn.rsi(14), 2),
            "bb_pos": round(dyn.bollinger_position(20), 4),
            "regime": dyn.regime,
        },
    }


def listen_control(ctrl_consumer):
    """Background thread: listen for start/stop/suspend/resume control messages."""
    global _running, _suspended, _securities, _dynamics
    print("[MDF] Control listener started")
    for msg in ctrl_consumer:
        action = (msg.value or {}).get("action")
        if action == "stop":
            _running = False
            _suspended = False
            print("[MDF] STOP signal received – simulation stopped")
        elif action == "start":
            try:
                new_secs = load_securities()
                _securities.clear()
                _securities.update(new_secs)
                # Re-init dynamics for new session
                _dynamics.clear()
                for sym, vals in _securities.items():
                    _dynamics[sym] = PriceDynamics(sym, vals["current"])
                print(f"[MDF] START signal – reloaded securities: {list(_securities.keys())}")
            except Exception as e:
                print(f"[MDF] Error reloading securities on start: {e}")
            _suspended = False
            _running = True
            print("[MDF] Simulation started")
        elif action == "suspend":
            _suspended = True
            print("[MDF] SUSPEND signal received – order generation paused")
        elif action == "resume":
            _suspended = False
            print("[MDF] RESUME signal received – order generation resumed")


if __name__ == "__main__":
    producer = create_producer(component_name="MDF")

    # Load securities and init dynamics
    _securities = load_securities()
    for sym, vals in _securities.items():
        vals["start"] = vals["current"]
        _dynamics[sym] = PriceDynamics(sym, vals["current"])
    save_securities(_securities)
    print(f"[MDF] Loaded securities: {list(_securities.keys())}")

    # Start control consumer in background thread
    try:
        ctrl_consumer = create_consumer(
            topics=[Config.CONTROL_TOPIC],
            group_id="md-feeder-control",
            component_name="MDF-Control",
            auto_offset_reset="latest",
        )
        threading.Thread(target=listen_control, args=(ctrl_consumer,), daemon=True).start()
    except Exception as e:
        print(f"[MDF] Warning: could not start control consumer: {e}")

    try:
        while True:
            if not _running or _suspended:
                time.sleep(0.5)
                continue

            for sym, vals in list(_securities.items()):
                if not _running or _suspended:
                    break

                dyn = _dynamics.get(sym)
                if not dyn:
                    continue

                # Advance price via regime engine
                mid = dyn.tick()
                vals["current"] = mid
                tick = Config.TICK_SIZE
                spread, depth_qty_range = dyn.spread_and_depth()
                half_spread = spread / 2

                # ── Indicator-aware order bias ────────────────────────────
                rsi = dyn.rsi(14)
                bb_pos = dyn.bollinger_position(20)
                macd = dyn.ema(12) - dyn.ema(26)

                # Bias aggressive side based on indicators
                if rsi > 70 and bb_pos > 0.85:
                    aggr_bias = "SELL"       # overbought β†’ sellers step in
                elif rsi < 30 and bb_pos < 0.15:
                    aggr_bias = "BUY"        # oversold β†’ buyers step in
                elif macd > 0 and dyn.regime == "trending_up":
                    aggr_bias = "BUY"
                elif macd < 0 and dyn.regime == "trending_down":
                    aggr_bias = "SELL"
                else:
                    aggr_bias = random.choice(["BUY", "SELL"])

                # ── Place resting depth on both sides ─────────────────────
                for depth_level in range(3):
                    offset = random.randint(1 + depth_level * 3, 3 + depth_level * 5) * tick
                    bid_price = round(mid - half_spread - offset, 2)
                    ask_price = round(mid + half_spread + offset, 2)
                    bid_qty = random.randint(*depth_qty_range)
                    ask_qty = random.randint(*depth_qty_range)

                    # In trending regimes, skew depth: thicker on the passive side
                    if dyn.regime == "trending_up" and depth_level == 0:
                        bid_qty = int(bid_qty * 1.5)
                    elif dyn.regime == "trending_down" and depth_level == 0:
                        ask_qty = int(ask_qty * 1.5)

                    bid_order = make_order(sym, "BUY", bid_price, bid_qty)
                    ask_order = make_order(sym, "SELL", ask_price, ask_qty)
                    producer.send(Config.ORDERS_TOPIC, bid_order)
                    producer.send(Config.ORDERS_TOPIC, ask_order)

                # ── Aggressive orders to generate trades (20-35%) ─────────
                aggr_prob = 0.35 if dyn.regime == "volatile" else 0.20
                if random.random() < aggr_prob:
                    side = aggr_bias
                    if side == "BUY":
                        price = round(mid + half_spread + random.randint(1, 3) * tick, 2)
                    else:
                        price = round(mid - half_spread - random.randint(1, 3) * tick, 2)
                    qty = random.randint(*depth_qty_range)
                    aggr_order = make_order(sym, side, price, qty)
                    producer.send(Config.ORDERS_TOPIC, aggr_order)

                # ── Persist and publish snapshot with indicators ───────────
                save_securities(_securities)
                best_bid = round(mid - half_spread, 2)
                best_ask = round(mid + half_spread, 2)
                bid_size = random.randint(100, 400)
                ask_size = random.randint(100, 400)
                snap = make_snapshot(sym, best_bid, best_ask, bid_size, ask_size, dyn)
                producer.send(Config.SNAPSHOTS_TOPIC, snap)

                time.sleep(ORDER_INTERVAL)

    except KeyboardInterrupt:
        pass
    finally:
        producer.flush()