download
raw
1.14 kB
import os
from asyncio import get_running_loop
from functools import wraps
from django.core.exceptions import SynchronousOnlyOperation
def async_unsafe(message):
    """
    Decorator to mark functions as async-unsafe. Someone trying to access
    the function while in an async context will get an error message.

    It can be applied in two forms:

    * ``@async_unsafe("custom text")`` -- ``message`` is the text passed to
      the raised exception.
    * ``@async_unsafe`` -- ``message`` is the decorated function itself, and
      a default error text is used instead.

    The wrapped function raises SynchronousOnlyOperation when it is called
    from a thread that has a running asyncio event loop, unless the
    DJANGO_ALLOW_ASYNC_UNSAFE environment variable is set to a non-empty
    value. Otherwise the call is forwarded unchanged and its result returned.
    """

    def decorator(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # Detect a running event loop in this thread.
            try:
                get_running_loop()
            except RuntimeError:
                # get_running_loop() raises RuntimeError when no loop is
                # running, i.e. we are in a plain synchronous context.
                pass
            else:
                # A loop is running: refuse unless explicitly overridden
                # through the environment.
                if not os.environ.get("DJANGO_ALLOW_ASYNC_UNSAFE"):
                    raise SynchronousOnlyOperation(message)
            # Pass onward.
            return func(*args, **kwargs)

        return inner

    # If the message is actually a function, then be a no-arguments decorator.
    if callable(message):
        func = message
        message = (
            "You cannot call this from an async context - use a thread or "
            "sync_to_async."
        )
        return decorator(func)
    else:
        return decorator

Xet Storage Details

Size:
1.14 kB
·
Xet hash:
52a07c1d78d0414b233962569dbf02edb18d353b42688bcd1a80da7122645278

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.