thoth.adviser.prescription.v1 package

Submodules

thoth.adviser.prescription.v1.add_package_step module

A prescription step implementing adding a package to a dependency graph.

class thoth.adviser.prescription.v1.add_package_step.AddPackageStepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.step.StepPrescription

Add package prescription step unit implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from_allow_other': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'multi_package_resolution': <class 'bool'>, 'run': Any(<Schema({'multi_package_resolution': <class 'bool'>, 'package_version': <Schema({'name': <function _python_package_name>, 'locked_version': <function _locked_version>, 'index_url': All(<class 'str'>, Length(min=1, max=None), msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, 
max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
run(state: thoth.adviser.state.State, package_version: thoth.python.package_version.PackageVersion) → None[source]

Run main entry-point for steps to add packages.

thoth.adviser.prescription.v1.boot module

A prescription for boot units.

class thoth.adviser.prescription.v1.boot.BootPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Boot prescription implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': Any(<Schema({'package_name': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'run': <Schema({'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_boot_unit_type() → bool[source]

Check if this unit is of type boot.

run() → None[source]

Run main entry-point for boot units.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

thoth.adviser.prescription.v1.gh_release_notes module

GitHub release notes pipeline unit.

class thoth.adviser.prescription.v1.gh_release_notes.GHReleaseNotesWrapPrescription(*, unit_run: bool = False, stack_info_run: bool = False, prescription: Dict[str, Any] = NOTHING, configuration: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

GitHub release notes pipeline unit.

CONFIGURATION_DEFAULT = {'package_name': None, 'package_version': None, 'release_notes': None}
CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'release_notes': <Schema({'organization': All(<class 'str'>, Length(min=1, max=None), msg=None), 'tag_version_prefix': All(<class 'str'>, Length(min=1, max=None), msg=None), 'repository': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>, 'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_wrap_unit_type() → bool[source]

Check if this unit is of type wrap.

run(state: thoth.adviser.state.State) → None[source]

Add release information to justification for selected packages.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Include this pipeline unit.

thoth.adviser.prescription.v1.group module

A base class for implementing group steps.

class thoth.adviser.prescription.v1.group.GroupStepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.step.StepPrescription

A base class for implementing group steps.

The declarative interface triggers registration of multiple pipeline units for all the package versions stated. All of these pipeline units share a context for exchanging information (for example, whether stack_info has already been added). This is an optimization that makes sure a pipeline unit is run only when needed (based on package_name).
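The shared-context idea can be sketched in plain Python, independent of thoth-adviser. The names SharedContext and GroupUnit are hypothetical and exist only for this illustration; the real group units may coordinate differently.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SharedContext:
    """Hypothetical state shared by all units registered for one group."""

    stack_info_added: bool = False
    stack_info: List[str] = field(default_factory=list)


@dataclass
class GroupUnit:
    """Hypothetical unit registered for a single package of the group."""

    package_name: str
    context: SharedContext

    def run(self, message: str) -> None:
        # Only the first unit that runs records the message; later units
        # see the flag in the shared context and do nothing.
        if not self.context.stack_info_added:
            self.context.stack_info.append(message)
            self.context.stack_info_added = True


context = SharedContext()
units = [GroupUnit(name, context) for name in ("package-a", "package-b")]
for unit in units:
    unit.run("group matched")
```

Both units hold a reference to the same context object, so the message is recorded once even though every unit in the group runs.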

CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'match': {'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, 'multi_package_resolution': True, 'run': <Schema({'score': <class 'float'>, 'justification': All([<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>, 'package_name': <function _python_package_name>, 'advisory': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_id': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_name': All(<class 'str'>, Length(min=1, max=None), msg=None), 'version_range': <function _specifier_set>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, 
extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>

thoth.adviser.prescription.v1.prescription module

Implementation of a prescription abstraction.

class thoth.adviser.prescription.v1.prescription.Prescription(*, prescriptions: List[Tuple[str, str]] = NOTHING, boots_dict: Dict[str, Dict[str, Any]] = NOTHING, pseudonyms_dict: Dict[str, Dict[str, Any]] = NOTHING, sieves_dict: Dict[str, Dict[str, Any]] = NOTHING, steps_dict: Dict[str, Dict[str, Any]] = NOTHING, strides_dict: Dict[str, Dict[str, Any]] = NOTHING, wraps_dict: Dict[str, Dict[str, Any]] = NOTHING)[source]

Bases: object

Dynamically create pipeline units based on prescription.

boots_dict
classmethod from_dict(prescription: Dict[str, Any], *, prescription_instance: Optional[Prescription] = None, prescription_name: str, prescription_release: str) → thoth.adviser.prescription.v1.prescription.Prescription[source]

Instantiate prescription from a dictionary representation.

If an instance is provided, a safe merge will be performed.
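What "safe merge" means is not spelled out here. One plausible reading, shown as a sketch only, is that units from the new dictionary are added to the existing ones and a name clash is rejected instead of silently overwritten. The helper safe_merge and the unit names are hypothetical and not part of the thoth-adviser API.

```python
from typing import Any, Dict


def safe_merge(
    target: Dict[str, Dict[str, Any]],
    new_units: Dict[str, Dict[str, Any]],
) -> None:
    """Merge new_units into target in place, refusing to overwrite names."""
    duplicates = sorted(set(target) & set(new_units))
    if duplicates:
        # Assumption: a clash is an error rather than a silent overwrite.
        raise ValueError(f"Units already registered: {', '.join(duplicates)}")
    target.update(new_units)


sieves: Dict[str, Dict[str, Any]] = {"SieveA": {"run": {}}}
safe_merge(sieves, {"SieveB": {"run": {}}})
```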

is_empty() → bool[source]

Check if no prescription units are loaded.

iter_boot_units() → Generator[Type[BootType], None, None][source]

Iterate over prescription boot units registered in the prescription supplied.

iter_pseudonym_units() → Generator[Type[PseudonymType], None, None][source]

Iterate over prescription pseudonym units registered in the prescription supplied.

iter_sieve_units() → Generator[Type[SieveType], None, None][source]

Iterate over prescription sieve units registered in the prescription supplied.

iter_step_units() → Generator[Type[StepType], None, None][source]

Iterate over prescription step units registered in the prescription supplied.

iter_stride_units() → Generator[Type[StrideType], None, None][source]

Iterate over prescription stride units registered in the prescription supplied.

iter_wrap_units() → Generator[Type[WrapType], None, None][source]

Iterate over prescription wrap units registered in the prescription supplied.

classmethod load(*prescriptions: str) → thoth.adviser.prescription.v1.prescription.Prescription[source]

Load prescription from files or from their YAML representation.

prescriptions
pseudonyms_dict
sieves_dict
steps_dict
strides_dict
property units

Iterate over units.

classmethod validate(prescriptions: str) → thoth.adviser.prescription.v1.prescription.Prescription[source]

Validate the given prescription.

wraps_dict

thoth.adviser.prescription.v1.pseudonym module

A base class for implementing package pseudonyms.

class thoth.adviser.prescription.v1.pseudonym.PseudonymPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Pseudonym base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': <Schema({'yield': <Schema({'yield_matched_version': <class 'bool'>, 'package_version': <Schema({'name': <function _python_package_name>, 'locked_version': <function _locked_version>, 'index_url': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_pseudonym_unit_type() → bool[source]

Check if this unit is of type pseudonym.

pre_run() → None[source]

Prepare before running this pipeline unit.

run(package_version: thoth.python.package_version.PackageVersion) → Generator[Tuple[str, str, str], None, None][source]

Run main entry-point for pseudonyms to map packages to their counterparts.
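The generator return type can be illustrated with a small standalone sketch. It assumes the three strings in each yielded tuple are a package name, a version, and an index URL; the signature above does not state this. The mapping PSEUDONYMS, the function iter_pseudonyms, and the package names are hypothetical.

```python
from typing import Dict, Generator, Tuple

# Hypothetical mapping: package name -> (pseudonym name, version, index URL).
PSEUDONYMS: Dict[str, Tuple[str, str, str]] = {
    "example-package": ("example-package-alt", "1.0.0", "https://pypi.org/simple"),
}


def iter_pseudonyms(package_name: str) -> Generator[Tuple[str, str, str], None, None]:
    """Yield the counterpart of package_name, or nothing if none is known."""
    pseudonym = PSEUDONYMS.get(package_name)
    if pseudonym is not None:
        yield pseudonym


found = list(iter_pseudonyms("example-package"))
missing = list(iter_pseudonyms("unknown-package"))
```

Using a generator lets a unit yield zero, one, or several counterparts without the caller having to handle a special "no result" value.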

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

thoth.adviser.prescription.v1.schema module

JSON schema for pipeline unit prescription in version v1.

thoth.adviser.prescription.v1.sieve module

A base class for implementing sieves.

class thoth.adviser.prescription.v1.sieve.SievePrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Sieve base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': Any(<Schema({'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_sieve_unit_type() → bool[source]

Check if this unit is of type sieve.

pre_run() → None[source]

Prepare before running this pipeline unit.

run(package_versions: Generator[thoth.python.package_version.PackageVersion, None, None]) → Generator[thoth.python.package_version.PackageVersion, None, None][source]

Run main entry-point for sieves to filter packages.
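The generator-in, generator-out shape of a sieve can be sketched without thoth-adviser. The Package tuple, the sieve function, and the blocked-version rule are hypothetical stand-ins; the real unit works on PackageVersion objects and matches them against the prescription.

```python
from typing import FrozenSet, Generator, Iterable, NamedTuple


class Package(NamedTuple):
    """Hypothetical stand-in for a package version."""

    name: str
    version: str


def sieve(
    packages: Iterable[Package],
    blocked_versions: FrozenSet[str],
) -> Generator[Package, None, None]:
    """Yield only packages whose version is not blocked."""
    for package in packages:
        if package.version in blocked_versions:
            # Filtered out: the package is not passed further down the pipeline.
            continue
        yield package


candidates = [Package("example", "1.0.0"), Package("example", "1.1.0")]
kept = list(sieve(candidates, frozenset({"1.0.0"})))
```

Because the sieve is lazy, it consumes its input only as the caller iterates over the result.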

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

thoth.adviser.prescription.v1.skip_package_sieve module

A prescription sieve implementing skipping a package in a dependency graph.

class thoth.adviser.prescription.v1.skip_package_sieve.SkipPackageSievePrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Skip package sieve prescription unit implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'run': Any(<Schema({'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'match': <Schema({'package_name': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_sieve_unit_type() → bool[source]

Check if this unit is of type sieve.

pre_run() → None[source]

Initialize this unit before each run.

run(_: Generator[thoth.python.package_version.PackageVersion, None, None]) → Generator[thoth.python.package_version.PackageVersion, None, None][source]

Run main entry-point for sieves to filter packages.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

thoth.adviser.prescription.v1.skip_package_step module

A prescription step implementing skipping a package in a dependency graph.

class thoth.adviser.prescription.v1.skip_package_step.SkipPackageStepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.step.StepPrescription

Skip package prescription step unit implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from_allow_other': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'multi_package_resolution': <class 'bool'>, 'run': Any(<Schema({'multi_package_resolution': <class 'bool'>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, 
required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
run(state: thoth.adviser.state.State, package_version: thoth.python.package_version.PackageVersion) → None[source]

Run main entry-point for steps to skip packages.

thoth.adviser.prescription.v1.step module

A base class for implementing steps.

class thoth.adviser.prescription.v1.step.StepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Step base class implementation.

The multi_package_resolution configuration option states whether a step should be run if a package is resolved multiple times for the same stack.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from_allow_other': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'multi_package_resolution': <class 'bool'>, 'run': <Schema({'score': <class 'float'>, 'justification': All([<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>, 'package_name': <function _python_package_name>, 'advisory': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_id': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_name': All(<class 'str'>, Length(min=1, max=None), msg=None), 'version_range': <function _specifier_set>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 
'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'multi_package_resolution': <class 'bool'>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
SCORE_MAX = 1.0
SCORE_MIN = -1.0
static is_step_unit_type() → bool[source]

Check if this unit is of type step.

pre_run() → None[source]

Prepare before running this pipeline unit.

run(state: thoth.adviser.state.State, package_version: thoth.python.package_version.PackageVersion) → Optional[Tuple[Optional[float], Optional[List[Dict[str, str]]]]][source]

Run main entry-point for steps to filter and score packages.
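The return type combines an optional score with an optional list of justification entries. The sketch below builds such a value and checks the score against the SCORE_MIN and SCORE_MAX constants listed above. The helper make_step_result is hypothetical, and rejecting out-of-range scores (rather than clamping them) is an assumption made for illustration.

```python
from typing import Dict, List, Optional, Tuple

SCORE_MAX = 1.0
SCORE_MIN = -1.0

StepResult = Optional[Tuple[Optional[float], Optional[List[Dict[str, str]]]]]


def make_step_result(
    score: Optional[float],
    justification: Optional[List[Dict[str, str]]],
) -> StepResult:
    """Build a step result, validating the score bounds."""
    if score is None and justification is None:
        # Nothing to report for this package.
        return None
    if score is not None and not SCORE_MIN <= score <= SCORE_MAX:
        # Assumption: out-of-range scores are an error, not clamped.
        raise ValueError(f"Score {score} is outside [{SCORE_MIN}, {SCORE_MAX}]")
    return score, justification


result = make_step_result(
    0.5,
    [{"type": "INFO", "message": "Hypothetical justification message"}],
)
empty = make_step_result(None, None)
```

The justification keys used here (type, message) are taken from the configuration schema shown above.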

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

thoth.adviser.prescription.v1.stride module

A base class for implementing strides.

class thoth.adviser.prescription.v1.stride.StridePrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Stride base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'state': <Schema({'resolved_dependencies': [<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>]}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': <Schema({'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_stride_unit_type() → bool[source]

Check if this unit is of type stride.

run(state: thoth.adviser.state.State) → None[source]

Run main entry-point for strides.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

thoth.adviser.prescription.v1.unit module

A base class for prescription based pipeline units.

class thoth.adviser.prescription.v1.unit.UnitPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.unit.Unit

A base class for implementing pipeline units based on prescription supplied.

CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'match': <class 'object'>, 'run': <class 'object'>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
SHOULD_INCLUDE_CACHE = {}
classmethod get_unit_name() → str[source]

Get the name of the current prescription unit.

This is a class method and MUST NOT be used to obtain the unit name on an instance. As a memory optimization, the class is used to get the current name of a prescription unit with its assigned prescription. As a result, this method may report different names for the same prescription unit instance, depending on the current class context.
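The pitfall can be reproduced with a small standalone class. ExampleUnit is hypothetical and only mimics the pattern of keeping the prescription on the class; capturing the name in the constructor is likewise an assumption made for this sketch, not a description of the real implementation.

```python
from typing import Any, Dict, Optional


class ExampleUnit:
    """Hypothetical unit that stores its prescription on the class."""

    _PRESCRIPTION: Optional[Dict[str, Any]] = None

    def __init__(self) -> None:
        # Capture the name at construction time so the instance keeps it.
        self.name = self.get_unit_name()

    @classmethod
    def set_prescription(cls, prescription: Dict[str, Any]) -> None:
        cls._PRESCRIPTION = prescription

    @classmethod
    def get_unit_name(cls) -> str:
        if cls._PRESCRIPTION is None:
            raise ValueError("No prescription assigned")
        return cls._PRESCRIPTION["name"]


ExampleUnit.set_prescription({"name": "FirstUnit"})
first = ExampleUnit()
ExampleUnit.set_prescription({"name": "SecondUnit"})

name_from_class_context = first.get_unit_name()
name_from_instance = first.name
```

After the second set_prescription call, the class-level lookup reflects the new class context, while the value stored on the instance still holds the name it was created with.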

property match_prescription

Get match part of the prescription assigned.

property name

Get name of the prescription instance.

pre_run() → None[source]

Prepare this pipeline unit before running it.

prescription
property run_prescription

Get run part of the prescription assigned.

classmethod set_prescription(prescription: Dict[str, Any]) → None[source]

Set prescription to the unit.

thoth.adviser.prescription.v1.unit_cache module

Handle caching of pipeline units.

thoth.adviser.prescription.v1.unit_cache.should_include_cache(func: SHOULD_INCLUDE_FUNC_TYPE) → SHOULD_INCLUDE_FUNC_TYPE[source]

Handle caching for parts of prescription units that do not change during pipeline construction.
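A caching decorator of this general kind can be sketched as follows. This is not the library's implementation: the decorator cache_by_unit_name, the cache key, and the decorated function are all hypothetical, chosen only to show how a result that does not change can be computed once and reused.

```python
import functools
from typing import Callable, Dict, List


def cache_by_unit_name(func: Callable[[str], bool]) -> Callable[[str], bool]:
    """Cache the result of func per unit name."""
    cache: Dict[str, bool] = {}

    @functools.wraps(func)
    def wrapper(unit_name: str) -> bool:
        if unit_name not in cache:
            # Computed once; later calls with the same name reuse the result.
            cache[unit_name] = func(unit_name)
        return cache[unit_name]

    return wrapper


calls: List[str] = []


@cache_by_unit_name
def is_applicable(unit_name: str) -> bool:
    """Hypothetical check that is expensive and stable for a given unit."""
    calls.append(unit_name)
    return unit_name.endswith("Sieve")


first_result = is_applicable("SkipSieve")
second_result = is_applicable("SkipSieve")
```

The calls list shows how often the wrapped function actually ran, which makes the effect of the cache easy to observe.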

thoth.adviser.prescription.v1.wrap module

A base class for implementing wrap units.

class thoth.adviser.prescription.v1.wrap.WrapPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Wrap base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'state': <Schema({'resolved_dependencies': [<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>]}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': <Schema({'justification': All([<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>, 'package_name': <function _python_package_name>, 'advisory': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_id': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_name': All(<class 'str'>, Length(min=1, max=None), msg=None), 'version_range': <function _specifier_set>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'advised_manifest_changes': <class 'object'>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_wrap_unit_type() → bool[source]

Check if this unit is of type wrap.

pre_run() → None[source]

Prepare this pipeline unit before run.

run(state: thoth.adviser.state.State) → None[source]

Run main entry-point for wrap units.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

Module contents

Schema v1 based prescription units.

class thoth.adviser.prescription.v1.AddPackageStepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.step.StepPrescription

Add package prescription step unit implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from_allow_other': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'multi_package_resolution': <class 'bool'>, 'run': Any(<Schema({'multi_package_resolution': <class 'bool'>, 'package_version': <Schema({'name': <function _python_package_name>, 'locked_version': <function _locked_version>, 'index_url': All(<class 'str'>, Length(min=1, max=None), msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, 
max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
run(state: thoth.adviser.state.State, package_version: thoth.python.package_version.PackageVersion) → None[source]

Run main entry-point for steps to add a package to the dependency graph.
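For illustration, a configuration for the add-package step might be shaped like the dictionary below. This is a minimal sketch derived only from the key names visible in CONFIGURATION_SCHEMA above. The concrete values (package names, version specifier, index URL) and the `unknown_keys` helper are hypothetical and not part of thoth-adviser.

```python
from typing import Any, Dict, Set

# Hypothetical configuration: key names mirror CONFIGURATION_SCHEMA above,
# all values are made up for illustration.
add_package_configuration: Dict[str, Any] = {
    "package_name": "flask",
    "match": {
        "package_version": {
            "name": "flask",
            "version": ">=2.0",
            "index_url": "https://pypi.org/simple",
        },
    },
    "multi_package_resolution": False,
    "run": {
        "package_version": {
            "name": "itsdangerous",
            "locked_version": "==2.0.1",
            "index_url": "https://pypi.org/simple",
            "develop": False,
        },
        "log": {"type": "INFO", "message": "Adding a hypothetical package"},
    },
    "prescription": {"run": False},
}

# Top-level keys listed in the schema. With extra=PREVENT_EXTRA,
# any other top-level key is rejected during validation.
ALLOWED_TOP_LEVEL_KEYS: Set[str] = {
    "package_name",
    "match",
    "multi_package_resolution",
    "run",
    "prescription",
}


def unknown_keys(configuration: Dict[str, Any]) -> Set[str]:
    """Return top-level keys that the schema above would not accept."""
    return set(configuration) - ALLOWED_TOP_LEVEL_KEYS
```

The helper only mimics the top-level PREVENT_EXTRA check. The real schema also validates nested keys and value formats.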

class thoth.adviser.prescription.v1.BootPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Boot prescription implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': Any(<Schema({'package_name': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'run': <Schema({'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_boot_unit_type() → bool[source]

Check if this unit is of type boot.

run() → None[source]

Run main entry-point for boot units.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.
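The return type is a generator of configuration dictionaries. A reasonable reading, stated here as an assumption rather than taken from the source, is that each yielded dictionary requests one registration of the unit and that yielding nothing leaves the unit out. The function below is a standalone sketch of that pattern. Its name and parameters are hypothetical, and it does not use the real PipelineBuilderContext.

```python
from typing import Any, Dict, Generator, Set


def should_include_sketch(
    registered_units: Set[str], unit_name: str
) -> Generator[Dict[str, Any], None, None]:
    """Yield one configuration if the unit is not registered yet (assumed behavior)."""
    if unit_name in registered_units:
        # Yield nothing: the unit is not included a second time.
        return
    # Hypothetical empty configuration for a single registration.
    yield {}
```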

class thoth.adviser.prescription.v1.GHReleaseNotesWrapPrescription(*, unit_run: bool = False, stack_info_run: bool = False, prescription: Dict[str, Any] = NOTHING, configuration: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

GitHub release notes pipeline unit.

CONFIGURATION_DEFAULT = {'package_name': None, 'package_version': None, 'release_notes': None}
CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'release_notes': <Schema({'organization': All(<class 'str'>, Length(min=1, max=None), msg=None), 'tag_version_prefix': All(<class 'str'>, Length(min=1, max=None), msg=None), 'repository': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>, 'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_wrap_unit_type() → bool[source]

Check if this unit is of type wrap.

run(state: thoth.adviser.state.State) → None[source]

Add release information to justification for selected packages.
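The release_notes part of the configuration names a GitHub organization, a repository and a tag version prefix. One plausible way to turn these into a release link is sketched below. The `release_notes_url` helper is hypothetical, and treating the prefix as optional is an assumption. The unit itself may build its justification differently.

```python
from typing import Dict


def release_notes_url(release_notes: Dict[str, str], version: str) -> str:
    """Build a GitHub release URL from a release_notes configuration (sketch)."""
    organization = release_notes["organization"]
    repository = release_notes["repository"]
    # Assumption: the prefix is prepended to the version to form the tag name.
    prefix = release_notes.get("tag_version_prefix", "")
    return f"https://github.com/{organization}/{repository}/releases/tag/{prefix}{version}"
```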

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Include this pipeline unit.

class thoth.adviser.prescription.v1.GroupStepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.step.StepPrescription

A base class for implementing group steps.

The declarative interface triggers registration of multiple pipeline units, one for each package version stated. All these pipeline units share a context to exchange information (e.g. whether stack_info was already added). This is an optimization to make sure the pipeline unit is run only if needed (based on package_name).
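The idea of a context shared between units of one group can be sketched as follows. This is a toy illustration, not the library's implementation. The `run_group_unit` function, the `stack_info_added` flag and the message text are hypothetical.

```python
from typing import Any, Dict, List


def run_group_unit(
    shared_context: Dict[str, Any],
    stack_info: List[Dict[str, str]],
    entry: Dict[str, str],
) -> None:
    """Add the stack_info entry only once across all units sharing the context."""
    if shared_context.get("stack_info_added"):
        return
    stack_info.append(entry)
    shared_context["stack_info_added"] = True


shared: Dict[str, Any] = {}
collected: List[Dict[str, str]] = []
entry = {"type": "INFO", "message": "Hypothetical group message"}

# Two units registered for the same group reuse the same context object,
# so the second call does not add the entry again.
run_group_unit(shared, collected, entry)
run_group_unit(shared, collected, entry)
```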

CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'match': {'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, 'multi_package_resolution': True, 'run': <Schema({'score': <class 'float'>, 'justification': All([<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>, 'package_name': <function _python_package_name>, 'advisory': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_id': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_name': All(<class 'str'>, Length(min=1, max=None), msg=None), 'version_range': <function _specifier_set>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, 
extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
class thoth.adviser.prescription.v1.PseudonymPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Pseudonym base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': <Schema({'yield': <Schema({'yield_matched_version': <class 'bool'>, 'package_version': <Schema({'name': <function _python_package_name>, 'locked_version': <function _locked_version>, 'index_url': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_pseudonym_unit_type() → bool[source]

Check if this unit is of type pseudonym.

pre_run() → None[source]

Prepare before running this pipeline unit.

run(package_version: thoth.python.package_version.PackageVersion) → Generator[Tuple[str, str, str], None, None][source]

Run main entry-point for pseudonyms to map packages to their counterparts.
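The generator yields three-string tuples. Assuming these are (package name, version, index URL), a pseudonym mapping driven by the run.yield part of the schema could look like the sketch below. The function name, its parameters and the handling of the leading `==` are assumptions for illustration.

```python
from typing import Any, Dict, Generator, Tuple


def pseudonym_run_sketch(
    run_prescription: Dict[str, Any],
    matched_version: str,
    default_index_url: str,
) -> Generator[Tuple[str, str, str], None, None]:
    """Yield a (name, version, index_url) counterpart for a matched package."""
    to_yield = run_prescription["yield"]
    package_version = to_yield["package_version"]
    if to_yield.get("yield_matched_version"):
        # Reuse the version of the package that was matched.
        version = matched_version
    else:
        # Assumption: locked_version is written as "==X.Y.Z"; strip the operator.
        locked = package_version["locked_version"]
        version = locked[2:] if locked.startswith("==") else locked
    yield (
        package_version["name"],
        version,
        package_version.get("index_url", default_index_url),
    )
```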

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

class thoth.adviser.prescription.v1.SievePrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Sieve base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': Any(<Schema({'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_sieve_unit_type() → bool[source]

Check if this unit is of type sieve.

pre_run() → None[source]

Prepare before running this pipeline unit.

run(package_versions: Generator[thoth.python.package_version.PackageVersion, None, None]) → Generator[thoth.python.package_version.PackageVersion, None, None][source]

Run main entry-point for sieves to filter packages.
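The signature takes a generator of package versions and returns a generator of package versions, which suggests a filter. A minimal sketch of that shape follows. `ToyPackageVersion` is a hypothetical stand-in for thoth.python.package_version.PackageVersion, and the blocked set replaces the real matching logic.

```python
from typing import Generator, Iterable, NamedTuple, Set, Tuple


class ToyPackageVersion(NamedTuple):
    """Hypothetical stand-in for PackageVersion."""

    name: str
    locked_version: str


def sieve_run_sketch(
    package_versions: Iterable[ToyPackageVersion],
    blocked: Set[Tuple[str, str]],
) -> Generator[ToyPackageVersion, None, None]:
    """Yield only package versions that are not in the blocked set."""
    for package_version in package_versions:
        if (package_version.name, package_version.locked_version) in blocked:
            # Filtered out: this version is not passed on.
            continue
        yield package_version
```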

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

class thoth.adviser.prescription.v1.SkipPackageSievePrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Skip package sieve prescription unit implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'run': Any(<Schema({'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'match': <Schema({'package_name': All(<class 'str'>, Length(min=1, max=None), msg=None)}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_sieve_unit_type() → bool[source]

Check if this unit is of type sieve.

pre_run() → None[source]

Initialize this unit before each run.

run(_: Generator[thoth.python.package_version.PackageVersion, None, None]) → Generator[thoth.python.package_version.PackageVersion, None, None][source]

Run main entry-point for sieves to filter packages.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

class thoth.adviser.prescription.v1.SkipPackageStepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.step.StepPrescription

Skip package prescription step unit implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from_allow_other': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'multi_package_resolution': <class 'bool'>, 'run': Any(<Schema({'multi_package_resolution': <class 'bool'>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, None, msg=None), 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, 
required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
run(state: thoth.adviser.state.State, package_version: thoth.python.package_version.PackageVersion) → None[source]

Run main entry-point for steps to skip packages.

class thoth.adviser.prescription.v1.StepPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Step base class implementation.

The multi_package_resolution configuration option states whether a step should be run if a package is resolved multiple times for the same stack.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'package_version': <Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>, 'state': <Schema({'resolved_dependencies': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from': All([<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'package_version_from_allow_other': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'multi_package_resolution': <class 'bool'>, 'run': <Schema({'score': <class 'float'>, 'justification': All([<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>, 'package_name': <function _python_package_name>, 'advisory': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_id': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_name': All(<class 'str'>, Length(min=1, max=None), msg=None), 'version_range': <function _specifier_set>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 
'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'multi_package_resolution': <class 'bool'>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
SCORE_MAX = 1.0
SCORE_MIN = -1.0
static is_step_unit_type() → bool[source]

Check if this unit is of type step.

pre_run() → None[source]

Prepare before running this pipeline unit.

run(state: thoth.adviser.state.State, package_version: thoth.python.package_version.PackageVersion) → Optional[Tuple[Optional[float], Optional[List[Dict[str, str]]]]][source]

Run main entry-point for steps to filter and score packages.
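The return type pairs an optional score with an optional list of justification entries, and SCORE_MIN and SCORE_MAX bound the score. A caller-side check might look like the sketch below. The `step_result` helper and the choice to raise ValueError are assumptions, not behavior documented above.

```python
from typing import Dict, List, Optional, Tuple

# Values copied from the class attributes documented above.
SCORE_MAX = 1.0
SCORE_MIN = -1.0


def step_result(
    score: Optional[float],
    justification: Optional[List[Dict[str, str]]],
) -> Tuple[Optional[float], Optional[List[Dict[str, str]]]]:
    """Return a (score, justification) pair after checking the score bounds."""
    if score is not None and not SCORE_MIN <= score <= SCORE_MAX:
        raise ValueError(f"Score {score} is outside [{SCORE_MIN}, {SCORE_MAX}]")
    return score, justification
```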

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

class thoth.adviser.prescription.v1.StridePrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Stride base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'state': <Schema({'resolved_dependencies': [<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>]}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': <Schema({'not_acceptable': All(<class 'str'>, Length(min=1, max=None), msg=None), 'eager_stop_pipeline': All(<class 'str'>, Length(min=1, max=None), msg=None), 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_stride_unit_type() → bool[source]

Check if this unit is of type stride.

run(state: thoth.adviser.state.State) → None[source]

Run main entry-point for strides.

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.

class thoth.adviser.prescription.v1.UnitPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.unit.Unit

A base class for implementing pipeline units based on prescription supplied.

CONFIGURATION_SCHEMA = <Schema({'package_name': <class 'str'>, 'match': <class 'object'>, 'run': <class 'object'>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
SHOULD_INCLUDE_CACHE = {}
classmethod get_unit_name() → str[source]

Get the name of the current prescription unit.

This is a class method and MUST NOT be used to obtain the unit name on an instance. As a memory optimization, the class is used to get the current name of a prescription unit with an assigned prescription. A prescription unit instance would therefore report different names through this method depending on the current class context.
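The pitfall can be reproduced with a toy class. This is a sketch of the general pattern only: `ToyUnitPrescription` and its `_PRESCRIPTION` attribute are hypothetical and do not mirror the internals of UnitPrescription.

```python
from typing import Any, Dict, Optional


class ToyUnitPrescription:
    """Toy class keeping the assigned prescription on the class itself."""

    _PRESCRIPTION: Optional[Dict[str, Any]] = None

    def __init__(self) -> None:
        # The instance remembers the prescription assigned at creation time.
        self.prescription = type(self)._PRESCRIPTION

    @classmethod
    def set_prescription(cls, prescription: Dict[str, Any]) -> None:
        cls._PRESCRIPTION = prescription

    @classmethod
    def get_unit_name(cls) -> str:
        if cls._PRESCRIPTION is None:
            raise ValueError("No prescription assigned")
        return cls._PRESCRIPTION["name"]

    @property
    def name(self) -> str:
        if self.prescription is None:
            raise ValueError("Instance created without a prescription")
        return self.prescription["name"]


ToyUnitPrescription.set_prescription({"name": "UnitA"})
unit_a = ToyUnitPrescription()
ToyUnitPrescription.set_prescription({"name": "UnitB"})

# The instance property still reports the name the instance was created with,
# while the class method follows the current class context.
instance_name = unit_a.name
class_name = unit_a.get_unit_name()
```

In this toy, `instance_name` and `class_name` differ, which is why the instance-level `name` property is the one to use on instances.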

property match_prescription

Get match part of the prescription assigned.

property name

Get name of the prescription instance.

pre_run() → None[source]

Prepare this pipeline unit before running it.

prescription
property run_prescription

Get run part of the prescription assigned.

classmethod set_prescription(prescription: Dict[str, Any]) → None[source]

Set prescription to the unit.

class thoth.adviser.prescription.v1.WrapPrescription(*, unit_run: bool = False, stack_info_run: bool = False, configuration: Dict[str, Any] = NOTHING, prescription: Dict[str, Any] = NOTHING)[source]

Bases: thoth.adviser.prescription.v1.unit.UnitPrescription

Wrap base class implementation.

CONFIGURATION_SCHEMA = <Schema({'package_name': Any(<class 'str'>, None, msg=None), 'match': <Schema({'state': <Schema({'resolved_dependencies': [<Schema({'name': <function _python_package_name>, 'version': <function _specifier_set>, 'index_url': Any(All(<class 'str'>, Length(min=1, max=None), msg=None), {'not': All(<class 'str'>, Length(min=1, max=None), msg=None)}, msg=None), 'develop': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>]}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'run': <Schema({'justification': All([<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>, 'package_name': <function _python_package_name>, 'advisory': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_id': All(<class 'str'>, Length(min=1, max=None), msg=None), 'cve_name': All(<class 'str'>, Length(min=1, max=None), msg=None), 'version_range': <function _specifier_set>}, extra=PREVENT_EXTRA, required=False) object>], Length(min=1, max=None), msg=None), 'advised_manifest_changes': <class 'object'>, 'stack_info': [<Schema({'type': Any('WARNING', 'INFO', 'ERROR', msg=None), 'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'link': <function _justification_link>}, extra=PREVENT_EXTRA, required=False) object>], 'log': <Schema({'message': All(<class 'str'>, Length(min=1, max=None), msg=None), 'type': Any('WARNING', 'INFO', 'ERROR', msg=None)}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>, 'prescription': <Schema({'run': <class 'bool'>}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
static is_wrap_unit_type() → bool[source]

Check if this unit is of type wrap.

pre_run() → None[source]

Prepare this pipeline unit before running it.

run(state: thoth.adviser.state.State) → None[source]

Run main entry-point for wrap units.
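Since the schema's run section carries a justification list and the method returns None, a wrap presumably works by modifying the state it receives. The sketch below shows that shape under this assumption. `ToyState` is a hypothetical stand-in for thoth.adviser.state.State, and `wrap_run_sketch` is not the library's code.

```python
from typing import Any, Dict, List


class ToyState:
    """Hypothetical stand-in for thoth.adviser.state.State."""

    def __init__(self) -> None:
        self.justification: List[Dict[str, str]] = []


def wrap_run_sketch(state: ToyState, run_prescription: Dict[str, Any]) -> None:
    """Append the prescribed justification entries to the given state."""
    state.justification.extend(run_prescription.get("justification", []))
```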

classmethod should_include(builder_context: PipelineBuilderContext) → Generator[Dict[str, Any], None, None][source]

Check if the given pipeline unit should be included in the given pipeline configuration.