Source code for thoth.adviser.predictors.td

#!/usr/bin/env python3
# thoth-adviser
# Copyright(C) 2020 - 2021 Fridolin Pokorny
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Implementation of Temporal Difference (TD) based predictor with adaptive simulated annealing schedule."""

from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
from typing import Optional
import logging
import math
import random
import signal

import attr

from .annealing import AdaptiveSimulatedAnnealing
from ..state import State


_LOGGER = logging.getLogger(__name__)
_INSTANCE = None


def _sigusr1_handler(sig_num: int, _: Any) -> None:
    """Handle SIGUSR1 that switches the predictor to exploitation (soon resolution timeout)."""
    global _INSTANCE

    _LOGGER.debug("Switching to exploitation phase based on a signal")
    _INSTANCE._temperature = 0.0  # type: ignore


[docs]@attr.s(slots=True) class TemporalDifference(AdaptiveSimulatedAnnealing): """Implementation of Temporal Difference (TD) based predictor with adaptive simulated annealing schedule.""" step = attr.ib(type=int, default=1, kw_only=True) trace = attr.ib(type=bool, default=True, kw_only=True) _policy = attr.ib(type=Dict[Tuple[str, str, str], List[Union[float, int]]], factory=dict, init=False) _steps_reward = attr.ib(type=float, default=0.0, init=False) _steps_taken = attr.ib(type=int, default=0, init=False) _next_state = attr.ib(type=Optional[State], default=None, init=False) _old_handler = attr.ib(type=Any, init=False, default=None) @step.validator def _step_validator(self, _: str, value: int) -> None: """Validate step parameter for n-step TD-learning.""" if not isinstance(value, int): raise ValueError(f"Unknown type for TD n-step: {type(value)}") if value < 1: raise ValueError(f"Step set to {value} is not valid for n-step TD-learning")
[docs] def pre_run(self) -> None: """Initialize pre-running of this predictor.""" global _INSTANCE super().pre_run() self._policy.clear() self._temperature = float(self.context.limit) self._steps_taken = 0 self._steps_reward = 0.0 self._next_state = None # Setup handler that responds to SIGUSR1 and switches to exploitation phase. self._old_handler = signal.getsignal(signal.SIGUSR1) signal.signal(signal.SIGUSR1, _sigusr1_handler) _INSTANCE = self
[docs] def post_run(self) -> None: """De-initialize resources used by this predictor.""" global _INSTANCE signal.signal(signal.SIGUSR1, self._old_handler) self._old_handler = None _INSTANCE = None
[docs] def set_reward_signal(self, state: State, package_tuple: Tuple[str, str, str], reward: float) -> None: """Note down reward signal of the last action performed.""" trajectory_end = math.isnan(reward) or math.isinf(reward) if trajectory_end: self._next_state = None reward = 0.0 self._steps_reward += reward if self._steps_taken < self.step and not trajectory_end: return if self.trace: for package_tuple in state.iter_resolved_dependencies(): record = self._policy.setdefault(package_tuple, [0.0, 0]) record[0] += self._steps_reward record[1] += 1 else: record = self._policy.setdefault(package_tuple, [0.0, 0]) record[0] += self._steps_reward record[1] += 1 self._steps_taken = 0 # Set back to zero as we update policy. self._steps_reward = 0.0
[docs] def run(self) -> Tuple[State, Tuple[str, str, str]]: """Run Temporal Difference (TD) with adaptive simulated annealing schedule.""" if self._next_state is not None: unresolved_dependency_tuple = self._next_state.get_random_unresolved_dependency(prefer_recent=True) if self.keep_history: self._temperature_history.append((None, None, None, self.context.accepted_final_states_count)) return self._next_state, unresolved_dependency_tuple if self._temperature > 0.0: self._temperature = self._temperature_function(self._temperature, self.context) self._next_state = self.context.beam.max() # Pick a random state to be expanded if accepted. probable_state_idx = random.randrange(1, self.context.beam.size) if self.context.beam.size > 1 else 0 probable_state = self.context.beam.get(probable_state_idx) acceptance_probability = self._compute_acceptance_probability( self._next_state.score, probable_state.score, self._temperature ) if acceptance_probability >= random.random(): # Perform exploration. unresolved_dependency_tuple = probable_state.get_random_unresolved_dependency(prefer_recent=True) self._next_state = probable_state else: unresolved_dependency_tuple = self._do_exploitation(self._next_state) if self.keep_history: self._temperature_history.append( ( self._temperature, self._next_state is self.context.beam.max(), acceptance_probability, self.context.accepted_final_states_count, ) ) self._steps_taken += 1 return self._next_state, unresolved_dependency_tuple self._steps_taken += 1 self._next_state = self.context.beam.max() if self.keep_history: self._temperature_history.append( ( self._temperature, True, 0.0, self.context.accepted_final_states_count, ) ) return self._next_state, self._do_exploitation(self._next_state)
def _do_exploitation(self, state: State) -> Tuple[str, str, str]: """Perform expansion of a highest rated stack with action that should yield highest reward.""" to_resolve_average = None to_resolve_package_tuple = None for package_tuple in state.iter_unresolved_dependencies(): reward_records = self._policy.get(package_tuple) if reward_records is None: continue # Compute average - we want to be skewed based on the reward signal # we aggregate (so for example median of medians is not that suitable). average = reward_records[0] / reward_records[1] if to_resolve_average is None or to_resolve_average < average: to_resolve_average = average to_resolve_package_tuple = package_tuple # Make sure we found a candidate based on rewards marked. If not, pick a random one. return to_resolve_package_tuple or state.get_random_unresolved_dependency(prefer_recent=True)