| import logging |
| import os |
| import re |
| import socket |
| import sys |
| import time |
| import warnings |
| from datetime import datetime |
| from enum import Enum |
| from itertools import cycle, islice |
| from pathlib import Path |
| from queue import Queue |
| from threading import Thread |
| from typing import Any, Callable, Dict, Optional, Union |
|
|
| import boto3 |
| import botocore.exceptions as boto_exceptions |
| import rich |
| from botocore.config import Config |
| from rich.console import Console, ConsoleRenderable |
| from rich.highlighter import NullHighlighter |
| from rich.progress import Progress |
| from rich.text import Text |
| from rich.traceback import Traceback |
|
|
| from .aliases import PathOrStr |
| from .exceptions import ( |
| OLMoCliError, |
| OLMoEnvironmentError, |
| OLMoError, |
| OLMoNetworkError, |
| OLMoThreadError, |
| ) |
| from .torch_util import get_global_rank, get_local_rank, get_node_rank, is_distributed |
|
|
| try: |
| from functools import cache |
| except ImportError: |
| from functools import lru_cache as cache |
|
|
|
|
class StrEnum(str, Enum):
    """
    Backport of :class:`enum.StrEnum` (available in the standard library since Python 3.11)
    for older interpreters: members are strings and print as their raw value.
    """

    def __str__(self) -> str:
        # Render as the underlying value rather than "ClassName.member".
        member_value = self.value
        return member_value

    def __repr__(self) -> str:
        # The quoted string form, e.g. 'rank0_only'.
        return f"'{self!s}'"
|
|
|
|
# Extra attributes copied onto every log record by the record factory installed in
# `setup_logging`; modified through `log_extra_field`.
_log_extra_fields: Dict[str, Any] = {}
# Logger for this module.
log = logging.getLogger(__name__)
|
|
|
|
class LogFilterType(StrEnum):
    """
    Selects which ranks may emit log messages at ``INFO`` level and below
    (see the filters installed by ``setup_logging``).
    """

    # Only records whose ``global_rank`` is 0 pass the filter at INFO and below.
    rank0_only = "rank0_only"
    # Only records whose ``local_rank`` is 0 pass the filter at INFO and below.
    local_rank0_only = "local_rank0_only"
    # No rank filter is installed.
    all_ranks = "all_ranks"
|
|
|
|
def log_extra_field(field_name: str, field_value: Any) -> None:
    """
    Register, replace or remove an extra field stored in ``_log_extra_fields``.

    :param field_name: Name of the field.
    :param field_value: Value to store; ``None`` removes the field if it is present.
    """
    if field_value is not None:
        _log_extra_fields[field_name] = field_value
        return
    # Removing a field that was never set is a no-op.
    _log_extra_fields.pop(field_name, None)
|
|
|
|
def setup_logging(log_filter_type: LogFilterType = LogFilterType.rank0_only) -> None:
    """
    Configure root logging for this process.

    Registers the hostname and rank fields as extra log fields, installs a log record
    factory that copies all extra fields onto every record, picks a plain stream handler
    or a :class:`RichHandler` depending on whether the session is interactive, and
    optionally attaches a rank filter.

    :param log_filter_type: Which ranks may emit messages at INFO level and below.
        Messages above INFO always pass the rank filters.
    :raises ValueError: If ``log_filter_type`` is not a known filter type.
    """
    log_extra_field("hostname", socket.gethostname())
    distributed = is_distributed()
    log_extra_field("node_rank", get_node_rank() if distributed else 0)
    log_extra_field("local_rank", get_local_rank() if distributed else 0)
    log_extra_field("global_rank", get_global_rank() if distributed else 0)

    # Wrap whatever factory is currently installed so that it keeps running.
    previous_factory = logging.getLogRecordFactory()

    def log_record_factory(*args, **kwargs) -> logging.LogRecord:
        record = previous_factory(*args, **kwargs)
        for name, value in _log_extra_fields.items():
            setattr(record, name, value)
        return record

    logging.setLogRecordFactory(log_record_factory)

    non_interactive = (
        os.environ.get("OLMo_NONINTERACTIVE", False)
        or os.environ.get("DEBIAN_FRONTEND", None) == "noninteractive"
        or not sys.stdout.isatty()
    )
    handler: logging.Handler
    if not non_interactive:
        handler = RichHandler()
    else:
        handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter(
            "%(asctime)s\t%(hostname)s:%(local_rank)s\t%(name)s:%(lineno)s\t%(levelname)s\t%(message)s"
        )
        formatter.default_time_format = "%Y-%m-%d %H:%M:%S"
        formatter.default_msec_format = "%s.%03d"
        handler.setFormatter(formatter)

    def make_rank_filter(rank_field: str):
        # Records above INFO always pass; others pass only when the rank field is 0
        # (a record without the field is treated as rank 0).
        def rank_filter(record: logging.LogRecord) -> int:
            if record.levelno > logging.INFO:
                return 1
            return 1 if getattr(record, rank_field, 0) == 0 else 0

        return rank_filter

    if log_filter_type == LogFilterType.rank0_only:
        record_filter = make_rank_filter("global_rank")
    elif log_filter_type == LogFilterType.local_rank0_only:
        record_filter = make_rank_filter("local_rank")
    elif log_filter_type == LogFilterType.all_ranks:
        record_filter = None
    else:
        raise ValueError(log_filter_type)

    if record_filter is not None:
        handler.addFilter(record_filter)
    logging.basicConfig(handlers=[handler], level=logging.INFO)

    logging.captureWarnings(True)
    logging.getLogger("urllib3").setLevel(logging.ERROR)
