# app.py
"""
Base Streamlit frontend structure for the Tensorus platform.

Does not include direct backend API call implementations: every backend
helper in this module is a placeholder that logs a warning and returns
canned data, so the UI structure can be exercised without a running API.
"""

import json
import logging
import time
from typing import List, Dict, Any, Optional, Union, Tuple

import streamlit as st
import torch
# import requests # No longer needed directly if API calls are removed/placeholdered

# --- Page Configuration ---
st.set_page_config(
    page_title="Tensorus Dashboard",
    page_icon="🧱",
    layout="wide",
    initial_sidebar_state="expanded",
)

# --- Configure Logging (Optional but good practice) ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# --- Integrated Tensor Utilities (Kept from app.py) ---

# Accepted dtype strings (matched case-insensitively) and their torch dtypes.
_DTYPE_MAP = {
    'float32': torch.float32, 'float': torch.float,
    'float64': torch.float64, 'double': torch.double,
    'int32': torch.int32, 'int': torch.int,
    'int64': torch.int64, 'long': torch.long,
    'bool': torch.bool,
}


def _validate_tensor_data(data: List[Any], shape: List[int]) -> bool:
    """
    Check that the nested list structure of ``data`` matches ``shape``.

    (Optional validation; nothing in this module calls it automatically.)

    Args:
        data: Nested list of numbers, or a bare number when ``shape`` is empty.
        shape: Expected size of each dimension, outermost first.

    Returns:
        True when the structure matches.

    Raises:
        ValueError: On any mismatch (wrong container type, wrong length at
            some dimension, or non-numeric innermost elements).
    """
    if not shape:
        # An empty shape denotes a scalar.
        if not isinstance(data, (int, float)):
            raise ValueError("Scalar tensor data must be a single number.")
        return True

    if not isinstance(data, list):
        raise ValueError(f"Data for shape {shape} must be a list.")

    expected_len = shape[0]
    if len(data) != expected_len:
        raise ValueError(
            f"Dimension 0 mismatch: Expected {expected_len}, got {len(data)} for shape {shape}."
        )

    if len(shape) > 1:
        # Recurse into each sub-list with the remaining dimensions.
        for item in data:
            _validate_tensor_data(item, shape[1:])
    elif not all(isinstance(x, (int, float)) for x in data):
        raise ValueError("Innermost list elements must be numbers.")
    return True


def list_to_tensor(shape: List[int], dtype_str: str, data: Union[List[Any], int, float]) -> torch.Tensor:
    """
    Convert a Python list (potentially nested) or scalar into a PyTorch
    tensor with the specified shape and dtype.

    Args:
        shape: Target shape. If the tensor built from ``data`` has a different
            shape, a reshape to ``shape`` is attempted.
        dtype_str: One of the keys of ``_DTYPE_MAP`` (case-insensitive).
        data: Nested list or scalar holding the values.

    Returns:
        The resulting tensor.

    Raises:
        ValueError: For an unsupported dtype string, a failed reshape, or any
            other conversion failure. Every error is re-raised as ValueError
            with the original exception chained.
    """
    try:
        torch_dtype = _DTYPE_MAP.get(dtype_str.lower())
        if torch_dtype is None:
            raise ValueError(f"Unsupported dtype string: {dtype_str}")

        tensor = torch.tensor(data, dtype=torch_dtype)

        if list(tensor.shape) != shape:
            logger.debug(
                "Initial tensor shape %s differs from target %s. Attempting reshape.",
                list(tensor.shape), shape,
            )
            try:
                tensor = tensor.reshape(shape)
            except RuntimeError as reshape_err:
                raise ValueError(
                    f"Created tensor shape {list(tensor.shape)} != requested {shape} "
                    f"and reshape failed: {reshape_err}"
                ) from reshape_err

        return tensor
    except (TypeError, ValueError) as e:
        logger.error("Error converting list to tensor: %s. Shape: %s, Dtype: %s", e, shape, dtype_str)
        raise ValueError(f"Failed tensor conversion: {e}") from e
    except Exception as e:
        # Boundary handler: callers only ever have to catch ValueError.
        logger.exception("Unexpected error during list_to_tensor: %s", e)
        raise ValueError(f"Unexpected tensor conversion error: {e}") from e


