|
|
""" |
|
|
SPARKNET Backend Client |
|
|
|
|
|
Client for connecting Streamlit Cloud to the GPU backend server (Lytos). |
|
|
Handles all API communication with the FastAPI backend. |
|
|
""" |
|
|
|
|
|
import httpx |
|
|
import streamlit as st |
|
|
from typing import Optional, Dict, Any, List, Tuple |
|
|
from dataclasses import dataclass |
|
|
import os |
|
|
|
|
|
|
|
|
def get_backend_url() -> Optional[str]:
    """
    Resolve the backend URL from Streamlit secrets or the environment.

    Lookup order:
        1. ``st.secrets["BACKEND_URL"]``
        2. ``st.secrets["backend"]["url"]``
        3. the ``SPARKNET_BACKEND_URL`` environment variable

    Returns:
        The first non-empty value found, or ``None`` when no source provides
        one. Blank values are treated as "not configured" so that callers
        checking ``is not None`` do not mistake an empty string for a URL.
    """
    url = None
    try:
        if hasattr(st, "secrets"):
            if "BACKEND_URL" in st.secrets:
                url = st.secrets["BACKEND_URL"]
            elif "backend" in st.secrets and "url" in st.secrets["backend"]:
                url = st.secrets["backend"]["url"]
    except Exception:
        # Best effort: any failure while reading secrets falls through to the
        # environment variable. ``Exception`` (rather than a bare ``except``)
        # keeps KeyboardInterrupt / SystemExit propagating.
        url = None

    return url or os.environ.get("SPARKNET_BACKEND_URL") or None
|
|
|
|
|
|
|
|
def is_backend_configured() -> bool:
    """Report whether ``get_backend_url()`` yields a value other than ``None``."""
    resolved_url = get_backend_url()
    return resolved_url is not None
|
|
|
|
|
|
|
|
@dataclass
class BackendResponse:
    """Uniform result envelope for calls made to the backend."""

    # True when the call succeeded, False otherwise.
    success: bool

    # Payload of the call; the code in this file passes ``{}`` on failure.
    data: Dict[str, Any]

    # Description of what went wrong; left as None when nothing is reported.
    error: Optional[str] = None
