# Yaswanth-Bolla's picture
# Initial commit
# 1175c0b
"""
Log stream generator.
Produces realistic structured log entries — both signal and noise.
Red herring logs are mixed in so the agent must filter real evidence
from routine chatter.
"""
from __future__ import annotations

import random
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
# ------------------------------------------------------------------
# Noise logs — routine operational chatter
# ------------------------------------------------------------------
_NOISE_TEMPLATES = [
("INFO", "Processed {n} requests in last 60 seconds"),
("INFO", "Health check passed β€” all dependencies reachable"),
("INFO", "Connection pool stats: active={a}, idle={i}, max={m}"),
("DEBUG", "Cache hit ratio: {r:.1%} β€” {h} hits, {m} misses"),
("INFO", "Scheduled job 'metrics_export' completed in {d}ms"),
("DEBUG", "TLS handshake completed with upstream in {d}ms"),
("INFO", "Config reload: no changes detected"),
("WARN", "Slow query detected: SELECT * FROM sessions took {d}ms"),
("INFO", "Garbage collection: freed {n}MB in {d}ms"),
("DEBUG", "Rate limiter: {n} requests allowed, 0 throttled"),
]
def generate_noise_logs(
    service_name: str,
    current_minute: int,
    count: int = 3,
) -> List[Dict[str, Any]]:
    """Generate routine noise logs for a service.

    Args:
        service_name: Value stored under the "service" key of every entry.
        current_minute: Simulation minute handed to the shared entry builder
            for the timestamp.
        count: Number of entries to produce; zero or a negative value yields
            an empty list.

    Returns:
        A list of ``count`` log dicts, each with ``trace_id`` set to ``None``.
    """
    logs = []
    for _ in range(count):
        level, template = random.choice(_NOISE_TEMPLATES)
        # Every field is drawn on every iteration even though a template only
        # names some of them; str.format ignores unused keyword arguments.
        message = template.format(
            n=random.randint(100, 5000),
            a=random.randint(5, 20),
            i=random.randint(0, 10),
            m=random.randint(20, 50),
            r=random.uniform(0.85, 0.99),
            h=random.randint(1000, 9000),
            d=random.randint(1, 500),
        )
        # Reuse the shared builder so noise entries get the same shape and
        # timestamp handling as the signal generators (trace_id defaults to
        # None there).
        logs.append(_log(current_minute, level, service_name, message))
    return logs
# ------------------------------------------------------------------
# Scenario-specific log generators (signal)
# ------------------------------------------------------------------
def generate_memory_leak_logs(
    service_name: str,
    current_minute: int,
    memory_percent: float,
) -> List[Dict[str, Any]]:
    """Logs that indicate a memory leak is in progress.

    The severity escalates with ``memory_percent``:

    * above 90: a FATAL out-of-memory entry and an ERROR container-kill entry
    * above 80: an ERROR allocation failure and a WARN GC-overhead entry
    * above 70: a single WARN heap-usage entry
    * otherwise: no entries

    Args:
        service_name: Service the entries are attributed to.
        current_minute: Simulation minute used for the timestamps.
        memory_percent: Current memory usage as a percentage.

    Returns:
        The list of log dicts for the matching tier (possibly empty).
    """
    logs = []
    # The trace id is created before branching; the 70-80 tier and the GC
    # warning do not attach it.
    trace = f"trace-{random.randint(100000, 999999)}"
    if memory_percent > 90:
        logs.append(_log(
            current_minute, "FATAL", service_name,
            "OutOfMemoryError: Java heap space — requested 256MB, "
            "available 12MB",
            trace,
        ))
        logs.append(_log(
            current_minute, "ERROR", service_name,
            f"Container {service_name}-{random.randint(0, 2)} killed by OOM killer "
            "(exit code 137)",
            trace,
        ))
    elif memory_percent > 80:
        logs.append(_log(
            current_minute, "ERROR", service_name,
            f"Memory allocation failed: unable to allocate {random.randint(64, 256)}MB "
            "for request processing",
            trace,
        ))
        logs.append(_log(
            current_minute, "WARN", service_name,
            f"GC overhead limit exceeded: spent {random.randint(80, 97)}% of time in GC",
        ))
    elif memory_percent > 70:
        logs.append(_log(
            current_minute, "WARN", service_name,
            f"Heap usage warning: {memory_percent:.0f}% — approaching limit. "
            "Consider increasing -Xmx or investigating leaks",
        ))
    return logs
