Source code for thamos.lib

#!/usr/bin/env python3
# thamos
# Copyright(C) 2018 - 2021 Fridolin Pokorny
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Core parts of library for interacting with Thoth."""

import logging
import os
import platform
import sys
import typing
from itertools import chain
from time import sleep
from time import monotonic
from contextlib import contextmanager
from functools import partial
from functools import wraps
import pprint
import json
import urllib3
import requests

import yaml
import micropipenv
from termcolor import colored
from yaspin import yaspin
from yaspin.spinners import Spinners
from invectio import gather_library_usage
from thoth.analyzer import run_command
from thoth.python import Project
from thoth.python import Constraints
from thoth.common import ThothAdviserIntegrationEnum
from thoth.common import get_justification_link as jl
from thoth.common import cwd

import thamos.discover
from . import __version__ as thamos_version
from .swagger_client.rest import ApiException
from .swagger_client import ApiClient
from .swagger_client import BuildAnalysisApi
from .swagger_client import Configuration
from .swagger_client import PythonStack
from .swagger_client import AdviseInput
from .swagger_client import ProvenanceInput
from .swagger_client import AdviseApi
from .swagger_client import ImageAnalysisApi
from .swagger_client import ProvenanceApi
from .swagger_client import HardwareApi
from .swagger_client import S2iApi
from .swagger_client import PythonPackagesApi
from .swagger_client.models import AnalysisResultResponse
from .config import config as thoth_config
from .exceptions import UnknownAnalysisType
from .exceptions import TimeoutError
from .exceptions import ApiError
from .exceptions import NoDevRequirements
from .exceptions import NoRequirementsFile

from typing import Callable, Any, Union, Dict

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

LAST_ANALYSIS_ID_FILE = ".thoth_last_analysis_id"
_RECOMMENDATION_TYPES_URL = "https://thoth-station.ninja/recommendation-types/"

_LOGGER = logging.getLogger(__name__)
_RETRY_ON_ERROR_COUNT = int(os.getenv("THAMOS_RETRY_ON_ERROR_COUNT", 3))
_RETRY_ON_ERROR_SLEEP = float(os.getenv("THAMOS_RETRY_ON_ERROR_SLEEP", 3.0))
_THAMOS_TIMEOUT = int(os.getenv("THAMOS_TIMEOUT", 2000))
_THAMOS_TOKEN = os.getenv("THAMOS_TOKEN")
_SOURCE = {
    "adviser": "advise/python",
    "provenance-checker": "provenance/python",
    "package-extract": "analyze",
}


def with_api_client(func: typing.Callable):
    """Decorate ``func`` so it is called with a configured API client as its first argument.

    On each call the wrapper resolves the Thoth host (the explicitly set host, otherwise the
    ``host`` entry of the loaded configuration, otherwise the swagger client default), performs
    API discovery against it and builds an ``ApiClient`` carrying a Thamos-specific user agent.
    The time spent inside ``func`` is reported on the debug level.
    """  # noqa

    @wraps(func)
    def wrapper(*args, **kwargs):
        client_configuration = Configuration()

        target_host = thoth_config.explicit_host
        if not target_host:
            # No explicit host - consult the configuration file first, then the client default.
            thoth_config.load_config()
            target_host = thoth_config.content.get("host") or client_configuration.host

        thoth_config.api_discovery(target_host)
        _LOGGER.debug("Using API: %s", thoth_config.api_url)

        client_configuration.host = thoth_config.api_url
        client_configuration.verify_ssl = thoth_config.tls_verify

        started_at = monotonic()
        client = ApiClient(configuration=client_configuration)
        # Override default user-agent.
        agent = (
            f"Thamos/{thamos_version} (Python {platform.python_version()}; "
            f"{platform.system()} {platform.release()})"
        )
        client.user_agent = agent

        outcome = func(client, *args, **kwargs)
        _LOGGER.debug(
            "Elapsed seconds processing request: %f", monotonic() - started_at
        )
        return outcome

    return wrapper
def is_analysis_ready(analysis_id: str, *, verify_tls: bool = True) -> bool:
    """Check whether the result of the given analysis is ready.

    The result endpoint is queried directly with ``requests`` so that the different status codes
    it answers with can be told apart: HTTP 202 yields ``False``, HTTP 200 and 400 yield ``True``
    and any other status code raises ``ApiError``.
    """
    client_configuration = Configuration()

    target_host = thoth_config.explicit_host
    if not target_host:
        thoth_config.load_config()
        target_host = thoth_config.content.get("host") or client_configuration.host

    # The analysis type is the identifier with its two trailing dash-separated parts removed.
    analysis_type = analysis_id.rsplit("-", maxsplit=2)[0]
    # NOTE(review): an unknown analysis type yields None here and ends up in the URL as "None".
    endpoint = _SOURCE.get(analysis_type)

    response = requests.get(
        f"https://{target_host}/api/v1/{endpoint}/{analysis_id}", verify=verify_tls
    )

    if response.status_code == 202:
        return False

    if response.status_code in (200, 400):
        # Return true if result is ready.
        return True

    raise ApiError(
        f"Thoth Backend didn't respond with correct status code. Returned code - "
        f"{response.status_code}: {response.text}"
    )
def _wait_for_analysis(
    status_func: Callable[..., Any],
    analysis_id: str,
    *,
    timeout: typing.Optional[int] = None,
    verify_tls: bool = True,
) -> None:
    """Wait for ongoing analysis to finish.

    ``status_func`` is polled with an exponentially growing delay (0.5s doubling up to 8s) until it
    returns a truthy value. Exceptions raised by ``status_func`` are retried up to
    ``_RETRY_ON_ERROR_COUNT`` times in a row before being re-raised. ``TimeoutError`` is raised
    once ``timeout`` seconds (``_THAMOS_TIMEOUT`` if not given) have passed; a zero timeout
    disables the check.
    """  # noqa

    @contextmanager
    def _no_spinner():
        yield

    spinner = partial(
        yaspin,
        Spinners.clock,
        text=f"Waiting for response from Thoth (analysis: {analysis_id})...",
    )  # type: Union[Callable[..., Any], partial[Any]]
    if _LOGGER.getEffectiveLevel() == logging.DEBUG or bool(
        int(os.getenv("THAMOS_NO_PROGRESSBAR", 1))
    ):
        # CLI automatically injects THAMOS_NO_PROGRESSBAR=0 if the user did not turn it off explicitly.
        spinner = _no_spinner

    sleep_time = 0.5
    retries = 0
    timeout = timeout if timeout is not None else _THAMOS_TIMEOUT
    with spinner():
        start_time = monotonic()
        while True:
            if timeout and monotonic() - start_time > timeout:
                raise TimeoutError(
                    f"Thoth backend did not respond in time, timeout set "
                    f"to {timeout} - see {jl('thamos_timeout')}"
                )

            try:
                response = status_func(analysis_id, verify_tls=verify_tls)
            except Exception as exc:
                if retries >= _RETRY_ON_ERROR_COUNT:
                    raise

                retries += 1
                _LOGGER.error("Failed to obtain status from Thoth: %s", str(exc))
                _LOGGER.warning(
                    "Retrying in a few moments... (attempt %d/%d)",
                    retries,
                    _RETRY_ON_ERROR_COUNT,
                )
                sleep(_RETRY_ON_ERROR_SLEEP)
                continue

            retries = 0  # Reset counter as we obtained a valid response.
            if response:
                break

            if _LOGGER.getEffectiveLevel() <= logging.DEBUG:
                _LOGGER.debug(
                    "Waiting for %r to finish for %g seconds (state: %s)",
                    analysis_id,
                    sleep_time,
                    (get_status(analysis_id) or {}).get("state", "N/A"),
                )

            sleep(sleep_time)
            sleep_time = min(sleep_time * 2, 8)


def _note_last_analysis_id(analysis_id: str) -> None:
    """Store analysis id in a temporary file so that it can be picked up by later calls.

    Nothing is written when THAMOS_DISABLE_LAST_ANALYSIS_ID_FILE is set to a non-zero integer.
    Failures to write the file are reported as a warning only.
    """
    if int(os.getenv("THAMOS_DISABLE_LAST_ANALYSIS_ID_FILE", 0)):
        _LOGGER.debug("Last analysis id will not be noted")
        return

    _LOGGER.debug("Noting last analysis id %r", analysis_id)
    try:
        with open(LAST_ANALYSIS_ID_FILE, "w") as analysis_id_file:
            analysis_id_file.write(analysis_id)
    except Exception as exc:
        # Best effort only - not being able to note the id must not fail the request itself.
        _LOGGER.warning("Failed to write analysis id to a temporary file: %s", str(exc))


def _get_last_analysis_id() -> str:
    """Retrieve last analysis id from a temporary file.

    Any failure to read the file is reported as FileNotFoundError chained to the original error.
    """
    try:
        with open(LAST_ANALYSIS_ID_FILE, "r") as analysis_id_file:
            analysis_id = analysis_id_file.readline().strip()
    except Exception as exc:
        raise FileNotFoundError(
            "Cannot retrieve last analysis id, you need to provide analysis id explicitly"
        ) from exc

    return analysis_id


def _retrieve_analysis_result(
    retrieve_func: Callable[..., Any], analysis_id: str
) -> Any:
    """Retrieve analysis result, raise error if analysis failed.

    ``ApiException`` with status 400 is re-raised right away. Any other ``ApiException`` is logged
    and the call is retried; ``None`` is returned once ``_RETRY_ON_ERROR_COUNT`` retries failed.
    """
    retries = 0
    while True:
        try:
            return retrieve_func(analysis_id)
        except ApiException as exc:
            if exc.status == 400:
                # Re-raise if the exception is bad request (e.g. analysis did not finish successfully).
                raise

            _LOGGER.debug(
                "Retrieved error response %s from server: %s", exc.status, exc.reason
            )

            try:
                response = json.loads(exc.body)
            except (TypeError, ValueError):
                # The body is not guaranteed to be JSON (it can be missing or plain text/HTML);
                # keep the raw body for logging instead of aborting the retry loop.
                response = exc.body

            if isinstance(response, dict) and "error" in response:
                # Error produced based on API endpoints semantics...
                _LOGGER.debug("Error from Thoth: %s", response)
                _LOGGER.error("%s (analysis: %s)", response["error"], analysis_id)
            else:
                # Other errors (e.g. internal server error).
                _LOGGER.error("Error from Thoth: %s", response)

            if retries >= _RETRY_ON_ERROR_COUNT:
                return None

            retries += 1
            _LOGGER.warning(
                "Retrying in a few moments... (attempt %d/%d)",
                retries,
                _RETRY_ON_ERROR_COUNT,
            )
            sleep(_RETRY_ON_ERROR_SLEEP)


def _get_static_analysis(src_path: str = ".") -> typing.Optional[dict]:
    """Get static analysis of files used in project.

    Library usage reported per file is merged into a single mapping from a library to the list of
    its used symbols. ``None`` is returned when no sources were found.
    """
    # We are running in the root directory of project, use the root part for gathering static analysis.
    _LOGGER.info("Performing static analysis of sources to gather library usage")
    try:
        library_usage = gather_library_usage(
            src_path, ignore_errors=True, without_standard_imports=True
        )
    except FileNotFoundError:
        _LOGGER.warning("No library usage was aggregated - no Python sources found")
        return None

    report = {}  # type: Dict[Any, Any]
    for file_record in library_usage["report"].values():
        for library, usage in file_record.items():
            # We could filter out some of the libraries which were used.
            report.setdefault(library, set()).update(usage)

    return {
        "report": {k: list(v) for k, v in report.items()},
        "version": library_usage["version"],
    }


def _is_s2i() -> bool:
    """Check if we run in an OpenShift s2i build."""
    # This environment variable is used by OpenShift's s2i build process.
    return "STI_SCRIPTS_PATH" in os.environ


def _get_origin() -> typing.Optional[str]:
    """Check git origin configured.

    Returns the URL of the ``origin`` remote or ``None`` if git fails or no URL is configured.
    """
    result = run_command("git config --get remote.origin.url", raise_on_error=False)
    if result.return_code != 0:
        _LOGGER.debug(
            "Failed to obtain information about git origin: %s", result.stderr
        )
        return None

    origin = result.stdout.strip()
    if origin:
        _LOGGER.debug("Found git origin %r", origin)
        return origin

    return None
def advise_using_config(
    pipfile: str,
    pipfile_lock: str,
    config: str,
    *,
    runtime_environment_name: typing.Optional[str] = None,
    constraints: typing.Optional[str] = None,
    src_path: typing.Optional[str] = None,
    recommendation_type: str = None,
    dev: bool = False,
    no_static_analysis: bool = False,
    no_user_stack: bool = False,
    nowait: bool = False,
    force: bool = False,
    limit: typing.Optional[int] = None,
    count: int = 1,
    debug: bool = False,
    origin: typing.Optional[str] = None,
    timeout: typing.Optional[int] = None,
    github_event_type: typing.Optional[str] = None,
    github_check_run_id: typing.Optional[int] = None,
    github_installation_id: typing.Optional[int] = None,
    github_base_repo_url: typing.Optional[str] = None,
    source_type: typing.Optional[ThothAdviserIntegrationEnum] = None,
    labels: typing.Optional[Dict[str, str]] = None,
) -> typing.Optional[typing.Tuple[typing.Dict[str, typing.Any], bool]]:
    """Trigger advise with a configuration given either as a file path or as the content itself.

    ``config`` is first tried as a path to a configuration file; if it cannot be opened it is
    loaded as a configuration string. The runtime environment, the default source path and the
    TLS verification flag are then taken from the loaded configuration.
    """
    try:
        thoth_config.load_config_from_file(config)
    except OSError:
        # FileNotFoundError and IOError are both OSError - not a readable file, treat it as content.
        thoth_config.load_config_from_string(config)

    effective_src_path = thoth_config.config_path if src_path is None else src_path
    runtime_environment = thoth_config.get_runtime_environment(
        runtime_environment_name
    )

    return advise(
        pipfile=pipfile,
        pipfile_lock=pipfile_lock,
        constraints=constraints,
        recommendation_type=recommendation_type,
        src_path=effective_src_path,
        runtime_environment=runtime_environment,
        runtime_environment_name=runtime_environment_name,
        dev=dev,
        no_static_analysis=no_static_analysis,
        no_user_stack=no_user_stack,
        nowait=nowait,
        force=force,
        limit=limit,
        count=count,
        debug=debug,
        origin=origin,
        timeout=timeout,
        github_event_type=github_event_type,
        github_check_run_id=github_check_run_id,
        github_installation_id=github_installation_id,
        github_base_repo_url=github_base_repo_url,
        source_type=source_type,
        verify_tls=thoth_config.tls_verify,
        labels=labels,
    )
@with_api_client
def advise(
    api_client: ApiClient,
    pipfile: str,
    pipfile_lock: str,
    *,
    constraints: typing.Optional[str] = None,
    recommendation_type: typing.Optional[str] = None,
    runtime_environment: typing.Optional[dict] = None,
    src_path: str = ".",
    runtime_environment_name: typing.Optional[str] = None,
    dev: bool = False,
    no_static_analysis: bool = False,
    no_user_stack: bool = False,
    nowait: bool = False,
    force: bool = False,
    limit: typing.Optional[int] = None,
    count: int = 1,
    debug: bool = False,
    origin: typing.Optional[str] = None,
    timeout: typing.Optional[int] = None,
    github_event_type: typing.Optional[str] = None,
    github_check_run_id: typing.Optional[int] = None,
    github_installation_id: typing.Optional[int] = None,
    github_base_repo_url: typing.Optional[str] = None,
    source_type: typing.Optional[ThothAdviserIntegrationEnum] = None,
    justification: typing.Optional[Dict] = None,
    stack_info: typing.Optional[Dict] = None,
    kebechet_metadata: typing.Optional[Dict] = None,
    verify_tls: bool = True,
    labels: typing.Optional[Dict[str, str]] = None,
) -> typing.Optional[tuple]:
    """Submit a stack for adviser checks and wait for results.

    The recommendation type is taken from the argument, then from the runtime environment entry,
    then from the configuration and finally defaults to "stable".

    Returns the analysis id if ``nowait`` is set, ``None`` if the result could not be retrieved,
    otherwise a tuple of the result and its error flag.

    Raises ValueError if no Pipfile content is given or if the runtime environment name does not
    match the name stated in the runtime environment entry.
    """
    if not pipfile:
        raise ValueError("No Pipfile content provided for advises")

    if runtime_environment is None:
        runtime_environment = thoth_config.get_runtime_environment(
            runtime_environment_name
        )

    if runtime_environment:
        # Work on a copy - the recommendation type is removed below and that must not leak into
        # the dictionary owned by the caller or by the loaded configuration.
        runtime_environment = dict(runtime_environment)

    if runtime_environment_name is None and runtime_environment:
        runtime_environment_name = runtime_environment.get("name")

    if (runtime_environment and runtime_environment_name) and runtime_environment.get(
        "name"
    ) != runtime_environment_name:
        raise ValueError(
            "Runtime environment name supplied does not match with the one stated in the config file"
        )

    if recommendation_type is None:
        priority = (
            # The runtime environment can be missing or empty, do not assume a dictionary here.
            (runtime_environment or {}).get("recommendation_type"),
            thoth_config.content.get("recommendation_type", None),
            "stable",
        )
        recommendation_type = next(filter(bool, priority))

    _LOGGER.info(
        "Using %r recommendation type - see %s",
        recommendation_type,
        _RECOMMENDATION_TYPES_URL,
    )

    if no_user_stack and pipfile_lock:
        _LOGGER.warning(
            "The user stack found in the lock file will not be supplied as requested"
        )
        pipfile_lock = ""

    library_usage = None
    if not no_static_analysis:
        library_usage = _get_static_analysis(src_path)
        _LOGGER.debug(
            "Library usage:%s",
            "\n" + json.dumps(library_usage, indent=2) if library_usage else None,
        )

    stack = PythonStack(requirements=pipfile, requirements_lock=pipfile_lock or "")

    if runtime_environment:
        # Override recommendation type specified explicitly in the runtime environment entry.
        runtime_environment.pop("recommendation_type", None)

    input_args: Dict[str, Any] = {
        "application_stack": stack,
        "runtime_environment": runtime_environment,
        "library_usage": library_usage,
    }

    if kebechet_metadata:
        input_args["kebechet_metadata"] = kebechet_metadata

    if justification:
        input_args["justification"] = justification

    if stack_info:
        input_args["stack_info"] = stack_info

    if constraints:
        input_args["constraints"] = constraints

    if labels:
        input_args["labels"] = labels

    advise_input = AdviseInput(**input_args)
    api_instance = AdviseApi(api_client)

    if recommendation_type:
        recommendation_type = recommendation_type.lower()

    if source_type is None and _is_s2i():
        source_type = ThothAdviserIntegrationEnum.S2I

    parameters = {
        "recommendation_type": recommendation_type,
        "debug": debug,
        "force": force,
        "source_type": source_type.name.lower() if source_type is not None else None,
        # Ask git only if the origin was not supplied by the caller.
        "origin": origin if origin is not None else _get_origin(),
        "dev": dev,
    }  # type: Dict[str, Any]

    if limit is not None:
        parameters["limit"] = limit

    if count is not None:
        parameters["count"] = count

    if github_event_type is not None:
        parameters["github_event_type"] = github_event_type

    if github_check_run_id is not None:
        parameters["github_check_run_id"] = github_check_run_id

    if github_installation_id is not None:
        parameters["github_installation_id"] = github_installation_id

    if github_base_repo_url is not None:
        parameters["github_base_repo_url"] = github_base_repo_url

    if _THAMOS_TOKEN:
        parameters["token"] = _THAMOS_TOKEN

    response = api_instance.post_advise_python(advise_input, **parameters)

    _LOGGER.info(
        "Successfully submitted advise analysis %r to %r",
        response.analysis_id,
        thoth_config.api_url,
    )
    _note_last_analysis_id(response.analysis_id)

    # Never write the token to logs.
    loggable_parameters = {
        key: "***" if key == "token" else value for key, value in parameters.items()
    }
    _LOGGER.debug("Analysis parameters:\n%s", pprint.pformat(loggable_parameters))
    _LOGGER.debug("Adviser input:\n%s", advise_input.to_str())

    if nowait:
        return response.analysis_id

    # We call custom status function for advise until swagger client supports multiple response codes.
    _wait_for_analysis(
        is_analysis_ready, response.analysis_id, timeout=timeout, verify_tls=verify_tls
    )

    _LOGGER.debug("Retrieving adviser result for %r", response.analysis_id)
    response = _retrieve_analysis_result(
        api_instance.get_advise_python, response.analysis_id
    )
    if not response:
        return None

    _LOGGER.debug("Adviser check metadata: %r", response.metadata)
    return response.result, response.result["error"]
def advise_here(
    recommendation_type: typing.Optional[str] = None,
    *,
    runtime_environment: typing.Optional[dict] = None,
    src_path: str = ".",
    runtime_environment_name: typing.Optional[str] = None,
    dev: bool = False,
    no_static_analysis: bool = False,
    no_user_stack: bool = False,
    nowait: bool = False,
    force: bool = False,
    limit: typing.Optional[int] = None,
    count: int = 1,
    debug: bool = False,
    timeout: typing.Optional[int] = None,
    origin: typing.Optional[str] = None,
    github_event_type: typing.Optional[str] = None,
    github_check_run_id: typing.Optional[int] = None,
    github_installation_id: typing.Optional[int] = None,
    github_base_repo_url: typing.Optional[str] = None,
    source_type: typing.Optional[ThothAdviserIntegrationEnum] = None,
    justification: typing.Optional[Dict] = None,
    stack_info: typing.Optional[Dict] = None,
    kebechet_metadata: typing.Optional[Dict] = None,
    verify_tls: bool = True,
    labels: typing.Optional[Dict[str, str]] = None,
) -> typing.Optional[tuple]:
    """Run advise in current directory, requires no arguments.

    Requirements (and ``constraints.txt`` if present) are read from the overlays directory of the
    selected runtime environment and submitted using ``advise``.

    Raises ValueError if both ``runtime_environment`` and ``runtime_environment_name`` are given
    or if the configured requirements format is not known.
    """
    if runtime_environment_name is not None and runtime_environment is not None:
        raise ValueError(
            "Both runtime_environment and runtime_environment_name were set."
        )
    if runtime_environment is None and runtime_environment_name is None:
        runtime_environment = thoth_config.get_runtime_environment()
    if runtime_environment:
        runtime_environment_name = runtime_environment["name"]

    # get_overlays_directory() returns the current working directory if overlays_dir is not configured so this change
    # should not result in a behavioral change
    overlay_dir = thoth_config.get_overlays_directory(
        runtime_environment_name=runtime_environment_name
    )

    with cwd(overlay_dir):
        requirements_format = thoth_config.requirements_format
        if requirements_format == "pipenv":
            _LOGGER.info(
                "Using Pipenv files to manage dependencies located in %r", os.getcwd()
            )
            # Check for the lock file once so that the message, the way the project is loaded
            # and the hash check below all agree on whether it is present.
            pipfile_lock_exists = os.path.exists("Pipfile.lock")
            if pipfile_lock_exists:
                _LOGGER.info(
                    "Submitting Pipfile.lock as a base for user's stack scoring - see %s",
                    jl("user_stack"),
                )

            project = Project.from_files(
                pipfile_path="Pipfile",
                pipfile_lock_path="Pipfile.lock",
                without_pipfile_lock=not pipfile_lock_exists,
            )

            if pipfile_lock_exists:
                locked_digest = project.pipfile_lock.meta.hash["sha256"]
                pipfile_digest = project.pipfile.hash()["sha256"]
                if locked_digest != pipfile_digest:
                    _LOGGER.error(
                        "Pipfile hash stated in Pipfile.lock %r does not correspond to Pipfile hash %r - was Pipfile "
                        "adjusted? This error is not critical.",
                        locked_digest[:6],
                        pipfile_digest[:6],
                    )
        elif requirements_format in ("pip", "pip-tools", "pip-compile"):
            _LOGGER.info(
                "Using requirements.txt file to manage dependencies located in %r",
                os.getcwd(),
            )
            project = Project.from_pip_compile_files(allow_without_lock=True)
        else:
            raise ValueError(
                f"Unknown configuration option for requirements format: {requirements_format!r}"
            )

        constraints_str = None
        if os.path.exists("constraints.txt"):
            _LOGGER.info("Using constraints.txt file located in %r", os.getcwd())
            with open("constraints.txt") as constraints_file:
                constraints_str = constraints_file.read()

            # Try to load constraints before the request to verify their correctness without sending request to the
            # backend.
            Constraints.from_string(constraints_str)

        pipfile = project.pipfile.to_string()
        pipfile_lock_str = (
            project.pipfile_lock.to_string() if project.pipfile_lock else ""
        )

    return advise(
        pipfile=pipfile,
        pipfile_lock=pipfile_lock_str,
        constraints=constraints_str,
        recommendation_type=recommendation_type,
        src_path=src_path,
        runtime_environment=runtime_environment,
        runtime_environment_name=runtime_environment_name,
        dev=dev,
        no_static_analysis=no_static_analysis,
        no_user_stack=no_user_stack,
        nowait=nowait,
        force=force,
        limit=limit,
        count=count,
        debug=debug,
        origin=origin,
        timeout=timeout,
        source_type=source_type,
        github_event_type=github_event_type,
        github_check_run_id=github_check_run_id,
        github_installation_id=github_installation_id,
        github_base_repo_url=github_base_repo_url,
        justification=justification,
        stack_info=stack_info,
        kebechet_metadata=kebechet_metadata,
        verify_tls=verify_tls,
        labels=labels,
    )
@with_api_client
def provenance_check(
    api_client: ApiClient,
    pipfile: str,
    pipfile_lock: str,
    *,
    nowait: bool = False,
    force: bool = False,
    debug: bool = False,
    origin: typing.Optional[str] = None,
    timeout: typing.Optional[int] = None,
    justification: typing.Optional[Dict[str, Any]] = None,
    stack_info: typing.Optional[Dict[str, Any]] = None,
    kebechet_metadata: typing.Optional[Dict[str, Any]] = None,
    verify_tls: bool = False,
) -> typing.Optional[tuple]:
    """Submit a stack for provenance checks and wait for results.

    Returns the analysis id if ``nowait`` is set, ``None`` if the result could not be retrieved,
    otherwise a tuple of the report and the error flag of the result.

    Raises ValueError if no Pipfile content is given.
    """
    # NOTE(review): verify_tls defaults to False here while the other entry points default to True.
    if not pipfile:
        raise ValueError("No Pipfile content provided for provenance checks")

    input_args: Dict[str, Any] = {
        "application_stack": PythonStack(
            requirements=pipfile, requirements_lock=pipfile_lock
        ),
    }
    # Optional parts of the input are sent only if they carry some content.
    optional_inputs = (
        ("justification", justification),
        ("stack_info", stack_info),
        ("kebechet_metadata", kebechet_metadata),
    )
    for input_name, input_value in optional_inputs:
        if input_value:
            input_args[input_name] = input_value

    provenance_input = ProvenanceInput(**input_args)

    provenance_kwargs: Dict[str, Any] = {"debug": debug, "force": force}
    if origin:
        provenance_kwargs["origin"] = origin
    if _THAMOS_TOKEN:
        provenance_kwargs["token"] = _THAMOS_TOKEN

    api_instance = ProvenanceApi(api_client)
    submitted = api_instance.post_provenance_python(
        provenance_input, **provenance_kwargs
    )
    analysis_id = submitted.analysis_id

    _LOGGER.info(
        "Successfully submitted provenance check analysis %r to %r",
        analysis_id,
        thoth_config.api_url,
    )
    _note_last_analysis_id(analysis_id)

    if nowait:
        return analysis_id

    _wait_for_analysis(
        is_analysis_ready, analysis_id, timeout=timeout, verify_tls=verify_tls
    )

    _LOGGER.debug("Retrieving provenance check result for %r", analysis_id)
    outcome = _retrieve_analysis_result(
        api_instance.get_provenance_python, analysis_id
    )
    if not outcome:
        return None

    _LOGGER.debug("Provenance check metadata: %r", outcome.metadata)
    return outcome.result["report"], outcome.result["error"]
def provenance_check_here(
    *,
    nowait: bool = False,
    force: bool = False,
    debug: bool = False,
    origin: typing.Optional[str] = None,
    timeout: typing.Optional[int] = None,
    justification: typing.Optional[Dict[str, Any]] = None,
    stack_info: typing.Optional[Dict[str, Any]] = None,
    kebechet_metadata: typing.Optional[Dict[str, Any]] = None,
    verify_tls: bool = True,
) -> typing.Optional[tuple]:
    """Submit a provenance check for Pipfile and Pipfile.lock found in the current directory.

    Raises FileNotFoundError if either of the two files is missing.
    """
    for required_file in ("Pipfile", "Pipfile.lock"):
        if not os.path.isfile(required_file):
            raise FileNotFoundError(f"No {required_file} found in current directory")

    with open("Pipfile", "r") as pipfile, open("Pipfile.lock", "r") as piplock:
        pipfile_content = pipfile.read()
        pipfile_lock_content = piplock.read()
        # The check is submitted while both files are still open.
        return provenance_check(
            pipfile_content,
            pipfile_lock_content,
            nowait=nowait,
            force=force,
            debug=debug,
            origin=origin,
            timeout=timeout,
            justification=justification,
            stack_info=stack_info,
            kebechet_metadata=kebechet_metadata,
            verify_tls=verify_tls,
        )
@with_api_client
def image_analysis(
    api_client: ApiClient,
    image: str,
    *,
    environment_type: str,
    registry_user: str = None,
    registry_password: str = None,
    verify_tls: bool = True,
    nowait: bool = False,
    force: bool = False,
    timeout: typing.Optional[int] = None,
    debug: bool = False,
) -> Union[Dict, str, None]:
    """Submit an image for analysis to Thoth.

    Returns the analysis id if ``nowait`` is set, ``None`` if the result could not be retrieved,
    otherwise the result of the analysis.

    Raises ValueError if no image is given.
    """
    if not image:
        raise ValueError("No image provided")

    api_instance = ImageAnalysisApi(api_client)

    request_kwargs: Dict[str, Any] = {
        "image": image,
        "debug": debug,
        "verify_tls": verify_tls,
        "force": force,
    }
    # The environment type is submitted regardless of registry credentials being supplied.
    if environment_type is not None:
        request_kwargs["environment_type"] = environment_type

    if registry_user or registry_password:
        # Swagger client handles None in a different way - we need to explicitly avoid passing
        # registry user and registry password if they are not set.
        if registry_user is not None:
            request_kwargs["registry_user"] = registry_user
        if registry_password is not None:
            request_kwargs["registry_password"] = registry_password

    response = api_instance.post_analyze(**request_kwargs)

    _LOGGER.info(
        "Successfully submitted image analysis %r to %r",
        response.analysis_id,
        thoth_config.api_url,
    )
    if nowait:
        return response.analysis_id

    _wait_for_analysis(
        is_analysis_ready, response.analysis_id, timeout=timeout, verify_tls=verify_tls
    )

    _LOGGER.debug(
        "Retrieving image analysis result result for %r", response.analysis_id
    )
    response = _retrieve_analysis_result(api_instance.get_analyze, response.analysis_id)
    if not response:
        return None

    _LOGGER.debug("Image analysis metadata: %r", response.metadata)
    return response.result
@with_api_client
def build_analysis(
    api_client: ApiClient,
    build_log: dict,
    base_image: str,
    output_image: str,
    *,
    environment_type: str,
    base_registry_user: typing.Optional[str] = None,
    base_registry_password: typing.Optional[str] = None,
    base_registry_verify_tls: bool = True,
    output_registry_user: typing.Optional[str] = None,
    output_registry_password: typing.Optional[str] = None,
    output_registry_verify_tls: bool = True,
    origin: typing.Optional[str] = None,
    nowait: bool = False,
    force: bool = False,
    debug: bool = False,
) -> typing.Union[typing.Dict, str]:
    """Submit a build image and logs for analysis to Thoth.

    The response of the submission is returned as is; ``nowait`` does not change what is returned.

    Raises ValueError if none of build log, base image and output image is given.
    """
    if not (build_log or base_image or output_image):
        raise ValueError("No build info provided")

    build_detail = {
        "build_log": build_log,
        "base_image": base_image,
        "output_image": output_image,
    }

    params = {
        "body": build_detail,
        "base_registry_user": base_registry_user,
        "base_registry_password": base_registry_password,
        "base_registry_verify_tls": base_registry_verify_tls,
        "output_registry_user": output_registry_user,
        "output_registry_password": output_registry_password,
        "output_registry_verify_tls": output_registry_verify_tls,
        "environment_type": environment_type,
        "origin": origin,
        "debug": debug,
        "force": force,
    }
    # Swagger client handles None in a different way - we need to explicitly avoid passing
    # values that are not present.
    params = {k: v for k, v in params.items() if v is not None}

    api_instance = BuildAnalysisApi(api_client)
    response = api_instance.post_build(**params)

    _LOGGER.info("Successfully submitted build analysis to %r", thoth_config.api_url)
    # Never write registry passwords to logs.
    loggable_params = {
        k: "***" if k.endswith("_password") else v for k, v in params.items()
    }
    _LOGGER.debug("Build analysis parameters: %r", loggable_params)
    return response
@with_api_client
def get_log(api_client: ApiClient, analysis_id: str = None):
    """Get log of an analysis - the analysis type and endpoint are automatically derived from analysis id.

    If analysis_id is not provided, it is taken from the last thamos call which stores it in a temporary file.

    Log lines that are JSON records are rendered as "<asctime> <name> <levelname>: <message>" with
    the message colored by its level unless THAMOS_NO_EMOJI is set to a non-zero integer. Lines
    that cannot be rendered this way are kept as they are.

    Raises UnknownAnalysisType if the analysis type cannot be derived from the identifier.
    """
    if not analysis_id:
        analysis_id = _get_last_analysis_id()

    if analysis_id.startswith("package-extract-"):
        api_instance = ImageAnalysisApi(
            api_client
        )  # type: Union[ImageAnalysisApi, ProvenanceApi, AdviseApi]
        method = api_instance.get_analyze_log  # type: ignore
    elif analysis_id.startswith("provenance-checker-"):
        api_instance = ProvenanceApi(api_client)
        method = api_instance.get_provenance_python_log  # type: ignore
    elif analysis_id.startswith("adviser-"):
        api_instance = AdviseApi(api_client)
        method = api_instance.get_advise_python_log  # type: ignore
    else:
        raise UnknownAnalysisType(
            f"Cannot determine analysis type from identifier: {analysis_id!r}"
        )

    json_log = method(analysis_id).log
    if not json_log:
        return json_log

    # Color and attributes used for the message of each known log level.
    level_styles = {
        "DEBUG": ("green", None),
        "INFO": ("cyan", None),
        "WARNING": ("yellow", ["bold"]),
        "ERROR": ("red", ["bold"]),
        "CRITICAL": ("red", ["bold"]),
    }

    rendered = []
    for line in json_log.splitlines():
        try:
            content = json.loads(line)
            message = content["message"]
            if not int(os.getenv("THAMOS_NO_EMOJI", 0)):
                style = level_styles.get(content["levelname"])
                if style is not None:
                    color, attrs = style
                    message = (
                        colored(message, color, attrs=attrs)
                        if attrs
                        else colored(message, color)
                    )

            entry = "{} {:<27} {}: {}\n".format(
                content["asctime"], content["name"], content["levelname"], message
            )
        except Exception:
            # If the content parsed does not carry logger information or has not relevant
            # entries, log the original message.
            entry = line + "\n"

        rendered.append(entry)

    return "".join(rendered)
@with_api_client
def get_status(api_client: ApiClient, analysis_id: typing.Optional[str] = None):
    """Get status of an analysis - the analysis type and endpoint are automatically derived from analysis id.

    If analysis_id is not provided, it is taken from the last thamos call which stores it in a temporary file.

    Returns the status reported by the API converted to a dictionary.

    Raises UnknownAnalysisType if the analysis type cannot be derived from the identifier.
    """
    if not analysis_id:
        analysis_id = _get_last_analysis_id()

    if analysis_id.startswith("package-extract-"):
        api_instance = ImageAnalysisApi(
            api_client
        )  # type: Union[ImageAnalysisApi, ProvenanceApi, AdviseApi]
        method = api_instance.get_analyze_status  # type: ignore
    elif analysis_id.startswith("provenance-checker-"):
        api_instance = ProvenanceApi(api_client)
        method = api_instance.get_provenance_python_status  # type: ignore
    elif analysis_id.startswith("adviser-"):
        api_instance = AdviseApi(api_client)
        method = api_instance.get_advise_python_status  # type: ignore
    else:
        # Format the message here - exceptions do not interpolate logging-style arguments.
        raise UnknownAnalysisType(
            f"Cannot determine analysis type from identifier: {analysis_id!r}"
        )

    return method(analysis_id).status.to_dict()
@with_api_client
def get_analysis_results(
    api_client: ApiClient, analysis_id: typing.Optional[str] = None
):
    """Get the analysis result from a given id.

    If analysis_id is not provided, the one noted by the last thamos call is used.

    Returns the result for image analyses, a tuple of report and error flag for provenance checks
    and a tuple of result and error flag for advises. ``None`` is returned if the result could
    not be retrieved.

    Raises UnknownAnalysisType if the analysis type cannot be derived from the identifier.
    """
    if not analysis_id:
        analysis_id = _get_last_analysis_id()

    if analysis_id.startswith("package-extract-"):
        api_instance = ImageAnalysisApi(
            api_client
        )  # type: Union[ImageAnalysisApi, ProvenanceApi, AdviseApi]
        method = api_instance.get_analyze  # type: ignore
    elif analysis_id.startswith("provenance-checker-"):
        api_instance = ProvenanceApi(api_client)
        method = api_instance.get_provenance_python  # type: ignore
    elif analysis_id.startswith("adviser-"):
        api_instance = AdviseApi(api_client)
        method = api_instance.get_advise_python  # type: ignore
    else:
        # Format the message here - exceptions do not interpolate logging-style arguments.
        raise UnknownAnalysisType(
            f"Cannot determine analysis type from identifier: {analysis_id!r}"
        )

    response = _retrieve_analysis_result(
        method, analysis_id
    )  # type: AnalysisResultResponse
    if not response:
        # Retrieval gave up after retries - there is no result to unpack.
        return None

    if analysis_id.startswith("package-extract-"):
        return response.result

    if analysis_id.startswith("provenance-checker-"):
        return response.result["report"], response.result["error"]

    return response.result, response.result["error"]
def install_using_config(
    config: str,
    runtime_environment_name: typing.Optional[str] = None,
    dev: bool = False,
) -> None:
    """Install packages using a configuration given as a file path or as the content itself.

    ``config`` is first tried as a path to a configuration file; if it cannot be opened it is
    loaded as a configuration string.
    """
    try:
        thoth_config.load_config_from_file(config)
    except OSError:
        # FileNotFoundError and IOError are both OSError - not a readable file, treat it as content.
        thoth_config.load_config_from_string(config)

    install(dev=dev, runtime_environment_name=runtime_environment_name)
def install(
    runtime_environment_name: typing.Optional[str] = None,
    dev: bool = False,
    pip_args: typing.Optional[typing.Tuple[str]] = None,
) -> None:
    """Install packages for the given runtime environment using micropipenv.

    If the runtime environment is not specified, the one returned by the configuration for no
    name is used. The installation is run from the overlays directory of the runtime environment.

    Raises NoRequirementsFile if the files needed for the installation are missing and
    NoDevRequirements if development dependencies are requested but the lock file has none.
    """
    uses_pipenv = thoth_config.requirements_format == "pipenv"
    method = "pipenv" if uses_pipenv else "requirements"

    if uses_pipenv and not dev:
        _LOGGER.warning(
            "Development dependencies will not be installed - see %s", jl("no_dev")
        )

    with cwd(thoth_config.get_overlays_directory(runtime_environment_name)):
        if not uses_pipenv:
            if not os.path.isfile("requirements.txt"):
                raise NoRequirementsFile(
                    f"No requirements.txt file found in {os.getcwd()!r} needed to install dependencies"
                )
        else:
            if not os.path.isfile("Pipfile.lock"):
                raise NoRequirementsFile(
                    f"No Pipfile.lock found in {os.getcwd()!r} needed to install dependencies, "
                    "issue `thamos advise` resolve dependencies"
                )

            if not os.path.isfile("Pipfile"):
                # Required for computing digests.
                raise NoRequirementsFile(
                    f"No Pipfile found in {os.getcwd()!r} needed for the installation process"
                )

            if dev:
                with open("Pipfile.lock") as pipfile_lock_file:
                    develop_section = json.load(pipfile_lock_file).get("develop")

                if not develop_section:
                    raise NoDevRequirements(
                        "No development requirements found in the lock file, make sure development "
                        "requirements are stated and the resolved stack preserves them by "
                        "using `thamos advise --dev`"
                    )

        if runtime_environment_name is None:
            runtime_environment_name = thoth_config.get_runtime_environment(
                runtime_environment_name
            )["name"]

        _LOGGER.info(
            "Installing requirements for runtime environment %r",
            runtime_environment_name,
        )
        _LOGGER.info(
            "Using %r installation method to install dependencies stated in %r",
            method,
            os.getcwd(),
        )

        micropipenv_kwargs = {
            "method": method,
            "deploy": True,
            "dev": dev,
            "pip_args": pip_args,
        }

        virtualenv_path = thoth_config.get_virtualenv_path(runtime_environment_name)
        if virtualenv_path:
            if not os.path.isdir(virtualenv_path):
                thoth_config.create_virtualenv()

            micropipenv_kwargs["pip_bin"] = os.path.join(virtualenv_path, "bin", "pip3")

        # micropipenv writes and prints the lockfile which is not very user friendly when thamos is used by a user.
        # Suppress this behavior unless these environment variables options are supplied explicitly.
        injected_variables = []
        for variable in (
            "MICROPIPENV_NO_LOCKFILE_WRITE",
            "MICROPIPENV_NO_LOCKFILE_PRINT",
        ):
            if os.getenv(variable) is None:
                os.environ[variable] = "1"
                injected_variables.append(variable)

        try:
            micropipenv.install(**micropipenv_kwargs)
        finally:
            # Remove only the variables injected above, values set by the user stay untouched.
            for variable in injected_variables:
                os.environ.pop(variable, None)
@with_api_client
def list_hardware_environments(api_client: ApiClient) -> typing.Dict[str, Any]:
    """Get information about hardware for which Thoth can give recommendations.

    The value returned is the "hardware_environments" entry of the hardware API response.
    """
    response = HardwareApi(api_client).list_hardware_environments()
    return response.to_dict()["hardware_environments"]
@with_api_client
def list_thoth_s2i(api_client: ApiClient) -> typing.Dict[str, Any]:
    """Get the Python s2i listing provided by the Thoth API.

    The value returned is the "s2i" entry of the s2i API response.
    """
    response = S2iApi(api_client).list_s2i_python()
    return response.to_dict()["s2i"]
@with_api_client
def list_python_package_indexes(api_client: ApiClient) -> typing.Dict[str, Any]:
    """Get the Python package indexes listed by the Thoth API.

    The value returned is the "indexes" entry of the Python packages API response.
    """
    response = PythonPackagesApi(api_client).list_python_package_indexes()
    return response.to_dict()["indexes"]
def write_files(
    requirements: typing.Dict[str, Any],
    requirements_lock: typing.Dict[str, Any],
    requirements_format: str,
) -> None:
    """Write content of Pipfile/Pipfile.lock or requirements.in/txt to the current directory.

    The "pipenv" format produces Pipfile and Pipfile.lock (keeping the Thoth section), the
    "pip", "pip-tools" and "pip-compile" formats produce requirements.in and requirements.txt.

    A ValueError is raised for any other requirements format. The format is checked before the
    project is constructed so that an unsupported format is reported regardless of the content
    of the requirements supplied.
    """
    if requirements_format not in ("pipenv", "pip", "pip-tools", "pip-compile"):
        raise ValueError(
            f"Unknown requirements format, supported are 'pipenv' and 'pip': {requirements_format!r}"
        )

    project = Project.from_dict(requirements, requirements_lock)

    if requirements_format == "pipenv":
        _LOGGER.debug("Writing to Pipfile/Pipfile.lock in %r", os.getcwd())
        project.to_files(keep_thoth_section=True)
        return

    # No Pipfile is involved for pip-compile style formats, hence nothing is logged about
    # Pipfile here.
    _LOGGER.debug("Writing to requirements.in/requirements.txt in %r", os.getcwd())
    project.to_pip_compile_files()
def load_files(requirements_format: str) -> typing.Tuple[str, typing.Optional[str]]:
    """Load Pipfile/Pipfile.lock or requirements.in/txt from the current directory.

    Returns a tuple with the Pipfile content as a string and the Pipfile.lock content as a
    string, or None in place of the lock if the project loaded has no lock.

    A ValueError is raised if the requirements format is none of "pipenv", "pip", "pip-tools"
    or "pip-compile".
    """
    if requirements_format == "pipenv":
        _LOGGER.info("Using Pipenv files located in %r directory", os.getcwd())
        # Probe the lock file just once so that the log message, the way the project is loaded
        # and the digest check below cannot disagree with each other.
        pipfile_lock_exists = os.path.exists("Pipfile.lock")

        if pipfile_lock_exists:
            _LOGGER.info(
                "Submitting Pipfile.lock as a base for user's stack scoring - see %s",
                jl("user_stack"),
            )

        project = Project.from_files(
            pipfile_path="Pipfile",
            without_pipfile_lock=not pipfile_lock_exists,
        )

        if pipfile_lock_exists:
            locked_hash = project.pipfile_lock.meta.hash["sha256"]
            pipfile_hash = project.pipfile.hash()["sha256"]
            if locked_hash != pipfile_hash:
                _LOGGER.error(
                    "Pipfile hash stated in Pipfile.lock %r does not correspond to Pipfile hash %r - was Pipfile "
                    "adjusted? This error is not critical.",
                    locked_hash[:6],
                    pipfile_hash[:6],
                )
    elif requirements_format in ("pip", "pip-tools", "pip-compile"):
        _LOGGER.info("Using requirements.txt file located in %r directory", os.getcwd())
        project = Project.from_pip_compile_files(allow_without_lock=True)
    else:
        raise ValueError(
            f"Unknown configuration option for requirements format: {requirements_format!r}"
        )

    return (
        project.pipfile.to_string(),
        project.pipfile_lock.to_string() if project.pipfile_lock else None,
    )
def write_configuration(
    advised_configuration: dict,
    recommendation_type: typing.Optional[str] = None,
    dev: bool = False,
) -> None:
    """Adjust the runtime environment entry in .thoth.yaml based on the advised configuration.

    The entry in the "runtime_environments" listing of .thoth.yaml (current directory) whose
    name matches the name of the advised configuration is replaced with the advised
    configuration. The "dev" flag and, if supplied, the recommendation type are stored in the
    entry as well.

    Nothing is written if the advised configuration is empty, has no name or if no runtime
    environment entry of that name is found; the last two cases are logged as errors.

    The dictionary passed in is not modified.
    """
    if not advised_configuration:
        _LOGGER.debug("No advises on configuration, nothing to adjust")
        return

    if "name" not in advised_configuration:
        _LOGGER.error(
            "Cannot adjust Thoth's configuration based on advises: No name found in Thoth's configuration entry"
        )
        return

    _LOGGER.debug("Reading Thoth's configuration file")
    with open(".thoth.yaml", "r") as thoth_yaml_file:
        # An empty file is loaded as None, treat it as a configuration with no entries.
        content = yaml.safe_load(thoth_yaml_file.read()) or {}

    runtime_environments = content.get("runtime_environments") or []
    for idx, runtime_environment_entry in enumerate(runtime_environments):
        if runtime_environment_entry.get("name") != advised_configuration["name"]:
            continue

        _LOGGER.debug(
            "Adjusting configuration entry for %r based on recommendations",
            advised_configuration["name"],
        )
        # Work on a copy so that the keys added below do not leak to the caller's dictionary.
        adjusted_entry = dict(advised_configuration)
        if recommendation_type:
            adjusted_entry["recommendation_type"] = recommendation_type
        adjusted_entry["dev"] = dev
        runtime_environments[idx] = adjusted_entry
        break
    else:
        _LOGGER.error(
            "Cannot adjust Thoth's configuration based on advises: No runtime environment entry with name %r found",
            advised_configuration["name"],
        )
        return

    _LOGGER.debug("Writing adjusted Thoth's configuration file")
    with open(".thoth.yaml", "w") as thoth_yaml_file:
        yaml.safe_dump(content, thoth_yaml_file)
def collect_support_information_dict() -> Dict[str, Any]:
    """Collect environment information suitable to report bugs or issues in a dictionary form.

    The dictionary holds THAMOS_ and THOTH_ prefixed environment variables, results of all the
    ``discover_*`` callables found in ``thamos.discover``, platform and Python interpreter
    details, versions of Thamos and Thoth, the content of Thoth's configuration and the output
    of ``pip freeze``.

    Failures to obtain the configuration content, the Thoth version or the ``pip freeze``
    listing are logged and the corresponding entry is set to None.
    """
    prefix = "discover_"
    discovery = {
        name[len(prefix) :]: obj()
        for name, obj in thamos.discover.__dict__.items()
        if callable(obj) and name.startswith(prefix)
    }

    # The lookups below are best-effort - a failure must not prevent reporting the rest.
    thoth_config_content = None
    try:
        thoth_config_content = thoth_config.content
    except Exception:
        _LOGGER.exception("Failed to obtain content of Thoth's configuration file")

    thoth_version = None
    try:
        thoth_version = thoth_config.get_thoth_version()
    except Exception:
        _LOGGER.exception("Failed to obtain Thoth version information")

    pip_freeze = None
    try:
        pip_freeze = run_command("pip freeze").stdout.splitlines()
    except Exception:
        _LOGGER.exception("Failed to obtain packages present in the environment")

    # Slicing the version string to three characters would turn "3.10.4" into "3.1", construct
    # the major.minor version out of the version tuple instead.
    python_version = ".".join(platform.python_version_tuple()[:2])

    return {
        "environment": {
            k: v for k, v in os.environ.items() if k.startswith(("THAMOS_", "THOTH_"))
        },
        "is_s2i": _is_s2i(),
        "last_analysis_id": _get_last_analysis_id(),
        "thoth_config": thoth_config_content,
        "thoth_version": thoth_version,
        "thamos_version": thamos_version,
        "os_name": os.name,
        "platform_machine": platform.machine(),
        "platform_python_implementation": platform.python_implementation(),
        "platform_release": platform.release(),
        "platform_system": platform.system(),
        "platform_version": platform.version(),
        "python_full_version": platform.python_version(),
        "python_version": python_version,
        "sys_platform": sys.platform,
        "pip_freeze": pip_freeze,
        "discovery": discovery,
    }
def _get_package_entry_str( pipfile_lock: Dict[str, Dict[str, typing.Any]], package_name: str ) -> str: """Get information about the given package so that they are printed to users.""" package_entry = pipfile_lock["default"].get( package_name, pipfile_lock["develop"].get(package_name) ) if not package_entry: # Any error spotted? return "UNKNOWN==UNKNOWN from UNKNOWN" index_url = "UNKNOWN" for source in pipfile_lock["_meta"]["sources"]: if package_entry.get("index") == source["name"]: index_url = source["url"] break return f"{package_name}{package_entry.get('version')} from {index_url}" def _traverse_edges( pipfile_lock: Dict[str, Dict[str, str]], nodes: typing.List[str], edges: typing.List[typing.List[int]], seen_nodes: typing.Set[int], *, starting_node: int, indent: int = 2, fold: bool = True, ) -> None: """Traverse edges of the dependency graph and show its structure to terminal.""" for edge in edges: if edge[0] == starting_node: print( " " * indent + f"↳ {_get_package_entry_str(pipfile_lock, nodes[edge[1]])}" ) if edge[1] not in seen_nodes: if fold: seen_nodes.add(edge[1]) _traverse_edges( pipfile_lock, nodes, edges, seen_nodes=seen_nodes | {edge[1]}, starting_node=edge[1], indent=indent + 2, fold=fold, ) else: # Note cyclic dependency. print(" " * (indent + 2) + "↳ …")