File size: 15,585 Bytes
aa654a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# ingestion_agent.py
"""
Implements the Autonomous Data Ingestion Agent for Tensorus.

This agent monitors a source directory for new data files (e.g., CSV, images),
preprocesses them into tensors using configurable functions, performs basic
validation, and inserts them into a specified dataset in TensorStorage.

Future Enhancements:
- Monitor cloud storage (S3, GCS) and APIs.
- More robust error handling for malformed files.
- More sophisticated duplicate detection (e.g., file hashing).
- Support for streaming data sources.
- Asynchronous processing for higher throughput.
- More complex and configurable preprocessing pipelines.
- Schema validation against predefined dataset schemas.
- Resource management controls.
"""

import os
import time
import glob
import logging
import threading
import csv
from PIL import Image
import torch
import torchvision.transforms as T # Use torchvision for image transforms

from typing import Dict, Callable, Optional, Tuple, List, Any
from tensor_storage import TensorStorage # Import our storage module

# Configure basic logging (can be customized further)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# --- Default Preprocessing Functions ---

def preprocess_csv(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """
    Convert a CSV file of numeric values into a single 2-D float32 tensor.

    The first row is treated as a header and recorded in the metadata; each
    subsequent row is parsed as floats. Rows containing any non-numeric cell
    are logged and dropped. Any I/O or tensor-construction failure is logged
    and reported by returning ``None`` for the tensor.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        Tuple of (tensor, metadata). ``tensor`` is ``None`` when no usable
        numeric rows were found or an error occurred; ``metadata`` always
        contains at least the source file path and the type marker "csv".
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "csv"}
    rows: List[List[float]] = []
    try:
        with open(file_path, 'r', newline='') as fh:
            csv_reader = csv.reader(fh)
            # First line is assumed to be a header; keep it for reference.
            metadata["header"] = next(csv_reader, None)
            for raw_row in csv_reader:
                try:
                    rows.append([float(cell) for cell in raw_row])
                except ValueError:
                    # Any cell that fails float() disqualifies the whole row.
                    logger.warning(f"Skipping non-numeric row in {file_path}: {raw_row}")

        if not rows:
            logger.warning(f"No numeric data found or processed in CSV file: {file_path}")
            return None, metadata

        result = torch.tensor(rows, dtype=torch.float32)
        logger.debug(f"Successfully processed {file_path} into tensor shape {result.shape}")
        return result, metadata
    except Exception as e:
        logger.error(f"Failed to process CSV file {file_path}: {e}")
        return None, metadata


def preprocess_image(file_path: str) -> Tuple[Optional[torch.Tensor], Dict[str, Any]]:
    """
    Load an image file and turn it into a normalized float tensor.

    The image is forced to RGB, resized to 128x128, converted to a
    (C, H, W) tensor in [0, 1], and normalized with ImageNet statistics.

    Args:
        file_path: Path of the image file to read.

    Returns:
        Tuple of (tensor, metadata). ``tensor`` is ``None`` if the file is
        missing or cannot be decoded; ``metadata`` always carries the source
        path and the type marker "image", plus the original (width, height)
        on success.
    """
    metadata: Dict[str, Any] = {"source_file": file_path, "type": "image"}
    try:
        image = Image.open(file_path).convert('RGB')  # force 3-channel RGB

        # Fixed pipeline for now; making it configurable is a noted future enhancement.
        pipeline = T.Compose([
            T.Resize((128, 128)),
            T.ToTensor(),  # PIL image (H, W, C) [0,255] -> Tensor (C, H, W) [0,1]
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet stats
        ])

        result = pipeline(image)
        metadata["original_size"] = image.size  # (width, height)
        logger.debug(f"Successfully processed {file_path} into tensor shape {result.shape}")
        return result, metadata
    except FileNotFoundError:
        logger.error(f"Image file not found: {file_path}")
        return None, metadata
    except Exception as e:
        logger.error(f"Failed to process image file {file_path}: {e}")
        return None, metadata

# --- Data Ingestion Agent Class ---

class DataIngestionAgent:
    """
    Monitors a directory for new files and ingests them into TensorStorage.

    A daemon thread polls ``source_directory`` on a fixed interval. Each new
    file whose extension has a registered preprocessing function is converted
    to a tensor, validated, and inserted into the target dataset. Paths that
    were ingested successfully are remembered for the lifetime of the agent
    so they are not ingested twice in one session (no persistence across
    runs; a restarted agent will re-ingest everything).
    """

    def __init__(self,
                 tensor_storage: TensorStorage,
                 dataset_name: str,
                 source_directory: str,
                 polling_interval_sec: int = 10,
                 preprocessing_rules: Optional[Dict[str, Callable[[str], Tuple[Optional[torch.Tensor], Dict[str, Any]]]]] = None):
        """
        Initializes the DataIngestionAgent.

        Args:
            tensor_storage: An instance of the TensorStorage class.
            dataset_name: The name of the dataset in TensorStorage to ingest into.
                          Created automatically if it does not exist yet.
            source_directory: The path to the local directory to monitor.
            polling_interval_sec: How often (in seconds) to check the directory.
            preprocessing_rules: A dictionary mapping file extensions
                                 (e.g., '.csv', '.png') to preprocessing functions.
                                 Keys are normalized to lowercase internally, so
                                 '.CSV' and '.csv' are equivalent. Each function
                                 takes a file path and returns a Tuple containing
                                 the processed Tensor (or None on failure) and a
                                 metadata dictionary. If None, default rules for
                                 CSV and common image formats are used.

        Raises:
            TypeError: If tensor_storage is not a TensorStorage instance.
            ValueError: If source_directory does not exist or is not a directory.
        """
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")
        if not os.path.isdir(source_directory):
            raise ValueError(f"Source directory '{source_directory}' does not exist or is not a directory.")

        self.tensor_storage = tensor_storage
        self.dataset_name = dataset_name
        self.source_directory = source_directory
        self.polling_interval = polling_interval_sec

        # Ensure the target dataset exists, creating it on first use.
        try:
            self.tensor_storage.get_dataset(self.dataset_name)
            logger.info(f"Agent targeting existing dataset '{self.dataset_name}'.")
        except ValueError:
            logger.info(f"Dataset '{self.dataset_name}' not found. Creating it.")
            self.tensor_storage.create_dataset(self.dataset_name)

        # Default preprocessing rules if none provided.
        if preprocessing_rules is None:
            self.preprocessing_rules = {
                '.csv': preprocess_csv,
                '.png': preprocess_image,
                '.jpg': preprocess_image,
                '.jpeg': preprocess_image,
                '.tif': preprocess_image,
                '.tiff': preprocess_image,
            }
            logger.info("Using default preprocessing rules for CSV and common image formats.")
        else:
            # Normalize keys to lowercase: lookups in _process_file and
            # _scan_source_directory lowercase the file extension, so rules
            # registered with uppercase keys would otherwise never match.
            self.preprocessing_rules = {ext.lower(): fn for ext, fn in preprocessing_rules.items()}
            logger.info(f"Using custom preprocessing rules for extensions: {list(preprocessing_rules.keys())}")

        self.processed_files = set()  # Absolute/relative paths ingested this session.
        self._stop_event = threading.Event()
        self._monitor_thread = None
        logger.info(f"DataIngestionAgent initialized for dataset '{self.dataset_name}' monitoring '{self.source_directory}'.")


    def _validate_data(self, tensor: Optional[torch.Tensor], metadata: Dict[str, Any]) -> bool:
        """
        Performs basic validation on the preprocessed tensor.

        Args:
            tensor: The output of a preprocessing function (may be None).
            metadata: The metadata dict produced alongside the tensor; used
                      only for logging the source file on failure.

        Returns:
            True if the tensor is a non-None torch.Tensor, False otherwise.
        """
        if tensor is None:
            logger.warning(f"Validation failed: Tensor is None for {metadata.get('source_file', 'N/A')}")
            return False
        if not isinstance(tensor, torch.Tensor):
            logger.warning(f"Validation failed: Output is not a tensor for {metadata.get('source_file', 'N/A')}")
            return False
        # Add more specific checks if needed (e.g., tensor.numel() > 0).
        return True

    def _process_file(self, file_path: str) -> None:
        """
        Processes a single detected file: preprocess, validate, insert.

        The file is only marked as processed after a successful insertion,
        so failed files will be retried on the next scan.
        """
        logger.info(f"Detected new file: {file_path}")
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()

        preprocessor = self.preprocessing_rules.get(file_extension)

        if preprocessor:
            logger.debug(f"Applying preprocessor for '{file_extension}' to {file_path}")
            try:
                tensor, metadata = preprocessor(file_path)

                if self._validate_data(tensor, metadata):
                    # Ensure tensor is not None before insertion.
                    if tensor is not None:
                        record_id = self.tensor_storage.insert(self.dataset_name, tensor, metadata)
                        logger.info(f"Successfully ingested '{file_path}' into dataset '{self.dataset_name}' with record ID: {record_id}")
                        self.processed_files.add(file_path)  # Mark as processed only on success.
                    else:
                        # Should have been caught by validation, but as safeguard:
                        logger.error(f"Validation passed but tensor is None for {file_path}. Skipping insertion.")
                else:
                    logger.warning(f"Data validation failed for {file_path}. Skipping insertion.")

            except Exception as e:
                logger.error(f"Unhandled error during preprocessing or insertion for {file_path}: {e}", exc_info=True)
        else:
            logger.debug(f"No preprocessor configured for file extension '{file_extension}'. Skipping file: {file_path}")


    def _scan_source_directory(self) -> None:
        """Scans the source directory (non-recursively) for new files matching the rules."""
        logger.debug(f"Scanning directory: {self.source_directory}")
        supported_extensions = self.preprocessing_rules.keys()

        try:
            # Use glob to find all files, then filter.
            # This might be inefficient for huge directories; consider os.scandir.
            all_files = glob.glob(os.path.join(self.source_directory, '*'), recursive=False)

            for file_path in all_files:
                if not os.path.isfile(file_path):
                    continue  # Skip directories.

                _, file_extension = os.path.splitext(file_path)
                file_extension = file_extension.lower()

                if file_extension in supported_extensions and file_path not in self.processed_files:
                    self._process_file(file_path)

        except Exception as e:
            logger.error(f"Error scanning source directory '{self.source_directory}': {e}", exc_info=True)


    def _monitor_loop(self) -> None:
        """The main loop executed by the background thread: scan, then wait."""
        logger.info(f"Starting monitoring loop for '{self.source_directory}'. Polling interval: {self.polling_interval} seconds.")
        while not self._stop_event.is_set():
            self._scan_source_directory()
            # Event.wait returns early as soon as stop() sets the event,
            # so shutdown does not have to wait out a full polling interval.
            self._stop_event.wait(self.polling_interval)
        logger.info("Monitoring loop stopped.")


    def start(self) -> None:
        """Starts the monitoring process in a background daemon thread. Idempotent."""
        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            logger.warning("Agent monitoring is already running.")
            return

        self._stop_event.clear()
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()
        logger.info("Data Ingestion Agent started monitoring.")

    def stop(self) -> None:
        """
        Signals the monitoring thread to stop and waits for it to exit.

        Waits at most one polling interval plus a grace period. If the thread
        does not terminate in time, the reference is kept so a subsequent
        stop() can attempt to join it again rather than orphaning it.
        """
        if self._monitor_thread is None or not self._monitor_thread.is_alive():
            logger.info("Agent monitoring is not running.")
            return

        logger.info("Stopping Data Ingestion Agent monitoring...")
        self._stop_event.set()
        self._monitor_thread.join(timeout=self.polling_interval + 5)  # Wait for thread to finish.

        if self._monitor_thread.is_alive():
            logger.warning("Monitoring thread did not stop gracefully after timeout.")
            # Do NOT drop the reference here: the thread is still running and
            # discarding it would make it impossible to join later.
        else:
            logger.info("Data Ingestion Agent monitoring stopped successfully.")
            self._monitor_thread = None


# --- Example Usage ---
if __name__ == "__main__":
    # Demo script: wires up a TensorStorage, points the agent at a temporary
    # directory, drops sample files in, and inspects what got ingested.
    # Statement order matters: the agent must be started before files are
    # created so the polling loop picks them up, and the sleeps below give
    # the background thread time to complete its scans.
    logger.info("--- Starting Ingestion Agent Example ---")

    # 1. Setup TensorStorage
    storage = TensorStorage()

    # 2. Setup a temporary directory for the agent to monitor
    source_dir = "temp_ingestion_source"
    if not os.path.exists(source_dir):
        os.makedirs(source_dir)
        logger.info(f"Created temporary source directory: {source_dir}")

    # 3. Create the Ingestion Agent
    # We'll use a short polling interval for demonstration
    agent = DataIngestionAgent(
        tensor_storage=storage,
        dataset_name="raw_data",
        source_directory=source_dir,
        polling_interval_sec=5
    )

    # 4. Start the agent (runs in the background as a daemon thread)
    agent.start()

    # 5. Simulate adding files to the source directory
    print("\nSimulating file creation...")
    time.sleep(2) # Give agent time to start initial scan

    # Create a dummy CSV file. The "invalid" row exercises the non-numeric
    # row handling in preprocess_csv (the row should be skipped, not fatal).
    csv_path = os.path.join(source_dir, "data_1.csv")
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Timestamp", "Value1", "Value2"])
        writer.writerow(["1678886400", "10.5", "20.1"])
        writer.writerow(["1678886460", "11.2", "20.5"])
        writer.writerow(["1678886520", "invalid", "20.9"]) # Test non-numeric row
        writer.writerow(["1678886580", "10.9", "21.0"])
    print(f"Created CSV: {csv_path}")

    # Create a dummy image file (requires Pillow)
    # NOTE(review): the ImportError branch below appears unreachable — PIL is
    # imported at module load, so this script would fail earlier if Pillow
    # were missing. Kept as a defensive guard; confirm before removing.
    try:
        img_path = os.path.join(source_dir, "image_1.png")
        dummy_img = Image.new('RGB', (60, 30), color = 'red')
        dummy_img.save(img_path)
        print(f"Created Image: {img_path}")
    except ImportError:
        print("Pillow not installed, skipping image creation. Install with: pip install Pillow")
    except Exception as e:
        print(f"Could not create dummy image: {e}")


    # Create an unsupported file type (no preprocessor registered for .txt,
    # so the agent should log a debug message and skip it).
    txt_path = os.path.join(source_dir, "notes.txt")
    with open(txt_path, 'w') as f:
        f.write("This is a test file.")
    print(f"Created TXT: {txt_path} (should be skipped)")


    # 6. Let the agent run for a couple of polling cycles
    print(f"\nWaiting for agent to process files (polling interval {agent.polling_interval}s)...")
    time.sleep(agent.polling_interval * 2 + 1) # Wait for 2 cycles + buffer


    # 7. Check the contents of TensorStorage
    print("\n--- Checking TensorStorage contents ---")
    try:
        ingested_data = storage.get_dataset_with_metadata("raw_data")
        print(f"Found {len(ingested_data)} items in dataset 'raw_data':")
        for item in ingested_data:
            print(f"  Record ID: {item['metadata'].get('record_id')}, Source: {item['metadata'].get('source_file')}, Shape: {item['tensor'].shape}, Dtype: {item['tensor'].dtype}")
            # print(f" Tensor: {item['tensor']}") # Can be verbose
    except ValueError as e:
        print(f"Could not retrieve dataset 'raw_data': {e}")


    # 8. Stop the agent (signals the event and joins the monitor thread)
    print("\n--- Stopping Agent ---")
    agent.stop()

    # 9. Clean up the temporary directory (optional)
    # print(f"\nCleaning up temporary directory: {source_dir}")
    # import shutil
    # shutil.rmtree(source_dir)

    logger.info("--- Ingestion Agent Example Finished ---")