File size: 3,361 Bytes
8a03cf9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c4db3fe
8a03cf9
 
 
 
 
 
 
f9ea822
8a03cf9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c928bc7
8a03cf9
 
 
 
 
 
 
 
 
 
 
 
 
c928bc7
8a03cf9
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import threading
import time
from functools import partial
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, LabelSet
from datetime import datetime
import random
import numpy as np

class BokehApp:
    """Bokeh server application that streams a simulated price into a live plot.

    A background thread generates one new point per interval by taking a
    random-walk step in log space, and hands each point to the document via
    ``add_next_tick_callback`` so that the data sources are only mutated from
    a document callback. The thread is asked to stop when the session is
    destroyed.
    """

    # Seconds the worker waits between two generated points.
    UPDATE_INTERVAL_SECONDS = 1.0
    # Maximum number of points retained by the plotted data source.
    ROLLOVER = 1000

    def __init__(self, doc):
        """Build the figure, wire it into *doc*, and start the data thread.

        Args:
            doc: The Bokeh document this app renders into (the caller in this
                file passes ``curdoc()``).
        """
        self.doc = doc
        # Event used to signal the background thread to shut down.
        self.stop_event = threading.Event()

        # Seed the plot with a single random point at the current time.
        x = datetime.now()
        y = random.uniform(30, 500)
        self.source = ColumnDataSource(data=dict(x=[x], y=[y]))
        # Separate source for the label so it can hold only the latest point
        # (see update_plot) while the line keeps its history.
        self.label_source = self.source.clone()
        p = figure(
            x_axis_type='datetime',
            width=1800,
            height=900,
            x_axis_label="Time",
            y_axis_label="Value",
            title="Real Time Stock Price Streaming",
        )
        p.title.align = "center"
        p.title.text_font_size = "20px"
        p.toolbar.autohide = True
        p.line(x='x', y='y', source=self.source)
        p.scatter(x='x', y='y', source=self.source, size=8)

        # Value label drawn next to the point(s) held in label_source.
        label = LabelSet(
            x='x',
            y='y',
            text='y',
            source=self.label_source,
            x_offset=5,
            y_offset=5,
            text_font_size="10pt",
        )
        p.add_layout(label)

        # Register the cleanup function before any thread exists.
        self.doc.on_session_destroyed(self.on_session_destroyed)

        # Add the plot to the document.
        self.doc.add_root(p)

        # Start the background thread last: if any of the wiring above
        # raises, no orphaned worker is left running.
        self.thread = threading.Thread(
            target=self.get_data,
            args=(self.stop_event,),
            daemon=True,
        )
        self.thread.start()

    def get_data(self, stop_event: threading.Event) -> None:
        """Generate points until *stop_event* is set (runs in the worker thread).

        Starting from the last ``y`` value in ``self.source``, each iteration
        adds a normally distributed increment to the log of the value,
        exponentiates and rounds to two decimals, and schedules
        ``update_plot`` with the new point through
        ``doc.add_next_tick_callback``.

        Args:
            stop_event: Event that ends the loop once set; it is set by
                ``on_session_destroyed``.
        """
        log_curr_value = np.log(self.source.data['y'][-1])

        while not stop_event.is_set():
            x = datetime.now()
            log_curr_value = log_curr_value + random.normalvariate(mu=.00001, sigma=.001)
            y = np.round(np.exp(log_curr_value), 2)
            new_data = dict(x=[x], y=[y])

            # The data sources are not touched from this thread; the update is
            # handed to the document as a next tick callback instead.
            self.doc.add_next_tick_callback(partial(self.update_plot, new_data))

            # Event.wait paces the loop like sleep() but returns immediately
            # once the event is set, so shutdown is not delayed by a full
            # interval.
            stop_event.wait(self.UPDATE_INTERVAL_SECONDS)

        print("Background thread is shutting down gracefully.", flush=True)

    def update_plot(self, new_data: dict) -> None:
        """Apply one generated point to the data sources.

        Scheduled by ``get_data`` as a next tick callback. Appends the point
        to the plotted source (keeping at most ``ROLLOVER`` points) and
        replaces the label source so only the newest point is labelled.

        Args:
            new_data: Mapping with single-element ``x`` and ``y`` lists.
        """
        self.source.stream(new_data, rollover=self.ROLLOVER)
        self.label_source.data = new_data

    def on_session_destroyed(self, session_context) -> None:
        """Signal the background thread to stop when the session is destroyed.

        Args:
            session_context: Object supplied by Bokeh; only its ``id`` is read
                here, for the log message.
        """
        print(f"Session {session_context.id} destroyed. Stopping background thread.", flush=True)
        self.stop_event.set()

# Entry point when the Bokeh server runs this script: build the app on the current document.
BokehApp(doc=curdoc())