"""Polling adapters for logs, metrics, error traces, and business data."""
import hashlib
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from src.adapters.web_scraper import RawDocument
from src.config import settings
# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(name=__name__)
class LogAggregatorAdapter:
    """Adapter for aggregating and indexing server logs.

    Reads a newline-delimited JSON log file per service (the path is looked up
    in ``settings.integrations.log_file_paths``) and converts ERROR/WARN
    entries into ``RawDocument`` objects.
    """

    # Levels that get indexed; entries with any other "level" value are skipped.
    _INDEXED_LEVELS = ("ERROR", "WARN")

    async def connect(self, credentials: dict) -> None:
        """Initialize log aggregator (no auth typically needed for local logs)."""
        pass

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """
        Fetch ERROR/WARN logs since last sync.

        Args:
            space_id: Service name (e.g., "api-backend"); used as the key into
                ``settings.integrations.log_file_paths``.
            last_sync_at: Only fetch logs whose timestamp is at or after this
                time. Log timestamps are aligned to this value's tz-awareness
                before comparing (see ``_parse_timestamp``).

        Returns:
            List of RawDocuments representing log entries. Lines that are not
            JSON objects are skipped silently. Entries that cannot be
            converted (for example, a missing or unparseable timestamp) are
            skipped with a warning. A bad entry does not abort the rest of the
            file. If the file itself cannot be read, the documents collected
            so far are returned.
        """
        log_file = settings.integrations.log_file_paths.get(space_id)
        if not log_file:
            logger.warning(f"No log file configured for service: {space_id}")
            return []

        docs = []
        try:
            log_path = Path(log_file)
            if not log_path.exists():
                logger.warning(f"Log file not found: {log_file}")
                return []
            # Explicit UTF-8 with replacement so one badly-encoded line cannot
            # abort reading the whole file (the default is locale-dependent).
            with log_path.open("r", encoding="utf-8", errors="replace") as f:
                for line_no, line in enumerate(f, start=1):
                    try:
                        doc = self._build_document(line, space_id, last_sync_at)
                    except Exception as e:
                        # Per-entry isolation: skip only this entry instead of
                        # letting the outer handler stop the whole file.
                        logger.warning(
                            "Skipping log entry %s:%d: %s", log_file, line_no, e
                        )
                        continue
                    if doc is not None:
                        docs.append(doc)
        except Exception as e:
            logger.error(f"Error reading log file {log_file}: {e}")
        return docs

    def _build_document(
        self, line: str, space_id: str, last_sync_at: datetime
    ) -> Optional[RawDocument]:
        """Convert one log line into a RawDocument, or None if it should be skipped.

        Returns None for lines that are not JSON objects, whose level is not
        indexed, or whose timestamp is earlier than ``last_sync_at``.

        Raises:
            ValueError: if the entry's timestamp is missing or not ISO-8601.
        """
        try:
            log_entry = json.loads(line)
        except json.JSONDecodeError:
            return None  # blank or non-JSON line
        if not isinstance(log_entry, dict):
            return None  # valid JSON but not an object, e.g. a bare number

        level = log_entry.get("level")
        if level not in self._INDEXED_LEVELS:
            return None

        entry_time = self._parse_timestamp(log_entry.get("timestamp"), last_sync_at)
        if entry_time < last_sync_at:
            return None

        message = log_entry.get("message")
        # A null or non-string message must not break the title slice.
        title_message = "" if message is None else str(message)

        # The empty first and last elements give the body a leading and a
        # trailing newline.
        content = "\n".join(
            [
                "",
                f"Level: {level}",
                f"Service: {space_id}",
                f"Message: {message}",
                f"Trace ID: {log_entry.get('trace_id', 'N/A')}",
                "Stack trace:",
                str(log_entry.get("stacktrace", "N/A")),
                "Context:",
                json.dumps(log_entry.get("context", {}), indent=2),
                "",
            ]
        )

        return RawDocument(
            uri=f"logs://{space_id}/{log_entry.get('trace_id', log_entry.get('timestamp'))}",
            source_type="log",
            source_subtype="error_log",
            title=f"[{level}] {space_id}: {title_message[:80]}",
            content=content,
            content_hash=hashlib.sha256(content.encode()).hexdigest(),
            created_at=entry_time,
            # Kept naive UTC as before. NOTE(review): datetime.utcnow() is
            # deprecated in 3.12, but switching to an aware value may affect
            # consumers of RawDocument; confirm before changing.
            updated_at=datetime.utcnow(),
            author_ids=["system"],
            space_id=space_id,
            tags=[level, space_id],
            priority=5 if level == "ERROR" else 3,
            ttl_seconds=86400 * 7,  # 1 week
            raw_metadata=log_entry,
        )

    @staticmethod
    def _parse_timestamp(value: object, reference: datetime) -> datetime:
        """Parse an ISO-8601 log timestamp so it can be compared with ``reference``.

        A trailing ``Z`` is accepted on every Python version. If exactly one of
        the parsed value and ``reference`` is timezone-aware, the parsed value
        is converted to match. Naive datetimes are assumed to be UTC, matching
        the ``datetime.utcnow()`` convention used for ``updated_at``. Without
        this alignment, comparing aware and naive datetimes raises TypeError.

        Raises:
            ValueError: if ``value`` is missing, not a string, or not ISO-8601.
        """
        if not isinstance(value, str) or not value:
            raise ValueError(f"missing or non-string timestamp: {value!r}")
        if value.endswith(("Z", "z")):
            # fromisoformat only understands "Z" from Python 3.11 onward.
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
        if reference.tzinfo is None and parsed.tzinfo is not None:
            return parsed.astimezone(timezone.utc).replace(tzinfo=None)
        if reference.tzinfo is not None and parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed
