Упрощение управления схемой с помощью реестра схем Kafka и Docker Compose

В этой статье мы рассмотрим, как упростить управление схемой с помощью Kafka Schema Registry и Docker Compose. Реестр схем Kafka — это мощный инструмент, который обеспечивает централизованное управление и развитие схем в Apache Kafka. Объединив его с Docker Compose, мы можем легко настроить локальную среду разработки для управления схемой. Мы обсудим несколько методов с примерами кода, чтобы продемонстрировать использование реестра схемы Kafka с Docker Compose.

Метод 1: запуск реестра схемы Kafka с помощью Docker Compose
Для начала давайте создадим файл Docker Compose для развертывания кластера Kafka и реестра схемы. Ниже приведен пример файла Docker Compose, определяющего необходимые сервисы:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - kafka

Сохраните приведенный выше код как docker-compose.yml, а затем выполните в терминале следующую команду:

docker-compose up

Это запустит кластер Kafka вместе с реестром схемы.

Метод 2: регистрация схем с помощью API реестра схем
После того, как кластер Kafka и реестр схем запущены, мы можем зарегистрировать схемы с помощью API реестра схем. Вот пример того, как зарегистрировать схему с помощью cURL:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{"schema": "{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}"}' \
     http://localhost:8081/subjects/example-value/versions

Этот вызов API регистрирует схему для темы Kafka с именем «example-value». Вы можете настроить схему в соответствии с вашими требованиями.

Метод 3: сериализация и десериализация данных с помощью Avro и реестра схемы
Чтобы сериализовать и десериализовать данные с помощью Avro и реестра схемы, вам потребуется использовать совместимую библиотеку сериализации Avro. Вот пример использования библиотеки Confluent Kafka Python:

from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka import KafkaException
# AvroProducer example
producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)
value = {"field1": "example data"}
try:
    producer.produce(topic='example-topic', value=value)
except KafkaException as e:
    print(f"Failed to produce message: {e}")
# AvroConsumer example
consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'schema.registry.url': 'http://localhost:8081'
})
consumer.subscribe(['example-topic'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    print(f"Received message: {msg.value()}")

В этой статье мы рассмотрели различные методы упрощения управления схемой с помощью Kafka Schema Registry и Docker Compose. Мы узнали, как настроить локальную среду разработки с помощью Docker Compose, зарегистрировать схемы с помощью API-интерфейса реестра схемы и сериализовать/десериализовать данные с помощью Avro и реестра схемы. Используя эти инструменты и методы, вы можете оптимизировать процесс управления схемой в Apache Kafka.