File size: 5,944 Bytes
9a2349b 46540bf 94691c4 46540bf 9a2349b 535b45a 9a2349b c928aca 535b45a 9f0ef7c 46540bf 535b45a 9a2349b 46540bf 9a2349b b13ffb4 9a2349b c928aca 9a2349b 46540bf 9a2349b 46540bf 9a2349b 46540bf 9a2349b 4b4b7ee b13ffb4 4b4b7ee b13ffb4 9a2349b 46540bf 9a2349b b13ffb4 9a2349b b13ffb4 46540bf 9a2349b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 | import asyncio
import requests
import io as i_o
import contextlib
import sys
import concurrent.futures
from FoxDot import *
# Configuration
# Base endpoint of the remote API. poll_server() appends "?action=poll" and
# update_server() appends "?action=update" to this URL.
API_URL = "https://houseofplaying.com/hop/api.php"
# Pause between two successive polls in main(), passed to asyncio.sleep().
POLL_INTERVAL = 1 # Seconds
def change_key(amt=3, scl="minor", duration=4, flag=True):
    """Shift the default root, set the default scale, and optionally repeat.

    The new root is ``((Root.default + amt) % 14) - 5``; the scale becomes
    ``scl``. When ``flag`` is true the function re-schedules itself on the
    FoxDot clock ``duration`` beats ahead with the same ``amt``, ``scl`` and
    ``duration``. The rescheduled call does not pass ``flag``, so it uses the
    default (True) and keeps repeating.

    Args:
        amt: Amount added to the current default root before wrapping.
        scl: Value assigned to ``Scale.default``.
        duration: Delay, in clock beats, before the next scheduled change.
        flag: Whether to schedule a follow-up call.
    """
    shifted_root = ((Root.default + amt) % 14) - 5
    Root.default = shifted_root
    # Read the value back from Root so the message shows what was stored.
    print(f"Key changed to {Root.default}")
    Scale.default = scl

    if not flag:
        return

    repeat_args = [amt, scl, duration]
    Clock.future(duration, change_key, args=repeat_args)
