# Source code for pyhopper.search

# Copyright 2022 Mathias Lechner and the PyHopper team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path

import pyhopper
from .cache import EvaluationCache
from .callbacks import History
from .callbacks.callbacks import CheckpointCallback
from .parameters import (
    FloatParameter,
    IntParameter,
    ChoiceParameter,
    CustomParameter,
    Parameter,
    PowerOfIntParameter,
    LogSpaceFloatParameter,
)
from .parallel import execute, TaskManager, SignalListener
import numpy as np
from typing import Union, Optional, Any, Tuple, Sequence
from types import FunctionType
from enum import Enum
import time

from .run_context import ScheduledRun, RunContext
from .utils import (
    parse_runtime,
    sanitize_bounds,
    infer_shape,
    time_to_pretty_str,
    steps_to_pretty_str,
    ParamInfo,
    CandidateType,
    merge_dicts,
    convert_to_list,
    convert_to_checkpoint_path,
    load_dict,
    store_dict,
    unwrap_sample,
    WrappedSample,
    Candidate,
)


# def register_conditional(*args, **kwargs) -> ConditionalParameter:
#     """Creates a new conditional parameter similar to ```pyhopper.choice``` but allows nested configuration spaces.
#     Different from ```pyhopper.choice``` each case is a key-value pair instead of just a value,
#     with the key being the name of the case
#
#     .. Warning::
#         Conditional parameters are an experimental feature of PyHopper and may be unstable and subject to changes in the future
#
#     :param *args: A single ```dict``` object or empty if keyword arguments are used
#     :param **kwargs: Keyword arguments that correspond to the different cases
#
#     Examples::
#
#         >>> search = pyhopper.Search(
#         >>>   cond_param = pyhopper.cases(
#         >>>     case1="abc",
#         >>>     other_case=pyhopper.int(0,10),
#         >>>     third_case=["xyz",pyhopper.int(-10,0)])
#         >>> )
#         >>> # Generates samples
#         >>> # {'cond_param': ('case1', 'abc')}
#         >>> # {'cond_param': ('other_case', 0)}
#         >>> # {'cond_param': ('other_case', 3)}
#         >>> # {'cond_param': ('case1', 'abc')}
#         >>> # {'cond_param': ('third_case', ['xyz', -2])}
#         >>> # {'cond_param': ('third_case', ['xyz', -9])}
#
#     The conditional parameter above may be then used in an objective function as a pair:
#
#     Examples::
#
#     >>> def of(params):
#     >>>   # params["cond_param"] is a pair
#     >>>   if params["cond_param"][0] == "case1":
#     >>>      # params["cond_param"][1] == "abc"
#     >>>      # do x
#     >>>   elif params["cond_param"][0] == "other_case":
#     >>>     # params["cond_param"][1] is a random integer between 0 and 10
#     >>>     # do y
#     >>>   else:
#     >>>     # params["cond_param"][0] is "third_case":
#     >>>     # params["cond_param"][1] is a list
#     >>>     # do z
#
#     Conditional search spaces might be useful for hyperparameter choices that depend on other choice outcomes, for instance:
#
#     Examples::
#
#         >>> search = pyhopper.Search(
#         >>>   optimizer = pyhopper.cases(
#         >>>     sgd = {"lr": pyhopper.float(1e-5,1e-2)},
#         >>>     nesterov = {"lr": pyhopper.float(1e-5,1e-2), "momentum": pyhopper.float(0.01,0.99)},
#         >>>     adam = {"lr": pyhopper.float(1e-5,1e-2), "beta": pyhopper.float(0.1,0.2)},
#         >>> )
#
#     """
#
#     if len(args) > 0 and len(kwargs) > 0:
#         raise ValueError("Cannot specify unnamed and named arguments at the same time.")
#     if len(args) > 1:
#         raise ValueError(
#             "Argument must be a single dictionary object containing the cases"
#         )
#     if len(args) == 1:
#         kwargs = args[0]
#     param = ConditionalParameter(kwargs)
#     return param


