File size: 5,944 Bytes
9a2349b
 
 
 
 
 
 
 
 
 
 
 
46540bf
 
 
 
 
 
 
 
94691c4
46540bf
 
 
 
 
9a2349b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
535b45a
 
 
 
 
9a2349b
 
 
 
 
 
c928aca
 
 
535b45a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9f0ef7c
 
46540bf
535b45a
 
 
 
9a2349b
 
 
 
 
 
 
 
 
46540bf
 
 
 
 
9a2349b
 
 
 
 
 
 
 
 
 
 
 
 
 
b13ffb4
 
9a2349b
 
 
c928aca
9a2349b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46540bf
9a2349b
 
 
46540bf
9a2349b
 
 
 
46540bf
9a2349b
4b4b7ee
 
 
b13ffb4
 
4b4b7ee
 
b13ffb4
9a2349b
46540bf
9a2349b
 
 
 
 
 
 
 
 
 
 
b13ffb4
 
9a2349b
 
b13ffb4
46540bf
 
9a2349b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import asyncio
import requests
import io as i_o
import contextlib
import sys
import concurrent.futures
from FoxDot import * 

# Configuration
API_URL = "https://houseofplaying.com/hop/api.php"
POLL_INTERVAL = 1  # Seconds

def change_key(amt = 3, scl="minor", duration=4, flag=True):
    Root.default = ((Root.default+amt) % 14)-5
    print(f"Key changed to {Root.default}")
    Scale.default = scl
    if flag:
       Clock.future(duration, change_key, args=[amt, scl, duration])

def change_cycle(notes, scales, plen, flag=True):
    if (Clock.now() % plen) == 0:
       Scale.default = scales[(Clock.now()/plen) % len(notes)]
       Root.default = notes[(Clock.now()/plen) % len(notes)]
       if flag:
           change_cycle(notes, scales, plen)

# Global dictionary to store active tasks and their cancellation tokens (or just the futures)
active_tasks = {}

def poll_server():
    try:
        response = requests.get(f"{API_URL}?action=poll")
        if response.status_code == 200:
            return response.json()
    except Exception as e:
        print(f"Error polling server: {e}")
    return None

def update_server(uid, output, status):
    try:
        data = {'uid': uid, 'output': output, 'status': status}
        requests.post(f"{API_URL}?action=update", json=data)
    except Exception as e:
        print(f"Error updating server: {e}")

def stop(uid):
    """Stops the execution of the cell with the given uid."""
    if uid in active_tasks:
        print(f"Stopping task {uid}...")
        task = active_tasks[uid]
        if task != asyncio.current_task():
            task.cancel()
            return f"Stopped {uid}"
        else:
            return "Cannot stop itself."
    return f"Task {uid} not found or not running."

def stopall():
    """Stops all running executions."""
    count = 0
    for uid in list(active_tasks.keys()):
        if active_tasks[uid] != asyncio.current_task():
            stop(uid)
            count += 1
    try:
        # Check if an event loop is already running
        try:
            oldloop = asyncio.get_running_loop()
            print("Starting new event loop...")
            loop = asyncio.run(main())
            oldloop.stop()
        except RuntimeError:
            loop = asyncio.run(main())
            oldloop.stop()
        if oldloop and oldloop.is_running():
            print("Event loop already running. Scheduling main task...")
            loop.create_task(main())
        else:
            print("Starting new event loop...")
            loop=asyncio.run(main())
            loop.run_forever()

    except KeyboardInterrupt:
        print("Stopping consumer...")

    return f"Stopped {count} tasks, killed main loop and restarted"

def execute_code_sync(code, uid):
    # Capture stdout
    stdout_capture = i_o.StringIO()
    status = "completed"
    output = ""
    
    # Inject stop functions into locals
    local_scope = locals()
    local_scope['stop'] = stop
    local_scope['stopall'] = stopall
    local_scope['change_key'] = change_key
    local_scope['change_cycle'] = change_cycle
    
    try:
        with contextlib.redirect_stdout(stdout_capture):
            # Execute the code
            # Note: exec() is dangerous if not sandboxed
            exec(code, globals(), local_scope)
        output = stdout_capture.getvalue()
    except Exception as e:
        status = "error"
        output = f"{e}"
        print(f"An unexpected error occurred in {uid}: {e}")
    
    return output, status

async def process_task(uid, code):
    #global loop
    #global active_tasks
    print(f"Processing {uid}...")
    
    try:
        # API: INSERT ... ON DUPLICATE KEY UPDATE. So we CAN update log entries.    
        # Step 1: Set status to 'wait'
        update_server(uid, "Execution started...", "wait")
        
        # Step 2: Run code in thread
        loop = asyncio.get_running_loop()
        # We wrap the sync execution in a Task to allow cancellation
        output, status = await loop.run_in_executor(None, execute_code_sync, code, uid)
        
        # Step 3: Update with final result
        update_server(uid, output, status)
        
    except asyncio.CancelledError:
        print(f"Task {uid} cancelled.")
        update_server(uid, "Execution cancelled.", "error")
    except Exception as e:
        print(f"Error in task {uid}: {e}")
        update_server(uid, str(e), "error")
    finally:
        if uid in active_tasks:
            del active_tasks[uid]

async def main():
    print(f"Starting FoxDot Consumer (Async)... Polling {API_URL}")

    while True:
        # Move polling to thread to avoid blocking loop
        task_data = await asyncio.to_thread(poll_server)

        if task_data:
            uid = task_data.get('uid')
            code = task_data.get('code')
            print(f"Received task {uid}")

            # Create async task
            taskold = None
            if uid in active_tasks:
               taskold = active_tasks[uid]
            task = asyncio.create_task(process_task(uid, code))
            active_tasks[uid] = task
            if taskold and taskold.is_running():
                taskold.cancel()

        await asyncio.sleep(POLL_INTERVAL)
      
if __name__ == "__main__":
    try:
        # Check if an event loop is already running
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():
            print("Event loop already running. Scheduling main task...")
            loop.create_task(main())
            # Note: We assume the existing loop will keep running.
            # If this is a script that just exits, we might need loop.run_forever() if not already happening.
        else:
            print("Starting new event loop...")
            loop = asyncio.run(main())

        loop.run_forever()
    except KeyboardInterrupt:
        print("Stopping consumer...")