# MultiAgentLogsAnalyze / test_large_logs.py
# PatrickRedStar's picture
# add
# d6f4b44
"""
Generator of large test log files and a script for testing the system.
"""
import random
import os
from datetime import datetime, timedelta
from agents import LogParserAgent, AnomalyDetectionAgent, RootCauseAgent
import time
def generate_log_entry(timestamp, level, message_template, **kwargs):
    """Build a single log line.

    The template is filled in from ``kwargs`` with ``str.format``; the result
    is prefixed with the ``YYYY-MM-DD HH:MM:SS`` timestamp and the level, and
    terminated by a newline.
    """
    text = message_template.format(**kwargs)
    stamp = timestamp.strftime('%Y-%m-%d %H:%M:%S')
    return "{} {} {}\n".format(stamp, level, text)
def generate_log_file_1():
    """Log file 1: ordinary traffic with rare errors (3000 entries).

    Entries are spaced two seconds apart starting at 2024-01-15 10:00:00.
    Every 100th entry (indices 0, 100, 200, ...) is an ERROR or WARNING built
    from a small pool of failure templates; every other entry is INFO.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 15, 10, 0, 0)
    routine_templates = [
        "User {user_id} logged in from IP {ip}",
        "Request GET /api/users/{user_id} processed successfully",
        "Database query executed in {time}ms",
        "Cache hit for key: {key}",
        "Request POST /api/data processed in {time}ms",
        "Session {session_id} created",
        "File {filename} uploaded successfully",
        "Processing job {job_id} started",
        "Background task {task_id} completed",
    ]
    failure_templates = [
        "Connection timeout to external API: {api_url}",
        "Database connection lost, retrying...",
        "Invalid token received from user {user_id}",
    ]

    entries = []
    for idx in range(3000):
        stamp = (start + timedelta(seconds=idx * 2)).strftime('%Y-%m-%d %H:%M:%S')
        if idx % 100:
            severity = "INFO"
            chosen = random.choice(routine_templates)
            # One superset of fields serves every template: str.format
            # ignores keyword arguments the template does not reference.
            fields = {
                "user_id": random.randint(1000, 9999),
                "ip": f"192.168.{random.randint(1,255)}.{random.randint(1,255)}",
                "time": random.randint(10, 500),
                "key": f"cache_key_{random.randint(1,100)}",
                "session_id": f"session_{random.randint(10000,99999)}",
                "filename": f"file_{random.randint(1,1000)}.txt",
                "job_id": random.randint(1000, 9999),
                "task_id": random.randint(10000, 99999),
            }
        else:
            # Every 100th entry is a failure.
            severity = random.choice(["ERROR", "WARNING"])
            chosen = random.choice(failure_templates)
            fields = {
                "api_url": f"api-{random.randint(1,5)}.example.com",
                "user_id": random.randint(1000, 9999),
            }
        entries.append(f"{stamp} {severity} {chosen.format(**fields)}\n")
    return ''.join(entries)
def generate_log_file_2():
    """Log file 2: burst of errors (5000 entries, one per second).

    Entries 2000-2049 are ERROR/CRITICAL database connection failures,
    entries 2050-2059 are high-latency WARNINGs, and everything else is an
    INFO "Request processed" entry. Timestamps start at 2024-01-15 14:00:00.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 15, 14, 0, 0)
    entries = []
    for idx in range(5000):
        stamp = (start + timedelta(seconds=idx)).strftime('%Y-%m-%d %H:%M:%S')
        if idx < 2000 or idx >= 2060:
            severity = "INFO"
            verb = random.choice(['GET', 'POST', 'PUT'])
            resource = random.choice(['users', 'data', 'files'])
            text = f"Request processed: {verb} /api/v1/{resource}"
        elif idx < 2050:
            # The burst itself: mostly ERROR, occasionally CRITICAL.
            severity = random.choice(["ERROR", "ERROR", "ERROR", "CRITICAL"])
            host = random.randint(1, 3)
            text = f"Database connection failed: unable to connect to host db-{host}.internal"
        else:
            # Short latency tail right after the burst.
            severity = "WARNING"
            latency = random.randint(5000, 15000)
            text = f"High latency detected: {latency}ms response time"
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_3():
    """Log file 3: one error repeated at a fixed cadence (4000 entries).

    Entries are three seconds apart starting at 2024-01-15 16:00:00. Every
    50th entry is the same authentication ERROR; every 75th entry that is not
    also a 50th is a rate-limit WARNING; the rest are INFO HTTP entries.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 15, 16, 0, 0)
    auth_failure = "Authentication failed for user admin@example.com: invalid credentials"
    entries = []
    for idx in range(4000):
        stamp = (start + timedelta(seconds=idx * 3)).strftime('%Y-%m-%d %H:%M:%S')
        if not idx % 50:
            # The identical error recurs every 50 entries.
            severity, text = "ERROR", auth_failure
        elif not idx % 75:
            severity = "WARNING"
            used = random.randint(80, 95)
            text = f"Rate limit approaching: {used}% of limit used"
        else:
            severity = "INFO"
            status = random.choice([200, 200, 200, 201, 304])
            verb = random.choice(['GET', 'POST'])
            resource = random.choice(['users', 'orders', 'products'])
            text = f"HTTP {status} {verb} /api/{resource}"
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_4():
    """Log file 4: periodic stack traces (3500 entries).

    Entries are two seconds apart starting at 2024-01-15 18:00:00. Every
    200th entry is an ERROR whose message is a six-line traceback (so the
    file has more physical lines than entries); the others are INFO or DEBUG
    "Processing request" entries.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 15, 18, 0, 0)
    # NOTE(review): the frame lines carry no leading whitespace here, unlike
    # a real Python traceback -- confirm whether indentation was lost.
    trace_template = "\n".join((
        "Traceback (most recent call last):",
        'File "/app/services/api.py", line {line}, in process_request',
        "result = external_api.call(data)",
        'File "/app/lib/external_api.py", line {line2}, in call',
        'raise ConnectionError("Service unavailable")',
        "ConnectionError: Service unavailable",
    ))
    entries = []
    for idx in range(3500):
        stamp = (start + timedelta(seconds=idx * 2)).strftime('%Y-%m-%d %H:%M:%S')
        if idx % 200:
            severity = random.choice(["INFO", "DEBUG"])
            text = f"Processing request {random.randint(10000, 99999)}"
        else:
            severity = "ERROR"
            first_frame = random.randint(100, 500)
            second_frame = random.randint(50, 300)
            text = trace_template.format(line=first_frame, line2=second_frame)
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_5():
    """Log file 5: temporal spikes (4500 entries, one per second).

    Timestamps start at 2024-01-16 08:00:00. Entries whose minute-of-hour is
    5, 15, 25, 35 or 45 report "High traffic" with a mixed INFO/WARNING/ERROR
    level; all other entries are INFO "Normal traffic". Every minute holds
    the same number of entries -- the spike is in the content, not in the
    entry rate.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 16, 8, 0, 0)
    spike_minutes = (5, 15, 25, 35, 45)
    entries = []
    for idx in range(4500):
        elapsed_minutes, second = divmod(idx, 60)
        moment = start + timedelta(minutes=elapsed_minutes, seconds=second)
        stamp = moment.strftime('%Y-%m-%d %H:%M:%S')
        # Wrap to minute-of-hour so the pattern repeats after the first hour.
        if elapsed_minutes % 60 in spike_minutes:
            severity = random.choice(["INFO", "INFO", "INFO", "WARNING", "ERROR"])
            text = f"High traffic: {random.randint(100, 1000)} requests/min"
        else:
            severity = "INFO"
            text = f"Normal traffic: {random.randint(10, 50)} requests/min"
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_6():
    """Log file 6: errors leading up to a crash (3000 entries, one per second).

    Timestamps start at 2024-01-16 12:00:00. Entries 0-2899 are INFO/DEBUG
    "System operation" entries, entries 2900-2949 are ERROR resource
    exhaustion entries, and the last 50 are CRITICAL/ERROR failure messages.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 16, 12, 0, 0)
    final_messages = [
        "Out of memory: cannot allocate additional resources",
        "Fatal error: database connection pool exhausted",
        "Critical: unable to process requests, system overloaded",
        "ERROR: Service unavailable, shutting down",
    ]
    entries = []
    for idx in range(3000):
        stamp = (start + timedelta(seconds=idx)).strftime('%Y-%m-%d %H:%M:%S')
        if idx < 2900:
            severity = random.choice(["INFO", "DEBUG"])
            operation = random.choice(['cache_update', 'db_query', 'api_call'])
            text = f"System operation: {operation}"
        elif idx < 2950:
            severity = "ERROR"
            text = f"System resource exhaustion detected: memory usage {random.randint(95, 99)}%"
        else:
            # Last 50 entries: the crash itself.
            severity = random.choice(["CRITICAL", "ERROR"])
            text = random.choice(final_messages)
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_7():
    """Log file 7: mixed line layouts (4000 entries).

    Entries are two seconds apart starting at 2024-01-16 14:30:00. Each entry
    gets a random level and is rendered with one of three layouts chosen at
    random. An entry is a transaction error only when its level came out as
    ERROR and its index is a multiple of 100; otherwise it is a generic
    "Event <index>" message.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 16, 14, 30, 0)
    layouts = [
        "{timestamp} [{level}] {message}",
        "[{timestamp}] {level}: {message}",
        "{timestamp} {level} - {message}",
    ]
    severities = ["INFO", "WARNING", "ERROR", "DEBUG"]
    entries = []
    for i in range(4000):
        stamp = (start + timedelta(seconds=i * 2)).strftime('%Y-%m-%d %H:%M:%S')
        level = random.choice(severities)
        if level == "ERROR" and i % 100 == 0:
            message = f"Error processing transaction {random.randint(100000, 999999)}"
        else:
            message = f"Event {i}: {random.choice(['user_action', 'system_check', 'data_sync'])}"
        layout = random.choice(layouts)
        # Every layout takes the same three fields, so no per-layout
        # branching is needed.
        entries.append(layout.format(timestamp=stamp, level=level, message=message) + "\n")
    return ''.join(entries)
def generate_log_file_8():
    """Log file 8: several patterns in one file (5000 entries, one per second).

    Timestamps start at 2024-01-17 09:00:00. Sections by entry index:
    1000-1099 burst of API errors; 2000-2099 a validation error on every
    10th entry; 3000-3049 handler exceptions; 4900 onward CRITICAL/ERROR
    system failure. Everything else is INFO "Normal operation".

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 17, 9, 0, 0)
    burst_section = range(1000, 1100)
    repeated_section = range(2000, 2100)
    exception_section = range(3000, 3050)
    entries = []
    for idx in range(5000):
        stamp = (start + timedelta(seconds=idx)).strftime('%Y-%m-%d %H:%M:%S')
        if idx in burst_section:
            severity = "ERROR"
            reason = random.choice(['timeout', '500', 'connection refused'])
            text = f"API endpoint /api/data failed: {reason}"
        elif idx in repeated_section and idx % 10 == 0:
            severity = "ERROR"
            text = "Validation error: email format is invalid"
        elif idx in exception_section:
            severity = "ERROR"
            text = f"Exception in handler: ValueError at line {random.randint(1, 500)}"
        elif idx >= 4900:
            # Run-up to the crash at the end of the file.
            severity = random.choice(["CRITICAL", "ERROR"])
            text = "System failure: critical service unavailable"
        else:
            severity = "INFO"
            subject = random.choice(['request', 'response', 'cache', 'db'])
            text = f"Normal operation: {subject} processed"
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_9():
    """Log file 9: web-server access-log style entries (4500, one per second).

    Timestamps start at 2024-01-17 15:00:00. Each entry embeds an
    access-log-like message (client IP, bracketed timestamp, request line,
    status, size). The level follows the status: 5xx is ERROR, 4xx is
    WARNING, anything else is INFO.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 17, 15, 0, 0)
    client_pool = []
    for third in range(1, 10):
        for fourth in range(1, 50):
            client_pool.append(f"192.168.{third}.{fourth}")

    entries = []
    for idx in range(4500):
        moment = start + timedelta(seconds=idx)
        client = random.choice(client_pool)
        verb = random.choice(["GET", "POST", "PUT", "DELETE"])
        path = random.choice(["/api/users", "/api/orders", "/api/products", "/static/css", "/static/js"])
        status = random.choice([200, 200, 200, 201, 404, 500, 503])
        severity = "ERROR" if status >= 500 else ("WARNING" if status >= 400 else "INFO")
        access_stamp = moment.strftime("%d/%b/%Y:%H:%M:%S")
        size = random.randint(100, 5000)
        text = f'{client} - - [{access_stamp}] "{verb} {path} HTTP/1.1" {status} {size}'
        entries.append(f"{moment.strftime('%Y-%m-%d %H:%M:%S')} {severity} {text}\n")
    return ''.join(entries)
def generate_log_file_10():
    """Log file 10: application logs with metadata (4000 entries).

    Entries are two seconds apart starting at 2024-01-18 10:00:00. Every
    300th entry is an ERROR health-check failure; every 150th entry that is
    not also a 300th is a latency WARNING; remaining entries in 3500-3599
    are ERROR/WARNING resource-constraint messages; everything else is an
    INFO job entry tagged with a thread id.

    Returns:
        The whole file as one string, each entry terminated by a newline.
    """
    start = datetime(2024, 1, 18, 10, 0, 0)
    late_trouble = range(3500, 3600)
    entries = []
    for idx in range(4000):
        stamp = (start + timedelta(seconds=idx * 2)).strftime('%Y-%m-%d %H:%M:%S')
        if not idx % 300:
            severity = "ERROR"
            text = f"Service health check failed: service-{random.randint(1, 5)}.internal is down"
        elif not idx % 150:
            severity = "WARNING"
            text = f"Performance degradation: p95 latency increased to {random.randint(1000, 5000)}ms"
        elif idx in late_trouble:
            # Trouble shortly before the end of the file.
            severity = random.choice(["ERROR", "WARNING"])
            resource = random.choice(['CPU', 'Memory', 'Disk'])
            text = f"Resource constraint: {resource} usage critical"
        else:
            severity = "INFO"
            thread = random.randint(1, 20)
            job = random.randint(10000, 99999)
            state = random.choice(['completed', 'in_progress'])
            text = f"[thread-{thread}] Processing job {job}: status={state}"
        entries.append(f"{stamp} {severity} {text}\n")
    return ''.join(entries)
def test_log_file(content, file_num):
    """Run one log file through the three-agent pipeline and time each stage.

    Despite the ``test_`` prefix this is a benchmark helper called by
    ``main()``, not a pytest test; it is marked ``__test__ = False`` below so
    pytest does not try to collect it.

    Args:
        content: Full text of the log file.
        file_num: Ordinal of the file, used in the banner and in the result.

    Returns:
        A dict with the keys ``file_num``, ``lines``, ``events``, ``errors``,
        ``warnings``, ``anomalies``, ``parsed_time``, ``anomaly_time``,
        ``rca_time`` and ``total_time`` (times in seconds).
    """
    print(f"\n{'='*60}")
    print(f"Testing log file {file_num}")
    print(f"{'='*60}")

    # splitlines() does not report a phantom empty line after the trailing
    # newline, which split('\n') does.
    line_count = len(content.splitlines())
    print(f"Lines in file: {line_count}")

    # perf_counter() is the clock intended for measuring short intervals.
    start_time = time.perf_counter()

    # Agent 1: parsing (timed without the agent's construction).
    parser = LogParserAgent()
    parsed_start = time.perf_counter()
    # Presumably returns a dict-like with 'events'/'errors'/'warnings' lists;
    # only .get() with a list default is relied on here.
    structured_data = parser.parse(content)
    parsed_time = time.perf_counter() - parsed_start
    events_count = len(structured_data.get('events', []))
    errors_count = len(structured_data.get('errors', []))
    warnings_count = len(structured_data.get('warnings', []))
    print(f"\n[OK] Agent 1 (Parser): {parsed_time:.2f} sec")
    print(f" - Events: {events_count}")
    print(f" - Errors: {errors_count}")
    print(f" - Warnings: {warnings_count}")

    # Agent 2: anomaly detection (timing includes the agent's construction).
    anomaly_start = time.perf_counter()
    anomaly_agent = AnomalyDetectionAgent()
    anomaly_report = anomaly_agent.detect(structured_data)
    anomaly_time = time.perf_counter() - anomaly_start
    anomalies_count = len(anomaly_report.get('anomalies', []))
    print(f"\n[OK] Agent 2 (Anomaly Detection): {anomaly_time:.2f} sec")
    print(f" - Anomalies detected: {anomalies_count}")
    if anomalies_count > 0:
        by_type = anomaly_report.get('statistics', {}).get('by_type', {})
        for anomaly_type, count in by_type.items():
            print(f" - {anomaly_type}: {count}")

    # Agent 3: root-cause analysis (timing includes the agent's construction).
    rca_start = time.perf_counter()
    rca_agent = RootCauseAgent()
    recommendations = rca_agent.analyze(anomaly_report)
    rca_time = time.perf_counter() - rca_start
    print(f"\n[OK] Agent 3 (Root Cause Analysis): {rca_time:.2f} sec")
    print(f" - Report size: {len(recommendations)} characters")

    total_time = time.perf_counter() - start_time
    # Guard against a zero interval instead of raising ZeroDivisionError.
    speed = line_count / total_time if total_time > 0 else 0
    print(f"\n[TIME] Total processing time: {total_time:.2f} sec")
    print(f" Speed: {speed:.0f} lines/sec")

    return {
        'file_num': file_num,
        'lines': line_count,
        'events': events_count,
        'errors': errors_count,
        'warnings': warnings_count,
        'anomalies': anomalies_count,
        'parsed_time': parsed_time,
        'anomaly_time': anomaly_time,
        'rca_time': rca_time,
        'total_time': total_time,
    }


# The name matches pytest's default test_* pattern, but the parameters are
# plain arguments, not fixtures; opt out of collection.
test_log_file.__test__ = False
def main():
    """Generate every test log file, run each through the pipeline, and report.

    Writes the ten generated files into the ``test_logs`` directory (created
    if missing), calls ``test_log_file`` on each file's content, then prints
    a per-file table, totals, and a note on whether the total processing
    time stayed under 100 seconds.
    """
    print("=" * 60)
    print("ГЕНЕРАЦИЯ И ТЕСТИРОВАНИЕ БОЛЬШИХ ЛОГ-ФАЙЛОВ")
    print("=" * 60)

    # Directory for the generated files.
    test_dir = "test_logs"
    os.makedirs(test_dir, exist_ok=True)

    # Output file name paired with the function that produces its content.
    generators = [
        ("normal_logs.log", generate_log_file_1),
        ("burst_errors.log", generate_log_file_2),
        ("repeated_errors.log", generate_log_file_3),
        ("stack_traces.log", generate_log_file_4),
        ("temporal_spikes.log", generate_log_file_5),
        ("error_before_crash.log", generate_log_file_6),
        ("mixed_formats.log", generate_log_file_7),
        ("mixed_patterns.log", generate_log_file_8),
        ("web_server.log", generate_log_file_9),
        ("application_metadata.log", generate_log_file_10),
    ]

    # Generate the files, keeping each content in memory for the test pass.
    print(f"\n[GENERATING] Generating {len(generators)} test files...")
    contents = []
    for filename, generator in generators:
        filepath = os.path.join(test_dir, filename)
        print(f" Generating: {filename}...", end=" ")
        content = generator()
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        # splitlines() avoids counting a phantom empty line after the
        # trailing newline, which split('\n') would add.
        line_count = len(content.splitlines())
        file_size = len(content.encode('utf-8')) / 1024  # KB
        print(f"OK ({line_count} lines, {file_size:.1f} KB)")
        contents.append(content)
    print(f"\n[SUCCESS] All files created in '{test_dir}' folder")

    # Run every file through the pipeline; files are numbered from 1.
    print("\n[TESTING] Starting tests...")
    results = [
        test_log_file(content, number)
        for number, content in enumerate(contents, 1)
    ]

    # Summary table.
    print(f"\n\n{'='*60}")
    print("SUMMARY STATISTICS")
    print(f"{'='*60}")
    print(f"\n{'#':<3} {'Lines':<8} {'Time (sec)':<12} {'Lines/sec':<12} {'Anomalies':<10}")
    print("-" * 60)
    total_lines = 0
    total_time = 0
    for result in results:
        speed = result['lines'] / result['total_time'] if result['total_time'] > 0 else 0
        print(f"{result['file_num']:<3} {result['lines']:<8} {result['total_time']:<12.2f} {speed:<12.0f} {result['anomalies']:<10}")
        total_lines += result['lines']
        total_time += result['total_time']
    print("-" * 60)
    avg_speed = total_lines / total_time if total_time > 0 else 0
    print(f"{'TOTAL':<3} {total_lines:<8} {total_time:<12.2f} {avg_speed:<12.0f}")
    print("\n[SUCCESS] Testing completed!")
    print(f" Total processed: {total_lines} lines in {total_time:.2f} seconds")
    print(f" Average speed: {avg_speed:.0f} lines/sec")

    # Performance check: flag runs that take more than 100 seconds in total.
    if total_time > 100:
        print("\n[WARNING] Total processing time exceeds 100 seconds")
    else:
        print("\n[OK] Performance is within normal range (<100 sec for all files)")
# Run the generation and benchmark only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()