Изучение API Kafka 4: подробное руководство по ключевым методам с примерами кода

Kafka — это распределенная платформа потоковой передачи, позволяющая создавать конвейеры данных в реальном времени и приложения потоковой передачи. С выпуском Kafka 4 было представлено несколько новых API и методов, расширяющих функциональные возможности и возможности Kafka. В этой статье мы рассмотрим некоторые ключевые методы API Kafka 4 и приведем примеры кода, демонстрирующие их использование.

  1. API Producer:
    API Producer в Kafka 4 позволяет приложениям публиковать записи в темах Kafka. Вот пример использования класса KafkaProducer для отправки записи:
import org.apache.kafka.clients.producer.*;
public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "my_value");
        producer.send(record);
        producer.close();
    }
}
  1. Потребительский API:
    Потребительский API позволяет приложениям подписываться на темы Kafka и использовать записи, созданные производителями. Вот пример использования класса KafkaConsumer для использования записей:
import org.apache.kafka.clients.consumer.*;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "my_consumer_group");
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("my_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received record: " + record.value());
            }
        }
    }
}
  1. API Streams.
    API Streams в Kafka 4 позволяет разработчикам обрабатывать и преобразовывать потоки данных в режиме реального времени. Вот пример использования класса KafkaStreams для создания простого приложения потоковой обработки:
import org.apache.kafka.streams.*;
public class KafkaStreamsExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        properties.put("value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("my_topic");
        KStream<String, String> transformed = input.mapValues(value -> value.toUpperCase());
        transformed.to("output_topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

В Kafka 4 представлены мощные API, которые позволяют разработчикам создавать надежные и масштабируемые приложения потоковой передачи. В этой статье мы рассмотрели API-интерфейс Producer, Consumer API и Streams API с примерами кода, демонстрирующими их использование. Используя эти методы, разработчики могут использовать весь потенциал Kafka для обработки данных в реальном времени, потоковой передачи событий и обмена сообщениями.