File size: 4,591 Bytes
cbe113f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use eframe::egui;
use egui_plot::{Line, Plot, PlotPoints};
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::mpsc;
use zeromq::{Socket, SocketRecv};

/// A single price quote for one instrument.
///
/// Instances are produced by deserializing the JSON payload of an incoming
/// ZMQ message (see the subscriber task in `main`) and are handed to the UI
/// through a channel.
#[derive(Clone, Debug, Deserialize)]
struct TickData {
    /// Instrument name; the most recent value is shown in the chart heading.
    symbol: String,
    /// Bid price; plotted as the "Bid" line.
    bid: f64,
    /// Ask price; plotted as the "Ask" line.
    ask: f64,
    /// Timestamp of the tick.
    /// NOTE(review): units/epoch are not visible in this file — confirm against
    /// the publisher. The visible code never reads this field.
    time: i64,
}

/// State of the chart window: the incoming tick stream plus the history that
/// is currently plotted.
struct Mt5ChartApp {
    /// Receiving half of the channel fed by the ZMQ subscriber task.
    receiver: mpsc::Receiver<TickData>,
    /// Ticks in arrival order; the plot uses the index in this vector as the
    /// x coordinate.
    data: Vec<TickData>,
    /// Text shown in the heading: a placeholder until the first tick arrives,
    /// then the symbol of the most recently received tick.
    symbol: String,
}

impl Mt5ChartApp {
    /// Builds the initial application state around `receiver`.
    ///
    /// The tick history starts out empty and the heading shows a placeholder
    /// until the first tick is received.
    fn new(receiver: mpsc::Receiver<TickData>) -> Self {
        let symbol = String::from("Waiting for data...");
        let data = Vec::new();

        Self {
            receiver,
            data,
            symbol,
        }
    }
}

impl eframe::App for Mt5ChartApp {
    /// Pulls every pending tick off the channel, trims the history to the most
    /// recent points, and redraws the heading, the latest quote and the chart.
    ///
    /// * `ctx` - egui context used for drawing and for scheduling the next repaint.
    /// * `_frame` - unused.
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        /// Number of most recent ticks kept for plotting.
        const MAX_POINTS: usize = 1000;

        // Receive all available data from the channel without blocking.
        while let Ok(tick) = self.receiver.try_recv() {
            // `clone_from` can reuse the heading's existing allocation instead
            // of allocating a fresh `String` for every tick.
            self.symbol.clone_from(&tick.symbol);
            self.data.push(tick);

            // Removing the front element after every push shifts the whole
            // vector each time. Instead, let the buffer grow to at most twice
            // the limit during a burst and cut it back in one pass.
            if self.data.len() >= 2 * MAX_POINTS {
                let excess = self.data.len() - MAX_POINTS;
                self.data.drain(..excess);
            }
        }

        // Final trim so that at most `MAX_POINTS` ticks are ever drawn
        // (draining an empty range is a no-op).
        let excess = self.data.len().saturating_sub(MAX_POINTS);
        self.data.drain(..excess);

        egui::CentralPanel::default().show(ctx, |ui| {
            ui.heading(format!("MT5 Live Chart: {}", self.symbol));
            if let Some(last_tick) = self.data.last() {
                ui.label(format!("Bid: {:.5} | Ask: {:.5}", last_tick.bid, last_tick.ask));
            }

            let plot = Plot::new("mt5_plot")
                .view_aspect(2.0)
                .legend(egui_plot::Legend::default());

            plot.show(ui, |plot_ui| {
                // The x axis is the position of the tick in the retained
                // history, not the tick's timestamp.
                let bid_points: PlotPoints = self
                    .data
                    .iter()
                    .enumerate()
                    .map(|(i, t)| [i as f64, t.bid])
                    .collect();

                let ask_points: PlotPoints = self
                    .data
                    .iter()
                    .enumerate()
                    .map(|(i, t)| [i as f64, t.ask])
                    .collect();

                plot_ui.line(
                    Line::new(bid_points)
                        .name("Bid")
                        .color(egui::Color32::from_rgb(100, 200, 100)),
                );
                plot_ui.line(
                    Line::new(ask_points)
                        .name("Ask")
                        .color(egui::Color32::from_rgb(200, 100, 100)),
                );
            });
        });

        // Ticks arrive on a channel that cannot wake the UI, so keep repainting
        // to pick them up on the next frame.
        ctx.request_repaint();
    }
}

/// Subscribes to the tick publisher and forwards every decoded tick into `tx`.
///
/// The task ends when the socket cannot be connected or subscribed (the error
/// is logged; there is nothing useful to poll afterwards) or when the
/// receiving half of `tx` has been dropped. Receive errors are logged and
/// retried after a one second pause; malformed payloads are logged and skipped.
async fn run_tick_subscriber(tx: mpsc::Sender<TickData>) {
    let mut socket = zeromq::SubSocket::new();

    if let Err(e) = socket.connect("tcp://127.0.0.1:5555").await {
        eprintln!("Failed to connect to ZMQ: {}", e);
        return;
    }
    println!("Connected to ZMQ Publisher");

    // An empty prefix subscribes to every topic.
    if let Err(e) = socket.subscribe("").await {
        eprintln!("Failed to subscribe to ZMQ: {}", e);
        return;
    }

    loop {
        let msg = match socket.recv().await {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("ZMQ Recv Error: {}", e);
                // Back off so a persistent failure does not spin the task.
                tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
                continue;
            }
        };

        // The message may be multi-part; the JSON payload is expected in the
        // first frame.
        let payload_bytes = match msg.get(0) {
            Some(bytes) => bytes,
            None => continue,
        };

        let json_str = match std::str::from_utf8(payload_bytes) {
            Ok(text) => text,
            Err(e) => {
                eprintln!("Invalid UTF-8 in ZMQ payload: {}", e);
                continue;
            }
        };

        // The MT5 EA sends: {"symbol":..., "bid":..., ...}
        match serde_json::from_str::<TickData>(json_str) {
            Ok(tick) => {
                // A send error means the UI side of the channel is gone.
                if let Err(e) = tx.send(tick).await {
                    eprintln!("Channel error: {}", e);
                    break;
                }
            }
            Err(e) => eprintln!("JSON Parse Error: {}. Msg: {}", e, json_str),
        }
    }
}

/// Starts the ZMQ subscriber task and runs the chart window until it is closed.
///
/// # Errors
///
/// Returns the error reported by `eframe::run_native` if the window fails to
/// start or run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bounded channel between the subscriber task and the UI.
    let (tx, rx) = mpsc::channel(100);

    // Spawn ZMQ Subscriber task
    tokio::spawn(run_tick_subscriber(tx));

    let options = eframe::NativeOptions::default();
    eframe::run_native(
        "Rust + ZMQ + MT5's MQL5 Chart",
        options,
        Box::new(|_cc| Box::new(Mt5ChartApp::new(rx))),
    ).map_err(|e| e.into())
}