Spaces:
Build error
Build error
Create batch_processor.py
Browse files- services/batch_processor.py +27 -0
services/batch_processor.py
ADDED
|
@@ -0,0 +1,27 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
# batch_processor.py
|
| 2 |
+
from typing import List, Dict
|
| 3 |
+
import asyncio
|
| 4 |
+
|
| 5 |
+
#TODO explain how to use the batch processor
|
| 6 |
+
class BatchProcessor:
|
| 7 |
+
def __init__(self, max_batch_size: int = 32, max_wait_time: float = 0.1):
|
| 8 |
+
self.max_batch_size = max_batch_size
|
| 9 |
+
self.max_wait_time = max_wait_time
|
| 10 |
+
self.pending_requests: List[Dict] = []
|
| 11 |
+
self.lock = asyncio.Lock()
|
| 12 |
+
|
| 13 |
+
async def add_request(self, request: Dict) -> Any:
|
| 14 |
+
async with self.lock:
|
| 15 |
+
self.pending_requests.append(request)
|
| 16 |
+
if len(self.pending_requests) >= self.max_batch_size:
|
| 17 |
+
return await self._process_batch()
|
| 18 |
+
else:
|
| 19 |
+
await asyncio.sleep(self.max_wait_time)
|
| 20 |
+
if self.pending_requests:
|
| 21 |
+
return await self._process_batch()
|
| 22 |
+
|
| 23 |
+
async def _process_batch(self) -> List[Any]:
|
| 24 |
+
batch = self.pending_requests[:self.max_batch_size]
|
| 25 |
+
self.pending_requests = self.pending_requests[self.max_batch_size:]
|
| 26 |
+
# TODO implement the batch processing logic
|
| 27 |
+
return batch
|