Source code for thoth.investigator.configuration
#!/usr/bin/env python3
# thoth-investigator
# Copyright(C) 2020 Francesco Murdaca
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""This is Thoth investigator configuration parameters."""
import logging
import os
import json
from enum import Enum, auto
from typing import Union
_LOGGER = logging.getLogger(__name__)
[docs]class ConsumerModeEnum(Enum):
"""Class representing the different modes the consumer can use which correspond to different handler tables."""
investigator = auto()
metrics = auto()
def _get_ack_on_fail() -> Union[bool, list]:
to_ret = json.loads(os.getenv("THOTH_INVESTIGATOR_ACK_ON_FAIL", "0"))
if type(to_ret) == int:
return bool(to_ret)
elif type(to_ret) == list:
return to_ret
else:
raise TypeError("THOTH_INVESTIGATOR_ACK_ON_FAIL envvar must be either a integer or a list")
[docs]class Configuration:
"""Configuration of investigator."""
# Namespaces
THOTH_BACKEND_NAMESPACE = os.environ["THOTH_BACKEND_NAMESPACE"]
THOTH_MIDDLETIER_NAMESPACE = os.environ["THOTH_MIDDLETIER_NAMESPACE"]
DEPLOYMENT_NAME = os.environ["THOTH_DEPLOYMENT_NAME"]
# Workflows
THOTH_INVESTIGATOR_SCHEDULE_SOLVER = int(os.getenv("THOTH_INVESTIGATOR_SCHEDULE_SOLVER", 1))
THOTH_INVESTIGATOR_SCHEDULE_REVSOLVER = int(os.getenv("THOTH_INVESTIGATOR_SCHEDULE_REVSOLVER", 1))
THOTH_INVESTIGATOR_SCHEDULE_SECURITY = int(os.getenv("THOTH_INVESTIGATOR_SCHEDULE_SECURITY", 1))
THOTH_INVESTIGATOR_SCHEDULE_KEBECHET_ADMIN = int(os.getenv("THOTH_INVESTIGATOR_KEBECHET_ADMINISTRATOR", 1))
LOG_SOLVER = os.environ.get("THOTH_LOG_SOLVER") == "DEBUG"
LOG_REVSOLVER = os.environ.get("THOTH_LOG_REVSOLVER") == "DEBUG"
# Quota handling
SLEEP_TIME = int(os.getenv("ARGO_PENDING_SLEEP_TIME", 2))
PENDING_WORKFLOW_LIMIT = os.getenv("ARGO_PENDING_WORKFLOW_LIMIT", None)
# Consumer configuration
MAX_RETRIES = int(os.getenv("THOTH_INVESTIGATOR_MAX_RETRIES", 5))
BACKOFF = float(os.getenv("THOTH_INVESTIGATOR_BACKOFF", 0.5)) # Linear backoff strategy
ACK_ON_FAIL = _get_ack_on_fail()
NUM_WORKERS = int(os.getenv("THOTH_CONSUMER_WORKERS", 5))
CONSUMER_MODE = os.getenv("THOTH_CONSUMER_MODE", "investigator")