File size: 2,136 Bytes
f60a6c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from asyncio import get_running_loop, run, get_event_loop, Semaphore

from scorevision.utils.settings import get_settings

# Per-event-loop caches. Both dicts are keyed by the integer returned from
# _loop_key(), i.e. the id() of an event loop object.
# Client sessions handed out by get_async_client(); entries are removed by
# _close_all_clients_async().
_SESSIONS: dict[int, ClientSession] = {}
# Semaphores handed out by get_semaphore().
_SEMAPHORES: dict[int, Semaphore] = {}


async def _close_all_clients_async():
    """Close the cached client sessions and drop them from ``_SESSIONS``.

    Cleanup is best-effort: a session that fails to close is logged and
    skipped so the remaining sessions are still processed. Only the sessions
    present when the coroutine starts are removed; a session registered by
    another task while this coroutine is suspended stays cached instead of
    being dropped without ever being closed.
    """
    import logging  # local import so the block does not rely on file-level changes

    log = logging.getLogger(__name__)

    for key, sess in list(_SESSIONS.items()):
        # Remove the entry before awaiting, and only if it still maps to the
        # session captured in the snapshot. Not-yet-processed entries stay in
        # the cache if this coroutine is cancelled part-way through.
        if _SESSIONS.get(key) is sess:
            del _SESSIONS[key]
        try:
            if sess and not sess.closed:
                await sess.close()
        except Exception:
            # Keep going: one broken session must not block the others.
            log.warning("Failed to close HTTP client session", exc_info=True)


# Strong references to in-flight cleanup tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it has finished.
_CLOSE_TASKS: set = set()


def close_http_clients():
    """Close all cached HTTP client sessions.

    When called from code running inside an event loop, the cleanup is
    scheduled on that loop as a task and this function returns immediately,
    before the sessions are actually closed. When no event loop is running in
    the calling thread, the cleanup is run to completion via ``asyncio.run``.
    """
    try:
        loop = get_running_loop()
    except RuntimeError:
        # No running loop in this thread: drive the cleanup synchronously.
        run(_close_all_clients_async())
        return

    task = loop.create_task(_close_all_clients_async())
    # Hold the task until it is done so it cannot disappear mid-execution.
    _CLOSE_TASKS.add(task)
    task.add_done_callback(_CLOSE_TASKS.discard)


def _loop_key() -> int:
    """Return the ``id()`` of the event loop to use as a cache key.

    The running loop is preferred; when none is running, the loop returned by
    ``get_event_loop()`` is used instead.
    """
    try:
        return id(get_running_loop())
    except RuntimeError:
        # Not inside a running loop: fall back to the current event loop.
        return id(get_event_loop())


async def get_async_client() -> ClientSession:
    """Return the cached ``ClientSession`` for the current event loop.

    A new session is created, and cached under the loop's key, when nothing
    is cached yet or the cached session has been closed. The session's total
    timeout is taken from ``SCOREVISION_API_TIMEOUT_S``.
    """
    settings = get_settings()
    loop_id = _loop_key()

    cached = _SESSIONS.get(loop_id)
    if cached is not None and not cached.closed:
        return cached

    request_timeout = ClientTimeout(total=settings.SCOREVISION_API_TIMEOUT_S)
    # limit=0 / limit_per_host=0 means the connector imposes no connection cap.
    pool = TCPConnector(limit=0, limit_per_host=0)
    fresh = ClientSession(timeout=request_timeout, connector=pool)
    _SESSIONS[loop_id] = fresh
    return fresh


def get_semaphore() -> Semaphore:
    """Return the semaphore cached for the current event loop.

    On first use for a loop, a semaphore is created with an initial value of
    ``SCOREVISION_MAX_CONCURRENT_API_CALLS`` (never less than 1) and stored
    under the loop's key.
    """
    settings = get_settings()
    loop_id = _loop_key()

    existing = _SEMAPHORES.get(loop_id)
    if existing is not None:
        return existing

    # Clamp to at least one permit so a zero or negative setting cannot
    # produce an unusable semaphore.
    created = Semaphore(max(1, settings.SCOREVISION_MAX_CONCURRENT_API_CALLS))
    _SEMAPHORES[loop_id] = created
    return created


# @asynccontextmanager
# async def create_async_session():
#     settings = get_settings()
#     connector = TCPConnector(
#         limit=settings.SCOREVISION_MAX_CONCURRENT_API_CALLS * 2,
#         limit_per_host=settings.SCOREVISION_MAX_CONCURRENT_API_CALLS,
#     )
#     session = ClientSession(
#         timeout=ClientTimeout(total=settings.SCOREVISION_API_TIMEOUT_S),
#         connector=connector,
#     )
#     try:
#         yield session
#     finally:
#         await session.close()