#!/usr/bin/env python3
# thoth-adviser
# Copyright(C) 2019 - 2021 Fridolin Pokorny
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Implementation of pipeline builder - create pipeline configuration based on the project and its library usage."""
import os
import logging
import json
from typing import Any
from typing import Dict
from typing import Generator
from typing import Type
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from itertools import chain
import attr
from thoth.python import Project
from thoth.storages import GraphDatabase
import yaml
from .enums import DecisionType
from .enums import RecommendationType
from .exceptions import InternalError
from .exceptions import UnknownPipelineUnitError
from .exceptions import PipelineConfigurationError
from .pipeline_config import PipelineConfig
from .prescription import UnitPrescription
if TYPE_CHECKING:
from .prescription import Prescription # noqa: F401
from .unit_types import BootType
from .unit_types import PseudonymType
from .unit_types import SieveType
from .unit_types import StepType
from .unit_types import StrideType
from .unit_types import UnitType
from .unit_types import WrapType
_LOGGER = logging.getLogger(__name__)
[docs]@attr.s(slots=True)
class PipelineBuilderContext:
"""A context passed to units to determine if they want to participate in a pipeline."""
graph = attr.ib(type=GraphDatabase, kw_only=True, default=None)
project = attr.ib(type=Project, kw_only=True, default=None)
library_usage = attr.ib(type=Optional[Dict[str, Any]], kw_only=True, default=None)
labels = attr.ib(type=Dict[str, str], kw_only=True, default=attr.Factory(dict))
decision_type = attr.ib(type=Optional[DecisionType], kw_only=True, default=None)
recommendation_type = attr.ib(type=Optional[RecommendationType], kw_only=True, default=None)
cli_parameters = attr.ib(type=Dict[str, Any], kw_only=True, default=attr.Factory(dict))
prescription = attr.ib(type=Optional["Prescription"], kw_only=True, default=None)
iteration = attr.ib(type=int, kw_only=True, default=0)
authenticated = attr.ib(type=bool, kw_only=True)
_boots = attr.ib(type=Dict[Optional[str], List["BootType"]], factory=dict, kw_only=True)
_pseudonyms = attr.ib(type=Dict[str, List["PseudonymType"]], factory=dict, kw_only=True)
_sieves = attr.ib(type=Dict[Optional[str], List["SieveType"]], factory=dict, kw_only=True)
_steps = attr.ib(type=Dict[Optional[str], List["StepType"]], factory=dict, kw_only=True)
_strides = attr.ib(type=Dict[Optional[str], List["StrideType"]], factory=dict, kw_only=True)
_wraps = attr.ib(type=Dict[Optional[str], List["WrapType"]], factory=dict, kw_only=True)
_boots_included = attr.ib(type=Dict[str, List["BootType"]], factory=dict, kw_only=True)
_pseudonyms_included = attr.ib(type=Dict[str, List["PseudonymType"]], factory=dict, kw_only=True)
_sieves_included = attr.ib(type=Dict[str, List["SieveType"]], factory=dict, kw_only=True)
_steps_included = attr.ib(type=Dict[str, List["StepType"]], factory=dict, kw_only=True)
_strides_included = attr.ib(type=Dict[str, List["StrideType"]], factory=dict, kw_only=True)
_wraps_included = attr.ib(type=Dict[str, List["WrapType"]], factory=dict, kw_only=True)
@authenticated.default
def _authenticated_default(self) -> bool:
"""Check if adviser is running in an authenticated mode."""
env_authenticated = os.getenv("THOTH_AUTHENTICATED_ADVISE")
return bool(int(env_authenticated)) if env_authenticated is not None else False
@property
def boots(self) -> List["BootType"]:
"""Get all boots registered to this pipeline builder context."""
return list(chain(*self._boots.values()))
@property
def boots_dict(self) -> Dict[Optional[str], List["BootType"]]:
"""Get boots as a dictionary mapping."""
return self._boots
@property
def pseudonyms(self) -> List["PseudonymType"]:
"""Get all pseudonyms registered to this pipeline builder context."""
return list(chain(*self._pseudonyms.values()))
@property
def pseudonyms_dict(self) -> Dict[str, List["PseudonymType"]]:
"""Get pseudonyms as a dictionary mapping."""
return self._pseudonyms
@property
def sieves(self) -> List["SieveType"]:
"""Get all sieves registered to this pipeline builder context."""
return list(chain(*self._sieves.values()))
@property
def sieves_dict(self) -> Dict[Optional[str], List["SieveType"]]:
"""Get sieves as a dictionary mapping."""
return self._sieves
@property
def steps(self) -> List["StepType"]:
"""Get all steps registered to this pipeline builder context."""
return list(chain(*self._steps.values()))
@property
def steps_dict(self) -> Dict[Optional[str], List["StepType"]]:
"""Get steps as a dictionary mapping."""
return self._steps
@property
def strides(self) -> List["StrideType"]:
"""Get all strides registered to this pipeline builder context."""
return list(chain(*self._strides.values()))
@property
def strides_dict(self) -> Dict[Optional[str], List["StrideType"]]:
"""Get strides as a dictionary mapping."""
return self._strides
@property
def wraps(self) -> List["WrapType"]:
"""Get all wraps registered to this pipeline builder context."""
return list(chain(*self._wraps.values()))
@property
def wraps_dict(self) -> Dict[Optional[str], List["WrapType"]]:
"""Get wraps as a dictionary mapping."""
return self._wraps
def __attrs_post_init__(self) -> None:
"""Verify we have only adviser or dependency monkey specific builder."""
if self.decision_type is not None and self.recommendation_type is not None:
raise ValueError("Cannot instantiate builder for adviser and dependency monkey at the same time")
if self.decision_type is None and self.recommendation_type is None:
raise ValueError("Cannot instantiate builder context not specific to adviser nor dependency monkey")
[docs] def is_included(self, unit_class: Type["UnitType"]) -> bool:
"""Check if the given pipeline unit is already included in the pipeline configuration."""
if unit_class.is_boot_unit_type():
return unit_class.get_unit_name() in self._boots_included
elif unit_class.is_pseudonym_unit_type():
return unit_class.get_unit_name() in self._pseudonyms_included
elif unit_class.is_sieve_unit_type():
return unit_class.get_unit_name() in self._sieves_included
elif unit_class.is_step_unit_type():
return unit_class.get_unit_name() in self._steps_included
elif unit_class.is_stride_unit_type():
return unit_class.get_unit_name() in self._strides_included
elif unit_class.is_wrap_unit_type():
return unit_class.get_unit_name() in self._wraps_included
raise InternalError(f"Unknown unit {unit_class.get_unit_name()!r} of type {unit_class}")
[docs] def get_included_boots(self, boot_class: Type["UnitType"]) -> Generator["BootType", None, None]:
"""Get included boots of the provided boot class."""
assert boot_class.is_boot_unit_type()
yield from self._boots_included.get(boot_class.get_unit_name(), [])
[docs] def get_included_boot_names(self) -> Generator[str, None, None]:
"""Get names of included boots."""
yield from self._boots_included.keys()
[docs] def get_included_pseudonyms(self, pseudonym_class: Type["PseudonymType"]) -> Generator["PseudonymType", None, None]:
"""Get included sieves of the provided sieve class."""
assert pseudonym_class.is_pseudonym_unit_type()
yield from self._pseudonyms_included.get(pseudonym_class.get_unit_name(), [])
[docs] def get_included_pseudonym_names(self) -> Generator[str, None, None]:
"""Get names of included pseudonyms."""
yield from self._pseudonyms_included.keys()
[docs] def get_included_sieves(self, sieve_class: Type["SieveType"]) -> Generator["SieveType", None, None]:
"""Get included sieves of the provided sieve class."""
assert sieve_class.is_sieve_unit_type()
yield from self._sieves_included.get(sieve_class.get_unit_name(), [])
[docs] def get_included_sieve_names(self) -> Generator[str, None, None]:
"""Get names of included sieves."""
yield from self._sieves_included.keys()
[docs] def get_included_steps(self, step_class: Type["StepType"]) -> Generator["StepType", None, None]:
"""Get included steps of the provided step class."""
assert step_class.is_step_unit_type()
yield from self._steps_included.get(step_class.get_unit_name(), [])
[docs] def get_included_step_names(self) -> Generator[str, None, None]:
"""Get names of included steps."""
yield from self._steps_included.keys()
[docs] def get_included_strides(self, stride_class: Type["StrideType"]) -> Generator["StrideType", None, None]:
"""Get included strides of the provided stride class."""
assert stride_class.is_stride_unit_type()
yield from self._strides_included.get(stride_class.get_unit_name(), [])
[docs] def get_included_stride_names(self) -> Generator[str, None, None]:
"""Get names of included strides."""
yield from self._strides_included.keys()
[docs] def get_included_wraps(self, wrap_class: Type["WrapType"]) -> Generator["WrapType", None, None]:
"""Get included wraps of the provided wrap class."""
assert wrap_class.is_wrap_unit_type()
yield from self._wraps_included.get(wrap_class.get_unit_name(), [])
[docs] def get_included_wrap_names(self) -> Generator[str, None, None]:
"""Get names of included wraps."""
yield from self._wraps_included.keys()
[docs] def is_adviser_pipeline(self) -> bool:
"""Check if the pipeline built is meant for adviser."""
return self.decision_type is None and self.recommendation_type is not None
[docs] def is_dependency_monkey_pipeline(self) -> bool:
"""Check if the pipeline built is meant for Dependency Monkey."""
return self.decision_type is not None and self.recommendation_type is None
[docs] def add_unit(self, unit: "UnitType") -> None:
"""Add the given unit to pipeline configuration."""
package_name: Optional[str] = unit.configuration.get("package_name")
if unit.is_boot_unit_type():
self._boots_included.setdefault(unit.name, []).append(unit)
self._boots.setdefault(package_name, []).append(unit)
return
elif unit.is_pseudonym_unit_type():
if not package_name:
raise PipelineConfigurationError(
f"Pipeline cannot be constructed as unit {unit.name!r} of type Pseudonym "
f"did not provide any package name configuration: {unit.configuration!r}"
)
self._pseudonyms_included.setdefault(unit.name, []).append(unit)
self._pseudonyms.setdefault(package_name, []).append(unit)
return
elif unit.is_sieve_unit_type():
self._sieves_included.setdefault(unit.name, []).append(unit)
self._sieves.setdefault(package_name, []).append(unit)
return
elif unit.is_step_unit_type():
self._steps_included.setdefault(unit.name, []).append(unit)
self._steps.setdefault(package_name, []).append(unit)
return
elif unit.is_stride_unit_type():
self._strides_included.setdefault(unit.name, []).append(unit)
self._strides.setdefault(package_name, []).append(unit)
return
elif unit.is_wrap_unit_type():
self._wraps_included.setdefault(unit.name, []).append(unit)
self._wraps.setdefault(package_name, []).append(unit)
return
raise InternalError(f"Unknown unit {unit!r} of type {unit.name!r}")
[docs]class PipelineBuilder:
"""Builder responsible for creating pipeline configuration from the project and its library usage."""
__slots__: List[str] = []
def __init__(self) -> None:
"""Instantiate of the pipeline builder - do NOT instantiate this class."""
raise NotImplementedError("Cannot instantiate pipeline builder")
@staticmethod
def _iter_units(ctx: PipelineBuilderContext) -> Generator["UnitType", None, None]:
"""Iterate over pipeline units available in this implementation."""
# Imports placed here to simplify tests.
import thoth.adviser.boots
import thoth.adviser.pseudonyms
import thoth.adviser.sieves
import thoth.adviser.steps
import thoth.adviser.strides
import thoth.adviser.wraps
for boot_name in thoth.adviser.boots.__all__:
yield getattr(thoth.adviser.boots, boot_name)
if ctx.prescription:
yield from ctx.prescription.iter_boot_units()
for pseudonym_name in thoth.adviser.pseudonyms.__all__:
yield getattr(thoth.adviser.pseudonyms, pseudonym_name)
if ctx.prescription:
yield from ctx.prescription.iter_pseudonym_units()
for sieve_name in thoth.adviser.sieves.__all__:
yield getattr(thoth.adviser.sieves, sieve_name)
if ctx.prescription:
yield from ctx.prescription.iter_sieve_units()
for step_name in thoth.adviser.steps.__all__:
yield getattr(thoth.adviser.steps, step_name)
if ctx.prescription:
yield from ctx.prescription.iter_step_units()
for stride_name in thoth.adviser.strides.__all__:
yield getattr(thoth.adviser.strides, stride_name)
if ctx.prescription:
yield from ctx.prescription.iter_stride_units()
for wrap_name in thoth.adviser.wraps.__all__:
yield getattr(thoth.adviser.wraps, wrap_name)
if ctx.prescription:
yield from ctx.prescription.iter_wrap_units()
@classmethod
def _build_configuration(cls, ctx: PipelineBuilderContext) -> PipelineConfig:
"""Instantiate units and return the actual pipeline configuration."""
# As pipeline steps can have dependencies on each other, iterate over them until we have any change done
# to the pipeline configuration.
_LOGGER.info("Creating pipeline configuration")
blocked_units = (
set(os.environ["THOTH_ADVISER_BLOCKED_UNITS"].split(","))
if "THOTH_ADVISER_BLOCKED_UNITS" in os.environ
else set()
)
change = True
ctx.iteration = -1
try:
while change:
change = False
ctx.iteration += 1
for unit_class in cls._iter_units(ctx):
unit_name = unit_class.get_unit_name()
if unit_name in blocked_units:
_LOGGER.debug(
"Avoiding adding pipeline unit %r based on blocked units configuration",
unit_name,
)
continue
for unit_configuration in unit_class.should_include(ctx):
if unit_configuration is None:
_LOGGER.debug(
"Pipeline unit %r will not be included in the pipeline configuration in this round",
unit_name,
)
continue
change = True
_LOGGER.debug(
"Including pipeline unit %r in pipeline configuration with unit configuration %r",
unit_name,
unit_configuration,
)
unit_instance = unit_class() # type: ignore
# Always perform update, even with an empty dict. Update triggers a schema check.
try:
unit_instance.update_configuration(unit_configuration)
except Exception as exc:
raise PipelineConfigurationError(
f"Filed to initialize pipeline unit configuration for {unit_name!r} "
f"with configuration {unit_configuration!r}: {str(exc)}"
) from exc
ctx.add_unit(unit_instance)
finally:
# Once the build pipeline is constructed or fails to construct, we can clear cached results.
UnitPrescription.SHOULD_INCLUDE_CACHE.clear()
pipeline = PipelineConfig(
boots=ctx.boots_dict,
pseudonyms=ctx.pseudonyms_dict,
sieves=ctx.sieves_dict,
steps=ctx.steps_dict,
strides=ctx.strides_dict,
wraps=ctx.wraps_dict,
)
if _LOGGER.getEffectiveLevel() <= logging.DEBUG:
_LOGGER.debug(
"Pipeline configuration creation ended, configuration:\n%s",
json.dumps(pipeline.to_dict(), indent=2),
)
return pipeline
@staticmethod
def _do_instantiate_from_dict(module: object, configuration_entry: Dict[str, Any]) -> "UnitType":
"""Instantiate a pipeline unit from a dict representation."""
if "name" not in configuration_entry:
raise ValueError(f"No pipeline unit name provided in the configuration entry: {configuration_entry!r}")
try:
unit_class = getattr(module, configuration_entry["name"])
except AttributeError as exc:
raise UnknownPipelineUnitError(f"Cannot import unit {configuration_entry['name']}: {str(exc)}") from exc
unit: "UnitType" = unit_class()
if configuration_entry.get("configuration"):
try:
unit.update_configuration(configuration_entry["configuration"])
except Exception as exc:
raise PipelineConfigurationError(
f"Filed to initialize pipeline unit configuration for {unit.name!r} "
f"with configuration {configuration_entry['configuration']!r}: {str(exc)}"
) from exc
return unit
[docs] @classmethod
def from_dict(cls, dict_: Dict[str, Any]) -> "PipelineConfig":
"""Instantiate pipeline configuration based on dictionary supplied."""
# Imports placed here to simplify tests.
import thoth.adviser.boots
import thoth.adviser.pseudonyms
import thoth.adviser.sieves
import thoth.adviser.steps
import thoth.adviser.strides
import thoth.adviser.wraps
boots: Dict[Optional[str], List["BootType"]] = {}
for boot_entry in dict_.pop("boots", []) or []:
boot_unit: "BootType" = cls._do_instantiate_from_dict(thoth.adviser.boots, boot_entry)
package_name = boot_unit.configuration.get("package_name")
boots.setdefault(package_name, []).append(boot_unit)
pseudonyms: Dict[str, List["PseudonymType"]] = {}
for pseudonym_entry in dict_.pop("pseudonyms", []) or []:
unit: "PseudonymType" = cls._do_instantiate_from_dict(thoth.adviser.pseudonyms, pseudonym_entry)
package_name = unit.configuration.get("package_name")
if not package_name:
raise PipelineConfigurationError(
f"Pipeline cannot be constructed as unit {unit.name!r} of type Pseudonym "
f"did not provide any package name configuration: {unit.configuration!r}"
)
pseudonyms.setdefault(package_name, []).append(unit)
sieves: Dict[Optional[str], List["SieveType"]] = {}
for sieve_entry in dict_.pop("sieves", []) or []:
sieve_unit: "SieveType" = cls._do_instantiate_from_dict(thoth.adviser.sieves, sieve_entry)
package_name = sieve_unit.configuration.get("package_name")
sieves.setdefault(package_name, []).append(sieve_unit)
steps: Dict[Optional[str], List["StepType"]] = {}
for step_entry in dict_.pop("steps", []) or []:
step_unit: "StepType" = cls._do_instantiate_from_dict(thoth.adviser.steps, step_entry)
package_name = step_unit.configuration.get("package_name")
steps.setdefault(package_name, []).append(step_unit)
strides: Dict[Optional[str], List["StrideType"]] = {}
for stride_entry in dict_.pop("strides", []) or []:
stride_unit: "StrideType" = cls._do_instantiate_from_dict(thoth.adviser.strides, stride_entry)
package_name = stride_unit.configuration.get("package_name")
strides.setdefault(package_name, []).append(stride_unit)
wraps: Dict[Optional[str], List["WrapType"]] = {}
for wrap_entry in dict_.pop("wraps", []) or []:
wrap_unit: "WrapType" = cls._do_instantiate_from_dict(thoth.adviser.wraps, wrap_entry)
package_name = wrap_unit.configuration.get("package_name")
wraps.setdefault(package_name, []).append(wrap_unit)
if dict_:
_LOGGER.warning("Unknown entry in pipeline configuration: %r", dict_)
pipeline = PipelineConfig(
boots=boots,
pseudonyms=pseudonyms,
sieves=sieves,
steps=steps,
strides=strides,
wraps=wraps,
)
_LOGGER.debug(
"Pipeline configuration creation ended, configuration:\n%s",
json.dumps(pipeline.to_dict(), indent=2),
)
return pipeline
[docs] @classmethod
def load(cls, config: str) -> "PipelineConfig":
"""Load pipeline configuration from a file or a string."""
if os.path.isfile(config):
_LOGGER.debug("Loading pipeline configuration from file %r", config)
with open(config, "r") as config_file:
config = config_file.read()
return cls.from_dict(yaml.safe_load(config))
[docs] @classmethod
def get_adviser_pipeline_config(
cls,
*,
recommendation_type: RecommendationType,
graph: GraphDatabase,
project: Project,
labels: Dict[str, str],
library_usage: Optional[Dict[str, Any]],
prescription: Optional["Prescription"],
cli_parameters: Dict[str, Any],
) -> PipelineConfig:
"""Get adviser's pipeline configuration."""
return cls._build_configuration(
PipelineBuilderContext(
graph=graph,
project=project,
labels=labels,
library_usage=library_usage,
recommendation_type=recommendation_type,
prescription=prescription,
cli_parameters=cli_parameters,
)
)
[docs] @classmethod
def get_dependency_monkey_pipeline_config(
cls,
*,
decision_type: DecisionType,
graph: GraphDatabase,
project: Project,
labels: Dict[str, str],
library_usage: Optional[Dict[str, Any]],
prescription: Optional["Prescription"],
cli_parameters: Dict[str, Any],
) -> PipelineConfig:
"""Get dependency-monkey's pipeline configuration."""
return cls._build_configuration(
PipelineBuilderContext(
graph=graph,
project=project,
labels=labels,
library_usage=library_usage,
decision_type=decision_type,
prescription=prescription,
cli_parameters=cli_parameters,
)
)