|
|
|
|
def excepthook(exctype, value, traceback):
    """
    Replacement for ``sys.excepthook`` that reports uncaught exceptions.

    ``KeyboardInterrupt`` goes to the interpreter's original hook, ``OLMoCliError`` and
    ``OLMoError`` are printed on the rich console without a traceback, and anything else
    is logged at CRITICAL level together with its traceback.
    """
    if issubclass(exctype, KeyboardInterrupt):
        sys.__excepthook__(exctype, value, traceback)
        return

    # OLMoCliError is tested before OLMoError, as in the original branch order.
    if issubclass(exctype, OLMoCliError):
        rich.get_console().print(f"[yellow]{value}[/]", highlight=False)
        return

    if issubclass(exctype, OLMoError):
        heading = Text(f"{exctype.__name__}:", style="red")
        rich.get_console().print(heading, value, highlight=False)
        return

    log.critical("Uncaught %s: %s", exctype.__name__, value, exc_info=(exctype, value, traceback))
|
|
|
|
def install_excepthook() -> None:
    """Install :func:`excepthook` as ``sys.excepthook`` for the current interpreter."""
    sys.excepthook = excepthook
|
|
|
|
def filter_warnings():
    """
    Register "ignore" filters for a fixed set of warning messages.

    Three ``UserWarning`` message patterns are ignored regardless of origin; a fourth
    pattern is ignored only when raised from the ``torchvision.io.image`` module.
    """
    ignored_user_warnings = (
        "torch.distributed.*_base is a private function and will be deprecated.*",
        "TypedStorage is deprecated.*",
        "Please use DTensor instead.*",
    )
    # Registration order is kept because each new filter is inserted ahead of earlier ones.
    for pattern in ignored_user_warnings:
        warnings.filterwarnings(action="ignore", category=UserWarning, message=pattern)

    warnings.filterwarnings(action="ignore", message="failed to load.*", module="torchvision.io.image")
|
|
|
|
def set_env_variables():
    """Set the process environment variables this package relies on."""
    overrides = {"TOKENIZERS_PARALLELISM": "false"}
    for name, value in overrides.items():
        os.environ[name] = value
|
|
|
|
def prepare_cli_environment(log_filter_type: Optional[LogFilterType] = None):
    """
    Prepare the process for running as a command-line program.

    Widens the rich console to at least 180 columns, sets up logging, installs the
    exception hook, registers the warning filters and sets environment variables.

    :param log_filter_type: Rank filter for logging. When ``None`` it is read from the
        ``LOG_FILTER_TYPE`` environment variable, defaulting to ``"rank0_only"``.
    """
    if log_filter_type is None:
        configured = os.environ.get("LOG_FILTER_TYPE", "rank0_only")
        log_filter_type = LogFilterType(configured)

    console_width = max(rich.get_console().width, 180)
    rich.reconfigure(width=console_width, soft_wrap=True)

    setup_logging(log_filter_type=log_filter_type)
    install_excepthook()
    filter_warnings()
    set_env_variables()
