File size: 12,465 Bytes
3fde5f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
import os
import shutil
from upgrade_codes.upgrade_core.constants import (
    USER_CONF,
    BACKUP_CONF,
    TEXTS,
    ZH_DEFAULT_CONF,
    EN_DEFAULT_CONF,
    TEXTS_COMPARE,
    TEXTS_MERGE,
)
import logging
from ruamel.yaml import YAML
from src.open_llm_vtuber.config_manager.utils import load_text_file_with_guess_encoding
from upgrade_codes.upgrade_core.comment_sync import CommentSynchronizer
from upgrade_codes.version_manager import VersionUpgradeManager
from upgrade_codes.upgrade_core.upgrade_utils import UpgradeUtility
from upgrade_codes.upgrade_core.comment_diff_fn import comment_diff_fn
from packaging import version


class ConfigSynchronizer:
    def __init__(self, lang="en", logger=logging.getLogger(__name__)):
        """Set up paths, localized message tables and helpers for a sync run.

        Args:
            lang: Language code used to pick the message tables and the
                default config template. ``"zh"`` selects ``ZH_DEFAULT_CONF``;
                every other value selects ``EN_DEFAULT_CONF``.
            logger: Logger that receives all progress and warning messages.
        """
        self.lang = lang
        # Unknown language codes fall back to English here, exactly as the
        # merge/compare tables and the default-config path in this method do;
        # a plain TEXTS[lang] would raise KeyError for them instead.
        self.texts = TEXTS.get(lang, TEXTS["en"])
        self.default_path = ZH_DEFAULT_CONF if lang == "zh" else EN_DEFAULT_CONF
        # Shared YAML handler; preserve_quotes keeps the user's quoting style
        # when the config is dumped back to disk.
        self.yaml = YAML()
        self.yaml.preserve_quotes = True
        self.user_path = USER_CONF
        self.backup_path = BACKUP_CONF
        self.texts_merge = TEXTS_MERGE.get(lang, TEXTS_MERGE["en"])
        self.texts_compare = TEXTS_COMPARE.get(lang, TEXTS_COMPARE["en"])
        self.logger = logger
        self.upgrade_utils = UpgradeUtility(self.logger, self.lang)

    def sync_user_config(self) -> None:
        """

        Ensure the user configuration file exists and create a backup if necessary.

        If the user config file does not exist, copy the default config.

        """
        # Check if the user config file exists
        if not os.path.exists(self.user_path):
            self.logger.warning(self.texts["no_config"])
            self.logger.warning(self.texts["copy_default_config"])
            # Copy default config to user path
            shutil.copy2(self.default_path, self.user_path)
            return

        # Create a backup of the user config file
        self.backup_user_config()

    def update_user_config(self) -> None:
        """Bring the user config in line with the current default config.

        Runs three stages in order:

        1. Key structure: when ``compare_field_keys`` reports a difference,
           missing keys are merged in from the default config.
        2. Comments: when ``compare_comments`` reports a difference, a
           ``CommentSynchronizer`` re-syncs them.
        3. Version: when the version from ``get_old_version`` differs from
           the one from ``get_latest_version``, ``VersionUpgradeManager``
           performs the upgrade.
        """
        # Stage 1: config keys.
        keys_match = self.compare_field_keys()
        if keys_match:
            self.logger.info(self.texts["configs_up_to_date"])
        else:
            self.merge_and_update_user_config()

        # Stage 2: comments.
        comments_match = self.compare_comments()
        if comments_match:
            self.logger.info(self.texts_compare["comments_up_to_date"])
        else:
            synchronizer = CommentSynchronizer(
                self.default_path,
                self.user_path,
                self.logger,
                self.yaml,
                self.texts_compare,
            )
            synchronizer.sync()

        # Stage 3: version. The latest version is read before the old one.
        new_version = self.get_latest_version()
        old_version = self.get_old_version()
        need_upgrade = old_version != new_version

        if not need_upgrade:
            message = self.texts["version_upgrade_none"].format(version=old_version)
            self.logger.info(message)
            return

        upgrader = VersionUpgradeManager(self.lang, self.logger)
        final_version = upgrader.upgrade(old_version)
        message = self.texts["version_upgrade_success"].format(
            old=old_version, new=final_version
        )
        self.logger.info(message)

    def backup_user_config(self):
        backup_path = os.path.abspath(self.backup_path)
        self.logger.info(
            self.texts["backup_user_config"].format(
                user_conf=self.user_path, backup_conf=self.backup_path
            )
        )
        self.logger.debug(self.texts["config_backup_path"].format(path=backup_path))
        shutil.copy2(self.user_path, self.backup_path)

    def merge_and_update_user_config(self):
        try:
            new_keys = self.merge_configs()
            if new_keys:
                self.logger.info(self.texts["merged_config_success"])
                for key in new_keys:
                    self.logger.info(f"  - {key}")
            else:
                self.logger.info(self.texts["merged_config_none"])
        except Exception as e:
            self.logger.error(self.texts["merge_failed"].format(error=e))

    def merge_configs(self):
        """Add keys that exist only in the default config to the user config.

        Loads both YAML files, recursively copies every key (with its default
        value) that the user config lacks, writes the result back to
        ``self.user_path`` and logs each added key. Values the user already
        has are never overwritten; nested mappings are only descended into
        when both sides hold a mapping for that key.

        Returns:
            list: Dotted paths of the keys that were added, in discovery order.
        """
        user_config = self.yaml.load(load_text_file_with_guess_encoding(self.user_path))
        default_config = self.yaml.load(
            load_text_file_with_guess_encoding(self.default_path)
        )

        if user_config is None:
            # An empty (or comment-only) YAML document loads as None, which
            # would make the membership test below raise TypeError. Start from
            # an empty mapping of the same type as the default config so every
            # default key is treated as missing.
            user_config = type(default_config)()

        new_keys = []

        def fill_missing(target, source, prefix=""):
            """Copy keys absent from ``target`` out of ``source``, in place."""
            for key, default_value in source.items():
                dotted = f"{prefix}.{key}" if prefix else key
                if key not in target:
                    target[key] = default_value
                    new_keys.append(dotted)
                elif isinstance(default_value, dict) and isinstance(
                    target[key], dict
                ):
                    fill_missing(target[key], default_value, dotted)

        fill_missing(user_config, default_config)

        with open(self.user_path, "w", encoding="utf-8") as f:
            self.yaml.dump(user_config, f)

        for key in new_keys:
            self.logger.info(self.texts_merge["new_config_item"].format(key=key))
        return new_keys

    def collect_all_subkeys(self, d, base_path):
        """Collect all keys in the dictionary d, recursively, with base_path as the prefix."""
        keys = []
        # Only process if d is a dictionary
        if isinstance(d, dict):
            for key, value in d.items():
                current_path = f"{base_path}.{key}" if base_path else key
                keys.append(current_path)
                if isinstance(value, dict):
                    keys.extend(self.collect_all_subkeys(value, current_path))
        return keys

    def get_missing_keys(self, user, default, path=""):
        """Recursively find keys in default that are missing in user."""
        missing = []
        for key, default_val in default.items():
            current_path = f"{path}.{key}" if path else key
            if key not in user:
                missing.append(current_path)
            else:
                user_val = user[key]
                if isinstance(default_val, dict):
                    if isinstance(user_val, dict):
                        missing.extend(
                            self.get_missing_keys(user_val, default_val, current_path)
                        )
                    else:
                        subtree_missing = self.collect_all_subkeys(
                            default_val, current_path
                        )
                        missing.extend(subtree_missing)
        return missing

    def get_extra_keys(self, user, default, path=""):
        """Recursively find keys in user that are not present in default."""
        extra = []
        for key, user_val in user.items():
            current_path = f"{path}.{key}" if path else key
            if key not in default:
                # Only collect subkeys if the value is a dictionary
                if isinstance(user_val, dict):
                    subtree_extra = self.collect_all_subkeys(user_val, current_path)
                    extra.extend(subtree_extra)
                extra.append(current_path)
            else:
                default_val = default[key]
                if isinstance(user_val, dict) and isinstance(default_val, dict):
                    extra.extend(
                        self.get_extra_keys(user_val, default_val, current_path)
                    )
                elif isinstance(user_val, dict):
                    subtree_extra = self.collect_all_subkeys(user_val, current_path)
                    extra.extend(subtree_extra)
        return extra

    def delete_extra_keys(self):
        """Remove keys from the user config that the default config lacks.

        Reloads both files, computes the extra key paths with
        ``get_extra_keys``, removes each one from the loaded user config,
        writes the result back to ``self.user_path`` and logs how many keys
        were removed and which ones.
        """

        user_config = self.yaml.load(load_text_file_with_guess_encoding(self.user_path))
        default_config = self.yaml.load(
            load_text_file_with_guess_encoding(self.default_path)
        )
        extra_keys = self.get_extra_keys(user_config, default_config)

        def delete_key_by_path(config_dict, key_path):
            """Delete the entry at dotted ``key_path``; return True if it existed."""
            *parents, leaf = key_path.split(".")
            node = config_dict
            for part in parents:
                if part in node and isinstance(node[part], dict):
                    node = node[part]
                else:
                    return False
            # Test membership instead of inspecting the popped value: a key
            # whose value is None (YAML null) must still count as deleted.
            if leaf not in node:
                return False
            del node[leaf]
            return True

        deleted_keys = []
        for key_path in extra_keys:
            if delete_key_by_path(user_config, key_path):
                deleted_keys.append(key_path)

        with open(self.user_path, "w", encoding="utf-8") as f:
            self.yaml.dump(user_config, f)

        self.logger.info(
            self.texts_compare["extra_keys_deleted_count"].format(
                count=len(deleted_keys)
            )
        )
        for key in deleted_keys:
            self.logger.info(
                self.texts_compare["extra_keys_deleted_item"].format(key=key)
            )

    def compare_field_keys(self) -> bool:
        """Compare the key structure of the user config against the default.

        Missing and extra keys are logged as warnings, and extra keys are
        deleted from the user config on the spot. The comparison callback
        reports a match whenever no key is missing, so extra keys alone do
        not count as a mismatch. The result of
        ``self.upgrade_utils.compare_dicts`` is returned unchanged.
        """

        def load_user():
            return self.yaml.load(load_text_file_with_guess_encoding(self.user_path))

        def load_default():
            return self.yaml.load(
                load_text_file_with_guess_encoding(self.default_path)
            )

        def diff_keys(user, default):
            missing = self.get_missing_keys(user, default)
            extra = self.get_extra_keys(user, default)

            if missing:
                joined_missing = ", ".join(missing)
                self.logger.warning(
                    self.texts_compare["missing_keys"].format(keys=joined_missing)
                )
            if extra:
                joined_extra = ", ".join(extra)
                self.logger.warning(
                    self.texts_compare["extra_keys"].format(keys=joined_extra)
                )
                self.delete_extra_keys()

            is_match = not missing
            return (is_match, missing + extra)

        return self.upgrade_utils.compare_dicts(
            name="keys",
            get_a=load_user,
            get_b=load_default,
            compare_fn=diff_keys,
        )

    def compare_comments(self) -> bool:
        """Compare the comments of the user config against the default config.

        Both files are read with ``load_text_file_with_guess_encoding`` and
        handed to ``self.upgrade_utils.compare_dicts`` with
        ``comment_diff_fn`` as the comparison; its result is returned as is.
        """

        def read_user_text():
            return load_text_file_with_guess_encoding(self.user_path)

        def read_default_text():
            return load_text_file_with_guess_encoding(self.default_path)

        return self.upgrade_utils.compare_dicts(
            name="comments",
            get_a=read_user_text,
            get_b=read_default_text,
            compare_fn=comment_diff_fn,
        )

    def get_latest_version(self):
        with open(self.default_path, "r", encoding="utf-8") as f:
            default_config = self.yaml.load(f)
        return default_config.get("system_config", {}).get("conf_version", "")

    def get_old_version(self) -> str:
        """Return the config version recorded in the backup of the user config.

        Reads ``system_config.conf_version`` from ``self.backup_path``. Falls
        back to ``"v1.1.1"`` when the key is absent, when the recorded version
        is older than that, or when the backup cannot be read or parsed; the
        last two cases are logged as warnings.

        Returns:
            str: The version to upgrade from.
        """
        fallback_version = "v1.1.1"
        try:
            yaml = YAML()
            # Read the same file backup_user_config() writes, so a customised
            # backup_path is honoured instead of the module-level constant.
            with open(self.backup_path, "r", encoding="utf-8") as f:
                backup_conf = yaml.load(f)

            raw_version = backup_conf.get("system_config", {}).get(
                "conf_version", fallback_version
            )

            if version.parse(raw_version) < version.parse(fallback_version):
                self.logger.warning(
                    self.texts["version_too_old"].format(
                        found=raw_version, adjusted=fallback_version
                    )
                )
                return fallback_version

            self.logger.info(
                self.texts["backup_used_version"].format(backup_version=raw_version)
            )
            return raw_version
        except Exception as e:
            # Best effort: any failure to read or interpret the backup
            # (missing file, empty document, unparsable version) degrades to
            # the fallback version rather than aborting the upgrade.
            self.logger.warning(
                self.texts["backup_read_error"].format(
                    version=fallback_version, error=e
                )
            )
            return fallback_version