# Copyright 2021 Mathias Lechner and the PyHopper team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import signal
import time
import os
from traceback import format_exception
from types import GeneratorType
import numpy as np
import sys
import subprocess
from xml.etree.ElementTree import fromstring
from .pruners.pruners import (
set_global_pruner,
should_prune,
get_intermediate_results_list,
)
from .utils import unwrap_sample
# Module-wide count of SIGINT signals seen by the handlers defined in this file.
_signals_received = 0


class SignalListener:
    """Route SIGINT (CTRL+C) to a gradual-quit or a force-quit callback.

    The first two signals received after registration invoke the gradual-quit
    callback; the third and every later signal invoke the force-quit callback.
    """

    def __init__(self):
        self._sigterm_received = 0
        self._force_quit_callback = None
        self._gradual_quit_callback = None
        self._original_sigint_handler = None
        # True while our handler is installed; lets unregister_signal() tell
        # "never registered" apart from "previous handler was None".
        self._registered = False

    def signal_handler(self, sig, frame):
        """Handle one SIGINT: print a status message and fire the matching callback.

        :param sig: Signal number passed by the ``signal`` module (unused).
        :param frame: Current stack frame passed by the ``signal`` module (unused).
        """
        global _signals_received
        _signals_received += 1
        if self._sigterm_received == 0:
            print(
                "CTRL+C received. Will terminate once the currently running candidates finished"
            )
        elif self._sigterm_received == 1:
            print("Will force termination on 2/3 signals")
        else:
            print("Terminate")
        self._sigterm_received += 1

        if self._sigterm_received >= 3:
            callback = self._force_quit_callback
        else:
            callback = self._gradual_quit_callback
        if callback is not None:
            callback()

    def register_signal(self, gradual_quit_callback, force_quit_callback):
        """Install this listener as the SIGINT handler.

        :param gradual_quit_callback: Called (without arguments) on the first two signals.
        :param force_quit_callback: Called (without arguments) from the third signal on.
        """
        self._gradual_quit_callback = gradual_quit_callback
        self._force_quit_callback = force_quit_callback
        if not self._registered:
            # Only remember the foreign handler once; on a repeated registration
            # the current handler is already ours and must not be saved as "original".
            self._original_sigint_handler = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGINT, self.signal_handler)
        self._registered = True

    def unregister_signal(self):
        """Restore the SIGINT handler that was active before ``register_signal``.

        Calling this without a prior ``register_signal`` only resets the internal state.
        """
        if self._registered:
            previous = self._original_sigint_handler
            if previous is None:
                # getsignal() reports None for handlers that were not installed
                # from Python; signal.signal() rejects None, so fall back to the
                # default action instead of raising TypeError.
                previous = signal.SIG_DFL
            signal.signal(signal.SIGINT, previous)
        self._registered = False
        self._force_quit_callback = None
        self._gradual_quit_callback = None
        self._original_sigint_handler = None
        self._sigterm_received = 0
class PruneEvaluation(Exception):
    """Raised by an objective function to have its evaluation treated as pruned."""
def _parse_gpu_listing(listing):
    """Extract GPU ids, names and compute-mode flags from ``nvidia-smi -L`` text."""
    gpu_ids = []
    gpu_names = {}
    gpu_default_modes = {}
    for line in listing.splitlines():
        if not line.strip():
            continue
        # The id is sliced from column 4 (right after "GPU ") up to the first colon.
        colon = line.find(":")
        if colon <= 4:
            raise ValueError("Could not parse output of 'nvidia-smi -L'")
        # The name is the text between ": " and " (UUID".
        uuid_start = line.find("(UUID")
        if uuid_start <= colon + 3:
            raise ValueError("Could not parse output of 'nvidia-smi -L'")
        gpu_id = line[4:colon]
        gpu_ids.append(gpu_id)
        gpu_names[gpu_id] = line[colon + 2 : uuid_start - 1]
        # The compute mode is not read from this listing, so every GPU is
        # reported as being in Default mode.
        gpu_default_modes[gpu_id] = True
    return gpu_ids, gpu_names, gpu_default_modes


