thoth.common package

Subpackages

Submodules

thoth.common.enums module

Enum types used for Thoth adviser integrations.

class thoth.common.enums.InternalTriggerEnum(value)[source]

Bases: enum.Enum

Class containing different types of triggers, this is used to generate justifications for PRs and issues.

CVE = 1
HASH_MISMATCH = 2
MISSING_PACKAGE = 3
MISSING_VERSION = 4
NEW_RELEASE = 5
class thoth.common.enums.ThothAdviserIntegrationEnum(value)[source]

Bases: enum.Enum

Class for Thoth Adviser integrations.

CLI = 1
JUPYTER_NOTEBOOK = 5
KEBECHET = 2
S2I = 3

thoth.common.exceptions module

Exceptions used within thoth-common package.

exception thoth.common.exceptions.ConfigurationError[source]

Bases: thoth.common.exceptions.ThothCommonExceptionError

Raised on miss-configuration issues.

exception thoth.common.exceptions.KebechetInputsMissingError[source]

Bases: thoth.common.exceptions.ThothCommonExceptionError

An exception raised if there are inputs missing for Kebechet.

exception thoth.common.exceptions.NotFoundExceptionError[source]

Bases: thoth.common.exceptions.ThothCommonExceptionError

Raised if the given resource cannot be found.

exception thoth.common.exceptions.NotKnownThothIntegrationError[source]

Bases: thoth.common.exceptions.ThothCommonExceptionError

An exception raised if the given integration is not known to Thoth.

exception thoth.common.exceptions.SolverNameParseError[source]

Bases: thoth.common.exceptions.ThothCommonExceptionError

Raised if unable to determine solver information out of solver name run.

exception thoth.common.exceptions.ThothCommonExceptionError[source]

Bases: Exception

A base class for Thoth-common exception hierarchy.

exception thoth.common.exceptions.WorkflowError[source]

Bases: thoth.common.exceptions.ThothCommonExceptionError

Raised on workflow related issues.

thoth.common.helpers module

Various utilities to make your life easier.

class thoth.common.helpers.Lazy(calculate_function: Callable[[...], Any])[source]

Bases: object

Calculates function exactly once then sets it to be and attribute of object.

Intended to optimize cases in which a class function is called and does not change after repeated calls. Attribute lookup is ~2x as fast as even the simples function calls.

thoth.common.helpers.cwd(target: str) Generator[str, None, None][source]

Manage cwd in a pushd/popd fashion.

thoth.common.helpers.datetime2datetime_str(dt: Optional[datetime.datetime] = None) str[source]

Create a string representation of a datetime.

thoth.common.helpers.datetime_str2timestamp(datetime_string: str) int[source]

Parse datetime string represented in ISO format and return timestamp.

thoth.common.helpers.datetime_str_from_timestamp(timestamp: int) str[source]

Convert a timestamp to datetime string representation.

thoth.common.helpers.format_datetime(dt: datetime.datetime) str[source]

Return datetime string in default format.

thoth.common.helpers.get_default_datetime_format() str[source]

Return default datetime format string.

Construct a link to a detailed justification document.

thoth.common.helpers.get_service_account_token() str[source]

Get token from service account token file.

thoth.common.helpers.map_os_name(os_name: Optional[str]) Optional[str][source]

Map operating system name.

thoth.common.helpers.normalize_os_version(os_name: Optional[str], os_version: Optional[str]) Optional[str][source]

Normalize operating system version based on operating system used.

thoth.common.helpers.parse_datetime(datetime_string: str) datetime.datetime[source]

Parse datetime string represented in ISO format.

thoth.common.helpers.timestamp2datetime(timestamp: int) datetime.datetime[source]

Convert a timestamp to datetime respecting UTC.

thoth.common.helpers.to_camel_case(obj: thoth.common.helpers.T) thoth.common.helpers.T[source]

Convert dictionary keys to camelCase.

thoth.common.helpers.to_snake_case(obj: thoth.common.helpers.T) thoth.common.helpers.T[source]

Convert dictionary keys to snake_case.

thoth.common.json module

Manipulation with JSON format.

class thoth.common.json.SafeJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.encoder.JSONEncoder

Convert objects to JSON, safely.

default(o: Any) Any[source]

Convert an object to JSON, safely.

thoth.common.logging module

Logging configuration for whole Thoth.

thoth.common.logging.before_send_handler(event: Dict[str, Any], hint: Dict[str, Any]) Optional[Dict[str, Any]][source]

Filter the errors caught before sending to Sentry.

This function ignores the exceptions passed in as a environment variable in a comma separated manner.

thoth.common.logging.init_logging(logging_configuration: Optional[Dict[str, str]] = None, logging_env_var_start: Optional[str] = None) None[source]

Initialize Thoth’s logging - respects all namespaces.

This function allows you to control logging facilities in Thoth. Logging can be configured via env variables so that deployment can respect your configuration. The structure of environment variables is THOTH_LOG_(MODULE) and the value of env variable states verbosity level as in the logging module (DEBUG, INFO, WARNING, ERROR).

>>> import os
>>> os.environ['THOTH_LOG_SOLVER']
> WARNING

You can also specify more closely which sub-module logging you are configuring - submodules are separated with double dash:

>>> os.environ['THOTH_LOG_SOLVER__PYTHON']
> DEBUG

You can also use arguments explicitly that override configuration in env variables (a shorthand for standard logging functionality):

