| |
|
|
| import logging |
| from typing import List, Dict, Any, Optional, Tuple, Union |
| import random |
| import time |
| import math |
| import asyncio |
|
|
| import torch |
| import uvicorn |
| |
| from fastapi import FastAPI, HTTPException, Body, Depends, Path, status, Query |
| from pydantic import BaseModel, Field |
|
|
| |
# Import the core Tensorus components. The API cannot operate without them,
# so a failed import prints actionable guidance and aborts startup.
try:
    from tensor_storage import TensorStorage
    from nql_agent import NQLAgent
except ImportError as e:
    print(f"ERROR: Could not import Tensorus modules (TensorStorage, NQLAgent): {e}")
    print("Please ensure tensor_storage.py and nql_agent.py are in the Python path.")
    # Re-raise so the process fails fast instead of running in a broken state.
    raise
|
|
| |
# Module-wide logging: timestamp, logger name, level, and message on each line.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
|
|
| |
# Construct the process-wide singletons used by every endpoint (injected via
# the get_tensor_storage / get_nql_agent dependencies below).
try:
    tensor_storage_instance = TensorStorage()
    nql_agent_instance = NQLAgent(tensor_storage_instance)
    logger.info("Tensorus components (TensorStorage, NQLAgent) initialized successfully.")
except Exception as e:
    logger.exception(f"Failed to initialize Tensorus components: {e}")
    # Chain the original exception so the startup failure remains traceable.
    raise RuntimeError(f"Tensorus initialization failed: {e}") from e
