В этой статье мы рассмотрим, как упростить управление схемой с помощью Kafka Schema Registry и Docker Compose. Реестр схем Kafka — это мощный инструмент, который обеспечивает централизованное управление и развитие схем в Apache Kafka. Объединив его с Docker Compose, мы можем легко настроить локальную среду разработки для управления схемой. Мы обсудим несколько методов с примерами кода, чтобы продемонстрировать использование реестра схемы Kafka с Docker Compose.
Метод 1: запуск реестра схемы Kafka с помощью Docker Compose
Для начала давайте создадим файл Docker Compose для развертывания кластера Kafka и реестра схемы. Ниже приведен пример файла Docker Compose, определяющего необходимые сервисы:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
ports:
- "9092:9092"
depends_on:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:latest
ports:
- "8081:8081"
depends_on:
- zookeeper
- kafka
Сохраните приведенный выше код как docker-compose.yml, а затем выполните в терминале следующую команду:
docker-compose up
Это запустит кластер Kafka вместе с реестром схемы.
Метод 2: регистрация схем с помощью API реестра схем
После того, как кластер Kafka и реестр схем запущены, мы можем зарегистрировать схемы с помощью API реестра схем. Вот пример того, как зарегистрировать схему с помощью cURL:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/example-value/versions
Этот вызов API регистрирует схему для темы Kafka с именем «example-value». Вы можете настроить схему в соответствии с вашими требованиями.
Метод 3: сериализация и десериализация данных с помощью Avro и реестра схемы
Чтобы сериализовать и десериализовать данные с помощью Avro и реестра схемы, вам потребуется использовать совместимую библиотеку сериализации Avro. Вот пример использования библиотеки Confluent Kafka Python:
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka import KafkaException
# AvroProducer example
producer = AvroProducer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)
value = {"field1": "example data"}
try:
producer.produce(topic='example-topic', value=value)
except KafkaException as e:
print(f"Failed to produce message: {e}")
# AvroConsumer example
consumer = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'example-group',
'schema.registry.url': 'http://localhost:8081'
})
consumer.subscribe(['example-topic'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
print(f"Received message: {msg.value()}")
В этой статье мы рассмотрели различные методы упрощения управления схемой с помощью Kafka Schema Registry и Docker Compose. Мы узнали, как настроить локальную среду разработки с помощью Docker Compose, зарегистрировать схемы с помощью API-интерфейса реестра схемы и сериализовать/десериализовать данные с помощью Avro и реестра схемы. Используя эти инструменты и методы, вы можете оптимизировать процесс управления схемой в Apache Kafka.