Spaces:
Sleeping
Sleeping
| import abc | |
| import asyncio | |
| import time | |
| from typing import List, Dict | |
| from ingest.robots import allowed | |
| from ingest.health import HealthStatus | |
| class GenericFOIAAdapter(abc.ABC): | |
| source_name: str = "UNKNOWN" | |
| base_url: str = "" | |
| source_type: str = "stub" # stub | live | |
| def __init__(self, rate_limit_seconds: float = 1.0): | |
| self.rate_limit_seconds = rate_limit_seconds | |
| self._last_call = 0.0 | |
| self.last_health: HealthStatus | None = None | |
| async def _rate_limit(self): | |
| now = asyncio.get_event_loop().time() | |
| delta = now - self._last_call | |
| if delta < self.rate_limit_seconds: | |
| await asyncio.sleep(self.rate_limit_seconds - delta) | |
| self._last_call = asyncio.get_event_loop().time() | |
| async def _guard(self, url: str) -> bool: | |
| return await allowed(url) | |
| async def _stub_result(self, query: str) -> List[Dict]: | |
| await self._rate_limit() | |
| self.last_health = HealthStatus(ok=True, latency_ms=0) | |
| return [{ | |
| "source": self.source_name, | |
| "title": f"{self.source_name} FOIA result for '{query}'", | |
| "url": self.base_url, | |
| "snippet": "Public FOIA reading room placeholder result (stub).", | |
| "live": False, | |
| "extended": False, | |
| "health": self.last_health.__dict__ | |
| }] | |
| async def search(self, query: str) -> List[Dict]: | |
| raise NotImplementedError |