>>> init_logging({'thoth.solver': 'DEBUG'})

Optionally you can specify prefix of the logging environment variable determining logging configuration via env vars (defaults to THOTH_LOG_).

thoth.common.openshift module

Handling OpenShift and Kubernetes objects across project.

class thoth.common.openshift.OpenShift(*, frontend_namespace: Optional[str] = None, middletier_namespace: Optional[str] = None, backend_namespace: Optional[str] = None, infra_namespace: Optional[str] = None, graph_namespace: Optional[str] = None, amun_infra_namespace: Optional[str] = None, amun_inspection_namespace: Optional[str] = None, kubernetes_api_url: Optional[str] = None, kubernetes_verify_tls: bool = True, openshift_api_url: Optional[str] = None, token: Optional[str] = None, token_file: Optional[str] = None, cert_file: Optional[str] = None, environ: Optional[Dict[str, str]] = None)[source]

Bases: object

Interaction with OpenShift Master.

create_config_map(configmap_name: str, namespace: str, labels: Dict[str, str], data: Dict[str, str]) str[source]

Create a ConfigMap in the given namespace.

static generate_id(prefix: Optional[str] = None, identifier: Optional[str] = None) str[source]

Generate an identifier.

get_build(build_id: str, namespace: str) Dict[str, Any][source]

Get a build in the given namespace.

get_build_log(build_id: str, namespace: str) str[source]

Get log of a build in the given namespace.

get_buildconfig(buildconfig_id: str, namespace: str) Dict[str, Any][source]

Get a buildconfig in the given namespace.

get_cache_expiration_configuration() Optional[int][source]

Retrieve cache expiration configuration [s].

get_configmap(configmap_id: str, namespace: str) Dict[str, Any][source]

Get the given configmap in a namespace, return object representing config map.

get_configmaps(namespace: str, label_selector: str) Dict[str, Any][source]

Get all configmaps in a namespace and select them by label.

get_image_streams(namespace: str, label_selector: str) Dict[str, Any][source]

Get all image streams in a namespace and select them by label.

get_job_log(job_id: str, namespace: str) Optional[str][source]

Get log of a pod running inside a job.

get_job_logs(job_id: str, namespace: str) Optional[str][source]

Get log of a pod running inside a job.

get_job_status(job_id: str, namespace: str) Dict[str, Any][source]

Get status of a Job and Pods created by the Job.

Raises

NotFoundError if no Job of such name is found in the namespace

get_job_status_count(label_selector: str, namespace: str) Dict[str, int][source]

Count the number of Jobs per status in a specific namespace.

get_job_status_report(job_id: str, namespace: str) Dict[str, Optional[str]][source]

Get status report of a Job and Pods created by the Job.

get_jobs(label_selector: str, namespace: Optional[str] = None) Dict[str, Any][source]

Get all Jobs, select them by the provided label.

get_mi_repositories_and_organizations() Tuple[List[str], List[str]][source]

Get all of the repositories and organizations for mi-analysis.

Return type

Tuple of (repositories, organizations)

get_pod_log(pod_id: str, namespace: Optional[str] = None, container: Optional[str] = None) Optional[str][source]

Get log of a pod based on assigned pod ID.

get_pod_status(pod_id: str, namespace: str) Dict[str, Any][source]

Get status entry for a pod - low level routine.

get_pod_status_report(pod_id: str, namespace: str) Dict[str, Optional[str]][source]

Get pod state and convert it to a user-friendly response.

get_solver_names() List[str][source]

Retrieve name of solvers available in installation.

get_workflow(name: Optional[str] = None, label_selector: Optional[str] = None, namespace: Optional[str] = None) Dict[str, Any][source]

Get Workflow from a namespace, use one of name or label_selector to identify which one to get.

get_workflow_node_log(node_name: str, workflow_id: str, namespace: str) Optional[str][source]

Get log from a task/node in a workflow.

get_workflow_node_status(node_name: str, workflow_id: str, namespace: str) Dict[str, Any][source]

Get status from a task/node in a workflow.

get_workflow_pod_name(node_name: str, workflow_id: str, namespace: str) str[source]

Get pod name where task is being executed.

get_workflow_status(name: Optional[str] = None, label_selector: Optional[str] = None, namespace: Optional[str] = None) Dict[str, Any][source]

Get a Workflow status, use one of name or label_selector to identify which one to get.

get_workflow_status_report(workflow_id: str, label_selector: Optional[str] = None, namespace: Optional[str] = None) Dict[str, Optional[str]][source]

Get workflow status report, derived from workflow status.

classmethod obtain_solver_from_runtime_environment(runtime_environment: Dict[str, Any]) Optional[str][source]

Define solver from runtime_environment.

oc_process(namespace: str, template: Dict[str, Any]) Dict[str, Any][source]

Process the given template in OpenShift.

static parse_cpu_spec(cpu_spec: Optional[str]) Optional[float][source]

Parse the given CPU requirement as used by OpenShift/Kubernetes.

static parse_memory_spec(memory_spec: Optional[str]) Optional[float][source]

Parse the given CPU requirement as used by OpenShift/Kubernetes.

classmethod parse_python_solver_name(solver_name: str) Dict[str, Any][source]

Parse os and Python identifiers encoded into solver name.

