Buckets:
| /* | |
| * Copyright 2021 The Emscripten Authors. All rights reserved. | |
| * Emscripten is available under two separate licenses, the MIT license and the | |
| * University of Illinois/NCSA Open Source License. Both these licenses can be | |
| * found in the LICENSE file. | |
| */ | |
| // Task Queue Lifetime Management | |
| // ------------------------------- | |
| // | |
| // When tasks are added to a task queue, the Worker running the target thread | |
| // receives an event that will cause it to execute the queue when it next | |
| // returns to its event loop. In some cases the queue will already have been | |
| // executed before then, but the event is still received and the queue is still | |
| // executed. These events contain references to the queue so that the target | |
| // thread will know which queue to execute. | |
| // | |
| // To avoid use-after-free bugs, we cannot free a task queue immediately when | |
| // `em_task_queue_destroy` is called; instead, we must defer freeing the queue | |
| // until all of its outstanding notifications have been processed. We defer | |
| // freeing the queue using an atomic flag. Each time a notification containing a | |
| // reference to a task queue is generated, we set the flag on that task queue. | |
| // Each time that task queue is processed, we clear the flag as long as another | |
| // notification for the queue has not been generated in the mean time. The | |
| // proxying queue can only be freed once `em_task_queue_destroy` has been called | |
| // and its notification flag has been cleared. | |
| // | |
| // But an extra complication is that the target thread may have died by the time | |
| // it gets back to its event loop to process its notifications. In that case the | |
| // thread's Worker will still receive a notification and have to clear the | |
| // notification flag without a live runtime. Without a live runtime, there is no | |
| // stack, so the worker cannot safely free the queue at this point even if the | |
| // notification flag is cleared. We need a separate thread with a live runtime | |
| // to perform the free. | |
| // | |
| // To ensure that queues are eventually freed, we place destroyed queues in a | |
| // global "zombie list" where they wait for their notification flags to be | |
| // cleared. The zombie list is scanned and zombie queues without outstanding | |
| // notifications are freed whenever a new queue is constructed. In principle the | |
| // zombie list could be scanned at any time, but the queue constructor is a nice | |
| // place to do it because scanning there is sufficient to keep the number of | |
| // zombie queues from growing without bound; creating a new zombie ultimately | |
| // requires creating a new queue. | |
| // | |
| // ------------------------------- | |
| // The head of the zombie list. Its mutex protects access to the list and its | |
| // other fields are not used. | |
| static em_task_queue zombie_list_head = {.mutex = PTHREAD_MUTEX_INITIALIZER, | |
| .zombie_prev = &zombie_list_head, | |
| .zombie_next = &zombie_list_head}; | |
| static void em_task_queue_free(em_task_queue* queue) { | |
| pthread_mutex_destroy(&queue->mutex); | |
| free(queue->tasks); | |
| free(queue); | |
| } | |
| static void cull_zombies() { | |
| if (pthread_mutex_trylock(&zombie_list_head.mutex) != 0) { | |
| // Some other thread is already culling. In principle there may be new | |
| // cullable zombies after it finishes, but it's not worth waiting to find | |
| // out. | |
| return; | |
| } | |
| em_task_queue* curr = zombie_list_head.zombie_next; | |
| while (curr != &zombie_list_head) { | |
| em_task_queue* next = curr->zombie_next; | |
| if (curr->notification == NOTIFICATION_NONE) { | |
| // Remove the zombie from the list and free it. | |
| curr->zombie_prev->zombie_next = curr->zombie_next; | |
| curr->zombie_next->zombie_prev = curr->zombie_prev; | |
| em_task_queue_free(curr); | |
| } | |
| curr = next; | |
| } | |
| pthread_mutex_unlock(&zombie_list_head.mutex); | |
| } | |
| em_task_queue* em_task_queue_create(pthread_t thread) { | |
| // Free any queue that has been destroyed and is safe to free. | |
| cull_zombies(); | |
| em_task_queue* queue = malloc(sizeof(em_task_queue)); | |
| if (queue == NULL) { | |
| return NULL; | |
| } | |
| task* tasks = malloc(sizeof(task) * EM_TASK_QUEUE_INITIAL_CAPACITY); | |
| if (tasks == NULL) { | |
| free(queue); | |
| return NULL; | |
| } | |
| *queue = (em_task_queue){.notification = NOTIFICATION_NONE, | |
| .mutex = PTHREAD_MUTEX_INITIALIZER, | |
| .thread = thread, | |
| .processing = 0, | |
| .tasks = tasks, | |
| .capacity = EM_TASK_QUEUE_INITIAL_CAPACITY, | |
| .head = 0, | |
| .tail = 0, | |
| .zombie_prev = NULL, | |
| .zombie_next = NULL}; | |
| return queue; | |
| } | |
| void em_task_queue_destroy(em_task_queue* queue) { | |
| assert(queue->zombie_next == NULL && queue->zombie_prev == NULL); | |
| if (queue->notification == NOTIFICATION_NONE) { | |
| // No outstanding references to the queue, so we can go ahead and free it. | |
| em_task_queue_free(queue); | |
| return; | |
| } | |
| // Otherwise add the queue to the zombie list so that it will eventually be | |
| // freed safely. | |
| pthread_mutex_lock(&zombie_list_head.mutex); | |
| queue->zombie_next = &zombie_list_head; | |
| queue->zombie_prev = zombie_list_head.zombie_prev; | |
| queue->zombie_next->zombie_prev = queue; | |
| queue->zombie_prev->zombie_next = queue; | |
| pthread_mutex_unlock(&zombie_list_head.mutex); | |
| } | |
| // Not thread safe. Returns 1 on success and 0 on failure. | |
| static bool em_task_queue_grow(em_task_queue* queue) { | |
| // Allocate a larger task queue. | |
| int new_capacity = queue->capacity * 2; | |
| task* new_tasks = malloc(sizeof(task) * new_capacity); | |
| if (new_tasks == NULL) { | |
| return false; | |
| } | |
| // Copy the tasks such that the head of the queue is at the beginning of the | |
| // buffer. There are two cases to handle: either the queue wraps around the | |
| // end of the old buffer or it does not. | |
| int queued_tasks; | |
| if (queue->head <= queue->tail) { | |
| // No wrap. Copy the tasks in one chunk. | |
| queued_tasks = queue->tail - queue->head; | |
| memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * queued_tasks); | |
| } else { | |
| // Wrap. Copy `first_queued` tasks up to the end of the old buffer and | |
| // `last_queued` tasks at the beginning of the old buffer. | |
| int first_queued = queue->capacity - queue->head; | |
| int last_queued = queue->tail; | |
| queued_tasks = first_queued + last_queued; | |
| memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * first_queued); | |
| memcpy(new_tasks + first_queued, queue->tasks, sizeof(task) * last_queued); | |
| } | |
| free(queue->tasks); | |
| queue->tasks = new_tasks; | |
| queue->capacity = new_capacity; | |
| queue->head = 0; | |
| queue->tail = queued_tasks; | |
| return true; | |
| } | |
| void em_task_queue_execute(em_task_queue* queue) { | |
| DBG("em_task_queue_execute"); | |
| queue->processing = 1; | |
| pthread_mutex_lock(&queue->mutex); | |
| while (!em_task_queue_is_empty(queue)) { | |
| task t = em_task_queue_dequeue(queue); | |
| // Unlock while the task is running to allow more work to be queued in | |
| // parallel. | |
| pthread_mutex_unlock(&queue->mutex); | |
| t.func(t.arg); | |
| pthread_mutex_lock(&queue->mutex); | |
| } | |
| pthread_mutex_unlock(&queue->mutex); | |
| queue->processing = 0; | |
| DBG("done em_task_queue_execute"); | |
| } | |
| void em_task_queue_cancel(em_task_queue* queue) { | |
| pthread_mutex_lock(&queue->mutex); | |
| while (!em_task_queue_is_empty(queue)) { | |
| task t = em_task_queue_dequeue(queue); | |
| if (t.cancel) { | |
| t.cancel(t.arg); | |
| } | |
| } | |
| pthread_mutex_unlock(&queue->mutex); | |
| // Any subsequent messages to this queue (for example if a pthread struct is | |
| // reused for a future thread, potentially on a different worker) will require | |
| // a new notification. Clearing the flag is safe here because in both the | |
| // proxying queue and mailbox cases, there are no more outstanding references | |
| // to the queue after thread shutdown. | |
| queue->notification = NOTIFICATION_NONE; | |
| } | |
| bool em_task_queue_enqueue(em_task_queue* queue, task t) { | |
| if (em_task_queue_is_full(queue) && !em_task_queue_grow(queue)) { | |
| return false; | |
| } | |
| queue->tasks[queue->tail] = t; | |
| queue->tail = (queue->tail + 1) % queue->capacity; | |
| return true; | |
| } | |
| task em_task_queue_dequeue(em_task_queue* queue) { | |
| task t = queue->tasks[queue->head]; | |
| queue->head = (queue->head + 1) % queue->capacity; | |
| return t; | |
| } | |
| static void receive_notification(void* arg) { | |
| em_task_queue* tasks = arg; | |
| tasks->notification = NOTIFICATION_RECEIVED; | |
| em_task_queue_execute(tasks); | |
| notification_state expected = NOTIFICATION_RECEIVED; | |
| atomic_compare_exchange_strong( | |
| &tasks->notification, &expected, NOTIFICATION_NONE); | |
| DBG("receive_notification done"); | |
| } | |
| static void cancel_notification(void* arg) { | |
| em_task_queue* tasks = arg; | |
| em_task_queue_cancel(tasks); | |
| } | |
| bool em_task_queue_send(em_task_queue* queue, task t) { | |
| // Ensure the target mailbox will remain open or detect that it is already | |
| // closed. | |
| if (!emscripten_thread_mailbox_ref(queue->thread)) { | |
| return false; | |
| } | |
| pthread_mutex_lock(&queue->mutex); | |
| bool enqueued = em_task_queue_enqueue(queue, t); | |
| pthread_mutex_unlock(&queue->mutex); | |
| if (!enqueued) { | |
| emscripten_thread_mailbox_unref(queue->thread); | |
| return false; | |
| } | |
| // We're done if there is already a pending notification for this task queue. | |
| // Otherwise, we will send one. | |
| notification_state previous = | |
| atomic_exchange(&queue->notification, NOTIFICATION_PENDING); | |
| if (previous == NOTIFICATION_PENDING) { | |
| DBG("em_task_queue_send NOTIFICATION_PENDING already set"); | |
| emscripten_thread_mailbox_unref(queue->thread); | |
| return true; | |
| } | |
| emscripten_thread_mailbox_send(queue->thread, | |
| (task){.func = receive_notification, | |
| .cancel = cancel_notification, | |
| .arg = queue}); | |
| emscripten_thread_mailbox_unref(queue->thread); | |
| return true; | |
| } | |
Xet Storage Details
- Size:
- 10.2 kB
- Xet hash:
- 59c073255811f55ad5ed23d21dc666da1e48949f2992dc239838f68d149dcc14
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.