Source code for pyhopper.utils

# Copyright 2022 Mathias Lechner and the PyHopper team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gzip
import os
import pickle
from enum import Enum
from typing import Union
import numpy as np


class ParamInfo:
    """Auxiliary bookkeeping record attached to a parameter candidate.

    Attributes:

        ``type`` Enum describing how the candidate was produced (valid values are
            pyhopper.CandidateType.INIT, pyhopper.CandidateType.MANUALLY_ADDED,
            pyhopper.CandidateType.RANDOM_SEEDING, pyhopper.CandidateType.LOCAL_SAMPLING).

        ``sampled_at`` UNIX epoch timestamp at which the candidate was sampled.

        ``finished_at`` UNIX epoch timestamp at which the evaluation of the candidate
            finished (``None`` until something outside this class sets it).

        ``is_pruned`` Bool indicating if the candidate was pruned.

        ``is_nan`` Bool flag, ``False`` by default. Presumably marks candidates whose
            objective evaluated to NaN; it is never set inside this class, so
            confirm against the code that writes it.
    """

    # Class-level defaults: instances fall back to these until an attribute
    # is assigned on the instance itself.
    type = sampled_at = finished_at = None
    is_pruned = is_nan = False

    def __init__(self, candidate_type, sampled_at):
        """Record how (``candidate_type``) and when (``sampled_at``) the candidate was sampled."""
        self.type, self.sampled_at = candidate_type, sampled_at
class WrappedSample:
    """Pairs a sampled ``value`` with auxiliary data ``aux``."""

    def __init__(self, value, aux):
        self.value, self.aux = value, aux


def unwrap_sample(sample):
    """Return ``sample`` with every :class:`WrappedSample` replaced by its value.

    Nested wrappers are peeled until a non-wrapper value is reached. ``dict``
    and ``list`` containers are rebuilt (as a plain ``dict`` / ``list``) with
    their items unwrapped recursively; any other object is returned unchanged.
    """
    # Peel (possibly nested) wrappers first.
    while isinstance(sample, WrappedSample):
        sample = sample.value

    if isinstance(sample, dict):
        return {key: unwrap_sample(item) for key, item in sample.items()}
    if isinstance(sample, list):
        return [unwrap_sample(item) for item in sample]
    return sample


class Candidate:
    """Stores a sampled value together with its fully unwrapped counterpart."""

    def __init__(self, value):
        self.value = value
        self.unwrapped_value = unwrap_sample(value)
def merge_dicts(*args):
    """Merges multiple dictionaries (``dict``s) into a single dictionary.

    Raises ValueError if a key is contained in two dicts with different values.

    :param args: Sequence of ``dict``s that should be merged
    :return: The merged ``dict``
    """
    merged = {}
    for mapping in args:
        for key, value in mapping.items():
            # A repeated key is only acceptable if it maps to the same value.
            conflicting = key in merged and merged[key] != value
            if conflicting:
                raise ValueError(
                    f"Could not merge dicts! The key '{key}' is contained in multiple dictionaries with different values"
                )
            merged[key] = value
    return merged
def convert_to_list(list_or_obj):
    """Normalize ``list_or_obj`` to a list.

    ``None`` becomes an empty list, a ``list`` is returned unchanged (same
    object), and any other object is wrapped in a one-element list.
    """
    if list_or_obj is None:
        return []
    if not isinstance(list_or_obj, list):
        # Convert single callback object to a list of size 1
        list_or_obj = [list_or_obj]
    return list_or_obj


def store_dict(filename, obj):
    """Pickle ``obj`` into the gzip-compressed file ``filename``."""
    with gzip.open(filename, "wb") as f:
        pickle.dump(obj, f)


def load_dict(filename):
    """Return the object pickled in the gzip-compressed file ``filename``.

    NOTE(review): ``pickle.load`` can execute arbitrary code while loading;
    only use this on files from a trusted source.
    """
    with gzip.open(filename, "rb") as f:
        obj = pickle.load(f)
    return obj


