# Source: Hugging Face Space "tensorus" — dashboard/app.py (commit 9d61cc0, verified).
# NOTE: the original lines here were scraped site-navigation text
# ("dashboard / app.py", "tensorus's picture", "Update app.py"), not Python code.
# app.py
"""
Base Streamlit frontend structure for the Tensorus platform.
Does not include direct backend API call implementations.
"""
import streamlit as st
import json
import time
# import requests # No longer needed directly if API calls are removed/placeholdered
import logging
import torch
from typing import List, Dict, Any, Optional, Union, Tuple
# --- Page Configuration ---
# NOTE: st.set_page_config must be the first Streamlit command executed in the
# script, before any other st.* call.
st.set_page_config(
    page_title="Tensorus Dashboard",
    page_icon="🧱",
    layout="wide",
    initial_sidebar_state="expanded"
)

# --- Configure Logging (Optional but good practice) ---
# Module-level logger used by the tensor utilities and placeholder helpers below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Integrated Tensor Utilities (Kept from app.py) ---
def _validate_tensor_data(data: List[Any], shape: List[int]):
"""
Validates if the nested list structure of 'data' matches the 'shape'.
Raises ValueError on mismatch. (Optional validation)
"""
if not shape:
if not isinstance(data, (int, float)): raise ValueError("Scalar tensor data must be a single number.")
return True
if not isinstance(data, list): raise ValueError(f"Data for shape {shape} must be a list.")
expected_len = shape[0]
if len(data) != expected_len: raise ValueError(f"Dimension 0 mismatch: Expected {expected_len}, got {len(data)} for shape {shape}.")
if len(shape) > 1:
for item in data: _validate_tensor_data(item, shape[1:])
elif len(shape) == 1:
if not all(isinstance(x, (int, float)) for x in data): raise ValueError("Innermost list elements must be numbers.")
return True
def list_to_tensor(shape: List[int], dtype_str: str, data: Union[List[Any], int, float]) -> torch.Tensor:
    """
    Build a torch.Tensor with the requested shape and dtype from a nested
    Python list (or a bare scalar).

    Args:
        shape: Target shape; ``[]`` for a scalar tensor.
        dtype_str: Dtype name such as ``"float32"``, ``"int64"``, ``"bool"``.
        data: Nested list (or scalar) of values to convert.

    Returns:
        The converted tensor, reshaped to ``shape`` if necessary.

    Raises:
        ValueError: If the dtype string is unknown, the data cannot be
            converted, or the result cannot be reshaped to ``shape``.
    """
    # Accepted dtype aliases -> torch dtypes.
    name_to_dtype = {
        'float32': torch.float32, 'float': torch.float,
        'float64': torch.float64, 'double': torch.double,
        'int32': torch.int32, 'int': torch.int,
        'int64': torch.int64, 'long': torch.long,
        'bool': torch.bool
    }
    try:
        target_dtype = name_to_dtype.get(dtype_str.lower())
        if target_dtype is None:
            raise ValueError(f"Unsupported dtype string: {dtype_str}")
        tensor = torch.tensor(data, dtype=target_dtype)
        if list(tensor.shape) == shape:
            return tensor
        # Shape differs from the request — try an explicit reshape.
        logger.debug(f"Initial tensor shape {list(tensor.shape)} differs from target {shape}. Attempting reshape.")
        try:
            return tensor.reshape(shape)
        except RuntimeError as reshape_err:
            raise ValueError(f"Created tensor shape {list(tensor.shape)} != requested {shape} and reshape failed: {reshape_err}") from reshape_err
    except (TypeError, ValueError) as e:
        logger.error(f"Error converting list to tensor: {e}. Shape: {shape}, Dtype: {dtype_str}")
        raise ValueError(f"Failed tensor conversion: {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error during list_to_tensor: {e}")
        raise ValueError(f"Unexpected tensor conversion error: {e}") from e
