Source code for thoth.messaging.consumer
#!/usr/bin/env python3
# thoth-messaging
# Copyright(C) 2020 Kevin Postlethwait
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Helper functions for using confluent kafka consumer with thoth.messaging."""
from typing import Optional, Dict
from .config import kafka_config_from_env
from .message_base import MessageBase
from . import ALL_MESSAGES
from confluent_kafka import Consumer
[docs]def create_consumer(config: Optional[Dict[str, str]] = None) -> Consumer:
"""Initialize consumer."""
if config:
return Consumer(config)
return Consumer(kafka_config_from_env())
[docs]def subscribe_to_all(consumer: Consumer):
"""Subscribe to all topics defined in messaging."""
to_subscribe = []
for i in ALL_MESSAGES:
to_subscribe.append(i.topic_name)
consumer.subscribe(to_subscribe)
[docs]def subscribe_to_message(consumer: Consumer, message_type: MessageBase):
"""Subscribe to specific message by passing message class."""
# NOTE: be sure to initialize message_type before passing
consumer.subscribe([message_type.topic_name])