shoom013 commited on
Commit
b13ffb4
·
verified ·
1 Parent(s): add5d5b

Update Consumer.py

Browse files
Files changed (1) hide show
  1. Consumer.py +20 -20
Consumer.py CHANGED
@@ -30,8 +30,8 @@ def update_server(uid, output, status):
30
  print(f"Error updating server: {e}")
31
 
32
  def stop(uid):
33
- global loop
34
- global active_tasks
35
  """Stops the execution of the cell with the given uid."""
36
  if uid in active_tasks:
37
  print(f"Stopping task {uid}...")
@@ -45,8 +45,8 @@ def stop(uid):
45
 
46
  def stopall():
47
  """Stops all running executions."""
48
- global loop
49
- global active_tasks
50
  count = 0
51
  for uid in list(active_tasks.keys()):
52
  if active_tasks != asyncio.current_task():
@@ -77,16 +77,16 @@ def stopall():
77
 
78
  def execute_code_sync(code, uid):
79
  # Capture stdout
 
 
80
  stdout_capture = i_o.StringIO()
81
  status = "completed"
82
  output = ""
83
- global loop
84
- global active_tasks
85
 
86
  # Inject stop functions into locals
87
  local_scope = locals()
88
- #local_scope['stop'] = stop
89
- #local_scope['stopall'] = stopall
90
 
91
  try:
92
  with contextlib.redirect_stdout(stdout_capture):
@@ -102,8 +102,8 @@ def execute_code_sync(code, uid):
102
  return output, status
103
 
104
  async def process_task(uid, code):
105
- global loop
106
- global active_tasks
107
  print(f"Processing {uid}...")
108
 
109
  try:
@@ -146,8 +146,6 @@ async def process_task(uid, code):
146
 
147
  async def main():
148
  print(f"Starting FoxDot Consumer (Async)... Polling {API_URL}")
149
- global loop
150
- global active_tasks
151
 
152
  while True:
153
  # Move polling to thread to avoid blocking loop
@@ -159,15 +157,14 @@ async def main():
159
  print(f"Received task {uid}")
160
 
161
  # Create async task
162
- task = active_tasks[uid]
163
- active_tasks[uid] = asyncio.create_task(process_task(uid, code))
164
- if task:
165
- task.cancel()
166
-
167
  await asyncio.sleep(POLL_INTERVAL)
168
 
169
  if __name__ == "__main__":
170
- loop = None
171
  try:
172
  # Check if an event loop is already running
173
  try:
@@ -178,9 +175,12 @@ if __name__ == "__main__":
178
  if loop and loop.is_running():
179
  print("Event loop already running. Scheduling main task...")
180
  loop.create_task(main())
 
 
181
  else:
182
  print("Starting new event loop...")
183
- loop=asyncio.run(main())
184
- loop.run_forever()
 
185
  except KeyboardInterrupt:
186
  print("Stopping consumer...")
 
30
  print(f"Error updating server: {e}")
31
 
32
  def stop(uid):
33
+ #global loop
34
+ #global active_tasks
35
  """Stops the execution of the cell with the given uid."""
36
  if uid in active_tasks:
37
  print(f"Stopping task {uid}...")
 
45
 
46
  def stopall():
47
  """Stops all running executions."""
48
+ #global loop
49
+ #global active_tasks
50
  count = 0
51
  for uid in list(active_tasks.keys()):
52
  if active_tasks != asyncio.current_task():
 
77
 
78
  def execute_code_sync(code, uid):
79
  # Capture stdout
80
+ #global loop
81
+ #global active_tasks
82
  stdout_capture = i_o.StringIO()
83
  status = "completed"
84
  output = ""
 
 
85
 
86
  # Inject stop functions into locals
87
  local_scope = locals()
88
+ local_scope['stop'] = stop
89
+ local_scope['stopall'] = stopall
90
 
91
  try:
92
  with contextlib.redirect_stdout(stdout_capture):
 
102
  return output, status
103
 
104
  async def process_task(uid, code):
105
+ #global loop
106
+ #global active_tasks
107
  print(f"Processing {uid}...")
108
 
109
  try:
 
146
 
147
  async def main():
148
  print(f"Starting FoxDot Consumer (Async)... Polling {API_URL}")
 
 
149
 
150
  while True:
151
  # Move polling to thread to avoid blocking loop
 
157
  print(f"Received task {uid}")
158
 
159
  # Create async task
160
+ task = asyncio.create_task(process_task(uid, code))
161
+ active_tasks[uid] = task
162
+ if uid in active_tasks:
163
+ active_tasks[uid].cancel()
164
+
165
  await asyncio.sleep(POLL_INTERVAL)
166
 
167
  if __name__ == "__main__":
 
168
  try:
169
  # Check if an event loop is already running
170
  try:
 
175
  if loop and loop.is_running():
176
  print("Event loop already running. Scheduling main task...")
177
  loop.create_task(main())
178
+ # Note: We assume the existing loop will keep running.
179
+ # If this is a script that just exits, we might need loop.run_forever() if not already happening.
180
  else:
181
  print("Starting new event loop...")
182
+ loop = asyncio.run(main())
183
+
184
+ loop.run_forever()
185
  except KeyboardInterrupt:
186
  print("Stopping consumer...")