Update Consumer.py
Browse files- Consumer.py +6 -21
Consumer.py
CHANGED
|
@@ -49,9 +49,9 @@ def stopall():
|
|
| 49 |
#global active_tasks
|
| 50 |
count = 0
|
| 51 |
for uid in list(active_tasks.keys()):
|
| 52 |
-
if active_tasks != asyncio.current_task():
|
| 53 |
-
|
| 54 |
-
|
| 55 |
try:
|
| 56 |
# Check if an event loop is already running
|
| 57 |
try:
|
|
@@ -85,8 +85,8 @@ def execute_code_sync(code, uid):
|
|
| 85 |
|
| 86 |
# Inject stop functions into locals
|
| 87 |
local_scope = locals()
|
| 88 |
-
local_scope['stop'] = stop
|
| 89 |
-
local_scope['stopall'] = stopall
|
| 90 |
|
| 91 |
try:
|
| 92 |
with contextlib.redirect_stdout(stdout_capture):
|
|
@@ -107,22 +107,7 @@ async def process_task(uid, code):
|
|
| 107 |
print(f"Processing {uid}...")
|
| 108 |
|
| 109 |
try:
|
| 110 |
-
#
|
| 111 |
-
# Note: 'processing' is set by API when polled, but if it's long running, 'wait' might be better for UI feedback?
|
| 112 |
-
# The user requested 'wait' status.
|
| 113 |
-
# However, update_server moves it to log. We might not want to move it to log yet if it's running?
|
| 114 |
-
# But our API design moves to log on update.
|
| 115 |
-
# For true async status updates while running, we'd need a separate 'running' table or update the queue table.
|
| 116 |
-
# Given current API, 'update' moves to log. So we can't really set intermediate status easily without API changes.
|
| 117 |
-
# BUT, the request said: "assigning each exec to its uid async wait loop and returning status 'wait'".
|
| 118 |
-
# This implies immediate return.
|
| 119 |
-
|
| 120 |
-
# Let's interpret: The Python loop should likely NOT block.
|
| 121 |
-
# Setting status to "wait" immediately via update_server would move it to log, effectively "finishing" it from queue perspective.
|
| 122 |
-
# If the user wants to see it "waiting" in UI, moving to log with status 'wait' is one way.
|
| 123 |
-
# But then how do we update it again when it finishes? The log table has unique UID?
|
| 124 |
-
# API: INSERT ... ON DUPLICATE KEY UPDATE. So we CAN update log entries.
|
| 125 |
-
|
| 126 |
# Step 1: Set status to 'wait'
|
| 127 |
update_server(uid, "Execution started...", "wait")
|
| 128 |
|
|
|
|
| 49 |
#global active_tasks
|
| 50 |
count = 0
|
| 51 |
for uid in list(active_tasks.keys()):
|
| 52 |
+
if active_tasks[uid] != asyncio.current_task():
|
| 53 |
+
stop(uid)
|
| 54 |
+
count += 1
|
| 55 |
try:
|
| 56 |
# Check if an event loop is already running
|
| 57 |
try:
|
|
|
|
| 85 |
|
| 86 |
# Inject stop functions into locals
|
| 87 |
local_scope = locals()
|
| 88 |
+
#local_scope['stop'] = stop
|
| 89 |
+
#local_scope['stopall'] = stopall
|
| 90 |
|
| 91 |
try:
|
| 92 |
with contextlib.redirect_stdout(stdout_capture):
|
|
|
|
| 107 |
print(f"Processing {uid}...")
|
| 108 |
|
| 109 |
try:
|
| 110 |
+
# API: INSERT ... ON DUPLICATE KEY UPDATE. So we CAN update log entries.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 111 |
# Step 1: Set status to 'wait'
|
| 112 |
update_server(uid, "Execution started...", "wait")
|
| 113 |
|