def tensor_to_list(tensor: torch.Tensor) -> Tuple[List[int], str, List[Any]]:
    """
    Decompose a tensor into ``(shape, dtype string, nested-list data)`` —
    the inverse of :func:`list_to_tensor`.

    Raises:
        TypeError: If ``tensor`` is not a ``torch.Tensor``.
    """
    if not isinstance(tensor, torch.Tensor):
        raise TypeError("Input must be a torch.Tensor")
    # str(torch.float32) -> "torch.float32"; keep only the part after the dot.
    dtype_name = str(tensor.dtype).split('.')[-1]
    return list(tensor.shape), dtype_name, tensor.tolist()
# --- Placeholder UI Utilities (Implementations Removed) ---
# Define the base URL (might be needed if placeholders call a dummy endpoint)
# API_BASE_URL = "https://tensorus-api.hf.space" # Keep if needed, remove if not
def get_api_status():
    """Placeholder: report backend API reachability (always succeeds)."""
    logger.warning("Placeholder function 'get_api_status' called. Returning dummy data.")
    # Canned payload so the sidebar renders the healthy-API state.
    dummy_info = {"message": "Tensorus API is reachable.", "version": "?.?.?"}
    return True, dummy_info
    # To exercise the failure branch instead:
    # return False, {"error": "Tensorus API connection failed."}
def get_agent_status():
    """Placeholder: return dummy status for all agents, keyed by agent id."""
    logger.warning("Placeholder function 'get_agent_status' called. Returning dummy data.")
    # Each entry mirrors the shape the Agent Control page expects:
    # name, running flag, config dict, and a list of recent log lines.
    agent_one = {
        "name": "Dummy Agent One",
        "running": False,
        "config": {"param": "value"},
        "logs": ["Log line 1", "Log line 2"]
    }
    agent_two = {
        "name": "Dummy Agent Two",
        "running": True,
        "config": {"interval": 10},
        "logs": ["Agent started", "Processing..."]
    }
    return {"dummy_agent_1": agent_one, "dummy_agent_2": agent_two}
    # Return None instead to simulate a fetch error.
def start_agent(agent_id: str):
    """Placeholder: pretend to start the given agent and report success."""
    logger.warning(f"Placeholder function 'start_agent' called for {agent_id}.")
    confirmation = f"Placeholder: Start signal sent to {agent_id}."
    return {"success": True, "message": confirmation}
def stop_agent(agent_id: str):
    """Placeholder: pretend to stop the given agent and report success."""
    logger.warning(f"Placeholder function 'stop_agent' called for {agent_id}.")
    confirmation = f"Placeholder: Stop signal sent to {agent_id}."
    return {"success": True, "message": confirmation}
def configure_agent(agent_id: str, config: dict):
    """Placeholder: pretend to push a configuration to the given agent."""
    logger.warning(f"Placeholder function 'configure_agent' called for {agent_id} with config: {config}.")
    confirmation = f"Placeholder: Configuration sent to {agent_id}."
    return {"success": True, "message": confirmation}
def post_nql_query(query: str):
    """Placeholder: pretend to run an NQL query and return canned results."""
    logger.warning(f"Placeholder function 'post_nql_query' called with query: {query}")
    # Two fixed records shaped like the real API's result preview.
    canned_results = [
        {"id": "dummy_id_1", "data": "[1, 2, 3]", "metadata": {"source": "dummy"}},
        {"id": "dummy_id_2", "data": "[4, 5, 6]", "metadata": {"source": "dummy"}}
    ]
    return {
        "query": query,
        "response_text": f"Placeholder: Processed query '{query}'. Found 2 results.",
        "results": canned_results,
        "error": None
    }
    # To simulate an error instead:
    # return {"query": query, "response_text": "Placeholder: Error processing query.", "error": "Syntax error"}
def get_datasets():
    """Placeholder: return a fixed list of dataset names."""
    logger.warning("Placeholder function 'get_datasets' called.")
    dummy_names = ["dummy_dataset_A", "dummy_dataset_B", "rl_experiences_dummy"]
    return dummy_names
