Apache Kafka — широко используемая распределенная потоковая платформа, обеспечивающая надежную, масштабируемую и высокопроизводительную обработку данных в реальном времени. Одним из ключевых аспектов Kafka является его поведение при фиксации, которое определяет, как фиксируются смещения для обработанных сообщений. В этой статье блога мы рассмотрим различные методы программного управления поведением фиксации Kafka, а также приведем примеры кода.
Метод 1: ручная фиксация
По умолчанию потребитель Kafka использует автоматическую фиксацию, при которой он автоматически фиксирует смещения через регулярные промежутки времени. Однако вы можете вручную контролировать поведение фиксации, отключив автоматическую фиксацию и явно фиксируя смещения с помощью метода commitSync(). Вот пример:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Process the record
}
consumer.commitSync(); // Manually commit offsets
}
} finally {
consumer.close();
}
Метод 2: асинхронная фиксация
Помимо ручной фиксации Kafka также предоставляет вариант асинхронной фиксации с использованием метода commitAsync(). Этот метод позволяет фиксировать смещения неблокирующим способом, что может повысить общую пропускную способность потребителя. Вот пример:
// Consumer initialization code
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Process the record
}
consumer.commitAsync(); // Asynchronously commit offsets
}
} finally {
consumer.close();
}
Метод 3: фиксация определенных смещений
Иногда вам может потребоваться фиксировать смещения выборочно, а не фиксировать все обработанные смещения. Kafka предоставляет методы commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)и commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets)для фиксации определенных смещений. Вот пример:
// Consumer initialization code
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Process the record
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
consumer.commitSync(offsets); // Commit specific offset
}
}
} finally {
consumer.close();
}
Программное управление поведением фиксации Kafka позволяет вам точно настроить процесс управления смещениями в соответствии с вашими конкретными требованиями. В этой статье мы рассмотрели три метода: ручная фиксация, асинхронная фиксация и фиксация определенных смещений. Используя эти методы, вы можете оптимизировать производительность, надежность и возможности обработки ошибок ваших потребительских приложений Kafka.