В архитектурах, управляемых событиями, Apache Kafka стал популярным выбором для создания масштабируемых и надежных систем обмена сообщениями. Kafka позволяет приложениям публиковать потоки записей и подписываться на них, обеспечивая отказоустойчивую обработку данных с высокой пропускной способностью. Одним из важнейших аспектов работы с Kafka является получение сообщений и отслеживание смещения, которое представляет положение потребителя внутри раздела. В этой статье мы рассмотрим несколько методов использования сообщений Kafka и получения смещения, а также примеры кода, иллюстрирующие каждый подход.
- API Kafka Consumer:
API Kafka Consumer предоставляет высокоуровневый интерфейс для получения сообщений из тем Kafka. Подписавшись на тему, вы можете получать сообщения и получать доступ к связанному с ними смещению. Вот пример использования сообщений с использованием Kafka Consumer API в Java:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
long offset = record.offset();
System.out.println("Received message: " + message + ", Offset: " + offset);
}
}
}
}
- API Kafka Streams.
Если вам требуются более расширенные возможности обработки потоков, API Kafka Streams — отличный выбор. Он позволяет создавать масштабируемые и отказоустойчивые приложения потоковой обработки поверх Kafka. Вот пример использования сообщений и доступа к смещению с помощью API Kafka Streams в Java:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
public class KafkaStreamsExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("application.id", "my-streams-app");
props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("my-topic");
stream.foreach((key, value) -> {
long offset = context.offset();
System.out.println("Received message: " + value + ", Offset: " + offset);
});
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
- Confluent Kafka Consumer API (Java):
Confluent предоставляет клиент Java, который расширяет Kafka Consumer API дополнительными функциями и улучшениями. Он предлагает улучшенную масштабируемость, производительность и простоту использования. Вот пример использования сообщений и получения смещения с помощью Confluent Kafka Consumer API:
import io.confluent.kafka.serializers.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;
public class ConfluentKafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
long offset = record.offset();
System.out.println("Received message: " + message + ", Offset: " + offset);
}
}
}
}
В этой статье мы рассмотрели различные методы получения сообщений и получения смещения в Apache Kafka. Kafka Consumer API, Kafka Streams API и Confluent Kafka Consumer API — это мощные инструменты, подходящие для различных вариантов использования. В зависимости от ваших конкретных требований вы можете выбрать наиболее подходящий подход для создания надежных и эффективных потребителей Kafka, которые точно отслеживают смещения сообщений.
Используя эти методы, разработчики могут использовать возможности Kafka для беспрепятственного использования сообщений и извлечения смещений, что позволяет разрабатывать масштабируемые и надежные приложения, управляемые событиями.