def register_int(
    lb: Optional[Union[int, float, np.ndarray]] = None,
    ub: Optional[Union[int, float, np.ndarray]] = None,
    init: Optional[Union[int, float, np.ndarray]] = None,
    multiple_of: Optional[int] = None,
    power_of: Optional[int] = None,
    shape: Optional[Union[int, Tuple]] = None,
    seeding_fn: Optional[callable] = None,
    mutation_fn: Optional[callable] = None,
) -> IntParameter:
    """Creates a new integer parameter (both lower and upper bounds are **inclusive**)

    Examples::

        >>> pyhopper.int(10) # uniform distribution [0,10] (including 0 and 10 as valid values)
        >>> pyhopper.int(-10, 10) # uniform distribution [-10,10]
        >>> pyhopper.int(100,500, multiple_of=100) # quantized to 100 increments (100,200,300,400,500)
        >>> pyhopper.int(8,64, power_of=2) # quantized to powers of 2 (8,16,32,64)
        >>> pyhopper.int(0,100, shape=5) # multidimensional parameter (5 dimensions)

    :param lb: Inclusive lower bound of the parameter (used as upper bound if no upper bound is provided)
    :param ub: Inclusive upper bound of the parameter. If None, the `lb` argument will be used as upper bound with a lower bound of 0.
    :param init: Initial value of the parameter. If None it will be randomly sampled
    :param multiple_of: Setting this value to a positive integer enforces the sampled values of this parameter to be a multiple of `multiple_of`.
    :param power_of: If not None, a ``PowerOfIntParameter`` is created instead of an ``IntParameter``.
        Only the values 1 and 2 are accepted.
    :param shape: For NumPy array type parameters, this argument must be set to a tuple containing the shape of the np.ndarray.
        If None, the shape is inferred from `init`, `lb`, and `ub`.
    :param seeding_fn: Setting this argument to a callable overwrites the default random seeding strategy
    :param mutation_fn: Setting this argument to a callable overwrites the default local sampling strategy. The callback gets called with the value
        of the current best solution as argument and returns a mutated value
    :raises ValueError: If `power_of` is neither None, 1, nor 2
    :return: The newly created parameter object
    """
    # Reject an unsupported `power_of` before any bounds processing takes place
    if power_of is not None and power_of not in [1, 2]:
        raise ValueError(
            f"Power of {power_of} integers are currently not supported (only power 2 integers)."
        )
    if lb is None and ub is None:
        # Unbounded int is actually a 32-bit integer
        lb = np.iinfo(np.int32).min
        ub = np.iinfo(np.int32).max
    lb, ub = sanitize_bounds(lb, ub)
    # NOTE(review): register_float calls infer_shape without the leading `shape`
    # argument -- confirm both call forms match infer_shape's signature.
    param_shape = infer_shape(shape, init, lb, ub) if shape is None else shape
    if power_of is not None:
        # Pass the resolved shape (not the raw `shape` argument) so that a shape
        # inferred from the bounds/init is not lost on this branch.
        return PowerOfIntParameter(
            param_shape,
            lb,
            ub,
            init,
            power_of,
            multiple_of,
            mutation_fn,
            seeding_fn,
        )
    return IntParameter(
        param_shape,
        lb,
        ub,
        init,
        multiple_of,
        mutation_fn,
        seeding_fn,
    )


def register_custom(
    seeding_fn: Optional[callable] = None,
    mutation_fn: Optional[callable] = None,
    init: Any = None,
) -> CustomParameter:
    """Creates a new parameter whose sampling is defined by user-provided callables

    :param seeding_fn: Callable used as seeding strategy. If `init` is None, it is called
        without arguments to obtain the initial value.
    :param mutation_fn: Callable forwarded to the ``CustomParameter`` as its mutation strategy
    :param init: Initial value of the parameter. If None, `seeding_fn` must be provided.
    :raises ValueError: If neither `init` nor `seeding_fn` is provided
    :return: The newly created ``CustomParameter``
    """
    if init is None:
        if seeding_fn is None:
            raise ValueError(
                "Could not create custom parameter, must either provide an initial value or a seeding strategy function"
            )
        # No explicit initial value given: draw one from the seeding strategy
        init = seeding_fn()
    return CustomParameter(init, mutation_fn, seeding_fn)