def tensor_to_list(tensor: torch.Tensor) -> Tuple[List[int], str, List[Any]]:
    """
    Convert a PyTorch tensor back into its shape, dtype string, and nested
    list representation.

    Returns:
        ``(shape, dtype_str, data)`` where ``dtype_str`` is the part of
        ``str(tensor.dtype)`` after the last dot and ``data`` is
        ``tensor.tolist()``.

    Raises:
        TypeError: If ``tensor`` is not a ``torch.Tensor``.
    """
    if not isinstance(tensor, torch.Tensor):
        raise TypeError("Input must be a torch.Tensor")
    shape = list(tensor.shape)
    dtype_str = str(tensor.dtype).split('.')[-1]
    data = tensor.tolist()
    return shape, dtype_str, data


# --- Placeholder UI Utilities (Implementations Removed) ---

# Define the base URL (might be needed if placeholders call a dummy endpoint)
# API_BASE_URL = "https://tensorus-api.hf.space" # Keep if needed, remove if not

def get_api_status() -> Tuple[bool, Dict[str, Any]]:
    """Placeholder: Checks if the backend API is reachable.

    Returns:
        ``(ok, info)``. The placeholder always reports success.
    """
    logger.warning("Placeholder function 'get_api_status' called. Returning dummy data.")
    # Simulate success for UI structure
    return True, {"message": "Tensorus API is reachable.", "version": "?.?.?"}
    # Or simulate failure:
    # return False, {"error": "Tensorus API connection failed."}


def get_agent_status() -> Optional[Dict[str, Dict[str, Any]]]:
    """Placeholder: Fetches status for all agents from the backend.

    Returns:
        A mapping of agent id to a dict with ``name``, ``running``,
        ``config`` and ``logs``. The placeholder returns two dummy agents.
    """
    logger.warning("Placeholder function 'get_agent_status' called. Returning dummy data.")
    # Return dummy structure matching expected output
    return {
        "dummy_agent_1": {
            "name": "Dummy Agent One",
            "running": False,
            "config": {"param": "value"},
            "logs": ["Log line 1", "Log line 2"],
        },
        "dummy_agent_2": {
            "name": "Dummy Agent Two",
            "running": True,
            "config": {"interval": 10},
            "logs": ["Agent started", "Processing..."],
        },
    }
    # Or return None to simulate fetch error:
    # return None


def start_agent(agent_id: str) -> Dict[str, Any]:
    """Placeholder: Sends a request to start an agent."""
    logger.warning("Placeholder function 'start_agent' called for %s.", agent_id)
    return {"success": True, "message": f"Placeholder: Start signal sent to {agent_id}."}


def stop_agent(agent_id: str) -> Dict[str, Any]:
    """Placeholder: Sends a request to stop an agent."""
    logger.warning("Placeholder function 'stop_agent' called for %s.", agent_id)
    return {"success": True, "message": f"Placeholder: Stop signal sent to {agent_id}."}


def configure_agent(agent_id: str, config: dict) -> Dict[str, Any]:
    """Placeholder: Sends a request to configure an agent."""
    logger.warning(
        "Placeholder function 'configure_agent' called for %s with config: %s.", agent_id, config
    )
    return {"success": True, "message": f"Placeholder: Configuration sent to {agent_id}."}


def post_nql_query(query: str) -> Dict[str, Any]:
    """Placeholder: Sends an NQL query to the backend.

    Returns:
        A dict with ``query``, ``response_text``, ``results`` and ``error``.
    """
    logger.warning("Placeholder function 'post_nql_query' called with query: %s", query)
    # Simulate a simple response
    return {
        "query": query,
        "response_text": f"Placeholder: Processed query '{query}'. Found 2 results.",
        "results": [
            {"id": "dummy_id_1", "data": "[1, 2, 3]", "metadata": {"source": "dummy"}},
            {"id": "dummy_id_2", "data": "[4, 5, 6]", "metadata": {"source": "dummy"}},
        ],
        "error": None,
    }
    # Or simulate an error:
    # return {"query": query, "response_text": "Placeholder: Error processing query.", "error": "Syntax error"}