def get_dataset_preview(dataset_name: str, limit: int = 5):
    """
    Placeholder: return preview rows for a dataset.

    Only names starting with "dummy" yield data; anything else returns None
    to simulate a missing dataset or a fetch error.
    """
    logger.warning(f"Placeholder function 'get_dataset_preview' called for {dataset_name}.")
    if not dataset_name.startswith("dummy"):
        return None  # Simulate dataset not found or error
    rows = [
        {"record_id": f"rec_{i}", "shape": "[10]", "dtype": "float32", "metadata_preview": "{'source': 'dummy'}"}
        for i in range(limit)
    ]
    return {
        "dataset": dataset_name,
        "record_count": 100,  # Dummy count
        "preview": rows
    }
def operate_explorer(dataset: str, operation: str, index: int, params: dict):
    """
    Placeholder: simulate a tensor operation on one record of a dataset.

    Known operations ("info", "head", "sum") get canned typed results; any
    other operation gets a descriptive placeholder string.
    """
    logger.warning(f"Placeholder function 'operate_explorer' called: ds={dataset}, op={operation}, idx={index}, params={params}")
    # Canned results for the operations the UI demonstrates.
    canned = {
        "info": {"shape": [10, 5], "dtype": "float32", "numel": 50},
        "head": [[1.0, 2.0], [3.0, 4.0]],  # Dummy data
        "sum": 123.45  # Dummy scalar result
    }
    result_data = canned.get(operation, f"Placeholder result for {operation} on {dataset}[{index}]")
    metadata = {
        "operation": operation,
        "dataset": dataset,
        "index": index,
        "params": params,
        "status": "Placeholder Success"
    }
    return {"success": True, "metadata": metadata, "result_data": result_data}
# --- Initialize Session State ---
# Seed every session key the pages below read, so first-run reads are safe.
# Keys already present (from a previous rerun) are left untouched.
_SESSION_DEFAULTS = {
    'agent_status': None,
    'datasets': [],
    'selected_dataset': None,
    'dataset_preview': None,
    'explorer_result': None,
    'nql_response': None,
}
for _state_key, _default_value in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _default_value
# --- Sidebar ---
# (Uses placeholder functions. Adds the navigation control that decides
# which main page renders below.)
with st.sidebar:
    st.title("Tensorus")
    st.markdown("---")

    # Fix: app_mode was hardcoded to "" and no selector existed, so none of
    # the main-page branches ("Dashboard", "Agent Control", "NQL Chat",
    # "Data Explorer") could ever render. Expose them via a radio control.
    app_mode = st.radio(
        "Navigation",
        ["Dashboard", "Agent Control", "NQL Chat", "Data Explorer"],
        index=0,
    )
    st.markdown("---")

    # API Status Check (uses placeholder)
    st.subheader("API Status")
    api_ok, api_info = get_api_status()  # Uses placeholder function
    if api_ok:
        st.success(f"API Status: {api_info.get('message', 'OK')}")
    else:
        st.error(f"API Connection Failed: {api_info.get('error', 'Unknown error')}")
        st.warning("This is a placeholder status. Implement API connection check.")
        # st.stop() # Don't stop in base version
# --- Main Page Content ---
# (Kept identical to app.py, but uses placeholder functions.)
# Each branch below renders one page depending on app_mode set in the sidebar.
if app_mode == "Dashboard":
    st.title("📊 Operations Dashboard")
    st.warning("Live WebSocket dashboard view is best accessed directly via the backend's `/dashboard` HTML page or a dedicated JS frontend. This is a simplified view.")
    # You might want to remove or change this link if the backend isn't running
    # st.markdown(f"Access the basic live dashboard here: <a href='{API_BASE_URL}/dashboard' target='_blank'>Backend Dashboard</a>", unsafe_allow_html=True)
    st.info("This Streamlit view doesn't currently support live WebSocket updates.")
