GodsDevProject commited on
Commit
ee29427
·
verified ·
1 Parent(s): f4a4150

Create base_adapter.py

Browse files
Files changed (1) hide show
  1. ingest/base_adapter.py +22 -0
ingest/base_adapter.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from abc import ABC, abstractmethod
3
+
4
+ class BaseAdapter(ABC):
5
+ name: str = "Unknown"
6
+ source_type: str = "stub" # "live" or "stub"
7
+ base_url: str = ""
8
+
9
+ def __init__(self, rate_limit_seconds: float = 1.0):
10
+ self._rate_limit = rate_limit_seconds
11
+ self._last_call = 0
12
+
13
+ async def _throttle(self):
14
+ now = asyncio.get_event_loop().time()
15
+ elapsed = now - self._last_call
16
+ if elapsed < self._rate_limit:
17
+ await asyncio.sleep(self._rate_limit - elapsed)
18
+ self._last_call = asyncio.get_event_loop().time()
19
+
20
+ @abstractmethod
21
+ async def search(self, query: str):
22
+ pass