"""
SPARKNET Backend Client

Client for connecting Streamlit Cloud to the GPU backend server (Lytos).
Handles all API communication with the FastAPI backend.
"""

import os
from dataclasses import dataclass
from typing import Optional, Dict, Any, List, Tuple
from urllib.parse import quote

import httpx
import streamlit as st


def get_backend_url() -> Optional[str]:
    """Get backend URL from secrets or environment.

    Lookup order:
        1. ``st.secrets["BACKEND_URL"]``
        2. ``st.secrets["backend"]["url"]``
        3. the ``SPARKNET_BACKEND_URL`` environment variable

    Returns:
        The configured URL, or None when no source provides one.
    """
    # Try Streamlit secrets first. This is deliberately best-effort: any
    # failure while reading secrets falls through to the environment lookup.
    # Only ``Exception`` is caught so that KeyboardInterrupt / SystemExit
    # still propagate.
    try:
        if hasattr(st, 'secrets'):
            if "BACKEND_URL" in st.secrets:
                return st.secrets["BACKEND_URL"]
            if "backend" in st.secrets and "url" in st.secrets["backend"]:
                return st.secrets["backend"]["url"]
    except Exception:
        pass

    # Fall back to environment
    return os.environ.get("SPARKNET_BACKEND_URL")


def is_backend_configured() -> bool:
    """Check if backend is configured (i.e. a backend URL can be resolved)."""
    return get_backend_url() is not None


@dataclass
class BackendResponse:
    """Generic backend response wrapper."""

    # True when the request completed and the body was decoded.
    success: bool
    # Decoded JSON payload on success; an empty dict on failure.
    data: Dict[str, Any]
    # Human-readable failure description; None on success.
    error: Optional[str] = None


