В среде Kafka ISR означает «синхронная реплика». Это относится к набору реплик тематического раздела Kafka, которые считаются синхронизированными с ведущей репликой. Реплика-лидер отвечает за обработку всех запросов на чтение и запись для раздела, а реплики с синхронизацией реплицируют данные от лидера, чтобы обеспечить отказоустойчивость.
Когда сообщение создается в Kafka, оно записывается в журнал ведущей реплики. Затем ведущая реплика реплицирует сообщение на синхронизированные реплики. Сообщение считается «зафиксированным», если оно записано лидеру и реплицировано в настраиваемое количество синхронизируемых реплик. Это гарантирует, что сообщение будет надежно сохранено и не будет потеряно даже в случае сбоя ведущей реплики.
Поддержание набора синхронизированных реплик обеспечивает отказоустойчивость и высокую доступность в Kafka. В случае сбоя ведущей реплики одна из синхронизированных реплик может быть выбрана в качестве нового лидера, гарантируя, что раздел останется доступным для чтения и записи. Однако если доступных синхронизированных реплик нет, раздел станет недоступным до тех пор, пока новая реплика не догонит и не синхронизируется.
Вот несколько методов, которые вы можете использовать для работы с ISR в среде Kafka, а также примеры кода:
-
Получение ISR для тематического раздела:
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartitionInfo; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; public class KafkaISRMethods { public static List<Integer> getISRForPartition(String topic, int partition) throws ExecutionException, InterruptedException { AdminClient adminClient = AdminClient.create(properties); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic)); Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get(); TopicDescription topicDescription = topicDescriptions.get(topic); List<TopicPartitionInfo> partitions = topicDescription.partitions(); TopicPartitionInfo partitionInfo = partitions.get(partition); return partitionInfo.isr(); } } -
Проверка синхронизации реплики:
import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaISRMethods { public static boolean isReplicaInSync(String topic, int partition, int replicaId) { Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9092"); properties.put("group.id", "my-group"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); List<PartitionInfo> partitions = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitions) { if (partitionInfo.partition() == partition) { return partitionInfo.inSyncReplicas().contains(new Node(replicaId, "localhost", 9092)); } } return false; } } -
Добавление реплики в ISR:
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions; import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; public class KafkaISRMethods { public static void addReplicaToISR(String topic, int partition, int replicaId) throws ExecutionException, InterruptedException { AdminClient adminClient = AdminClient.create(properties); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic)); Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get(); TopicDescription topicDescription = topicDescriptions.get(topic); List<TopicPartitionInfo> partitions = topicDescription.partitions(); for (TopicPartitionInfo partitionInfo : partitions) { if (partitionInfo.partition() == partition) { NewPartitionReassignment newPartitionReassignment = new NewPartitionReassignment( new TopicPartition(topic, partition), partitionInfo.replicas(), partitionInfo.replicas().stream().map(Node::id).toList(), replicaId ); AlterPartitionReassignmentsResult result = adminClient.alterPartitionReassignments( Collections.singletonList(newPartitionReassignment), new AlterPartitionReassignmentsOptions() ); result.all().get(); break; } } } }