|
|
|
|
def clean_opt(arg: str) -> str:
    """
    Normalize a command-line option into ``name=value`` form.

    Leading and trailing dashes are stripped from the name and remaining dashes become
    underscores. An option without ``=`` gets the value ``True``.
    """
    raw_name, separator, val = arg.partition("=")
    if not separator:
        # A bare flag such as ``--foo`` is treated as ``foo=True``.
        val = "True"
    name = raw_name.strip("-").replace("-", "_")
    return f"{name}={val}"
|
|
|
|
class RichHandler(logging.Handler):
    """
    A simplified version of rich.logging.RichHandler from
    https://github.com/Textualize/rich/blob/master/rich/logging.py

    Each record is printed on a rich console as timestamp, level, location and message,
    followed by a rendered traceback when the record carries exception information.
    """

    def __init__(
        self,
        *,
        level: Union[int, str] = logging.NOTSET,
        console: Optional[Console] = None,
        markup: bool = False,
    ) -> None:
        """
        :param level: Minimum level handled by this handler.
        :param console: Console to print to; defaults to the global rich console.
        :param markup: Whether messages are parsed as rich markup by default. A record
            can override this through a ``markup`` attribute.
        """
        super().__init__(level=level)
        self.console = console or rich.get_console()
        self.highlighter = NullHighlighter()
        self.markup = markup

    def emit(self, record: logging.LogRecord) -> None:
        """Print ``record`` on the console; failures are routed to ``handleError``."""
        try:
            if hasattr(record.msg, "__rich__") or hasattr(record.msg, "__rich_console__"):
                # The message is itself a rich renderable: print it undecorated.
                self.console.print(record.msg)
            else:
                msg: Any = record.msg
                if isinstance(record.msg, str):
                    msg = self.render_message(record=record, message=record.getMessage())
                renderables = [
                    self.get_time_text(record),
                    self.get_level_text(record),
                    self.get_location_text(record),
                    msg,
                ]
                exc_info = record.exc_info
                # ``exc_info`` may be ``None``, ``False`` or ``(None, None, None)`` when no
                # exception is active. None of these can be rendered as a traceback, and
                # trying to would drop the whole message through ``handleError``.
                if exc_info and exc_info != (None, None, None):
                    renderables.append(Traceback.from_exception(*exc_info))
                self.console.print(*renderables)
        except Exception:
            self.handleError(record)

    def render_message(self, *, record: logging.LogRecord, message: str) -> ConsoleRenderable:
        """
        Convert the formatted message into rich ``Text``.

        The record's ``markup`` and ``highlighter`` attributes, when present, take
        precedence over the handler's own settings.
        """
        use_markup = getattr(record, "markup", self.markup)
        message_text = Text.from_markup(message) if use_markup else Text(message)

        highlighter = getattr(record, "highlighter", self.highlighter)
        if highlighter:
            message_text = highlighter(message_text)

        return message_text

    def get_time_text(self, record: logging.LogRecord) -> Text:
        """Return the record's creation time formatted as ``[YYYY-MM-DD <local time>]``."""
        log_time = datetime.fromtimestamp(record.created)
        time_str = log_time.strftime("[%Y-%m-%d %X]")
        return Text(time_str, style="log.time", end=" ")

    def get_level_text(self, record: logging.LogRecord) -> Text:
        """Return the level name, left-justified to 8 characters."""
        level_name = record.levelname
        level_text = Text.styled(level_name.ljust(8), f"logging.level.{level_name.lower()}")
        level_text.style = "log.level"
        level_text.end = " "
        return level_text

    def get_location_text(self, record: logging.LogRecord) -> Text:
        """Return ``[logger:line, rank=N]``, or ``[root, rank=N]`` for the root logger."""
        name_and_line = f"{record.name}:{record.lineno}" if record.name != "root" else "root"
        # ``local_rank`` is added by the record factory installed in ``setup_logging``.
        # Fall back to 0, as the rank filters there do, so the handler still works when
        # that factory has not been installed.
        local_rank = getattr(record, "local_rank", 0)
        text = f"[{name_and_line}, rank={local_rank}]"
        return Text(text, style="log.path")
