File size: 9,342 Bytes
4718630 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
"""
SPARKNET Backend Client
Client for connecting Streamlit Cloud to the GPU backend server (Lytos).
Handles all API communication with the FastAPI backend.
"""
import httpx
import streamlit as st
from typing import Optional, Dict, Any, List, Tuple
from dataclasses import dataclass
import os
def get_backend_url() -> Optional[str]:
    """Get backend URL from secrets or environment.

    Lookup order:
        1. Streamlit secret ``BACKEND_URL``
        2. Streamlit secret ``url`` inside the ``backend`` section
        3. Environment variable ``SPARKNET_BACKEND_URL``

    Returns:
        The first URL found, or None when no source provides one.
    """
    # Try Streamlit secrets first
    try:
        if hasattr(st, "secrets"):
            if "BACKEND_URL" in st.secrets:
                return st.secrets["BACKEND_URL"]
            if "backend" in st.secrets and "url" in st.secrets["backend"]:
                return st.secrets["backend"]["url"]
    except Exception:
        # Best-effort lookup: any failure while reading secrets (for example
        # when no secrets are available) falls through to the environment.
        # Catching Exception instead of using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
    # Fall back to environment
    return os.environ.get("SPARKNET_BACKEND_URL")
def is_backend_configured() -> bool:
    """Report whether a backend URL can be resolved.

    Returns:
        True when get_backend_url() yields a value other than None.
    """
    resolved_url = get_backend_url()
    return resolved_url is not None
@dataclass
class BackendResponse:
    """Uniform result envelope returned by the backend client calls.

    Positional order is (success, data, error); ``error`` defaults to None.
    """

    # True when the call completed without raising.
    success: bool
    # Payload of the call; the client passes an empty dict on failure.
    data: Dict[str, Any]
    # Failure description, left as None when nothing went wrong.
    error: Optional[str] = None
