arudradey's picture
download
raw
9.13 kB
/*
* Copyright 2021 The Emscripten Authors. All rights reserved.
* Emscripten is available under two separate licenses, the MIT license and the
* University of Illinois/NCSA Open Source License. Both these licenses can be
* found in the LICENSE file.
*/
#include "atomic.h"
#include "pthread_impl.h"
#include "threading_internal.h"
#include <emscripten/threading.h>
#include <emscripten/console.h>
#include <assert.h>
#include <math.h>
#include <errno.h>
#include <math.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/param.h>
#ifdef __EMSCRIPTEN_PTHREADS__
// Note: We use a weak reference here. If it's null we know that threads are
// not cancelable.
weak long __cancel(void);
#endif
extern void* _emscripten_main_thread_futex;
static int futex_wait_main_browser_thread(volatile void* addr,
uint32_t val,
double timeout, bool cancelable) {
// Atomics.wait is not available in the main browser thread, so simulate it
// via busy spinning. Only the main browser thread is allowed to call into
// this function. It is not thread-safe to be called from any other thread.
assert(emscripten_is_main_browser_thread());
double now = emscripten_get_now();
double end = now + timeout;
// Register globally which address the main thread is simulating to be waiting
// on. When zero, the main thread is not waiting on anything, and on nonzero,
// the contents of the address pointed by _emscripten_main_thread_futex tell
// which address the main thread is simulating its wait on. We need to be
// careful of recursion here: If we wait on a futex, and then call
// _emscripten_yield() below, that will call code that takes the proxying
// mutex - which can once more reach this code in a nested call. To avoid
// interference between the two (there is just a single
// _emscripten_main_thread_futex at a time), unmark ourselves before calling
// the potentially-recursive call. See below for how we handle the case of our
// futex being notified during the time in between when we are not set as the
// value of _emscripten_main_thread_futex.
void* last_addr = a_cas_p(&_emscripten_main_thread_futex, 0, (void*)addr);
// We must not have already been waiting.
assert(last_addr == 0);
while (1) {
#ifdef __EMSCRIPTEN_PTHREADS__
if (cancelable && __pthread_self()->cancel) {
return __cancel();
}
#endif
// Check for a timeout.
now = emscripten_get_now();
if (now > end) {
// We timed out, so stop marking ourselves as waiting.
last_addr = a_cas_p(&_emscripten_main_thread_futex, (void*)addr, 0);
// The current value must have been our address which we set, or
// in a race it was set to 0 which means another thread just allowed
// us to run, but (tragically) that happened just a bit too late.
assert(last_addr == addr || last_addr == 0);
return -ETIMEDOUT;
}
// We are performing a blocking loop here, so we must handle proxied
// events from pthreads, to avoid deadlocks.
// Note that we have to do so carefully, as we may take a lock while
// doing so, which can recurse into this function; stop marking
// ourselves as waiting while we do so.
last_addr = a_cas_p(&_emscripten_main_thread_futex, (void*)addr, 0);
assert(last_addr == addr || last_addr == 0);
if (last_addr == 0) {
// We were told to stop waiting, so stop.
break;
}
bool timer_fired = _emscripten_yield(now);
if (timer_fired) {
return -EINTR;
}
// Check the value, as if we were starting the futex all over again.
// This handles the following case:
//
// * wait on futex A
// * recurse into _emscripten_yield(),
// which waits on futex B. that sets the _emscripten_main_thread_futex
// address to futex B, and there is no longer any mention of futex A.
// * a worker is done with futex A. it checks _emscripten_main_thread_futex
// but does not see A, so it does nothing special for the main thread.
// * a worker is done with futex B. it flips mainThreadMutex from B
// to 0, ending the wait on futex B.
// * we return to the wait on futex A. _emscripten_main_thread_futex is 0,
// but that is because of futex B being done - we can't tell from
// _emscripten_main_thread_futex whether A is done or not. therefore,
// check the memory value of the futex.
//
// That case motivates the design here. Given that, checking the memory
// address is also necessary for other reasons: we unset and re-set our
// address in _emscripten_main_thread_futex around calls to
// _emscripten_yield(), and a worker could
// attempt to wake us up right before/after such times.
//
// Note that checking the memory value of the futex is valid to do: we
// could easily have been delayed (relative to the worker holding on
// to futex A), which means we could be starting all of our work at the
// later time when there is no need to block. The only "odd" thing is
// that we may have caused side effects in that "delay" time. But the
// only side effects we can have are to call
// _emscripten_yield(). That is always ok to
// do on the main thread (it's why it is ok for us to call it in the
// middle of this function, and elsewhere). So if we check the value
// here and return, it's the same is if what happened on the main thread
// was the same as calling _emscripten_yield()
// a few times before calling emscripten_futex_wait().
if (atomic_load((_Atomic uint32_t*)addr) != val) {
return -EWOULDBLOCK;
}
// Mark us as waiting once more, and continue the loop.
last_addr = a_cas_p(&_emscripten_main_thread_futex, 0, (void*)addr);
assert(last_addr == 0);
}
return 0;
}
static double dummy() {
return INFINITY;
}
weak_alias(dummy, _emscripten_next_timer);
static int _do_futex_wait(volatile void *addr, uint32_t val, double max_wait_ms) {
if ((((intptr_t)addr)&3) != 0) {
return -EINVAL;
}
int ret;
#ifdef __EMSCRIPTEN_PTHREADS__
pthread_t self = __pthread_self();
bool cancelable = __cancel && self->canceldisable != PTHREAD_CANCEL_DISABLE;
#else
bool cancelable = false;
#endif
// For the main browser thread and audio worklets we can't use
// __builtin_wasm_memory_atomic_wait32 so we have busy wait instead.
if (!_emscripten_thread_supports_atomics_wait()) {
return futex_wait_main_browser_thread(addr, val, max_wait_ms, cancelable);
}
DBG("emscripten_futex_wait ms=%f", max_wait_ms);
bool is_runtime_thread = emscripten_is_main_runtime_thread();
if (is_runtime_thread) {
max_wait_ms = fmin(max_wait_ms, fmax(0, _emscripten_next_timer()));
}
// -1 (or any negative number) means wait indefinitely.
int64_t max_wait_ns = ATOMICS_WAIT_DURATION_INFINITE;
if (max_wait_ms != INFINITY) {
max_wait_ns = (int64_t)(max_wait_ms * 1e6);
}
#ifdef __EMSCRIPTEN_PTHREADS__
uintptr_t expected_null = 0;
if (atomic_compare_exchange_strong(&self->wait_addr, &expected_null, (uintptr_t)addr)) {
DBG("emscripten_futex_wait atomic.wait ns=%lld", max_wait_ns);
ret = __builtin_wasm_memory_atomic_wait32((int*)addr, val, max_wait_ns);
} else {
DBG("emscripten_futex_wait skipping atomic.wait due to NOTIFY_BIT");
// CAS failed, NOTIFY_BIT must have been set. In this case we don't
// actually wait at all. Instead we behave as if we spuriously woke up
// right away.
assert(expected_null & NOTIFY_BIT);
ret = ATOMICS_WAIT_OK;
}
// Clear the wait_addr
bool notified = atomic_exchange(&self->wait_addr, 0) & NOTIFY_BIT;
// Here we are mimicking the behaviour of musl's __syscall_cp_c which wraps
// the linux futex syscall.
if (self->cancel && cancelable) {
return __cancel();
}
DBG("emscripten_futex_wait done notified=%d cancelable=%d cancel=%d", notified, cancelable, self->cancel);
// Pass 0 here, which means we don't have access to the current time in this
// function. This tells _emscripten_yield to call emscripten_get_now if (and
// only if) it needs to know the time.
bool timer_fired = _emscripten_yield(0);
if (notified || timer_fired) {
return -EINTR;
}
#else // __EMSCRIPTEN_PTHREADS__
ret = __builtin_wasm_memory_atomic_wait32((int*)addr, val, max_wait_ns);
bool timer_fired = _emscripten_yield(0);
if (timer_fired) {
return -EINTR;
}
#endif // __EMSCRIPTEN_PTHREADS__
if (ret == ATOMICS_WAIT_NOT_EQUAL) {
return -EWOULDBLOCK;
}
if (ret == ATOMICS_WAIT_TIMED_OUT) {
return -ETIMEDOUT;
}
assert(ret == ATOMICS_WAIT_OK);
return 0;
}
int emscripten_futex_wait(volatile void *addr, uint32_t val, double max_wait_ms) {
emscripten_conditional_set_current_thread_status(EM_THREAD_STATUS_RUNNING, EM_THREAD_STATUS_WAITFUTEX);
int ret = _do_futex_wait(addr, val, max_wait_ms);
emscripten_conditional_set_current_thread_status(EM_THREAD_STATUS_WAITFUTEX, EM_THREAD_STATUS_RUNNING);
return ret;
}

Xet Storage Details

Size:
9.13 kB
·
Xet hash:
037a64bf98ae3044dcc60636a34d28b5d9ad449e330215aecaf02880699972c8

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.