|
|
|
|
def wait_for(condition: Callable[[], bool], description: str, timeout: float = 10.0):
    """
    Poll ``condition`` every half second until it returns a truthy value.

    :param condition: Zero-argument callable to poll.
    :param description: Used in the timeout error message.
    :param timeout: Seconds after which polling gives up.
    :raises TimeoutError: If more than ``timeout`` seconds have elapsed after a sleep.
    """
    began = time.monotonic()
    while True:
        if condition():
            return
        time.sleep(0.5)
        # The deadline is checked right after sleeping, before the condition is polled again.
        if time.monotonic() - began > timeout:
            raise TimeoutError(f"{description} timed out")
|
|
|
|
def is_url(path: PathOrStr) -> bool:
    """
    Tell whether ``path`` starts with a ``scheme://`` prefix.

    The scheme must consist of lowercase letters and digits only.
    """
    matched = re.match(r"[a-z0-9]+://.*", str(path))
    return matched is not None
|
|
|
|
def dir_is_empty(dir: PathOrStr) -> bool:
    """
    Return ``True`` when ``dir`` has no entries.

    A path that is not an existing directory (missing, or a regular file) also counts
    as empty.
    """
    directory = Path(dir)
    if not directory.is_dir():
        return True
    # Only the first entry is needed to decide.
    missing = object()
    first_entry = next(directory.glob("*"), missing)
    return first_entry is missing
|
|
|
|
def get_progress_bar() -> Progress:
    """Return the progress bar created by ``cached_path.get_download_progress``."""
    # ``cached_path`` is imported here, at call time, rather than at module level.
    from cached_path import get_download_progress

    return get_download_progress()
|
|
|
|
def resource_path(
    folder: PathOrStr, fname: str, local_cache: Optional[PathOrStr] = None, progress: Optional[Progress] = None
) -> Path:
    """
    Resolve ``fname`` inside ``folder`` to a local path.

    :param folder: Directory or URL prefix that contains the file.
    :param fname: File name relative to ``folder``.
    :param local_cache: Optional local directory checked first; if it already holds
        ``fname`` that copy is returned without calling ``cached_path``.
    :param progress: Progress bar forwarded to ``cached_path``.
    :returns: Path of the local-cache copy, or whatever ``cached_path`` returns.
    """
    if local_cache is not None:
        local_path = Path(local_cache) / fname
        if local_path.is_file():
            log.info(f"Found local cache of {fname} at {local_path}")
            return local_path

    from cached_path import cached_path

    location = f"{str(folder).rstrip('/')}/{fname}"
    return cached_path(location, progress=progress)
|
|
|
|
def file_size(path: PathOrStr) -> int:
    """
    Get the size of a local or remote file in bytes.

    Supports local paths and the ``gs``, ``s3``, ``r2`` and ``file`` URL schemes.

    :raises NotImplementedError: For any other URL scheme.
    """
    if not is_url(path):
        return os.stat(path).st_size

    from urllib.parse import urlparse

    parsed = urlparse(str(path))
    object_key = parsed.path.strip("/")
    if parsed.scheme == "gs":
        return _gcs_file_size(parsed.netloc, object_key)
    if parsed.scheme in ("s3", "r2"):
        return _s3_file_size(parsed.scheme, parsed.netloc, object_key)
    if parsed.scheme == "file":
        # Drop the scheme prefix and handle the remainder as a local path.
        return file_size(str(path).replace("file://", "", 1))
    raise NotImplementedError(f"file size not implemented for '{parsed.scheme}' files")
|
|
|
|
def upload(source: PathOrStr, target: str, save_overwrite: bool = False):
    """
    Upload source file to a target location on GCS or S3.

    :param source: Local file to upload; must exist and be a regular file.
    :param target: Destination URL with scheme ``gs``, ``s3`` or ``r2``.
    :param save_overwrite: Forwarded to the scheme-specific upload helper.
    :raises NotImplementedError: If the target scheme is not supported.
    """
    from urllib.parse import urlparse

    source = Path(source)
    assert source.is_file()

    parsed = urlparse(target)
    object_key = parsed.path.strip("/")
    if parsed.scheme in ("s3", "r2"):
        _s3_upload(source, parsed.scheme, parsed.netloc, object_key, save_overwrite=save_overwrite)
    elif parsed.scheme == "gs":
        _gcs_upload(source, parsed.netloc, object_key, save_overwrite=save_overwrite)
    else:
        raise NotImplementedError(f"Upload not implemented for '{parsed.scheme}' scheme")