def generate_auth_failure_logs(
    service_name: str,
    current_minute: int,
    is_auth_service: bool = False,
) -> List[Dict[str, Any]]:
    """Logs for auth-related failures (used in cascading failure scenario).

    Args:
        service_name: Service the entries are attributed to.
        current_minute: Simulation minute used for the timestamps.
        is_auth_service: When true, emit the auth service's own failure
            entries (null JWT secret, failed authentications); otherwise emit
            the caller-side view (failed call to auth-service, circuit
            breaker state).

    Returns:
        Two log dicts; only the first of each pair carries the trace id.
    """
    logs = []
    trace = f"trace-{random.randint(100000, 999999)}"
    if is_auth_service:
        logs.append(_log(
            current_minute, "ERROR", service_name,
            "NullPointerException: configuration key 'auth.jwt.secret' is null "
            "— cannot validate tokens",
            trace,
        ))
        logs.append(_log(
            current_minute, "ERROR", service_name,
            f"Authentication failed for {random.randint(50, 200)} requests in "
            "last 60s — returning HTTP 500",
        ))
    else:
        logs.append(_log(
            current_minute, "ERROR", service_name,
            "Call to auth-service failed: HTTP 500 Internal Server Error "
            f"— retrying ({random.randint(1, 3)}/3)",
            trace,
        ))
        logs.append(_log(
            current_minute, "WARN", service_name,
            "Circuit breaker for auth-service: state=HALF_OPEN, "
            f"failures={random.randint(5, 20)}, threshold=10",
        ))
    return logs
def generate_deadlock_logs(
    service_name: str,
    current_minute: int,
    waiting_on: str,
) -> List[Dict[str, Any]]:
    """Logs for distributed deadlock / circular wait.

    Always emits one WARN entry about the blocked request. A timeout ERROR is
    added with probability 0.4 and a thread-pool-exhausted ERROR with
    probability 0.2; the two draws are independent.

    Args:
        service_name: Service the entries are attributed to.
        current_minute: Simulation minute used for the timestamps.
        waiting_on: Name of the dependency this service is blocked on.

    Returns:
        Between one and three log dicts.
    """
    logs = []
    trace = f"trace-{random.randint(100000, 999999)}"
    logs.append(_log(
        current_minute, "WARN", service_name,
        f"Request {trace} waiting on {waiting_on}: blocked for "
        f"{random.randint(5000, 25000)}ms — no response",
        trace,
    ))
    if random.random() < 0.4:
        logs.append(_log(
            current_minute, "ERROR", service_name,
            f"Timeout calling {waiting_on}: deadline exceeded after 30000ms. "
            f"Retry attempt {random.randint(3, 8)} of 10",
            trace,
        ))
    if random.random() < 0.2:
        # This entry is not tied to the request's trace id.
        logs.append(_log(
            current_minute, "ERROR", service_name,
            f"Thread pool exhausted: all {random.randint(50, 200)} threads blocked "
            "waiting on downstream calls",
        ))
    return logs
# ------------------------------------------------------------------
# Red herring logs — plausible but misleading
# ------------------------------------------------------------------
_RED_HERRING_TEMPLATES = [
("WARN", "DNS resolution for {svc}.internal took {d}ms (threshold: 100ms)"),
("WARN", "TLS certificate for {svc}.internal expires in {n} days"),
("WARN", "Disk usage on /var/log: {n}% β€” consider log rotation"),
("ERROR", "Failed to export metrics to Prometheus: connection timeout after {d}ms"),
("WARN", "Background job 'cleanup_sessions' took {d}ms (expected <500ms)"),
("ERROR", "Redis SLOWLOG: KEYS pattern='session:*' took {d}ms"),
]
def generate_red_herring_logs(
    service_name: str,
    current_minute: int,
    count: int = 1,
) -> List[Dict[str, Any]]:
    """Produce ``count`` misleading-but-plausible log entries.

    Each entry picks a random red-herring template and fills it with a random
    service host name, a duration in milliseconds and a small integer. The
    entries are attributed to ``service_name`` and carry no trace id.
    """
    hosts = ("api_gateway", "auth", "orders", "payment", "cache", "database", "queue")

    def build_entry() -> Dict[str, Any]:
        # Draw order matters for reproducibility under a seeded RNG:
        # template, host, duration, count, then the timestamp inside _log.
        severity, pattern = random.choice(_RED_HERRING_TEMPLATES)
        host = random.choice(hosts)
        millis = random.randint(100, 3000)
        amount = random.randint(3, 85)
        text = pattern.format(svc=host, d=millis, n=amount)
        return _log(current_minute, severity, service_name, text)

    return [build_entry() for _ in range(count)]
# ------------------------------------------------------------------
# Helper
# ------------------------------------------------------------------
def _log(
minute: int,
level: str,
service: str,
message: str,
trace_id: Optional[str] = None,
) -> Dict[str, Any]:
return {
"timestamp": f"2025-01-15T14:{minute:02d}:{random.randint(0,59):02d}Z",
"level": level,
"service": service,
"message": message,
"trace_id": trace_id,
}