# api.py
import logging
from typing import List, Dict, Any, Optional, Tuple, Union
import random # For simulating logs/status
import time # For simulating timestamps
import math # For simulating metrics
import asyncio # Added for potential background tasks later
import torch
import uvicorn
# Added Query import
from fastapi import FastAPI, HTTPException, Body, Depends, Path, status, Query
from pydantic import BaseModel, Field
# Import Tensorus modules - Ensure these files exist in your project path
try:
    from tensor_storage import TensorStorage
    from nql_agent import NQLAgent
    # Import other agents if needed for direct control (less common for API layer)
    # from ingestion_agent import DataIngestionAgent
    # from rl_agent import RLAgent
    # from automl_agent import AutoMLAgent
except ImportError as e:
    # These two modules are hard requirements for the API; fail loudly at import time.
    print(f"ERROR: Could not import Tensorus modules (TensorStorage, NQLAgent): {e}")
    print("Please ensure tensor_storage.py and nql_agent.py are in the Python path.")
    # Optionally raise the error or exit if these are critical at startup
    raise
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Global Tensorus Instances ---
# Module-level singletons shared by every request via the Depends() providers below.
try:
    # Ensure TensorStorage and NQLAgent can be instantiated without arguments
    # or provide necessary configuration here.
    tensor_storage_instance = TensorStorage()
    nql_agent_instance = NQLAgent(tensor_storage_instance)
    logger.info("Tensorus components (TensorStorage, NQLAgent) initialized successfully.")
    # NOTE: Actual agent processes (Ingestion, RL, AutoML) are assumed to be
    # running independently for now. This API layer will *coordinate* with them
    # in a full implementation, but currently only manages placeholder state.
except Exception as e:
    logger.exception(f"Failed to initialize Tensorus components: {e}")
    # This is critical, so raise an error to prevent the API from starting incorrectly.
    raise RuntimeError(f"Tensorus initialization failed: {e}") from e
# --- Placeholder Agent State Management ---
# In a real system, this would interact with a process manager, message queue,
# or shared state store (like Redis) to control and monitor actual agent processes.
# Keys are agent ids used in /agents/{agent_id}/... routes; each value holds the
# display name, description, status string, config dict, and simulation fields.
agent_registry = {
    "ingestion": {
        "name": "Data Ingestion",
        "description": "Monitors sources and ingests data into datasets.",
        "status": "stopped", # Possible statuses: running, stopped, error, starting, stopping
        "config": {"source_directory": "temp_ingestion_source", "polling_interval_sec": 10},
        "last_log_timestamp": None,
        # Add simulation state if needed by metrics endpoint
    },
    "rl_trainer": {
        "name": "RL Trainer",
        "description": "Trains reinforcement learning models using stored experiences.",
        "status": "stopped",
        "config": {"experience_dataset": "rl_experiences", "batch_size": 128, "target_update_freq": 500},
        "last_log_timestamp": None,
        "sim_steps": 0, # Added for metrics simulation
    },
    "automl_search": {
        "name": "AutoML Search",
        "description": "Performs hyperparameter optimization.",
        "status": "stopped",
        "config": {"trials": 50, "results_dataset": "automl_results", "task_type": "regression"},
        "last_log_timestamp": None,
        "sim_trials": 0, # Added for metrics simulation
        "sim_best_score": None, # Added for metrics simulation
    },
    # NQL Agent is stateless, typically part of API request/response, but could be listed
    "nql_query": {
        "name": "NQL Query Service",
        "description": "Processes natural language queries.",
        "status": "running", # Assumed always running as part of API
        "config": {"parser_type": "regex"}, # Example config
        "last_log_timestamp": None,
    },
}
def _simulate_agent_log(agent_id: str) -> str:
    """Build one fake log line on behalf of the agent identified by *agent_id*.

    Unknown ids produce an ERROR-level line (and a warning in our own log)
    instead of raising, so callers never need to pre-validate the id.
    """
    if agent_id not in agent_registry:
        logger.warning(f"Attempted to simulate log for unknown agent_id: {agent_id}")
        return f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} [ERROR] (Unknown Agent: {agent_id}) Log simulation failed."
    severity_pool = ["INFO", "DEBUG", "WARNING", "ERROR"]
    message_pool = [
        "Processing item batch", "Training epoch completed", "Search trial finished",
        "Connection error detected", "Optimization step", "Query received",
        "Target network synced", "Disk space low", "Operation successful", "Agent starting up",
        "Configuration loaded", "Model evaluated", "Experience stored", "Found new file"
    ]
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    severity = random.choice(severity_pool)
    body = random.choice(message_pool)
    # Double .get() keeps this safe even if the registry entry lacks a "name" key.
    display_name = agent_registry.get(agent_id, {}).get("name", agent_id)
    return f"{stamp} [{severity}] ({display_name}) {body}"