|
|
|
|
def get_bytes_range(source: PathOrStr, bytes_start: int, num_bytes: int) -> bytes:
    """
    Read ``num_bytes`` bytes starting at offset ``bytes_start`` from a local or remote file.

    Supports local paths and the ``gs``, ``s3``, ``r2`` and ``file`` URL schemes.

    :param source: Local path or URL of the file.
    :param bytes_start: Offset of the first byte to read.
    :param num_bytes: Number of bytes to read.
    :raises NotImplementedError: For any other URL scheme.
    """
    if is_url(source):
        from urllib.parse import urlparse

        parsed = urlparse(str(source))
        if parsed.scheme == "gs":
            return _gcs_get_bytes_range(parsed.netloc, parsed.path.strip("/"), bytes_start, num_bytes)
        elif parsed.scheme in ("s3", "r2"):
            return _s3_get_bytes_range(
                parsed.scheme, parsed.netloc, parsed.path.strip("/"), bytes_start, num_bytes
            )
        elif parsed.scheme == "file":
            # Drop the scheme prefix and handle the remainder as a local path.
            return get_bytes_range(str(source).replace("file://", "", 1), bytes_start, num_bytes)
        else:
            # The message used to say "file size not implemented", copied from file_size().
            raise NotImplementedError(f"get bytes range not implemented for '{parsed.scheme}' files")
    else:
        with open(source, "rb") as f:
            f.seek(bytes_start)
            return f.read(num_bytes)
|
|
|
|
def find_latest_checkpoint(dir: PathOrStr) -> Optional[PathOrStr]:
    """
    Find the checkpoint with the highest step number under ``dir``.

    Checkpoints are sub-directories named ``step<N>`` or ``step<N>-unsharded``. When both
    exist for the same step, the one without the ``-unsharded`` suffix wins.

    :param dir: Local directory, or an ``s3``/``r2``/``file`` URL.
    :returns: The latest checkpoint, or ``None`` if none is found.
    :raises NotImplementedError: For ``gs`` and any other unsupported URL scheme.
    """
    if not is_url(dir):
        # NOTE(review): because the search starts at step 0, a directory holding only
        # ``step0-unsharded`` yields ``None`` - confirm this is intended.
        best_step = 0
        best_path: Optional[Path] = None
        for candidate in Path(dir).glob("step*"):
            if not candidate.is_dir():
                continue
            try:
                step = int(candidate.name.replace("step", "").replace("-unsharded", ""))
            except ValueError:
                # Not a ``step<N>`` name after all.
                continue
            is_newer = step > best_step
            is_preferred_tie = step == best_step and not candidate.name.endswith("-unsharded")
            if is_newer or is_preferred_tie:
                best_step = step
                best_path = candidate
        return best_path

    from urllib.parse import urlparse

    parsed = urlparse(str(dir))
    if parsed.scheme == "gs":
        raise NotImplementedError
    if parsed.scheme in ("s3", "r2"):
        return _s3_find_latest_checkpoint(parsed.scheme, parsed.netloc, parsed.path.strip("/"))
    if parsed.scheme == "file":
        return find_latest_checkpoint(str(dir).replace("file://", "", 1))
    raise NotImplementedError(f"find_latest_checkpoint not implemented for '{parsed.scheme}' files")
|
|
|
|
def _gcs_upload(source: Path, bucket_name: str, key: str, save_overwrite: bool = False):
    """
    Upload the local file ``source`` to ``gs://bucket_name/key``.

    :raises FileExistsError: If the object already exists and ``save_overwrite`` is false.
    """
    from google.cloud import storage as gcs

    blob = gcs.Client().bucket(bucket_name).blob(key)
    if not save_overwrite and blob.exists():
        raise FileExistsError(f"gs://{bucket_name}/{key} already exists. Use save_overwrite to overwrite it.")
    blob.upload_from_filename(source)
