Kafka, названная в честь известного чешского писателя Франца Кафки, — это распределенная платформа потоковой передачи событий с открытым исходным кодом, предназначенная для обработки потоков данных в реальном времени и построения масштабируемых и отказоустойчивых конвейеров данных. Его архитектура очень универсальна и может использоваться в различных случаях: от систем агрегирования журналов и обмена сообщениями до сложной обработки событий и потоковой обработки.
В этой статье мы углубимся в основные концепции архитектуры Kafka и рассмотрим различные методы использования Kafka для создания надежных и надежных конвейеров данных в реальном времени. Мы предоставим примеры кода, чтобы проиллюстрировать каждый метод и продемонстрировать возможности и гибкость Kafka в обработке крупномасштабных сценариев потоковой передачи данных.
- Производители и потребители.
Основными компонентами архитектуры Kafka являются производители и потребители. Производители публикуют записи данных в темах Kafka, а потребители подписываются на одну или несколько тем, чтобы использовать эти записи. Давайте рассмотрим пример производителя и потребителя в Python:
from kafka import KafkaProducer, KafkaConsumer
# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Publish a message to a topic
producer.send('my_topic', b'Hello, Kafka!')
# Create a Kafka consumer
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
# Consume messages from the topic
for message in consumer:
print(message.value.decode('utf-8'))
- Темы и разделы.
Kafka организует записи данных по темам, которые далее делятся на разделы. Каждый раздел представляет собой упорядоченную и неизменяемую последовательность записей. Kafka обеспечивает отказоустойчивость и масштабируемость, распределяя разделы между несколькими брокерами в кластере. Вот пример создания темы с несколькими разделами:
bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
- Сериализация и десериализация сообщений.
Kafka позволяет производителям и потребителям использовать разные форматы данных для сериализации и десериализации сообщений. Обычно используемые форматы включают Avro, JSON и Protobuf. Вот пример создания и использования сообщений в кодировке Avro с использованием клиентской библиотеки Confluent Python:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
# Create an Avro producer
producer = AvroProducer({'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'},
default_value_schema=avro_schema)
# Produce an Avro-encoded message
producer.produce('my_topic', value={'name': 'John', 'age': 30})
# Create an Avro consumer
consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'})
# Consume Avro-encoded messages
consumer.subscribe(['my_topic'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print(msg.value())
- Kafka Connect:
Kafka Connect — это платформа для простой и надежной интеграции Kafka с внешними системами. Он обеспечивает масштабируемый и отказоустойчивый способ импорта и экспорта данных между Kafka и различными источниками/приемниками данных, такими как базы данных, файловые системы и очереди сообщений. Вот пример настройки коннектора источника Kafka Connect для приема данных из базы данных MySQL:
curl -X POST -H "Content-Type: application/json" --data '{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "user",
"connection.password": "password",
"topic.prefix": "mysql-",
"mode": "timestamp",
"timestamp.column.name": "last_modified",
"table.whitelist": "users",
"tasks.max": "1"
}
}' http://localhost:8083/connectors
- Потоковая обработка с помощью Kafka Streams.
Kafka Streams — это мощная клиентская библиотека для создания приложений потоковой обработки в реальном времени. Он позволяет разработчикам выполнять операции с отслеживанием состояния, такие как фильтрация, агрегирование и объединение, с потоками данных. Вот пример приложения Kafka Streams, которое выполняет подсчет слов во входном потоке:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("input-topic");
KTable<String, Long> wordCounts = inputStream
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();
wordcounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Архитектура Kafka обеспечивает надежную и масштабируемую основу для построения конвейеров данных в реальном времени. В этой статье мы рассмотрели различные методы, а также примеры кода, для использования возможностей Kafka, включая производителей и потребителей, темы и разделы, сериализацию и десериализацию сообщений, Kafka Connect и Kafka Streams. Освоив эти методы, разработчики смогут раскрыть весь потенциал Kafka и создавать эффективные, отказоустойчивые и масштабируемые приложения потоковой передачи данных.