def recursive_check_for_ph_types_and_fail(options):
    """Raises an error if a ``pyhopper.Parameter`` is found anywhere inside ``options``

    Lists are searched element by element and dicts are searched value by value
    (dict keys are not inspected). Any other object is accepted as is.

    :param options: Object (possibly nested lists/dicts) to be checked
    :raises ValueError: If ``options`` is, or contains, a ``pyhopper.Parameter``
    """
    if isinstance(options, pyhopper.Parameter):
        raise ValueError(
            "Cannot use pyhopper.Parameter type inside pyhopper.choice. Consider using pyhopper.cases instead!"
        )
    elif isinstance(options, list):
        for v in options:
            recursive_check_for_ph_types_and_fail(v)
    elif isinstance(options, dict):
        # Iterating a dict directly yields only its keys, so the values have
        # to be requested explicitly.
        for v in options.values():
            recursive_check_for_ph_types_and_fail(v)


def register_choice(
    *args,
    init_index: Optional[Any] = None,
    is_ordinal: bool = False,
    mutation_fn: Optional[FunctionType] = None,
    seeding_fn: Optional[FunctionType] = None,
) -> ChoiceParameter:
    """Creates a new choice parameter

    Examples::

        >>> pyhopper.choice("adam","rmsprop","sgd") # unnamed arguments as valid options
        >>> pyhopper.choice(["adam","rmsprop","sgd"]) # equivalent syntax
        >>> pyhopper.choice("low","medium","high",is_ordinal=True) # ordinal (ordered) options

    The possible options can contain nested parameter spaces, for instance

    Examples::

        >>> pyhopper.choice("const", pyhopper.int(0, 10), ["nested2", pyhopper.int(-10, 0)])
        >>> # Generates the samples
        >>> # 'const'
        >>> # 8
        >>> # 2
        >>> # ['nested2', 0]
        >>> # ['nested2', -8]
        >>> # 4

    :param *args: Possible values of this parameter.
        If only a single list is provided, the items inside the list will be used as admissible values.
    :param init_index: Initial guess of the parameter represented by its index
    :param is_ordinal: Flag indicating whether two neighboring list items are ordered or not. If True, in the local sampling stage list items neighboring the current best value will be preferred. For sets with a natural ordering it is recommended to set this flag to True.
    :param mutation_fn: Setting this argument to a callable overwrites the default local sampling strategy. The callback gets called with the value
        of the current best solution as argument and returns a mutated value
    :param seeding_fn: Setting this argument to a callable overwrites the default random seeding strategy
    :raises ValueError: If no option is provided
    :return: The newly created ``ChoiceParameter``
    """
    if not args:
        raise ValueError("List with possible values must not be empty.")
    if len(args) == 1 and isinstance(args[0], list):
        # A single list argument is itself the collection of admissible values
        candidates = args[0]
    else:
        candidates = list(args)
    # Nested pyhopper parameters inside the options are allowed (see the
    # docstring above), so the options are handed over without type checks.
    return ChoiceParameter(
        candidates, init_index, is_ordinal, mutation_fn, seeding_fn
    )


def register_bool(
    init: Optional[Any] = None,
    mutation_fn: Optional[FunctionType] = None,
    seeding_fn: Optional[FunctionType] = None,
) -> ChoiceParameter:
    """Creates a new boolean parameter (a choice parameter over ``True`` and ``False``)

    Examples::

        >>> pyhopper.bool()
        >>> pyhopper.bool(True) # initial guess is assumed to be "True"

    :param init: Initial value of the parameter. If None it will be randomly sampled.
    :param mutation_fn: Setting this argument to a callable overwrites the default local sampling strategy. The callback gets called with the value
        of the current best solution as argument and returns a mutated value
    :param seeding_fn: Setting this argument to a callable overwrites the default random seeding strategy
    :return: The newly created ``ChoiceParameter``
    """
    options = [True, False]
    # The second argument of ChoiceParameter is the *index* of the initial option
    # (see `init_index` of register_choice), so the initial value has to be
    # translated into its position inside `options`. Passing the bool through
    # unchanged would select index 1 (False) for init=True.
    init_index = None if init is None else options.index(bool(init))
    return ChoiceParameter(options, init_index, False, mutation_fn, seeding_fn)