def change_cycle(notes, scales, plen, flag=True):
    """Step the default root and scale through ``notes`` / ``scales``.

    Whenever the clock sits exactly on a multiple of ``plen`` beats, the
    number of elapsed ``plen``-beat periods selects the entry of ``notes``
    and ``scales`` to install as ``Root.default`` / ``Scale.default``. Each
    sequence wraps around on its own length.

    When ``flag`` is true the function re-schedules itself ``plen`` beats
    ahead through ``Clock.future`` (the same mechanism ``change_key`` uses)
    instead of calling itself directly, which recursed without bound.

    Args:
        notes: Sequence of root values to cycle through.
        scales: Sequence of scale values to cycle through.
        plen: Length of one period, in clock beats.
        flag: Whether to schedule the next check.
    """
    now = Clock.now()
    # NOTE(review): exact equality only matches when the clock lands precisely
    # on a period boundary; kept from the original -- confirm this is intended.
    if now % plen == 0:
        # Floor division plus int() yields a valid sequence index; the
        # original used true division, which produces a float.
        step = int(now // plen)
        Scale.default = scales[step % len(scales)]
        Root.default = notes[step % len(notes)]
    if flag:
        Clock.future(plen, change_cycle, args=[notes, scales, plen])
# Global registry of in-flight work: maps a task uid to the asyncio.Task that
# runs process_task() for it. main() inserts entries, process_task() removes
# the entry in its finally block, and stop()/stopall() look tasks up here in
# order to cancel them.
active_tasks = {}
def poll_server(timeout=10):
    """Ask the API for the next pending task.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would leave the
            polling thread stuck on a stalled connection.

    Returns:
        The decoded JSON body when the server answers with HTTP 200;
        otherwise ``None`` (other status code, network failure, timeout, or
        a body that is not valid JSON).
    """
    try:
        response = requests.get(f"{API_URL}?action=poll", timeout=timeout)
        if response.status_code == 200:
            return response.json()
    except Exception as e:
        # Best-effort boundary: a failed poll is reported and retried by the
        # caller on the next interval rather than crashing the consumer.
        print(f"Error polling server: {e}")
    return None
def update_server(uid, output, status, timeout=10):
    """Report the output and status of a task back to the API.

    Failures are printed, never raised, so reporting problems cannot break
    the caller.

    Args:
        uid: Identifier of the task being reported.
        output: Text to store for the task.
        status: Status string to store for the task (the callers in this
            file send "wait", "completed" or "error").
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely.
    """
    try:
        data = {'uid': uid, 'output': output, 'status': status}
        response = requests.post(
            f"{API_URL}?action=update", json=data, timeout=timeout
        )
        # Surface HTTP error responses instead of silently dropping them.
        response.raise_for_status()
    except Exception as e:
        print(f"Error updating server: {e}")
def _calling_task():
    """Return the asyncio task the caller runs in, or None if there is none.

    ``asyncio.current_task()`` raises RuntimeError when no event loop is
    running in the current thread, which is the case for user code executed
    in a worker thread.
    """
    try:
        return asyncio.current_task()
    except RuntimeError:
        return None


def _request_cancel(task):
    """Cancel ``task`` from whichever thread the caller happens to be on."""
    task_loop = task.get_loop()
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None
    if running_loop is task_loop:
        task.cancel()
    else:
        # Task.cancel() is not thread-safe; hand it to the owning loop.
        task_loop.call_soon_threadsafe(task.cancel)


def stop(uid):
    """Stops the execution of the cell with the given uid.

    Only the asyncio task registered in ``active_tasks`` is cancelled; code
    that is already running inside a worker thread is not interrupted.

    Args:
        uid: Identifier of the task to cancel.

    Returns:
        A human-readable message describing what happened.
    """
    task = active_tasks.get(uid)
    if task is None:
        return f"Task {uid} not found or not running."
    print(f"Stopping task {uid}...")
    if task is _calling_task():
        return "Cannot stop itself."
    _request_cancel(task)
    return f"Stopped {uid}"


def stopall():
    """Stops all running executions.

    Every task registered in ``active_tasks`` is cancelled except the one
    the caller is running in. When called from a worker thread the calling
    task cannot be identified, so all registered tasks are cancelled.

    The previous implementation then tried to start a second event loop with
    ``asyncio.run()``, which fails inside a running loop and otherwise blocks
    forever in a duplicate poller. The polling coroutine is not registered in
    ``active_tasks`` and is therefore never cancelled here, so there is
    nothing to restart; that code and the matching claim in the return
    message were removed.

    Returns:
        A message stating how many tasks were asked to stop.
    """
    caller = _calling_task()
    count = 0
    # Iterate over a snapshot: finishing tasks remove themselves concurrently.
    for uid, task in list(active_tasks.items()):
        if task is caller:
            continue
        print(f"Stopping task {uid}...")
        _request_cancel(task)
        count += 1
    return f"Stopped {count} tasks"
def execute_code_sync(code, uid):
    """Run ``code`` with exec() and return what it printed.

    Standard output is redirected into an in-memory buffer while the code
    runs. The code executes against this module's globals plus a local
    namespace that is a snapshot of this function's locals extended with the
    helpers ``stop``, ``stopall``, ``change_key`` and ``change_cycle``.

    Args:
        code: Source text to execute.
        uid: Identifier of the task, used in the error message.

    Returns:
        A ``(output, status)`` tuple: the captured stdout and "completed" on
        success, or the exception text and "error" when the code raised.
    """
    # These local names are deliberately unchanged: locals() below exposes
    # them to the executed code under exactly these names.
    stdout_capture = i_o.StringIO()
    status = "completed"
    output = ""

    # Namespace handed to exec(): current locals plus the helper functions.
    local_scope = locals()
    local_scope.update(
        stop=stop,
        stopall=stopall,
        change_key=change_key,
        change_cycle=change_cycle,
    )

    try:
        with contextlib.redirect_stdout(stdout_capture):
            # Note: exec() is dangerous if not sandboxed
            exec(code, globals(), local_scope)
    except Exception as e:
        status = "error"
        output = f"{e}"
        print(f"An unexpected error occurred in {uid}: {e}")
    else:
        output = stdout_capture.getvalue()

    return output, status
async def process_task(uid, code):
    """Execute one received task and report its progress to the server.

    Steps: mark the task as "wait" on the server, run the code in a worker
    thread, then upload the final output and status. All blocking HTTP calls
    are pushed to threads so the event loop (and the poller) stays
    responsive.

    Args:
        uid: Identifier of the task; also its key in ``active_tasks``.
        code: Source text to execute.

    Raises:
        asyncio.CancelledError: Re-raised after the cancellation has been
            reported, so the task ends in the cancelled state.
    """
    print(f"Processing {uid}...")
    try:
        # Step 1: Set status to 'wait'
        await asyncio.to_thread(
            update_server, uid, "Execution started...", "wait"
        )

        # Step 2: Run code in a worker thread. Cancelling this task abandons
        # the wait; it does not interrupt code already running in the thread.
        output, status = await asyncio.to_thread(execute_code_sync, code, uid)

        # Step 3: Update with final result
        await asyncio.to_thread(update_server, uid, output, status)
    except asyncio.CancelledError:
        print(f"Task {uid} cancelled.")
        await asyncio.to_thread(
            update_server, uid, "Execution cancelled.", "error"
        )
        raise
    except Exception as e:
        print(f"Error in task {uid}: {e}")
        await asyncio.to_thread(update_server, uid, str(e), "error")
    finally:
        # Remove the registry entry only if it still points at this task; a
        # newer task may have been registered under the same uid meanwhile.
        if active_tasks.get(uid) is asyncio.current_task():
            del active_tasks[uid]
async def main():
    """Poll the server forever and start an asyncio task per received job.

    Each poll runs in a worker thread so the event loop is not blocked. A
    received job is registered in ``active_tasks`` under its uid; if an
    unfinished task with the same uid exists, it is cancelled after the
    replacement has been registered.
    """
    print(f"Starting FoxDot Consumer (Async)... Polling {API_URL}")

    while True:
        # Move polling to thread to avoid blocking loop
        task_data = await asyncio.to_thread(poll_server)

        if task_data and not isinstance(task_data, dict):
            # A JSON payload that is not an object has no uid/code to read.
            print(f"Ignoring unexpected poll response: {task_data!r}")
        elif task_data:
            uid = task_data.get('uid')
            code = task_data.get('code')
            print(f"Received task {uid}")

            previous = active_tasks.get(uid)
            active_tasks[uid] = asyncio.create_task(process_task(uid, code))

            # asyncio.Task has no is_running(); "not done()" is the check for
            # a task that has not finished yet.
            if previous is not None and not previous.done():
                previous.cancel()

        await asyncio.sleep(POLL_INTERVAL)
if __name__ == "__main__":
    # Script entry point: run the consumer until interrupted with Ctrl+C.
    try:
        # Check if an event loop is already running
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is not None and loop.is_running():
            print("Event loop already running. Scheduling main task...")
            # Keep a reference: the event loop only holds weak references to
            # tasks, so an unreferenced task may be garbage-collected.
            # Note: We assume the existing loop will keep running.
            _main_task = loop.create_task(main())
        else:
            print("Starting new event loop...")
            # asyncio.run() blocks until main() finishes and returns the
            # coroutine's result, not a loop, so there is nothing to call
            # run_forever() on afterwards.
            asyncio.run(main())
    except KeyboardInterrupt:
        print("Stopping consumer...")
|