def parse_nvidia_smi():
    """
    Parse the IDs, names and compute mode of GPUs on the current machine

    :return: A triple of a list of all device IDs, a dict mapping device ID to GPU name,
        and a dict mapping device ID to a bool specifying if the device is configured in the Default compute mode
        (always True here, because the compute mode is not parsed from ``nvidia-smi -L``).
        Returns (None,None,None) if ``nvidia-smi`` cannot be executed or its output does not start with "GPU".
    :raises ValueError: If the output starts with "GPU" but a non-empty line does not have the expected layout.
    """
    try:
        # Fixed argument list, no shell needed.
        res = subprocess.run(["nvidia-smi", "-L"], stdout=subprocess.PIPE)
    except OSError:
        # nvidia-smi is not installed or cannot be executed
        return None, None, None
    listing = res.stdout.decode("utf-8", errors="replace")
    if not listing.startswith("GPU"):
        return None, None, None
    return _parse_gpu_listing(listing)
# def parse_nvidia_smi():
# """
# Parse the IDs, names and compute mode of GPUs on the current machine
# :return: A triple of a list of all device IDs, a dict mapping device ID to GPU name,
# and a dict mapping device ID to a bool specifying if the device is configured in the Default compute mode.
# returns (None,None,None) if the output of nvidia-smi cannot be parsed
# """
# res = subprocess.run("nvidia-smi -q -x", shell=True, stdout=subprocess.PIPE)
# res = res.stdout.decode("utf-8")
# if not res.startswith("<?xml"):
# return None, None, None
# root = fromstring(res)
# gpu_ids = []
# gpu_names = {}
# gpu_default_modes = {}
# for gpu_node in root.findall("gpu"):
# gpu_id = gpu_node.find("minor_number").text
# gpu_ids.append(gpu_id)
# compute_mode = gpu_node.find("compute_mode").text
# gpu_name = gpu_node.find("product_name").text
# gpu_names[gpu_id] = gpu_name
# gpu_default_modes[gpu_id] = compute_mode == "Default"
# # print(f"GPU [{gpu_id}]: {gpu_name} in mode {compute_mode}")
# return gpu_ids, gpu_names, gpu_default_modes
def get_gpu_list():
    """
    Determine which GPUs should be used and whether they can host several processes each

    If the ``CUDA_VISIBLE_DEVICES`` environment variable is set, its comma-separated entries are used
    (a warning is printed if ``nvidia-smi`` could not be parsed). Otherwise all GPUs reported by
    ``parse_nvidia_smi`` are used.

    :return: A pair ``(gpus_used, supports_multi_instance)``: the list of GPU ids (strings) and a bool that is
        False if any used GPU is reported as not being in the Default compute mode.
    :raises ValueError: If ``CUDA_VISIBLE_DEVICES`` is not set and ``nvidia-smi`` could not be parsed.
    """
    gpu_ids, _gpu_names, gpu_default_modes = parse_nvidia_smi()
    visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    if visible_devices is not None:
        # User specified which GPUs to use (could be a subset of the GPUs on the machine)
        if gpu_ids is None:
            print(
                "Warning: Could not parse output of 'nvidia-smi'. This probably means you need to upgrade the nvidia "
                "device drivers or reboot the machine. "
            )
        # Strip blanks so that "0, 1" yields the ids "0" and "1"
        gpus_used = [entry.strip() for entry in visible_devices.split(",")]
    else:
        if gpu_ids is None:
            raise ValueError(
                "Error: Could not parse output of 'nvidia-smi'. Either specify the GPU ids via the "
                "'CUDA_VISIBLE_DEVICES' environment variable, or upgrade the nvidia device drivers and reboot the "
                "machine. "
            )
        # No explicit GPU defined -> let's use them all
        gpus_used = gpu_ids

    supports_multi_instance = True
    if gpu_ids is not None:
        # Ids that nvidia-smi did not list (e.g. UUID-style entries in CUDA_VISIBLE_DEVICES) are
        # treated like Default-mode devices instead of raising a KeyError.
        supports_multi_instance = all(
            gpu_default_modes.get(gpu, True) for gpu in gpus_used
        )
    return gpus_used, supports_multi_instance