|
|
| |
| |
| |
# In-memory registry of known agents and their (simulated) runtime state.
# NOTE(review): this is mutable, process-local global state — a server restart
# resets every agent's status. Keys are the agent ids used by the /agents routes.
agent_registry = {
    "ingestion": {
        "name": "Data Ingestion",
        "description": "Monitors sources and ingests data into datasets.",
        "status": "stopped",
        "config": {"source_directory": "temp_ingestion_source", "polling_interval_sec": 10},
        # Unix timestamp of the most recent (simulated) log activity, or None.
        "last_log_timestamp": None,
    },
    "rl_trainer": {
        "name": "RL Trainer",
        "description": "Trains reinforcement learning models using stored experiences.",
        "status": "stopped",
        "config": {"experience_dataset": "rl_experiences", "batch_size": 128, "target_update_freq": 500},
        "last_log_timestamp": None,
        # Simulation counter (see the dashboard-metrics endpoint's note on sim state).
        "sim_steps": 0,
    },
    "automl_search": {
        "name": "AutoML Search",
        "description": "Performs hyperparameter optimization.",
        "status": "stopped",
        "config": {"trials": 50, "results_dataset": "automl_results", "task_type": "regression"},
        "last_log_timestamp": None,
        # Simulation counters for the AutoML search placeholder.
        "sim_trials": 0,
        "sim_best_score": None,
    },
    # Starts as 'running': NQL queries are served inline by the /query endpoint.
    "nql_query": {
        "name": "NQL Query Service",
        "description": "Processes natural language queries.",
        "status": "running",
        "config": {"parser_type": "regex"},
        "last_log_timestamp": None,
    },
}
|
|
def _simulate_agent_log(agent_id: str) -> str:
    """Fabricate one plausible log line for the given agent.

    Unknown agent ids yield an ERROR-formatted line rather than raising, so
    callers can always render something.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    if agent_id not in agent_registry:
        logger.warning(f"Attempted to simulate log for unknown agent_id: {agent_id}")
        return f"{stamp} [ERROR] (Unknown Agent: {agent_id}) Log simulation failed."

    # Pick a random severity and message body to mimic real agent chatter.
    level = random.choice(["INFO", "DEBUG", "WARNING", "ERROR"])
    body = random.choice([
        "Processing item batch", "Training epoch completed", "Search trial finished",
        "Connection error detected", "Optimization step", "Query received",
        "Target network synced", "Disk space low", "Operation successful", "Agent starting up",
        "Configuration loaded", "Model evaluated", "Experience stored", "Found new file",
    ])
    display_name = agent_registry.get(agent_id, {}).get("name", agent_id)
    return f"{stamp} [{level}] ({display_name}) {body}"
|
|
|
|
| |
| def _validate_tensor_data(data: List[Any], shape: List[int]): |
| """Validates nested list structure against a given shape (recursive).""" |
| if not shape: |
| if not isinstance(data, (int, float)): |
| raise ValueError("Scalar tensor data must be a single number (int or float).") |
| return True |
| if not isinstance(data, list): |
| |
| raise ValueError(f"Data for shape {shape} must be a list, but got {type(data).__name__}.") |
|
|
| expected_len = shape[0] |
| if len(data) != expected_len: |
| raise ValueError(f"Dimension 0 mismatch for shape {shape}: Expected length {expected_len}, got {len(data)}.") |
|
|
| if len(shape) > 1: |
| for item in data: |
| _validate_tensor_data(item, shape[1:]) |
| elif len(shape) == 1: |
| if not all(isinstance(x, (int, float)) for x in data): |
| |
| first_bad_type = next((type(x).__name__ for x in data if not isinstance(x, (int, float))), "unknown") |
| raise ValueError(f"Innermost list elements must be numbers (int or float), found type '{first_bad_type}'.") |
| return True |
|
|
def list_to_tensor(shape: List[int], dtype_str: str, data: Union[List[Any], int, float]) -> torch.Tensor:
    """Convert a nested list (or scalar) to a PyTorch tensor.

    Args:
        shape: Requested tensor shape. If the tensor torch builds from ``data``
            has a different shape, an explicit reshape to ``shape`` is attempted
            (element count must match) before the conversion fails.
        dtype_str: Case-insensitive dtype name (e.g. 'float32', 'int64', 'bool').
        data: Nested list for multi-dim tensors, or a single number for scalars.

    Returns:
        The constructed tensor with the requested dtype and (possibly reshaped) shape.

    Raises:
        ValueError: For an unsupported dtype string, malformed data, or a
            shape mismatch that cannot be fixed by reshape.
    """
    try:
        # Accepted dtype aliases -> torch dtypes. Extended beyond the original
        # float/int/bool set to also accept common smaller numeric types
        # (float16/half, uint8, int8, int16/short); existing aliases unchanged.
        dtype_map = {
            'float16': torch.float16, 'half': torch.half,
            'float32': torch.float32, 'float': torch.float,
            'float64': torch.float64, 'double': torch.double,
            'uint8': torch.uint8,
            'int8': torch.int8,
            'int16': torch.int16, 'short': torch.short,
            'int32': torch.int32, 'int': torch.int,
            'int64': torch.int64, 'long': torch.long,
            'bool': torch.bool,
        }
        torch_dtype = dtype_map.get(dtype_str.lower())
        if torch_dtype is None:
            raise ValueError(f"Unsupported dtype string: '{dtype_str}'. Supported: {list(dtype_map.keys())}")

        tensor = torch.tensor(data, dtype=torch_dtype)

        # torch infers shape from the nesting of `data`; reconcile with the
        # caller's requested shape via reshape when they disagree.
        if list(tensor.shape) != shape:
            logger.warning(f"Created tensor shape {list(tensor.shape)} differs from requested {shape}. Attempting reshape.")
            try:
                tensor = tensor.reshape(shape)
                logger.info(f"Reshape successful to {shape}.")
            except RuntimeError as reshape_err:
                logger.error(f"Reshape failed: {reshape_err}")
                raise ValueError(f"Created tensor shape {list(tensor.shape)} != requested {shape} and reshape failed: {reshape_err}") from reshape_err
        return tensor
    except (TypeError, ValueError) as e:
        # Normalize all conversion failures to ValueError for the API layer.
        logger.error(f"Error converting list to tensor: {e}. Shape: {shape}, Dtype: {dtype_str}, Data type: {type(data).__name__}")
        raise ValueError(f"Failed tensor conversion: {e}") from e
    except Exception as e:
        logger.exception(f"Unexpected error during list_to_tensor: {e}", exc_info=True)
        raise ValueError(f"Unexpected tensor conversion error: {e}") from e
|
|
def tensor_to_list(tensor: torch.Tensor) -> Tuple[List[int], str, Union[List[Any], int, float]]:
    """Decompose a tensor into (shape, dtype-name, python-native data).

    Zero-dimensional tensors come back as a bare Python number; everything
    else as a nested list. The dtype string drops the 'torch.' prefix so it
    round-trips with list_to_tensor.
    """
    if not isinstance(tensor, torch.Tensor):
        raise TypeError(f"Input must be a PyTorch Tensor, got {type(tensor).__name__}")

    dims = list(tensor.shape)
    dtype_name = str(tensor.dtype).replace('torch.', '')
    payload = tensor.item() if tensor.ndim == 0 else tensor.tolist()
    return dims, dtype_name, payload
|
|
| |
class DatasetCreateRequest(BaseModel):
    """Request body for POST /datasets/create."""
    name: str = Field(..., description="Unique name for the new dataset.", example="my_image_dataset")
|
|
class TensorInput(BaseModel):
    """Payload describing one tensor to ingest: shape, dtype, data, optional metadata."""
    shape: List[int] = Field(..., description="Shape of the tensor (e.g., [height, width, channels]).", example=[2, 3])
    dtype: str = Field(..., description="Data type (e.g., 'float32', 'int64', 'bool').", example="float32")
    data: Union[List[Any], int, float] = Field(..., description="Tensor data as a nested list for multi-dim tensors, or a single number for scalars.", example=[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
    metadata: Optional[Dict[str, Any]] = Field(None, description="Optional key-value metadata.", example={"source": "api_ingest", "timestamp": 1678886400})
|
|
class NQLQueryRequest(BaseModel):
    """Request body for POST /query: a single natural-language query string."""
    query: str = Field(..., description="Natural language query string.", example="find image tensors from 'my_image_dataset' where metadata.source = 'web_scrape'")
|
|
class TensorOutput(BaseModel):
    """A single stored tensor record as returned by fetch/query endpoints."""
    record_id: str = Field(..., description="Unique record ID assigned during ingestion.")
    shape: List[int] = Field(..., description="Shape of the retrieved tensor.")
    dtype: str = Field(..., description="Data type of the retrieved tensor.")
    data: Union[List[Any], int, float] = Field(..., description="Tensor data (nested list or scalar).")
    metadata: Dict[str, Any] = Field(..., description="Associated metadata.")
|
|
class NQLResponse(BaseModel):
    """Response envelope for POST /query."""
    success: bool = Field(..., description="Indicates if the query was successfully processed (syntax, execution).")
    message: str = Field(..., description="Status message (e.g., 'Query successful', 'Error parsing query').")
    count: Optional[int] = Field(None, description="Number of matching records found.")
    results: Optional[List[TensorOutput]] = Field(None, description="List of matching tensor records.")
|
|
class ApiResponse(BaseModel):
    """Generic success/message/data envelope used by most endpoints."""
    success: bool = Field(..., description="Indicates if the API operation was successful.")
    message: str = Field(..., description="A descriptive status message.")
    data: Optional[Any] = Field(None, description="Optional data payload relevant to the operation (e.g., record_id, list of names).")
|
|
| |
class AgentInfo(BaseModel):
    """Basic descriptor of a registered agent (see `agent_registry`)."""
    id: str = Field(..., description="Unique identifier for the agent (e.g., 'ingestion', 'rl_trainer').")
    name: str = Field(..., description="User-friendly display name of the agent.")
    description: str = Field(..., description="Brief description of the agent's purpose.")
    status: str = Field(..., description="Current operational status (e.g., running, stopped, error, starting, stopping).")
    config: Dict[str, Any] = Field(..., description="Current configuration parameters the agent is using.")
|
|
class AgentStatus(AgentInfo):
    """AgentInfo extended with the timestamp of the agent's last log activity."""
    last_log_timestamp: Optional[float] = Field(None, description="Unix timestamp of the last known log message received or generated for this agent.")
