В Apache Kafka группы потребителей позволяют параллельно использовать данные из нескольких тем. Одним из важнейших аспектов работы с группами потребителей является управление смещениями, которые определяют положение группы потребителей в разделе темы. В этой статье мы рассмотрим различные методы установки смещения на самую раннюю позицию на примерах кода.
Метод 1. Использование Kafka Consumer API (Java):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;
public class KafkaOffsetExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
consumer.seekToBeginning(consumer.assignment());
// Start consuming messages
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Process the records
}
}
}
Метод 2: использование kafka-consumer-groups.sh (командная строка):
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-consumer-group --topic my-topic --reset-offsets --to-earliest --execute
Метод 3. Использование librdkafka (C/C++):
#include <librdkafka/rdkafka.h>
int main(int argc, char argv) {
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
char errstr[512];
conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
rd_kafka_conf_set(conf, "group.id", "my-consumer-group", NULL, 0);
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
rd_kafka_subscribe(rk, "my-topic");
rd_kafka_seek(rk, RD_KAFKA_PARTITION_UA, RD_KAFKA_OFFSET_BEGINNING);
// Start consuming messages
while (1) {
rd_kafka_message_t *msg;
msg = rd_kafka_consumer_poll(rk, 100);
// Process the message
rd_kafka_message_destroy(msg);
}
rd_kafka_destroy(rk);
return 0;
}
В этой статье мы рассмотрели три различных метода установки смещения на самую раннюю позицию для групп потребителей Kafka. Первый метод продемонстрировал использование Kafka Consumer API на Java, позволяющего детально контролировать управление смещениями. Второй метод продемонстрировал использование инструмента командной строки kafka-consumer-groups.sh, предоставляющего быстрый и удобный способ сброса смещений. Наконец, третий метод представляет собой пример использования библиотеки librdkafka на C/C++.
Используя эти методы, вы можете эффективно управлять смещениями групп потребителей и гарантировать, что ваши потребители Kafka начнут потреблять с самой ранней доступной позиции в теме.