class GPUAllocator:
    """Bookkeeping of which (virtual) GPU slot is used by which pending future.

    Every physical GPU returned by ``get_gpu_list`` is replicated ``factor`` times, so that
    ``factor`` evaluations can be assigned to the same device.
    """

    def __init__(self, factor):
        """
        :param factor: Number of slots per physical GPU (must be >= 1).
        :raises ValueError: If ``factor`` is smaller than 1, or if ``factor > 1`` while the GPUs do not
            support multiple processes per device.
        """
        if factor < 1:
            # Zero slots would only surface later as an unrelated error when the worker pool is created
            raise ValueError(
                f"Error: Number of jobs per GPU must be at least 1 (got {factor})."
            )
        physical_gpus, supports_multi_instance = get_gpu_list()
        if factor > 1 and not supports_multi_instance:
            raise ValueError(
                "Error: GPUs are configured in Process Exclusive mode. Cannot run multiple training processes per GPU."
            )
        virtual_gpus = list(physical_gpus) * factor
        self.num_gpus = len(virtual_gpus)
        self._free_gpus = virtual_gpus
        self._allocated = {}  # dict future -> str

    def free(self, future):
        """Return the GPU slot recorded for ``future`` to the pool of free slots.

        :raises KeyError: If no slot is recorded for ``future``.
        """
        self._free_gpus.append(self._allocated.pop(future))

    def pop_free(self):
        """Remove and return one free GPU slot.

        :raises IndexError: If no slot is free.
        """
        return self._free_gpus.pop()

    def alloc(self, future, gpu):
        """Record that ``future`` occupies the slot ``gpu`` (as obtained from ``pop_free``)."""
        self._allocated[future] = gpu
class EvaluationResult:
    """Plain record describing the outcome of one objective-function evaluation."""

    def __init__(self):
        # Objective value and pruning flag start out unset
        self.value = self.was_pruned = None
        # Set to True once a NaN objective value was observed
        self.is_nan = False
        # Intermediate results and the formatted error (if any) are filled in later
        self.intermediate_results = self.error = None
def dummy_signal_handler(sig, frame):
    """SIGINT handler that only counts the signal in ``_signals_received``.

    Both arguments are supplied by the ``signal`` module and are not used.
    """
    global _signals_received
    _signals_received = _signals_received + 1
    # Disabled experiment kept for reference:
    # if _signals_received >= 3:
    #     time.sleep(1.0)
    #     os.kill()
def execute(objective_function, candidate, pruner, kwargs, remote=False, gpu=None):
    """
    Run the objective function on one candidate and collect the outcome in an ``EvaluationResult``.

    :param objective_function: Callable invoked as ``objective_function(params, **kwargs)``; may return a value
        or a generator yielding intermediate values.
    :param candidate: Candidate whose ``unwrapped_value`` is passed to the objective function.
    :param pruner: Pruner installed via ``set_global_pruner`` before the evaluation starts.
    :param kwargs: Extra keyword arguments forwarded to the objective function.
    :param remote: If True, SIGINT is redirected to ``dummy_signal_handler`` in this process.
    :param gpu: If not None, written to the ``CUDA_VISIBLE_DEVICES`` environment variable.
    :return: The populated ``EvaluationResult``. Exceptions are not propagated but stored as a formatted
        traceback in ``error``.
    """
    if gpu is not None:
        # GPU mode: expose only the assigned device to PyTorch / TensorFlow
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
    if remote:
        signal.signal(signal.SIGINT, dummy_signal_handler)

    # The objective function receives the candidate without the index auxiliary variables
    params = candidate.unwrapped_value
    set_global_pruner(pruner)
    outcome = EvaluationResult()
    try:
        returned = objective_function(params, **kwargs)
        if returned is None:
            raise ValueError(
                "Objective function returned 'None'. This probably means you forgot to add a 'return' (or 'yield') "
                "statement at the end of the function. If you intended to prune the evaluation, you can do that by "
                "\n```\nraise pyhopper.PruneEvaluation()\n```\n"
            )
        if not isinstance(returned, GeneratorType):
            # Plain function: the return value is the final result
            outcome.value = returned
            if np.isnan(returned):
                outcome.is_nan = True
        else:
            # Generator mode: consume intermediate results until exhausted, NaN or pruned
            keep_going = True
            while keep_going:
                try:
                    step = next(returned)
                    outcome.value = step
                    if np.isnan(step):
                        outcome.is_nan = True
                        keep_going = False
                    if should_prune(step):
                        # The pruner asked to stop this evaluation
                        outcome.was_pruned = True
                        keep_going = False
                except StopIteration:
                    keep_going = False
    except PruneEvaluation:
        # The objective function requested pruning explicitly
        outcome.was_pruned = True
    except BaseException as exc:
        # Deliberately catches everything so the failure is reported via the result object
        outcome.error = "".join(
            format_exception(type(exc), exc, exc.__traceback__, 4096)
        )
    outcome.intermediate_results = get_intermediate_results_list()
    return outcome
