| |
| |
| |
| |
| |
| |
|
|
| #include <assert.h> |
| #include <emscripten/threading.h> |
| #include <stdatomic.h> |
| #include <stdlib.h> |
| #include <string.h> |
|
|
| #include "em_task_queue.h" |
| #include "proxying_notification_state.h" |
| #include "thread_mailbox.h" |
| #include "threading_internal.h" |
|
|
// Number of task slots given to a newly created queue: em_task_queue_create
// uses it both for the size of the initial allocation and for the queue's
// starting `capacity`.
| #define EM_TASK_QUEUE_INITIAL_CAPACITY 128 |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| static em_task_queue zombie_list_head = {.mutex = PTHREAD_MUTEX_INITIALIZER, |
| .zombie_prev = &zombie_list_head, |
| .zombie_next = &zombie_list_head}; |
|
|
| static void em_task_queue_free(em_task_queue* queue) { |
| pthread_mutex_destroy(&queue->mutex); |
| free(queue->tasks); |
| free(queue); |
| } |
|
|
| static void cull_zombies() { |
| if (pthread_mutex_trylock(&zombie_list_head.mutex) != 0) { |
| |
| |
| |
| return; |
| } |
| em_task_queue* curr = zombie_list_head.zombie_next; |
| while (curr != &zombie_list_head) { |
| em_task_queue* next = curr->zombie_next; |
| if (curr->notification == NOTIFICATION_NONE) { |
| |
| curr->zombie_prev->zombie_next = curr->zombie_next; |
| curr->zombie_next->zombie_prev = curr->zombie_prev; |
| em_task_queue_free(curr); |
| } |
| curr = next; |
| } |
| pthread_mutex_unlock(&zombie_list_head.mutex); |
| } |
|
|
| em_task_queue* em_task_queue_create(pthread_t thread) { |
| |
| cull_zombies(); |
|
|
| em_task_queue* queue = malloc(sizeof(em_task_queue)); |
| if (queue == NULL) { |
| return NULL; |
| } |
| task* tasks = malloc(sizeof(task) * EM_TASK_QUEUE_INITIAL_CAPACITY); |
| if (tasks == NULL) { |
| free(queue); |
| return NULL; |
| } |
| *queue = (em_task_queue){.notification = NOTIFICATION_NONE, |
| .mutex = PTHREAD_MUTEX_INITIALIZER, |
| .thread = thread, |
| .processing = 0, |
| .tasks = tasks, |
| .capacity = EM_TASK_QUEUE_INITIAL_CAPACITY, |
| .head = 0, |
| .tail = 0, |
| .zombie_prev = NULL, |
| .zombie_next = NULL}; |
| return queue; |
| } |
|
|
| void em_task_queue_destroy(em_task_queue* queue) { |
| assert(queue->zombie_next == NULL && queue->zombie_prev == NULL); |
| if (queue->notification == NOTIFICATION_NONE) { |
| |
| em_task_queue_free(queue); |
| return; |
| } |
| |
| |
| pthread_mutex_lock(&zombie_list_head.mutex); |
| queue->zombie_next = &zombie_list_head; |
| queue->zombie_prev = zombie_list_head.zombie_prev; |
| queue->zombie_next->zombie_prev = queue; |
| queue->zombie_prev->zombie_next = queue; |
| pthread_mutex_unlock(&zombie_list_head.mutex); |
| } |
|
|
| |
| static bool em_task_queue_grow(em_task_queue* queue) { |
| |
| int new_capacity = queue->capacity * 2; |
| task* new_tasks = malloc(sizeof(task) * new_capacity); |
| if (new_tasks == NULL) { |
| return false; |
| } |
| |
| |
| |
| int queued_tasks; |
| if (queue->head <= queue->tail) { |
| |
| queued_tasks = queue->tail - queue->head; |
| memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * queued_tasks); |
| } else { |
| |
| |
| int first_queued = queue->capacity - queue->head; |
| int last_queued = queue->tail; |
| queued_tasks = first_queued + last_queued; |
| memcpy(new_tasks, &queue->tasks[queue->head], sizeof(task) * first_queued); |
| memcpy(new_tasks + first_queued, queue->tasks, sizeof(task) * last_queued); |
| } |
| free(queue->tasks); |
| queue->tasks = new_tasks; |
| queue->capacity = new_capacity; |
| queue->head = 0; |
| queue->tail = queued_tasks; |
| return true; |
| } |
|
|
| void em_task_queue_execute(em_task_queue* queue) { |
| DBG("em_task_queue_execute"); |
| queue->processing = 1; |
| pthread_mutex_lock(&queue->mutex); |
| while (!em_task_queue_is_empty(queue)) { |
| task t = em_task_queue_dequeue(queue); |
| |
| |
| pthread_mutex_unlock(&queue->mutex); |
| t.func(t.arg); |
| pthread_mutex_lock(&queue->mutex); |
| } |
| pthread_mutex_unlock(&queue->mutex); |
| queue->processing = 0; |
| DBG("done em_task_queue_execute"); |
| } |
|
|
| void em_task_queue_cancel(em_task_queue* queue) { |
| pthread_mutex_lock(&queue->mutex); |
| while (!em_task_queue_is_empty(queue)) { |
| task t = em_task_queue_dequeue(queue); |
| if (t.cancel) { |
| t.cancel(t.arg); |
| } |
| } |
| pthread_mutex_unlock(&queue->mutex); |
| |
| |
| |
| |
| |
| queue->notification = NOTIFICATION_NONE; |
| } |
|
|
| bool em_task_queue_enqueue(em_task_queue* queue, task t) { |
| if (em_task_queue_is_full(queue) && !em_task_queue_grow(queue)) { |
| return false; |
| } |
| queue->tasks[queue->tail] = t; |
| queue->tail = (queue->tail + 1) % queue->capacity; |
| return true; |
| } |
|
|
| task em_task_queue_dequeue(em_task_queue* queue) { |
| task t = queue->tasks[queue->head]; |
| queue->head = (queue->head + 1) % queue->capacity; |
| return t; |
| } |
|
|
| static void receive_notification(void* arg) { |
| em_task_queue* tasks = arg; |
| tasks->notification = NOTIFICATION_RECEIVED; |
| em_task_queue_execute(tasks); |
| notification_state expected = NOTIFICATION_RECEIVED; |
| atomic_compare_exchange_strong( |
| &tasks->notification, &expected, NOTIFICATION_NONE); |
| DBG("receive_notification done"); |
| } |
|
|
| static void cancel_notification(void* arg) { |
| em_task_queue* tasks = arg; |
| em_task_queue_cancel(tasks); |
| } |
|
|
| bool em_task_queue_send(em_task_queue* queue, task t) { |
| |
| |
| if (!emscripten_thread_mailbox_ref(queue->thread)) { |
| return false; |
| } |
|
|
| pthread_mutex_lock(&queue->mutex); |
| bool enqueued = em_task_queue_enqueue(queue, t); |
| pthread_mutex_unlock(&queue->mutex); |
| if (!enqueued) { |
| emscripten_thread_mailbox_unref(queue->thread); |
| return false; |
| } |
|
|
| |
| |
| notification_state previous = |
| atomic_exchange(&queue->notification, NOTIFICATION_PENDING); |
| if (previous == NOTIFICATION_PENDING) { |
| DBG("em_task_queue_send NOTIFICATION_PENDING already set"); |
| emscripten_thread_mailbox_unref(queue->thread); |
| return true; |
| } |
|
|
| emscripten_thread_mailbox_send(queue->thread, |
| (task){.func = receive_notification, |
| .cancel = cancel_notification, |
| .arg = queue}); |
| emscripten_thread_mailbox_unref(queue->thread); |
| return true; |
| } |
|
|