# Source: tensorus repository, api/ingestion_agent.py (web-upload listing residue removed)
# ingestion_agent.py
"""
Implements the Autonomous Data Ingestion Agent for Tensorus.
This agent monitors a source directory for new data files (e.g., CSV, images),
preprocesses them into tensors using configurable functions, performs basic
validation, and inserts them into a specified dataset in TensorStorage.
Future Enhancements:
- Monitor cloud storage (S3, GCS) and APIs.
- More robust error handling for malformed files.
- More sophisticated duplicate detection (e.g., file hashing).
- Support for streaming data sources.
- Asynchronous processing for higher throughput.
- More complex and configurable preprocessing pipelines.
- Schema validation against predefined dataset schemas.
- Resource management controls.
"""
import os
import time
import glob
import logging
import threading
import csv
from PIL import Image
import torch
import torchvision.transforms as T # Use torchvision for image transforms
from typing import Dict, Callable, Optional, Tuple, List, Any
from tensor_storage import TensorStorage # Import our storage module
# Configure basic logging (can be customized further)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Default Preprocessing Functions ---
def preprocess_csv(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """
    Basic CSV preprocessor for numeric data.

    Reads the CSV at ``file_path``, treats the first row as a header, and
    converts the remaining rows into one 2-D float32 tensor (one tensor row
    per CSV row). Malformed rows are skipped individually with a warning
    instead of failing the whole file: rows with non-numeric values, blank
    rows, and rows whose column count differs from the first accepted row.

    Args:
        file_path: Path to the CSV file to process.

    Returns:
        Tuple of (tensor, metadata). ``tensor`` is a float32 tensor of shape
        (num_valid_rows, num_columns), or None when the file cannot be read
        or contains no usable numeric rows. ``metadata`` always holds
        ``source_file`` and ``type``; ``header`` is added once the file has
        been opened.
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "csv"}
    data: List[List[float]] = []
    expected_width: Optional[int] = None  # column count of the first accepted row
    try:
        with open(file_path, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            header = next(reader, None)  # first row is assumed to be a header
            metadata["header"] = header
            for row in reader:
                if not row:
                    continue  # blank line; nothing to convert
                try:
                    numeric_row = [float(item) for item in row]
                except ValueError:
                    logger.warning(f"Skipping non-numeric row in {file_path}: {row}")
                    continue  # skip rows that can't be fully converted to float
                # Enforce a rectangular shape: torch.tensor() raises on ragged
                # nested lists, so without this check one odd-length row would
                # fail the entire file rather than just that row.
                if expected_width is None:
                    expected_width = len(numeric_row)
                elif len(numeric_row) != expected_width:
                    logger.warning(
                        f"Skipping row with {len(numeric_row)} columns "
                        f"(expected {expected_width}) in {file_path}: {row}"
                    )
                    continue
                data.append(numeric_row)
        if not data:
            logger.warning(f"No numeric data found or processed in CSV file: {file_path}")
            return None, metadata
        tensor = torch.tensor(data, dtype=torch.float32)
        logger.debug(f"Successfully processed {file_path} into tensor shape {tensor.shape}")
        return tensor, metadata
    except Exception as e:
        logger.error(f"Failed to process CSV file {file_path}: {e}")
        return None, metadata
def preprocess_image(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """
    Basic image preprocessor built on Pillow and torchvision transforms.

    Loads the image at ``file_path``, forces it to 3-channel RGB, resizes it
    to a fixed 128x128, converts it to a (C, H, W) float tensor in [0, 1],
    and normalizes with ImageNet channel statistics.

    Returns:
        Tuple of (tensor, metadata). ``tensor`` is None when the file is
        missing or cannot be decoded. ``metadata`` always carries
        ``source_file`` and ``type``; ``original_size`` (width, height) is
        added only on success.
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "image"}
    # Fixed pipeline for now; making size/stats configurable is a known TODO.
    pipeline = T.Compose([
        T.Resize((128, 128)),
        T.ToTensor(),  # PIL (H, W, C) [0,255] -> tensor (C, H, W) [0,1]
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    try:
        image = Image.open(file_path).convert('RGB')  # guarantee 3 channels
        image_tensor = pipeline(image)
        metadata["original_size"] = image.size  # PIL reports (width, height)
    except FileNotFoundError:
        logger.error(f"Image file not found: {file_path}")
        return None, metadata
    except Exception as e:
        logger.error(f"Failed to process image file {file_path}: {e}")
        return None, metadata
    logger.debug(f"Successfully processed {file_path} into tensor shape {image_tensor.shape}")
    return image_tensor, metadata
# --- Data Ingestion Agent Class ---
class DataIngestionAgent:
    """
    Monitors a local directory for new files and ingests them into TensorStorage.

    A daemon thread polls ``source_directory`` every ``polling_interval_sec``
    seconds. Files whose lowercased extension has a registered preprocessing
    function are converted to tensors, validated, and inserted into
    ``dataset_name``. Successfully ingested paths are remembered in an
    in-memory set for the lifetime of this agent, so they are not re-ingested
    on later scans (no persistence and no content hashing — see module TODOs).
    """

    def __init__(self,
                 tensor_storage: "TensorStorage",
                 dataset_name: str,
                 source_directory: str,
                 polling_interval_sec: int = 10,
                 preprocessing_rules: Optional[Dict[str, Callable[[str], Tuple[Optional[torch.Tensor], Dict[str, Any]]]]] = None):
        """
        Initializes the DataIngestionAgent.

        Args:
            tensor_storage: An instance of the TensorStorage class.
            dataset_name: The name of the dataset in TensorStorage to ingest into.
            source_directory: The path to the local directory to monitor.
            polling_interval_sec: How often (in seconds) to check the directory.
            preprocessing_rules: A dictionary mapping lowercase file extensions
                (e.g. '.csv', '.png') to preprocessing functions. Each function
                takes a file path and returns a tuple of the processed tensor
                (or None on failure) and a metadata dict. If None, default
                CSV/image rules are used.

        Raises:
            TypeError: If tensor_storage is not a TensorStorage instance.
            ValueError: If source_directory does not exist or is not a directory.
        """
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")
        if not os.path.isdir(source_directory):
            raise ValueError(f"Source directory '{source_directory}' does not exist or is not a directory.")

        self.tensor_storage = tensor_storage
        self.dataset_name = dataset_name
        self.source_directory = source_directory
        self.polling_interval = polling_interval_sec

        # Ensure the target dataset exists, creating it on first use.
        # (get_dataset is expected to raise ValueError when missing.)
        try:
            self.tensor_storage.get_dataset(self.dataset_name)
            logger.info(f"Agent targeting existing dataset '{self.dataset_name}'.")
        except ValueError:
            logger.info(f"Dataset '{self.dataset_name}' not found. Creating it.")
            self.tensor_storage.create_dataset(self.dataset_name)

        # Fall back to the built-in CSV/image preprocessors when no custom
        # rules are supplied.
        if preprocessing_rules is None:
            self.preprocessing_rules = {
                '.csv': preprocess_csv,
                '.png': preprocess_image,
                '.jpg': preprocess_image,
                '.jpeg': preprocess_image,
                '.tif': preprocess_image,
                '.tiff': preprocess_image,
            }
            logger.info("Using default preprocessing rules for CSV and common image formats.")
        else:
            self.preprocessing_rules = preprocessing_rules
            logger.info(f"Using custom preprocessing rules for extensions: {list(preprocessing_rules.keys())}")

        # Session-local duplicate guard: paths that were ingested successfully.
        self.processed_files = set()
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None
        logger.info(f"DataIngestionAgent initialized for dataset '{self.dataset_name}' monitoring '{self.source_directory}'.")

    def _validate_data(self, tensor: Optional[torch.Tensor], metadata: Dict[str, Any]) -> bool:
        """
        Performs basic validation on a preprocessed tensor.

        Returns:
            True when ``tensor`` is a real torch.Tensor, False otherwise
            (including when the preprocessor returned None).
        """
        if tensor is None:
            logger.warning(f"Validation failed: Tensor is None for {metadata.get('source_file', 'N/A')}")
            return False
        if not isinstance(tensor, torch.Tensor):
            logger.warning(f"Validation failed: Output is not a tensor for {metadata.get('source_file', 'N/A')}")
            return False
        # Add more specific checks if needed (e.g., tensor.numel() > 0).
        return True

    def _process_file(self, file_path: str) -> None:
        """Preprocesses, validates, and inserts a single detected file."""
        logger.info(f"Detected new file: {file_path}")
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        preprocessor = self.preprocessing_rules.get(file_extension)
        if not preprocessor:
            logger.debug(f"No preprocessor configured for file extension '{file_extension}'. Skipping file: {file_path}")
            return
        logger.debug(f"Applying preprocessor for '{file_extension}' to {file_path}")
        try:
            tensor, metadata = preprocessor(file_path)
            if self._validate_data(tensor, metadata):
                # Validation guarantees tensor is not None; keep the explicit
                # check as a safeguard against a misbehaving validator.
                if tensor is not None:
                    record_id = self.tensor_storage.insert(self.dataset_name, tensor, metadata)
                    logger.info(f"Successfully ingested '{file_path}' into dataset '{self.dataset_name}' with record ID: {record_id}")
                    # Mark as processed only on success so failed files are retried.
                    self.processed_files.add(file_path)
                else:
                    logger.error(f"Validation passed but tensor is None for {file_path}. Skipping insertion.")
            else:
                logger.warning(f"Data validation failed for {file_path}. Skipping insertion.")
        except Exception as e:
            logger.error(f"Unhandled error during preprocessing or insertion for {file_path}: {e}", exc_info=True)

    def _scan_source_directory(self) -> None:
        """Scans the source directory (non-recursively) for new, supported files."""
        logger.debug(f"Scanning directory: {self.source_directory}")
        supported_extensions = self.preprocessing_rules.keys()
        try:
            # glob over '*' is simple but may be slow for huge directories;
            # os.scandir would be a cheaper alternative.
            all_files = glob.glob(os.path.join(self.source_directory, '*'), recursive=False)
            for file_path in all_files:
                if not os.path.isfile(file_path):
                    continue  # skip subdirectories
                _, file_extension = os.path.splitext(file_path)
                file_extension = file_extension.lower()
                if file_extension in supported_extensions and file_path not in self.processed_files:
                    self._process_file(file_path)
        except Exception as e:
            logger.error(f"Error scanning source directory '{self.source_directory}': {e}", exc_info=True)

    def _monitor_loop(self) -> None:
        """Main loop executed by the background thread until stop() is called."""
        logger.info(f"Starting monitoring loop for '{self.source_directory}'. Polling interval: {self.polling_interval} seconds.")
        while not self._stop_event.is_set():
            self._scan_source_directory()
            # Event.wait() doubles as an interruptible sleep: it returns early
            # the moment stop() sets the event.
            self._stop_event.wait(self.polling_interval)
        logger.info("Monitoring loop stopped.")

    def start(self) -> None:
        """Starts the monitoring process in a background daemon thread (idempotent)."""
        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            logger.warning("Agent monitoring is already running.")
            return
        self._stop_event.clear()
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logger.info("Data Ingestion Agent started monitoring.")

    def stop(self) -> None:
        """Signals the monitoring thread to stop and waits briefly for it to exit."""
        if self._monitor_thread is None or not self._monitor_thread.is_alive():
            logger.info("Agent monitoring is not running.")
            return
        logger.info("Stopping Data Ingestion Agent monitoring...")
        self._stop_event.set()
        self._monitor_thread.join(timeout=self.polling_interval + 5)
        if self._monitor_thread.is_alive():
            # Bug fix: keep the handle when the thread outlives the timeout,
            # so a subsequent start() detects the still-running loop instead
            # of spawning a duplicate monitor thread.
            logger.warning("Monitoring thread did not stop gracefully after timeout.")
        else:
            logger.info("Data Ingestion Agent monitoring stopped successfully.")
            self._monitor_thread = None
# --- Example Usage ---
if __name__ == "__main__":
    # Self-contained demo: spin up an agent against a temp directory, drop a
    # few files in, and show what ends up in TensorStorage.
    logger.info("--- Starting Ingestion Agent Example ---")
    # 1. Setup TensorStorage.
    #    NOTE(review): constructor arguments/backing store are defined in
    #    tensor_storage.py, not visible from this file.
    storage = TensorStorage()
    # 2. Setup a temporary directory for the agent to monitor.
    source_dir = "temp_ingestion_source"
    if not os.path.exists(source_dir):
        os.makedirs(source_dir)
        logger.info(f"Created temporary source directory: {source_dir}")
    # 3. Create the Ingestion Agent with default preprocessing rules.
    #    A short polling interval keeps the demo quick.
    agent = DataIngestionAgent(
        tensor_storage=storage,
        dataset_name="raw_data",
        source_directory=source_dir,
        polling_interval_sec=5
    )
    # 4. Start the agent (monitoring runs in a background daemon thread).
    agent.start()
    # 5. Simulate adding files to the source directory.
    print("\nSimulating file creation...")
    time.sleep(2)  # give the agent time to complete its initial scan
    # Create a dummy CSV file; the third data row is deliberately non-numeric
    # to exercise preprocess_csv's row-skipping behavior.
    csv_path = os.path.join(source_dir, "data_1.csv")
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Timestamp", "Value1", "Value2"])
        writer.writerow(["1678886400", "10.5", "20.1"])
        writer.writerow(["1678886460", "11.2", "20.5"])
        writer.writerow(["1678886520", "invalid", "20.9"])  # test non-numeric row
        writer.writerow(["1678886580", "10.9", "21.0"])
    print(f"Created CSV: {csv_path}")
    # Create a dummy image file (requires Pillow).
    # NOTE(review): the ImportError branch is effectively dead code — PIL is
    # imported at module top, so a missing Pillow fails the whole script first.
    try:
        img_path = os.path.join(source_dir, "image_1.png")
        dummy_img = Image.new('RGB', (60, 30), color = 'red')
        dummy_img.save(img_path)
        print(f"Created Image: {img_path}")
    except ImportError:
        print("Pillow not installed, skipping image creation. Install with: pip install Pillow")
    except Exception as e:
        print(f"Could not create dummy image: {e}")
    # Create an unsupported file type (no '.txt' rule -> agent should skip it).
    txt_path = os.path.join(source_dir, "notes.txt")
    with open(txt_path, 'w') as f:
        f.write("This is a test file.")
    print(f"Created TXT: {txt_path} (should be skipped)")
    # 6. Let the agent run for a couple of polling cycles.
    print(f"\nWaiting for agent to process files (polling interval {agent.polling_interval}s)...")
    time.sleep(agent.polling_interval * 2 + 1)  # wait for 2 cycles + buffer
    # 7. Check the contents of TensorStorage.
    print("\n--- Checking TensorStorage contents ---")
    try:
        ingested_data = storage.get_dataset_with_metadata("raw_data")
        print(f"Found {len(ingested_data)} items in dataset 'raw_data':")
        for item in ingested_data:
            print(f" Record ID: {item['metadata'].get('record_id')}, Source: {item['metadata'].get('source_file')}, Shape: {item['tensor'].shape}, Dtype: {item['tensor'].dtype}")
        # print(f" Tensor: {item['tensor']}")  # can be verbose
    except ValueError as e:
        print(f"Could not retrieve dataset 'raw_data': {e}")
    # 8. Stop the agent (signals the thread and joins with a timeout).
    print("\n--- Stopping Agent ---")
    agent.stop()
    # 9. Clean up the temporary directory (optional).
    # print(f"\nCleaning up temporary directory: {source_dir}")
    # import shutil
    # shutil.rmtree(source_dir)
    logger.info("--- Ingestion Agent Example Finished ---")