def parse_factor(n_jobs):
    """
    Extract the number of jobs per GPU from a string such as ``"2x per_gpu"``

    All decimal digits of the string are concatenated and interpreted as one integer.

    :param n_jobs: String to parse.
    :return: The parsed integer, or 1 if the string contains no decimal digit.
    """
    # isdecimal() (unlike isdigit()) only accepts characters that int() can convert,
    # so characters such as a superscript two cannot trigger a ValueError.
    only_digits = "".join(ch for ch in n_jobs if ch.isdecimal())
    return int(only_digits) if only_digits else 1
class TaskManager:
    """Submit objective-function evaluations to a pool of workers and collect the finished ones.

    Supported backends are ``"multiprocessing"`` (``concurrent.futures.ProcessPoolExecutor``),
    ``"dask"`` and ``"dask-cuda"`` (both experimental); ``"auto"`` selects ``"multiprocessing"``.
    """

    def __init__(self, n_jobs, mp_backend):
        """
        :param n_jobs: Number of parallel jobs. A value <= 0 uses the number of usable CPUs. A string ending in
            ``"per_gpu"`` / ``"per-gpu"`` (e.g. ``"2x per_gpu"``) runs that many jobs per GPU.
        :param mp_backend: One of ``"auto"``, ``"multiprocessing"``, ``"dask"``, ``"dask-cuda"``.
        :raises ValueError: If the backend is unknown, is not usable in per-GPU mode, or its package cannot be
            imported.
        """
        admissible_mp_backends = [
            "auto",
            "multiprocessing",
            "dask",
            "dask-cuda",
        ]
        if mp_backend not in admissible_mp_backends:
            raise ValueError(
                f"Unknown multiprocessing backend '{mp_backend}'. Valid options are {str(admissible_mp_backends)}"
            )
        self._gpu_allocator = None
        self._pending_candidates = []
        self._pending_futures = []

        if isinstance(n_jobs, str) and n_jobs.endswith(("per_gpu", "per-gpu")):
            if mp_backend in ("auto", "multiprocessing"):
                mp_backend = "multiprocessing"
                # workaround if dask-cuda does not work
                factor = parse_factor(n_jobs)
                self._gpu_allocator = GPUAllocator(factor)
                n_jobs = self._gpu_allocator.num_gpus
                self._queue_max = n_jobs
            elif mp_backend != "dask-cuda":
                raise ValueError(
                    f"Cannot use mp_backend ```{mp_backend}``` when using per_gpu mode. "
                    "Valid options are 'dask-cuda' and 'multiprocessing' ('auto' defaults to 'multiprocessing')"
                )
            # "dask-cuda": queue size and n_jobs are set when the cluster is created below
        else:
            n_jobs = int(n_jobs)
            if mp_backend == "auto":
                mp_backend = "multiprocessing"
            if n_jobs <= 0:
                if hasattr(os, "sched_getaffinity"):
                    # Number of CPUs this process is allowed to run on
                    n_jobs = len(os.sched_getaffinity(0))
                else:
                    # os.sched_getaffinity is not available on every platform
                    n_jobs = os.cpu_count() or 1
            self._queue_max = n_jobs

        if mp_backend == "dask-cuda":
            # Experimental
            try:
                from dask_cuda import LocalCUDACluster
                from dask.distributed import Client, wait
            except ImportError as err:
                raise ValueError(
                    "Could not import dask-cuda. Make sure dask is installed ```pip3 install -U dask-cuda```. "
                    + str(sys.exc_info()[0])
                ) from err
            self._backend_FIRST_COMPLETED = "FIRST_COMPLETED"
            self._backend_ALL_COMPLETED = "ALL_COMPLETED"
            cluster = LocalCUDACluster()
            # print("CUDA_VISIBLE_DEVICES:", cluster.cuda_visible_devices)
            self._queue_max = len(cluster.cuda_visible_devices)
            self._backend_task_executor = Client(cluster)
            self._backend_wait_func = wait
            n_jobs = -1
        elif mp_backend == "multiprocessing":
            import concurrent.futures

            self._backend_wait_func = concurrent.futures.wait
            self._backend_FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
            self._backend_ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
            self._backend_task_executor = concurrent.futures.ProcessPoolExecutor(
                max_workers=n_jobs
            )
        elif mp_backend == "dask":
            # Experimental
            try:
                from dask.distributed import Client, LocalCluster, wait
            except ImportError as err:
                raise ValueError(
                    "Could not import dask. Make sure dask is installed ```pip3 install -U dask[distributed]```."
                    + str(sys.exc_info()[0])
                ) from err
            self._backend_FIRST_COMPLETED = "FIRST_COMPLETED"
            self._backend_ALL_COMPLETED = "ALL_COMPLETED"
            cluster = LocalCluster(n_workers=n_jobs)
            self._backend_task_executor = Client(cluster)
            self._backend_wait_func = wait
        else:
            # Raised explicitly (not via `assert`) so the check survives `python -O`
            raise AssertionError("This should never happen")
        self._mp_backend = mp_backend
        self.n_jobs = n_jobs

    def shutdown(self):
        """Shut down the backend executor without waiting for running tasks."""
        if self._mp_backend == "multiprocessing":
            self._backend_task_executor.shutdown(wait=False)
        else:
            # The `wait` keyword belongs to concurrent.futures executors; the dask
            # client is shut down with its argument-free `shutdown()`.
            self._backend_task_executor.shutdown()

    def submit(self, objective_function, candidate, param_info, pruner, kwargs):
        """Schedule ``execute(objective_function, candidate, pruner, kwargs, True, gpu)`` on the backend.

        :param objective_function: Objective function forwarded to ``execute``.
        :param candidate: Candidate forwarded to ``execute``.
        :param param_info: Object stored alongside the candidate; it is handed back by
            ``iterate_done_tasks`` with its ``finished_at`` attribute set.
        :param pruner: Pruner forwarded to ``execute``.
        :param kwargs: Keyword arguments forwarded to ``execute``.
        """
        gpu_arg = None
        if self._gpu_allocator is not None:
            # GPU Mode -> Get a free GPU
            gpu_arg = self._gpu_allocator.pop_free()
        future = self._backend_task_executor.submit(
            execute,
            objective_function,
            candidate,
            pruner,
            kwargs,
            True,  # remote = True
            gpu_arg,
        )
        if self._gpu_allocator is not None:
            # GPU Mode -> Mark GPU as allocated by the future object
            self._gpu_allocator.alloc(future, gpu_arg)
        self._pending_futures.append(future)
        self._pending_candidates.append((candidate, param_info))

    def wait_for_first_to_complete(self):
        """Block until at least one pending task has finished (returns immediately if none is pending)."""
        if not self._pending_futures:
            # Nothing to do
            return
        self._backend_wait_func(
            self._pending_futures, return_when=self._backend_FIRST_COMPLETED
        )

    def wait_for_all_to_complete(self):
        """Block until every pending task has finished (returns immediately if none is pending)."""
        if not self._pending_futures:
            # Nothing to do
            return
        self._backend_wait_func(
            self._pending_futures, return_when=self._backend_ALL_COMPLETED
        )

    def iterate_done_tasks(self):
        """
        Generator that gathers all finished tasks and removes them from the pending lists.

        Yields tuples of the form ``(candidate, param_info, result)``, where ``result`` is the value of the
        finished future and ``param_info.finished_at`` has been set to the current time.
        """
        idx = 0
        # The length is re-read on every pass because entries are removed while iterating
        while idx < len(self._pending_futures):
            future = self._pending_futures[idx]
            if not future.done():
                idx += 1
                continue
            if self._gpu_allocator is not None:
                # GPU Mode -> Mark GPU as freed
                self._gpu_allocator.free(future)
            candidate, param_info = self._pending_candidates.pop(idx)
            self._pending_futures.pop(idx)
            param_info.finished_at = time.time()
            yield candidate, param_info, future.result()

    @property
    def is_full(self):
        """True if the number of pending tasks has reached the queue capacity."""
        return len(self._pending_futures) >= self._queue_max