|
|
|
|
def _gcs_file_size(bucket_name: str, key: str) -> int:
    """
    Return the size in bytes of ``gs://bucket_name/key``.

    :raises FileNotFoundError: If reloading the blob reports that it does not exist.
    """
    from google.api_core.exceptions import NotFound
    from google.cloud import storage as gcs

    blob = gcs.Client().bucket(bucket_name).blob(key)
    try:
        # Reload the blob so that its ``size`` can be read afterwards.
        blob.reload()
    except NotFound:
        raise FileNotFoundError(f"gs://{bucket_name}/{key}")
    size = blob.size
    assert size is not None
    return size
|
|
|
|
def _gcs_get_bytes_range(bucket_name: str, key: str, bytes_start: int, num_bytes: int) -> bytes:
    """
    Download ``num_bytes`` bytes of ``gs://bucket_name/key`` starting at ``bytes_start``.

    :raises FileNotFoundError: If reloading the blob reports that it does not exist.
    """
    from google.api_core.exceptions import NotFound
    from google.cloud import storage as gcs

    blob = gcs.Client().bucket(bucket_name).blob(key)
    try:
        blob.reload()
    except NotFound:
        raise FileNotFoundError(f"gs://{bucket_name}/{key}")
    # The end offset is passed as the index of the last byte wanted.
    last_byte = bytes_start + num_bytes - 1
    return blob.download_as_bytes(start=bytes_start, end=last_byte)
|
|
|
|
| def _get_s3_profile_name(scheme: str) -> Optional[str]: |
| if scheme == "s3": |
| |
| return os.environ.get("S3_PROFILE") |
| if scheme == "r2": |
| profile_name = os.environ.get("R2_PROFILE") |
| if profile_name is None: |
| raise OLMoEnvironmentError( |
| "R2 profile name is not set. Did you forget to set the 'R2_PROFILE' env var?" |
| ) |
|
|
| return profile_name |
|
|
| raise NotImplementedError(f"Cannot get profile name for scheme {scheme}") |
|
|
|
|
| def _get_s3_endpoint_url(scheme: str) -> Optional[str]: |
| if scheme == "s3": |
| return None |
| if scheme == "r2": |
| r2_endpoint_url = os.environ.get("R2_ENDPOINT_URL") |
| if r2_endpoint_url is None: |
| raise OLMoEnvironmentError( |
| "R2 endpoint url is not set. Did you forget to set the 'R2_ENDPOINT_URL' env var?" |
| ) |
|
|
| return r2_endpoint_url |
|
|
| raise NotImplementedError(f"Cannot get endpoint url for scheme {scheme}") |
|
|
|
|
@cache
def _get_s3_client(scheme: str):
    """
    Build the boto3 S3 client for ``scheme``; the result is cached per scheme.

    The client retries up to 10 attempts in "standard" mode and uses SSL unless the
    ``OLMO_NO_SSL`` environment variable holds a non-zero integer.
    """
    session = boto3.Session(profile_name=_get_s3_profile_name(scheme))
    # Built after the session so that failures surface in the same order as before.
    client_options = dict(
        endpoint_url=_get_s3_endpoint_url(scheme),
        config=Config(retries={"max_attempts": 10, "mode": "standard"}),
        use_ssl=not int(os.environ.get("OLMO_NO_SSL", "0")),
    )
    return session.client("s3", **client_options)
|
|
|
|
def _wait_before_retry(attempt: int):
    """Sleep for ``0.5 * 2**attempt`` seconds, capped at 3 seconds."""
    delay = min(0.5 * 2**attempt, 3.0)
    time.sleep(delay)