def convert_to_checkpoint_path(checkpoint_path):
    """Resolve ``checkpoint_path`` to a concrete checkpoint file path.

    If ``checkpoint_path`` is an existing directory, the first path of the form
    ``pyhopper_run_XXXXX.ckpt`` inside it that is not an existing file is
    returned (indices 0..99999 are probed; if all of them exist the last
    probed path is returned). Otherwise ``checkpoint_path`` is returned as-is.
    """
    actual_path = checkpoint_path
    if os.path.isdir(checkpoint_path):
        for i in range(100000):
            actual_path = os.path.join(checkpoint_path, f"pyhopper_run_{i:05d}.ckpt")
            if not os.path.isfile(actual_path):
                break
    return actual_path


class CandidateType(Enum):
    """Enumerates the ways a parameter candidate can be produced."""

    INIT = 0
    MANUALLY_ADDED = 1
    RANDOM_SEEDING = 2
    LOCAL_SAMPLING = 3


class NTimesEvaluator:
    """Callable that evaluates ``func`` ``n`` times and reduces the results.

    :param func: Objective function, called as ``func(param, **kwargs)`` or,
        if ``pass_index_arg`` is true, as ``func(param, i, **kwargs)`` with
        ``i`` being the 0-based repetition index.
    :param n: Number of evaluations (must be > 0).
    :param yield_after: If not ``None``, calling the evaluator returns a
        generator that yields the reduction of all results obtained so far
        after every evaluation with index ``>= yield_after`` (must be < ``n``).
        If ``None``, calling the evaluator returns the final reduction directly.
    :param reduction: ``"mean"``, ``"med"``/``"median"``, or a callable that
        maps the list of results to a single value.
    :param pass_index_arg: Whether to pass the repetition index to ``func``.
    :raises ValueError: On invalid ``n``, ``yield_after`` or ``reduction``.
    """

    def __init__(self, func, n, yield_after=0, reduction="mean", pass_index_arg=False):
        if n <= 0:
            raise ValueError(f"n must be > 0, but got {n}")
        if yield_after is not None and yield_after >= n:
            raise ValueError(
                f"'yield_after' must be less than 'n', but got {yield_after} and {n}"
            )
        if callable(reduction):
            self._reduction = reduction
        elif reduction == "mean":
            self._reduction = np.mean
        elif reduction in ["med", "median"]:
            self._reduction = np.median
        else:
            raise ValueError(
                f"Unknown reduction '{reduction}'. Pass either 'mean','median', or a callable"
            )
        self._pass_index_arg = pass_index_arg
        self._n = n
        self._yield_after = yield_after
        self._func = func

    def _evaluate_once(self, param, i, kwargs):
        """Run the ``i``-th evaluation of the objective and return it as ``float``."""
        if self._pass_index_arg:
            r = self._func(param, i, **kwargs)
        else:
            r = self._func(param, **kwargs)
        if r is None:
            raise ValueError(
                f"Objective function returned None. The probably means you forgot to add a 'return' statement at "
                f"the end of the function "
            )
        return float(r)

    def _iter_reductions(self, param, kwargs):
        """Lazily evaluate ``n`` times, yielding running reductions from ``yield_after`` on."""
        results = []
        for i in range(self._n):
            results.append(self._evaluate_once(param, i, kwargs))
            if i >= self._yield_after:
                yield self._reduction(results)

    def __call__(self, param, **kwargs):
        """Evaluate the objective ``n`` times for ``param``.

        :return: A generator of intermediate reductions if ``yield_after`` is
            not ``None``; otherwise the reduction of all ``n`` results.
        :raises ValueError: If the objective function returns ``None``.
        """
        # Keeping the ``yield`` in a separate helper is deliberate: a ``return value``
        # inside a generator function is swallowed into StopIteration, so the
        # ``yield_after=None`` case would otherwise never hand back its result.
        if self._yield_after is not None:
            return self._iter_reductions(param, kwargs)
        results = [self._evaluate_once(param, i, kwargs) for i in range(self._n)]
        return self._reduction(results)


def _contains_number(text):
    """Return True if ``text`` contains at least one ASCII digit."""
    return any(digit in text for digit in "0123456789")