|
|
class AgentLogResponse(BaseModel):
    """Response body for GET /agents/{agent_id}/logs."""
    logs: List[str] = Field(..., description="List of recent log entries for the agent.")
|
|
| |
class DashboardMetrics(BaseModel):
    """Aggregated dashboard metrics; several fields are simulated (see field docs)."""
    timestamp: float = Field(..., description="Unix timestamp when the metrics were generated (UTC).")
    dataset_count: int = Field(..., description="Total number of datasets currently managed.")
    total_records_est: int = Field(..., description="Estimated total number of tensor records across all datasets (Simulated).")
    agent_status_summary: Dict[str, int] = Field(..., description="Summary count of agents grouped by their status.")
    data_ingestion_rate: float = Field(..., description="Simulated data ingestion rate (records/sec).")
    avg_query_latency_ms: float = Field(..., description="Simulated average NQL query processing latency (ms).")
    rl_latest_reward: Optional[float] = Field(None, description="Simulated latest reward value obtained by the RL trainer.")
    rl_total_steps: int = Field(..., description="Simulated total training steps taken by the RL trainer.")
    automl_best_score: Optional[float] = Field(None, description="Simulated best score found by the AutoML search so far.")
    automl_trials_completed: int = Field(..., description="Simulated number of AutoML trials completed.")
    system_cpu_usage_percent: float = Field(..., description="Simulated overall system CPU usage percentage.")
    system_memory_usage_percent: float = Field(..., description="Simulated overall system memory usage percentage.")
|
|
| |
# FastAPI application instance; all route decorators below register on this.
app = FastAPI(
    title="Tensorus API",
    description="API for interacting with the Tensorus Agentic Tensor Database/Data Lake. Includes dataset management, NQL querying, and agent control placeholders.",
    version="0.0.1",
)
|
|
| |
async def get_tensor_storage() -> TensorStorage:
    """FastAPI dependency: returns the process-wide TensorStorage singleton."""
    return tensor_storage_instance
|
|
async def get_nql_agent() -> NQLAgent:
    """FastAPI dependency: returns the process-wide NQLAgent singleton."""
    return nql_agent_instance
