В Kafka термины «лидер» и «последователь» используются для описания ролей брокеров в кластере Kafka. Кластер Kafka состоит из нескольких брокеров, каждый из которых отвечает за обработку части данных и разделов в системе.
-
Лидер:
Лидер — это брокер, отвечающий за обработку запросов на чтение и запись для определенного раздела темы. Он служит основным узлом раздела и отвечает за поддержание согласованности данных. Лидер получает все запросы на запись и обеспечивает их репликацию на последующие брокеры. В каждый момент времени у каждого раздела есть только один лидер. -
Последователь:
Последовательники — это брокеры, которые реплицируют данные лидера и служат резервными копиями. Они не обрабатывают запросы на чтение или запись непосредственно от клиентов. Вместо этого они копируют сообщения лидера и синхронизируются с его состоянием. Если лидер терпит неудачу, один из последователей избирается новым лидером, чтобы обеспечить непрерывную работу.
Методы взаимодействия с лидерами и последователями в Kafka:
- Создание сообщений.
Чтобы создавать сообщения в теме Kafka, вы можете использовать API KafkaProducer. Вот пример на Python:
from kafka import KafkaProducer
# Create a KafkaProducer instance
producer = KafkaProducer(bootstrap_servers='your_kafka_server:9092')
# Produce a message to a topic
topic = 'your_topic'
message = b'Hello, Kafka!'
producer.send(topic, value=message)
# Close the producer
producer.close()
- Потребление сообщений.
Чтобы использовать сообщения из темы Kafka, вы можете использовать API KafkaConsumer. Вот пример на Java:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;
// Create properties for the consumer
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_server:9092");
props.put("group.id", "your_consumer_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create a KafkaConsumer instance
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to a topic
String topic = "your_topic";
consumer.subscribe(Collections.singletonList(topic));
// Start consuming messages
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
// Close the consumer
consumer.close();