# (marker character, label used in error messages, aliases to strip (longest
# first), seconds per unit). Checked in this order.
_RUNTIME_UNITS = (
    ("w", "weeks", ("weeks", "week", "w"), 60 * 60 * 24 * 7),
    ("d", "days", ("days", "day", "d"), 60 * 60 * 24),
    ("h", "hours", ("hours", "hour", "h"), 60 * 60),
    ("m", "minutes", ("minutes", "minute", "mins", "min", "m"), 60),
)
_RUNTIME_SECONDS = ("second", ("seconds", "second", "secs", "sec", "s"), 1)


def _parse_runtime_unit(runtime, label, aliases, seconds_per_unit):
    """Strip the unit ``aliases`` from ``runtime`` and convert the remaining integer to seconds."""
    number = runtime
    for alias in aliases:
        number = number.replace(alias, "")
    if number.strip() == "":
        raise ValueError(
            f"Could not parse substring '{runtime}' while attempting to parse {label} in runtime-string. "
        )
    return int(number) * seconds_per_unit


def parse_runtime(runtime: Union[int, float, str]):
    """Convert a runtime specification to a number of seconds.

    Supported inputs:

    - ``int`` / ``float``: returned unchanged.
    - ``"h:m:s"`` or ``"m:s"`` patterns, e.g. ``"1:01:30"``.
    - An integer followed by a unit, e.g. ``"2w"``, ``"1day"``, ``"3hours"``,
      ``"30min"``, ``"10seconds"``; a string without a unit is read as seconds.
    - Space-separated combinations of the above, e.g. ``"1d 2h"`` or
      ``"1h 30 min"`` (a part without digits is attached to the preceding part).

    :param runtime: Runtime as number of seconds or as a string.
    :return: The runtime in seconds.
    :raises ValueError: If a part of the string cannot be parsed.
    """
    if isinstance(runtime, (float, int)):
        return runtime

    if " " in runtime:
        # 5d 1h or 5d 1:0:0 pattern
        parts = runtime.split(" ")
        merged_parts = [parts[0]]
        for part in parts[1:]:
            if _contains_number(part):
                merged_parts.append(part)
            else:
                # A unit separated by a space ("30 min") belongs to the previous number
                merged_parts[-1] += part
        return sum(parse_runtime(part) for part in merged_parts)

    if ":" in runtime:
        # h:m:s or m:s pattern
        total_time = 0
        for part in runtime.split(":"):
            total_time = total_time * 60 + int(part)
        return total_time

    # "second(s)" contains a "d", so it must not be dispatched on the single
    # marker characters below (it would be mistaken for days and fail to parse).
    if "sec" not in runtime:
        for marker, label, aliases, seconds_per_unit in _RUNTIME_UNITS:
            if marker in runtime:
                return _parse_runtime_unit(runtime, label, aliases, seconds_per_unit)
    return _parse_runtime_unit(runtime, *_RUNTIME_SECONDS)


def sanitize_bounds(lb, ub):
    """Normalize a pair of (possibly missing) bounds.

    If only one bound is given it is treated as the upper bound and the lower
    bound is set to 0. If both bounds are available they are ordered
    elementwise so that ``lb <= ub``.

    :param lb: Lower bound (number or array) or ``None``.
    :param ub: Upper bound (number or array) or ``None``.
    :return: Tuple ``(lb, ub)``; ``(None, None)`` if both inputs are ``None``.
    :raises ValueError: If only a single bound is given and any of its
        entries is ``<= 0``.
    """
    if lb is not None and ub is None:
        if np.any(lb <= 0):
            raise ValueError(
                "Cannot register parameter. If only a single bound is provided it is treated as upper bound and the "
                "lower bound defaults to 0, but the provided bound is negative. Providing both bounds. "
            )
        ub = lb
        lb = 0
    if lb is None and ub is not None:
        if np.any(ub <= 0):
            raise ValueError(
                "Cannot register parameter. If only a single bound is provided it is treated as upper bound and the "
                "lower bound defaults to 0, but the provided bound is negative. Providing both bounds. "
            )
        lb = 0
    if lb is not None and ub is not None:
        # Swap elementwise where the bounds were passed in the wrong order
        temp = np.minimum(lb, ub)
        ub = np.maximum(lb, ub)
        lb = temp
    return lb, ub


