File size: 3,933 Bytes
9dd3461 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | #!/usr/bin/env/python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Module contains events processing mechanisms that are integrated with the standard python logging.
Example of usage:
::
from torch.distributed.elastic import events
event = events.Event(name="test_event", source=events.EventSource.WORKER, metadata={...})
events.get_logging_handler(destination="console").info(event)
"""
import inspect
import logging
import os
import socket
import traceback
from enum import Enum
from typing import Dict, Optional
from torch.distributed.elastic.events.handlers import get_logging_handler
from .api import ( # noqa: F401
Event,
EventMetadataValue,
EventSource,
NodeState,
RdzvEvent,
)
_events_loggers: Dict[str, logging.Logger] = {}
def _get_or_create_logger(destination: str = "null") -> logging.Logger:
"""
Constructs python logger based on the destination type or extends if provided.
Available destination could be found in ``handlers.py`` file.
The constructed logger does not propagate messages to the upper level loggers,
e.g. root logger. This makes sure that a single event can be processed once.
Args:
destination: The string representation of the event handler.
Available handlers found in ``handlers`` module
"""
global _events_loggers
if destination not in _events_loggers:
_events_logger = logging.getLogger(f"torchelastic-events-{destination}")
_events_logger.setLevel(os.environ.get("LOGLEVEL", "INFO"))
# Do not propagate message to the root logger
_events_logger.propagate = False
logging_handler = get_logging_handler(destination)
_events_logger.addHandler(logging_handler)
# Add the logger to the global dictionary
_events_loggers[destination] = _events_logger
return _events_loggers[destination]
def record(event: Event, destination: str = "null") -> None:
_get_or_create_logger(destination).info(event.serialize())
def record_rdzv_event(event: RdzvEvent) -> None:
_get_or_create_logger("dynamic_rendezvous").info(event.serialize())
def construct_and_record_rdzv_event(
run_id: str,
message: str,
node_state: NodeState,
name: str = "",
hostname: str = "",
pid: Optional[int] = None,
master_endpoint: str = "",
local_id: Optional[int] = None,
rank: Optional[int] = None,
) -> None:
# We don't want to perform an extra computation if not needed.
if isinstance(get_logging_handler("dynamic_rendezvous"), logging.NullHandler):
return
# Set up parameters.
if not hostname:
hostname = socket.getfqdn()
if not pid:
pid = os.getpid()
# Determines which file called this function.
callstack = inspect.stack()
filename = "no_file"
if len(callstack) > 1:
stack_depth_1 = callstack[1]
filename = os.path.basename(stack_depth_1.filename)
if not name:
name = stack_depth_1.function
# Delete the callstack variable. If kept, this can mess with python's
# garbage collector as we are holding on to stack frame information in
# the inspect module.
del callstack
# Set up error trace if this is an exception
if node_state == NodeState.FAILED:
error_trace = traceback.format_exc()
else:
error_trace = ""
# Initialize event object
event = RdzvEvent(
name=f"{filename}:{name}",
run_id=run_id,
message=message,
hostname=hostname,
pid=pid,
node_state=node_state,
master_endpoint=master_endpoint,
rank=rank,
local_id=local_id,
error_trace=error_trace,
)
# Finally, record the event.
record_rdzv_event(event)
|