# ingestion_agent.py
"""
Implements the Autonomous Data Ingestion Agent for Tensorus.
This agent monitors a source directory for new data files (e.g., CSV, images),
preprocesses them into tensors using configurable functions, performs basic
validation, and inserts them into a specified dataset in TensorStorage.
Future Enhancements:
- Monitor cloud storage (S3, GCS) and APIs.
- More robust error handling for malformed files.
- More sophisticated duplicate detection (e.g., file hashing).
- Support for streaming data sources.
- Asynchronous processing for higher throughput.
- More complex and configurable preprocessing pipelines.
- Schema validation against predefined dataset schemas.
- Resource management controls.
"""
import os
import time
import glob
import logging
import threading
import csv
from PIL import Image
import torch
import torchvision.transforms as T # Use torchvision for image transforms
from typing import Dict, Callable, Optional, Tuple, List, Any
from tensor_storage import TensorStorage # Import our storage module
# Configure basic logging (can be customized further)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Default Preprocessing Functions ---
def preprocess_csv(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """
    Basic CSV preprocessor. Assumes numeric data.

    Reads a CSV file, skips the header row, converts each fully-numeric row
    to floats, and stacks all accepted rows into a single 2-D float32 tensor.
    Blank rows and rows whose width differs from the first accepted row are
    skipped (torch.tensor requires rectangular data; previously a single
    blank or ragged row caused the entire file to fail ingestion).

    Args:
        file_path: Path to the CSV file to process.

    Returns:
        Tuple of (tensor, metadata). The tensor is None when the file could
        not be read or contained no usable numeric rows. Metadata always
        contains 'source_file' and 'type', plus 'header' when the file was
        readable.
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "csv"}
    data: List[List[float]] = []
    row_width: Optional[int] = None  # Width of the first accepted row; later rows must match.
    try:
        with open(file_path, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            header = next(reader, None)  # First row is assumed to be a header.
            metadata["header"] = header
            for row in reader:
                if not row:
                    # Blank lines produce empty rows; skipping them prevents a
                    # ragged-data failure when building the tensor below.
                    continue
                # Attempt to convert row elements to floats
                try:
                    numeric_row = [float(item) for item in row]
                except ValueError:
                    logger.warning(f"Skipping non-numeric row in {file_path}: {row}")
                    continue  # Skip rows that can't be fully converted to float
                if row_width is None:
                    row_width = len(numeric_row)
                elif len(numeric_row) != row_width:
                    # A short/long row would otherwise make torch.tensor raise
                    # and the whole file fail to ingest.
                    logger.warning(f"Skipping row with inconsistent width in {file_path}: {row}")
                    continue
                data.append(numeric_row)

        if not data:
            logger.warning(f"No numeric data found or processed in CSV file: {file_path}")
            return None, metadata

        tensor = torch.tensor(data, dtype=torch.float32)
        logger.debug(f"Successfully processed {file_path} into tensor shape {tensor.shape}")
        return tensor, metadata
    except Exception as e:
        logger.error(f"Failed to process CSV file {file_path}: {e}")
        return None, metadata
def preprocess_image(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """
    Basic Image preprocessor using Pillow and Torchvision transforms.

    Loads the file as a 3-channel RGB image, resizes it to a fixed 128x128,
    converts it to a (C, H, W) float tensor in [0, 1], and normalizes with
    ImageNet channel statistics. The transform parameters should ideally be
    made configurable.

    Returns:
        Tuple of (tensor, metadata); tensor is None on any failure.
    """
    metadata = {"source_file": file_path, "type": "image"}
    # Fixed example pipeline: resize -> tensor conversion -> ImageNet normalization.
    pipeline = T.Compose([
        T.Resize((128, 128)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    try:
        img = Image.open(file_path).convert('RGB')  # Guarantee 3 channels (RGB)
        tensor = pipeline(img)
    except FileNotFoundError:
        logger.error(f"Image file not found: {file_path}")
        return None, metadata
    except Exception as e:
        logger.error(f"Failed to process image file {file_path}: {e}")
        return None, metadata
    metadata["original_size"] = img.size  # (width, height) of the source image
    logger.debug(f"Successfully processed {file_path} into tensor shape {tensor.shape}")
    return tensor, metadata
# --- Data Ingestion Agent Class ---
class DataIngestionAgent:
    """
    Monitors a local directory for new files and ingests them into TensorStorage.

    A daemon thread polls `source_directory` at a fixed interval. Each file
    whose lowercase extension has a registered preprocessing function is
    converted to a tensor, validated, and inserted into the target dataset.
    Successfully ingested paths are remembered in-memory, so each file is
    processed at most once per agent session (not persisted across restarts).
    """
    def __init__(self,
                 tensor_storage: TensorStorage,
                 dataset_name: str,
                 source_directory: str,
                 polling_interval_sec: int = 10,
                 preprocessing_rules: Optional[Dict[str, Callable[[str], Tuple[Optional[torch.Tensor], Dict[str, Any]]]]] = None):
        """
        Initializes the DataIngestionAgent.

        Args:
            tensor_storage: An instance of the TensorStorage class.
            dataset_name: The name of the dataset in TensorStorage to ingest into.
            source_directory: The path to the local directory to monitor.
            polling_interval_sec: How often (in seconds) to check the directory.
            preprocessing_rules: A dictionary mapping lowercase file extensions
                                 (e.g., '.csv', '.png') to preprocessing functions.
                                 Each function takes a file path and returns a
                                 Tuple containing the processed Tensor (or None on
                                 failure) and a metadata dictionary. If None,
                                 default CSV/image rules are used.

        Raises:
            TypeError: If tensor_storage is not a TensorStorage instance.
            ValueError: If source_directory does not exist or is not a directory.
        """
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")
        if not os.path.isdir(source_directory):
            raise ValueError(f"Source directory '{source_directory}' does not exist or is not a directory.")

        self.tensor_storage = tensor_storage
        self.dataset_name = dataset_name
        self.source_directory = source_directory
        self.polling_interval = polling_interval_sec

        # Ensure the target dataset exists, creating it on first use.
        try:
            self.tensor_storage.get_dataset(self.dataset_name)
            logger.info(f"Agent targeting existing dataset '{self.dataset_name}'.")
        except ValueError:
            logger.info(f"Dataset '{self.dataset_name}' not found. Creating it.")
            self.tensor_storage.create_dataset(self.dataset_name)

        # Fall back to the built-in CSV/image preprocessors when none supplied.
        if preprocessing_rules is None:
            self.preprocessing_rules = {
                '.csv': preprocess_csv,
                '.png': preprocess_image,
                '.jpg': preprocess_image,
                '.jpeg': preprocess_image,
                '.tif': preprocess_image,
                '.tiff': preprocess_image,
            }
            logger.info("Using default preprocessing rules for CSV and common image formats.")
        else:
            self.preprocessing_rules = preprocessing_rules
            logger.info(f"Using custom preprocessing rules for extensions: {list(preprocessing_rules.keys())}")

        # Session-scoped duplicate guard: paths successfully ingested so far.
        self.processed_files = set()
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None
        logger.info(f"DataIngestionAgent initialized for dataset '{self.dataset_name}' monitoring '{self.source_directory}'.")

    def _validate_data(self, tensor: Optional[torch.Tensor], metadata: Dict[str, Any]) -> bool:
        """
        Performs basic validation on the preprocessed tensor.

        Returns True if valid, False otherwise.
        """
        if tensor is None:
            logger.warning(f"Validation failed: Tensor is None for {metadata.get('source_file', 'N/A')}")
            return False
        if not isinstance(tensor, torch.Tensor):
            logger.warning(f"Validation failed: Output is not a tensor for {metadata.get('source_file', 'N/A')}")
            return False
        # Add more specific checks if needed (e.g., tensor.numel() > 0)
        return True

    def _process_file(self, file_path: str) -> None:
        """Preprocesses, validates, and inserts a single detected file."""
        logger.info(f"Detected new file: {file_path}")
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()
        preprocessor = self.preprocessing_rules.get(file_extension)
        if preprocessor is None:
            logger.debug(f"No preprocessor configured for file extension '{file_extension}'. Skipping file: {file_path}")
            return
        logger.debug(f"Applying preprocessor for '{file_extension}' to {file_path}")
        try:
            tensor, metadata = preprocessor(file_path)
            if not self._validate_data(tensor, metadata):
                logger.warning(f"Data validation failed for {file_path}. Skipping insertion.")
                return
            if tensor is None:
                # Should have been caught by validation, but as safeguard:
                logger.error(f"Validation passed but tensor is None for {file_path}. Skipping insertion.")
                return
            record_id = self.tensor_storage.insert(self.dataset_name, tensor, metadata)
            logger.info(f"Successfully ingested '{file_path}' into dataset '{self.dataset_name}' with record ID: {record_id}")
            # Mark as processed only on success so failed files are retried
            # on the next scan.
            self.processed_files.add(file_path)
        except Exception as e:
            logger.error(f"Unhandled error during preprocessing or insertion for {file_path}: {e}", exc_info=True)

    def _scan_source_directory(self) -> None:
        """Scans the source directory for new files matching the rules."""
        logger.debug(f"Scanning directory: {self.source_directory}")
        supported_extensions = self.preprocessing_rules.keys()
        try:
            # Use glob to find all files, then filter.
            # This might be inefficient for huge directories, consider os.scandir.
            all_files = glob.glob(os.path.join(self.source_directory, '*'), recursive=False)  # Non-recursive
            for file_path in all_files:
                if not os.path.isfile(file_path):
                    continue  # Skip directories
                _, file_extension = os.path.splitext(file_path)
                file_extension = file_extension.lower()
                if file_extension in supported_extensions and file_path not in self.processed_files:
                    self._process_file(file_path)
        except Exception as e:
            logger.error(f"Error scanning source directory '{self.source_directory}': {e}", exc_info=True)

    def _monitor_loop(self) -> None:
        """The main loop executed by the background thread."""
        logger.info(f"Starting monitoring loop for '{self.source_directory}'. Polling interval: {self.polling_interval} seconds.")
        while not self._stop_event.is_set():
            self._scan_source_directory()
            # Event.wait doubles as an interruptible sleep: returns early as
            # soon as stop() sets the event.
            self._stop_event.wait(self.polling_interval)
        logger.info("Monitoring loop stopped.")

    def start(self) -> None:
        """Starts the monitoring process in a background daemon thread."""
        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            logger.warning("Agent monitoring is already running.")
            return
        self._stop_event.clear()
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logger.info("Data Ingestion Agent started monitoring.")

    def stop(self) -> None:
        """Signals the monitoring thread to stop and waits briefly for it."""
        if self._monitor_thread is None or not self._monitor_thread.is_alive():
            logger.info("Agent monitoring is not running.")
            return
        logger.info("Stopping Data Ingestion Agent monitoring...")
        self._stop_event.set()
        self._monitor_thread.join(timeout=self.polling_interval + 5)  # Wait for thread to finish
        if self._monitor_thread.is_alive():
            # BUGFIX: keep the handle while the thread is still alive. The
            # previous code set it to None unconditionally, which let a later
            # start() spawn a second monitor loop alongside the stuck one.
            logger.warning("Monitoring thread did not stop gracefully after timeout.")
        else:
            logger.info("Data Ingestion Agent monitoring stopped successfully.")
            self._monitor_thread = None
# --- Example Usage ---
if __name__ == "__main__":
    # Demonstrates the agent end-to-end: create a storage backend, monitor a
    # temporary directory, drop sample files into it, and inspect the results.
    logger.info("--- Starting Ingestion Agent Example ---")

    # 1. Setup TensorStorage
    storage = TensorStorage()

    # 2. Setup a temporary directory for the agent to monitor
    source_dir = "temp_ingestion_source"
    if not os.path.exists(source_dir):
        os.makedirs(source_dir)
        logger.info(f"Created temporary source directory: {source_dir}")

    # 3. Create the Ingestion Agent (short polling interval for demonstration)
    agent = DataIngestionAgent(
        tensor_storage=storage,
        dataset_name="raw_data",
        source_directory=source_dir,
        polling_interval_sec=5
    )

    # 4. Start the agent (runs in the background)
    agent.start()

    # 5. Simulate adding files to the source directory
    print("\nSimulating file creation...")
    time.sleep(2)  # Give agent time to start initial scan

    # Create a dummy CSV file (includes one non-numeric row to exercise skipping)
    csv_path = os.path.join(source_dir, "data_1.csv")
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Timestamp", "Value1", "Value2"])
        writer.writerow(["1678886400", "10.5", "20.1"])
        writer.writerow(["1678886460", "11.2", "20.5"])
        writer.writerow(["1678886520", "invalid", "20.9"])  # Test non-numeric row
        writer.writerow(["1678886580", "10.9", "21.0"])
    print(f"Created CSV: {csv_path}")

    # Create a dummy image file.
    # NOTE: PIL's Image is imported at module level, so the previous
    # `except ImportError` branch here was unreachable; the generic handler
    # below covers all creation failures.
    try:
        img_path = os.path.join(source_dir, "image_1.png")
        dummy_img = Image.new('RGB', (60, 30), color='red')
        dummy_img.save(img_path)
        print(f"Created Image: {img_path}")
    except Exception as e:
        print(f"Could not create dummy image: {e}")

    # Create an unsupported file type (no preprocessor registered for .txt)
    txt_path = os.path.join(source_dir, "notes.txt")
    with open(txt_path, 'w') as f:
        f.write("This is a test file.")
    print(f"Created TXT: {txt_path} (should be skipped)")

    # 6. Let the agent run for a couple of polling cycles
    print(f"\nWaiting for agent to process files (polling interval {agent.polling_interval}s)...")
    time.sleep(agent.polling_interval * 2 + 1)  # Wait for 2 cycles + buffer

    # 7. Check the contents of TensorStorage
    print("\n--- Checking TensorStorage contents ---")
    try:
        ingested_data = storage.get_dataset_with_metadata("raw_data")
        print(f"Found {len(ingested_data)} items in dataset 'raw_data':")
        for item in ingested_data:
            print(f"  Record ID: {item['metadata'].get('record_id')}, Source: {item['metadata'].get('source_file')}, Shape: {item['tensor'].shape}, Dtype: {item['tensor'].dtype}")
    except ValueError as e:
        print(f"Could not retrieve dataset 'raw_data': {e}")

    # 8. Stop the agent
    print("\n--- Stopping Agent ---")
    agent.stop()

    # 9. Clean up the temporary directory (optional)
    # import shutil
    # shutil.rmtree(source_dir)
    logger.info("--- Ingestion Agent Example Finished ---")