def infer_shape(*args):
    """Return the shape of the last ``np.ndarray`` among ``args`` (``None`` if there is none)."""
    # TODO: If there are multiple np.ndarray the shape should be the largest one while making sure the others are
    #  broadcastable
    shape = None
    for v in args:
        if isinstance(v, np.ndarray):
            shape = v.shape
    return shape


def steps_to_pretty_str(steps):
    """Format a step count compactly, e.g. ``1500`` -> ``"1k"``, ``2500000`` -> ``"2M"``.

    Values are floored to whole thousands / millions; ``None`` is shown as ``"-"``.
    """
    if steps is None:
        return "-"
    # Inclusive thresholds so that exactly 1e6 is shown as "1M" rather than "1000k"
    if steps >= 1e6:
        return f"{steps//1e6:0.0f}M"
    if steps >= 1e3:
        return f"{steps//1e3:0.0f}k"
    return str(steps)


def time_to_pretty_str(elapsed):
    """Format a duration given in seconds as a human-readable string.

    The format depends on the magnitude: days plus ``h:m:s``, ``h:m:s``,
    ``m:s``, whole seconds (above 20 s), milliseconds (below 1 s), or seconds
    with two decimals. ``None`` is shown as ``"-"``.
    """
    if elapsed is None:
        return "-"
    seconds = elapsed % 60
    elapsed = elapsed // 60  # now minutes
    minutes = int(elapsed % 60)
    elapsed = elapsed // 60  # now hours
    hours = int(elapsed % 24)
    days = int(elapsed // 24)
    if days == 1:
        # 1 day 03:39:01 (h:m:s)
        return f"{days:d} day {hours:02d}:{minutes:02d}:{seconds:02.0f} (h:m:s)"
    elif days > 1:
        # 3 days 03:39:01 (h:m:s)
        return f"{days:d} days {hours:02d}:{minutes:02d}:{seconds:02.0f} (h:m:s)"
    elif hours > 0:
        # 03:39:01 (h:m:s)
        return f"{hours:02d}:{minutes:02d}:{seconds:02.0f} (h:m:s)"
    elif minutes > 0:
        # 39:01 (m:s)
        return f"{minutes:02d}:{seconds:02.0f} (m:s)"
    elif seconds > 20:
        # 27s
        return f"{seconds:02.0f} s"
    elif seconds < 1:
        # 837ms
        return f"{1000*seconds:0.0f} ms"
    else:
        # 9.83s
        return f"{seconds:0.02f} s"


if __name__ == "__main__":
    # Manual smoke test: prints parse results as ready-to-paste assertions,
    # followed by sample duration formattings.

    def print_t(inp):
        # print(f"{str(inp)} -> {parse_runtime(inp)}")
        print(f"assert parse_runtime('{str(inp)}') == {parse_runtime(inp)}")

    print_t(1723)
    print_t(934.0438)
    print_t("1d")
    print_t("1d 2h")
    print_t("2h 60min")
    print_t("1h 30min")
    print_t("1:30")
    print_t("1m 30s")
    print_t("1m 30sec")
    print_t("1min 30sec")
    print_t("1:1:30")
    print_t("1:01:30")
    print_t("1h 1m 30s")

    print(time_to_pretty_str(0.0004))
    print(time_to_pretty_str(0.004))
    print(time_to_pretty_str(0.04))
    print(time_to_pretty_str(0.4))
    print(time_to_pretty_str(4.4))
    print(time_to_pretty_str(47.4))
    print(time_to_pretty_str(474.4))
    print(time_to_pretty_str(4746.4))
    print(time_to_pretty_str(47467.4))
    print(time_to_pretty_str(474678.4))
    print(time_to_pretty_str(4746788.4))
    print(time_to_pretty_str(47467888.4))
    print(time_to_pretty_str(474678888.4))