def get_datasets() -> List[str]:
    """Placeholder: Fetches the list of available datasets."""
    logger.warning("Placeholder function 'get_datasets' called.")
    return ["dummy_dataset_A", "dummy_dataset_B", "rl_experiences_dummy"]


def get_dataset_preview(dataset_name: str, limit: int = 5) -> Optional[Dict[str, Any]]:
    """Placeholder: Fetches preview data for a specific dataset.

    Returns:
        A dict with ``dataset``, ``record_count`` and ``preview`` (``limit``
        dummy rows) for names starting with ``"dummy"``; None for any other
        name, to simulate a missing dataset or an error.
    """
    logger.warning("Placeholder function 'get_dataset_preview' called for %s.", dataset_name)
    if not dataset_name.startswith("dummy"):
        return None  # Simulate dataset not found or error
    return {
        "dataset": dataset_name,
        "record_count": 100,  # Dummy count
        "preview": [
            {
                "record_id": f"rec_{i}",
                "shape": "[10]",
                "dtype": "float32",
                "metadata_preview": "{'source': 'dummy'}",
            }
            for i in range(limit)
        ],
    }


def operate_explorer(dataset: str, operation: str, index: int, params: dict) -> Dict[str, Any]:
    """Placeholder: Sends an operation request to the data explorer.

    Returns:
        A dict with ``success``, ``metadata`` (echo of the request) and
        ``result_data``. ``result_data`` is a dict for ``info``, a nested
        list for ``head``, a float for ``sum`` and a descriptive string for
        every other operation.
    """
    logger.warning(
        "Placeholder function 'operate_explorer' called: ds=%s, op=%s, idx=%s, params=%s",
        dataset, operation, index, params,
    )
    # Simulate a result based on operation
    canned_results = {
        "info": {"shape": [10, 5], "dtype": "float32", "numel": 50},
        "head": [[1.0, 2.0], [3.0, 4.0]],  # Dummy data
        "sum": 123.45,  # Dummy scalar result
    }
    result_data = canned_results.get(
        operation, f"Placeholder result for {operation} on {dataset}[{index}]"
    )
    return {
        "success": True,
        "metadata": {
            "operation": operation,
            "dataset": dataset,
            "index": index,
            "params": params,
            "status": "Placeholder Success",
        },
        "result_data": result_data,
    }


# --- Page Renderers (use the placeholder functions above) ---

def _render_dashboard() -> None:
    """Render the static Operations Dashboard page."""
    st.title("📊 Operations Dashboard")
    st.warning("Live WebSocket dashboard view is best accessed directly via the backend's `/dashboard` HTML page or a dedicated JS frontend. This is a simplified view.")
    # You might want to remove or change this link if the backend isn't running
    # st.markdown(f"Access the basic live dashboard here: Backend Dashboard", unsafe_allow_html=True)
    st.info("This Streamlit view doesn't currently support live WebSocket updates.")


def _render_agent_control() -> None:
    """Render the Agent Control page: status, config, logs and start/stop buttons."""
    st.title("🤖 Agent Control Panel")

    if st.button("Refresh Agent Status"):
        st.session_state.agent_status = get_agent_status()  # Uses placeholder function

    agents = st.session_state.agent_status
    # None means "never loaded or fetch failed". An empty mapping is a valid
    # answer ("no agents") and must reach the warning below, so test for
    # None explicitly rather than for truthiness.
    if agents is None:
        st.info("Click 'Refresh Agent Status' to load placeholder agent information.")
        return

    agent_ids = list(agents.keys())
    if not agent_ids:
        st.warning("No agents reported (Placeholder).")
        return

    selected_agent_id = st.selectbox("Select Agent", agent_ids)
    if not selected_agent_id:
        return

    agent_info = agents[selected_agent_id]
    # Coerce to bool: a missing 'running' key would otherwise pass None as `disabled`.
    is_running = bool(agent_info.get('running'))
    st.subheader(f"Agent: {agent_info.get('name', selected_agent_id)}")

    col1, col2 = st.columns(2)
    with col1:
        st.metric("Status", "Running" if is_running else "Stopped")
        st.write("**Configuration:**")
        st.json(agent_info.get('config', {}))
    with col2:
        st.write("**Recent Logs:**")
        st.code('\n'.join(agent_info.get('logs', [])), language='log')

    st.write("**Actions:**")
    # Third column is kept for layout; no Configure action is wired up in this base version.
    btn_col1, btn_col2, _ = st.columns(3)
    with btn_col1:
        if st.button("Start Agent", key=f"start_{selected_agent_id}", disabled=is_running):
            result = start_agent(selected_agent_id)  # Uses placeholder function
            st.toast(result.get("message", "Request sent."))
            st.session_state.agent_status = get_agent_status()  # Refresh status (placeholder)
            st.rerun()
    with btn_col2:
        if st.button("Stop Agent", key=f"stop_{selected_agent_id}", disabled=not is_running):
            result = stop_agent(selected_agent_id)  # Uses placeholder function
            st.toast(result.get("message", "Request sent."))
            st.session_state.agent_status = get_agent_status()  # Refresh status (placeholder)
            st.rerun()


