Kafka — это распределенная платформа потоковой передачи, позволяющая создавать конвейеры данных в реальном времени и приложения потоковой передачи. С выпуском Kafka 4 было представлено несколько новых API и методов, расширяющих функциональные возможности и возможности Kafka. В этой статье мы рассмотрим некоторые ключевые методы API Kafka 4 и приведем примеры кода, демонстрирующие их использование.
- API Producer:
API Producer в Kafka 4 позволяет приложениям публиковать записи в темах Kafka. Вот пример использования класса KafkaProducer для отправки записи:
import org.apache.kafka.clients.producer.*;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "my_value");
producer.send(record);
producer.close();
}
}
- Потребительский API:
Потребительский API позволяет приложениям подписываться на темы Kafka и использовать записи, созданные производителями. Вот пример использования класса KafkaConsumer для использования записей:
import org.apache.kafka.clients.consumer.*;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", "my_consumer_group");
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received record: " + record.value());
}
}
}
}
- API Streams.
API Streams в Kafka 4 позволяет разработчикам обрабатывать и преобразовывать потоки данных в режиме реального времени. Вот пример использования класса KafkaStreams для создания простого приложения потоковой обработки:
import org.apache.kafka.streams.*;
public class KafkaStreamsExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
properties.put("value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("my_topic");
KStream<String, String> transformed = input.mapValues(value -> value.toUpperCase());
transformed.to("output_topic");
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
В Kafka 4 представлены мощные API, которые позволяют разработчикам создавать надежные и масштабируемые приложения потоковой передачи. В этой статье мы рассмотрели API-интерфейс Producer, Consumer API и Streams API с примерами кода, демонстрирующими их использование. Используя эти методы, разработчики могут использовать весь потенциал Kafka для обработки данных в реальном времени, потоковой передачи событий и обмена сообщениями.