|
|
|
|
def _s3_upload(
    source: Path, scheme: str, bucket_name: str, key: str, save_overwrite: bool = False, max_attempts: int = 3
):
    """
    Upload the local file ``source`` to ``<scheme>://bucket_name/key``.

    Unless ``save_overwrite`` is set, the object's existence is checked first with up to
    ``max_attempts`` HEAD requests; a 404 response means the upload may proceed.

    :raises FileExistsError: If the object already exists and ``save_overwrite`` is false.
    :raises OLMoNetworkError: If the existence check keeps failing with a non-404 client
        error, or if the upload itself fails with a client error.
    """
    err: Optional[Exception] = None
    if not save_overwrite:
        for attempt in range(1, max_attempts + 1):
            try:
                _get_s3_client(scheme).head_object(Bucket=bucket_name, Key=key)
                # Report the actual scheme: the object may be on "r2" rather than "s3".
                raise FileExistsError(
                    f"{scheme}://{bucket_name}/{key} already exists. Use save_overwrite to overwrite it."
                )
            except boto_exceptions.ClientError as e:
                if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
                    # The object does not exist, so it is safe to upload.
                    err = None
                    break
                err = e

            if attempt < max_attempts:
                log.warning("%s failed attempt %d with retriable error: %s", _s3_upload.__name__, attempt, err)
                _wait_before_retry(attempt)

        if err is not None:
            raise OLMoNetworkError(f"Failed to check object existence during {scheme} upload") from err

    try:
        # boto3 documents ``Filename`` as a str, so convert the Path explicitly.
        _get_s3_client(scheme).upload_file(str(source), bucket_name, key)
    except boto_exceptions.ClientError as e:
        raise OLMoNetworkError(f"Failed to upload to {scheme}") from e
|
|
|
|
def _s3_file_size(scheme: str, bucket_name: str, key: str, max_attempts: int = 3) -> int:
    """
    Return the size in bytes of ``<scheme>://bucket_name/key``.

    Non-404 client errors are retried up to ``max_attempts`` times.

    :raises FileNotFoundError: If the HEAD request returns 404.
    :raises OLMoNetworkError: If every attempt fails with a non-404 client error.
    """
    err: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return _get_s3_client(scheme).head_object(Bucket=bucket_name, Key=key)["ContentLength"]
        except boto_exceptions.ClientError as e:
            if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
                # Report the actual scheme ("s3" or "r2"), as _s3_get_bytes_range does.
                raise FileNotFoundError(f"{scheme}://{bucket_name}/{key}") from e
            err = e

        if attempt < max_attempts:
            log.warning("%s failed attempt %d with retriable error: %s", _s3_file_size.__name__, attempt, err)
            _wait_before_retry(attempt)

    raise OLMoNetworkError(f"Failed to get {scheme} file size") from err
|
|
|
|
def _s3_get_bytes_range(
    scheme: str, bucket_name: str, key: str, bytes_start: int, num_bytes: int, max_attempts: int = 3
) -> bytes:
    """
    Read ``num_bytes`` bytes of ``<scheme>://bucket_name/key`` starting at ``bytes_start``.

    Non-404 client errors, HTTP client errors and connection errors are retried up to
    ``max_attempts`` times.

    :raises FileNotFoundError: If the request returns 404.
    :raises OLMoNetworkError: If every attempt fails with a retriable error.
    """
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = _get_s3_client(scheme).get_object(
                Bucket=bucket_name, Key=key, Range=f"bytes={bytes_start}-{bytes_start + num_bytes - 1}"
            )
            return response["Body"].read()
        except boto_exceptions.ClientError as e:
            if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
                raise FileNotFoundError(f"{scheme}://{bucket_name}/{key}") from e
            last_error = e
        except (boto_exceptions.HTTPClientError, boto_exceptions.ConnectionError) as e:
            # Failures at the HTTP/connection level are treated as retriable too.
            last_error = e

        if attempt < max_attempts:
            log.warning(
                "%s failed attempt %d with retriable error: %s",
                _s3_get_bytes_range.__name__,
                attempt,
                last_error,
            )
            _wait_before_retry(attempt)

    # Every attempt failed: wrap the last error in the project's network error type.
    raise OLMoNetworkError(f"Failed to get bytes range from {scheme}") from last_error