# --- Helper Functions (Tensor Conversion) ---
def _validate_tensor_data(data: List[Any], shape: List[int]):
"""Validates nested list structure against a given shape (recursive)."""
if not shape: # Scalar case
if not isinstance(data, (int, float)):
raise ValueError("Scalar tensor data must be a single number (int or float).")
return True
if not isinstance(data, list):
# Improved error message
raise ValueError(f"Data for shape {shape} must be a list, but got {type(data).__name__}.")
expected_len = shape[0]
if len(data) != expected_len:
raise ValueError(f"Dimension 0 mismatch for shape {shape}: Expected length {expected_len}, got {len(data)}.")
if len(shape) > 1: # Recurse for inner dimensions
for item in data:
_validate_tensor_data(item, shape[1:])
elif len(shape) == 1: # Innermost dimension, check element types
if not all(isinstance(x, (int, float)) for x in data):
# Find the first non-numeric type for a more specific error
first_bad_type = next((type(x).__name__ for x in data if not isinstance(x, (int, float))), "unknown")
raise ValueError(f"Innermost list elements must be numbers (int or float), found type '{first_bad_type}'.")
return True
def list_to_tensor(shape: List[int], dtype_str: str, data: Union[List[Any], int, float]) -> torch.Tensor:
    """Create a torch.Tensor of the requested shape/dtype from *data*.

    *data* may be a nested list or a bare scalar. Raises ValueError for an
    unsupported dtype string, for conversion failures, or when the created
    tensor cannot be reshaped to the requested shape.
    """
    try:
        supported_dtypes = {
            'float32': torch.float32, 'float': torch.float,
            'float64': torch.float64, 'double': torch.double,
            'int32': torch.int32, 'int': torch.int,
            'int64': torch.int64, 'long': torch.long,
            'bool': torch.bool
        }
        resolved = supported_dtypes.get(dtype_str.lower())
        if resolved is None:
            raise ValueError(f"Unsupported dtype string: '{dtype_str}'. Supported: {list(supported_dtypes.keys())}")
        # torch.tensor performs the structural and element-type validation for us.
        tensor = torch.tensor(data, dtype=resolved)
        # If the natural shape differs from the request, fall back to a reshape.
        if list(tensor.shape) != shape:
            logger.warning(f"Created tensor shape {list(tensor.shape)} differs from requested {shape}. Attempting reshape.")
            try:
                tensor = tensor.reshape(shape)
                logger.info(f"Reshape successful to {shape}.")
            except RuntimeError as reshape_err:
                logger.error(f"Reshape failed: {reshape_err}")
                raise ValueError(f"Created tensor shape {list(tensor.shape)} != requested {shape} and reshape failed: {reshape_err}") from reshape_err
        return tensor
    except (TypeError, ValueError) as e:
        # Known conversion failures are normalized to ValueError for callers.
        logger.error(f"Error converting list to tensor: {e}. Shape: {shape}, Dtype: {dtype_str}, Data type: {type(data).__name__}")
        raise ValueError(f"Failed tensor conversion: {e}") from e
    except Exception as e:
        # Anything else is unexpected; log the traceback and wrap it too.
        logger.exception(f"Unexpected error during list_to_tensor: {e}", exc_info=True)
        raise ValueError(f"Unexpected tensor conversion error: {e}") from e
def tensor_to_list(tensor: torch.Tensor) -> Tuple[List[int], str, Union[List[Any], int, float]]:
    """Decompose *tensor* into (shape, dtype-string, nested-list-or-scalar data).

    Raises TypeError if the argument is not a torch.Tensor.
    """
    if not isinstance(tensor, torch.Tensor):
        raise TypeError(f"Input must be a PyTorch Tensor, got {type(tensor).__name__}")
    dims = list(tensor.shape)
    # str(torch.float32) is "torch.float32"; strip the module prefix for the API.
    dtype_name = str(tensor.dtype).replace('torch.', '')
    # 0-dim tensors hold one scalar; .item() unwraps it for JSON serialization.
    payload = tensor.item() if tensor.ndim == 0 else tensor.tolist()
    return dims, dtype_name, payload
# --- Pydantic Models ---
class DatasetCreateRequest(BaseModel):
    """Request body for POST /datasets/create."""
    name: str = Field(..., description="Unique name for the new dataset.", example="my_image_dataset")

