import asyncio import requests import io as i_o import contextlib import sys import concurrent.futures from FoxDot import * # Configuration API_URL = "https://houseofplaying.com/hop/api.php" POLL_INTERVAL = 1 # Seconds def change_key(amt = 3, scl="minor", duration=4, flag=True): Root.default = ((Root.default+amt) % 14)-5 print(f"Key changed to {Root.default}") Scale.default = scl if flag: Clock.future(duration, change_key, args=[amt, scl, duration]) def change_cycle(notes, scales, plen, flag=True): if (Clock.now() % plen) == 0: Scale.default = scales[(Clock.now()/plen) % len(notes)] Root.default = notes[(Clock.now()/plen) % len(notes)] if flag: change_cycle(notes, scales, plen) # Global dictionary to store active tasks and their cancellation tokens (or just the futures) active_tasks = {} def poll_server(): try: response = requests.get(f"{API_URL}?action=poll") if response.status_code == 200: return response.json() except Exception as e: print(f"Error polling server: {e}") return None def update_server(uid, output, status): try: data = {'uid': uid, 'output': output, 'status': status} requests.post(f"{API_URL}?action=update", json=data) except Exception as e: print(f"Error updating server: {e}") def stop(uid): """Stops the execution of the cell with the given uid.""" if uid in active_tasks: print(f"Stopping task {uid}...") task = active_tasks[uid] if task != asyncio.current_task(): task.cancel() return f"Stopped {uid}" else: return "Cannot stop itself." return f"Task {uid} not found or not running." def stopall(): """Stops all running executions.""" count = 0 for uid in list(active_tasks.keys()): if active_tasks[uid] != asyncio.current_task(): stop(uid) count += 1 try: # Check if an event loop is already running try: oldloop = asyncio.get_running_loop() print("Starting new event loop...") loop = asyncio.run(main()) oldloop.stop() except RuntimeError: loop = asyncio.run(main()) oldloop.stop() if oldloop and oldloop.is_running(): print("Event loop already running. Scheduling main task...") loop.create_task(main()) else: print("Starting new event loop...") loop=asyncio.run(main()) loop.run_forever() except KeyboardInterrupt: print("Stopping consumer...") return f"Stopped {count} tasks, killed main loop and restarted" def execute_code_sync(code, uid): # Capture stdout stdout_capture = i_o.StringIO() status = "completed" output = "" # Inject stop functions into locals local_scope = locals() local_scope['stop'] = stop local_scope['stopall'] = stopall local_scope['change_key'] = change_key local_scope['change_cycle'] = change_cycle try: with contextlib.redirect_stdout(stdout_capture): # Execute the code # Note: exec() is dangerous if not sandboxed exec(code, globals(), local_scope) output = stdout_capture.getvalue() except Exception as e: status = "error" output = f"{e}" print(f"An unexpected error occurred in {uid}: {e}") return output, status async def process_task(uid, code): #global loop #global active_tasks print(f"Processing {uid}...") try: # API: INSERT ... ON DUPLICATE KEY UPDATE. So we CAN update log entries. # Step 1: Set status to 'wait' update_server(uid, "Execution started...", "wait") # Step 2: Run code in thread loop = asyncio.get_running_loop() # We wrap the sync execution in a Task to allow cancellation output, status = await loop.run_in_executor(None, execute_code_sync, code, uid) # Step 3: Update with final result update_server(uid, output, status) except asyncio.CancelledError: print(f"Task {uid} cancelled.") update_server(uid, "Execution cancelled.", "error") except Exception as e: print(f"Error in task {uid}: {e}") update_server(uid, str(e), "error") finally: if uid in active_tasks: del active_tasks[uid] async def main(): print(f"Starting FoxDot Consumer (Async)... Polling {API_URL}") while True: # Move polling to thread to avoid blocking loop task_data = await asyncio.to_thread(poll_server) if task_data: uid = task_data.get('uid') code = task_data.get('code') print(f"Received task {uid}") # Create async task taskold = None if uid in active_tasks: taskold = active_tasks[uid] task = asyncio.create_task(process_task(uid, code)) active_tasks[uid] = task if taskold and taskold.is_running(): taskold.cancel() await asyncio.sleep(POLL_INTERVAL) if __name__ == "__main__": try: # Check if an event loop is already running try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): print("Event loop already running. Scheduling main task...") loop.create_task(main()) # Note: We assume the existing loop will keep running. # If this is a script that just exits, we might need loop.run_forever() if not already happening. else: print("Starting new event loop...") loop = asyncio.run(main()) loop.run_forever() except KeyboardInterrupt: print("Stopping consumer...")