Понимание ISR в среде Kafka и как с ним работать

В среде Kafka ISR означает «синхронная реплика». Это относится к набору реплик тематического раздела Kafka, которые считаются синхронизированными с ведущей репликой. Реплика-лидер отвечает за обработку всех запросов на чтение и запись для раздела, а реплики с синхронизацией реплицируют данные от лидера, чтобы обеспечить отказоустойчивость.

Когда сообщение создается в Kafka, оно записывается в журнал ведущей реплики. Затем ведущая реплика реплицирует сообщение на синхронизированные реплики. Сообщение считается «зафиксированным», если оно записано лидеру и реплицировано в настраиваемое количество синхронизируемых реплик. Это гарантирует, что сообщение будет надежно сохранено и не будет потеряно даже в случае сбоя ведущей реплики.

Поддержание набора синхронизированных реплик обеспечивает отказоустойчивость и высокую доступность в Kafka. В случае сбоя ведущей реплики одна из синхронизированных реплик может быть выбрана в качестве нового лидера, гарантируя, что раздел останется доступным для чтения и записи. Однако если доступных синхронизированных реплик нет, раздел станет недоступным до тех пор, пока новая реплика не догонит и не синхронизируется.

Вот несколько методов, которые вы можете использовать для работы с ISR в среде Kafka, а также примеры кода:

  1. Получение ISR для тематического раздела:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DescribeTopicsResult;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartitionInfo;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    public class KafkaISRMethods {
    public static List<Integer> getISRForPartition(String topic, int partition) throws ExecutionException, InterruptedException {
        AdminClient adminClient = AdminClient.create(properties);
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic));
        Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get();
        TopicDescription topicDescription = topicDescriptions.get(topic);
        List<TopicPartitionInfo> partitions = topicDescription.partitions();
        TopicPartitionInfo partitionInfo = partitions.get(partition);
        return partitionInfo.isr();
    }
    }
  2. Проверка синхронизации реплики:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    public class KafkaISRMethods {
    public static boolean isReplicaInSync(String topic, int partition, int replicaId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "my-group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
    
        for (PartitionInfo partitionInfo : partitions) {
            if (partitionInfo.partition() == partition) {
                return partitionInfo.inSyncReplicas().contains(new Node(replicaId, "localhost", 9092));
            }
        }
    
        return false;
    }
    }
  3. Добавление реплики в ISR:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
    import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
    import org.apache.kafka.clients.admin.NewPartitionReassignment;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    public class KafkaISRMethods {
    public static void addReplicaToISR(String topic, int partition, int replicaId) throws ExecutionException, InterruptedException {
        AdminClient adminClient = AdminClient.create(properties);
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic));
        Map<String, TopicDescription> topicDescriptions = describeTopicsResult.all().get();
        TopicDescription topicDescription = topicDescriptions.get(topic);
        List<TopicPartitionInfo> partitions = topicDescription.partitions();
    
        for (TopicPartitionInfo partitionInfo : partitions) {
            if (partitionInfo.partition() == partition) {
                NewPartitionReassignment newPartitionReassignment = new NewPartitionReassignment(
                        new TopicPartition(topic, partition),
                        partitionInfo.replicas(),
                        partitionInfo.replicas().stream().map(Node::id).toList(),
                        replicaId
                );
                AlterPartitionReassignmentsResult result = adminClient.alterPartitionReassignments(
                        Collections.singletonList(newPartitionReassignment), new AlterPartitionReassignmentsOptions()
                );
                result.all().get();
                break;
            }
        }
    }
    }