class TensorInput(BaseModel):
    """Tensor payload (shape/dtype/data plus optional metadata) for ingestion."""
    shape: List[int] = Field(..., description="Shape of the tensor (e.g., [height, width, channels]).", example=[2, 3])
    dtype: str = Field(..., description="Data type (e.g., 'float32', 'int64', 'bool').", example="float32")
    data: Union[List[Any], int, float] = Field(..., description="Tensor data as a nested list for multi-dim tensors, or a single number for scalars.", example=[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
    metadata: Optional[Dict[str, Any]] = Field(None, description="Optional key-value metadata.", example={"source": "api_ingest", "timestamp": 1678886400})

class NQLQueryRequest(BaseModel):
    """Request body for POST /query."""
    query: str = Field(..., description="Natural language query string.", example="find image tensors from 'my_image_dataset' where metadata.source = 'web_scrape'")

class TensorOutput(BaseModel):
    """One stored tensor record as returned by fetch and query endpoints."""
    record_id: str = Field(..., description="Unique record ID assigned during ingestion.")
    shape: List[int] = Field(..., description="Shape of the retrieved tensor.")
    dtype: str = Field(..., description="Data type of the retrieved tensor.")
    data: Union[List[Any], int, float] = Field(..., description="Tensor data (nested list or scalar).")
    metadata: Dict[str, Any] = Field(..., description="Associated metadata.")

class NQLResponse(BaseModel):
    """Response envelope for POST /query."""
    success: bool = Field(..., description="Indicates if the query was successfully processed (syntax, execution).")
    message: str = Field(..., description="Status message (e.g., 'Query successful', 'Error parsing query').")
    count: Optional[int] = Field(None, description="Number of matching records found.")
    results: Optional[List[TensorOutput]] = Field(None, description="List of matching tensor records.")

class ApiResponse(BaseModel):
    """Generic success/message/data envelope used by most endpoints."""
    success: bool = Field(..., description="Indicates if the API operation was successful.")
    message: str = Field(..., description="A descriptive status message.")
    data: Optional[Any] = Field(None, description="Optional data payload relevant to the operation (e.g., record_id, list of names).")
# --- NEW Pydantic Models for Agents ---
class AgentInfo(BaseModel):
    """Static information about one registered agent (GET /agents)."""
    id: str = Field(..., description="Unique identifier for the agent (e.g., 'ingestion', 'rl_trainer').")
    name: str = Field(..., description="User-friendly display name of the agent.")
    description: str = Field(..., description="Brief description of the agent's purpose.")
    status: str = Field(..., description="Current operational status (e.g., running, stopped, error, starting, stopping).")
    config: Dict[str, Any] = Field(..., description="Current configuration parameters the agent is using.")

class AgentStatus(AgentInfo):
    """AgentInfo plus runtime extras (GET /agents/{agent_id}/status)."""
    # Inherits fields from AgentInfo
    last_log_timestamp: Optional[float] = Field(None, description="Unix timestamp of the last known log message received or generated for this agent.")

class AgentLogResponse(BaseModel):
    """Recent log lines for one agent."""
    logs: List[str] = Field(..., description="List of recent log entries for the agent.")
# --- NEW Pydantic Model for Dashboard Metrics ---
class DashboardMetrics(BaseModel):
    """Aggregate metrics snapshot for the dashboard (values largely simulated)."""
    timestamp: float = Field(..., description="Unix timestamp when the metrics were generated (UTC).")
    dataset_count: int = Field(..., description="Total number of datasets currently managed.")
    total_records_est: int = Field(..., description="Estimated total number of tensor records across all datasets (Simulated).")
    agent_status_summary: Dict[str, int] = Field(..., description="Summary count of agents grouped by their status.")
    data_ingestion_rate: float = Field(..., description="Simulated data ingestion rate (records/sec).")
    avg_query_latency_ms: float = Field(..., description="Simulated average NQL query processing latency (ms).")
    rl_latest_reward: Optional[float] = Field(None, description="Simulated latest reward value obtained by the RL trainer.")
    rl_total_steps: int = Field(..., description="Simulated total training steps taken by the RL trainer.")
    automl_best_score: Optional[float] = Field(None, description="Simulated best score found by the AutoML search so far.")
    automl_trials_completed: int = Field(..., description="Simulated number of AutoML trials completed.")
    system_cpu_usage_percent: float = Field(..., description="Simulated overall system CPU usage percentage.")
    system_memory_usage_percent: float = Field(..., description="Simulated overall system memory usage percentage.")
# --- FastAPI App Instance ---
app = FastAPI(
    title="Tensorus API",
    description="API for interacting with the Tensorus Agentic Tensor Database/Data Lake. Includes dataset management, NQL querying, and agent control placeholders.",
    version="0.0.1", # Incremented version for fixes
    # Add contact, license info if desired
    # contact={
    #     "name": "API Support",
    #     "url": "http://example.com/support",
    #     "email": "support@example.com",
    # },
    # license_info={
    #     "name": "Apache 2.0",
    #     "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    # },
)
# --- Dependency Functions ---
async def get_tensor_storage() -> TensorStorage:
    """Dependency function to get the global TensorStorage instance."""
    # In a more complex app, this might involve connection pooling or session management
    return tensor_storage_instance

async def get_nql_agent() -> NQLAgent:
    """Dependency function to get the global NQLAgent instance."""
    return nql_agent_instance
# --- API Endpoints ---
# --- Dataset Management Endpoints ---
@app.post("/datasets/create", response_model=ApiResponse, status_code=status.HTTP_201_CREATED, tags=["Datasets"])
async def create_dataset(req: DatasetCreateRequest, storage: TensorStorage = Depends(get_tensor_storage)):
    """
    Creates a new, empty dataset with the specified unique name.
    - **req**: Request body containing the dataset name.
    - **storage**: Injected TensorStorage instance.
    \f
    Raises HTTPException:
    - 409 Conflict: If a dataset with the same name already exists.
    - 500 Internal Server Error: For unexpected errors during creation.
    """
    try:
        # Assuming TensorStorage.create_dataset raises ValueError if exists
        storage.create_dataset(req.name)
        logger.info(f"Dataset '{req.name}' created successfully.")
        return ApiResponse(success=True, message=f"Dataset '{req.name}' created successfully.")
    except ValueError as e:
        # Catch specific error for existing dataset
        logger.warning(f"Attempted to create existing dataset '{req.name}': {e}")
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
    except Exception as e:
        # Catch any other unexpected storage errors
        logger.exception(f"Unexpected error creating dataset '{req.name}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error while creating dataset.")
@app.post("/datasets/{name}/ingest", response_model=ApiResponse, status_code=status.HTTP_201_CREATED, tags=["Data Ingestion"])
async def ingest_tensor(
name: str = Path(..., description="The name of the target dataset for ingestion."),
tensor_input: TensorInput = Body(..., description="The tensor data and metadata to ingest."),
storage: TensorStorage = Depends(get_tensor_storage)
):
"""
Ingests a single tensor (provided in JSON format) into the specified dataset.
- **name**: Path parameter for the target dataset name.
- **tensor_input**: Request body containing shape, dtype, data, and optional metadata.
- **storage**: Injected TensorStorage instance.
\f
Raises HTTPException:
- 400 Bad Request: If tensor data is invalid, shape/dtype mismatch, or other validation errors occur.
- 404 Not Found: If the specified dataset name does not exist.
- 500 Internal Server Error: For unexpected storage or processing errors.
"""
try:
# Convert incoming list/scalar data to a tensor
tensor = list_to_tensor(tensor_input.shape, tensor_input.dtype, tensor_input.data)
# Insert into storage, assuming it returns a unique record ID
# Also assuming storage.insert raises ValueError if dataset 'name' not found
record_id = storage.insert(name, tensor, tensor_input.metadata)
logger.info(f"Ingested tensor into dataset '{name}' with record_id: {record_id}")
return ApiResponse(success=True, message="Tensor ingested successfully.", data={"record_id": record_id})
except ValueError as e: # Catch errors from list_to_tensor or storage.insert
logger.error(f"ValueError during ingestion into '{name}': {e}")
# Differentiate between bad data and dataset not found
# Suggestion: Modify TensorStorage to raise specific exceptions (e.g., DatasetNotFoundError)
# for more robust error handling here instead of string matching.
if "Dataset not found" in str(e) or "does not exist" in str(e): # Adapt based on TensorStorage's error messages
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Dataset '{name}' not found.")
else: # Assume other ValueErrors are due to bad input data/shape/dtype
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid tensor data or parameters: {e}")
except TypeError as e: # Catch potential type errors during tensor creation
logger.error(f"TypeError during ingestion into '{name}': {e}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid data type provided: {e}")
except Exception as e:
logger.exception(f"Unexpected error ingesting into dataset '{name}': {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error during ingestion.")
@app.get("/datasets/{name}/fetch", response_model=ApiResponse, tags=["Data Retrieval"])
async def fetch_dataset(
name: str = Path(..., description="The name of the dataset to fetch records from."),
storage: TensorStorage = Depends(get_tensor_storage)
):
"""
Retrieves all records (including tensor data and metadata) from a specified dataset.
- **name**: Path parameter for the dataset name.
- **storage**: Injected TensorStorage instance.
\f
Returns:
- ApiResponse containing a list of TensorOutput objects in the 'data' field.
Raises HTTPException:
- 404 Not Found: If the dataset name does not exist.
- 500 Internal Server Error: For unexpected errors during retrieval or data conversion.
"""
try:
# Assuming get_dataset_with_metadata returns list of dicts {'tensor': ..., 'metadata': ...}
# or raises ValueError if dataset not found
records = storage.get_dataset_with_metadata(name)
output_records = []
processed_count = 0
skipped_count = 0
for i, record in enumerate(records):
# Ensure 'tensor' and 'metadata' keys exist in each record from storage
if not isinstance(record, dict) or 'tensor' not in record or 'metadata' not in record: # Added type check
logger.warning(f"Skipping record index {i} in '{name}' due to missing keys or invalid format.")
skipped_count += 1
continue
try:
# Convert tensor back to list format for JSON response
shape, dtype, data_list = tensor_to_list(record['tensor'])
# Ensure record_id is present in metadata, provide a default if missing
record_id = record['metadata'].get('record_id', f"missing_id_{random.randint(1000,9999)}_{i}")
if record_id.startswith("missing_id_"):
logger.warning(f"Record index {i} in '{name}' missing 'record_id' in metadata.")
output_records.append(TensorOutput(
record_id=record_id,
shape=shape,
dtype=dtype,
data=data_list,
metadata=record['metadata']
))
processed_count += 1
except Exception as conversion_err:
rec_id_for_log = record.get('metadata', {}).get('record_id', f'index_{i}')
logger.error(f"Error converting tensor to list for record '{rec_id_for_log}' in dataset '{name}': {conversion_err}", exc_info=True) # Added exc_info
skipped_count += 1
# Optionally skip problematic records or handle differently
log_message = f"Fetched dataset '{name}'. Processed: {processed_count}, Skipped: {skipped_count}."
logger.info(log_message)
return ApiResponse(success=True, message=log_message, data=output_records)
except ValueError as e: # Typically "Dataset not found" from storage
logger.warning(f"Attempted to fetch non-existent dataset '{name}': {e}")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
except Exception as e:
logger.exception(f"Unexpected error fetching dataset '{name}': {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error while fetching dataset.")
@app.get("/datasets", response_model=ApiResponse, tags=["Datasets"])
async def list_datasets(storage: TensorStorage = Depends(get_tensor_storage)):
"""
Lists the names of all available datasets managed by the TensorStorage.
- **storage**: Injected TensorStorage instance.
\f
Returns:
- ApiResponse containing a list of dataset names in the 'data' field.
Raises HTTPException:
- 500 Internal Server Error: If there's an error retrieving the list from storage.
"""
try:
# Adapt this call based on your TensorStorage implementation
# Example: dataset_names = storage.list_datasets()
# Example: dataset_names = list(storage.datasets.keys()) # If it's a simple dict
if hasattr(storage, 'list_datasets') and callable(storage.list_datasets): # Check if callable
dataset_names = storage.list_datasets()
elif hasattr(storage, 'datasets') and isinstance(storage.datasets, dict):
dataset_names = list(storage.datasets.keys())
else:
# Improved error message
logger.error("TensorStorage instance does not have a recognized method (list_datasets) or attribute (datasets dict) to list datasets.")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="API configuration error: Cannot list datasets.")
logger.info(f"Retrieved dataset list: Count={len(dataset_names)}")
return ApiResponse(success=True, message="Retrieved dataset list successfully.", data=dataset_names)
except Exception as e:
logger.exception(f"Unexpected error listing datasets: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error while listing datasets.")
# --- Querying Endpoint ---
@app.post("/query", response_model=NQLResponse, tags=["Querying"])
async def execute_nql_query(
    request: NQLQueryRequest,
    nql_agent_svc: NQLAgent = Depends(get_nql_agent)
):
    """
    Executes a Natural Query Language (NQL) query against the stored tensor data.
    - **request**: Request body containing the NQL query string.
    - **nql_agent_svc**: Injected NQLAgent instance.
    \f
    Returns:
    - NQLResponse containing the query success status, message, count, and results.
    Raises HTTPException:
    - 400 Bad Request: If the NQL query is invalid or fails processing as reported by the NQLAgent.
    - 500 Internal Server Error: For unexpected errors during query processing or result conversion.
    """
    logger.info(f"Received NQL query: '{request.query}'")
    try:
        # process_query is expected to return a dict like:
        # {'success': bool, 'message': str, 'count': Optional[int], 'results': Optional[List[dict]]}
        # where each dict in 'results' has 'tensor' and 'metadata' keys.
        nql_result = nql_agent_svc.process_query(request.query)
        output_results = None
        processed_count = 0
        skipped_count = 0
        if nql_result.get('success') and isinstance(nql_result.get('results'), list):
            output_results = []
            for i, record in enumerate(nql_result['results']):
                # Basic validation of expected keys in results from NQLAgent.
                if not isinstance(record, dict) or 'tensor' not in record or 'metadata' not in record:
                    logger.warning(f"Skipping NQL result record index {i} due to missing keys or invalid format.")
                    skipped_count += 1
                    continue
                try:
                    # Convert tensor to list for response.
                    shape, dtype, data_list = tensor_to_list(record['tensor'])
                    # FIX: detect a missing record_id directly instead of
                    # generating a sentinel and prefix-matching it afterwards.
                    # The old record_id.startswith("missing_id_") check raised
                    # AttributeError on non-string ids and misfired on genuine
                    # ids starting with that prefix.
                    record_id = record['metadata'].get('record_id')
                    if record_id is None:
                        record_id = f"missing_id_{random.randint(1000,9999)}_{i}"
                        logger.warning(f"NQL result record index {i} missing 'record_id' in metadata.")
                    output_results.append(TensorOutput(
                        record_id=record_id,
                        shape=shape,
                        dtype=dtype,
                        data=data_list,
                        metadata=record['metadata']
                    ))
                    processed_count += 1
                except Exception as conversion_err:
                    rec_id_for_log = record.get('metadata', {}).get('record_id', f'index_{i}')
                    logger.error(f"Error converting tensor to list for NQL result record '{rec_id_for_log}': {conversion_err}", exc_info=True)
                    skipped_count += 1
                    continue # Skip problematic records
        # Construct response using Pydantic model for validation; read the agent's
        # dict defensively so missing keys degrade gracefully.
        response = NQLResponse(
            success=nql_result.get('success', False),
            message=nql_result.get('message', 'Error: Query processing failed unexpectedly.'),
            count=nql_result.get('count', processed_count if output_results is not None else None), # Use processed count if available
            results=output_results
        )
        if not response.success:
            logger.warning(f"NQL query failed: '{request.query}'. Reason: {response.message}")
            # Return 400 for query parsing/execution issues reported by the agent.
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=response.message)
        log_message = f"NQL query successful: '{request.query}'. Found: {response.count}, Processed: {processed_count}, Skipped: {skipped_count}."
        logger.info(log_message)
        return response
    except HTTPException as e:
        # Re-raise HTTPExceptions that were already handled (like the 400 above).
        raise e
    except Exception as e:
        # Catch unexpected errors during query processing or result conversion.
        logger.exception(f"Unexpected error processing NQL query '{request.query}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error during query processing.")
