Spaces:
Running
Running
File size: 6,280 Bytes
7ab470d 75fe59b 7ab470d 75fe59b 7ab470d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
"""
Tests for Loguru context variable propagation.
These tests verify:
- Request ID propagates through context vars
- Context is preserved across async operations
- Logging works correctly in async task chains
"""
import asyncio
import contextvars
from pathlib import Path

import pytest
from loguru import logger
class TestLoguruContext:
    """Test Loguru context variable propagation.

    Every test routes the global Loguru ``logger`` to a file under
    ``tmp_path`` and asserts on the file's contents. An autouse fixture
    restores the request ID variable and detaches the sinks afterwards so
    the tests do not influence each other.
    """

    @pytest.fixture(autouse=True)
    def _isolate_logging_state(self):
        """Undo per-test changes to the request ID variable and the logger."""
        from app.core.logging import request_id_var

        # The token remembers whatever the variable held before the test, so
        # reset() restores it no matter how often the test calls set().
        token = request_id_var.set(None)
        try:
            yield
        finally:
            # Detach the file sink so the global logger does not keep an open
            # handle on a file inside tmp_path after the test finishes.
            logger.remove()
            request_id_var.reset(token)

    @staticmethod
    def _log_to(log_file: Path, fmt: str) -> None:
        """Make ``log_file`` the logger's only sink, using format ``fmt``."""
        logger.remove()
        logger.add(log_file, format=fmt, level="INFO")

    def test_context_vars_propagation(self, tmp_path: Path):
        """Verify request ID propagates through context vars."""
        from app.core.logging import request_id_var

        log_file = tmp_path / "context.log"
        self._log_to(log_file, "{extra[request_id]} | {message}")

        # Set context and log
        request_id_var.set("req-abc-123")
        logger.bind(request_id=request_id_var.get()).info("Processing request")

        output = log_file.read_text(encoding="utf-8")
        assert "req-abc-123" in output
        assert "Processing request" in output

    @pytest.mark.asyncio
    async def test_logging_in_async_tasks(self, tmp_path: Path):
        """Verify logging works correctly in async task chains."""
        from app.core.logging import request_id_var

        log_file = tmp_path / "async.log"
        self._log_to(log_file, "{message}")

        async def async_task(task_id: int):
            # Each task sets its own request ID before logging
            request_id_var.set(f"task-{task_id}")
            logger.bind(task_id=task_id).info(f"Task {task_id} executing")
            await asyncio.sleep(0.01)
            logger.bind(task_id=task_id).info(f"Task {task_id} complete")

        # Run multiple async tasks
        await asyncio.gather(*[async_task(i) for i in range(3)])

        output = log_file.read_text(encoding="utf-8")
        for task_id in range(3):
            assert f"Task {task_id} executing" in output
            assert f"Task {task_id} complete" in output

    def test_context_isolation_between_requests(self, tmp_path: Path):
        """Verify context variables are isolated between different request contexts."""
        from app.core.logging import request_id_var

        log_file = tmp_path / "isolation.log"
        self._log_to(log_file, "{extra[request_id]} | {message}")

        def handle_request(request_id: str, message: str) -> None:
            request_id_var.set(request_id)
            logger.bind(request_id=request_id_var.get()).info(message)

        # Each simulated request runs in its own copy of the current context,
        # so a set() made inside one must not be visible outside it.
        request_id_var.set("outer")
        contextvars.copy_context().run(handle_request, "request-1", "First request")
        contextvars.copy_context().run(handle_request, "request-2", "Second request")

        assert request_id_var.get() == "outer"
        output = log_file.read_text(encoding="utf-8")
        assert "request-1 | First request" in output
        assert "request-2 | Second request" in output

    @pytest.mark.asyncio
    async def test_context_preservation_across_await(self, tmp_path: Path):
        """Verify context is preserved across await calls."""
        from app.core.logging import request_id_var

        log_file = tmp_path / "preservation.log"
        self._log_to(log_file, "{extra[request_id]} | {message}")

        async def process_with_delay(request_id: str):
            request_id_var.set(request_id)
            logger.bind(request_id=request_id_var.get()).info("Before await")
            await asyncio.sleep(0.01)
            # Re-read the variable: it must still hold the value set above
            logger.bind(request_id=request_id_var.get()).info("After await")

        await process_with_delay("preserved-id")

        output = log_file.read_text(encoding="utf-8")
        assert "preserved-id | Before await" in output
        assert "preserved-id | After await" in output

    def test_request_logger_uses_context(self, tmp_path: Path):
        """Verify RequestLogger class uses context variables."""
        from app.core.logging import RequestLogger, request_id_var

        log_file = tmp_path / "request_logger.log"
        self._log_to(log_file, "{message}")

        # Set request ID in context
        request_id_var.set("context-req-456")

        # NOTE(review): the same ID is also passed explicitly below, so these
        # assertions cannot tell whether RequestLogger read it from the
        # context variable or from the argument - confirm the intent.
        req_logger = RequestLogger()
        req_logger.log_request("POST", "/api/test", "context-req-456")

        output = log_file.read_text(encoding="utf-8")
        assert "context-req-456" in output
        assert "POST /api/test" in output

    def test_context_with_none_value(self, tmp_path: Path):
        """Verify logger handles None request ID gracefully."""
        from app.core.logging import request_id_var

        log_file = tmp_path / "none_context.log"
        self._log_to(log_file, "{message}")

        # Explicitly store None as the request ID, then log without binding it
        request_id_var.set(None)
        logger.info("No request ID")

        output = log_file.read_text(encoding="utf-8")
        assert "No request ID" in output  # Should not raise exception

    @pytest.mark.asyncio
    async def test_concurrent_async_tasks_with_different_request_ids(
        self, tmp_path: Path
    ):
        """Verify concurrent tasks maintain separate request ID contexts."""
        from app.core.logging import request_id_var

        log_file = tmp_path / "concurrent.log"
        self._log_to(log_file, "{extra[request_id]} | {message}")

        async def task_with_context(request_id: str, delay: float):
            request_id_var.set(request_id)
            logger.bind(request_id=request_id_var.get()).info(f"{request_id} started")
            await asyncio.sleep(delay)
            # Read the ID back from the context variable rather than from the
            # parameter: if another task's set() had leaked into this task,
            # the prefix written here would not match the message.
            logger.bind(request_id=request_id_var.get()).info(f"{request_id} finished")

        # Run tasks concurrently with different delays
        await asyncio.gather(
            task_with_context("req-fast", 0.01),
            task_with_context("req-slow", 0.02),
            task_with_context("req-medium", 0.015),
        )

        output = log_file.read_text(encoding="utf-8")
        for request_id in ("req-fast", "req-slow", "req-medium"):
            assert f"{request_id} | {request_id} started" in output
            assert f"{request_id} | {request_id} finished" in output
|