Понимание концепций лидера и последователя в Kafka: объяснение на примерах кода

В Kafka термины «лидер» и «последователь» используются для описания ролей брокеров в кластере Kafka. Кластер Kafka состоит из нескольких брокеров, каждый из которых отвечает за обработку части данных и разделов в системе.

  1. Лидер:
    Лидер — это брокер, отвечающий за обработку запросов на чтение и запись для определенного раздела темы. Он служит основным узлом раздела и отвечает за поддержание согласованности данных. Лидер получает все запросы на запись и обеспечивает их репликацию на последующие брокеры. В каждый момент времени у каждого раздела есть только один лидер.

  2. Последователь:
    Последовательники — это брокеры, которые реплицируют данные лидера и служат резервными копиями. Они не обрабатывают запросы на чтение или запись непосредственно от клиентов. Вместо этого они копируют сообщения лидера и синхронизируются с его состоянием. Если лидер терпит неудачу, один из последователей избирается новым лидером, чтобы обеспечить непрерывную работу.

Методы взаимодействия с лидерами и последователями в Kafka:

  1. Создание сообщений.
    Чтобы создавать сообщения в теме Kafka, вы можете использовать API KafkaProducer. Вот пример на Python:
from kafka import KafkaProducer
# Create a KafkaProducer instance
producer = KafkaProducer(bootstrap_servers='your_kafka_server:9092')
# Produce a message to a topic
topic = 'your_topic'
message = b'Hello, Kafka!'
producer.send(topic, value=message)
# Close the producer
producer.close()
  1. Потребление сообщений.
    Чтобы использовать сообщения из темы Kafka, вы можете использовать API KafkaConsumer. Вот пример на Java:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;
// Create properties for the consumer
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_server:9092");
props.put("group.id", "your_consumer_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create a KafkaConsumer instance
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to a topic
String topic = "your_topic";
consumer.subscribe(Collections.singletonList(topic));
// Start consuming messages
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}
// Close the consumer
consumer.close();