Spaces:
Sleeping
Sleeping
| """Polling adapters for logs, metrics, and error traces.""" | |
| import json | |
| import hashlib | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional | |
| from pathlib import Path | |
| from src.adapters.web_scraper import RawDocument | |
| from src.config import settings | |
| logger = logging.getLogger(__name__) | |
class LogAggregatorAdapter:
    """Adapter for aggregating and indexing server logs.

    Each configured log file is read line by line; every line is expected to
    hold one JSON object. Only ERROR and WARN entries are converted into
    ``RawDocument`` objects.
    """

    # Levels worth indexing. Kept as a tuple (not a set) so that an unhashable
    # "level" value in a malformed entry cannot raise during the membership test.
    _INDEXED_LEVELS = ("ERROR", "WARN")

    async def connect(self, credentials: dict) -> None:
        """Initialize log aggregator (no auth typically needed for local logs)."""
        return None

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """
        Fetch ERROR/WARN logs since last sync.

        Malformed lines (invalid JSON, non-object JSON, missing or unparsable
        timestamp) are skipped individually, so one bad entry no longer
        discards every entry that follows it in the file.

        Args:
            space_id: Service name (e.g., "api-backend"); used as the key into
                ``settings.integrations.log_file_paths``.
            last_sync_at: Entries with a timestamp strictly earlier than this
                are skipped.

        Returns:
            List of RawDocuments representing log entries. Empty when no log
            file is configured or found; partial when reading fails midway.
        """
        docs: list[RawDocument] = []

        # Get log file path for this service
        log_file = settings.integrations.log_file_paths.get(space_id)
        if not log_file:
            logger.warning(f"No log file configured for service: {space_id}")
            return []

        try:
            log_path = Path(log_file)
            if not log_path.exists():
                logger.warning(f"Log file not found: {log_file}")
                return []

            # NOTE: this is synchronous file I/O inside a coroutine; it blocks
            # the event loop for the duration of the read.
            # errors="replace" keeps a single undecodable byte from aborting
            # the whole file.
            with open(log_path, "r", encoding="utf-8", errors="replace") as f:
                for line_no, line in enumerate(f, start=1):
                    try:
                        log_entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # Valid JSON that is not an object (e.g. a bare list or
                    # number) has no fields to inspect.
                    if not isinstance(log_entry, dict):
                        continue

                    # Only index ERROR and WARN
                    if log_entry.get("level") not in self._INDEXED_LEVELS:
                        continue

                    # Check timestamp
                    entry_time = self._parse_timestamp(log_entry.get("timestamp"))
                    if entry_time is None:
                        logger.debug(
                            f"Skipping entry without a valid timestamp at {log_file}:{line_no}"
                        )
                        continue
                    if self._is_before(entry_time, last_sync_at):
                        continue

                    # Isolate failures to the offending entry so the rest of
                    # the file is still processed.
                    try:
                        docs.append(self._build_document(space_id, log_entry, entry_time))
                    except Exception as e:
                        logger.warning(
                            f"Skipping unprocessable log entry at {log_file}:{line_no}: {e}"
                        )
        except Exception as e:
            # Best-effort boundary: return whatever was collected so far.
            logger.error(f"Error reading log file {log_file}: {e}")

        return docs

    @staticmethod
    def _parse_timestamp(raw: object) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp; return None when missing or malformed."""
        if not isinstance(raw, str) or not raw:
            return None
        # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
        if raw.endswith(("Z", "z")):
            raw = raw[:-1] + "+00:00"
        try:
            return datetime.fromisoformat(raw)
        except ValueError:
            return None

    @staticmethod
    def _is_before(entry_time: datetime, cutoff: datetime) -> bool:
        """Return True when ``entry_time`` is strictly earlier than ``cutoff``.

        Ordering a timezone-aware datetime against a naive one raises
        TypeError, so when exactly one side is naive it is treated as UTC.
        NOTE(review): UTC is an assumption, chosen because this adapter stamps
        ``updated_at`` with the naive ``datetime.utcnow()`` - confirm.
        """
        from datetime import timezone

        entry_is_naive = entry_time.utcoffset() is None
        cutoff_is_naive = cutoff.utcoffset() is None
        if entry_is_naive and not cutoff_is_naive:
            entry_time = entry_time.replace(tzinfo=timezone.utc)
        elif cutoff_is_naive and not entry_is_naive:
            cutoff = cutoff.replace(tzinfo=timezone.utc)
        return entry_time < cutoff

    @staticmethod
    def _build_document(space_id: str, log_entry: dict, entry_time: datetime) -> RawDocument:
        """Convert one parsed ERROR/WARN log entry into a RawDocument."""
        level = log_entry.get("level")

        # Build content.
        # NOTE(review): the layout mirrors the original literal as it appears
        # in the file (no leading indentation on any line); content_hash
        # depends on this text byte-for-byte.
        content = (
            "\n"
            f"Level: {level}\n"
            f"Service: {space_id}\n"
            f"Message: {log_entry.get('message')}\n"
            f"Trace ID: {log_entry.get('trace_id', 'N/A')}\n"
            "Stack trace:\n"
            f"{log_entry.get('stacktrace', 'N/A')}\n"
            "Context:\n"
            f"{json.dumps(log_entry.get('context', {}), indent=2)}\n"
        )

        # "message" may be null or a non-string in the JSON; slicing such a
        # value directly would raise, so coerce it for the title only.
        message = log_entry.get("message", "")
        title_message = "" if message is None else str(message)

        return RawDocument(
            uri=f"logs://{space_id}/{log_entry.get('trace_id', log_entry.get('timestamp'))}",
            source_type="log",
            source_subtype="error_log",
            title=f"[{level}] {space_id}: {title_message[:80]}",
            content=content,
            content_hash=hashlib.sha256(content.encode()).hexdigest(),
            created_at=entry_time,
            updated_at=datetime.utcnow(),
            author_ids=["system"],
            space_id=space_id,
            tags=[log_entry.get("level", ""), space_id],
            priority=5 if level == "ERROR" else 3,
            ttl_seconds=86400 * 7,  # 1 week
            raw_metadata=log_entry,
        )
class MetricsAdapter:
    """Polling adapter for metrics and anomaly detection (currently a stub)."""

    async def connect(self, credentials: dict) -> None:
        """Set up the metrics connection.

        No-op for now: ``credentials`` is accepted but not used.
        """
        return None

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """Return metric anomalies detected since the last sync.

        Args:
            space_id: Metric source (e.g., "prometheus", "datadog").
            last_sync_at: Lower bound for anomaly timestamps; not used by the
                placeholder.

        Returns:
            Always an empty list. The only side effect is one info-level log
            line naming ``space_id``.
        """
        # Placeholder: a real implementation would query Prometheus/Datadog APIs.
        anomalies: list[RawDocument] = []
        logger.info(f"Metrics polling for {space_id} - placeholder implementation")
        return anomalies
class ErrorTraceAdapter:
    """Polling adapter for error traces from APM services (currently a stub)."""

    async def connect(self, credentials: dict) -> None:
        """Set up the APM connection.

        No-op for now: ``credentials`` is accepted but not used.
        """
        return None

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """Return error groups with new occurrences since the last sync.

        Args:
            space_id: APM source (e.g., "sentry", "datadog").
            last_sync_at: Lower bound for error timestamps; not used by the
                placeholder.

        Returns:
            Always an empty list. The only side effect is one info-level log
            line naming ``space_id``.
        """
        # Placeholder: a real implementation would query Sentry/Datadog APIs.
        error_traces: list[RawDocument] = []
        logger.info(f"Error trace polling for {space_id} - placeholder implementation")
        return error_traces
class BusinessDataAdapter:
    """Polling adapter for business data from ERP/CRM systems (currently a stub)."""

    async def connect(self, credentials: dict) -> None:
        """Set up the business system connection.

        No-op for now: ``credentials`` is accepted but not used.
        """
        return None

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """Return business records updated since the last sync.

        Args:
            space_id: Business domain (e.g., "sales", "inventory", "finance").
            last_sync_at: Lower bound for record update times; not used by the
                placeholder.

        Returns:
            Always an empty list. The only side effect is one info-level log
            line naming ``space_id``.
        """
        # Placeholder: a real implementation would query ERP/CRM APIs or databases.
        business_records: list[RawDocument] = []
        logger.info(f"Business data polling for {space_id} - placeholder implementation")
        return business_records