elif app_mode == "Agent Control":
    st.title("🤖 Agent Control Panel")
    # Manual refresh: pull (placeholder) agent status into session state.
    if st.button("Refresh Agent Status"):
        st.session_state.agent_status = get_agent_status() # Uses placeholder function
    if st.session_state.agent_status:
        agents = st.session_state.agent_status
        agent_ids = list(agents.keys())
        if not agent_ids:
            st.warning("No agents reported (Placeholder).")
        else:
            selected_agent_id = st.selectbox("Select Agent", agent_ids)
            if selected_agent_id:
                agent_info = agents[selected_agent_id]
                st.subheader(f"Agent: {agent_info.get('name', selected_agent_id)}")
                # Two-column layout: status/config left, recent logs right.
                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Status", "Running" if agent_info.get('running') else "Stopped")
                    st.write("**Configuration:**")
                    st.json(agent_info.get('config', {}))
                with col2:
                    st.write("**Recent Logs:**")
                    st.code('\n'.join(agent_info.get('logs', [])), language='log')
                st.write("**Actions:**")
                btn_col1, btn_col2, btn_col3 = st.columns(3)
                with btn_col1:
                    # Start is disabled while the agent already reports running.
                    if st.button("Start Agent", key=f"start_{selected_agent_id}", disabled=agent_info.get('running')):
                        result = start_agent(selected_agent_id) # Uses placeholder function
                        st.toast(result.get("message", "Request sent."))
                        st.session_state.agent_status = get_agent_status() # Refresh status (placeholder)
                        st.rerun()
                with btn_col2:
                    # Stop is disabled while the agent already reports stopped.
                    if st.button("Stop Agent", key=f"stop_{selected_agent_id}", disabled=not agent_info.get('running')):
                        result = stop_agent(selected_agent_id) # Uses placeholder function
                        st.toast(result.get("message", "Request sent."))
                        st.session_state.agent_status = get_agent_status() # Refresh status (placeholder)
                        st.rerun()
                # Configure button removed as configure_agent was removed
                # with btn_col3:
                #     st.write("Configure (Not Implemented in Placeholder)")
    else:
        st.info("Click 'Refresh Agent Status' to load placeholder agent information.")
elif app_mode == "NQL Chat":
    st.title("💬 Natural Language Query (NQL)")
    st.info("Ask questions about the data stored in Tensorus (e.g., 'show me tensors from rl_experiences', 'count records in sample_data').")
    user_query = st.text_input("Enter your query:", key="nql_query_input")
    if st.button("Submit Query", key="nql_submit"):
        if user_query:
            with st.spinner("Processing query..."):
                st.session_state.nql_response = post_nql_query(user_query) # Uses placeholder function
        else:
            st.warning("Please enter a query.")
    # Render the last response kept in session state (survives reruns).
    if st.session_state.nql_response:
        resp = st.session_state.nql_response
        st.markdown("---")
        st.write(f"**Query:** {resp.get('query')}")
        if resp.get("error"):
            st.error(f"Error: {resp.get('error')}")
        else:
            st.success(f"**Response:** {resp.get('response_text')}")
            if resp.get("results"):
                st.write("**Results Preview:**")
                st.json(resp.get("results"))
    # Clear response after displaying
    # st.session_state.nql_response = None # Keep it displayed until next query
