#!/usr/bin/env python3
# NOTICE: This file is adapted from Tencent's CognitiveKernel-Pro (https://github.com/Tencent/CognitiveKernel-Pro).
# Modifications in this fork (2025) are for academic research and educational use only; no commercial use.
# Original rights belong to the original authors and Tencent; see upstream license for details.

"""
CognitiveKernel-Pro TOML Configuration System

Centralized, typed configuration management replacing JSON/dict passing.
Follows Linus Torvalds philosophy: simple, direct, no defensive backups.
"""

import os
import logging as std_logging
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
from pathlib import Path


@dataclass
class LLMConfig:
    """Language Model configuration - HTTP-only, fail-fast"""
    call_target: str  # Must be HTTP URL
    api_key: str      # Required
    model: str        # Required
    api_base_url: Optional[str] = None  # Backward compatibility
    request_timeout: int = 600
    max_retry_times: int = 5
    max_token_num: int = 20000
    extract_body: Dict[str, Any] = field(default_factory=dict)
    # Backward compatibility attributes (ignored)
    thinking: bool = False
    seed: int = 1377
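

# Direct-construction sketch of LLMConfig (illustrative values only; in
# normal use instances are built by Settings._build_llm_config below):
#
#   cfg = LLMConfig(
#       call_target="https://api.openai.com/v1/chat/completions",
#       api_key="sk-example",
#       model="gpt-4o-mini",
#       extract_body={"temperature": 0.0, "max_tokens": 8192},
#   )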


@dataclass
class WebEnvConfig:
    """Web Environment configuration (HTTP API)"""
    web_ip: str = "localhost:3000"
    web_command: str = ""
    web_timeout: int = 600
    screenshot_boxed: bool = True
    target_url: str = "https://www.bing.com/"


@dataclass
class WebEnvBuiltinConfig:
    """Playwright builtin Web Environment configuration"""
    max_browsers: int = 16
    headless: bool = True
    web_timeout: int = 600
    screenshot_boxed: bool = True
    target_url: str = "https://www.bing.com/"


@dataclass
class WebAgentConfig:
    """Web Agent configuration"""
    max_steps: int = 20
    use_multimodal: str = "auto"  # off|yes|auto
    model: LLMConfig = field(default_factory=lambda: LLMConfig(
        call_target=os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1/chat/completions"),
        api_key=os.environ.get("OPENAI_API_KEY", "your-api-key-here"),
        model=os.environ.get("OPENAI_API_MODEL", "gpt-4o-mini"),
        extract_body={"temperature": 0.0, "max_tokens": 8192}
    ))
    model_multimodal: LLMConfig = field(default_factory=lambda: LLMConfig(
        call_target=os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1/chat/completions"),
        api_key=os.environ.get("OPENAI_API_KEY", "your-api-key-here"),
        model=os.environ.get("OPENAI_API_MODEL", "gpt-4o-mini"),
        extract_body={"temperature": 0.0, "max_tokens": 8192}
    ))
    env: WebEnvConfig = field(default_factory=WebEnvConfig)
    env_builtin: WebEnvBuiltinConfig = field(default_factory=WebEnvBuiltinConfig)


@dataclass
class FileAgentConfig:
    """File Agent configuration"""
    max_steps: int = 16
    max_file_read_tokens: int = 3000
    max_file_screenshots: int = 2
    model: LLMConfig = field(default_factory=lambda: LLMConfig(
        call_target=os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1/chat/completions"),
        api_key=os.environ.get("OPENAI_API_KEY", "your-api-key-here"),
        model=os.environ.get("OPENAI_API_MODEL", "gpt-4o-mini"),
        extract_body={"temperature": 0.3, "max_tokens": 8192}
    ))
    model_multimodal: LLMConfig = field(default_factory=lambda: LLMConfig(
        call_target=os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1/chat/completions"),
        api_key=os.environ.get("OPENAI_API_KEY", "your-api-key-here"),
        model=os.environ.get("OPENAI_API_MODEL", "gpt-4o-mini"),
        extract_body={"temperature": 0.0, "max_tokens": 8192}
    ))


@dataclass
class CKAgentConfig:
    """Core CKAgent configuration"""
    name: str = "ck_agent"
    description: str = "Cognitive Kernel, an initial autopilot system."
    max_steps: int = 16
    max_time_limit: int = 4200
    recent_steps: int = 5
    obs_max_token: int = 8192
    exec_timeout_with_call: int = 1000
    exec_timeout_wo_call: int = 200
    end_template: str = "more"  # less|medium|more; controls ck_end verbosity
    model: LLMConfig = field(default_factory=lambda: LLMConfig(
        call_target=os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1/chat/completions"),
        api_key=os.environ.get("OPENAI_API_KEY", "your-api-key-here"),
        model=os.environ.get("OPENAI_API_MODEL", "gpt-4o-mini"),
        extract_body={"temperature": 0.6, "max_tokens": 4000}
    ))


@dataclass
class LoggingConfig:
    """Centralized logging configuration"""
    console_level: str = "INFO"
    log_dir: str = "logs"
    session_logs: bool = True


@dataclass
class SearchConfig:
    """Search backend configuration"""
    backend: str = "google"  # google|duckduckgo




@dataclass
class EnvironmentConfig:
    """System environment configuration"""


@dataclass
class Settings:
    """Root configuration object"""
    ck: CKAgentConfig = field(default_factory=CKAgentConfig)
    web: WebAgentConfig = field(default_factory=WebAgentConfig)
    file: FileAgentConfig = field(default_factory=FileAgentConfig)
    logging: LoggingConfig = field(default_factory=LoggingConfig)
    search: SearchConfig = field(default_factory=SearchConfig)
    environment: EnvironmentConfig = field(default_factory=EnvironmentConfig)

    @classmethod
    def load(cls, path: str = "config.toml") -> "Settings":
        """Load configuration from TOML file or build from environment.

        If the TOML file does not exist and OPENAI_* environment variables are
        provided, build settings that source credentials from environment vars.
        Falls back to hardcoded defaults otherwise.
        """
        try:
            import tomllib
        except ImportError:
            # Python < 3.11 fallback
            try:
                import tomli as tomllib
            except ImportError:
                raise ImportError(
                    "TOML support requires Python 3.11+ or 'pip install tomli'"
                )

        config_path = Path(path)

        if not config_path.exists():
            # Environment-only path: create minimal sections so env fallback triggers
            env_vars = {
                "OPENAI_API_BASE": os.environ.get("OPENAI_API_BASE"),
                "OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY"),
                "OPENAI_API_MODEL": os.environ.get("OPENAI_API_MODEL")
            }

            env_present = bool(env_vars["OPENAI_API_BASE"] or env_vars["OPENAI_API_KEY"] or env_vars["OPENAI_API_MODEL"])

            if env_present:
                data: Dict[str, Any] = {
                    "ck": {"model": {}},
                    "web": {"model": {}, "model_multimodal": {}},
                    "file": {"model": {}, "model_multimodal": {}},
                }
                return cls._from_dict(data)
            else:
                return cls()

        with open(config_path, "rb") as f:
            data = tomllib.load(f)

        return cls._from_dict(data)

    @classmethod
    def _from_dict(cls, data: Dict[str, Any]) -> "Settings":
        """Convert TOML dict to Settings object"""
        # Extract sections with defaults
        ck_data = data.get("ck", {})
        web_data = data.get("web", {})
        file_data = data.get("file", {})
        logging_data = data.get("logging", {})
        search_data = data.get("search", {})
        environment_data = data.get("environment", {})

        # Build nested configs
        ck_config = CKAgentConfig(
            name=ck_data.get("name", "ck_agent"),
            description=ck_data.get("description", "Cognitive Kernel, an initial autopilot system."),
            max_steps=ck_data.get("max_steps", 16),
            max_time_limit=ck_data.get("max_time_limit", 4200),
            recent_steps=ck_data.get("recent_steps", 5),
            obs_max_token=ck_data.get("obs_max_token", 8192),
            exec_timeout_with_call=ck_data.get("exec_timeout_with_call", 1000),
            exec_timeout_wo_call=ck_data.get("exec_timeout_wo_call", 200),
            end_template=ck_data.get("end_template", "more"),
            # Always build model (even if empty dict) so env fallback can apply
            model=cls._build_llm_config(ck_data.get("model", {}), {
                "temperature": 0.6, "max_tokens": 4000
            })
        )

        web_config = WebAgentConfig(
            max_steps=web_data.get("max_steps", 20),
            use_multimodal=web_data.get("use_multimodal", "auto"),
            model=cls._build_llm_config(web_data.get("model", {}), {
                "temperature": 0.0, "max_tokens": 8192
            }),
            model_multimodal=cls._build_llm_config(web_data.get("model_multimodal", {}), {
                "temperature": 0.0, "max_tokens": 8192
            }),
            env=cls._build_web_env_config(web_data.get("env", {})),
            env_builtin=cls._build_web_env_builtin_config(web_data.get("env_builtin", {}))
        )

        file_config = FileAgentConfig(
            max_steps=file_data.get("max_steps", 16),
            max_file_read_tokens=file_data.get("max_file_read_tokens", 3000),
            max_file_screenshots=file_data.get("max_file_screenshots", 2),
            model=cls._build_llm_config(file_data.get("model", {}), {
                "temperature": 0.3, "max_tokens": 8192
            }),
            model_multimodal=cls._build_llm_config(file_data.get("model_multimodal", {}), {
                "temperature": 0.0, "max_tokens": 8192
            })
        )

        logging_config = LoggingConfig(
            console_level=logging_data.get("console_level", "INFO"),
            log_dir=logging_data.get("log_dir", "logs"),
            session_logs=logging_data.get("session_logs", True)
        )

        search_config = SearchConfig(
            backend=search_data.get("backend", "google")
        )

        environment_config = EnvironmentConfig()

        return cls(
            ck=ck_config,
            web=web_config,
            file=file_config,
            logging=logging_config,
            search=search_config,
            environment=environment_config
        )

    @staticmethod
    def _build_llm_config(llm_data: Dict[str, Any], default_extract_body: Dict[str, Any]) -> LLMConfig:
        """Build LLMConfig from TOML data - HTTP-only, fail-fast

        Priority order: TOML config > Inheritance > Environment variables > Hardcoded defaults

        Environment variable support:
        - OPENAI_API_BASE: Default API base URL
        - OPENAI_API_KEY: Default API key
        - OPENAI_API_MODEL: Default model name

        Environment variables are only used when the corresponding config value is not provided.
        """
        # Merge default extract_body with config
        extract_body = default_extract_body.copy()
        extract_body.update(llm_data.get("extract_body", {}))
        # Also support legacy call_kwargs section for backward compatibility
        extract_body.update(llm_data.get("call_kwargs", {}))

        # HTTP-only validation and environment variable fallback
        call_target = llm_data.get("call_target")
        if call_target is None:
            call_target = os.environ.get("OPENAI_API_BASE", "https://api.openai.com/v1/chat/completions")

        # Validate HTTP(S) URL regardless of source (config or env var);
        # a bare "http" prefix check would also accept schemes like "httpx://"
        if not call_target.startswith(("http://", "https://")):
            raise ValueError(f"call_target must be an HTTP(S) URL, got: {call_target}")

        api_key = llm_data.get("api_key")
        if not api_key:
            api_key = os.environ.get("OPENAI_API_KEY", "your-api-key-here")

        model = llm_data.get("model")
        if not model:
            model = os.environ.get("OPENAI_API_MODEL", "gpt-4o-mini")

        # api_base_url comes only from the TOML section; it is deliberately
        # never derived from call_target, so parent→child inheritance in
        # to_ckagent_kwargs() stays predictable
        api_base_url = llm_data.get("api_base_url")

        config = LLMConfig(
            call_target=call_target,
            api_key=api_key,
            model=model,
            api_base_url=api_base_url,
            request_timeout=llm_data.get("request_timeout", 600),
            max_retry_times=llm_data.get("max_retry_times", 5),
            max_token_num=llm_data.get("max_token_num", 20000),
            extract_body=extract_body,
            thinking=llm_data.get("thinking", False),
            seed=llm_data.get("seed", 1377),
        )

        return config

    @staticmethod
    def _build_web_env_config(env_data: Dict[str, Any]) -> WebEnvConfig:
        """Build WebEnvConfig from TOML data"""
        return WebEnvConfig(
            web_ip=env_data.get("web_ip", "localhost:3000"),
            web_command=env_data.get("web_command", ""),
            web_timeout=env_data.get("web_timeout", 600),
            screenshot_boxed=env_data.get("screenshot_boxed", True),
            target_url=env_data.get("target_url", "https://www.bing.com/")
        )

    @staticmethod
    def _build_web_env_builtin_config(env_data: Dict[str, Any]) -> WebEnvBuiltinConfig:
        """Build WebEnvBuiltinConfig from TOML data"""
        return WebEnvBuiltinConfig(
            max_browsers=env_data.get("max_browsers", 16),
            headless=env_data.get("headless", True),
            web_timeout=env_data.get("web_timeout", 600),
            screenshot_boxed=env_data.get("screenshot_boxed", True),
            target_url=env_data.get("target_url", "https://www.bing.com/")
        )

    def validate(self) -> None:
        """Validate configuration values"""
        # Validate use_multimodal enum
        if self.web.use_multimodal not in {"off", "yes", "auto"}:
            raise ValueError(f"web.use_multimodal must be 'off', 'yes', or 'auto', got: {self.web.use_multimodal}")

        # Validate search backend
        if self.search.backend not in {"google", "duckduckgo"}:
            raise ValueError(f"search.backend must be 'google' or 'duckduckgo', got: {self.search.backend}")

        # Validate logging level
        valid_levels = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
        if self.logging.console_level not in valid_levels:
            raise ValueError(f"logging.console_level must be one of {valid_levels}, got: {self.logging.console_level}")

    def to_ckagent_kwargs(self) -> Dict[str, Any]:
        """Convert Settings to CKAgent constructor kwargs"""
        # Parent→child inheritance for API creds
        parent_model = self._llm_config_to_dict(self.ck.model)
        web_model = self._llm_config_to_dict(self.web.model)
        file_model = self._llm_config_to_dict(self.file.model)
        web_mm_model = self._llm_config_to_dict(self.web.model_multimodal)
        file_mm_model = self._llm_config_to_dict(self.file.model_multimodal)

        def inherit(child: Dict[str, Any], parent: Dict[str, Any]) -> Dict[str, Any]:
            # Inherit fields that are missing or empty in child
            if ("api_base_url" not in child or not child.get("api_base_url")) and "api_base_url" in parent:
                child["api_base_url"] = parent["api_base_url"]
            if ("api_key" not in child or not child.get("api_key")) and "api_key" in parent:
                child["api_key"] = parent["api_key"]
            if ("model" not in child or not child.get("model")) and "model" in parent:
                child["model"] = parent["model"]
            return child

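        # Example (a sketch): when api_base_url is set only on [ck.model],
        # the child dicts above carry api_base_url=None, so inherit() copies
        # the parent's value into the web/file models; api_key and model are
        # usually already filled by _build_llm_config's env-var fallbacks.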
        web_model = inherit(web_model, parent_model)
        file_model = inherit(file_model, parent_model)
        web_mm_model = inherit(web_mm_model, parent_model)
        file_mm_model = inherit(file_mm_model, parent_model)

        # Legacy tests expect a reduced model dict with call_kwargs etc.
        def reduce_model(m: Dict[str, Any]) -> Dict[str, Any]:
            out = {
                "call_target": m.get("call_target"),
                "thinking": m.get("thinking", False),
                "request_timeout": m.get("request_timeout", 600),
                "max_retry_times": m.get("max_retry_times", 5),
                "seed": m.get("seed", 1377),
                "max_token_num": m.get("max_token_num", 20000),
                "call_kwargs": m.get("extract_body", {}),
            }
            # Preserve API credentials for integration tests that assert existence
            if m.get("api_key") is not None:
                out["api_key"] = m["api_key"]
            if m.get("api_base_url") is not None:
                out["api_base_url"] = m["api_base_url"]
            if m.get("model") is not None:
                out["model"] = m["model"]
            return out

        return {
            "name": self.ck.name,
            "description": self.ck.description,
            "max_steps": self.ck.max_steps,
            "max_time_limit": self.ck.max_time_limit,
            "recent_steps": self.ck.recent_steps,
            "obs_max_token": self.ck.obs_max_token,
            "exec_timeout_with_call": self.ck.exec_timeout_with_call,
            "exec_timeout_wo_call": self.ck.exec_timeout_wo_call,
            "end_template": self.ck.end_template,
            "model": reduce_model(parent_model),
            "web_agent": {
                "max_steps": self.web.max_steps,
                "use_multimodal": self.web.use_multimodal,
                "model": reduce_model(web_model),
                "model_multimodal": reduce_model(web_mm_model),
                "web_env_kwargs": {
                    "web_ip": self.web.env.web_ip,
                    "web_command": self.web.env.web_command,
                    "web_timeout": self.web.env.web_timeout,
                    "screenshot_boxed": self.web.env.screenshot_boxed,
                    "target_url": self.web.env.target_url,
                    # Builtin env config for fuse fallback
                    "max_browsers": self.web.env_builtin.max_browsers,
                    "headless": self.web.env_builtin.headless,
                }
            },
            "file_agent": {
                "max_steps": self.file.max_steps,
                "max_file_read_tokens": self.file.max_file_read_tokens,
                "max_file_screenshots": self.file.max_file_screenshots,
                "model": reduce_model(file_model),
                "model_multimodal": reduce_model(file_mm_model),
            },
            "search_backend": self.search.backend,  # Add search backend configuration
        }

    def _llm_config_to_dict(self, llm_config: LLMConfig) -> Dict[str, Any]:
        """Convert LLMConfig to dict for agent initialization - HTTP-only"""
        return {
            "call_target": llm_config.call_target,
            "api_key": llm_config.api_key,
            "model": llm_config.model,
            # Included so inherit() in to_ckagent_kwargs() can actually
            # propagate a parent-level api_base_url to child models
            "api_base_url": llm_config.api_base_url,
            "extract_body": llm_config.extract_body.copy(),
            "request_timeout": llm_config.request_timeout,
            "max_retry_times": llm_config.max_retry_times,
            "max_token_num": llm_config.max_token_num,
            # Backward compatibility (ignored by LLM)
            "thinking": llm_config.thinking,
            "seed": llm_config.seed,
        }

    def build_logger(self) -> std_logging.Logger:
        """Create configured logger instance"""
        # Create logs directory (including any missing parent directories)
        log_dir = Path(self.logging.log_dir)
        log_dir.mkdir(parents=True, exist_ok=True)

        # Create logger; don't propagate to the root logger, so records are
        # not emitted twice when the root logger is also configured
        logger = std_logging.getLogger("CognitiveKernel")
        logger.setLevel(getattr(std_logging, self.logging.console_level))
        logger.propagate = False

        # Clear existing handlers
        logger.handlers.clear()

        # Console handler
        console_handler = std_logging.StreamHandler()
        console_handler.setLevel(getattr(std_logging, self.logging.console_level))
        console_formatter = std_logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)

        # File handler if session_logs enabled
        if self.logging.session_logs:
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            log_file = log_dir / f"ck_session_{timestamp}.log"
            file_handler = std_logging.FileHandler(log_file, encoding="utf-8")
            file_handler.setLevel(getattr(std_logging, self.logging.console_level))
            file_handler.setFormatter(console_formatter)
            logger.addHandler(file_handler)

        return logger
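

if __name__ == "__main__":
    # Minimal smoke-test sketch (local convenience, not part of the upstream
    # API): load settings from config.toml / environment / defaults, validate
    # them, and print the kwargs that would be handed to CKAgent. Note that
    # build_logger() creates the log directory and a session log file.
    import json

    settings = Settings.load()
    settings.validate()
    logger = settings.build_logger()
    logger.info("Loaded settings for agent %r", settings.ck.name)
    print(json.dumps(settings.to_ckagent_kwargs(), indent=2))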