Apache Kafka – популярная распределенная платформа потоковой передачи, которая широко используется для создания конвейеров данных в реальном времени и приложений потоковой передачи. В Kafka производители отвечают за публикацию данных в темах, которые могут быть использованы одним или несколькими потребителями. Метод send()в API Kafka Producer используется для асинхронной отправки сообщений брокерам Kafka. В этой статье мы подробно рассмотрим метод send()и предоставим примеры кода для различных сценариев.
Понимание метода send() производителя.
Метод send()в API Kafka Producer используется для отправки сообщения в тему Kafka. Он имеет разные сигнатуры с различными параметрами для разных случаев использования. Давайте рассмотрим некоторые из наиболее часто используемых методов и соответствующие им примеры кода:
- Базовый метод send():
Базовая форма методаsend()позволяет отправлять сообщение только с указанным значением. Вот пример:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "Hello, Kafka!");
producer.send(record);
- Метод send() с ключом и значением:
Вы также можете отправить сообщение с определенным ключом вместе со значением. Это может быть полезно, если вы хотите, чтобы все сообщения с одним и тем же ключом записывались в один и тот же раздел. Вот пример:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "my-key", "Hello, Kafka!");
producer.send(record);
-
Метод
- send() с обратным вызовом:
Методsend()позволяет вам предоставить функцию обратного вызова, которая будет вызываться после подтверждения сообщения брокером. Это полезно для обработки подтверждений или ошибок. Вот пример:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "Hello, Kafka!");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Message sent successfully! Offset: " + metadata.offset());
} else {
System.err.println("Error sending message: " + exception.getMessage());
}
});
- Метод send() с разделом:
Вы также можете отправить сообщение в определенный раздел, используя методsend(). Это может быть полезно, если вы хотите управлять логикой разделения вручную. Вот пример:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", 0, "my-key", "Hello, Kafka!");
producer.send(record);
- Метод send() с меткой времени:
Если вам нужно указать метку времени для сообщения, вы можете использовать методsend(), который принимает параметр метки времени. Вот пример:
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", null, System.currentTimeMillis(), "my-key", "Hello, Kafka!");
producer.send(record);