Source code for thoth.messaging.admin_client

#!/usr/bin/env python3
# thoth-messaging
# Copyright(C) 2020 Kevin Postlethwait
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""Helper functions for using confluent kafka admin client with thoth.messaging."""

from typing import Optional, Dict
import logging

from .config import kafka_config_from_env, topic_config_from_env
from . import ALL_MESSAGES
from . import MessageBase

from confluent_kafka.admin import AdminClient, NewTopic


# Obtain the logger through logging.getLogger() so it is registered in the
# logging hierarchy and picks up handlers/levels configured by the application;
# instantiating logging.Logger directly yields a detached logger.
_LOGGER = logging.getLogger(__name__)


def create_admin_client(config: Optional[Dict[str, str]] = None) -> AdminClient:
    """Build an ``AdminClient`` from an explicit or environment-derived config.

    :param config: configuration handed to ``AdminClient``. When it is ``None``
        or empty, the result of ``kafka_config_from_env()`` is used instead.
    :return: the newly constructed ``AdminClient``.
    """
    settings = config if config else kafka_config_from_env()
    return AdminClient(settings)
def create_all_topics(admin: AdminClient, partitions: int = 1, replication_factor: int = 1):
    """Create every thoth-messaging topic that is missing from the cluster.

    Each message type in ``ALL_MESSAGES`` contributes its ``topic_name``. All
    missing topics are requested with the same partition count and replication
    factor, using the topic configuration from ``topic_config_from_env()``.

    The return value of ``admin.create_topics`` is not inspected here.
    NOTE(review): creation failures are presumably not surfaced to the caller
    as a result - confirm against the confluent-kafka documentation.

    :param admin: admin client used to list and create topics.
    :param partitions: number of partitions for every created topic.
    :param replication_factor: replication factor for every created topic.
    """
    # NOTE: topics are only created if they don't exist
    existing = admin.list_topics().topics

    # Keyed by topic name so that the single batched request below never names
    # the same topic twice, even if two message types share a topic.
    pending = {}
    for message in ALL_MESSAGES:
        name = message.topic_name
        if name in existing or name in pending:
            continue
        pending[name] = NewTopic(
            name,
            partitions,
            replication_factor=replication_factor,
            config=topic_config_from_env(),
        )

    if not pending:
        # Nothing is missing; do not issue an empty creation request.
        return

    # One request for all missing topics instead of one request per topic.
    admin.create_topics(list(pending.values()))
def create_topic(admin: AdminClient, message: MessageBase, partitions: int = 1, replication_factor: int = 1):
    """Create the topic of a single message type unless it already exists.

    If the topic is already present on the cluster a warning is logged and
    nothing is created. The return value of ``admin.create_topics`` is not
    inspected here.

    :param admin: admin client used to list and create topics.
    :param message: message object whose ``topic_name`` names the topic.
    :param partitions: number of partitions for the new topic.
    :param replication_factor: replication factor for the new topic.
    """
    # NOTE: we assume `message` is initialized
    topics = admin.list_topics().topics
    t_name = message.topic_name
    if t_name in topics:
        # Logger.warn is a deprecated alias; Logger.warning is the supported name.
        _LOGGER.warning("Topic %s already exists on Kafka cluster.", t_name)
        return

    admin.create_topics(
        [
            NewTopic(
                t_name,
                partitions,
                replication_factor=replication_factor,
                config=topic_config_from_env(),
            )
        ]
    )
def check_connection(timeout: int = 10, config: Optional[Dict[str, str]] = None):
    """Report whether listing topics on the Kafka cluster succeeds.

    An ``AdminClient`` is built from ``config`` when it is non-empty, otherwise
    from ``kafka_config_from_env()``, and asked to list topics.

    :param timeout: timeout forwarded to ``list_topics``.
    :param config: optional client configuration; the environment-derived
        configuration is used when this is ``None`` or empty.
    :return: ``True`` if listing topics succeeded, ``False`` if it raised (the
        exception is logged).
    """
    settings = config if config else kafka_config_from_env()
    client = AdminClient(settings)
    try:
        client.list_topics(timeout=timeout)
    except Exception as exc:
        _LOGGER.exception(exc)
        return False
    return True