class BackendClient:
    """
    Client for SPARKNET Backend API.

    Provides methods to:
    - Check backend health and status
    - Process documents (OCR, layout detection)
    - Index documents to RAG
    - Query RAG system
    - Search similar chunks

    Every API method returns a BackendResponse rather than raising: failures
    (missing configuration, transport errors, non-2xx statuses, undecodable
    bodies) are reported with ``success=False`` and a message in ``error``.

    The underlying HTTP client is created lazily on first use. Call close()
    when done, or use the instance as a context manager.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: float = 120.0):
        """
        Create a client.

        Args:
            base_url: Backend base URL; when falsy, the value from
                get_backend_url() is used instead
            timeout: Timeout handed to the HTTP client
        """
        self.base_url = base_url or get_backend_url()
        self.timeout = timeout
        self._client = None

    def __enter__(self) -> "BackendClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving a ``with`` block."""
        self.close()

    @property
    def is_configured(self) -> bool:
        """Whether a backend base URL is set."""
        return self.base_url is not None

    def _get_client(self) -> httpx.Client:
        """Return the shared HTTP client, creating it on first use."""
        if self._client is None:
            self._client = httpx.Client(
                base_url=self.base_url,
                timeout=self.timeout,
            )
        return self._client

    def close(self):
        """Close the HTTP client, if one was created, and forget it."""
        if self._client is not None:
            self._client.close()
            self._client = None

    def _request(
        self,
        method: str,
        path: str,
        *,
        wrap_key: Optional[str] = None,
        **kwargs: Any,
    ) -> BackendResponse:
        """
        Send one request and convert the outcome into a BackendResponse.

        Args:
            method: HTTP method name (e.g. "GET", "POST", "DELETE")
            path: Request path, relative to the base URL
            wrap_key: When given, the decoded JSON body is returned as
                ``{wrap_key: body}`` instead of as-is
            **kwargs: Passed through to the HTTP client's request call
                (e.g. ``json=``, ``data=``, ``files=``)

        Returns:
            BackendResponse with the decoded JSON body on success, or with
            ``success=False`` and an error message on any failure
        """
        if not self.is_configured:
            return BackendResponse(False, {}, "Backend URL not configured")
        try:
            resp = self._get_client().request(method, path, **kwargs)
            resp.raise_for_status()
            body = resp.json()
        except Exception as e:
            # Some exceptions stringify to "", which would leave callers with
            # a failed response and no usable message; fall back to the
            # exception class name in that case.
            return BackendResponse(False, {}, str(e) or type(e).__name__)
        if wrap_key is not None:
            body = {wrap_key: body}
        return BackendResponse(True, body)

    def health_check(self) -> BackendResponse:
        """Check if backend is healthy."""
        return self._request("GET", "/api/health")

    def get_status(self) -> BackendResponse:
        """Get backend system status."""
        return self._request("GET", "/api/status")

    def process_document(
        self,
        file_bytes: bytes,
        filename: str,
        ocr_engine: str = "paddleocr",
        max_pages: int = 10,
        enable_layout: bool = True,
        preserve_tables: bool = True,
    ) -> BackendResponse:
        """
        Process a document using the backend.

        Args:
            file_bytes: Document content as bytes
            filename: Original filename
            ocr_engine: OCR engine to use (paddleocr, tesseract)
            max_pages: Maximum pages to process
            enable_layout: Enable layout detection
            preserve_tables: Preserve table structure

        Returns:
            BackendResponse with processing results
        """
        files = {"file": (filename, file_bytes)}
        # Form fields are sent as strings; booleans become "true"/"false".
        data = {
            "ocr_engine": ocr_engine,
            "max_pages": str(max_pages),
            "enable_layout": str(enable_layout).lower(),
            "preserve_tables": str(preserve_tables).lower(),
        }
        return self._request("POST", "/api/process", files=files, data=data)

    def index_document(
        self,
        doc_id: str,
        text: str,
        chunks: List[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> BackendResponse:
        """
        Index a document into the RAG system.

        Args:
            doc_id: Document identifier
            text: Full document text
            chunks: List of chunk dictionaries
            metadata: Optional metadata (an empty dict is sent when omitted)

        Returns:
            BackendResponse with indexing results
        """
        payload = {
            "doc_id": doc_id,
            "text": text,
            "chunks": chunks,
            "metadata": metadata or {},
        }
        return self._request("POST", "/api/index", json=payload)

    def query(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
    ) -> BackendResponse:
        """
        Query the RAG system.

        Args:
            question: Query question
            filters: Optional filters (e.g., document_id)
            top_k: Number of results

        Returns:
            BackendResponse with answer and sources
        """
        payload = {
            "question": question,
            "filters": filters,
            "top_k": top_k,
        }
        return self._request("POST", "/api/query", json=payload)

    def search_similar(
        self,
        query: str,
        top_k: int = 5,
        doc_filter: Optional[str] = None,
    ) -> BackendResponse:
        """
        Search for similar chunks.

        Args:
            query: Search query
            top_k: Number of results
            doc_filter: Optional document ID filter

        Returns:
            BackendResponse with similar chunks
        """
        payload = {
            "query": query,
            "top_k": top_k,
            "doc_filter": doc_filter,
        }
        return self._request("POST", "/api/search", json=payload)

    def list_documents(self) -> BackendResponse:
        """
        List all indexed documents.

        Returns:
            BackendResponse whose data holds the decoded response body under
            the "documents" key
        """
        return self._request("GET", "/api/documents", wrap_key="documents")

    def delete_document(self, doc_id: str) -> BackendResponse:
        """
        Delete a document from the index.

        Args:
            doc_id: Document identifier; it is percent-encoded before being
                placed in the URL path

        Returns:
            BackendResponse with the backend's reply
        """
        from urllib.parse import quote

        # Encode the id as a single path segment so characters such as "/",
        # "?" or "#" cannot change which URL is requested.
        encoded_id = quote(str(doc_id), safe="")
        return self._request("DELETE", f"/api/documents/{encoded_id}")
# Module-wide client, built lazily by get_backend_client()
_backend_client: Optional[BackendClient] = None


def get_backend_client() -> BackendClient:
    """Return the shared BackendClient, constructing it on the first call."""
    global _backend_client
    if _backend_client is not None:
        return _backend_client
    _backend_client = BackendClient()
    return _backend_client
def check_backend_available() -> Tuple[bool, Dict[str, Any]]:
    """
    Probe the backend and report whether it can be used.

    The shared client is checked for configuration, then for health, then
    asked for its status; the first failing step ends the probe.

    Returns:
        Tuple of (available, status_dict). On failure, available is False
        and status_dict carries a single "error" entry; on success it is
        the data returned by the status call.
    """

    def _unavailable(message: str) -> Tuple[bool, Dict[str, Any]]:
        # Shape shared by every failure outcome.
        return False, {"error": message}

    backend = get_backend_client()
    if not backend.is_configured:
        return _unavailable("Backend URL not configured")

    # Step 1: is the backend reachable at all?
    health = backend.health_check()
    if not health.success:
        return _unavailable(f"Backend not reachable: {health.error}")

    # Step 2: fetch the full status report
    status = backend.get_status()
    if status.success:
        return True, status.data
    return _unavailable(f"Failed to get status: {status.error}")
|