|
|
|
|
|
|
|
|
class BackendClient:
    """
    Client for SPARKNET Backend API.

    Provides methods to:
    - Check backend health and status
    - Process documents (OCR, layout detection)
    - Index documents to RAG
    - Query RAG system
    - Search similar chunks
    - List and delete indexed documents

    API methods do not raise on request failures: each returns a
    ``BackendResponse`` whose ``success`` flag is False and whose ``error``
    holds the exception text when the request fails, the server answers with
    an error status, or no backend URL is configured.

    The underlying ``httpx.Client`` is created on first use and reused until
    ``close()`` is called. The client may also be used as a context manager,
    which calls ``close()`` on exit.
    """

    def __init__(self, base_url: Optional[str] = None, timeout: float = 120.0):
        """
        Create a client.

        Args:
            base_url: Root URL of the backend. When omitted or empty, the
                value is resolved with ``get_backend_url()``.
            timeout: Timeout passed to ``httpx.Client`` for every request.
        """
        self.base_url = base_url or get_backend_url()
        self.timeout = timeout
        # Created lazily by _get_client() so building the wrapper opens nothing.
        self._client = None

    def __enter__(self) -> "BackendClient":
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the HTTP client when leaving a ``with`` block."""
        self.close()

    @property
    def is_configured(self) -> bool:
        """True when a non-empty backend URL is available."""
        # Truthiness (not ``is not None``) so a blank URL counts as missing.
        return bool(self.base_url)

    def _get_client(self) -> httpx.Client:
        """Return the shared ``httpx.Client``, creating it on first use."""
        if self._client is None:
            self._client = httpx.Client(
                base_url=self.base_url,
                timeout=self.timeout,
            )
        return self._client

    def close(self):
        """Close the shared HTTP client, if one was created."""
        if self._client is not None:
            self._client.close()
            self._client = None

    def _request(self, method: str, path: str, **kwargs: Any) -> BackendResponse:
        """
        Send one request and wrap the outcome in a ``BackendResponse``.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Request path, resolved against the client's base URL.
            **kwargs: Forwarded unchanged to ``httpx.Client.request``
                (``json=``, ``files=``, ``data=`` ...).

        Returns:
            ``BackendResponse(True, <decoded JSON body>)`` on success,
            otherwise ``BackendResponse(False, {}, <error text>)``.
        """
        if not self.is_configured:
            return BackendResponse(False, {}, "Backend URL not configured")

        try:
            resp = self._get_client().request(method, path, **kwargs)
            resp.raise_for_status()
            return BackendResponse(True, resp.json())
        except Exception as e:
            # Deliberate catch-all at the API boundary: callers receive a
            # failed BackendResponse rather than an exception.
            return BackendResponse(False, {}, str(e))

    def health_check(self) -> BackendResponse:
        """Check if backend is healthy (``GET /api/health``)."""
        return self._request("GET", "/api/health")

    def get_status(self) -> BackendResponse:
        """Get backend system status (``GET /api/status``)."""
        return self._request("GET", "/api/status")

    def process_document(
        self,
        file_bytes: bytes,
        filename: str,
        ocr_engine: str = "paddleocr",
        max_pages: int = 10,
        enable_layout: bool = True,
        preserve_tables: bool = True,
    ) -> BackendResponse:
        """
        Process a document using the backend (``POST /api/process``).

        Args:
            file_bytes: Document content as bytes
            filename: Original filename
            ocr_engine: OCR engine to use (paddleocr, tesseract)
            max_pages: Maximum pages to process
            enable_layout: Enable layout detection
            preserve_tables: Preserve table structure

        Returns:
            BackendResponse with processing results
        """
        files = {"file": (filename, file_bytes)}
        # Sent as form fields next to the upload, so every value is a string;
        # booleans are lower-cased to "true" / "false".
        data = {
            "ocr_engine": ocr_engine,
            "max_pages": str(max_pages),
            "enable_layout": str(enable_layout).lower(),
            "preserve_tables": str(preserve_tables).lower(),
        }
        return self._request("POST", "/api/process", files=files, data=data)

    def index_document(
        self,
        doc_id: str,
        text: str,
        chunks: List[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> BackendResponse:
        """
        Index a document into the RAG system (``POST /api/index``).

        Args:
            doc_id: Document identifier
            text: Full document text
            chunks: List of chunk dictionaries
            metadata: Optional metadata; an empty dict is sent when omitted

        Returns:
            BackendResponse with indexing results
        """
        payload = {
            "doc_id": doc_id,
            "text": text,
            "chunks": chunks,
            "metadata": metadata or {},
        }
        return self._request("POST", "/api/index", json=payload)

    def query(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
    ) -> BackendResponse:
        """
        Query the RAG system (``POST /api/query``).

        Args:
            question: Query question
            filters: Optional filters (e.g., document_id)
            top_k: Number of results

        Returns:
            BackendResponse with answer and sources
        """
        payload = {
            "question": question,
            "filters": filters,
            "top_k": top_k,
        }
        return self._request("POST", "/api/query", json=payload)

    def search_similar(
        self,
        query: str,
        top_k: int = 5,
        doc_filter: Optional[str] = None,
    ) -> BackendResponse:
        """
        Search for similar chunks (``POST /api/search``).

        Args:
            query: Search query
            top_k: Number of results
            doc_filter: Optional document ID filter

        Returns:
            BackendResponse with similar chunks
        """
        payload = {
            "query": query,
            "top_k": top_k,
            "doc_filter": doc_filter,
        }
        return self._request("POST", "/api/search", json=payload)

    def list_documents(self) -> BackendResponse:
        """
        List all indexed documents (``GET /api/documents``).

        Returns:
            BackendResponse whose ``data`` is ``{"documents": <response body>}``
            on success.
        """
        result = self._request("GET", "/api/documents")
        if not result.success:
            return result
        # Wrap the decoded body under a "documents" key for callers.
        return BackendResponse(True, {"documents": result.data})

    def delete_document(self, doc_id: str) -> BackendResponse:
        """
        Delete a document from the index (``DELETE /api/documents/{doc_id}``).

        Args:
            doc_id: Identifier of the document to delete. It is
                percent-encoded before being placed in the URL path.

        Returns:
            BackendResponse with the backend's reply.
        """
        from urllib.parse import quote

        # safe="" also encodes "/", so characters such as "/", "?" or "#"
        # in an ID cannot alter the path, query or fragment of the request.
        encoded_id = quote(str(doc_id), safe="")
        return self._request("DELETE", f"/api/documents/{encoded_id}")
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton; populated by get_backend_client() on first call.
_backend_client: Optional[BackendClient] = None


def get_backend_client() -> BackendClient:
    """Return the shared ``BackendClient``, constructing it on first call."""
    global _backend_client
    if _backend_client is not None:
        return _backend_client
    _backend_client = BackendClient()
    return _backend_client
|
|
|
|
|
|
|
|
def check_backend_available() -> Tuple[bool, Dict[str, Any]]:
    """
    Probe the backend and report whether it can be used.

    The shared client is checked in three steps: a URL is configured, the
    health check succeeds, and the status call succeeds. The first failing
    step ends the probe.

    Returns:
        ``(True, status_data)`` when every step passes, otherwise
        ``(False, {"error": <message>})`` describing the failed step.
    """
    backend = get_backend_client()

    if not backend.is_configured:
        return False, {"error": "Backend URL not configured"}

    # Health is checked before status is requested.
    health = backend.health_check()
    if not health.success:
        return False, {"error": f"Backend not reachable: {health.error}"}

    status = backend.get_status()
    if status.success:
        return True, status.data
    return False, {"error": f"Failed to get status: {status.error}"}
|
|
|