| |
| """ |
| Implements the Autonomous Data Ingestion Agent for Tensorus. |
| |
| This agent monitors a source directory for new data files (e.g., CSV, images), |
| preprocesses them into tensors using configurable functions, performs basic |
| validation, and inserts them into a specified dataset in TensorStorage. |
| |
| Future Enhancements: |
| - Monitor cloud storage (S3, GCS) and APIs. |
| - More robust error handling for malformed files. |
| - More sophisticated duplicate detection (e.g., file hashing). |
| - Support for streaming data sources. |
| - Asynchronous processing for higher throughput. |
| - More complex and configurable preprocessing pipelines. |
| - Schema validation against predefined dataset schemas. |
| - Resource management controls. |
| """ |
|
|
| import os |
| import time |
| import glob |
| import logging |
| import threading |
| import csv |
| from PIL import Image |
| import torch |
| import torchvision.transforms as T |
|
|
| from typing import Dict, Callable, Optional, Tuple, List, Any |
| from tensor_storage import TensorStorage |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
|
|
def preprocess_csv(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """Preprocess a numeric CSV file into a single 2-D float32 tensor.

    The first row is treated as a header and recorded in the metadata. Each
    subsequent row is converted to floats. Rows that are empty, contain
    non-numeric values, or whose column count differs from the first valid
    data row are skipped with a warning — previously a single ragged or
    blank row made ``torch.tensor`` raise and the whole file was rejected.

    Args:
        file_path: Path to the CSV file to read.

    Returns:
        A tuple ``(tensor, metadata)``. ``tensor`` has shape
        ``(num_valid_rows, num_columns)`` and dtype float32, or is ``None``
        when the file could not be read or contained no usable rows.
        ``metadata`` always carries ``source_file`` and ``type``; ``header``
        is added once the file is readable.
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "csv"}
    data: List[List[float]] = []
    expected_width: Optional[int] = None
    try:
        with open(file_path, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            header = next(reader, None)
            metadata["header"] = header
            for row in reader:
                if not row:
                    # Blank lines would otherwise become empty rows and make
                    # the nested list ragged.
                    logger.debug(f"Skipping empty row in {file_path}")
                    continue
                try:
                    numeric_row = [float(item) for item in row]
                except ValueError:
                    logger.warning(f"Skipping non-numeric row in {file_path}: {row}")
                    continue
                # Enforce a rectangular result: torch.tensor() raises on
                # ragged nested lists, which used to fail the entire file.
                if expected_width is None:
                    expected_width = len(numeric_row)
                elif len(numeric_row) != expected_width:
                    logger.warning(
                        f"Skipping row with {len(numeric_row)} columns "
                        f"(expected {expected_width}) in {file_path}: {row}"
                    )
                    continue
                data.append(numeric_row)

        if not data:
            logger.warning(f"No numeric data found or processed in CSV file: {file_path}")
            return None, metadata

        tensor = torch.tensor(data, dtype=torch.float32)
        logger.debug(f"Successfully processed {file_path} into tensor shape {tensor.shape}")
        return tensor, metadata
    except Exception as e:
        # Best-effort contract: never raise to the caller, signal failure
        # with a None tensor instead.
        logger.error(f"Failed to process CSV file {file_path}: {e}")
        return None, metadata
|
|
|
|
def preprocess_image(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """Load an image file and convert it to a normalized float tensor.

    The image is forced to RGB, resized to 128x128, converted to a tensor,
    and normalized with the standard ImageNet channel statistics.

    Args:
        file_path: Path to the image file.

    Returns:
        A tuple ``(tensor, metadata)``. On success the tensor has shape
        ``(3, 128, 128)``; on any failure it is ``None``. ``metadata``
        always carries ``source_file`` and ``type``; ``original_size``
        (the pre-resize ``(width, height)``) is added on success.
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "image"}
    pipeline = T.Compose([
        T.Resize((128, 128)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    try:
        image = Image.open(file_path).convert('RGB')
        result = pipeline(image)
    except FileNotFoundError:
        logger.error(f"Image file not found: {file_path}")
        return None, metadata
    except Exception as e:
        logger.error(f"Failed to process image file {file_path}: {e}")
        return None, metadata

    metadata["original_size"] = image.size
    logger.debug(f"Successfully processed {file_path} into tensor shape {result.shape}")
    return result, metadata
|
|
| |
|
|
class DataIngestionAgent:
    """
    Monitors a directory for new files and ingests them into TensorStorage.

    A daemon thread polls the source directory on a fixed interval. Each file
    whose (lowercased) extension has a registered preprocessing function is
    converted to a tensor, validated, and inserted into the configured
    dataset. Paths that ingest successfully are remembered and skipped on
    later scans; files that fail are retried on every scan until they
    succeed.
    """

    def __init__(self,
                 tensor_storage: TensorStorage,
                 dataset_name: str,
                 source_directory: str,
                 polling_interval_sec: int = 10,
                 preprocessing_rules: Optional[Dict[str, Callable[[str], Tuple[Optional[torch.Tensor], Dict[str, Any]]]]] = None):
        """
        Initializes the DataIngestionAgent.

        Args:
            tensor_storage: An instance of the TensorStorage class.
            dataset_name: The name of the dataset in TensorStorage to ingest into.
            source_directory: The path to the local directory to monitor.
            polling_interval_sec: How often (in seconds) to check the directory.
            preprocessing_rules: A dictionary mapping lowercase file extensions
                                 (e.g., '.csv', '.png') to preprocessing functions.
                                 Each function takes a file path and returns a
                                 tuple containing the processed tensor (or None
                                 on failure) and a metadata dictionary. If None,
                                 default CSV/image rules are used.

        Raises:
            TypeError: If tensor_storage is not a TensorStorage instance.
            ValueError: If source_directory does not exist or is not a directory.
        """
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")
        if not os.path.isdir(source_directory):
            raise ValueError(f"Source directory '{source_directory}' does not exist or is not a directory.")

        self.tensor_storage = tensor_storage
        self.dataset_name = dataset_name
        self.source_directory = source_directory
        self.polling_interval = polling_interval_sec

        # Ensure the target dataset exists before monitoring starts.
        try:
            self.tensor_storage.get_dataset(self.dataset_name)
            logger.info(f"Agent targeting existing dataset '{self.dataset_name}'.")
        except ValueError:
            logger.info(f"Dataset '{self.dataset_name}' not found. Creating it.")
            self.tensor_storage.create_dataset(self.dataset_name)

        if preprocessing_rules is None:
            self.preprocessing_rules = {
                '.csv': preprocess_csv,
                '.png': preprocess_image,
                '.jpg': preprocess_image,
                '.jpeg': preprocess_image,
                '.tif': preprocess_image,
                '.tiff': preprocess_image,
            }
            logger.info("Using default preprocessing rules for CSV and common image formats.")
        else:
            self.preprocessing_rules = preprocessing_rules
            logger.info(f"Using custom preprocessing rules for extensions: {list(preprocessing_rules.keys())}")

        # Simple duplicate guard keyed on path only; content hashing would
        # catch re-written files but is out of scope for now.
        self.processed_files = set()
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None
        logger.info(f"DataIngestionAgent initialized for dataset '{self.dataset_name}' monitoring '{self.source_directory}'.")

    def _validate_data(self, tensor: Optional[torch.Tensor], metadata: Dict[str, Any]) -> bool:
        """
        Performs basic validation on the preprocessed tensor.

        Args:
            tensor: The preprocessor output, possibly None on failure.
            metadata: Metadata dict; only 'source_file' is read, for logging.

        Returns:
            True if the output is a usable torch.Tensor, False otherwise.
        """
        if tensor is None:
            logger.warning(f"Validation failed: Tensor is None for {metadata.get('source_file', 'N/A')}")
            return False
        if not isinstance(tensor, torch.Tensor):
            logger.warning(f"Validation failed: Output is not a tensor for {metadata.get('source_file', 'N/A')}")
            return False
        return True

    def _process_file(self, file_path: str) -> None:
        """Preprocesses, validates, and inserts a single detected file.

        The path is only added to ``processed_files`` after a successful
        insert, so files that fail are retried on the next scan.
        """
        logger.info(f"Detected new file: {file_path}")
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()

        preprocessor = self.preprocessing_rules.get(file_extension)
        if preprocessor is None:
            logger.debug(f"No preprocessor configured for file extension '{file_extension}'. Skipping file: {file_path}")
            return

        logger.debug(f"Applying preprocessor for '{file_extension}' to {file_path}")
        try:
            tensor, metadata = preprocessor(file_path)
            if not self._validate_data(tensor, metadata):
                logger.warning(f"Data validation failed for {file_path}. Skipping insertion.")
                return
            # _validate_data guarantees tensor is a real torch.Tensor here,
            # so no redundant None re-check is needed before inserting.
            record_id = self.tensor_storage.insert(self.dataset_name, tensor, metadata)
            logger.info(f"Successfully ingested '{file_path}' into dataset '{self.dataset_name}' with record ID: {record_id}")
            self.processed_files.add(file_path)
        except Exception as e:
            # Catch-all keeps one bad file from killing the monitor loop.
            logger.error(f"Unhandled error during preprocessing or insertion for {file_path}: {e}", exc_info=True)

    def _scan_source_directory(self) -> None:
        """Scans the source directory (non-recursively) for new ingestable files."""
        logger.debug(f"Scanning directory: {self.source_directory}")
        supported_extensions = self.preprocessing_rules.keys()

        try:
            all_files = glob.glob(os.path.join(self.source_directory, '*'), recursive=False)

            for file_path in all_files:
                if not os.path.isfile(file_path):
                    continue

                _, file_extension = os.path.splitext(file_path)
                file_extension = file_extension.lower()

                if file_extension in supported_extensions and file_path not in self.processed_files:
                    self._process_file(file_path)

        except Exception as e:
            logger.error(f"Error scanning source directory '{self.source_directory}': {e}", exc_info=True)

    def _monitor_loop(self) -> None:
        """The main loop executed by the background thread.

        Uses Event.wait() rather than time.sleep() so stop() can interrupt
        the pause immediately.
        """
        logger.info(f"Starting monitoring loop for '{self.source_directory}'. Polling interval: {self.polling_interval} seconds.")
        while not self._stop_event.is_set():
            self._scan_source_directory()
            self._stop_event.wait(self.polling_interval)
        logger.info("Monitoring loop stopped.")

    def start(self) -> None:
        """Starts the monitoring process in a background (daemon) thread.

        Calling start() while monitoring is already running is a no-op.
        """
        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            logger.warning("Agent monitoring is already running.")
            return

        self._stop_event.clear()
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logger.info("Data Ingestion Agent started monitoring.")

    def stop(self) -> None:
        """Signals the monitoring thread to stop and waits for it to exit.

        Blocks for at most one polling interval plus a 5-second grace period.
        """
        if self._monitor_thread is None or not self._monitor_thread.is_alive():
            logger.info("Agent monitoring is not running.")
            return

        logger.info("Stopping Data Ingestion Agent monitoring...")
        self._stop_event.set()
        self._monitor_thread.join(timeout=self.polling_interval + 5)

        if self._monitor_thread.is_alive():
            # BUG FIX: keep the reference when the join times out. Previously
            # it was dropped unconditionally, so a subsequent start() could
            # spawn a second monitor loop alongside the still-running one.
            logger.warning("Monitoring thread did not stop gracefully after timeout.")
        else:
            logger.info("Data Ingestion Agent monitoring stopped successfully.")
            self._monitor_thread = None
|
|
|
|
| |
if __name__ == "__main__":
    # Demo / smoke test: run an agent over a scratch directory, drop a few
    # files into it while it polls, then print what landed in TensorStorage.
    logger.info("--- Starting Ingestion Agent Example ---")

    storage = TensorStorage()

    # Directory the agent will watch. Created if missing; deliberately NOT
    # removed afterwards so the example output can be inspected and re-run.
    source_dir = "temp_ingestion_source"
    if not os.path.exists(source_dir):
        os.makedirs(source_dir)
        logger.info(f"Created temporary source directory: {source_dir}")

    # Default preprocessing rules (CSV + common image formats) apply.
    agent = DataIngestionAgent(
        tensor_storage=storage,
        dataset_name="raw_data",
        source_directory=source_dir,
        polling_interval_sec=5
    )

    # Monitoring runs in a daemon thread; the main thread keeps going.
    agent.start()

    print("\nSimulating file creation...")
    time.sleep(2)

    # CSV with one deliberately non-numeric row ("invalid") to exercise the
    # row-skipping path in preprocess_csv.
    csv_path = os.path.join(source_dir, "data_1.csv")
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Timestamp", "Value1", "Value2"])
        writer.writerow(["1678886400", "10.5", "20.1"])
        writer.writerow(["1678886460", "11.2", "20.5"])
        writer.writerow(["1678886520", "invalid", "20.9"])
        writer.writerow(["1678886580", "10.9", "21.0"])
    print(f"Created CSV: {csv_path}")

    # NOTE(review): the ImportError arm below is effectively dead — PIL is
    # imported unconditionally at module top, so a missing Pillow would fail
    # the whole script long before this point; Image.new does not raise
    # ImportError.
    try:
        img_path = os.path.join(source_dir, "image_1.png")
        dummy_img = Image.new('RGB', (60, 30), color = 'red')
        dummy_img.save(img_path)
        print(f"Created Image: {img_path}")
    except ImportError:
        print("Pillow not installed, skipping image creation. Install with: pip install Pillow")
    except Exception as e:
        print(f"Could not create dummy image: {e}")

    # A .txt file has no preprocessing rule, so the agent should ignore it.
    txt_path = os.path.join(source_dir, "notes.txt")
    with open(txt_path, 'w') as f:
        f.write("This is a test file.")
    print(f"Created TXT: {txt_path} (should be skipped)")

    # Wait long enough for at least two polling cycles so every file created
    # above gets scanned and ingested.
    print(f"\nWaiting for agent to process files (polling interval {agent.polling_interval}s)...")
    time.sleep(agent.polling_interval * 2 + 1)

    # Dump what was ingested. Expect the CSV and the PNG; the TXT is skipped.
    print("\n--- Checking TensorStorage contents ---")
    try:
        ingested_data = storage.get_dataset_with_metadata("raw_data")
        print(f"Found {len(ingested_data)} items in dataset 'raw_data':")
        for item in ingested_data:
            print(f"  Record ID: {item['metadata'].get('record_id')}, Source: {item['metadata'].get('source_file')}, Shape: {item['tensor'].shape}, Dtype: {item['tensor'].dtype}")
    except ValueError as e:
        print(f"Could not retrieve dataset 'raw_data': {e}")

    print("\n--- Stopping Agent ---")
    agent.stop()

    logger.info("--- Ingestion Agent Example Finished ---")