def register_float(
    lb: Optional[Union[int, float, np.ndarray]] = None,
    ub: Optional[Union[int, float, np.ndarray]] = None,
    fmt: Optional[str] = None,
    init: Optional[Union[int, float, np.ndarray]] = None,
    log: Optional[bool] = None,
    precision: Optional[int] = None,
    shape: Optional[Union[int, Tuple]] = None,
    mutation_fn: Optional[FunctionType] = None,
    seeding_fn: Optional[FunctionType] = None,
) -> FloatParameter:
    """Creates a new floating point parameter

    Examples::

        >>> pyhopper.float(1) # uniform distribution in range [0,1]
        >>> pyhopper.float(-1,1) # uniform distribution in range [-1,1]
        >>> pyhopper.float(1e-5,1e-2, log=True) # loguniform distribution
        >>> pyhopper.float(0,0.5, "0.1f") # uniform distribution, quantized to 0.1 increments (1 decimal digit)
        >>> pyhopper.float(1e-5,1e-2, "0.1g") # loguniform distribution, logquantized to 1 significant digit
        >>> pyhopper.float(-1,1, shape=(3,3)) # multidimensional parameter


    :param lb: Lower bound of the parameter. If both `lb` and `ub` are None, this parameter will be unbounded (usually not recommended).
    :param ub: Upper bound of the parameter. If None, the `lb` argument will be used as upper bound with a lower bound of 0.
    :param fmt: Format string as syntactic sugar for setting both log and precision.
        fmt="0.2f" refers to parameter with linear search space and 2 decimal digits precision.
        fmt="0.1g" refers to a parameter with logarithmic search space and 1 significant digit precision.
        Cannot be combined with `log` or `precision`.
    :param init: Initial value of the parameter. If None it will be randomly sampled
    :param log: Whether to use logarithmic or linearly scaling of the parameter.
        Defaults to None, which searches the space linearly.
        If True, a logarithmic scaling is applied to the search space of this variable
        (requires both bounds to be specified and positive)
    :param precision: Rounds the values to the specified significant digits.
        Defaults to None meaning that no rounding is applied
    :param shape: For NumPy array type parameters, this argument must be set to a tuple containing the shape of the np.ndarray.
        If None, the shape is inferred from `init`, `lb`, and `ub`.
    :param mutation_fn: Setting this argument to a callable overwrites the default local sampling strategy. The callback gets called with the value
        of the current best solution as argument and returns a mutated value
    :param seeding_fn: Setting this argument to a callable overwrites the default random seeding strategy
    :raises ValueError: If `fmt` is combined with `log` or `precision`, if `fmt` cannot be parsed,
        or if a logarithmic parameter has missing or non-positive bounds
    :return: A ``LogSpaceFloatParameter`` if the search space is logarithmic, otherwise a ``FloatParameter``
    """
    # Checks that do not depend on the bounds come first
    if log is not None and fmt is not None:
        raise ValueError("Cannot specify `log` and `fmt` at the same time.")
    if precision is not None and fmt is not None:
        raise ValueError("Cannot specify `precision` and `fmt` at the same time.")

    if fmt is not None:
        # simple but non-pedantic parsing of the format string
        if fmt.endswith("g"):
            log = True
        # Keep `fmt` untouched so that the error message shows what the user passed
        digits = fmt.replace(":", "").replace(".", "").replace("g", "").replace("f", "")
        try:
            precision = int(digits)
        except ValueError as e:
            raise ValueError(
                f"Could not parse format string '{fmt}'. Valid examples are ':0.3f', '0.1g' (error details: {str(e)})"
            ) from e

    lb, ub = sanitize_bounds(lb, ub)
    if log and (lb is None or ub is None):
        raise ValueError(
            "Logarithmically distributed mode without bounds is not supported. Please specify lower and upper bound."
        )
    # np.any handles scalar as well as np.ndarray bounds; a plain `lb <= 0`
    # has no unambiguous truth value for arrays with more than one element.
    if log and (np.any(lb <= 0) or np.any(ub <= 0)):
        raise ValueError(
            "Both bounds for logarithmically distributed parameter must be positive."
        )

    # NOTE(review): register_int calls infer_shape with an additional leading
    # `shape` argument -- confirm both call forms match infer_shape's signature.
    param_shape = infer_shape(init, lb, ub) if shape is None else shape
    if log:
        return LogSpaceFloatParameter(
            param_shape, lb, ub, init, precision, mutation_fn, seeding_fn
        )
    return FloatParameter(
        param_shape,
        lb,
        ub,
        init,
        precision,
        mutation_fn,
        seeding_fn,
    )