# Source code for thoth.messaging.config

#!/usr/bin/env python3
# thoth-messaging
# Copyright(C) 2020 Kevin Postlethwait
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""Helper functions for generating confluent kafka configuration."""

import os

# For more configuration options go to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# and add them to these dictionaries

# Maps each client configuration key to an ``(environment variable, type)``
# pair. ``kafka_config_from_env`` reads the named variable and, when it is set
# and non-empty, converts the string to the given type (``bool`` entries are
# parsed by comparing against the text "False" rather than by calling ``bool``).
confluent_config = {
    # General client settings
    "bootstrap.servers": ("KAFKA_BOOTSTRAP_SERVERS", str),
    "client.id": ("KAFKA_CLIENT_ID", str),
    "message.max.bytes": ("KAFKA_MESSAGE_MAX_BYTES", int),
    "receive.message.max.bytes": ("KAFKA_RECEIVE_MESSAGE_MAX_BYTES", int),
    # Security protocol and SSL settings
    "security.protocol": ("KAFKA_SECURITY_PROTOCOL", str),
    "ssl.certificate.location": ("KAFKA_SSL_CERTIFICATE_LOCATION", str),
    "ssl.ca.location": ("KAFKA_SSL_CA_LOCATION", str),
    "ssl.endpoint.identification.algorithm": ("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", str),
    "ssl.key.location": ("KAFKA_SSL_KEY_LOCATION", str),
    "ssl.key.password": ("KAFKA_SSL_KEY_PASSWORD", str),
    # SASL settings
    "sasl.mechanism": ("KAFKA_SASL_MECHANISM", str),
    "sasl.username": ("KAFKA_SASL_USERNAME", str),
    "sasl.password": ("KAFKA_SASL_PASSWORD", str),
    # Settings read from KAFKA_CONSUMER_* variables
    "group.id": ("KAFKA_CONSUMER_GROUP_ID", str),
    "group.instance.id": ("KAFKA_CONSUMER_GROUP_INSTANCE_ID", str),
    # NOTE(review): librdkafka appears to document this option as an integer
    # number of milliseconds; it is converted with ``float`` here - confirm
    # whether that is intentional before changing it.
    "max.poll.interval.ms": ("KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS", float),
    "enable.auto.commit": ("KAFKA_CONSUMER_ENABLE_AUTO_COMMIT", bool),
    "auto.offset.reset": ("KAFKA_CONSUMER_AUTO_OFFSET_RESET", str),
}

# Maps each topic-level configuration key to an ``(environment variable, type)``
# pair, in the same shape as ``confluent_config``. ``topic_config_from_env``
# reads the named variable and converts it to the given type when it is set.
topic_config = {
    "compression.type": ("KAFKA_TOPIC_PRODUCER_COMPRESSION_TYPE", str),
    "compression.level": ("KAFKA_TOPIC_PRODUCER_COMPRESSION_LEVEL", int),
    # Same key as in ``confluent_config`` but read from a topic-specific variable.
    "enable.auto.commit": ("KAFKA_TOPIC_CONSUMER_ENABLE_AUTO_COMMIT", bool),
}


def kafka_config_from_env():
    """Generate Kafka configuration from environment variables.

    Each entry of ``confluent_config`` maps a configuration key to an
    ``(environment variable, type)`` pair. Variables that are unset or empty
    are skipped; every other value is converted to the declared type.

    Boolean options are false when the variable holds ``false``, ``0``, ``no``
    or ``off`` (case-insensitive, surrounding whitespace ignored) and true for
    any other non-empty value.

    Returns:
        dict mapping configuration keys to their converted values; keys whose
        environment variable is unset or empty are absent.

    Raises:
        ValueError: if a value cannot be converted to its declared type. The
            message names the offending environment variable.
    """
    # NOTE: if a different config is desired please open an issue
    # Spellings treated as boolean false; previously only "false" was
    # recognised, so values such as "0" silently enabled the option.
    false_strings = {"false", "0", "no", "off"}
    config = {}
    for option, (env_name, cast) in confluent_config.items():
        raw = os.getenv(env_name)
        if not raw:
            # Unset or empty: leave the option out of the generated config.
            continue
        if cast is bool:
            # bool("false") would be True, so booleans are parsed from text.
            config[option] = raw.strip().lower() not in false_strings
            continue
        try:
            config[option] = cast(raw)
        except ValueError as exc:
            # Re-raise with the variable name so a bad deployment setting is
            # easy to locate; the exception type is unchanged for callers.
            raise ValueError(
                "Invalid value {!r} for environment variable {}: expected {}".format(
                    raw, env_name, cast.__name__
                )
            ) from exc
    return config
def topic_config_from_env():
    """Generate topic config from environment variables.

    Each entry of ``topic_config`` maps a configuration key to an
    ``(environment variable, type)`` pair. Variables that are unset or empty
    are skipped; every other value is converted to the declared type.

    Boolean options are false when the variable holds ``false``, ``0``, ``no``
    or ``off`` (case-insensitive, surrounding whitespace ignored) and true for
    any other non-empty value.

    Returns:
        dict mapping configuration keys to their converted values; keys whose
        environment variable is unset or empty are absent.

    Raises:
        ValueError: if a value cannot be converted to its declared type. The
            message names the offending environment variable.
    """
    # Spellings treated as boolean false; previously only "false" was
    # recognised, so values such as "0" silently enabled the option.
    false_strings = {"false", "0", "no", "off"}
    config = {}
    for option, (env_name, cast) in topic_config.items():
        raw = os.getenv(env_name)
        if not raw:
            # Unset or empty: leave the option out of the generated config.
            continue
        if cast is bool:
            # bool("false") would be True, so booleans are parsed from text.
            config[option] = raw.strip().lower() not in false_strings
            continue
        try:
            config[option] = cast(raw)
        except ValueError as exc:
            # Re-raise with the variable name so a bad deployment setting is
            # easy to locate; the exception type is unchanged for callers.
            raise ValueError(
                "Invalid value {!r} for environment variable {}: expected {}".format(
                    raw, env_name, cast.__name__
                )
            ) from exc
    return config