schedule_adviser(recommendation_type: str, *, count: Optional[int] = None, limit: Optional[int] = None, predictor_config: Optional[Dict[str, Any]] = None, origin: Optional[str] = None, dev: bool = False, authenticated: bool = False, debug: bool = False, job_id: Optional[str] = None, re_run_adviser_id: Optional[str] = None, source_type: Optional[str] = None, kebechet_metadata: Optional[Dict[str, Any]] = None, justification: Optional[List[Dict[str, Any]]] = None, stack_info: Optional[List[Dict[str, Any]]] = None) Optional[str][source]

Schedule an adviser run.

schedule_all_solvers(packages: str, *, indexes: Optional[List[str]] = None, dependency_indexes: Optional[List[str]] = None, force_sync: bool = False, debug: bool = False, transitive: bool = False) List[str][source]

Schedule all solvers for the given packages.

schedule_build_analysis(*, base_image: Optional[str] = None, base_image_analysis_id: Optional[str] = None, base_registry_password: Optional[str] = None, base_registry_user: Optional[str] = None, base_registry_verify_tls: bool = True, output_image: Optional[str] = None, output_image_analysis_id: Optional[str] = None, output_registry_password: Optional[str] = None, output_registry_user: Optional[str] = None, output_registry_verify_tls: bool = True, buildlog_document_id: Optional[str] = None, buildlog_parser_id: Optional[str] = None, environment_type: Optional[str] = None, origin: Optional[str] = None, debug: bool = False, is_external: bool = True, job_id: Optional[str] = None) Optional[str][source]

Schedule a build analysis workflow.

schedule_dependency_monkey(*, predictor: Optional[str] = None, predictor_config: Optional[Dict[str, Any]] = None, stack_output: Optional[str] = None, seed: Optional[int] = None, dry_run: bool = False, decision: Optional[str] = None, count: Optional[int] = None, debug: bool = False, job_id: Optional[str] = None, limit_latest_versions: Optional[int] = None) Optional[str][source]

Schedule a dependency monkey run.

schedule_graph_refresh(namespace: Optional[str] = None) str[source]

Schedule graph refresh job in frontend namespace by default.

schedule_graph_schema_update(job_id: Optional[str] = None, namespace: Optional[str] = None) str[source]

Schedule graph schema update job in infra namespace by default.

schedule_graph_sync(document_id: str, force_sync: bool = False, *, job_id: Optional[str] = None) Optional[str][source]

Schedule graph sync for specific document id.

schedule_inspection(dockerfile: str, specification: Dict[str, Any], target: str, parameters: Dict[str, Any], *, raw_specification: Optional[Dict[str, Any]] = None, job_id: Optional[str] = None) Optional[str][source]

Schedule an inspection run.

schedule_kebechet_administrator(message_info: Dict[str, str], message_type: str, *, job_id: Optional[str] = None) Optional[str][source]

Schedule Kebechet Administrator Workflow for an internal trigger message.

schedule_kebechet_run_url_workflow(repo_url: str, service_name: str = 'github', *, kebechet_metadata: Optional[Dict[str, Any]] = None, job_id: Optional[str] = None) Optional[str][source]

Schedule Kebechet Workflow for a particular valid slug and service name.

schedule_kebechet_workflow(webhook_payload: Dict[str, Any], *, job_id: Optional[str] = None) Optional[str][source]

Schedule Kebechet Workflow for a Webhook from GitHub App..

schedule_mi_workflow(create_knowledge: Optional[bool] = False, repository: Optional[str] = None, entities: Optional[str] = None, knowledge_path: Optional[str] = None, mi_merge_path: Optional[str] = None, mi_used_for_thoth: Optional[bool] = False, mi_merge: Optional[bool] = False) Optional[str][source]

Schedule Meta-information Indicators Workflow.

Parameters
  • repository:str – GitHub repository in full name format: <repo_owner>/<repo_name>

  • entities:Optional[str] – Meta-information Indicator Entities that will be inspected multiple entities are in form of Foo,Bar,…

