Kafka Topics: Руководство по пониманию и реализации потоков сообщений

Kafka Topics — это фундаментальный строительный блок Apache Kafka, распределенной потоковой платформы с открытым исходным кодом, которая позволяет создавать конвейеры данных в реальном времени и потоковые приложения. В этой статье блога мы погрузимся в мир Kafka Topics, объясним, что это такое, как они работают, и предоставим вам несколько методов эффективной работы с ними. Итак, начнём!

Что такое темы Kafka?

В Kafka тема представляет собой определенный поток данных. Это именованный канал или категория, в которой сообщения публикуются производителями и из которой сообщения потребляются потребителями. Темы разделены на разделы, каждый из которых представляет собой упорядоченную неизменяемую последовательность сообщений.

Создание темы Kafka:

Чтобы создать тему Kafka, вы можете использовать инструмент командной строки Kafka или программно с помощью API Kafka. Давайте рассмотрим пример создания темы под названием «my-topic» с тремя разделами и коэффициентом репликации два с помощью инструмента командной строки:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic my-topic --partitions 3 --replication-factor 2

Публикация сообщений в теме:

Теперь, когда у нас есть тема, мы можем начать публиковать в ней сообщения. Продюсеры несут ответственность за отправку сообщений в темы Kafka. Вот простой фрагмент кода Java, который демонстрирует, как опубликовать сообщение в теме Kafka с помощью API Kafka Producer:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "Hello, Kafka!");
producer.send(record);
producer.close();

Получение сообщений из темы:

Потребители читают сообщения из тем Kafka. Они могут подписаться на одну или несколько тем и получать сообщения из назначенных разделов. Вот пример кода, использующий Kafka Consumer API для получения сообщений из темы:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}

Управление темами:

Kafka предоставляет различные административные операции для управления темами, такие как создание, удаление, изменение или перечисление тем. Вы можете использовать инструмент командной строки Kafka или Kafka Admin API для выполнения этих операций программным способом.

Темы Kafka составляют основу распределенного обмена сообщениями и потоковой передачи в Apache Kafka. В этой статье мы рассмотрели основы тем Kafka, включая создание тем, публикацию сообщений в темах, получение сообщений из тем и управление темами. Используя возможности тем Kafka, вы можете создавать масштабируемые, отказоустойчивые системы обработки данных в реальном времени.