#!/usr/bin/env python3
# thoth-investigator
# Copyright(C) 2020 Francesco Murdaca
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""This is Thoth investigator common methods."""
import logging
from math import inf
from asyncio import sleep
import json
from typing import List, Tuple, Optional, Callable
from thoth.common import OpenShift
from thoth.storages import GraphDatabase
from thoth.messaging import ALL_MESSAGES
from .configuration import Configuration
from .metrics import message_version_metric
_LOGGER = logging.getLogger(__name__)
def _create_base_handler_table():
table = dict()
for i in ALL_MESSAGES:
table[i.topic_name] = dict()
return table
def _get_class_from_topic_name(topic_name):
for i in ALL_MESSAGES:
if i.topic_name == topic_name:
return i
def _get_class_from_base_name(base_name):
for i in ALL_MESSAGES:
if i.base_name == base_name:
return i
def _message_type_from_message_class(message_class):
message_type = message_class.base_name.rsplit(".", maxsplit=1)[-1] # type: str
    message_type = message_type.replace("-", "_")  # this is to match the metric names associated with message processing
return message_type
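# Illustrative only: for a hypothetical message class whose base_name is
# "thoth.package-released", the helper above returns "package_released"
# (the part after the last "." with dashes replaced by underscores), which
# matches the label format used by the processing metrics.
#
#     class _ExampleMessage:
#         base_name = "thoth.package-released"
#
#     assert _message_type_from_message_class(_ExampleMessage) == "package_released"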
investigator_handler_table = _create_base_handler_table()
metrics_handler_table = _create_base_handler_table()
def register_handler(topic_name: str, version_strings: List[str], handler_table: dict = investigator_handler_table):
"""Register function to specific message versions."""
def wrapper_func(func: Callable):
for v in version_strings:
handler_table[topic_name][v] = func
_LOGGER.debug("Registering handler for %s==%s", topic_name, v)
        async def inner_func(*args, **kwargs):
            return await func(*args, **kwargs)
        return inner_func
return wrapper_func
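# A minimal usage sketch for the decorator above; the topic name, version
# strings, and handler are hypothetical and assume the topic is one of the
# topics known to thoth-messaging (i.e. present in ALL_MESSAGES), otherwise
# the table lookup would raise a KeyError:
#
#     @register_handler("thoth.example-topic", ["v1", "v2"])
#     async def process_example_message(contents, msg, **kwargs):
#         ...
#
# After import, investigator_handler_table["thoth.example-topic"] maps both
# "v1" and "v2" to the handler.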
async def default_metric_handler(contents, msg, **kwargs):
"""Increments counter specific to message type and version that was encountered."""
metric = message_version_metric.labels(
message_type=_message_type_from_message_class(_get_class_from_topic_name(msg.topic())),
message_version=json.loads(msg.value().decode("utf-8")).get("version", "None"),
)
metric.inc()
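# A hedged sketch of wiring the default metric handler into
# metrics_handler_table for every known topic; the "v1" version string is an
# assumption, not the authoritative list of message versions:
#
#     for message in ALL_MESSAGES:
#         register_handler(message.topic_name, ["v1"], handler_table=metrics_handler_table)(
#             default_metric_handler
#         )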
async def wait_for_limit(openshift: OpenShift, workflow_namespace: str):
"""Wait for pending workflow limit."""
total_pending = inf
if Configuration.PENDING_WORKFLOW_LIMIT is None:
return
limit = int(Configuration.PENDING_WORKFLOW_LIMIT)
total_pending = openshift.workflow_manager.get_pending_workflows(workflow_namespace=workflow_namespace)
_LOGGER.debug("Current number pending = %d", total_pending)
while total_pending > limit:
await sleep(Configuration.SLEEP_TIME)
total_pending = openshift.workflow_manager.get_pending_workflows(workflow_namespace=workflow_namespace)
_LOGGER.debug("Current number pending = %d", total_pending)
async def schedule_kebechet_administrator(openshift: OpenShift, message_info: dict, message_name: str) -> Optional[str]:
"""Schedule Kebechet Administrator from a particular message."""
try:
await wait_for_limit(openshift, workflow_namespace=Configuration.THOTH_BACKEND_NAMESPACE)
workflow_id = openshift.schedule_kebechet_administrator(message_info=message_info, message_type=message_name)
_LOGGER.info(
"Scheduled Kebechet Administrator workflow"
f" for message type {message_name} with workflow id - {workflow_id}"
)
except Exception as e:
_LOGGER.exception(f"Failed to schedule Kebechet Administrator workflow for message type {message_name}: {e}")
raise e
return workflow_id
async def learn_about_security(
openshift: OpenShift,
graph: GraphDatabase,
is_present: bool,
package_name: str,
index_url: str,
package_version: str,
) -> int:
"""Learn about security of Package Version Index."""
if is_present:
        # Check whether this package version from this index has already been analyzed for security
is_analyzed = graph.si_aggregated_python_package_version_exists(
package_name=package_name, package_version=package_version, index_url=index_url
)
if is_analyzed:
return 0
    # Package never seen (schedule SI workflow to collect knowledge for Thoth)
is_si_analyzer_scheduled = await _schedule_security_indicator(
openshift=openshift, package_name=package_name, package_version=package_version, index_url=index_url
)
return is_si_analyzer_scheduled
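# A hedged usage sketch (package data and index URL are illustrative only):
#
#     scheduled = await learn_about_security(
#         openshift=openshift,
#         graph=graph,
#         is_present=True,
#         package_name="example-pkg",
#         index_url="https://pypi.org/simple",
#         package_version="1.0.0",
#     )
#
# `scheduled` is 1 when an SI workflow was scheduled and 0 when the package
# version was already analyzed for security.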
async def _schedule_security_indicator(
openshift: OpenShift, package_name: str, package_version: str, index_url: str
) -> int:
"""Schedule Security Indicator."""
try:
await wait_for_limit(openshift, workflow_namespace=Configuration.THOTH_MIDDLETIER_NAMESPACE)
analysis_id = openshift.schedule_security_indicator(
python_package_name=package_name,
python_package_version=package_version,
python_package_index=index_url,
aggregation_function="process_data",
)
_LOGGER.info(
"Scheduled SI workflow for package %r in version %r from index %r, analysis is %r",
package_name,
package_version,
index_url,
analysis_id,
)
is_scheduled = 1
except Exception as e:
_LOGGER.exception(
f"Failed to schedule SI for package {package_name} in version {package_version} from index {index_url}: {e}"
)
raise e
return is_scheduled
async def learn_using_revsolver(
openshift: OpenShift,
is_present: bool,
package_name: str,
package_version: str,
revsolver_packages_seen: Optional[List[Tuple[str, str]]] = None,
) -> Tuple[int, List[Tuple[str, str]]]:
"""Learn using revsolver about Package Version dependencies."""
revsolver_packages_seen = revsolver_packages_seen or []
if not is_present and (package_name, package_version) not in revsolver_packages_seen:
# Package never seen (schedule revsolver workflow to collect knowledge for Thoth)
is_revsolver_scheduled = await _schedule_revsolver(
openshift=openshift, package_name=package_name, package_version=package_version
)
revsolver_packages_seen.append((package_name, package_version))
return is_revsolver_scheduled, revsolver_packages_seen
return 0, revsolver_packages_seen
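# A hedged usage sketch: callers processing a batch can thread
# `revsolver_packages_seen` through successive calls so the same
# (name, version) pair is scheduled at most once per batch; the package data
# below is hypothetical:
#
#     seen: List[Tuple[str, str]] = []
#     for name, version in [("example-pkg", "1.0.0"), ("example-pkg", "1.0.0")]:
#         scheduled, seen = await learn_using_revsolver(
#             openshift=openshift,
#             is_present=False,
#             package_name=name,
#             package_version=version,
#             revsolver_packages_seen=seen,
#         )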
async def _schedule_revsolver(openshift: OpenShift, package_name: str, package_version: str) -> int:
"""Schedule revsolver."""
try:
await wait_for_limit(openshift, workflow_namespace=Configuration.THOTH_MIDDLETIER_NAMESPACE)
analysis_id = openshift.schedule_revsolver(
package_name=package_name, package_version=package_version, debug=Configuration.LOG_REVSOLVER
)
_LOGGER.info(
"Scheduled reverse solver for package %r in version %r, analysis is %r",
package_name,
package_version,
analysis_id,
)
is_scheduled = 1
except Exception as e:
_LOGGER.exception(
"Failed to schedule reverse solver for %r in version %r: %r", package_name, package_version, e
)
raise e
return is_scheduled
async def learn_using_solver(
openshift: OpenShift,
graph: GraphDatabase,
is_present: bool,
package_name: str,
index_url: str,
package_version: str,
solver: Optional[str] = None,
) -> int:
"""Learn using solver about Package Version Index dependencies."""
if not is_present:
# Package never seen (schedule all solver workflows to collect all knowledge for Thoth)
are_solvers_scheduled = await _schedule_all_solvers(
openshift=openshift, package_name=package_name, package_version=package_version, indexes=[index_url]
)
return are_solvers_scheduled
# Select solvers
if not solver:
solvers: List[str] = openshift.get_solver_names()
else:
solvers = [solver]
dependency_indexes = graph.get_python_package_index_urls_all(enabled=True)
    # Check which solvers have not yet solved this package version and schedule a solver workflow for each
are_solvers_scheduled = 0
for solver_name in solvers:
is_solved = graph.python_package_version_exists(
package_name=package_name, package_version=package_version, index_url=index_url, solver_name=solver_name
)
if not is_solved:
is_solver_scheduled = _schedule_solver(
openshift=openshift,
package_name=package_name,
package_version=package_version,
indexes=[index_url],
dependency_indexes=dependency_indexes,
solver_name=solver_name,
)
are_solvers_scheduled += is_solver_scheduled
return are_solvers_scheduled
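# A hedged example (package data and index URL are illustrative): when no
# explicit solver is passed, the function checks every solver known to
# OpenShift and returns how many solver workflows it scheduled:
#
#     count = await learn_using_solver(
#         openshift=openshift,
#         graph=graph,
#         is_present=True,
#         package_name="example-pkg",
#         index_url="https://pypi.org/simple",
#         package_version="1.0.0",
#     )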
def _schedule_solver(
openshift: OpenShift,
package_name: str,
package_version: str,
dependency_indexes: List[str],
indexes: List[str],
solver_name: str,
) -> int:
"""Schedule solver."""
packages = f"{package_name}==={package_version}"
try:
# mypy is throwing errors in CI without this ignore
analysis_id = openshift.schedule_solver( # type: ignore
solver=solver_name,
packages=packages,
indexes=indexes,
dependency_indexes=dependency_indexes,
transitive=False,
debug=Configuration.LOG_SOLVER,
)
_LOGGER.info(
"Scheduled solver %r for packages %r from indexes %r, analysis is %r",
solver_name,
packages,
indexes,
analysis_id,
)
is_scheduled = 1
except Exception as e:
_LOGGER.exception(f"Failed to schedule solver {solver_name} for package {packages} from {indexes}: {e}")
raise e
return is_scheduled
async def _schedule_all_solvers(
openshift: OpenShift, package_name: str, package_version: str, indexes: List[str]
) -> int:
"""Schedule all solvers."""
packages = f"{package_name}==={package_version}"
try:
await wait_for_limit(openshift, workflow_namespace=Configuration.THOTH_MIDDLETIER_NAMESPACE)
analysis_ids = openshift.schedule_all_solvers(packages=packages, indexes=indexes)
_LOGGER.info(
"Scheduled solvers for packages %r from indexes %r, analysis ids are %r", packages, indexes, analysis_ids
)
are_scheduled = len(analysis_ids)
except Exception as e:
_LOGGER.exception(f"Failed to schedule solvers for package {packages} from {indexes}: {e}")
raise e
return are_scheduled