schedule_package_extract(image: str, *, environment_type: str, is_external: bool = True, origin: Optional[str] = None, registry_user: Optional[str] = None, registry_password: Optional[str] = None, verify_tls: bool = True, debug: bool = False, graph_sync: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule package extract workflow.

schedule_provenance_checker(*, origin: Optional[str] = None, whitelisted_sources: Optional[List[str]] = None, debug: bool = False, authenticated: bool = False, job_id: Optional[str] = None, kebechet_metadata: Optional[Dict[str, Any]] = None, justification: Optional[List[Dict[str, Any]]] = None, stack_info: Optional[List[Dict[str, Any]]] = None) Optional[str][source]

Run provenance checks on the provided user input.

schedule_purge_adviser_job(*, end_datetime: Optional[datetime.datetime] = None, adviser_version: Optional[str] = None, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule a job for purging adviser data.

schedule_purge_package_extract_job(*, end_datetime: Optional[datetime.datetime] = None, package_extract_version: Optional[str] = None, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule a job for purging package-extract data.

schedule_purge_solver_job(*, os_name: str, os_version: str, python_version: str, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule a job for purging solver data.

schedule_revsolver(package_name: str, package_version: str, *, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule reverse solver.

schedule_security_indicator(python_package_name: str, python_package_version: str, python_package_index: str, aggregation_function: str, *, job_id: Optional[str] = None) Optional[str][source]

Schedule a security indicator run.

schedule_solver(packages: str, solver: str, *, indexes: Optional[List[str]] = None, dependency_indexes: Optional[List[str]] = None, force_sync: bool = False, debug: bool = False, transitive: bool = True, job_id: Optional[str] = None) Optional[str][source]

Schedule the given solver.

schedule_sync_job(document_type: Optional[str], force_sync: bool = False, graceful: bool = True, job_id: Optional[str] = None) Optional[str][source]

Schedule graph sync for specific document id.

schedule_thoth_repo_init(project_url: str, *, job_id: Optional[str] = None) Optional[str][source]

Schedule a workflow for running initial thoth-advise on a repo.

static set_template_parameters(template: Dict[str, Any], **parameters: Any) None[source]

Set parameters in the template - replace existing ones or append to parameter list if not exist.

>>> set_template_parameters(template, THOTH_LOG_ADVISER='DEBUG')
property token: str

Access service account token mounted to the pod.

verify_integration_inputs(source_type: Optional[thoth.common.enums.ThothAdviserIntegrationEnum], origin: Optional[str] = None) None[source]

Verify if inputs for registered Thoth integrations are correct.

static verify_kebechet_inputs(origin: Optional[str]) None[source]

Verify if Thoth Kebechet integration inputs are correct.

property workflow_manager: WorkflowManager

Return WorkflowManager instance.

This property lazily initializes the WorkflowManager.

thoth.common.workflows module

Workflow management for Thoth.

class thoth.common.workflows.Workflow(api_version: str, kind: str, metadata: argo.workflows.client.models.v1alpha1_metadata.V1alpha1Metadata, spec: argo.workflows.client.models.v1alpha1_workflow_spec.V1alpha1WorkflowSpec, status: Optional[argo.workflows.client.models.v1alpha1_workflow_status.V1alpha1WorkflowStatus] = None)[source]

Bases: argo.workflows.client.models.v1alpha1_workflow.V1alpha1Workflow

Argo Workflow instance.

This is a subclass of argo.workflows V1alpha1Workflow model which provides a convenient set of methods to make workflow management easier.

classmethod from_dict(wf: Dict[str, Any], validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a dict.

classmethod from_file(fp: Union[str, pathlib.Path], validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a file.

classmethod from_string(wf: str, validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a YAML string.

classmethod from_url(url: str, validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a remote file.

property id: str

Get Workflow ID.

property name: Optional[str]

Get Workflow name.

property validated: bool

Return whether this workflow has been validated.

class thoth.common.workflows.WorkflowManager(openshift: Optional[thoth.common.openshift.OpenShift] = None, openshift_config: Optional[Dict[str, str]] = None)[source]

Bases: object

Argo Workflow manager.

get_pending_workflows(workflow_namespace: str) int[source]

Get the total number of pending workflows in a given namespace.

get_workflow(namespace: str, name: str) Dict[str, Any][source]

Get Workflow in namespace by name.

get_workflow_and_tasks_status(namespace: str, name: str) Dict[str, Any][source]

Get workflow and tasks status from a specific namespace.

get_workflow_info(namespace: str, name: str) Dict[str, Any][source]

Get Workflow in namespace by name.

get_workflow_template(namespace: str, label_selector: str, *, parameters: Optional[Dict[str, str]]) Dict[str, Any][source]

Get Workflow template.

get_workflows(namespace: str, *, label_selector: Optional[str] = None) Dict[str, Any][source]

Get Workflows in namespace.

get_workflows_and_tasks_status(namespace: str, label_selector: Optional[str] = None) Dict[str, Any][source]

Get workflows and tasks status from a specific namespace.

get_workflows_info(namespace: str, *, label_selector: Optional[str] = None) Dict[str, Any][source]

Get workflows info from a specific namespace.

submit_adviser(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Adviser Workflow.

submit_build_analysis(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit build analysis workflow.

submit_dependency_monkey(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Dependency Monkey workflow.

submit_graph_sync(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit graph-sync workflow.

submit_inspection(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None, use_hw_template: bool = False) Optional[str][source]

Submit the Inspection Workflow.

submit_kebechet(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Kebechet Workflow.

submit_kebechet_administrator(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Kebechet Administrator Workflow for an internal trigger message.

submit_kebechet_run_url(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Kebechet Run-URL Workflow for a single slug.

submit_mi(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Meta-information Indicators workflow.

submit_package_extract(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, str]] = None) Optional[str][source]

Submit package-extract workflow.

submit_provenance_checker(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit provenance checker workflow.

submit_purge(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit purge workflow.

submit_revsolver(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit reverse solver workflow.

submit_security_indicator(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Security Indicator Workflow.

submit_solver(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Solver Workflow.

submit_sync_job(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit graph-sync workflow.

submit_thoth_repo_init(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit repo initialization workflow.

submit_workflow(namespace: str, wf: Union[argo.workflows.client.models.v1alpha1_workflow.V1alpha1Workflow, Dict[str, Any]], *, parameters: Optional[Dict[str, str]] = None, validate: bool = True) Optional[str][source]

Submit an Argo Workflow to a given namespace.

Returns

Workflow ID

submit_workflow_from_template(namespace: str, label_selector: str, *, template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None, workflow_namespace: Optional[str] = None, workflow_limit: Optional[int] = None) Optional[str][source]

Retrieve and Submit Workflow from an OpenShift template.

Parameters
  • namespace

    namespace to lookup the template in

    If workflow_namespace is not provided, this namespace is also implicitly the namespace where the workflow is submitted

  • label_selector – selector for the template, i.e. ‘template=workflow-template’

  • template_parameters – parameters for the template

  • workflow_parameters – parameters for the workflow

  • workflow_namespace – namespace to submit the workflow to

  • workflow_limit – limit number of workflows currently in memory for workflowController

Module contents

Shared code across Thoth analyzers.

class thoth.common.HardwareInformation(cpu_family: Optional[int] = None, cpu_model: Optional[int] = None, gpu_model: Optional[int] = None)[source]

Bases: thoth.common.config.base.ConfigEntryBase

Representation for hardware related information.

cpu_family
cpu_model
gpu_model
class thoth.common.Lazy(calculate_function: Callable[[...], Any])[source]

Bases: object

Calculates function exactly once then sets it to be and attribute of object.

Intended to optimize cases in which a class function is called and does not change after repeated calls. Attribute lookup is ~2x as fast as even the simples function calls.

class thoth.common.OpenShift(*, frontend_namespace: Optional[str] = None, middletier_namespace: Optional[str] = None, backend_namespace: Optional[str] = None, infra_namespace: Optional[str] = None, graph_namespace: Optional[str] = None, amun_infra_namespace: Optional[str] = None, amun_inspection_namespace: Optional[str] = None, kubernetes_api_url: Optional[str] = None, kubernetes_verify_tls: bool = True, openshift_api_url: Optional[str] = None, token: Optional[str] = None, token_file: Optional[str] = None, cert_file: Optional[str] = None, environ: Optional[Dict[str, str]] = None)[source]

Bases: object

Interaction with OpenShift Master.

create_config_map(configmap_name: str, namespace: str, labels: Dict[str, str], data: Dict[str, str]) str[source]

Create a ConfigMap in the given namespace.

static generate_id(prefix: Optional[str] = None, identifier: Optional[str] = None) str[source]

Generate an identifier.

get_build(build_id: str, namespace: str) Dict[str, Any][source]

Get a build in the given namespace.

get_build_log(build_id: str, namespace: str) str[source]

Get log of a build in the given namespace.

get_buildconfig(buildconfig_id: str, namespace: str) Dict[str, Any][source]

Get a buildconfig in the given namespace.

get_cache_expiration_configuration() Optional[int][source]

Retrieve cache expiration configuration [s].

get_configmap(configmap_id: str, namespace: str) Dict[str, Any][source]

Get the given configmap in a namespace, return object representing config map.

get_configmaps(namespace: str, label_selector: str) Dict[str, Any][source]

Get all configmaps in a namespace and select them by label.

get_image_streams(namespace: str, label_selector: str) Dict[str, Any][source]

Get all image streams in a namespace and select them by label.

get_job_log(job_id: str, namespace: str) Optional[str][source]

Get log of a pod running inside a job.

get_job_logs(job_id: str, namespace: str) Optional[str][source]

Get log of a pod running inside a job.

get_job_status(job_id: str, namespace: str) Dict[str, Any][source]

Get status of a Job and Pods created by the Job.

Raises

NotFoundError if no Job of such name is found in the namespace

get_job_status_count(label_selector: str, namespace: str) Dict[str, int][source]

Count the number of Jobs per status in a specific namespace.

get_job_status_report(job_id: str, namespace: str) Dict[str, Optional[str]][source]

Get status report of a Job and Pods created by the Job.

get_jobs(label_selector: str, namespace: Optional[str] = None) Dict[str, Any][source]

Get all Jobs, select them by the provided label.

get_mi_repositories_and_organizations() Tuple[List[str], List[str]][source]

Get all of the repositories and organizations for mi-analysis.

Return type

Tuple of (repositories, organizations)

get_pod_log(pod_id: str, namespace: Optional[str] = None, container: Optional[str] = None) Optional[str][source]

Get log of a pod based on assigned pod ID.

get_pod_status(pod_id: str, namespace: str) Dict[str, Any][source]

Get status entry for a pod - low level routine.

get_pod_status_report(pod_id: str, namespace: str) Dict[str, Optional[str]][source]

Get pod state and convert it to a user-friendly response.

get_solver_names() List[str][source]

Retrieve name of solvers available in installation.

get_workflow(name: Optional[str] = None, label_selector: Optional[str] = None, namespace: Optional[str] = None) Dict[str, Any][source]

Get Workflow from a namespace, use one of name or label_selector to identify which one to get.

get_workflow_node_log(node_name: str, workflow_id: str, namespace: str) Optional[str][source]

Get log from a task/node in a workflow.

get_workflow_node_status(node_name: str, workflow_id: str, namespace: str) Dict[str, Any][source]

Get status from a task/node in a workflow.

get_workflow_pod_name(node_name: str, workflow_id: str, namespace: str) str[source]

Get pod name where task is being executed.

get_workflow_status(name: Optional[str] = None, label_selector: Optional[str] = None, namespace: Optional[str] = None) Dict[str, Any][source]

Get a Workflow status, use one of name or label_selector to identify which one to get.

get_workflow_status_report(workflow_id: str, label_selector: Optional[str] = None, namespace: Optional[str] = None) Dict[str, Optional[str]][source]

Get workflow status report, derived from workflow status.

classmethod obtain_solver_from_runtime_environment(runtime_environment: Dict[str, Any]) Optional[str][source]

Define solver from runtime_environment.

oc_process(namespace: str, template: Dict[str, Any]) Dict[str, Any][source]

Process the given template in OpenShift.

static parse_cpu_spec(cpu_spec: Optional[str]) Optional[float][source]

Parse the given CPU requirement as used by OpenShift/Kubernetes.

static parse_memory_spec(memory_spec: Optional[str]) Optional[float][source]

Parse the given CPU requirement as used by OpenShift/Kubernetes.

classmethod parse_python_solver_name(solver_name: str) Dict[str, Any][source]

Parse os and Python identifiers encoded into solver name.

schedule_adviser(recommendation_type: str, *, count: Optional[int] = None, limit: Optional[int] = None, predictor_config: Optional[Dict[str, Any]] = None, origin: Optional[str] = None, dev: bool = False, authenticated: bool = False, debug: bool = False, job_id: Optional[str] = None, re_run_adviser_id: Optional[str] = None, source_type: Optional[str] = None, kebechet_metadata: Optional[Dict[str, Any]] = None, justification: Optional[List[Dict[str, Any]]] = None, stack_info: Optional[List[Dict[str, Any]]] = None) Optional[str][source]

Schedule an adviser run.

schedule_all_solvers(packages: str, *, indexes: Optional[List[str]] = None, dependency_indexes: Optional[List[str]] = None, force_sync: bool = False, debug: bool = False, transitive: bool = False) List[str][source]

Schedule all solvers for the given packages.

schedule_build_analysis(*, base_image: Optional[str] = None, base_image_analysis_id: Optional[str] = None, base_registry_password: Optional[str] = None, base_registry_user: Optional[str] = None, base_registry_verify_tls: bool = True, output_image: Optional[str] = None, output_image_analysis_id: Optional[str] = None, output_registry_password: Optional[str] = None, output_registry_user: Optional[str] = None, output_registry_verify_tls: bool = True, buildlog_document_id: Optional[str] = None, buildlog_parser_id: Optional[str] = None, environment_type: Optional[str] = None, origin: Optional[str] = None, debug: bool = False, is_external: bool = True, job_id: Optional[str] = None) Optional[str][source]

Schedule a build analysis workflow.

schedule_dependency_monkey(*, predictor: Optional[str] = None, predictor_config: Optional[Dict[str, Any]] = None, stack_output: Optional[str] = None, seed: Optional[int] = None, dry_run: bool = False, decision: Optional[str] = None, count: Optional[int] = None, debug: bool = False, job_id: Optional[str] = None, limit_latest_versions: Optional[int] = None) Optional[str][source]

Schedule a dependency monkey run.

schedule_graph_refresh(namespace: Optional[str] = None) str[source]

Schedule graph refresh job in frontend namespace by default.

schedule_graph_schema_update(job_id: Optional[str] = None, namespace: Optional[str] = None) str[source]

Schedule graph schema update job in infra namespace by default.

schedule_graph_sync(document_id: str, force_sync: bool = False, *, job_id: Optional[str] = None) Optional[str][source]

Schedule graph sync for specific document id.

schedule_inspection(dockerfile: str, specification: Dict[str, Any], target: str, parameters: Dict[str, Any], *, raw_specification: Optional[Dict[str, Any]] = None, job_id: Optional[str] = None) Optional[str][source]

Schedule an inspection run.

schedule_kebechet_administrator(message_info: Dict[str, str], message_type: str, *, job_id: Optional[str] = None) Optional[str][source]

Schedule Kebechet Administrator Workflow for an internal trigger message.

schedule_kebechet_run_url_workflow(repo_url: str, service_name: str = 'github', *, kebechet_metadata: Optional[Dict[str, Any]] = None, job_id: Optional[str] = None) Optional[str][source]

Schedule Kebechet Workflow for a particular valid slug and service name.

schedule_kebechet_workflow(webhook_payload: Dict[str, Any], *, job_id: Optional[str] = None) Optional[str][source]

Schedule Kebechet Workflow for a Webhook from GitHub App..

schedule_mi_workflow(create_knowledge: Optional[bool] = False, repository: Optional[str] = None, entities: Optional[str] = None, knowledge_path: Optional[str] = None, mi_merge_path: Optional[str] = None, mi_used_for_thoth: Optional[bool] = False, mi_merge: Optional[bool] = False) Optional[str][source]

Schedule Meta-information Indicators Workflow.

Parameters
  • repository:str – GitHub repository in full name format: <repo_owner>/<repo_name>

  • entities:Optional[str] – Meta-information Indicator Entities that will be inspected multiple entities are in form of Foo,Bar,…

schedule_package_extract(image: str, *, environment_type: str, is_external: bool = True, origin: Optional[str] = None, registry_user: Optional[str] = None, registry_password: Optional[str] = None, verify_tls: bool = True, debug: bool = False, graph_sync: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule package extract workflow.

schedule_provenance_checker(*, origin: Optional[str] = None, whitelisted_sources: Optional[List[str]] = None, debug: bool = False, authenticated: bool = False, job_id: Optional[str] = None, kebechet_metadata: Optional[Dict[str, Any]] = None, justification: Optional[List[Dict[str, Any]]] = None, stack_info: Optional[List[Dict[str, Any]]] = None) Optional[str][source]

Run provenance checks on the provided user input.

schedule_purge_adviser_job(*, end_datetime: Optional[datetime.datetime] = None, adviser_version: Optional[str] = None, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule a job for purging adviser data.

schedule_purge_package_extract_job(*, end_datetime: Optional[datetime.datetime] = None, package_extract_version: Optional[str] = None, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule a job for purging package-extract data.

schedule_purge_solver_job(*, os_name: str, os_version: str, python_version: str, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule a job for purging solver data.

schedule_revsolver(package_name: str, package_version: str, *, debug: bool = False, job_id: Optional[str] = None) Optional[str][source]

Schedule reverse solver.

schedule_security_indicator(python_package_name: str, python_package_version: str, python_package_index: str, aggregation_function: str, *, job_id: Optional[str] = None) Optional[str][source]

Schedule a security indicator run.

schedule_solver(packages: str, solver: str, *, indexes: Optional[List[str]] = None, dependency_indexes: Optional[List[str]] = None, force_sync: bool = False, debug: bool = False, transitive: bool = True, job_id: Optional[str] = None) Optional[str][source]

Schedule the given solver.

schedule_sync_job(document_type: Optional[str], force_sync: bool = False, graceful: bool = True, job_id: Optional[str] = None) Optional[str][source]

Schedule graph sync for specific document id.

schedule_thoth_repo_init(project_url: str, *, job_id: Optional[str] = None) Optional[str][source]

Schedule a workflow for running initial thoth-advise on a repo.

static set_template_parameters(template: Dict[str, Any], **parameters: Any) None[source]

Set parameters in the template - replace existing ones or append to parameter list if not exist.

>>> set_template_parameters(template, THOTH_LOG_ADVISER='DEBUG')
property token: str

Access service account token mounted to the pod.

verify_integration_inputs(source_type: Optional[thoth.common.enums.ThothAdviserIntegrationEnum], origin: Optional[str] = None) None[source]

Verify if inputs for registered Thoth integrations are correct.

static verify_kebechet_inputs(origin: Optional[str]) None[source]

Verify if Thoth Kebechet integration inputs are correct.

property workflow_manager: WorkflowManager

Return WorkflowManager instance.

This property lazily initializes the WorkflowManager.

class thoth.common.OperatingSystem(name: Optional[str] = None, version: Optional[str] = None)[source]

Bases: thoth.common.config.base.ConfigEntryBase

Representation for hardware related information.

name
version
class thoth.common.RuntimeEnvironment(hardware: thoth.common.config.hardware_information.HardwareInformation, operating_system: thoth.common.config.operating_system.OperatingSystem, python_version: Optional[str] = None, cuda_version: Optional[str] = None, openblas_version: Optional[str] = None, openmpi_version: Optional[str] = None, cudnn_version: Optional[str] = None, mkl_version: Optional[str] = None, labels: Optional[Dict[str, str]] = None, base_image: Optional[str] = None, name: Optional[str] = None, platform: Optional[str] = None, recommendation_type: Optional[str] = None)[source]

Bases: object

An entry collapsing configuration options in the user configuration file.

base_image
cuda_version
cudnn_version
classmethod from_dict(dict_: Optional[Dict[Any, Any]] = None) thoth.common.config.runtime_environment.RuntimeEnvironment[source]

Parse one configuration entry from a dictionary.

get_python_version_tuple() Tuple[int, int][source]

Get tuple with Python version (major, minor) information.

hardware
is_fully_specified() bool[source]

Check if the given runtime environment is fully specified.

labels
classmethod load(content: Optional[str] = None) thoth.common.config.runtime_environment.RuntimeEnvironment[source]

Load runtime environment information from file or from a JSON representation, transparently.

mkl_version
name
openblas_version
openmpi_version
operating_system
platform
python_version
recommendation_type
to_dict(without_none: bool = False) Any[source]

Convert runtime environment configuration to a dict representation.

to_string() str[source]

Convert runtime environment configuration to a string representation.

class thoth.common.SafeJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.encoder.JSONEncoder

Convert objects to JSON, safely.

default(o: Any) Any[source]

Convert an object to JSON, safely.

class thoth.common.ThothAdviserIntegrationEnum(value)[source]

Bases: enum.Enum

Class for Thoth Adviser integrations.

CLI = 1
JUPYTER_NOTEBOOK = 5
KEBECHET = 2
S2I = 3
class thoth.common.Workflow(api_version: str, kind: str, metadata: argo.workflows.client.models.v1alpha1_metadata.V1alpha1Metadata, spec: argo.workflows.client.models.v1alpha1_workflow_spec.V1alpha1WorkflowSpec, status: Optional[argo.workflows.client.models.v1alpha1_workflow_status.V1alpha1WorkflowStatus] = None)[source]

Bases: argo.workflows.client.models.v1alpha1_workflow.V1alpha1Workflow

Argo Workflow instance.

This is a subclass of argo.workflows V1alpha1Workflow model which provides a convenient set of methods to make workflow management easier.

classmethod from_dict(wf: Dict[str, Any], validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a dict.

classmethod from_file(fp: Union[str, pathlib.Path], validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a file.

classmethod from_string(wf: str, validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a YAML string.

classmethod from_url(url: str, validate: bool = True) thoth.common.workflows.Workflow[source]

Create a Workflow from a remote file.

property id: str

Get Workflow ID.

property name: Optional[str]

Get Workflow name.

property validated: bool

Return whether this workflow has been validated.

class thoth.common.WorkflowManager(openshift: Optional[thoth.common.openshift.OpenShift] = None, openshift_config: Optional[Dict[str, str]] = None)[source]

Bases: object

Argo Workflow manager.

get_pending_workflows(workflow_namespace: str) int[source]

Get the total number of pending workflows in a given namespace.

get_workflow(namespace: str, name: str) Dict[str, Any][source]

Get Workflow in namespace by name.

get_workflow_and_tasks_status(namespace: str, name: str) Dict[str, Any][source]

Get workflow and tasks status from a specific namespace.

get_workflow_info(namespace: str, name: str) Dict[str, Any][source]

Get Workflow in namespace by name.

get_workflow_template(namespace: str, label_selector: str, *, parameters: Optional[Dict[str, str]]) Dict[str, Any][source]

Get Workflow template.

get_workflows(namespace: str, *, label_selector: Optional[str] = None) Dict[str, Any][source]

Get Workflows in namespace.

get_workflows_and_tasks_status(namespace: str, label_selector: Optional[str] = None) Dict[str, Any][source]

Get workflows and tasks status from a specific namespace.

get_workflows_info(namespace: str, *, label_selector: Optional[str] = None) Dict[str, Any][source]

Get workflows info from a specific namespace.

submit_adviser(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Adviser Workflow.

submit_build_analysis(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit build analysis workflow.

submit_dependency_monkey(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Dependency Monkey workflow.

submit_graph_sync(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit graph-sync workflow.

submit_inspection(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None, use_hw_template: bool = False) Optional[str][source]

Submit the Inspection Workflow.

submit_kebechet(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Kebechet Workflow.

submit_kebechet_administrator(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Kebechet Administrator Workflow for an internal trigger message.

submit_kebechet_run_url(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Kebechet Run-URL Workflow for a single slug.

submit_mi(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Meta-information Indicators workflow.

submit_package_extract(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, str]] = None) Optional[str][source]

Submit package-extract workflow.

submit_provenance_checker(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit provenance checker workflow.

submit_purge(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit purge workflow.

submit_revsolver(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit reverse solver workflow.

submit_security_indicator(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Security Indicator Workflow.

submit_solver(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit Solver Workflow.

submit_sync_job(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit graph-sync workflow.

submit_thoth_repo_init(template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None) Optional[str][source]

Submit repo initialization workflow.

submit_workflow(namespace: str, wf: Union[argo.workflows.client.models.v1alpha1_workflow.V1alpha1Workflow, Dict[str, Any]], *, parameters: Optional[Dict[str, str]] = None, validate: bool = True) Optional[str][source]

Submit an Argo Workflow to a given namespace.

Returns

Workflow ID

submit_workflow_from_template(namespace: str, label_selector: str, *, template_parameters: Optional[Dict[str, str]] = None, workflow_parameters: Optional[Dict[str, Any]] = None, workflow_namespace: Optional[str] = None, workflow_limit: Optional[int] = None) Optional[str][source]

Retrieve and Submit Workflow from an OpenShift template.

Parameters
  • namespace

    namespace to lookup the template in

    If workflow_namespace is not provided, this namespace is also implicitly the namespace where the workflow is submitted

  • label_selector – selector for the template, i.e. ‘template=workflow-template’

  • template_parameters – parameters for the template

  • workflow_parameters – parameters for the workflow

  • workflow_namespace – namespace to submit the workflow to

  • workflow_limit – limit number of workflows currently in memory for workflowController

thoth.common.cwd(target: str) Generator[str, None, None][source]

Manage cwd in a pushd/popd fashion.

thoth.common.datetime2datetime_str(dt: Optional[datetime.datetime] = None) str[source]

Create a string representation of a datetime.

thoth.common.datetime_str2timestamp(datetime_string: str) int[source]

Parse datetime string represented in ISO format and return timestamp.

thoth.common.datetime_str_from_timestamp(timestamp: int) str[source]

Convert a timestamp to datetime string representation.

Construct a link to a detailed justification document.

thoth.common.get_service_account_token() str[source]

Get token from service account token file.

thoth.common.init_logging(logging_configuration: Optional[Dict[str, str]] = None, logging_env_var_start: Optional[str] = None) None[source]

Initialize Thoth’s logging - respects all namespaces.

This function allows you to control logging facilities in Thoth. Logging can be configured via env variables so that deployment can respect your configuration. The structure of environment variables is THOTH_LOG_(MODULE) and the value of env variable states verbosity level as in the logging module (DEBUG, INFO, WARNING, ERROR).

>>> import os
>>> os.environ['THOTH_LOG_SOLVER']
> WARNING

You can also specify more closely which sub-module logging you are configuring - submodules are separated with double dash:

>>> os.environ['THOTH_LOG_SOLVER__PYTHON']
> DEBUG

You can also use arguments explicitly that override configuration in env variables (a shorthand for standard logging functionality):

>>> init_logging({'thoth.solver': 'DEBUG'})

Optionally you can specify prefix of the logging environment variable determining logging configuration via env vars (defaults to THOTH_LOG_).

thoth.common.map_os_name(os_name: Optional[str]) Optional[str][source]

Map operating system name.

thoth.common.normalize_os_version(os_name: Optional[str], os_version: Optional[str]) Optional[str][source]

Normalize operating system version based on operating system used.

thoth.common.parse_datetime(datetime_string: str) datetime.datetime[source]

Parse datetime string represented in ISO format.

thoth.common.timestamp2datetime(timestamp: int) datetime.datetime[source]

Convert a timestamp to datetime respecting UTC.