# heatmap/backend/data_sources/coordinator.py
# Last commit by Ndg07 (c293f7c): "Feat: 24-hour cleanup for local SQLite"
#!/usr/bin/env python3
"""
Ingestion coordinator for managing data source fetching and processing.
Orchestrates multiple data sources and handles event processing pipeline.
"""
import asyncio
import logging
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional, Any, Callable
import json
from dataclasses import asdict
from .base.base_connector import BaseDataConnector, RawEvent
from .base.data_validator import DataValidator
from .registry import DataSourceRegistry
# Module-level logger, named after this module's dotted import path.
logger = logging.getLogger(__name__)
class IngestionCoordinator:
    """Coordinate data ingestion from multiple registered sources.

    Connectors come from a ``DataSourceRegistry``. Every fetched event is
    checked by a ``DataValidator`` and valid events are handed to the optional
    ``event_processor`` callback. Ingestion can run continuously (one polling
    task per enabled source, see :meth:`start_continuous_ingestion`) or as a
    one-shot concurrent fetch (:meth:`fetch_from_all_sources`). Counters are
    kept in ``self.stats``.
    """

    def __init__(self,
                 registry: DataSourceRegistry,
                 validator: Optional[DataValidator] = None,
                 event_processor: Optional[Callable] = None):
        """Initialize ingestion coordinator.

        Args:
            registry: Data source registry that provides the connectors.
            validator: Data validator instance; a new ``DataValidator`` is
                created when None is passed.
            event_processor: Callable invoked with each validated event. Both
                coroutine functions and plain callables are accepted.
        """
        self.registry = registry
        # Explicit None check so a validator object that happens to be falsy
        # is not silently replaced by a default one.
        self.validator = validator if validator is not None else DataValidator()
        self.event_processor = event_processor

        # Coordination settings
        # NOTE(review): batch_size is not read by this class; events are
        # validated/processed one at a time. Kept for API compatibility.
        self.batch_size = 100
        self.max_concurrent_sources = 10
        self.fetch_timeout = 300  # seconds allowed for one source fetch

        # State tracking
        self.last_fetch_times: Dict[str, datetime] = {}  # `since` watermark per source
        self.fetch_intervals: Dict[str, int] = {}  # seconds between polls, per source
        self.running = False
        self.fetch_tasks: Dict[str, asyncio.Task] = {}

        # Statistics
        self.stats: Dict[str, Any] = self._new_stats()

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed global statistics dictionary."""
        return {
            'total_fetches': 0,
            'total_events': 0,
            'total_validated': 0,
            'total_processed': 0,
            'errors': 0,
            'last_run': None,
            'source_stats': {},
        }

    def _get_source_stats(self, source_id: str) -> Dict[str, Any]:
        """Return the per-source stats dict for ``source_id``, creating it on first use."""
        return self.stats['source_stats'].setdefault(source_id, {
            'fetches': 0,
            'events': 0,
            'validated': 0,
            'processed': 0,
            'errors': 0,
            'last_fetch': None,
            'last_error': None,
        })

    async def start_continuous_ingestion(self,
                                         default_interval: int = 300,
                                         stagger_delay: int = 30):
        """Start continuous ingestion from all enabled sources.

        One background task is created per enabled connector; start times are
        staggered by ``stagger_delay`` so sources are not all polled at once.

        Args:
            default_interval: Fetch interval in seconds for connectors whose
                ``fetch_interval`` attribute is missing or None.
            stagger_delay: Delay in seconds between starting consecutive sources.
        """
        if self.running:
            logger.warning("Ingestion coordinator is already running")
            return

        logger.info("Starting continuous data ingestion")
        connectors = self.registry.get_enabled_connectors()
        if not connectors:
            # Leave ``running`` False: no tasks were created, and setting it
            # would make every later start attempt report "already running".
            logger.warning("No enabled connectors found")
            return

        self.running = True
        for i, connector in enumerate(connectors):
            # Stagger the start times to avoid overwhelming sources
            start_delay = i * stagger_delay

            # A connector may expose fetch_interval = None; treat it as unset
            # rather than letting asyncio.sleep(None) crash the loop later.
            interval = getattr(connector, 'fetch_interval', None)
            if interval is None:
                interval = default_interval
            self.fetch_intervals[connector.source_id] = interval

            task = asyncio.create_task(
                self._continuous_fetch_loop(connector, interval, start_delay)
            )
            self.fetch_tasks[connector.source_id] = task
            logger.info(f"Scheduled {connector.source_id} with {interval}s interval, {start_delay}s delay")

        logger.info(f"Started continuous ingestion for {len(connectors)} sources")

    async def stop_continuous_ingestion(self):
        """Stop continuous ingestion and wait for every fetch task to finish."""
        if not self.running:
            return

        logger.info("Stopping continuous data ingestion")
        self.running = False

        tasks = list(self.fetch_tasks.items())
        cancelled_ids = []
        for source_id, task in tasks:
            if not task.done():
                task.cancel()
                cancelled_ids.append(source_id)

        # return_exceptions=True: a task that ended with an error must neither
        # abort the shutdown nor leave fetch_tasks uncleared.
        await asyncio.gather(*(task for _, task in tasks), return_exceptions=True)
        for source_id in cancelled_ids:
            logger.debug(f"Cancelled fetch task for {source_id}")

        self.fetch_tasks.clear()
        logger.info("Stopped continuous data ingestion")

    async def _continuous_fetch_loop(self,
                                     connector: BaseDataConnector,
                                     interval: int,
                                     start_delay: int = 0):
        """Poll one connector repeatedly until ingestion is stopped.

        The interval is re-read from ``self.fetch_intervals`` on every pass so
        that :meth:`configure_source_intervals` affects running loops;
        ``interval`` is only the fallback when no entry exists.
        """
        source_id = connector.source_id
        try:
            # Initial delay to stagger starts
            if start_delay > 0:
                await asyncio.sleep(start_delay)

            logger.info(f"Starting continuous fetch for {source_id} (interval: {interval}s)")
            while self.running:
                current_interval = self.fetch_intervals.get(source_id, interval)
                try:
                    await self.fetch_from_source(source_id)
                except Exception as e:
                    logger.error(f"Error in fetch loop for {source_id}: {e}")
                    # Retry sooner than the full interval after an error.
                    await asyncio.sleep(min(current_interval, 60))
                    continue
                await asyncio.sleep(current_interval)
        except asyncio.CancelledError:
            logger.info(f"Fetch loop cancelled for {source_id}")
            # Re-raise so the task is reported as cancelled (asyncio guidance).
            raise

    async def fetch_from_source(self, source_id: str) -> List[RawEvent]:
        """Fetch, validate and process events from a single source.

        Args:
            source_id: Source identifier.

        Returns:
            Events that passed validation and, when an event processor is set,
            were processed without error. Fetch timeouts and fetch errors are
            recorded in the stats and yield an empty list.

        Raises:
            ValueError: If ``source_id`` is not registered.
        """
        connector = self.registry.get_connector(source_id)
        if not connector:
            raise ValueError(f"Source not found: {source_id}")
        if not connector.enabled:
            logger.debug(f"Skipping disabled source: {source_id}")
            return []

        try:
            since = self.last_fetch_times.get(source_id)
            # Take the new watermark *before* fetching. Using the completion
            # time would skip anything that arrived while the (possibly
            # minutes-long) request was in flight.
            fetch_started_at = datetime.now(timezone.utc)

            logger.debug(f"Fetching from {source_id} (since: {since})")
            events = await asyncio.wait_for(
                connector.fetch_events(since=since),
                timeout=self.fetch_timeout
            )

            self.stats['total_fetches'] += 1
            self.stats['total_events'] += len(events)
            self.last_fetch_times[source_id] = fetch_started_at

            source_stats = self._get_source_stats(source_id)
            source_stats['fetches'] += 1
            source_stats['events'] += len(events)
            source_stats['last_fetch'] = datetime.now(timezone.utc).isoformat()

            if not events:
                logger.debug(f"No new events from {source_id}")
                return []

            logger.info(f"Fetched {len(events)} events from {source_id}")
            return await self._process_events_batch(events, source_id)

        except asyncio.TimeoutError:
            error_msg = f"Fetch timeout for {source_id}"
            logger.error(error_msg)
            self._record_source_error(source_id, error_msg)
            return []
        except Exception as e:
            error_msg = f"Fetch error for {source_id}: {e}"
            logger.error(error_msg)
            self._record_source_error(source_id, error_msg)
            return []

    async def _process_events_batch(self, events: List[RawEvent], source_id: str) -> List[RawEvent]:
        """Validate ``events`` one by one and hand valid ones to the processor.

        Returns:
            Events that were valid and, if a processor is configured, were
            processed without raising. Per-event failures are logged and
            counted as source errors; they do not abort the batch.
        """
        import inspect  # local import, as export_events_batch does for csv/io

        processed_events: List[RawEvent] = []
        source_stats = self._get_source_stats(source_id)
        processor = self.event_processor

        for event in events:
            try:
                is_valid, failure_reason = self.validator.validate_event(event)
                if not is_valid:
                    logger.debug(f"Event validation failed for {event.event_id}: {failure_reason}")
                    continue

                self.stats['total_validated'] += 1
                source_stats['validated'] += 1

                if processor is not None:
                    try:
                        result = processor(event)
                        # Accept both coroutine functions and plain callables.
                        if inspect.isawaitable(result):
                            await result
                    except Exception as e:
                        logger.error(f"Event processing failed for {event.event_id}: {e}")
                        self._record_source_error(source_id, f"processing_error: {e}")
                        continue
                    self.stats['total_processed'] += 1
                    source_stats['processed'] += 1

                processed_events.append(event)
            except Exception as e:
                logger.error(f"Error processing event from {source_id}: {e}")
                self._record_source_error(source_id, f"event_error: {e}")

        if processed_events:
            logger.info(f"Processed {len(processed_events)}/{len(events)} events from {source_id}")
        return processed_events

    async def fetch_from_all_sources(self,
                                     max_concurrent: Optional[int] = None) -> Dict[str, List[RawEvent]]:
        """Fetch events from all enabled sources concurrently.

        Args:
            max_concurrent: Maximum number of simultaneous fetches; falls back
                to ``self.max_concurrent_sources`` when None or 0.

        Returns:
            Mapping of source_id to the events returned by
            :meth:`fetch_from_source`; sources whose fetch raised map to [].
        """
        connectors = self.registry.get_enabled_connectors()
        if not connectors:
            logger.warning("No enabled connectors found")
            return {}

        max_concurrent = max_concurrent or self.max_concurrent_sources
        # Semaphore caps how many fetches run at the same time.
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_with_semaphore(source_id: str) -> tuple[str, List[RawEvent]]:
            async with semaphore:
                events = await self.fetch_from_source(source_id)
                return source_id, events

        completed = await asyncio.gather(
            *(fetch_with_semaphore(connector.source_id) for connector in connectors),
            return_exceptions=True,
        )

        results: Dict[str, List[RawEvent]] = {}
        for connector, result in zip(connectors, completed):
            # BaseException, not Exception: a cancelled child is returned as
            # CancelledError and would otherwise hit the unpacking below.
            if isinstance(result, BaseException):
                logger.error(f"Fetch failed for {connector.source_id}: {result}")
                results[connector.source_id] = []
                self._record_source_error(connector.source_id, str(result))
            else:
                source_id, events = result
                results[source_id] = events

        self.stats['last_run'] = datetime.now(timezone.utc).isoformat()
        total_events = sum(len(events) for events in results.values())
        logger.info(f"Fetched {total_events} total events from {len(results)} sources")
        return results

    def _record_source_error(self, source_id: str, error: str):
        """Count an error globally and for ``source_id``, keeping its message."""
        self.stats['errors'] += 1
        source_stats = self._get_source_stats(source_id)
        source_stats['errors'] += 1
        source_stats['last_error'] = error

    async def health_check_sources(self) -> Dict[str, Dict[str, Any]]:
        """Perform health check on all registered sources (delegates to the registry)."""
        return await self.registry.health_check_all_sources()

    def get_ingestion_stats(self) -> Dict[str, Any]:
        """Return a snapshot of ingestion, validator and registry statistics.

        Derived rates (``avg_events_per_fetch``, ``validation_rate``,
        ``processing_rate``) are included only once at least one fetch happened.
        """
        stats = dict(self.stats)
        # Copy the nested per-source dicts as well, so the snapshot neither
        # tracks later updates nor lets callers mutate the live counters.
        stats['source_stats'] = {
            source_id: dict(source_stats)
            for source_id, source_stats in self.stats['source_stats'].items()
        }

        stats['validator_stats'] = self.validator.get_stats()
        stats['registry_stats'] = self.registry.get_registry_stats()

        if stats['total_fetches'] > 0:
            stats['avg_events_per_fetch'] = stats['total_events'] / stats['total_fetches']
            stats['validation_rate'] = stats['total_validated'] / stats['total_events'] if stats['total_events'] > 0 else 0
            stats['processing_rate'] = stats['total_processed'] / stats['total_validated'] if stats['total_validated'] > 0 else 0

        return stats

    def reset_stats(self):
        """Reset ingestion statistics and the validator's statistics."""
        self.stats = self._new_stats()
        self.validator.reset_stats()
        logger.info("Reset ingestion statistics")

    def set_event_processor(self, processor: Callable):
        """Set the event processor function.

        Args:
            processor: Callable taking a RawEvent. It may be a coroutine
                function (its result is awaited) or a plain function.
        """
        self.event_processor = processor
        logger.info("Set event processor function")

    def configure_source_intervals(self, intervals: Dict[str, int]):
        """Configure fetch intervals for already-scheduled sources.

        Running fetch loops pick up the new value on their next iteration.

        Args:
            intervals: Mapping of source_id to a positive interval in seconds.
                Unknown sources and non-positive intervals are skipped with a
                warning.
        """
        for source_id, interval in intervals.items():
            if source_id not in self.fetch_intervals:
                logger.warning(f"Ignoring interval for unscheduled source: {source_id}")
                continue
            if interval <= 0:
                logger.warning(f"Ignoring non-positive interval for {source_id}: {interval}")
                continue
            self.fetch_intervals[source_id] = interval
            logger.info(f"Updated fetch interval for {source_id}: {interval}s")

    async def manual_fetch_all(self) -> Dict[str, List[RawEvent]]:
        """Manually trigger fetch from all sources (one-time operation)."""
        logger.info("Manual fetch triggered for all sources")
        return await self.fetch_from_all_sources()

    def get_source_status(self) -> Dict[str, Dict[str, Any]]:
        """Get status of all sources including last fetch times and intervals."""
        status = {}
        for connector in self.registry.get_all_connectors().values():
            source_id = connector.source_id
            task = self.fetch_tasks.get(source_id)
            status[source_id] = {
                'enabled': connector.enabled,
                'source_type': connector.source_type,
                'last_fetch': self.last_fetch_times.get(source_id),
                'fetch_interval': self.fetch_intervals.get(source_id),
                'running': task is not None and not task.done(),
                'stats': self.stats['source_stats'].get(source_id, {})
            }
        return status

    async def export_events_batch(self,
                                  events: List[RawEvent],
                                  format: str = 'json') -> str:
        """Export a batch of events to specified format.

        Args:
            events: List of events to export (dataclass instances).
            format: Export format ('json', 'csv', 'jsonl').

        Returns:
            Exported data as string. CSV columns come from the first event's
            fields; an empty ``events`` list yields an empty string for
            'csv' and 'jsonl'.

        Raises:
            ValueError: If ``format`` is not supported.
        """
        if format == 'json':
            return json.dumps([asdict(event) for event in events], indent=2, default=str)
        elif format == 'jsonl':
            return '\n'.join(json.dumps(asdict(event), default=str) for event in events)
        elif format == 'csv':
            # Basic CSV export (would need pandas for full implementation)
            import csv
            import io

            output = io.StringIO()
            if events:
                fieldnames = list(asdict(events[0]).keys())
                writer = csv.DictWriter(output, fieldnames=fieldnames)
                writer.writeheader()
                for event in events:
                    writer.writerow(asdict(event))
            return output.getvalue()
        else:
            raise ValueError(f"Unsupported export format: {format}")
# Utility functions
async def create_coordinator_with_sources(sources_config: Dict[str, Dict[str, Any]]) -> IngestionCoordinator:
    """Build an IngestionCoordinator whose registry is populated from config.

    Args:
        sources_config: Mapping of source_id to that source's configuration,
            passed unchanged to ``DataSourceRegistry.register_sources_from_config``.

    Returns:
        A new coordinator wired to a fresh ``DataValidator`` and no event
        processor.
    """
    source_registry = DataSourceRegistry()
    source_registry.register_sources_from_config(sources_config)
    return IngestionCoordinator(source_registry, DataValidator())