|
|
|
|
| |
|
|
| |
@app.post("/datasets/create", response_model=ApiResponse, status_code=status.HTTP_201_CREATED, tags=["Datasets"])
async def create_dataset(req: DatasetCreateRequest, storage: TensorStorage = Depends(get_tensor_storage)):
    """
    Creates a new, empty dataset with the specified unique name.

    - **req**: Request body containing the dataset name.
    - **storage**: Injected TensorStorage instance.
    \f
    Raises HTTPException:
    - 409 Conflict: If a dataset with the same name already exists.
    - 500 Internal Server Error: For unexpected errors during creation.
    """
    try:
        storage.create_dataset(req.name)
    except ValueError as dup_err:
        # TensorStorage signals a duplicate name with ValueError -> 409.
        logger.warning(f"Attempted to create existing dataset '{req.name}': {dup_err}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(dup_err))
    except Exception as unexpected_err:
        logger.exception(f"Unexpected error creating dataset '{req.name}': {unexpected_err}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error while creating dataset.")

    logger.info(f"Dataset '{req.name}' created successfully.")
    return ApiResponse(success=True, message=f"Dataset '{req.name}' created successfully.")
|
|
@app.post("/datasets/{name}/ingest", response_model=ApiResponse, status_code=status.HTTP_201_CREATED, tags=["Data Ingestion"])
async def ingest_tensor(
    name: str = Path(..., description="The name of the target dataset for ingestion."),
    tensor_input: TensorInput = Body(..., description="The tensor data and metadata to ingest."),
    storage: TensorStorage = Depends(get_tensor_storage)
):
    """
    Ingests a single tensor (provided in JSON format) into the specified dataset.

    - **name**: Path parameter for the target dataset name.
    - **tensor_input**: Request body containing shape, dtype, data, and optional metadata.
    - **storage**: Injected TensorStorage instance.
    \f
    Raises HTTPException:
    - 400 Bad Request: If tensor data is invalid, shape/dtype mismatch, or other validation errors occur.
    - 404 Not Found: If the specified dataset name does not exist.
    - 500 Internal Server Error: For unexpected storage or processing errors.
    """
    try:
        # Convert the JSON payload into a torch.Tensor; raises ValueError on
        # unsupported dtype, malformed nesting, or an impossible reshape.
        tensor = list_to_tensor(tensor_input.shape, tensor_input.dtype, tensor_input.data)

        # Persist the tensor; storage assigns and returns the record id.
        record_id = storage.insert(name, tensor, tensor_input.metadata)

        logger.info(f"Ingested tensor into dataset '{name}' with record_id: {record_id}")
        return ApiResponse(success=True, message="Tensor ingested successfully.", data={"record_id": record_id})

    except ValueError as e:
        logger.error(f"ValueError during ingestion into '{name}': {e}")
        # Distinguish "dataset missing" (404) from "bad payload" (400) by
        # inspecting the error text. NOTE(review): this string matching depends
        # on TensorStorage's exact wording — confirm against its implementation.
        if "Dataset not found" in str(e) or "does not exist" in str(e):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Dataset '{name}' not found.")
        else:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid tensor data or parameters: {e}")
    except TypeError as e:
        logger.error(f"TypeError during ingestion into '{name}': {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid data type provided: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error ingesting into dataset '{name}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error during ingestion.")
|
|
@app.get("/datasets/{name}/fetch", response_model=ApiResponse, tags=["Data Retrieval"])
async def fetch_dataset(
    name: str = Path(..., description="The name of the dataset to fetch records from."),
    storage: TensorStorage = Depends(get_tensor_storage)
):
    """
    Retrieves all records (including tensor data and metadata) from a specified dataset.

    - **name**: Path parameter for the dataset name.
    - **storage**: Injected TensorStorage instance.
    \f
    Returns:
    - ApiResponse containing a list of TensorOutput objects in the 'data' field.

    Raises HTTPException:
    - 404 Not Found: If the dataset name does not exist.
    - 500 Internal Server Error: For unexpected errors during retrieval or data conversion.
    """
    try:
        records = storage.get_dataset_with_metadata(name)
        output_records = []
        processed_count = 0
        skipped_count = 0

        for i, record in enumerate(records):
            # Each record must be a dict carrying both the tensor and its metadata.
            if not isinstance(record, dict) or 'tensor' not in record or 'metadata' not in record:
                logger.warning(f"Skipping record index {i} in '{name}' due to missing keys or invalid format.")
                skipped_count += 1
                continue
            try:
                shape, dtype, data_list = tensor_to_list(record['tensor'])
                # Synthesize a fallback id only when metadata truly lacks one.
                # Checking for None directly (instead of str.startswith on the
                # fallback prefix) avoids an AttributeError on non-string ids
                # and avoids falsely flagging legitimate ids that happen to
                # begin with "missing_id_".
                record_id = record['metadata'].get('record_id')
                if record_id is None:
                    record_id = f"missing_id_{random.randint(1000,9999)}_{i}"
                    logger.warning(f"Record index {i} in '{name}' missing 'record_id' in metadata.")

                output_records.append(TensorOutput(
                    record_id=record_id,
                    shape=shape,
                    dtype=dtype,
                    data=data_list,
                    metadata=record['metadata']
                ))
                processed_count += 1
            except Exception as conversion_err:
                # Conversion failures skip the record rather than failing the request.
                rec_id_for_log = record.get('metadata', {}).get('record_id', f'index_{i}')
                logger.error(f"Error converting tensor to list for record '{rec_id_for_log}' in dataset '{name}': {conversion_err}", exc_info=True)
                skipped_count += 1

        log_message = f"Fetched dataset '{name}'. Processed: {processed_count}, Skipped: {skipped_count}."
        logger.info(log_message)
        return ApiResponse(success=True, message=log_message, data=output_records)

    except ValueError as e:
        # TensorStorage raises ValueError for an unknown dataset name -> 404.
        logger.warning(f"Attempted to fetch non-existent dataset '{name}': {e}")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except Exception as e:
        logger.exception(f"Unexpected error fetching dataset '{name}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error while fetching dataset.")
|
|
@app.get("/datasets", response_model=ApiResponse, tags=["Datasets"])
async def list_datasets(storage: TensorStorage = Depends(get_tensor_storage)):
    """
    Lists the names of all datasets known to the TensorStorage backend.

    - **storage**: Injected TensorStorage instance.
    \f
    Returns:
    - ApiResponse whose 'data' field holds the list of dataset names.

    Raises HTTPException:
    - 500 Internal Server Error: If the storage backend exposes no way to
      enumerate datasets, or any other unexpected failure occurs.
    """
    try:
        # Prefer an explicit list_datasets() API; fall back to inspecting the
        # backing 'datasets' dict when only that attribute is available.
        list_method = getattr(storage, 'list_datasets', None)
        if callable(list_method):
            dataset_names = list_method()
        elif isinstance(getattr(storage, 'datasets', None), dict):
            dataset_names = list(storage.datasets.keys())
        else:
            logger.error("TensorStorage instance does not have a recognized method (list_datasets) or attribute (datasets dict) to list datasets.")
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="API configuration error: Cannot list datasets.")

        logger.info(f"Retrieved dataset list: Count={len(dataset_names)}")
        return ApiResponse(success=True, message="Retrieved dataset list successfully.", data=dataset_names)
    except Exception as e:
        logger.exception(f"Unexpected error listing datasets: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error while listing datasets.")
