# Source code for thoth.adviser.prescription.v1.unit

#!/usr/bin/env python3
# thoth-adviser
# Copyright(C) 2021 Fridolin Pokorny
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""A base class for prescription based pipeline units."""

import abc
import logging
import re
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import Union
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from voluptuous import Schema
from voluptuous import Required

import attr

from thoth.adviser.cpu_db import CPUDatabase
from thoth.adviser.exceptions import EagerStopPipeline
from thoth.adviser.exceptions import NotAcceptable
from thoth.adviser.state import State
from thoth.common import get_justification_link as jl
from thoth.python import PackageVersion

from .unit_cache import should_include_cache
from ...unit import Unit

if TYPE_CHECKING:
    from ...pipeline_builder import PipelineBuilderContext


_LOGGER = logging.getLogger(__name__)


class _ValueList:
    """A class that overrides `in` to transparently handle included and excluded values."""

    __slots__ = ["_list"]

    def __init__(self, obj: Union[List[object], Dict[str, List[object]]]) -> None:
        """Initialize self."""
        self._list = obj

    def __contains__(self, item: str) -> bool:
        """Override default in behavior based on the YAML definition."""
        if isinstance(self._list, list):
            return self._list.__contains__(item)

        return not self._list["not"].__contains__(item)


class _ValueListBaseImage:
    """A class that overrides `in` to transparently handle included and excluded base images."""

    # Image name mapped to image tag; the "_not" flag specifies if images were declared as part of "not".
    __slots__ = ["_images", "_not"]

    def __init__(self, obj: Union[List[Optional[str]], Dict[str, List[Optional[str]]]]) -> None:
        """Initialize self."""
        self._images: Dict[Optional[str], Set[str]] = {}

        if isinstance(obj, dict):
            image_listing = obj["not"]
            self._not = True
        else:
            image_listing = obj
            self._not = False

        images = []
        for item in image_listing:
            if item is None:
                self._images[None] = {"*"}
                continue

            images.append(item.rsplit(":", maxsplit=1))

        for image in images:
            tag_exp_set = self._images.get(image[0])
            if tag_exp_set is None:
                tag_exp_set = set()
                self._images[image[0]] = tag_exp_set

            if len(image) == 1:
                tag_exp_set.add("*")  # Means any tag.
                continue

            tag = image[1]
            tag_exp_set.add(tag)

    def __contains__(self, item: Optional[str]) -> bool:
        """Check if the given item (base image) is in the provided listing."""
        if item is None:
            # Match `None` image in the image listing.
            if item in self._images:
                return not self._not

            return self._not

        parts = item.rsplit(":", maxsplit=1)
        if len(parts) == 1:  # No tag specified.
            return parts[0] not in self._images if self._not else parts[0] in self._images

        image, tag = parts
        tag_expressions = self._images.get(image)
        if not tag_expressions:
            return self._not

        for tag_exp in tag_expressions:
            if tag_exp.endswith("*"):
                if tag.startswith(tag_exp[:-1]):
                    return not self._not
            elif tag_exp == tag:
                return not self._not

        return self._not


@attr.s(slots=True)
class UnitPrescription(Unit, metaclass=abc.ABCMeta):
    """A base class for implementing pipeline units based on prescription supplied.

    The prescription is assigned on the class level using ``set_prescription``. The default
    instance configuration is derived from the "match" and "run" parts of that prescription.
    """

    # Each prescription unit defines these specifically.
    SHOULD_INCLUDE_CACHE: Dict[str, bool] = {}
    CONFIGURATION_SCHEMA: Schema = Schema(
        {
            Required("package_name"): str,
            Required("match"): object,
            Required("run"): object,
            Required("prescription"): Schema({"run": bool}),
        }
    )
    _PRESCRIPTION: Optional[Dict[str, Any]] = None

    _stack_info_run = attr.ib(type=bool, kw_only=True, default=False)
    _configuration = attr.ib(type=Dict[str, Any], kw_only=True)
    prescription = attr.ib(type=Dict[str, Any], kw_only=True)

    @prescription.default
    def _prescription_default(self) -> Dict[str, Any]:
        """Initialize prescription property.

        :raises ValueError: if no prescription was assigned to the class
        """
        if self._PRESCRIPTION is None:
            raise ValueError("No assigned prescription on the class level to be set")

        return self._PRESCRIPTION

    @property
    def run_prescription(self) -> Dict[str, Any]:
        """Get run part of the prescription assigned."""
        return self._configuration.get("run", {})

    @property
    def match_prescription(self) -> Dict[str, Any]:
        """Get match part of the prescription assigned."""
        return self._configuration.get("match", {})

    @_configuration.default
    def _initialize_default_configuration(self) -> Dict[str, Any]:
        """Initialize default unit configuration based on declared class' default configuration.

        :raises ValueError: if no prescription was assigned to the class
        """
        if self._PRESCRIPTION is None:
            raise ValueError("No assigned prescription on the class level to be set")

        return {
            "package_name": None,
            "match": self._PRESCRIPTION.get("match", {}),
            "run": self._PRESCRIPTION.get("run", {}),
            "prescription": {"run": False},
        }

    @classmethod
    def get_unit_name(cls) -> str:
        """Get the name of the current prescription unit.

        This method is a class method and *MUST NOT* be used when obtaining unit name on an instance. As part of the
        memory optimization we use class to get the current name of a prescription unit with assigned prescription.
        This means that the prescription unit instance would have different names reported with this method based on
        the current class context.

        :raises ValueError: if no prescription was assigned to the class
        """
        if cls._PRESCRIPTION is None:
            raise ValueError("No prescription defined")

        name: str = cls._PRESCRIPTION["name"]
        return name

    @classmethod
    def set_prescription(cls, prescription: Dict[str, Any]) -> None:
        """Set prescription to the unit."""
        cls._PRESCRIPTION = prescription

    @classmethod
    def _check_symbols(
        cls, unit_name: str, library_name: str, symbols_expected: List[str], symbols_used: List[str]
    ) -> bool:
        """Check if symbols expected are available given the symbols used.

        An expected symbol ending with ".*" matches any used symbol starting with the part before ".*",
        other expected symbols have to match a used symbol exactly. Returns True only if each expected
        symbol is matched by at least one used symbol.
        """
        for symbol_expected in symbols_expected:
            for symbol_used in symbols_used:
                if symbol_expected.endswith(".*"):
                    if symbol_used.startswith(symbol_expected[:-2]):  # Discard ending ".*"
                        _LOGGER.debug(
                            "%s: Library symbol %r matching unit requirement on symbol %r for %r",
                            unit_name,
                            symbol_used,
                            symbol_expected,
                            library_name,
                        )
                        break
                elif symbol_used == symbol_expected:
                    _LOGGER.debug(
                        "%s: Library symbol %r matching unit requirement on symbol %r for %r",
                        unit_name,
                        symbol_used,
                        symbol_expected,
                        library_name,
                    )
                    break
            else:
                # No used symbol matched the expected one (the inner loop was not left using break).
                _LOGGER.debug(
                    "%s: Not registering as library symbol requested %r for %r is not used",
                    unit_name,
                    symbol_expected,
                    library_name,
                )
                return False

        _LOGGER.debug("%s: All library symbols required for %r unit are used", unit_name, library_name)
        return True

    @staticmethod
    def _check_version(version_present: Optional[str], version_spec_declared: Optional[str]) -> bool:
        """Check that version present matches version specification.

        No specification declared always matches; a declared specification with no version present never matches.
        """
        if version_spec_declared is None:
            return True

        if version_present is None:
            return False

        return Version(version_present) in SpecifierSet(version_spec_declared)

    @classmethod
    def _should_include_base(cls, builder_context: "PipelineBuilderContext") -> bool:
        """Determine if this unit should be included.

        Evaluates the "should_include" part of the prescription: the "times" value, whether the unit
        is already included, the checks done in ``_should_include_base_cached`` and dependencies on
        other pipeline units.

        :raises ValueError: if no prescription was assigned to the class
        """
        if cls._PRESCRIPTION is None:
            raise ValueError("No prescription defined")

        should_include_dict = cls._PRESCRIPTION["should_include"]
        unit_name = cls.get_unit_name()

        times = should_include_dict.get("times", 1)  # XXX: We allow values 0 or 1 in the schema described.
        if times == 0 or builder_context.is_included(cls):
            return False

        if not cls._should_include_base_cached(unit_name, builder_context, should_include_dict):
            # Using pre-cached results based on parts that do not change or first time run.
            return False

        # Dependencies.
        dependencies = should_include_dict.get("dependencies", {})
        for boot_name in dependencies.get("boots", []):
            if boot_name not in builder_context.get_included_boot_names():
                _LOGGER.debug("%s: Not registering as dependency on boot %r is not satisfied", unit_name, boot_name)
                return False

        for pseudonym_name in dependencies.get("pseudonyms", []):
            if pseudonym_name not in builder_context.get_included_pseudonym_names():
                _LOGGER.debug(
                    "%s: Not registering as dependency on pseudonym %r is not satisfied", unit_name, pseudonym_name
                )
                return False

        for sieve_name in dependencies.get("sieves", []):
            if sieve_name not in builder_context.get_included_sieve_names():
                _LOGGER.debug("%s: Not registering as dependency on sieve %r is not satisfied", unit_name, sieve_name)
                return False

        for step_name in dependencies.get("steps", []):
            if step_name not in builder_context.get_included_step_names():
                _LOGGER.debug("%s: Not registering as dependency on step %r is not satisfied", unit_name, step_name)
                return False

        for stride_name in dependencies.get("strides", []):
            if stride_name not in builder_context.get_included_stride_names():
                _LOGGER.debug("%s: Not registering as dependency on stride %r is not satisfied", unit_name, stride_name)
                return False

        for wrap_name in dependencies.get("wraps", []):
            if wrap_name not in builder_context.get_included_wrap_names():
                _LOGGER.debug("%s: Not registering as dependency on wrap %r is not satisfied", unit_name, wrap_name)
                return False

        return True

    if TYPE_CHECKING:
        SHOULD_INCLUDE_FUNC_TYPE = Callable[
            [Type["UnitPrescription"], str, "PipelineBuilderContext", Dict[str, Any]], bool
        ]

    @classmethod
    @should_include_cache
    def _should_include_base_cached(
        cls, unit_name: str, builder_context: "PipelineBuilderContext", should_include_dict: Dict[str, Any]
    ) -> bool:
        """Determine if this unit should be included based on the pipeline type and the runtime environment.

        Checks the pipeline type, authentication, labels, library usage and the "runtime_environments" section
        (operating system, hardware, software versions, base image, ABI, RPM and Python packages). All the
        sections declared have to match for the unit to be included.

        NOTE(review): the result handling of ``should_include_cache`` is defined in ``unit_cache`` and is not
        visible here - presumably results are cached per unit, confirm there.
        """
        adviser_pipeline = should_include_dict.get("adviser_pipeline", False)
        if not adviser_pipeline and builder_context.is_adviser_pipeline():
            _LOGGER.debug("%s: Not registering for adviser pipeline", unit_name)
            return False
        elif adviser_pipeline and builder_context.is_adviser_pipeline():
            allowed_recommendation_types = should_include_dict.get("recommendation_types")
            if (
                allowed_recommendation_types is not None
                and builder_context.recommendation_type is not None
                and builder_context.recommendation_type.name.lower() not in _ValueList(allowed_recommendation_types)
            ):
                _LOGGER.debug(
                    "%s: Not registering for adviser pipeline with recommendation type %s",
                    unit_name,
                    builder_context.recommendation_type.name,
                )
                return False

        if (
            not should_include_dict.get("dependency_monkey_pipeline", False)
            and builder_context.is_dependency_monkey_pipeline()
        ):
            _LOGGER.debug("%s: Not registering for dependency monkey pipeline", unit_name)
            return False
        elif (
            should_include_dict.get("dependency_monkey_pipeline", False)
            and builder_context.is_dependency_monkey_pipeline()
        ):
            allowed_decision_types = should_include_dict.get("decision_types")
            if (
                allowed_decision_types is not None
                and builder_context.decision_type is not None
                and builder_context.decision_type.name.lower() not in _ValueList(allowed_decision_types)
            ):
                _LOGGER.debug(
                    "%s: Not registering for dependency monkey pipeline with decision type %s",
                    unit_name,
                    builder_context.decision_type.name,
                )
                return False

        authenticated = should_include_dict.get("authenticated")
        if authenticated is not None and authenticated is not builder_context.authenticated:
            _LOGGER.debug(
                "%s: Not registering as authentication requirements are not met",
                unit_name,
            )
            return False

        labels_expected = should_include_dict.get("labels", {})
        if labels_expected:
            # A single matching label is sufficient.
            for label_key, value in labels_expected.items():
                value_context = builder_context.labels.get(label_key)
                if value == value_context:
                    break
            else:
                _LOGGER.debug(
                    "%s: Not registering as labels requested %s do not match with labels supplied %s",
                    unit_name,
                    labels_expected,
                    builder_context.labels,
                )
                return False

        # Library usage.
        library_usage_expected = should_include_dict.get("library_usage", {})
        if library_usage_expected:
            if not builder_context.library_usage:
                _LOGGER.debug("%s: Not registering as no library usage supplied", unit_name)
                return False

            for library_name, symbols_expected in library_usage_expected.items():
                symbols_used = builder_context.library_usage.get(library_name)

                if not symbols_used:
                    _LOGGER.debug("%s: Not registering as library %s is not used", unit_name, library_name)
                    return False

                if not cls._check_symbols(unit_name, library_name, symbols_expected, symbols_used):
                    return False

            _LOGGER.debug("%s: All library symbols required present in the library usage supplied", unit_name)

        runtime_environment_dict = should_include_dict.get("runtime_environments", {})

        # Operating system.
        operating_systems = runtime_environment_dict.get("operating_systems")
        os_used = builder_context.project.runtime_environment.operating_system
        os_used_name = os_used.name if os_used is not None else None
        os_used_version = os_used.version if os_used is not None else None

        if operating_systems:
            # At least one of the operating systems listed has to match, omitted name or version match anything.
            for item in operating_systems:
                os_name = item.get("name")
                os_version = item.get("version")
                if (os_name is None or os_name == os_used_name) and (
                    os_version is None or os_version == os_used_version
                ):
                    _LOGGER.debug("%s: Matching operating system %r in version %r", unit_name, os_name, os_version)
                    break
            else:
                _LOGGER.debug(
                    "%s: Not matching operating system (using %r in version %r)",
                    unit_name,
                    os_used_name,
                    os_used_version,
                )
                return False

        # Hardware.
        hw_used = builder_context.project.runtime_environment.hardware

        for hardware_dict in runtime_environment_dict.get("hardware", []):
            # CPU/GPU
            cpu_families = hardware_dict.get("cpu_families")
            cpu_models = hardware_dict.get("cpu_models")
            cpu_flags = hardware_dict.get("cpu_flags") or []
            gpu_models = hardware_dict.get("gpu_models")

            if cpu_families is not None and hw_used.cpu_family not in _ValueList(cpu_families):
                _LOGGER.debug("%s: Not matching CPU family used (using %r)", unit_name, hw_used.cpu_family)
                return False

            if cpu_models is not None and hw_used.cpu_model not in _ValueList(cpu_models):
                _LOGGER.debug("%s: Not matching CPU model used (using %r)", unit_name, hw_used.cpu_model)
                return False

            if gpu_models is not None and hw_used.gpu_model not in _ValueList(gpu_models):
                _LOGGER.debug("%s: Not matching GPU model used (using %r)", unit_name, hw_used.gpu_model)
                return False

            if cpu_flags:
                if hw_used.cpu_family is None or hw_used.cpu_model is None:
                    _LOGGER.debug(
                        "%s: CPU family (%s) or CPU model (%s) not provided, cannot check CPU flags %r",
                        unit_name,
                        hw_used.cpu_family,
                        hw_used.cpu_model,
                        cpu_flags,
                    )
                    return False

                if isinstance(cpu_flags, dict):
                    # Flags stated under "not" must not be provided by the CPU used.
                    for cpu_flag in cpu_flags["not"]:
                        if CPUDatabase.provides_flag(hw_used.cpu_family, hw_used.cpu_model, cpu_flag):
                            _LOGGER.debug(
                                "%s: CPU flag %r is provided by CPU family %s and CPU model %s, not registering unit",
                                unit_name,
                                cpu_flag,
                                hw_used.cpu_family,
                                hw_used.cpu_model,
                            )
                            return False
                else:
                    for cpu_flag in cpu_flags:
                        if not CPUDatabase.provides_flag(hw_used.cpu_family, hw_used.cpu_model, cpu_flag):
                            _LOGGER.debug(
                                "%s: Not matching CPU flag %r for CPU family %s and CPU model %s, not registering unit",
                                unit_name,
                                cpu_flag,
                                hw_used.cpu_family,
                                hw_used.cpu_model,
                            )
                            return False

                _LOGGER.debug(
                    "%s: Used CPU family %s and CPU model %s provides all CPU flags required %r",
                    unit_name,
                    hw_used.cpu_family,
                    hw_used.cpu_model,
                    cpu_flags,
                )

        # Software present.
        runtime_used = builder_context.project.runtime_environment

        python_version_spec = runtime_environment_dict.get("python_version")
        if not cls._check_version(runtime_used.python_version, python_version_spec):
            _LOGGER.debug(
                "%s: Not matching Python version used (using %r; expected %r)",
                unit_name,
                runtime_used.python_version,
                python_version_spec,
            )
            return False

        cuda_version_spec = runtime_environment_dict.get("cuda_version")
        if not cls._check_version(runtime_used.cuda_version, cuda_version_spec):
            _LOGGER.debug(
                "%s: Not matching CUDA version used (using %r; expected %r)",
                unit_name,
                runtime_used.cuda_version,
                cuda_version_spec,
            )
            return False

        platforms = runtime_environment_dict.get("platforms")
        if platforms is not None and runtime_used.platform not in _ValueList(platforms):
            _LOGGER.debug("%s: Not matching platform used (using %r)", unit_name, runtime_used.platform)
            return False

        openblas_version_spec = runtime_environment_dict.get("openblas_version")
        if not cls._check_version(runtime_used.openblas_version, openblas_version_spec):
            _LOGGER.debug(
                "%s: Not matching openblas version used (using %r; expected %r)",
                unit_name,
                runtime_used.openblas_version,
                openblas_version_spec,
            )
            return False

        openmpi_version_spec = runtime_environment_dict.get("openmpi_version")
        if not cls._check_version(runtime_used.openmpi_version, openmpi_version_spec):
            _LOGGER.debug(
                "%s: Not matching openmpi version used (using %r; expected %r)",
                unit_name,
                runtime_used.openmpi_version,
                openmpi_version_spec,
            )
            return False

        cudnn_version_spec = runtime_environment_dict.get("cudnn_version")
        if not cls._check_version(runtime_used.cudnn_version, cudnn_version_spec):
            _LOGGER.debug(
                "%s: Not matching cudnn version used (using %r; expected %r)",
                unit_name,
                runtime_used.cudnn_version,
                cudnn_version_spec,
            )
            return False

        mkl_version_spec = runtime_environment_dict.get("mkl_version")
        if not cls._check_version(runtime_used.mkl_version, mkl_version_spec):
            _LOGGER.debug(
                "%s: Not matching mkl version used (using %r; expected %r)",
                unit_name,
                runtime_used.mkl_version,
                mkl_version_spec,
            )
            return False

        base_images = runtime_environment_dict.get("base_images")
        if base_images is not None and runtime_used.base_image not in _ValueListBaseImage(base_images):
            _LOGGER.debug("%s: Not matching base image used (using %r)", unit_name, runtime_used.base_image)
            return False

        # All the following require base image information.
        base_image = None
        if runtime_used.base_image:
            base_image = cls.get_base_image(runtime_used.base_image, raise_on_error=True)

        abi = runtime_environment_dict.get("abi")
        if abi:
            if not base_image:
                _LOGGER.debug(
                    "%s: Check on ABI present but no base image provided",
                    unit_name,
                )
                return False

            symbols_present = set(
                builder_context.graph.get_thoth_s2i_analyzed_image_symbols_all(
                    base_image[0],
                    base_image[1],
                    is_external=False,
                )
            )
            if not symbols_present:
                if builder_context.iteration == 0:
                    _LOGGER.warning(
                        "%s: No symbols found for runtime environment %r", unit_name, runtime_used.base_image
                    )
                return False

            # A matching ABI must not end the evaluation, RPM and Python package checks below
            # still have to be performed.
            if isinstance(abi, dict) and "not" in abi:
                # Negate operation.
                if symbols_present.intersection(set(abi["not"])):
                    _LOGGER.debug("%s: Not matching ABI present in the base image", unit_name)
                    return False
            elif isinstance(abi, list):
                if not set(abi).issubset(symbols_present):
                    _LOGGER.debug("%s: Not matching ABI present in the base image", unit_name)
                    return False
            else:
                _LOGGER.error("%s: Unknown ABI definition - please report this error to administrator", unit_name)
                return False

        rpm_packages = runtime_environment_dict.get("rpm_packages")
        if rpm_packages:
            if not base_image:
                _LOGGER.debug(
                    "%s: Check on RPM packages present but no base image provided",
                    unit_name,
                )
                return False

            analysis_document_id = builder_context.graph.get_last_analysis_document_id(
                base_image[0],
                base_image[1],
                is_external=False,
            )

            if not analysis_document_id:
                if builder_context.iteration == 0:
                    _LOGGER.warning(
                        "%s: No analysis for base container image %r found",
                        unit_name,
                        runtime_used.base_image,
                    )
                return False

            rpm_packages_present = builder_context.graph.get_rpm_package_version_all(analysis_document_id)
            if not rpm_packages_present:
                _LOGGER.debug("%s: No RPM packages found for %r", unit_name, runtime_used.base_image)
                return False
            if not cls._check_rpm_packages(unit_name, rpm_packages_present, rpm_packages):
                _LOGGER.debug(
                    "%s: Not matching RPM packages present in the base image %r", unit_name, runtime_used.base_image
                )
                return False

        python_packages = runtime_environment_dict.get("python_packages")
        if python_packages:
            if not base_image:
                _LOGGER.debug(
                    "%s: Check on Python packages present but no base image provided",
                    unit_name,
                )
                return False

            analysis_document_id = builder_context.graph.get_last_analysis_document_id(
                base_image[0],
                base_image[1],
                is_external=False,
            )

            if not analysis_document_id:
                if builder_context.iteration == 0:
                    _LOGGER.warning(
                        "%s: No analysis for base container image %r found",
                        unit_name,
                        runtime_used.base_image,
                    )
                return False

            python_packages_present = builder_context.graph.get_python_package_version_all(analysis_document_id)
            if not python_packages_present:
                _LOGGER.debug("%s: No Python packages found for %r", unit_name, runtime_used.base_image)
                return False
            if not cls._check_python_packages(unit_name, python_packages_present, python_packages):
                _LOGGER.debug(
                    "%s: Not matching Python packages present in the base image %r", unit_name, runtime_used.base_image
                )
                return False

        return True

    @classmethod
    def _index_url_check(cls, index_url_conf: Optional[Union[str, Dict[str, str]]], index_url: str) -> bool:
        """Check the index URL against the index URL configuration considering "not".

        No configuration matches any index URL, a string has to be equal to the index URL and
        a ``{"not": url}`` mapping matches any index URL different from the one stated.

        :raises ValueError: if the mapping has keys other than the single "not" key
        """
        if index_url_conf is None:
            return True

        if isinstance(index_url_conf, dict):
            if list(index_url_conf.keys()) != ["not"]:
                raise ValueError("index_url configuration should state directly string or a 'not' value")

            return index_url_conf["not"] != index_url
        else:
            return index_url_conf == index_url

    @classmethod
    def _check_python_packages(
        cls,
        unit_name: str,
        python_packages_present: List[Dict[str, str]],
        python_packages_required: Union[List[Dict[str, str]], Dict[str, List[Dict[str, str]]]],
    ) -> bool:
        """Check if required Python packages are present in the environment.

        A list states packages that have to be present (optionally in the given version and location),
        a mapping with "not" key states packages whose presence causes not including the unit.
        """
        # Convert to dict to have O(1) access time.
        py_packages_present_dict: Dict[str, List[Dict[str, str]]] = {}
        for python_package_present in python_packages_present:
            py_packages_present_dict.setdefault(python_package_present["package_name"], []).append(
                python_package_present
            )

        if isinstance(python_packages_required, dict):
            if "not" not in python_packages_required:
                _LOGGER.error("%s: Unable to parse description of Python packages required", unit_name)
                return False

            for not_required_python_package in python_packages_required["not"]:
                for py_package_present in py_packages_present_dict.get(not_required_python_package["name"]) or []:
                    location = not_required_python_package.get("location")
                    if location is not None and not re.fullmatch(location, py_package_present["location"]):
                        _LOGGER.debug(
                            "%s: Python package %r in %r is located in different location %r as expected",
                            unit_name,
                            not_required_python_package["name"],
                            py_package_present["location"],
                            location,
                        )
                        continue

                    version = not_required_python_package.get("version")
                    if version and py_package_present["package_version"] not in SpecifierSet(version):
                        # The package found is outside of the version range stated, it is not a reason
                        # for not including the unit.
                        _LOGGER.debug(
                            "%s: Python package '%s==%s' (in %r) does not match version %r stated in 'not', skipping",
                            unit_name,
                            not_required_python_package["name"],
                            py_package_present["package_version"],
                            py_package_present["location"],
                            version,
                        )
                        continue

                    _LOGGER.debug(
                        "%s: presence of Python package %r causes not including the pipeline unit",
                        unit_name,
                        py_package_present,
                    )
                    return False
        else:
            for required_python_package in python_packages_required:
                for py_package_present in py_packages_present_dict.get(required_python_package["name"]) or []:
                    version = required_python_package.get("version")
                    if version and py_package_present["package_version"] not in SpecifierSet(version):
                        _LOGGER.debug(
                            "%s: Python package '%s==%s' (in %r) does not match required version %r",
                            unit_name,
                            required_python_package["name"],
                            py_package_present["package_version"],
                            py_package_present.get("location", "any"),
                            version,
                        )
                        continue

                    location = required_python_package.get("location")
                    if location is not None and not re.fullmatch(location, py_package_present["location"]):
                        _LOGGER.debug(
                            "%s: Python package %r is located at %r but expected to be in %r",
                            unit_name,
                            required_python_package["name"],
                            py_package_present["location"],
                            location,
                        )
                        continue

                    _LOGGER.debug(
                        "%s: Python package %r in version %r (located in %r) is found in the runtime environment",
                        unit_name,
                        required_python_package["name"],
                        required_python_package.get("version", "any"),
                        py_package_present.get("location", "any"),
                    )
                    break
                else:
                    # None of the packages present satisfied the requirement.
                    _LOGGER.debug(
                        "%s: Not including as Python package %r (in version %r) is not present in the environment",
                        unit_name,
                        required_python_package["name"],
                        required_python_package.get("version", "any"),
                    )
                    return False

        _LOGGER.debug("%s: all Python package presence checks passed", unit_name)
        return True

    @staticmethod
    def _check_rpm_packages(
        unit_name: str,
        rpm_packages_present: List[Dict[str, str]],
        rpm_packages_required: Union[List[Dict[str, str]], Dict[str, List[Dict[str, str]]]],
    ) -> bool:
        """Check if required RPM packages are present.

        A list states RPM packages that all have to be present with all the stated keys matching.
        A mapping with "not" key states RPM packages of which at least one has to be missing or
        different for the unit to be included.
        """
        # Convert RPM packages present to mapping to save some cycles and have O(1) look up.
        rpm_packages_pres = {i["package_name"]: i for i in rpm_packages_present}

        rpm_packages_req: List[Dict[str, str]]
        if isinstance(rpm_packages_required, dict):
            if "not" not in rpm_packages_required:
                _LOGGER.error("%s: Unable to parse description of RPM packages required", unit_name)
                return False

            should_be_present = False
            rpm_packages_req = list(rpm_packages_required["not"])
        else:
            should_be_present = True
            rpm_packages_req = list(rpm_packages_required)

        for rpm_package_req in rpm_packages_req:
            rpm_name = rpm_package_req["package_name"]
            rpm_present = rpm_packages_pres.get(rpm_name)

            if should_be_present:
                if not rpm_present:
                    _LOGGER.debug(
                        "%s: Not including unit as RPM %r is not present in the runtime environment",
                        unit_name,
                        rpm_name,
                    )
                    return False

                for key, value in rpm_package_req.items():
                    value_present = rpm_present.get(key)
                    if value_present != value:
                        _LOGGER.debug(
                            "%s: Not including unit as RPM %r has not matching %r - expected %r got %r",
                            unit_name,
                            rpm_name,
                            key,
                            value,
                            value_present,
                        )
                        return False
            else:
                if not rpm_present:
                    # If just one is not present, we know the unit is included.
                    return True

                for key, value in rpm_package_req.items():
                    value_present = rpm_present.get(key)
                    if value_present != value:
                        _LOGGER.debug(
                            "%s: Including unit as RPM %s has not matching %r - expected %r got %r",
                            unit_name,
                            rpm_name,
                            key,
                            value,
                            value_present,
                        )
                        return True

        if not should_be_present:
            _LOGGER.debug("%s: Not including unit as all RPM are present in the runtime environment", unit_name)
            return False

        # Path to should be present.
        _LOGGER.debug("%s: all RPM package presence checks passed", unit_name)
        return True

    @classmethod
    def _prepare_justification_link(cls, entries: List[Dict[str, Any]]) -> None:
        """Prepare justification links before using them.

        Links not starting with "https://" or "http://" are replaced in place by the result of
        ``get_justification_link``.
        """
        for entry in entries:
            link = entry.get("link")
            if link and not link.startswith(("https://", "http://")):
                entry["link"] = jl(link)

    @property
    def name(self) -> str:
        """Get name of the prescription instance."""
        name: str = self.prescription["name"]
        return name

    def pre_run(self) -> None:
        """Prepare this pipeline unit before running it."""
        self._prepare_justification_link(self.run_prescription.get("stack_info", []))
        # Reset the flag so that the log message and stack info are reported again.
        self._configuration["prescription"]["run"] = False
        super().pre_run()

    @staticmethod
    def _yield_should_include(unit_prescription: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
        """Yield a unit configuration for every entry stated in the match field."""
        match = unit_prescription.get("match", {})
        run = unit_prescription.get("run", {})
        prescription_conf = {"run": False}
        if isinstance(match, list):
            for item in match:
                yield {
                    "package_name": item["package_version"].get("name") if item else None,
                    "match": item,
                    "run": run,
                    "prescription": prescription_conf,
                }
        else:
            yield {
                "package_name": match["package_version"].get("name") if match else None,
                "match": match,
                "run": run,
                "prescription": prescription_conf,
            }

    @staticmethod
    def _yield_should_include_with_state(unit_prescription: Dict[str, Any]) -> Generator[Dict[str, Any], None, None]:
        """Yield a unit configuration for every entry stated in the match field, matching on the state."""
        match = unit_prescription.get("match", {})
        prescription_conf = {"run": False}
        if isinstance(match, list):
            for item in match:
                match_resolved = item.get("state", {}).get("resolved_dependencies")
                yield {
                    # Return the first package name that should be matched to keep optimization for wrap calls.
                    "package_name": match_resolved[0].get("name") if match_resolved else None,
                    "match": item,
                    "run": unit_prescription["run"],
                    "prescription": prescription_conf,
                }
        else:
            match_resolved = match.get("state", {}).get("resolved_dependencies") if match else None
            yield {
                "package_name": match_resolved[0].get("name") if match_resolved else None,
                "match": match,
                "run": unit_prescription["run"],
                "prescription": prescription_conf,
            }

    def _run_log(self) -> None:
        """Log message specified in the run prescription."""
        if self._configuration["prescription"]["run"]:
            # Noop. The prescription was already run.
            return

        log = self.run_prescription.get("log")
        if log:
            _LOGGER.log(level=getattr(logging, log["type"]), msg=f"{self.name}: {log['message']}")

    def _run_stack_info(self) -> None:
        """Add stack info if any prescribed."""
        if self._configuration["prescription"]["run"]:
            # Noop. The prescription was already run.
            return

        stack_info = self.run_prescription.get("stack_info")
        if stack_info:
            self.context.stack_info.extend(stack_info)

    def _check_package_tuple_from_prescription(
        self, dependency_tuple: Tuple[str, str, str], dependency: Dict[str, str]
    ) -> bool:
        """Check if the given package version tuple matches with what was written in prescription.

        The "develop" flag, "index_url" and "version" entries of the prescription are checked if stated.
        """
        develop = dependency.get("develop")
        if develop is not None:
            package_version = self.context.get_package_version(dependency_tuple, graceful=True)
            if not package_version:
                return False

            if package_version.develop != develop:
                return False

        if not self._index_url_check(dependency.get("index_url"), dependency_tuple[2]):
            return False

        version = dependency.get("version")
        if version is not None:
            specifier = SpecifierSet(version)
            if dependency_tuple[1] not in specifier:
                return False

        return True

    def _run_state(self, state: State) -> bool:
        """Check state match."""
        state_prescription = self.match_prescription.get("state")
        if not state_prescription:
            # Nothing to check.
            return True

        for resolved_dependency in state_prescription.get("resolved_dependencies", []):
            resolved = state.resolved_dependencies.get(resolved_dependency["name"])
            if not resolved:
                return False

            if not self._check_package_tuple_from_prescription(resolved, resolved_dependency):
                return False

        return True

    def _run_state_with_initiator(self, state: State, package_version: PackageVersion) -> bool:
        """Check state match respecting also initiator of the give package."""
        state_prescription = self.match_prescription.get("state")
        if not state_prescription:
            # Nothing to check.
            return True

        package_version_from = state_prescription.get("package_version_from") or []
        # XXX: we explicitly do not consider runtime environment as we expect to have it only one here.
        dependents = {
            i[0]
            for i in self.context.dependents.get(package_version.name, {}).get(package_version.to_tuple(), set())
        }

        for resolved_dependency in package_version_from:
            resolved = state.resolved_dependencies.get(resolved_dependency["name"])
            if not resolved:
                return False

            if not self._check_package_tuple_from_prescription(resolved, resolved_dependency):
                return False

            if resolved not in dependents:
                _LOGGER.debug(
                    "Package %r stated in package_version_from did not introduce package %r",
                    resolved,
                    package_version.to_tuple(),
                )
                return False

            dependents.discard(resolved)

        if dependents and not state_prescription.get("package_version_from_allow_other", False):
            # No other resolved package is allowed to be a dependent of the given package.
            for dependent in dependents:
                if dependent == state.resolved_dependencies.get(dependent[0]):
                    return False

        for resolved_dependency in state_prescription.get("resolved_dependencies", []):
            resolved = state.resolved_dependencies.get(resolved_dependency["name"])
            if not resolved:
                return False

            if not self._check_package_tuple_from_prescription(resolved, resolved_dependency):
                return False

        return True

    def _run_base(self) -> None:
        """Implement base routines for run part of the prescription.

        :raises NotAcceptable: if "not_acceptable" is stated in the run prescription
        :raises EagerStopPipeline: if "eager_stop_pipeline" is stated in the run prescription
        """
        self._run_log()
        self._run_stack_info()

        not_acceptable = self.run_prescription.get("not_acceptable")
        if not_acceptable:
            raise NotAcceptable(not_acceptable)

        eager_stop_pipeline = self.run_prescription.get("eager_stop_pipeline")
        if eager_stop_pipeline:
            raise EagerStopPipeline(eager_stop_pipeline)