def _render_nql_chat() -> None:
    """Render the NQL page: a query box plus the most recent response."""
    st.title("💬 Natural Language Query (NQL)")
    st.info("Ask questions about the data stored in Tensorus (e.g., 'show me tensors from rl_experiences', 'count records in sample_data').")

    user_query = st.text_input("Enter your query:", key="nql_query_input")

    if st.button("Submit Query", key="nql_submit"):
        if user_query:
            with st.spinner("Processing query..."):
                st.session_state.nql_response = post_nql_query(user_query)  # Uses placeholder function
        else:
            st.warning("Please enter a query.")

    # The response is kept in session state so it stays displayed until the next query.
    resp = st.session_state.nql_response
    if not resp:
        return

    st.markdown("---")
    st.write(f"**Query:** {resp.get('query')}")
    if resp.get("error"):
        st.error(f"Error: {resp.get('error')}")
        return
    st.success(f"**Response:** {resp.get('response_text')}")
    if resp.get("results"):
        st.write("**Results Preview:**")
        st.json(resp.get("results"))


def _collect_operation_params(selected_op: str) -> Dict[str, Any]:
    """Render the input widgets for ``selected_op`` and return their values as a params dict."""
    params: Dict[str, Any] = {}
    if selected_op == "head":
        params['count'] = st.number_input("Count", min_value=1, value=5, step=1)
    elif selected_op == "slice":
        params['dim'] = st.number_input("Dimension (dim)", value=0, step=1)
        params['start'] = st.number_input("Start Index", value=0, step=1)
        params['end'] = st.number_input("End Index (optional)", value=None, step=1, format="%d")
        params['step'] = st.number_input("Step (optional)", value=None, step=1, format="%d")
    elif selected_op in ("sum", "mean"):
        dim_input = st.text_input("Dimension(s) (optional, e.g., 0 or 0,1)")
        if dim_input:
            try:
                # "0,1" becomes a list of ints; a bare "0" becomes a single int.
                if ',' in dim_input:
                    params['dim'] = [int(x.strip()) for x in dim_input.split(',')]
                else:
                    params['dim'] = int(dim_input)
            except ValueError:
                st.warning("Invalid dimension format.")
        params['keepdim'] = st.checkbox("Keep Dimensions (keepdim)", value=False)
    elif selected_op == "reshape":
        shape_input = st.text_input("Target Shape (comma-separated, e.g., 2,3,5)")
        if shape_input:
            try:
                params['shape'] = [int(x.strip()) for x in shape_input.split(',')]
            except ValueError:
                st.warning("Invalid shape format.")
    elif selected_op == "transpose":
        params['dim0'] = st.number_input("Dimension 0", value=0, step=1)
        params['dim1'] = st.number_input("Dimension 1", value=1, step=1)
    return params


def _show_result_data(result_data: Any) -> None:
    """Display an operation result with a widget suited to its type.

    ``st.json`` is used only for dicts and lists. A plain string or scalar
    is shown as text: ``st.json`` does not raise in Python for a string that
    is not valid JSON, so a try/except fallback around it never triggers.
    """
    if isinstance(result_data, (dict, list)):
        st.json(result_data)
    else:
        st.text(str(result_data))