elif app_mode == "Data Explorer":
    st.title("🔍 Data Explorer")
    # Load the dataset list on first visit, or when the user asks to refresh.
    if not st.session_state.datasets or st.button("Refresh Datasets"):
        st.session_state.datasets = get_datasets() # Uses placeholder function
    if not st.session_state.datasets:
        st.warning("No datasets found (Placeholder).")
    else:
        # Preserve the previous selection across reruns when it still exists.
        st.session_state.selected_dataset = st.selectbox(
            "Select Dataset",
            st.session_state.datasets,
            index=st.session_state.datasets.index(st.session_state.selected_dataset) if st.session_state.selected_dataset in st.session_state.datasets else 0
        )
        if st.session_state.selected_dataset:
            if st.button("Show Preview", key="preview_btn"):
                with st.spinner(f"Fetching preview for {st.session_state.selected_dataset}..."):
                    st.session_state.dataset_preview = get_dataset_preview(st.session_state.selected_dataset) # Uses placeholder function
            if st.session_state.dataset_preview:
                st.subheader(f"Preview: {st.session_state.dataset_preview.get('dataset')}")
                st.write(f"Total Records: {st.session_state.dataset_preview.get('record_count')}")
                # Adjust dataframe display based on placeholder structure
                preview_df_data = st.session_state.dataset_preview.get('preview', [])
                if preview_df_data:
                    st.dataframe(preview_df_data)
                else:
                    st.write("No preview data available.")
                st.markdown("---")
            elif st.session_state.selected_dataset and 'preview_btn' in st.session_state and st.session_state.preview_btn:
                # If button was clicked but preview is None/empty
                st.warning(f"Could not fetch preview for {st.session_state.selected_dataset} (Placeholder).")
            st.subheader("Perform Operation")
            # Use dummy record count if preview failed
            record_count = st.session_state.dataset_preview.get('record_count', 1) if st.session_state.dataset_preview else 1
            tensor_index = st.number_input("Select Tensor Index", min_value=0, max_value=max(0, record_count - 1), value=0, step=1)
            operations = ["info", "head", "slice", "sum", "mean", "reshape", "transpose"]
            selected_op = st.selectbox("Select Operation", operations)
            params = {}
            # Dynamic parameter inputs (kept identical): each operation
            # exposes only the widgets it needs.
            if selected_op == "head":
                params['count'] = st.number_input("Count", min_value=1, value=5, step=1)
            elif selected_op == "slice":
                params['dim'] = st.number_input("Dimension (dim)", value=0, step=1)
                params['start'] = st.number_input("Start Index", value=0, step=1)
                params['end'] = st.number_input("End Index (optional)", value=None, step=1, format="%d")
                params['step'] = st.number_input("Step (optional)", value=None, step=1, format="%d")
            elif selected_op in ["sum", "mean"]:
                dim_input = st.text_input("Dimension(s) (optional, e.g., 0 or 0,1)")
                if dim_input:
                    # Accept "0" (single dim) or "0,1" (list of dims).
                    try: params['dim'] = [int(x.strip()) for x in dim_input.split(',')] if ',' in dim_input else int(dim_input)
                    except ValueError: st.warning("Invalid dimension format.")
                params['keepdim'] = st.checkbox("Keep Dimensions (keepdim)", value=False)
            elif selected_op == "reshape":
                shape_input = st.text_input("Target Shape (comma-separated, e.g., 2,3,5)")
                if shape_input:
                    try: params['shape'] = [int(x.strip()) for x in shape_input.split(',')]
                    except ValueError: st.warning("Invalid shape format.")
            elif selected_op == "transpose":
                params['dim0'] = st.number_input("Dimension 0", value=0, step=1)
                params['dim1'] = st.number_input("Dimension 1", value=1, step=1)
            if st.button("Run Operation", key="run_op_btn"):
                valid_request = True
                if selected_op == "reshape" and not params.get('shape'):
                    st.error("Target Shape is required for reshape.")
                    valid_request = False
                # Add other validation if needed
                if valid_request:
                    with st.spinner(f"Running {selected_op} on {st.session_state.selected_dataset}[{tensor_index}]..."):
                        st.session_state.explorer_result = operate_explorer( # Uses placeholder function
                            st.session_state.selected_dataset,
                            selected_op,
                            tensor_index,
                            params
                        )
            # Show the most recent operation result kept in session state.
            if st.session_state.explorer_result:
                st.markdown("---")
                st.subheader("Operation Result")
                st.write("**Metadata:**")
                st.json(st.session_state.explorer_result.get("metadata", {}))
                st.write("**Result Data:**")
                # Display result data appropriately
                result_data = st.session_state.explorer_result.get("result_data", "No data returned.")
                # Try to display as JSON, fallback to text
                try:
                    st.json(result_data)
                except Exception:
                    st.text(result_data)
                # Clear result after displaying
                # st.session_state.explorer_result = None # Keep displayed until next op