|
|
| |
@app.post("/query", response_model=NQLResponse, tags=["Querying"])
async def execute_nql_query(
    request: NQLQueryRequest,
    nql_agent_svc: NQLAgent = Depends(get_nql_agent)
):
    """
    Executes a Natural Query Language (NQL) query against the stored tensor data.

    - **request**: Request body containing the NQL query string.
    - **nql_agent_svc**: Injected NQLAgent instance.
    \f
    Returns:
    - NQLResponse containing the query success status, message, count, and results.

    Raises HTTPException:
    - 400 Bad Request: If the NQL query is invalid or fails processing as reported by the NQLAgent.
    - 500 Internal Server Error: For unexpected errors during query processing or result conversion.
    """
    logger.info(f"Received NQL query: '{request.query}'")
    try:
        # Delegate parsing/execution to the NQL agent. The result is expected
        # to be a dict with at least 'success', plus optional 'message',
        # 'count', and 'results' keys (all read defensively via .get below).
        nql_result = nql_agent_svc.process_query(request.query)

        output_results = None
        processed_count = 0
        skipped_count = 0

        # Convert records only when the agent reports success AND actually
        # returned a list of results.
        if nql_result.get('success') and isinstance(nql_result.get('results'), list):
            output_results = []
            for i, record in enumerate(nql_result['results']):
                # Each record must carry both the tensor and its metadata.
                if not isinstance(record, dict) or 'tensor' not in record or 'metadata' not in record:
                    logger.warning(f"Skipping NQL result record index {i} due to missing keys or invalid format.")
                    skipped_count += 1
                    continue
                try:
                    shape, dtype, data_list = tensor_to_list(record['tensor'])
                    # Fall back to a synthetic id when metadata lacks 'record_id'.
                    record_id = record['metadata'].get('record_id', f"missing_id_{random.randint(1000,9999)}_{i}")
                    if record_id.startswith("missing_id_"):
                        logger.warning(f"NQL result record index {i} missing 'record_id' in metadata.")

                    output_results.append(TensorOutput(
                        record_id=record_id,
                        shape=shape,
                        dtype=dtype,
                        data=data_list,
                        metadata=record['metadata']
                    ))
                    processed_count += 1
                except Exception as conversion_err:
                    # Conversion failures skip the record rather than failing the query.
                    rec_id_for_log = record.get('metadata', {}).get('record_id', f'index_{i}')
                    logger.error(f"Error converting tensor to list for NQL result record '{rec_id_for_log}': {conversion_err}", exc_info=True)
                    skipped_count += 1
                    continue

        # Build the response; when the agent supplies no 'count', fall back to
        # the number of records successfully converted (or None if no results).
        response = NQLResponse(
            success=nql_result.get('success', False),
            message=nql_result.get('message', 'Error: Query processing failed unexpectedly.'),
            count=nql_result.get('count', processed_count if output_results is not None else None),
            results=output_results
        )

        if not response.success:
            logger.warning(f"NQL query failed: '{request.query}'. Reason: {response.message}")
            # Agent-reported failures surface as 400s carrying the agent's message.
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=response.message)

        log_message = f"NQL query successful: '{request.query}'. Found: {response.count}, Processed: {processed_count}, Skipped: {skipped_count}."
        logger.info(log_message)

        return response

    except HTTPException as e:
        # Re-raise as-is so the 400 above is not wrapped into a generic 500.
        raise e
    except Exception as e:
        logger.exception(f"Unexpected error processing NQL query '{request.query}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error during query processing.")
|
|
|
|
| |
|
|
@app.get("/agents", response_model=List[AgentInfo], tags=["Agents"])
async def list_agents():
    """
    Lists every registered agent with its name, description, status, and config.
    Backed by the in-memory `agent_registry`; malformed entries are skipped.
    \f
    Returns:
    - A list of AgentInfo objects.

    Raises HTTPException:
    - 500 Internal Server Error: If the registry itself cannot be read.
    """
    required_keys = ("name", "description", "status", "config")
    try:
        agents_list: List[AgentInfo] = []
        for agent_id, details in agent_registry.items():
            # Skip entries that are not dicts or lack any required key.
            if not (isinstance(details, dict) and all(k in details for k in required_keys)):
                logger.warning(f"Agent '{agent_id}' in registry is missing required keys or is not a dict. Details: {details}. Skipping.")
                continue
            try:
                agents_list.append(AgentInfo(
                    id=agent_id,
                    name=details["name"],
                    description=details["description"],
                    status=details["status"],
                    config=details["config"]
                ))
            except Exception as pydantic_err:
                # A single bad entry must not break the whole listing.
                logger.error(f"Error creating AgentInfo for agent '{agent_id}': {pydantic_err}. Details: {details}", exc_info=True)

        logger.info(f"Retrieved list of {len(agents_list)} agents.")
        return agents_list
    except Exception as e:
        logger.exception(f"Unexpected error listing agents: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error listing agents.")