|
|
|
|
def _s3_find_latest_checkpoint(scheme: str, bucket_name: str, prefix: str) -> Optional[str]:
    """
    Find the checkpoint with the highest step number directly under ``prefix``.

    Checkpoints are "directories" named ``step<N>`` or ``step<N>-unsharded`` that contain
    a ``config.yaml`` object. When both exist for the same step, the one without the
    ``-unsharded`` suffix wins.

    :param scheme: ``"s3"`` or ``"r2"``.
    :param bucket_name: Bucket to search.
    :param prefix: Key prefix whose immediate children are inspected.
    :returns: URL of the latest checkpoint, or ``None`` if none is found.
    """
    if not prefix.endswith("/"):
        prefix = f"{prefix}/"
    response = _get_s3_client(scheme).list_objects(Bucket=bucket_name, Prefix=prefix, Delimiter="/")
    # A truncated listing would need pagination, which is not handled here.
    assert not response["IsTruncated"]
    # NOTE(review): because the search starts at step 0, a lone ``step0-unsharded``
    # checkpoint is never selected - confirm this is intended.
    latest_step = 0
    latest_checkpoint: Optional[str] = None
    # "CommonPrefixes" is absent from the response when nothing lies under the prefix.
    for item in response.get("CommonPrefixes", []):
        checkpoint_prefix = item["Prefix"].strip("/")
        checkpoint_name = os.path.split(checkpoint_prefix)[-1]
        if not checkpoint_name.startswith("step"):
            continue
        try:
            step = int(checkpoint_name.replace("step", "").replace("-unsharded", ""))
        except ValueError:
            continue
        # Skip checkpoint directories that have no config.yaml.
        try:
            _s3_file_size(scheme, bucket_name, f"{checkpoint_prefix}/config.yaml")
        except FileNotFoundError:
            continue
        # For equal steps, prefer the checkpoint without the "-unsharded" suffix.
        if step > latest_step or (step == latest_step and not checkpoint_name.endswith("-unsharded")):
            latest_step = step
            # Use the bucket that was searched, not a hard-coded bucket name.
            latest_checkpoint = f"{scheme}://{bucket_name}/{checkpoint_prefix}"
    return latest_checkpoint
|
|
|
|
def default_thread_count() -> int:
    """
    Return the number of worker threads to use.

    A non-empty ``OLMO_NUM_THREADS`` environment variable wins; otherwise the result is
    the CPU count plus 4, capped at 32.
    """
    configured = os.environ.get("OLMO_NUM_THREADS")
    if configured:
        return int(configured)
    cpu_count = os.cpu_count() or 1
    return min(32, cpu_count + 4)
|
|
|
|
def pass_through_fn(fn, *args, **kwargs):
    """Call ``fn`` with the given positional and keyword arguments and return its result."""
    result = fn(*args, **kwargs)
    return result
|
|
|
|
def threaded_generator(g, maxsize: int = 16, thread_name: Optional[str] = None):
    """
    Iterate over ``g`` in a daemon background thread and yield its items in order.

    :param g: The iterable to consume in the background.
    :param maxsize: Capacity of the queue between the background thread and the consumer.
    :param thread_name: Name of the background thread; defaults to ``repr(g)``.
    :raises OLMoThreadError: If iterating ``g`` raises an ``Exception``. Note that any
        item that is itself an ``Exception`` instance is treated the same way.
    """
    buffer: Queue = Queue(maxsize=maxsize)
    # Unique marker telling the consumer that the producer has finished.
    finished = object()

    def produce():
        try:
            for item in g:
                buffer.put(item)
        except Exception as failure:
            # Hand the failure to the consumer instead of losing it in this thread.
            buffer.put(failure)
        finally:
            buffer.put(finished)

    thread_name = thread_name or repr(g)
    worker = Thread(name=thread_name, target=produce, daemon=True)
    worker.start()

    for received in iter(buffer.get, finished):
        if isinstance(received, Exception):
            raise OLMoThreadError(f"generator thread {thread_name} failed") from received
        yield received
|
|
|
|
def roundrobin(*iterables):
    """
    Yield items from the given iterables in turn until all of them are exhausted.
    For example: ``roundrobin('ABC', 'D', 'EF') --> A D E B F C``
    """
    remaining = len(iterables)
    fetchers = cycle(iter(source).__next__ for source in iterables)
    while remaining:
        try:
            for fetch in fetchers:
                yield fetch()
        except StopIteration:
            # The iterator that just ran dry was the last one taken from the cycle,
            # so keeping the next ``remaining`` entries drops exactly that one.
            remaining -= 1
            fetchers = cycle(islice(fetchers, remaining))
|
|