Spaces:
Paused
Paused
File size: 9,190 Bytes
a5784e9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 | # -*- coding: utf-8 -*-
"""
Tests for logging_utils/core/rendering.py
Covers:
- GridFormatter shutdown safety
- PlainGridFormatter
- SemanticHighlighter
- BurstBuffer suppression
"""
import logging
import sys
from unittest.mock import patch
from logging_utils.core.rendering import (
BurstBuffer,
GridFormatter,
PlainGridFormatter,
SemanticHighlighter,
normalize_source,
)
class TestNormalizeSource:
"""Tests for normalize_source function."""
def test_known_source_mapping(self):
"""Test that known sources are mapped correctly."""
assert normalize_source("api") == "API "
assert normalize_source("API") == "API "
assert normalize_source("launcher") == "LNCHR"
assert normalize_source("LAUNCHER") == "LNCHR"
def test_unknown_source_truncated(self):
"""Test that unknown sources are truncated to 5 chars."""
result = normalize_source("verylongsourcename")
assert len(result) == 5
assert result == "VERYL"
def test_short_source_padded(self):
"""Test that short sources are padded to 5 chars."""
result = normalize_source("ab")
assert len(result) == 5
assert result == "AB "
class TestSemanticHighlighter:
"""Tests for SemanticHighlighter."""
def test_highlight_disabled(self):
"""Test that highlighting can be disabled."""
text = "Test message with 'quotes' and numbers 123"
result = SemanticHighlighter.highlight(text, colorize=False)
assert result == text
def test_highlight_tags(self):
"""Test that tags like [UI] are highlighted."""
text = "[UI] Some message"
result = SemanticHighlighter.highlight(text, colorize=True)
# Should contain ANSI codes
assert "\x1b[" in result
assert "UI" in result
def test_highlight_booleans(self):
"""Test that True/False/None are highlighted."""
text = "Value is True and other is False"
result = SemanticHighlighter.highlight(text, colorize=True)
assert "\x1b[" in result
assert "True" in result
assert "False" in result
def test_highlight_status_phrases(self):
"""Test that status phrases like error/success are highlighted."""
text = "Operation successful"
result = SemanticHighlighter.highlight(text, colorize=True)
assert "\x1b[" in result
assert "successful" in result
class TestBurstBuffer:
"""Tests for BurstBuffer suppression."""
def test_first_message_returned(self):
"""Test that first message is returned immediately."""
buffer = BurstBuffer()
prev, current = buffer.process("key1", "formatted line 1")
assert prev == "formatted line 1"
assert current is None
def test_duplicate_suppressed(self):
"""Test that duplicate messages are suppressed."""
buffer = BurstBuffer()
buffer.process("key1", "line 1")
# Second identical message
prev, current = buffer.process("key1", "line 1")
assert prev is None
assert current is None
def test_different_message_returns_new_message(self):
"""Test that different message returns the new message."""
buffer = BurstBuffer()
buffer.process("key1", "line 1")
# Different message with no duplicates
# When prev_count == 1, returns (formatted_line, None)
# meaning the NEW line is returned in prev, current is None
prev, current = buffer.process("key2", "line 2")
assert prev == "line 2"
assert current is None
def test_flush_with_count(self):
"""Test flush returns count when duplicates exist."""
buffer = BurstBuffer()
buffer.process("key1", "line 1")
buffer.process("key1", "line 1") # Duplicate
buffer.process("key1", "line 1") # Another duplicate
result = buffer.flush()
assert result is not None
assert "Repeated" in result
assert "3" in result
def test_flush_no_duplicates(self):
"""Test flush returns None when no duplicates."""
buffer = BurstBuffer()
buffer.process("key1", "line 1")
result = buffer.flush()
# No repeat count if only one message
assert result is None
class TestGridFormatterShutdownSafety:
"""Tests for GridFormatter shutdown safety - prevent ImportError during shutdown."""
def test_format_returns_raw_message_when_meta_path_is_none(self):
"""Test that format returns raw message when sys.meta_path is None."""
formatter = GridFormatter(colorize=False, burst_suppression=False)
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="Test message during shutdown",
args=(),
exc_info=None,
)
with patch.object(sys, "meta_path", None):
result = formatter.format(record)
# Should return the raw message, not crash
assert result == "Test message during shutdown"
def test_format_works_normally_when_not_shutting_down(self):
"""Test that format works normally when Python is not shutting down."""
formatter = GridFormatter(colorize=False, burst_suppression=False)
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="Normal message",
args=(),
exc_info=None,
)
result = formatter.format(record)
# Should contain timestamp, level, etc.
assert "INF" in result
assert "Normal message" in result
def test_format_skips_separator_lines(self):
"""Test that separator lines (---) are skipped."""
formatter = GridFormatter(colorize=False, burst_suppression=False)
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="--- Separator ---",
args=(),
exc_info=None,
)
result = formatter.format(record)
assert result == ""
class TestPlainGridFormatter:
"""Tests for PlainGridFormatter."""
def test_format_no_ansi_codes(self):
"""Test that plain formatter produces no ANSI codes."""
formatter = PlainGridFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="Plain message",
args=(),
exc_info=None,
)
result = formatter.format(record)
# Should not contain ANSI escape codes
assert "\x1b[" not in result
assert "INF" in result
assert "Plain message" in result
def test_format_skips_separator_lines(self):
"""Test that separator lines are skipped."""
formatter = PlainGridFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="=== Separator ===",
args=(),
exc_info=None,
)
result = formatter.format(record)
assert result == ""
class TestGridFormatterIntegration:
"""Integration tests for GridFormatter."""
def test_format_with_context_vars(self):
"""Test formatter uses context variables."""
from logging_utils.core.context import request_id_var, source_var
formatter = GridFormatter(colorize=False, burst_suppression=False)
req_token = request_id_var.set("abc1234")
src_token = source_var.set("API")
try:
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="Context test",
args=(),
exc_info=None,
)
result = formatter.format(record)
assert "abc1234" in result
assert "API" in result
assert "Context test" in result
finally:
request_id_var.reset(req_token)
source_var.reset(src_token)
def test_format_all_log_levels(self):
"""Test formatter handles all log levels."""
formatter = GridFormatter(colorize=False, burst_suppression=False)
levels = [
(logging.DEBUG, "DBG"),
(logging.INFO, "INF"),
(logging.WARNING, "WRN"),
(logging.ERROR, "ERR"),
(logging.CRITICAL, "CRT"),
]
for level, abbrev in levels:
record = logging.LogRecord(
name="test",
level=level,
pathname="test.py",
lineno=1,
msg=f"Level {abbrev} message",
args=(),
exc_info=None,
)
result = formatter.format(record)
assert abbrev in result, f"Expected {abbrev} in result for level {level}"
|