"""
OmniCoreX Utilities Module

Helper functions, logging setup, configuration parsing,
and common utilities used throughout the OmniCoreX system.

Features:
- Robust logging setup with configurable formats and levels.
- Configuration loader supporting YAML and JSON with overrides.
- Seed setting for reproducibility.
- Timing and benchmarking decorators.
- Various small utilities for system use.
"""

import functools
import json
import logging
import os
import random
import sys
import time
from typing import Optional

import numpy as np
import torch
import yaml


def setup_logging(log_level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger:
    """
    Sets up a logger with console and optional file handlers.

    Args:
        log_level: Logging level (e.g., logging.INFO).
        log_file: Optional path to a log file.

    Returns:
        Configured logger instance.
    """
    logger = logging.getLogger("OmniCoreX")
    logger.setLevel(log_level)
    formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")

    # Remove any existing handlers so repeated calls don't duplicate log output.
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    # Console handler.
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(log_level)
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    # Optional file handler.
    if log_file:
        fh = logging.FileHandler(log_file)
        fh.setLevel(log_level)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

    return logger


# Module-level logger shared by the helpers below.
logger = setup_logging()


def load_config_file(config_path: str) -> dict:
    """
    Loads a YAML or JSON configuration file.

    Args:
        config_path: Path to the config file.

    Returns:
        Dictionary of configuration parameters.
    """
    if not os.path.isfile(config_path):
        raise FileNotFoundError(f"Config file not found: {config_path}")

    ext = os.path.splitext(config_path)[1].lower()
    with open(config_path, "r", encoding="utf-8") as f:
        if ext in (".yaml", ".yml"):
            cfg = yaml.safe_load(f)
        elif ext == ".json":
            cfg = json.load(f)
        else:
            raise ValueError(f"Unsupported config format: {ext}")

    # Empty YAML files parse to None; normalize to an empty dict.
    return cfg if cfg is not None else {}


def merge_dicts(base: dict, override: dict) -> dict:
    """
    Deep merges two dictionaries, with the override taking precedence.

    Args:
        base: Base dictionary.
        override: Dictionary with override values.

    Returns:
        Merged dictionary.
    """
    result = base.copy()
    for k, v in override.items():
        if k in result and isinstance(result[k], dict) and isinstance(v, dict):
            result[k] = merge_dicts(result[k], v)
        else:
            result[k] = v
    return result


def set_seed(seed: int = 42):
    """
    Set the seed for reproducibility across random, numpy, and torch.

    Args:
        seed: Integer seed value.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    logger.info(f"Random seed set to {seed}")


def timeit(func):
    """
    Decorator to measure and log function execution time.

    Usage:
        @timeit
        def my_function(...):
            ...
    """
    @functools.wraps(func)  # Preserve the wrapped function's name and docstring.
    def wrapper(*args, **kwargs):
        start = time.perf_counter()  # Monotonic clock, better suited to timing than time.time().
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        logger.info(f"Function {func.__name__!r} executed in {elapsed:.4f}s")
        return result

    return wrapper


def ensure_dir(dirname: str):
    """
    Creates a directory if it does not exist.

    Args:
        dirname: Directory path to create.
    """
    if not os.path.exists(dirname):
        os.makedirs(dirname, exist_ok=True)  # exist_ok guards against a concurrent create.
        logger.debug(f"Directory created: {dirname}")


def to_device(batch: dict, device: torch.device) -> dict:
    """
    Moves all tensor elements in a batch dict to the specified device.

    Args:
        batch: Dictionary with tensors.
        device: Target torch device.

    Returns:
        Dictionary with tensors on the device.
    """
    return {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in batch.items()}


if __name__ == "__main__":
    # Quick smoke test of the utilities above.
    set_seed(1234)
    logger.info("This is a test log message.")

    # Demonstrate config merging.
    base_cfg = {"model": {"layers": 12, "embed_dim": 256}, "training": {"batch_size": 32}}
    override_cfg = {"model": {"layers": 24}, "training": {"learning_rate": 0.001}}

    merged_cfg = merge_dicts(base_cfg, override_cfg)
    logger.info(f"Merged config: {merged_cfg}")
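
    # Minimal sketch of load_config_file, round-tripping through a temporary
    # JSON file. The file name and {"demo": True} contents are illustrative
    # only, not part of OmniCoreX.
    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
        json.dump({"demo": True}, tmp)
        tmp_path = tmp.name
    loaded_cfg = load_config_file(tmp_path)
    logger.info(f"Loaded config from {tmp_path}: {loaded_cfg}")
    os.remove(tmp_path)  # Clean up the temporary file.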

    # Demonstrate directory creation.
    test_dir = "./tmp_test_dir"
    ensure_dir(test_dir)
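
    # Minimal sketch of to_device on a toy batch; the "input"/"label"/"meta"
    # keys are illustrative. CPU is used as the target so this runs anywhere.
    batch = {"input": torch.randn(2, 4), "label": torch.tensor([0, 1]), "meta": "unchanged"}
    batch = to_device(batch, torch.device("cpu"))
    logger.info(f"Batch moved to {batch['input'].device}")  # Non-tensor values pass through untouched.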

    # Demonstrate the timing decorator.
    @timeit
    def dummy_work():
        time.sleep(0.5)

    dummy_work()
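
    # Minimal sketch of setup_logging with a file handler. The log path is
    # illustrative, and note this reconfigures the shared "OmniCoreX" logger.
    file_logger = setup_logging(logging.DEBUG, log_file="omnicorex_demo.log")
    file_logger.debug("This message also goes to omnicorex_demo.log.")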