Spaces:
Sleeping
Sleeping
"""
Python client for the Invoice Processing Pipeline environment.

Usage:
    from client import InvoiceEnvClient
    from models import InvoiceAction

    client = InvoiceEnvClient(base_url="http://localhost:7860")
    result = client.reset(task_id="easy")
    print(result["observation"]["raw_text"])
    result = client.step({"vendor": "Acme Corp", "date": "2024-06-15", ...})
    print(result["reward"])
"""
| from __future__ import annotations | |
| from typing import Any, Dict, Optional | |
| import httpx | |
class InvoiceEnvClient:
    """Synchronous HTTP client for the Invoice Processing Pipeline.

    Each public method issues one HTTP request against ``base_url``, raises
    if the response status indicates an error, and returns the decoded JSON
    body. The instance can be used as a context manager; leaving the
    ``with`` block closes the underlying ``httpx.Client``.
    """

    def __init__(self, base_url: str = "http://localhost:7860", timeout: float = 30.0):
        """Create the client.

        Args:
            base_url: Root URL of the server. Trailing slashes are stripped
                so endpoint paths can be appended directly.
            timeout: Timeout value handed to ``httpx.Client``.
        """
        self.base_url = base_url.rstrip("/")
        self._client = httpx.Client(timeout=timeout)

    def _get(self, path: str) -> Dict[str, Any]:
        """GET ``base_url + path`` and return the decoded JSON body."""
        resp = self._client.get(f"{self.base_url}{path}")
        resp.raise_for_status()
        return resp.json()

    def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``base_url + path`` and return the decoded JSON body."""
        resp = self._client.post(f"{self.base_url}{path}", json=payload)
        resp.raise_for_status()
        return resp.json()

    def reset(self, task_id: str = "easy") -> Dict[str, Any]:
        """Reset the environment for a new episode.

        Args:
            task_id: Identifier of the task to start; sent as ``task_id`` in
                the JSON body of ``POST /reset``.

        Returns:
            The decoded JSON response.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        return self._post("/reset", {"task_id": task_id})

    def step(self, extracted_data: Dict[str, Any], explanation: str = "",
             episode_id: Optional[str] = None) -> Dict[str, Any]:
        """Submit extracted/cleaned data and get reward + feedback.

        Args:
            extracted_data: Payload sent under the ``extracted_data`` key.
            explanation: Free-text explanation sent under ``explanation``.
            episode_id: Optional episode identifier. The ``episode_id`` key is
                omitted from the request body entirely when this is ``None``.

        Returns:
            The decoded JSON response of ``POST /step``.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {"extracted_data": extracted_data, "explanation": explanation}
        if episode_id is not None:
            body["episode_id"] = episode_id
        return self._post("/step", body)

    def state(self) -> Dict[str, Any]:
        """Get current episode state (``GET /state``)."""
        return self._get("/state")

    def tasks(self) -> Dict[str, Any]:
        """List available tasks and schemas (``GET /tasks``)."""
        return self._get("/tasks")

    def health(self) -> Dict[str, Any]:
        """Check server health (``GET /health``)."""
        return self._get("/health")

    def close(self) -> None:
        """Close the HTTP client."""
        self._client.close()

    def __enter__(self) -> "InvoiceEnvClient":
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, *args) -> None:
        """Close the client on leaving the ``with`` block; exceptions are not suppressed."""
        self.close()
class AsyncInvoiceEnvClient:
    """Async HTTP client for the Invoice Processing Pipeline.

    Each public coroutine issues one HTTP request against ``base_url``,
    raises if the response status indicates an error, and returns the
    decoded JSON body. The instance can be used with ``async with``; leaving
    the block closes the underlying ``httpx.AsyncClient``.
    """

    def __init__(self, base_url: str = "http://localhost:7860", timeout: float = 30.0):
        """Create the client.

        Args:
            base_url: Root URL of the server. Trailing slashes are stripped
                so endpoint paths can be appended directly.
            timeout: Timeout value handed to ``httpx.AsyncClient``.
        """
        self.base_url = base_url.rstrip("/")
        self._client = httpx.AsyncClient(timeout=timeout)

    async def _get(self, path: str) -> Dict[str, Any]:
        """GET ``base_url + path`` and return the decoded JSON body."""
        resp = await self._client.get(f"{self.base_url}{path}")
        resp.raise_for_status()
        return resp.json()

    async def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``base_url + path`` and return the decoded JSON body."""
        resp = await self._client.post(f"{self.base_url}{path}", json=payload)
        resp.raise_for_status()
        return resp.json()

    async def reset(self, task_id: str = "easy") -> Dict[str, Any]:
        """Reset the environment for a new episode.

        Args:
            task_id: Identifier of the task to start; sent as ``task_id`` in
                the JSON body of ``POST /reset``.

        Returns:
            The decoded JSON response.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        return await self._post("/reset", {"task_id": task_id})

    async def step(self, extracted_data: Dict[str, Any], explanation: str = "",
                   episode_id: Optional[str] = None) -> Dict[str, Any]:
        """Submit extracted/cleaned data and get the server's response.

        Args:
            extracted_data: Payload sent under the ``extracted_data`` key.
            explanation: Free-text explanation sent under ``explanation``.
            episode_id: Optional episode identifier. The ``episode_id`` key is
                omitted from the request body entirely when this is ``None``.

        Returns:
            The decoded JSON response of ``POST /step``.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        body: Dict[str, Any] = {"extracted_data": extracted_data, "explanation": explanation}
        if episode_id is not None:
            body["episode_id"] = episode_id
        return await self._post("/step", body)

    async def state(self) -> Dict[str, Any]:
        """Get current episode state (``GET /state``)."""
        return await self._get("/state")

    async def tasks(self) -> Dict[str, Any]:
        """List available tasks and schemas (``GET /tasks``).

        Provided for parity with the synchronous client.
        """
        return await self._get("/tasks")

    async def health(self) -> Dict[str, Any]:
        """Check server health (``GET /health``).

        Provided for parity with the synchronous client.
        """
        return await self._get("/health")

    async def close(self) -> None:
        """Close the underlying async HTTP client."""
        await self._client.aclose()

    async def __aenter__(self) -> "AsyncInvoiceEnvClient":
        """Return the client itself for use in an ``async with`` block."""
        return self

    async def __aexit__(self, *args) -> None:
        """Close the client on leaving the ``async with`` block; exceptions are not suppressed."""
        await self.close()