class BackendClient:
    """
    Client for SPARKNET Backend API.

    Provides methods to:
    - Check backend health and status
    - Process documents (OCR, layout detection)
    - Index documents to RAG
    - Query RAG system
    - Search similar chunks

    Every API method returns a BackendResponse instead of raising: failures
    (missing configuration, transport errors, HTTP error statuses, undecodable
    bodies) are reported through ``success=False`` and ``error``.

    The client can be used as a context manager, which closes the underlying
    HTTP connection pool on exit.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: float = 120.0):
        """
        Args:
            base_url: Backend base URL; when falsy, it is resolved through
                get_backend_url().
            timeout: Timeout in seconds handed to the underlying httpx client.
        """
        self.base_url = base_url or get_backend_url()
        self.timeout = timeout
        # Created lazily by _get_client() so constructing a BackendClient
        # never opens connections.
        self._client = None

    def __enter__(self) -> "BackendClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @property
    def is_configured(self) -> bool:
        """Whether a backend base URL is available."""
        return self.base_url is not None

    def _get_client(self) -> httpx.Client:
        """Return the shared httpx client, creating it on first use."""
        if self._client is None:
            self._client = httpx.Client(
                base_url=self.base_url,
                timeout=self.timeout,
            )
        return self._client

    def close(self):
        """Close the underlying HTTP client, if one was created."""
        if self._client is not None:
            self._client.close()
            self._client = None

    def _request(
        self,
        method: str,
        path: str,
        *,
        wrap_key: Optional[str] = None,
        **kwargs: Any,
    ) -> BackendResponse:
        """Send one request and convert the outcome into a BackendResponse.

        Args:
            method: HTTP method name ("GET", "POST", "DELETE", ...).
            path: Request path, resolved against the client's base URL.
            wrap_key: When given, the decoded JSON body is returned as
                ``{wrap_key: body}`` instead of the body itself.
            **kwargs: Forwarded unchanged to ``httpx.Client.request``
                (e.g. ``json=``, ``files=``, ``data=``).

        Returns:
            A successful BackendResponse carrying the decoded JSON body, or a
            failed one whose ``error`` is the stringified exception.
        """
        if not self.is_configured:
            return BackendResponse(False, {}, "Backend URL not configured")

        # This is the boundary where every failure is turned into a value, so
        # the broad catch is intentional.
        try:
            resp = self._get_client().request(method, path, **kwargs)
            resp.raise_for_status()
            body = resp.json()
        except Exception as e:
            return BackendResponse(False, {}, str(e))

        if wrap_key is not None:
            body = {wrap_key: body}
        return BackendResponse(True, body)

    def health_check(self) -> BackendResponse:
        """Check if backend is healthy."""
        return self._request("GET", "/api/health")

    def get_status(self) -> BackendResponse:
        """Get backend system status."""
        return self._request("GET", "/api/status")

    def process_document(
        self,
        file_bytes: bytes,
        filename: str,
        ocr_engine: str = "paddleocr",
        max_pages: int = 10,
        enable_layout: bool = True,
        preserve_tables: bool = True,
    ) -> BackendResponse:
        """
        Process a document using the backend.

        Args:
            file_bytes: Document content as bytes
            filename: Original filename
            ocr_engine: OCR engine to use (paddleocr, tesseract)
            max_pages: Maximum pages to process
            enable_layout: Enable layout detection
            preserve_tables: Preserve table structure

        Returns:
            BackendResponse with processing results
        """
        files = {"file": (filename, file_bytes)}
        # Multipart form fields are sent as strings; booleans are lowercased
        # to "true" / "false".
        data = {
            "ocr_engine": ocr_engine,
            "max_pages": str(max_pages),
            "enable_layout": str(enable_layout).lower(),
            "preserve_tables": str(preserve_tables).lower(),
        }
        return self._request("POST", "/api/process", files=files, data=data)

    def index_document(
        self,
        doc_id: str,
        text: str,
        chunks: List[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> BackendResponse:
        """
        Index a document into the RAG system.

        Args:
            doc_id: Document identifier
            text: Full document text
            chunks: List of chunk dictionaries
            metadata: Optional metadata (an empty dict is sent when omitted)

        Returns:
            BackendResponse with indexing results
        """
        payload = {
            "doc_id": doc_id,
            "text": text,
            "chunks": chunks,
            "metadata": metadata or {},
        }
        return self._request("POST", "/api/index", json=payload)

    def query(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
    ) -> BackendResponse:
        """
        Query the RAG system.

        Args:
            question: Query question
            filters: Optional filters (e.g., document_id)
            top_k: Number of results

        Returns:
            BackendResponse with answer and sources
        """
        payload = {
            "question": question,
            "filters": filters,
            "top_k": top_k,
        }
        return self._request("POST", "/api/query", json=payload)

    def search_similar(
        self,
        query: str,
        top_k: int = 5,
        doc_filter: Optional[str] = None,
    ) -> BackendResponse:
        """
        Search for similar chunks.

        Args:
            query: Search query
            top_k: Number of results
            doc_filter: Optional document ID filter

        Returns:
            BackendResponse with similar chunks
        """
        payload = {
            "query": query,
            "top_k": top_k,
            "doc_filter": doc_filter,
        }
        return self._request("POST", "/api/search", json=payload)

    def list_documents(self) -> BackendResponse:
        """List all indexed documents.

        Returns:
            BackendResponse whose data is ``{"documents": <response body>}``.
        """
        return self._request("GET", "/api/documents", wrap_key="documents")

    def delete_document(self, doc_id: str) -> BackendResponse:
        """Delete a document from the index.

        Args:
            doc_id: Identifier of the document to delete.

        Returns:
            BackendResponse with the backend's reply.
        """
        # Percent-encode the id so characters such as "/", "?" or "#" cannot
        # alter the request path or be parsed as a query string / fragment.
        safe_id = quote(str(doc_id), safe="")
        return self._request("DELETE", f"/api/documents/{safe_id}")


# Global client instance
_backend_client: Optional[BackendClient] = None


def get_backend_client() -> BackendClient:
    """Return the module-level backend client, building it on first use."""
    global _backend_client
    if _backend_client is not None:
        return _backend_client
    _backend_client = BackendClient()
    return _backend_client


def check_backend_available() -> Tuple[bool, Dict[str, Any]]:
    """
    Probe the backend and report whether it can be used.

    The checks run in order: configuration, health endpoint, status endpoint.
    The first one that fails short-circuits with an ``{"error": ...}`` dict.

    Returns:
        Tuple of (available, status_dict); on success, status_dict is the
        data returned by the status call.
    """
    backend = get_backend_client()

    # Without a URL there is nothing to contact.
    if not backend.is_configured:
        return False, {"error": "Backend URL not configured"}

    # Reachability first...
    health_result = backend.health_check()
    if not health_result.success:
        return False, {"error": f"Backend not reachable: {health_result.error}"}

    # ...then the full status payload.
    status_result = backend.get_status()
    if status_result.success:
        return True, status_result.data
    return False, {"error": f"Failed to get status: {status_result.error}"}