Spaces:
Sleeping
Sleeping
| import abc | |
| import asyncio | |
| from typing import List, Dict | |
| from ingest.utils import robots_allowed | |
class GenericFOIAAdapter(abc.ABC):
    """Base class for source adapters with throttling and a robots check.

    Each instance tracks the event-loop time of its last throttled call and
    can delay the next one so that calls are spaced by at least
    ``rate_limit_seconds``. Subclasses are expected to set the class
    attributes below and override :meth:`search`.
    """

    # Identifier for the upstream source; subclasses should override.
    source_name: str = "UNKNOWN"
    # Root URL handed to the robots check; empty means "not configured".
    base_url: str = ""
    # Flag left for subclasses/callers; not read anywhere in this class.
    live: bool = True

    def __init__(self, rate_limit_seconds: float = 1.5):
        """Store the minimum spacing, in seconds, between throttled calls.

        Args:
            rate_limit_seconds: Minimum interval enforced by
                :meth:`_rate_limit` between consecutive calls.
        """
        self.rate_limit_seconds = rate_limit_seconds
        # Event-loop timestamp of the previous throttled call. Starting at
        # 0.0 means the first call normally proceeds without sleeping.
        self._last_call = 0.0

    async def _rate_limit(self) -> None:
        """Sleep just long enough to honour ``rate_limit_seconds``.

        Compares the running loop's clock with the timestamp recorded by the
        previous call, sleeps for the remaining part of the interval if the
        interval has not yet elapsed, then records the new timestamp.

        NOTE: there is no lock here, so coroutines awaiting this concurrently
        on the same instance are not serialised against each other.
        """
        # Inside a coroutine the running loop is always available; fetch it
        # once instead of looking it up for every clock read.
        loop = asyncio.get_running_loop()
        elapsed = loop.time() - self._last_call
        if elapsed < self.rate_limit_seconds:
            await asyncio.sleep(self.rate_limit_seconds - elapsed)
        self._last_call = loop.time()

    def robots_allowed(self) -> bool:
        """Return whether ``base_url`` passes the imported robots check.

        Returns:
            ``False`` when ``base_url`` is empty; otherwise whatever the
            module-level ``robots_allowed`` helper returns for ``base_url``.
        """
        if not self.base_url:
            return False
        # Class-scope names are not visible inside method bodies, so this
        # resolves to the module-level helper, not to this method.
        return robots_allowed(self.base_url)

    async def search(self, query: str) -> List[Dict]:
        """Search the source for ``query``; subclasses should override.

        The base implementation performs no lookup and returns an empty
        list so that callers always receive a list, as the annotation
        promises, rather than ``None``.

        Args:
            query: Search text supplied by the caller.

        Returns:
            A list of result dictionaries; always empty in this base class.
        """
        return []