Apache Kafka – популярная распределенная платформа потоковой передачи, которая широко используется для создания конвейеров данных в реальном времени и приложений потоковой передачи. Одним из ключевых аспектов Kafka является политика хранения сообщений, которая определяет, как долго сообщения хранятся в системе. В этой статье блога мы углубимся в тему хранения сообщений Kafka, рассмотрим различные методы настройки хранения и предоставим примеры кода, которые помогут вам понять и реализовать эти методы в вашей настройке Kafka.
Понимание хранения сообщений Kafka.
В Kafka сообщения организованы по темам, и каждая тема разделена на разделы. Когда создается сообщение, оно добавляется в конец журнала раздела. Политика хранения определяет, как долго эти сообщения хранятся в Kafka, позволяя потребителям читать их в течение определенного периода времени.
Настройка хранения сообщений Kafka.
Существует несколько способов настройки хранения сообщений в Kafka. Давайте рассмотрим некоторые из наиболее часто используемых:
- Хранение по времени.
При хранении по времени вы можете указать продолжительность хранения сообщений. Kafka позволяет вам установить глобальный период хранения для всех тем или настроить хранение для каждой темы. Чтобы установить глобальный период хранения, вы можете добавить следующую конфигурацию в файл свойств сервера Kafka:log.retention.ms = 86400000
В этом примере период хранения устанавливается равным 24 часам (86 400 000 миллисекунд).
Чтобы настроить сохранение для каждой темы, вы можете использовать следующую конфигурацию при создании темы:
bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 1 --config retention.ms=3600000 --zookeeper localhost:2181
В этом примере период хранения для «my_topic» устанавливается равным 1 часу (3 600 000 миллисекунд).
-
Хранение на основе размера.
Помимо хранения на основе времени, Kafka также поддерживает хранение на основе размера. С помощью этого метода вы можете указать максимальный размер сегментов журнала, прежде чем они будут удалены. Следующая конфигурация устанавливает максимальный размер сегмента равным 1 гигабайту:log.segment.bytes = 1073741824
Когда сегмент превышает этот размер, он помечается для удаления во время следующего процесса очистки журнала.
-
Сжатие журнала.
Kafka предоставляет функцию сжатия журнала, которая позволяет сохранять только самое последнее значение для каждого ключа в теме. Это полезно в сценариях, в которых требуется вести сжатую историю событий, например поддерживать текущее состояние таблицы базы данных. Чтобы включить сжатие журнала для темы, вы можете использовать следующую конфигурацию:bin/kafka-topics.sh --alter --topic my_topic --config cleanup.policy=compact --zookeeper localhost:2181
В этом примере включается сжатие журнала для темы «my_topic».
В этой статье мы рассмотрели различные методы настройки хранения сообщений в Apache Kafka. Мы обсудили хранение на основе времени, хранение на основе размера и сжатие журналов, приведя примеры кода, иллюстрирующие, как можно реализовать эти методы. Понимая и эффективно настраивая хранение сообщений, вы можете быть уверены, что ваша установка Kafka соответствует вашим конкретным требованиям к хранению и использованию данных.