# --- Agent Control Endpoints ---
@app.get("/agents", response_model=List[AgentInfo], tags=["Agents"])
async def list_agents():
    """
    Lists all registered agents and their basic information (name, description, status, config).
    Reads data from the global `agent_registry`.
    \f
    Returns:
    - A list of AgentInfo objects.
    Raises HTTPException:
    - 500 Internal Server Error: If the agent registry is unexpectedly unavailable or malformed.
    """
    try:
        agents_list = []
        for agent_id, details in agent_registry.items():
            # Validate expected keys before creating AgentInfo to prevent Pydantic errors
            if not isinstance(details, dict) or not all(k in details for k in ["name", "description", "status", "config"]):
                # More detailed logging
                logger.warning(f"Agent '{agent_id}' in registry is missing required keys or is not a dict. Details: {details}. Skipping.")
                continue
            try:
                agents_list.append(AgentInfo(
                    id=agent_id,
                    name=details["name"],
                    description=details["description"],
                    status=details["status"],
                    config=details["config"]
                ))
            except Exception as pydantic_err: # Catch potential Pydantic validation errors
                logger.error(f"Error creating AgentInfo for agent '{agent_id}': {pydantic_err}. Details: {details}", exc_info=True) # Added exc_info
                continue # Skip malformed entries
        logger.info(f"Retrieved list of {len(agents_list)} agents.")
        return agents_list
    except Exception as e:
        # Catch errors iterating the registry itself
        logger.exception(f"Unexpected error listing agents: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error listing agents.")
