FOIA_Doc_Search / adapters /ingest /generic_public_foia.py
GodsDevProject's picture
Create ingest/generic_public_foia.py
507d753 verified
import abc
import asyncio
import time
from typing import List, Dict
class GenericFOIAAdapter(abc.ABC):
    """Abstract base class for adapters that search a public FOIA source.

    Subclasses override the class attributes below and implement
    :meth:`search`. Implementations may ``await self._rate_limit()`` before
    each outbound request so that calls made through one adapter instance are
    spaced at least ``rate_limit_seconds`` apart.
    """

    # Name of the source this adapter represents; subclasses should override.
    source_name: str = "UNKNOWN"
    # Base URL of the source; empty by default.
    base_url: str = ""
    # Liveness flag, True by default (how callers use it is not shown here).
    is_live: bool = True

    def __init__(self, rate_limit_seconds: float = 1.0) -> None:
        """Initialise the adapter.

        Args:
            rate_limit_seconds: Minimum spacing, in seconds, enforced by
                :meth:`_rate_limit` between consecutive calls.
        """
        self.rate_limit_seconds = rate_limit_seconds
        # Start of the most recently granted call slot on the
        # ``time.monotonic()`` clock. ``-inf`` means "no call yet", so the
        # first call is never delayed whatever the clock's reference point is
        # (the monotonic clock's zero is undefined and may be small).
        self._last_call = float("-inf")

    async def _rate_limit(self) -> None:
        """Wait until this adapter is allowed to make its next call.

        Each caller reserves the next free slot *before* sleeping. There is
        no ``await`` between reading and writing ``_last_call``, so concurrent
        coroutines (e.g. under ``asyncio.gather``) each get a distinct slot
        spaced ``rate_limit_seconds`` apart instead of all waking together.
        """
        now = time.monotonic()
        # Earliest permitted start: right now, or one interval after the
        # previously granted slot, whichever is later.
        slot = max(now, self._last_call + self.rate_limit_seconds)
        self._last_call = slot
        delay = slot - now
        if delay > 0:
            await asyncio.sleep(delay)

    @abc.abstractmethod
    async def search(self, query: str) -> List[Dict]:
        """Search the source for ``query``.

        Must return a list of result dicts, each with the keys::

            {
                source, title, url, snippet, agency
            }

        Args:
            query: The search string.

        Returns:
            A list of result dictionaries in the shape described above.

        Raises:
            NotImplementedError: Always, in this base implementation;
                subclasses must override.
        """
        raise NotImplementedError