| """ |
| Log stream generator. |
| |
| Produces realistic structured log entries β both signal and noise. |
| Red herring logs are mixed in so the agent must filter real evidence |
| from routine chatter. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import random |
| from typing import Any, Dict, List, Optional |
|
|
|
|
| |
| |
| |
|
|
| _NOISE_TEMPLATES = [ |
| ("INFO", "Processed {n} requests in last 60 seconds"), |
| ("INFO", "Health check passed β all dependencies reachable"), |
| ("INFO", "Connection pool stats: active={a}, idle={i}, max={m}"), |
| ("DEBUG", "Cache hit ratio: {r:.1%} β {h} hits, {m} misses"), |
| ("INFO", "Scheduled job 'metrics_export' completed in {d}ms"), |
| ("DEBUG", "TLS handshake completed with upstream in {d}ms"), |
| ("INFO", "Config reload: no changes detected"), |
| ("WARN", "Slow query detected: SELECT * FROM sessions took {d}ms"), |
| ("INFO", "Garbage collection: freed {n}MB in {d}ms"), |
| ("DEBUG", "Rate limiter: {n} requests allowed, 0 throttled"), |
| ] |
|
|
|
|
| def generate_noise_logs( |
| service_name: str, |
| current_minute: int, |
| count: int = 3, |
| ) -> List[Dict[str, Any]]: |
| """Generate routine noise logs for a service.""" |
| logs = [] |
| for _ in range(count): |
| template_level, template_msg = random.choice(_NOISE_TEMPLATES) |
| msg = template_msg.format( |
| n=random.randint(100, 5000), |
| a=random.randint(5, 20), |
| i=random.randint(0, 10), |
| m=random.randint(20, 50), |
| r=random.uniform(0.85, 0.99), |
| h=random.randint(1000, 9000), |
| d=random.randint(1, 500), |
| ) |
| logs.append({ |
| "timestamp": f"2025-01-15T14:{current_minute:02d}:{random.randint(0,59):02d}Z", |
| "level": template_level, |
| "service": service_name, |
| "message": msg, |
| "trace_id": None, |
| }) |
| return logs |
|
|
|
|
| |
| |
| |
|
|
| def generate_memory_leak_logs( |
| service_name: str, |
| current_minute: int, |
| memory_percent: float, |
| ) -> List[Dict[str, Any]]: |
| """Logs that indicate a memory leak is in progress.""" |
| logs = [] |
| trace = f"trace-{random.randint(100000, 999999)}" |
|
|
| if memory_percent > 90: |
| logs.append(_log(current_minute, "FATAL", service_name, |
| f"OutOfMemoryError: Java heap space β requested 256MB, " |
| f"available 12MB", trace)) |
| logs.append(_log(current_minute, "ERROR", service_name, |
| f"Container {service_name}-{random.randint(0,2)} killed by OOM killer " |
| f"(exit code 137)", trace)) |
| elif memory_percent > 80: |
| logs.append(_log(current_minute, "ERROR", service_name, |
| f"Memory allocation failed: unable to allocate {random.randint(64, 256)}MB " |
| f"for request processing", trace)) |
| logs.append(_log(current_minute, "WARN", service_name, |
| f"GC overhead limit exceeded: spent {random.randint(80, 97)}% of time in GC")) |
| elif memory_percent > 70: |
| logs.append(_log(current_minute, "WARN", service_name, |
| f"Heap usage warning: {memory_percent:.0f}% β approaching limit. " |
| f"Consider increasing -Xmx or investigating leaks")) |
|
|
| return logs |
|
|
|
|
| def generate_auth_failure_logs( |
| service_name: str, |
| current_minute: int, |
| is_auth_service: bool = False, |
| ) -> List[Dict[str, Any]]: |
| """Logs for auth-related failures (used in cascading failure scenario).""" |
| logs = [] |
| trace = f"trace-{random.randint(100000, 999999)}" |
|
|
| if is_auth_service: |
| logs.append(_log(current_minute, "ERROR", service_name, |
| "NullPointerException: configuration key 'auth.jwt.secret' is null " |
| "β cannot validate tokens", trace)) |
| logs.append(_log(current_minute, "ERROR", service_name, |
| f"Authentication failed for {random.randint(50, 200)} requests in " |
| f"last 60s β returning HTTP 500")) |
| else: |
| logs.append(_log(current_minute, "ERROR", service_name, |
| f"Call to auth-service failed: HTTP 500 Internal Server Error " |
| f"β retrying ({random.randint(1,3)}/3)", trace)) |
| logs.append(_log(current_minute, "WARN", service_name, |
| f"Circuit breaker for auth-service: state=HALF_OPEN, " |
| f"failures={random.randint(5, 20)}, threshold=10")) |
|
|
| return logs |
|
|
|
|
| def generate_deadlock_logs( |
| service_name: str, |
| current_minute: int, |
| waiting_on: str, |
| ) -> List[Dict[str, Any]]: |
| """Logs for distributed deadlock / circular wait.""" |
| logs = [] |
| trace = f"trace-{random.randint(100000, 999999)}" |
|
|
| logs.append(_log(current_minute, "WARN", service_name, |
| f"Request {trace} waiting on {waiting_on}: blocked for " |
| f"{random.randint(5000, 25000)}ms β no response", trace)) |
|
|
| if random.random() < 0.4: |
| logs.append(_log(current_minute, "ERROR", service_name, |
| f"Timeout calling {waiting_on}: deadline exceeded after 30000ms. " |
| f"Retry attempt {random.randint(3, 8)} of 10", trace)) |
|
|
| if random.random() < 0.2: |
| logs.append(_log(current_minute, "ERROR", service_name, |
| f"Thread pool exhausted: all {random.randint(50, 200)} threads blocked " |
| f"waiting on downstream calls")) |
|
|
| return logs |
|
|
|
|
| |
| |
| |
|
|
| _RED_HERRING_TEMPLATES = [ |
| ("WARN", "DNS resolution for {svc}.internal took {d}ms (threshold: 100ms)"), |
| ("WARN", "TLS certificate for {svc}.internal expires in {n} days"), |
| ("WARN", "Disk usage on /var/log: {n}% β consider log rotation"), |
| ("ERROR", "Failed to export metrics to Prometheus: connection timeout after {d}ms"), |
| ("WARN", "Background job 'cleanup_sessions' took {d}ms (expected <500ms)"), |
| ("ERROR", "Redis SLOWLOG: KEYS pattern='session:*' took {d}ms"), |
| ] |
|
|
|
|
| def generate_red_herring_logs( |
| service_name: str, |
| current_minute: int, |
| count: int = 1, |
| ) -> List[Dict[str, Any]]: |
| """Generate plausible but misleading log entries.""" |
| logs = [] |
| services = ["api_gateway", "auth", "orders", "payment", "cache", "database", "queue"] |
| for _ in range(count): |
| level, tmpl = random.choice(_RED_HERRING_TEMPLATES) |
| msg = tmpl.format( |
| svc=random.choice(services), |
| d=random.randint(100, 3000), |
| n=random.randint(3, 85), |
| ) |
| logs.append(_log(current_minute, level, service_name, msg)) |
| return logs |
|
|
|
|
| |
| |
| |
|
|
| def _log( |
| minute: int, |
| level: str, |
| service: str, |
| message: str, |
| trace_id: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| return { |
| "timestamp": f"2025-01-15T14:{minute:02d}:{random.randint(0,59):02d}Z", |
| "level": level, |
| "service": service, |
| "message": message, |
| "trace_id": trace_id, |
| } |
|
|