Потребление сообщений Kafka и извлечение смещения: подробное руководство

В архитектурах, управляемых событиями, Apache Kafka стал популярным выбором для создания масштабируемых и надежных систем обмена сообщениями. Kafka позволяет приложениям публиковать потоки записей и подписываться на них, обеспечивая отказоустойчивую обработку данных с высокой пропускной способностью. Одним из важнейших аспектов работы с Kafka является получение сообщений и отслеживание смещения, которое представляет положение потребителя внутри раздела. В этой статье мы рассмотрим несколько методов использования сообщений Kafka и получения смещения, а также примеры кода, иллюстрирующие каждый подход.

  1. API Kafka Consumer:
    API Kafka Consumer предоставляет высокоуровневый интерфейс для получения сообщений из тем Kafka. Подписавшись на тему, вы можете получать сообщения и получать доступ к связанному с ними смещению. Вот пример использования сообщений с использованием Kafka Consumer API в Java:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String message = record.value();
                long offset = record.offset();
                System.out.println("Received message: " + message + ", Offset: " + offset);
            }
        }
    }
}
  1. API Kafka Streams.
    Если вам требуются более расширенные возможности обработки потоков, API Kafka Streams — отличный выбор. Он позволяет создавать масштабируемые и отказоустойчивые приложения потоковой обработки поверх Kafka. Вот пример использования сообщений и доступа к смещению с помощью API Kafka Streams в Java:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
public class KafkaStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", "my-streams-app");
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("my-topic");

        stream.foreach((key, value) -> {
            long offset = context.offset();
            System.out.println("Received message: " + value + ", Offset: " + offset);
        });
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
  1. Confluent Kafka Consumer API (Java):
    Confluent предоставляет клиент Java, который расширяет Kafka Consumer API дополнительными функциями и улучшениями. Он предлагает улучшенную масштабируемость, производительность и простоту использования. Вот пример использования сообщений и получения смещения с помощью Confluent Kafka Consumer API:
import io.confluent.kafka.serializers.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;
public class ConfluentKafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String message = record.value();
                long offset = record.offset();
                System.out.println("Received message: " + message + ", Offset: " + offset);
            }
        }
    }
}

В этой статье мы рассмотрели различные методы получения сообщений и получения смещения в Apache Kafka. Kafka Consumer API, Kafka Streams API и Confluent Kafka Consumer API — это мощные инструменты, подходящие для различных вариантов использования. В зависимости от ваших конкретных требований вы можете выбрать наиболее подходящий подход для создания надежных и эффективных потребителей Kafka, которые точно отслеживают смещения сообщений.

Используя эти методы, разработчики могут использовать возможности Kafka для беспрепятственного использования сообщений и извлечения смещений, что позволяет разрабатывать масштабируемые и надежные приложения, управляемые событиями.