Source code for olympus.hpo

from olympus.utils.factory import fetch_factories
from olympus.utils import MissingArgument, warning
from olympus.hpo.fidelity import Fidelity
from olympus.hpo.parallel import ParallelHPO

registered_optimizer = fetch_factories('olympus.hpo', __file__)


[docs]def known_hpo(): return registered_optimizer.keys()
[docs]def register_hpo(name, factory, override=False): global registered_optimizer if name in registered_optimizer: warning(f'{name} was already registered, use override=True to ignore') if not override: return registered_optimizer[name] = factory
[docs]class RegisteredHPONotFound(Exception): pass
[docs]class HPOptimizer: """Olympus standardized HPO interface Examples -------- .. code-block:: python def add(a, b): return a + b hpo = HPOptimizer('hyperband', space={ 'a': 'uniform(0, 1)', 'b': 'uniform(0, 1)' }) while not hpo.is_done(): for args in hpo: # try a new configuration result = add(**args) # forward the result to the optimizer hpo.observe(args, result) hpo.result() (OrderedDict([('a', 0.02021839744032572), ('b', 0.05433798833925363), ('uid', 'b6b3c96296beaad9'), ('epoch', 8)]), 0.07455638577957935 """ def __init__(self, name=None, *, hpo=None, **kwargs): self.hpo = None if name: hpo_fun = registered_optimizer.get(name) if not hpo_fun: raise RegisteredHPONotFound(name) self.hpo = hpo_fun(**kwargs) elif hpo and isinstance(hpo, type): self.hpo = hpo(**kwargs) else: raise MissingArgument('hpo or name need to be set') kwargs['name'] = name self.kwargs = kwargs
[docs] def insert_manual_sample(self, sample=None, fidelity_override=None, **kwargs): return self.hpo.insert_manual_sample(sample, fidelity_override, **kwargs)
[docs] def suggest(self, **kwargs): return self.hpo.suggest(**kwargs)
[docs] def observe(self, args, result): return self.hpo.observe(args, result)
[docs] def is_done(self): return self.hpo.is_done()
[docs] def remaining(self): return self.hpo.remaining()
[docs] def load_state_dict(self, state): return self.hpo.load_state_dict(state)
[docs] def state_dict(self, compressed=True): return self.hpo.state_dict(compressed=compressed)
[docs] def info(self): return self.hpo.info()
[docs] def result(self): result = sorted(self.hpo.trials.values(), key=lambda x: x.objective) return result[0]
def __iter__(self): return iter(self.hpo) @property def trials(self): return self.hpo.trials
[docs] def ctor_call(self): return self.kwargs
[docs]def check(): def add(a, b, **kwargs): return a + b hpo = HPOptimizer('hyperband', fidelity=Fidelity(1, 30, 2), space={ 'a': 'uniform(0, 1)', 'b': 'uniform(0, 1)' }) while not hpo.is_done(): for args in hpo: # try a new configuration result = add(**args) # forward the result to the optimizer hpo.observe(args, result) print(hpo.result())
if __name__ == '__main__': check()