|
|
|
|
@app.get("/agents/{agent_id}/status", response_model=AgentStatus, tags=["Agents"])
async def get_agent_status_api(agent_id: str = Path(..., description="The unique identifier of the agent.")):
    """
    Returns the current status, config, and last-log timestamp for one agent,
    read from the in-memory `agent_registry`.

    - **agent_id**: Path parameter for the agent's unique ID.
    \f
    Returns:
    - AgentStatus object containing the agent's details.

    Raises HTTPException:
    - 404 Not Found: If the agent_id does not exist in the registry.
    - 500 Internal Server Error: If the agent's entry in the registry is malformed.
    """
    logger.debug(f"Request received for status of agent '{agent_id}'.")
    details = agent_registry.get(agent_id)
    if details is None:
        logger.warning(f"Status requested for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    # Guard against corrupted registry entries before building the response.
    required_keys = ("name", "description", "status", "config")
    if not (isinstance(details, dict) and all(k in details for k in required_keys)):
        logger.error(f"Agent '{agent_id}' registry entry is malformed: {details}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal error: Malformed status data for agent '{agent_id}'.")

    try:
        status_response = AgentStatus(
            id=agent_id,
            name=details["name"],
            description=details["description"],
            status=details["status"],
            config=details["config"],
            last_log_timestamp=details.get("last_log_timestamp"),
        )
    except Exception as pydantic_err:
        logger.error(f"Error creating AgentStatus response for agent '{agent_id}': {pydantic_err}. Details: {details}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal error creating status response for agent '{agent_id}'.")

    logger.info(f"Returning status for agent '{agent_id}': {status_response.status}")
    return status_response
|
|
|
|
@app.post("/agents/{agent_id}/start", response_model=ApiResponse, status_code=status.HTTP_202_ACCEPTED, tags=["Agents"])
async def start_agent_api(agent_id: str = Path(..., description="The unique identifier of the agent to start.")):
    """
    Asks an agent to start (simulated placeholder): flips the registry status
    through 'starting' to 'running' and stamps the last-log timestamp.

    - **agent_id**: Path parameter for the agent's unique ID.
    \f
    Returns:
    - ApiResponse indicating success, or failure if already running/starting.

    Raises HTTPException:
    - 404 Not Found: If the agent_id does not exist.
    """
    logger.info(f"Received start signal for agent '{agent_id}'.")
    if agent_id not in agent_registry:
        logger.warning(f"Start signal received for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    entry = agent_registry[agent_id]
    current_status = entry.get("status", "unknown")

    # Already in (or entering) the running state: report a no-op.
    if current_status in ("running", "starting"):
        logger.info(f"Agent '{agent_id}' is already {current_status}. No action taken.")
        return ApiResponse(success=False, message=f"Agent '{agent_id}' is already {current_status}.")
    if current_status == "error":
        # Starting from an error state implicitly clears it.
        logger.warning(f"Attempting to start agent '{agent_id}' which is in 'error' state. Resetting status.")

    logger.info(f"API: Processing start signal for agent '{agent_id}' (Placeholder Action).")
    entry["status"] = "starting"
    # Simulated startup latency before the agent is considered running.
    await asyncio.sleep(0.1)
    entry["status"] = "running"
    entry["last_log_timestamp"] = time.time()
    logger.info(f"Agent '{agent_id}' status set to 'running' (simulated).")
    return ApiResponse(success=True, message=f"Start signal sent to agent '{agent_id}'. Status is now 'running' (simulated).")
|
|
@app.post("/agents/{agent_id}/stop", response_model=ApiResponse, status_code=status.HTTP_202_ACCEPTED, tags=["Agents"])
async def stop_agent_api(agent_id: str = Path(..., description="The unique identifier of the agent to stop.")):
    """
    Asks an agent to shut down gracefully (simulated placeholder): flips the
    registry status through 'stopping' to 'stopped'.

    - **agent_id**: Path parameter for the agent's unique ID.
    \f
    Returns:
    - ApiResponse indicating success, or failure if already stopped/stopping.

    Raises HTTPException:
    - 404 Not Found: If the agent_id does not exist.
    """
    logger.info(f"Received stop signal for agent '{agent_id}'.")
    if agent_id not in agent_registry:
        logger.warning(f"Stop signal received for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    entry = agent_registry[agent_id]
    current_status = entry.get("status", "unknown")

    # Already stopped or on the way down: report a no-op.
    if current_status in ("stopped", "stopping"):
        logger.info(f"Agent '{agent_id}' is already {current_status}. No action taken.")
        return ApiResponse(success=False, message=f"Agent '{agent_id}' is already {current_status}.")

    logger.info(f"API: Processing stop signal for agent '{agent_id}' (Placeholder Action).")
    entry["status"] = "stopping"
    # Simulated shutdown latency before the terminal state is recorded.
    await asyncio.sleep(0.1)
    entry["status"] = "stopped"
    entry["last_log_timestamp"] = time.time()
    logger.info(f"Agent '{agent_id}' status set to 'stopped' (simulated).")
    return ApiResponse(success=True, message=f"Stop signal sent to agent '{agent_id}'. Status is now 'stopped' (simulated).")
|
|
|
|
@app.get("/agents/{agent_id}/logs", response_model=AgentLogResponse, tags=["Agents"])
async def get_agent_logs_api(
    agent_id: str = Path(..., description="The unique identifier of the agent."),
    lines: int = Query(20, ge=1, le=1000, description="Maximum number of recent log lines to retrieve.")
):
    """
    Returns recent log lines for an agent. The lines are SIMULATED — each call
    fabricates fresh entries via `_simulate_agent_log` rather than reading real logs.

    - **agent_id**: Path parameter for the agent's unique ID.
    - **lines**: Number of log lines to return (default 20, min 1, max 1000).
    \f
    Returns:
    - AgentLogResponse containing the generated log strings.

    Raises HTTPException:
    - 404 Not Found: If the agent_id does not exist.
    - 500 Internal Server Error: If log generation fails.
    """
    logger.debug(f"Request received for logs of agent '{agent_id}' (lines={lines}).")
    if agent_id not in agent_registry:
        logger.warning(f"Log request for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    try:
        generated = []
        for _ in range(lines):
            generated.append(_simulate_agent_log(agent_id))
        # Record that this agent produced log activity just now.
        agent_registry[agent_id]["last_log_timestamp"] = time.time()
        logger.info(f"Generated {len(generated)} simulated log lines for agent '{agent_id}'.")
        return AgentLogResponse(logs=generated)
    except Exception as e:
        logger.exception(f"Error generating simulated logs for agent '{agent_id}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error generating logs for agent '{agent_id}'.")
|
|
|
|
| |
def _count_datasets(storage: "TensorStorage") -> int:
    """Best-effort dataset count from a TensorStorage instance.

    Returns -1 when the count cannot be determined (missing API or error),
    so callers can distinguish "unknown" from "zero datasets".
    """
    try:
        if hasattr(storage, 'list_datasets') and callable(storage.list_datasets):
            return len(storage.list_datasets())
        if hasattr(storage, 'datasets') and isinstance(storage.datasets, dict):
            return len(storage.datasets)
        logger.error("TensorStorage instance lacks list_datasets() method or datasets dict.")
        return -1
    except Exception as e:
        logger.exception(f"Failed to get dataset count for metrics: {e}")
        return -1


def _summarize_agent_statuses() -> Dict[str, int]:
    """Tally agents in the global registry by status.

    Unexpected status strings are logged and counted under 'unknown'.

    NOTE: the loop variable is deliberately NOT named `status` — doing so
    would shadow the imported `fastapi.status` module inside any enclosing
    function and break `status.HTTP_*` references (this was a real bug).
    """
    counts = {"running": 0, "stopped": 0, "error": 0, "starting": 0, "stopping": 0, "unknown": 0}
    for agent_id, details in agent_registry.items():
        agent_status = details.get("status", "unknown")
        if agent_status not in counts:
            logger.warning(f"Agent '{agent_id}' has unexpected status '{agent_status}'. Counting as 'unknown'.")
            agent_status = "unknown"
        counts[agent_status] += 1
    return counts


def _advance_rl_simulation(rl_running: bool) -> Tuple[int, Optional[float]]:
    """Advance the simulated RL trainer state and return (total_steps, latest_reward).

    Mutates `agent_registry["rl_trainer"]["sim_steps"]`. Reward is None when
    the trainer is not running.
    """
    rl_agent_state = agent_registry.setdefault("rl_trainer", {"sim_steps": 0})
    rl_total_steps = int(max(0, rl_agent_state.get("sim_steps", 0) + (random.randint(10, 150) if rl_running else 0)))
    rl_agent_state["sim_steps"] = rl_total_steps
    latest_reward = random.gauss(10, 5.0) if rl_running else None
    return rl_total_steps, latest_reward


def _advance_automl_simulation(automl_running: bool) -> Tuple[int, Optional[float]]:
    """Advance the simulated AutoML state and return (trials_completed, best_score).

    Mutates `agent_registry["automl_search"]`. The best score only improves
    (monotonically, capped at 1.0) while the agent is running; when stopped,
    the last known best score is reported unchanged.
    """
    automl_agent_state = agent_registry.setdefault("automl_search", {"sim_trials": 0, "sim_best_score": None})
    trials_completed = int(max(0, automl_agent_state.get("sim_trials", 0) + (random.randint(0, 3) if automl_running else 0)))
    automl_agent_state["sim_trials"] = trials_completed

    current_best = automl_agent_state.get("sim_best_score", None)
    best_score: Optional[float] = None
    if automl_running:
        if current_best is None:
            best_score = random.uniform(0.7, 0.95)
        else:
            # Small multiplicative improvement per poll, capped at a perfect score.
            improvement_factor = random.uniform(1.0, 1.005)
            best_score = min(1.0, current_best * improvement_factor)
        automl_agent_state["sim_best_score"] = best_score
    elif current_best is not None:
        best_score = current_best
    return trials_completed, best_score


@app.get("/metrics/dashboard", response_model=DashboardMetrics, tags=["Metrics & Monitoring"])
async def get_dashboard_metrics(storage: TensorStorage = Depends(get_tensor_storage)):
    """
    Provides aggregated dashboard metrics, combining real data (like dataset count)
    with simulated data for agent performance and system health.

    NOTE: This endpoint currently modifies simulation state (e.g., sim_steps)
    within a GET request, which is not ideal REST practice. For production,
    simulation updates should occur in a background task.

    - **storage**: Injected TensorStorage instance.
    \f
    Returns:
        - DashboardMetrics object containing various metrics.

    Raises HTTPException:
        - 500 Internal Server Error: If critical metrics cannot be retrieved or calculated.
    """
    logger.debug("Request received for dashboard metrics.")
    current_time = time.time()
    metrics_data: Dict[str, Any] = {}

    # --- Real metric: dataset count (-1 signals "could not determine") ---
    dataset_count = _count_datasets(storage)
    metrics_data["dataset_count"] = dataset_count
    logger.debug(f"Retrieved dataset count: {dataset_count}")

    # --- Agent status summary (from the in-memory registry) ---
    metrics_data["agent_status_summary"] = _summarize_agent_statuses()

    # --- Simulated aggregate record count, scaled by dataset count ---
    ds_count = metrics_data.get("dataset_count", -1)
    metrics_data["total_records_est"] = ds_count * random.randint(500, 5000) if ds_count >= 0 else 0

    # Running flags drive the magnitude of the simulated metrics below.
    ingestion_running = agent_registry.get("ingestion", {}).get("status") == "running"
    rl_running = agent_registry.get("rl_trainer", {}).get("status") == "running"
    automl_running = agent_registry.get("automl_search", {}).get("status") == "running"

    # Ingestion rate drops to 10% when the agent is stopped; latency oscillates
    # slowly over time (one-minute sine period) to look organic.
    metrics_data["data_ingestion_rate"] = random.uniform(5.0, 50.0) * (1.0 if ingestion_running else 0.1)
    metrics_data["avg_query_latency_ms"] = random.uniform(50.0, 300.0) * (1 + 0.5 * math.sin(current_time / 60))

    # --- Simulated RL and AutoML progress (mutates registry state) ---
    rl_total_steps, rl_latest_reward = _advance_rl_simulation(rl_running)
    metrics_data["rl_total_steps"] = rl_total_steps
    metrics_data["rl_latest_reward"] = rl_latest_reward

    automl_trials_completed, automl_best_score = _advance_automl_simulation(automl_running)
    metrics_data["automl_trials_completed"] = automl_trials_completed
    metrics_data["automl_best_score"] = automl_best_score

    # --- Simulated system load: baseline plus a fixed cost per running agent ---
    cpu_load = random.uniform(5.0, 25.0) \
               + (15 if ingestion_running else 0) \
               + (25 if rl_running else 0) \
               + (10 if automl_running else 0)
    metrics_data["system_cpu_usage_percent"] = min(100.0, max(0.0, cpu_load + random.uniform(-2.0, 2.0)))

    mem_load = random.uniform(15.0, 40.0) \
               + (metrics_data.get("dataset_count", 0) * 0.75)
    metrics_data["system_memory_usage_percent"] = min(100.0, max(0.0, mem_load + random.uniform(-3.0, 3.0)))

    # --- Assemble and validate the response model ---
    try:
        metrics = DashboardMetrics(
            timestamp=current_time,
            dataset_count=metrics_data["dataset_count"],
            total_records_est=metrics_data["total_records_est"],
            agent_status_summary=metrics_data["agent_status_summary"],
            data_ingestion_rate=round(metrics_data["data_ingestion_rate"], 2),
            avg_query_latency_ms=round(metrics_data["avg_query_latency_ms"], 1),
            rl_latest_reward=(round(metrics_data["rl_latest_reward"], 3)
                              if metrics_data.get("rl_latest_reward") is not None else None),
            rl_total_steps=metrics_data["rl_total_steps"],
            automl_best_score=(round(metrics_data["automl_best_score"], 5)
                               if metrics_data.get("automl_best_score") is not None else None),
            automl_trials_completed=metrics_data["automl_trials_completed"],
            system_cpu_usage_percent=round(metrics_data["system_cpu_usage_percent"], 1),
            system_memory_usage_percent=round(metrics_data["system_memory_usage_percent"], 1)
        )
        logger.info("Successfully generated dashboard metrics.")
        return metrics
    except Exception as e:
        # BUGFIX: the original loop bound a local string named `status`,
        # shadowing the fastapi `status` module, so the line below raised
        # AttributeError instead of returning a clean 500. Fixed by renaming
        # the loop variable in _summarize_agent_statuses().
        logger.exception(f"Error constructing DashboardMetrics response from data: {metrics_data}. Error: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error constructing metrics response.")
|
|
|
|
| |
@app.get("/", include_in_schema=False)
async def read_root():
    """Root endpoint: returns a short welcome message pointing at the docs.

    Excluded from the OpenAPI schema via include_in_schema=False.
    """
    welcome_text = "Welcome to the Tensorus API! Visit /docs or /redoc for interactive documentation."
    return {"message": welcome_text}
|
|
| |
if __name__ == "__main__":
    # Define host/port ONCE so the printed URLs and the uvicorn.run() call
    # cannot drift out of sync (previously hard-coded in four places).
    API_HOST = "0.0.0.0"
    API_PORT = 8000

    # Sanity-check that the required local modules are importable before
    # handing control to uvicorn.
    # NOTE(review): the module-level import at the top of this file already
    # raises on ImportError, so this guard is redundant when the file is run
    # directly — kept as a defensive check for the reload subprocess path.
    modules_ok = True
    try:
        from tensor_storage import TensorStorage
        from nql_agent import NQLAgent
    except ImportError as import_err:
        print(f"\nERROR: Missing required local modules: {import_err}.")
        print("Please ensure tensor_storage.py and nql_agent.py are in the same directory or Python path.\n")
        modules_ok = False

    if modules_ok:
        print(f"--- Starting Tensorus API Server (v{app.version} with Agent Placeholders) ---")
        print(f"--- Logging level set to: {logging.getLevelName(logger.getEffectiveLevel())} ---")
        print(f"--- Access API documentation at http://{API_HOST}:{API_PORT}/docs ---")
        print(f"--- Alternative documentation at http://{API_HOST}:{API_PORT}/redoc ---")
        print("--- Press CTRL+C to stop ---")

        # reload=True requires the app as an import string ("module:attr"),
        # not the app object itself.
        # NOTE(review): "api:app" assumes this file is named api.py — confirm,
        # since the actual filename is not visible here.
        uvicorn.run(
            "api:app",
            host=API_HOST,
            port=API_PORT,
            reload=True,
            log_level=logging.getLevelName(logger.getEffectiveLevel()).lower(),
        )
    else:
        print("--- API Server NOT started due to missing modules. ---")
|
|