|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
Module contains events processing mechanisms that are integrated with the standard python logging. |
|
|
|
|
|
Example of usage: |
|
|
|
|
|
:: |
|
|
|
|
|
from torch.distributed.elastic import events |
|
|
event = events.Event(name="test_event", source=events.EventSource.WORKER, metadata={...}) |
|
|
events.get_logging_handler(destination="console").info(event) |
|
|
|
|
|
""" |
|
|
|
|
|
import inspect |
|
|
import logging |
|
|
import os |
|
|
import socket |
|
|
import traceback |
|
|
from enum import Enum |
|
|
from typing import Dict, Optional |
|
|
|
|
|
from torch.distributed.elastic.events.handlers import get_logging_handler |
|
|
|
|
|
from .api import ( |
|
|
Event, |
|
|
EventMetadataValue, |
|
|
EventSource, |
|
|
NodeState, |
|
|
RdzvEvent, |
|
|
) |
|
|
|
|
|
_events_loggers: Dict[str, logging.Logger] = {} |
|
|
|
|
|
def _get_or_create_logger(destination: str = "null") -> logging.Logger: |
|
|
""" |
|
|
Constructs python logger based on the destination type or extends if provided. |
|
|
Available destination could be found in ``handlers.py`` file. |
|
|
The constructed logger does not propagate messages to the upper level loggers, |
|
|
e.g. root logger. This makes sure that a single event can be processed once. |
|
|
|
|
|
Args: |
|
|
destination: The string representation of the event handler. |
|
|
Available handlers found in ``handlers`` module |
|
|
""" |
|
|
global _events_loggers |
|
|
|
|
|
if destination not in _events_loggers: |
|
|
_events_logger = logging.getLogger(f"torchelastic-events-{destination}") |
|
|
_events_logger.setLevel(os.environ.get("LOGLEVEL", "INFO")) |
|
|
|
|
|
_events_logger.propagate = False |
|
|
|
|
|
logging_handler = get_logging_handler(destination) |
|
|
_events_logger.addHandler(logging_handler) |
|
|
|
|
|
|
|
|
_events_loggers[destination] = _events_logger |
|
|
|
|
|
return _events_loggers[destination] |
|
|
|
|
|
|
|
|
def record(event: Event, destination: str = "null") -> None: |
|
|
_get_or_create_logger(destination).info(event.serialize()) |
|
|
|
|
|
def record_rdzv_event(event: RdzvEvent) -> None: |
|
|
_get_or_create_logger("dynamic_rendezvous").info(event.serialize()) |
|
|
|
|
|
|
|
|
def construct_and_record_rdzv_event( |
|
|
run_id: str, |
|
|
message: str, |
|
|
node_state: NodeState, |
|
|
name: str = "", |
|
|
hostname: str = "", |
|
|
pid: Optional[int] = None, |
|
|
master_endpoint: str = "", |
|
|
local_id: Optional[int] = None, |
|
|
rank: Optional[int] = None, |
|
|
) -> None: |
|
|
|
|
|
if isinstance(get_logging_handler("dynamic_rendezvous"), logging.NullHandler): |
|
|
return |
|
|
|
|
|
|
|
|
if not hostname: |
|
|
hostname = socket.getfqdn() |
|
|
if not pid: |
|
|
pid = os.getpid() |
|
|
|
|
|
|
|
|
callstack = inspect.stack() |
|
|
filename = "no_file" |
|
|
if len(callstack) > 1: |
|
|
stack_depth_1 = callstack[1] |
|
|
filename = os.path.basename(stack_depth_1.filename) |
|
|
if not name: |
|
|
name = stack_depth_1.function |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
del callstack |
|
|
|
|
|
|
|
|
if node_state == NodeState.FAILED: |
|
|
error_trace = traceback.format_exc() |
|
|
else: |
|
|
error_trace = "" |
|
|
|
|
|
|
|
|
event = RdzvEvent( |
|
|
name=f"{filename}:{name}", |
|
|
run_id=run_id, |
|
|
message=message, |
|
|
hostname=hostname, |
|
|
pid=pid, |
|
|
node_state=node_state, |
|
|
master_endpoint=master_endpoint, |
|
|
rank=rank, |
|
|
local_id=local_id, |
|
|
error_trace=error_trace, |
|
|
) |
|
|
|
|
|
|
|
|
record_rdzv_event(event) |
|
|
|