def _render_data_explorer() -> None:
    """Render the Data Explorer page: dataset picker, preview, and tensor operations."""
    st.title("🔍 Data Explorer")

    # Short-circuit is intentional: the refresh button is only rendered once
    # a dataset list has been loaded.
    if not st.session_state.datasets or st.button("Refresh Datasets"):
        st.session_state.datasets = get_datasets()  # Uses placeholder function

    datasets = st.session_state.datasets
    if not datasets:
        st.warning("No datasets found (Placeholder).")
        return

    previous_choice = st.session_state.selected_dataset
    default_index = datasets.index(previous_choice) if previous_choice in datasets else 0
    st.session_state.selected_dataset = st.selectbox("Select Dataset", datasets, index=default_index)
    selected_dataset = st.session_state.selected_dataset
    if not selected_dataset:
        return

    preview_clicked = st.button("Show Preview", key="preview_btn")
    if preview_clicked:
        with st.spinner(f"Fetching preview for {selected_dataset}..."):
            st.session_state.dataset_preview = get_dataset_preview(selected_dataset)  # Uses placeholder function

    preview = st.session_state.dataset_preview
    if preview:
        st.subheader(f"Preview: {preview.get('dataset')}")
        st.write(f"Total Records: {preview.get('record_count')}")
        # Adjust dataframe display based on placeholder structure
        preview_df_data = preview.get('preview', [])
        if preview_df_data:
            st.dataframe(preview_df_data)
        else:
            st.write("No preview data available.")
        st.markdown("---")
    elif preview_clicked:
        # Button was clicked but the preview came back None/empty
        st.warning(f"Could not fetch preview for {selected_dataset} (Placeholder).")

    st.subheader("Perform Operation")
    # Use dummy record count if preview failed
    record_count = preview.get('record_count', 1) if preview else 1
    tensor_index = st.number_input(
        "Select Tensor Index", min_value=0, max_value=max(0, record_count - 1), value=0, step=1
    )

    operations = ["info", "head", "slice", "sum", "mean", "reshape", "transpose"]
    selected_op = st.selectbox("Select Operation", operations)
    params = _collect_operation_params(selected_op)

    if st.button("Run Operation", key="run_op_btn"):
        valid_request = True
        if selected_op == "reshape" and not params.get('shape'):
            st.error("Target Shape is required for reshape.")
            valid_request = False
        # Add other validation if needed
        if valid_request:
            with st.spinner(f"Running {selected_op} on {selected_dataset}[{tensor_index}]..."):
                st.session_state.explorer_result = operate_explorer(  # Uses placeholder function
                    selected_dataset, selected_op, tensor_index, params
                )

    # The result is kept in session state so it stays displayed until the next operation.
    explorer_result = st.session_state.explorer_result
    if explorer_result:
        st.markdown("---")
        st.subheader("Operation Result")
        st.write("**Metadata:**")
        st.json(explorer_result.get("metadata", {}))
        st.write("**Result Data:**")
        _show_result_data(explorer_result.get("result_data", "No data returned."))


# Page name (as shown in the sidebar) -> renderer. Insertion order is the menu order.
_PAGES = {
    "Dashboard": _render_dashboard,
    "Agent Control": _render_agent_control,
    "NQL Chat": _render_nql_chat,
    "Data Explorer": _render_data_explorer,
}


# --- Initialize Session State ---
_SESSION_DEFAULTS = {
    'agent_status': None,
    'datasets': [],
    'selected_dataset': None,
    'dataset_preview': None,
    'explorer_result': None,
    'nql_response': None,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default


# --- Sidebar ---
with st.sidebar:
    st.title("Tensorus")
    st.markdown("---")

    # Page selector. Without it app_mode stays "" and no page is ever rendered.
    app_mode = st.radio("Navigation", list(_PAGES))
    st.markdown("---")

    # API Status Check (uses placeholder)
    st.subheader("API Status")
    api_ok, api_info = get_api_status()  # Uses placeholder function
    if api_ok:
        st.success(f"API Status: {api_info.get('message', 'OK')}")
    else:
        st.error(f"API Connection Failed: {api_info.get('error', 'Unknown error')}")
        st.warning("This is a placeholder status. Implement API connection check.")
        # st.stop() # Don't stop in base version
    # st.markdown("---")


# --- Main Page Content ---
_render_page = _PAGES.get(app_mode)
if _render_page is not None:
    _render_page()