class MetricsAdapter:
    """Placeholder adapter for polling metric anomalies.

    No backend is queried yet: ``connect`` does nothing, and
    ``fetch_incremental`` logs that it was called and yields no documents.
    """

    async def connect(self, credentials: dict) -> None:
        """Set up the metrics connection (currently a no-op)."""
        return None

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """
        Return metric-anomaly documents recorded after ``last_sync_at``.

        Args:
            space_id: Name of the metric source (for example "prometheus" or "datadog").
            last_sync_at: Lower time bound for anomalies; not used yet.

        Returns:
            An empty list until a real Prometheus/Datadog query is implemented.
        """
        notice = f"Metrics polling for {space_id} - placeholder implementation"
        logger.info(notice)
        return list()
class ErrorTraceAdapter:
    """Placeholder adapter for pulling error traces out of APM services.

    Nothing is fetched yet; the polling method only emits an info log line and
    reports no documents.
    """

    async def connect(self, credentials: dict) -> None:
        """Prepare the APM connection; intentionally does nothing for now."""

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """
        Return error groups that gained occurrences after ``last_sync_at``.

        Args:
            space_id: Which APM source to poll (for example "sentry" or "datadog").
            last_sync_at: Lower time bound for errors; ignored by this stub.

        Returns:
            Always an empty list; the Sentry/Datadog queries are not written yet.
        """
        logger.info(
            f"Error trace polling for {space_id} - placeholder implementation"
        )
        traces = []
        return traces
class BusinessDataAdapter:
    """Placeholder adapter for business records held in ERP/CRM systems.

    The connection step is a no-op, and polling only logs a notice and
    returns no documents.
    """

    async def connect(self, credentials: dict) -> None:
        """Open the business-system connection (not implemented; does nothing)."""
        return

    async def fetch_incremental(self, space_id: str, last_sync_at: datetime) -> list[RawDocument]:
        """
        Return business records updated after ``last_sync_at``.

        Args:
            space_id: Business domain to poll (for example "sales", "inventory", "finance").
            last_sync_at: Lower time bound for record updates; unused by this stub.

        Returns:
            An empty list until ERP/CRM API or database access is added.
        """
        records: list = []
        logger.info(f"Business data polling for {space_id} - placeholder implementation")
        return records