File size: 5,129 Bytes
db06ffa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Tests for the logging configuration and structured log emission."""

from __future__ import annotations

import io
import json
import logging
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch

from zsgdp.logging_config import configure_logging, get_logger
from zsgdp.pipeline import parse_document


class ConfigureLoggingTests(unittest.TestCase):
    """Exercise configure_logging() and get_logger() in isolation."""

    def setUp(self) -> None:
        # Detach any handlers left over from a previous test so every case
        # starts against an unconfigured "zsgdp" logger.
        zsgdp_logger = logging.getLogger("zsgdp")
        for h in tuple(zsgdp_logger.handlers):
            zsgdp_logger.removeHandler(h)

    def test_idempotent_configuration(self):
        sink = io.StringIO()
        # Configuring twice with identical arguments must not stack handlers.
        for _ in range(2):
            configure_logging(level="INFO", json_format=False, stream=sink)
        self.assertEqual(len(logging.getLogger("zsgdp").handlers), 1)

    def test_text_format_emits_human_readable(self):
        sink = io.StringIO()
        configure_logging(level="INFO", json_format=False, stream=sink)
        get_logger("zsgdp.test").info("hello", extra={"doc_id": "d1"})
        emitted = sink.getvalue()
        for fragment in ("INFO", "zsgdp.test", "hello"):
            self.assertIn(fragment, emitted)

    def test_json_format_emits_one_line_records(self):
        sink = io.StringIO()
        configure_logging(level="INFO", json_format=True, stream=sink)
        get_logger("zsgdp.test").info("event", extra={"doc_id": "abc", "count": 3})
        payload = json.loads(sink.getvalue().strip())
        expected = {
            "level": "INFO",
            "logger": "zsgdp.test",
            "message": "event",
            "doc_id": "abc",
            "count": 3,
        }
        for key, value in expected.items():
            self.assertEqual(payload[key], value)

    def test_default_level_is_warning(self):
        sink = io.StringIO()
        env_override = {"ZSGDP_LOG_LEVEL": "", "ZSGDP_LOG_JSON": ""}
        with patch.dict("os.environ", env_override, clear=False):
            configure_logging(stream=sink)
            logger = get_logger("zsgdp.test")
            # INFO is below the implied default, WARNING is at or above it.
            logger.info("hidden_at_default_level")
            self.assertNotIn("hidden_at_default_level", sink.getvalue())
            logger.warning("visible_at_default_level")
            self.assertIn("visible_at_default_level", sink.getvalue())

    def test_get_logger_namespacing(self):
        # (argument, expected logger name); None means "called with no args".
        cases = [(None, "zsgdp"), ("zsgdp.foo", "zsgdp.foo"), ("foo", "zsgdp.foo")]
        for arg, expected_name in cases:
            logger = get_logger() if arg is None else get_logger(arg)
            self.assertEqual(logger.name, expected_name)


class PipelineLogEmissionTests(unittest.TestCase):
    """Verify parse_document() emits structured start/end log records."""

    def setUp(self) -> None:
        # Strip handlers so assertLogs can attach its own to the named
        # logger, and make sure records propagate up to it.
        zsgdp_logger = logging.getLogger("zsgdp")
        for h in tuple(zsgdp_logger.handlers):
            zsgdp_logger.removeHandler(h)
        zsgdp_logger.setLevel(logging.DEBUG)
        zsgdp_logger.propagate = True

    def test_parse_emits_start_and_end_records(self):
        with tempfile.TemporaryDirectory() as tmp:
            source = Path(tmp) / "doc.md"
            source.write_text("# Doc\n\nHello.\n", encoding="utf-8")
            with self.assertLogs("zsgdp.pipeline", level="INFO") as captured:
                parse_document(source, Path(tmp) / "out")

        seen = {record.message for record in captured.records}
        self.assertIn("parse_start", seen)
        self.assertIn("parse_end", seen)

        # The parse_end record must carry the structured payload fields.
        parse_end = next(r for r in captured.records if r.message == "parse_end")
        for field in ("doc_id", "elapsed_seconds", "quality_score", "element_count"):
            self.assertTrue(hasattr(parse_end, field))


class RepairLogEmissionTests(unittest.TestCase):
    """Verify the repair controller logs a record per repair iteration."""

    def setUp(self) -> None:
        # Reset the "zsgdp" logger so assertLogs observes a clean,
        # propagating logger tree for this test.
        zsgdp_logger = logging.getLogger("zsgdp")
        for h in tuple(zsgdp_logger.handlers):
            zsgdp_logger.removeHandler(h)
        zsgdp_logger.setLevel(logging.DEBUG)
        zsgdp_logger.propagate = True

    def test_repair_emits_iteration_record(self):
        # The row with an extra cell makes the table malformed, which
        # should force at least one repair iteration.
        malformed = "# Report\n\n| A | B |\n| --- | --- |\n| 1 | 2 | 3 |\n"
        with tempfile.TemporaryDirectory() as tmp:
            source = Path(tmp) / "report.md"
            source.write_text(malformed, encoding="utf-8")
            with self.assertLogs("zsgdp.repair.controller", level="INFO") as captured:
                parse_document(source, Path(tmp) / "out")

        iterations = [r for r in captured.records if r.message == "repair_iteration"]
        self.assertGreaterEqual(len(iterations), 1)
        # Every iteration record carries its index and outcome.
        for record in iterations:
            self.assertTrue(hasattr(record, "iteration"))
            self.assertTrue(hasattr(record, "status"))


# Allow running this test module directly (e.g. `python test_logging.py`).
if __name__ == "__main__":
    unittest.main()