@app.get("/agents/{agent_id}/status", response_model=AgentStatus, tags=["Agents"])
async def get_agent_status_api(agent_id: str = Path(..., description="The unique identifier of the agent.")):
    """
    Gets the current status, configuration, and last log timestamp for a specific agent.

    Reads data from the global `agent_registry`.
    - **agent_id**: Path parameter for the agent's unique ID.
    \f
    Returns:
        - AgentStatus object containing the agent's details.
    Raises HTTPException:
        - 404 Not Found: If the agent_id does not exist in the registry.
        - 500 Internal Server Error: If the agent's entry in the registry is malformed.
    """
    logger.debug(f"Request received for status of agent '{agent_id}'.")
    if agent_id not in agent_registry:
        logger.warning(f"Status requested for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    entry = agent_registry[agent_id]
    # Guard against malformed registry entries before handing them to Pydantic.
    required_keys = ("name", "description", "status", "config")
    if not isinstance(entry, dict) or any(key not in entry for key in required_keys):
        logger.error(f"Agent '{agent_id}' registry entry is malformed: {entry}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal error: Malformed status data for agent '{agent_id}'.")

    try:
        response = AgentStatus(
            id=agent_id,
            name=entry["name"],
            description=entry["description"],
            status=entry["status"],
            config=entry["config"],
            last_log_timestamp=entry.get("last_log_timestamp"),  # optional; absent -> None
        )
    except Exception as pydantic_err:  # Pydantic validation failure building the response
        logger.error(f"Error creating AgentStatus response for agent '{agent_id}': {pydantic_err}. Details: {entry}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal error creating status response for agent '{agent_id}'.")
    logger.info(f"Returning status for agent '{agent_id}': {response.status}")
    return response
@app.post("/agents/{agent_id}/start", response_model=ApiResponse, status_code=status.HTTP_202_ACCEPTED, tags=["Agents"])
async def start_agent_api(agent_id: str = Path(..., description="The unique identifier of the agent to start.")):
    """
    Signals an agent to start its operation (Placeholder/Simulated).

    Updates the agent's status in the global `agent_registry`.
    - **agent_id**: Path parameter for the agent's unique ID.
    \f
    Returns:
        - ApiResponse indicating success or failure (if already running/starting).
    Raises HTTPException:
        - 404 Not Found: If the agent_id does not exist.
    """
    logger.info(f"Received start signal for agent '{agent_id}'.")
    if agent_id not in agent_registry:
        logger.warning(f"Start signal received for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    entry = agent_registry[agent_id]
    state = entry.get("status", "unknown")

    # A start request for an already-active agent is reported as a no-op.
    if state in ("running", "starting"):
        logger.info(f"Agent '{agent_id}' is already {state}. No action taken.")
        return ApiResponse(success=False, message=f"Agent '{agent_id}' is already {state}.")

    if state == "error":
        # Starting from the 'error' state is allowed; flag it loudly for operators.
        logger.warning(f"Attempting to start agent '{agent_id}' which is in 'error' state. Resetting status.")

    logger.info(f"API: Processing start signal for agent '{agent_id}' (Placeholder Action).")
    # TODO: Implement actual agent process starting logic (e.g., message queue, process manager call, background task).
    entry["status"] = "starting"
    await asyncio.sleep(0.1)  # tiny delay to simulate the transition period
    entry["status"] = "running"  # immediate simulated transition for now
    entry["last_log_timestamp"] = time.time()  # record the time of this action
    logger.info(f"Agent '{agent_id}' status set to 'running' (simulated).")
    return ApiResponse(success=True, message=f"Start signal sent to agent '{agent_id}'. Status is now 'running' (simulated).")
@app.post("/agents/{agent_id}/stop", response_model=ApiResponse, status_code=status.HTTP_202_ACCEPTED, tags=["Agents"])
async def stop_agent_api(agent_id: str = Path(..., description="The unique identifier of the agent to stop.")):
    """
    Signals an agent to stop its operation gracefully (Placeholder/Simulated).

    Updates the agent's status in the global `agent_registry`.
    - **agent_id**: Path parameter for the agent's unique ID.
    \f
    Returns:
        - ApiResponse indicating success or failure (if already stopped/stopping).
    Raises HTTPException:
        - 404 Not Found: If the agent_id does not exist.
    """
    logger.info(f"Received stop signal for agent '{agent_id}'.")
    if agent_id not in agent_registry:
        logger.warning(f"Stop signal received for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    entry = agent_registry[agent_id]
    state = entry.get("status", "unknown")

    # Already stopped/stopping agents need no further action.
    # ('error' is treated as stoppable so the transition can still be forced.)
    if state in ("stopped", "stopping"):
        logger.info(f"Agent '{agent_id}' is already {state}. No action taken.")
        return ApiResponse(success=False, message=f"Agent '{agent_id}' is already {state}.")

    logger.info(f"API: Processing stop signal for agent '{agent_id}' (Placeholder Action).")
    # TODO: Implement actual agent process stopping logic (e.g., sending signal, waiting for confirmation).
    entry["status"] = "stopping"
    await asyncio.sleep(0.1)  # tiny delay to simulate the transition period
    entry["status"] = "stopped"  # immediate simulated transition for now
    entry["last_log_timestamp"] = time.time()  # record the time of this action
    logger.info(f"Agent '{agent_id}' status set to 'stopped' (simulated).")
    return ApiResponse(success=True, message=f"Stop signal sent to agent '{agent_id}'. Status is now 'stopped' (simulated).")
@app.get("/agents/{agent_id}/logs", response_model=AgentLogResponse, tags=["Agents"])
async def get_agent_logs_api(
    agent_id: str = Path(..., description="The unique identifier of the agent."),
    lines: int = Query(20, ge=1, le=1000, description="Maximum number of recent log lines to retrieve.")
):
    """
    Retrieves recent logs for a specific agent (Simulated - generates new logs each time).

    - **agent_id**: Path parameter for the agent's unique ID.
    - **lines**: Query parameter for the number of log lines (default 20, min 1, max 1000).
    \f
    Returns:
        - AgentLogResponse containing a list of simulated log strings.
    Raises HTTPException:
        - 404 Not Found: If the agent_id does not exist.
        - 500 Internal Server Error: If log generation fails.
    """
    logger.debug(f"Request received for logs of agent '{agent_id}' (lines={lines}).")
    if agent_id not in agent_registry:
        logger.warning(f"Log request for unknown agent '{agent_id}'.")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent '{agent_id}' not found.")

    # Bounds on `lines` are enforced declaratively by Query(ge=1, le=1000) above.
    # TODO: Implement actual log retrieval from agent process, file, or logging service.
    # NOTE: the simulation fabricates fresh log lines on every call; no history is stored.
    try:
        generated = [_simulate_agent_log(agent_id) for _line_no in range(lines)]
        agent_registry[agent_id]["last_log_timestamp"] = time.time()  # mark log access time
        logger.info(f"Generated {len(generated)} simulated log lines for agent '{agent_id}'.")
        return AgentLogResponse(logs=generated)
    except Exception as e:
        logger.exception(f"Error generating simulated logs for agent '{agent_id}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error generating logs for agent '{agent_id}'.")
# --- Metrics & Monitoring Endpoint ---
@app.get("/metrics/dashboard", response_model=DashboardMetrics, tags=["Metrics & Monitoring"])
async def get_dashboard_metrics(storage: TensorStorage = Depends(get_tensor_storage)):
    """
    Provides aggregated dashboard metrics, combining real data (like dataset count)
    with simulated data for agent performance and system health.

    NOTE: This endpoint currently modifies simulation state (e.g., sim_steps)
    within a GET request, which is not ideal REST practice. For production,
    simulation updates should occur in a background task.
    - **storage**: Injected TensorStorage instance.
    \f
    Returns:
        - DashboardMetrics object containing various metrics.
    Raises HTTPException:
        - 500 Internal Server Error: If critical metrics cannot be retrieved or calculated.
    """
    logger.debug("Request received for dashboard metrics.")
    current_time = time.time()
    metrics_data = {}  # Accumulate metrics here before constructing the Pydantic model.

    # --- Real Metrics (with error handling) ---
    try:
        # Adapt based on your actual TensorStorage implementation.
        if hasattr(storage, 'list_datasets') and callable(storage.list_datasets):
            dataset_count = len(storage.list_datasets())
        elif hasattr(storage, 'datasets') and isinstance(storage.datasets, dict):
            dataset_count = len(storage.datasets.keys())
        else:
            logger.error("TensorStorage instance lacks list_datasets() method or datasets dict.")
            dataset_count = -1  # Sentinel: count unavailable
        metrics_data["dataset_count"] = dataset_count
        logger.debug(f"Retrieved dataset count: {dataset_count}")
    except Exception as e:
        logger.exception(f"Failed to get dataset count for metrics: {e}")
        metrics_data["dataset_count"] = -1  # Sentinel: fetch failed

    # --- Simulated/Placeholder Metrics ---
    # TODO: Replace simulations with actual metric collection from agents/storage/system.
    # Agent Status Summary (from placeholder registry)
    status_counts = {"running": 0, "stopped": 0, "error": 0, "starting": 0, "stopping": 0, "unknown": 0}
    for agent_id, details in agent_registry.items():
        # BUGFIX: this loop variable was previously named `status`, which shadowed the
        # imported FastAPI `status` module and would have raised AttributeError when the
        # `status.HTTP_500_INTERNAL_SERVER_ERROR` error path at the bottom was reached.
        agent_status = details.get("status", "unknown")
        if agent_status not in status_counts:
            logger.warning(f"Agent '{agent_id}' has unexpected status '{agent_status}'. Counting as 'unknown'.")
            agent_status = "unknown"
        status_counts[agent_status] += 1
    metrics_data["agent_status_summary"] = status_counts

    # Simulate Total Records (only estimate if dataset_count is valid).
    ds_count = metrics_data.get("dataset_count", -1)
    metrics_data["total_records_est"] = ds_count * random.randint(500, 5000) if ds_count >= 0 else 0

    # Simulate performance metrics (slightly dynamic based on time/status).
    # Use .get() for safe access in case agents are removed from the registry later.
    ingestion_running = agent_registry.get("ingestion", {}).get("status") == "running"
    rl_running = agent_registry.get("rl_trainer", {}).get("status") == "running"
    automl_running = agent_registry.get("automl_search", {}).get("status") == "running"
    metrics_data["data_ingestion_rate"] = random.uniform(5.0, 50.0) * (1.0 if ingestion_running else 0.1)
    metrics_data["avg_query_latency_ms"] = random.uniform(50.0, 300.0) * (1 + 0.5 * math.sin(current_time / 60))  # Smoother oscillation

    # --- Simulation state update (WARNING: Modifies state in GET request) ---
    # This part mutates the global registry. Better practice: use a background task.
    rl_agent_state = agent_registry.setdefault("rl_trainer", {"sim_steps": 0})  # Ensure key and sim_steps exist
    rl_total_steps = int(max(0, rl_agent_state.get("sim_steps", 0) + (random.randint(10, 150) if rl_running else 0)))
    rl_agent_state["sim_steps"] = rl_total_steps  # Store simulated steps back
    metrics_data["rl_total_steps"] = rl_total_steps
    metrics_data["rl_latest_reward"] = random.gauss(10, 5.0) if rl_running else None  # Example reward distribution

    automl_agent_state = agent_registry.setdefault("automl_search", {"sim_trials": 0, "sim_best_score": None})  # Ensure keys exist
    automl_trials_completed = int(max(0, automl_agent_state.get("sim_trials", 0) + (random.randint(0, 3) if automl_running else 0)))
    automl_agent_state["sim_trials"] = automl_trials_completed
    metrics_data["automl_trials_completed"] = automl_trials_completed
    current_best = automl_agent_state.get("sim_best_score", None)
    automl_best_score = None
    if automl_running:
        if current_best is None:
            automl_best_score = random.uniform(0.7, 0.95)  # Example initial score (e.g., accuracy)
        else:
            # Simulate improvement (higher is better for this example score).
            improvement_factor = random.uniform(1.0, 1.005)
            automl_best_score = min(1.0, current_best * improvement_factor)  # Cap at 1.0
        automl_agent_state["sim_best_score"] = automl_best_score  # Store back
    elif current_best is not None:
        automl_best_score = current_best  # Keep last known best if stopped
    metrics_data["automl_best_score"] = automl_best_score
    # --- End Simulation state update ---

    # Simulate System Health (with bounds checks keeping values in [0, 100]).
    cpu_load = random.uniform(5.0, 25.0) \
        + (15 if ingestion_running else 0) \
        + (25 if rl_running else 0) \
        + (10 if automl_running else 0)
    metrics_data["system_cpu_usage_percent"] = min(100.0, max(0.0, cpu_load + random.uniform(-2.0, 2.0)))
    mem_load = random.uniform(15.0, 40.0) \
        + (metrics_data.get("dataset_count", 0) * 0.75)  # Memory scales slightly with datasets
    metrics_data["system_memory_usage_percent"] = min(100.0, max(0.0, mem_load + random.uniform(-3.0, 3.0)))

    # --- Construct Response using Pydantic Model ---
    try:
        metrics = DashboardMetrics(
            timestamp=current_time,
            dataset_count=metrics_data["dataset_count"],
            total_records_est=metrics_data["total_records_est"],
            agent_status_summary=metrics_data["agent_status_summary"],
            data_ingestion_rate=round(metrics_data["data_ingestion_rate"], 2),
            avg_query_latency_ms=round(metrics_data["avg_query_latency_ms"], 1),
            rl_latest_reward=(round(metrics_data["rl_latest_reward"], 3)
                              if metrics_data.get("rl_latest_reward") is not None else None),
            rl_total_steps=metrics_data["rl_total_steps"],
            automl_best_score=(round(metrics_data["automl_best_score"], 5)
                               if metrics_data.get("automl_best_score") is not None else None),
            automl_trials_completed=metrics_data["automl_trials_completed"],
            system_cpu_usage_percent=round(metrics_data["system_cpu_usage_percent"], 1),
            system_memory_usage_percent=round(metrics_data["system_memory_usage_percent"], 1)
        )
        logger.info("Successfully generated dashboard metrics.")
        return metrics
    except Exception as e:
        # Catch errors during final model creation (e.g., validation errors).
        # Works now that `status` is no longer shadowed by the loop above.
        logger.exception(f"Error constructing DashboardMetrics response from data: {metrics_data}. Error: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error constructing metrics response.")
# --- Root Endpoint ---
@app.get("/", include_in_schema=False)
async def read_root():
    """Return a simple welcome payload for the API root.

    Handy as a liveness check or a quick verification that the API is up.
    """
    payload = {"message": "Welcome to the Tensorus API! Visit /docs or /redoc for interactive documentation."}
    return payload
# --- Main Execution Block ---
if __name__ == "__main__":
    # Allows launching the API directly via `python api.py`.
    # Verify required local modules import cleanly before starting the server.
    try:
        from tensor_storage import TensorStorage
        from nql_agent import NQLAgent
    except ImportError as import_err:
        print(f"\nERROR: Missing required local modules: {import_err}.")
        print("Please ensure tensor_storage.py and nql_agent.py are in the same directory or Python path.\n")
        print("--- API Server NOT started due to missing modules. ---")
    else:
        print(f"--- Starting Tensorus API Server (v{app.version} with Agent Placeholders) ---")
        print(f"--- Logging level set to: {logging.getLevelName(logger.getEffectiveLevel())} ---")
        print("--- Access API documentation at http://0.0.0.0:8000/docs ---")
        print("--- Alternative documentation at http://0.0.0.0:8000/redoc ---")
        print("--- Press CTRL+C to stop ---")
        # Hand off to uvicorn; keep its log level in sync with this module's logger.
        uvicorn.run(
            "api:app",  # module:attribute path to the FastAPI instance in api.py
            host="0.0.0.0",
            port=8000,
            reload=True,  # auto-reload for development (watches for file changes)
            log_level=logging.getLevelName(logger.getEffectiveLevel()).lower(),
            # Use workers > 1 only if the app is stateless or handles state carefully.
        )
|