Освоение потоков данных Kinesis: подробное руководство по обработке данных в реальном времени

Привет, ребята! Сегодня мы собираемся погрузиться в захватывающий мир Kinesis Data Streams. Если вы хотите обрабатывать данные в реальном времени в больших масштабах, Kinesis Data Streams — ваше идеальное решение. В этой статье блога мы рассмотрим различные методы использования Kinesis Data Streams и попутно предоставим вам несколько полезных примеров кода. Итак, начнём!

Метод 1: создание потока данных

Чтобы использовать возможности Kinesis Data Streams, вам необходимо создать поток данных. Давайте пройдемся по шагам с использованием AWS SDK для Python (Boto3):

import boto3
client = boto3.client('kinesis')
response = client.create_stream(
    StreamName='my-data-stream',
    ShardCount=2
)
print(response)

В этом примере мы используем метод create_streamдля создания потока данных с именем «my-data-stream» с двумя сегментами. Шарды – это единицы пропускной способности в Kinesis Data Streams, позволяющие масштабировать возможности обработки данных.

Метод 2: помещение записей в поток данных

После того как вы настроили поток данных, следующим шагом будет добавление в него записей. Вот как этого можно добиться с помощью AWS SDK для Java:

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
KinesisClient client = KinesisClient.create();
PutRecordRequest request = PutRecordRequest.builder()
    .streamName("my-data-stream")
    .partitionKey("partition-key-1")
    .data(SdkBytes.fromUtf8String("Hello, Kinesis!"))
    .build();
PutRecordResponse response = client.putRecord(request);
System.out.println(response);

Здесь мы используем метод putRecordдля вставки записи в поток данных my-data-stream. Запись состоит из ключа раздела, который определяет сегмент, которому будет назначена запись, и фактическую полезную нагрузку данных.

Метод 3: использование записей из потока данных

Чтобы обрабатывать и использовать записи из потока данных, вам необходимо настроить потребительское приложение. Давайте рассмотрим пример использования клиентской библиотеки Kinesis (KCL) для Python:

from boto import kinesis
from boto.kinesis.exceptions import ProvisionedThroughputExceededException
kinesis_conn = kinesis.connect_to_region("us-east-1")
stream_name = "my-data-stream"
shard_id = "shardId-000000000001"
shard_iterator = kinesis_conn.get_shard_iterator(stream_name, shard_id, "LATEST")["ShardIterator"]
while True:
    try:
        records_response = kinesis_conn.get_records(shard_iterator, limit=1000, read_timeout=30)
        for record in records_response["Records"]:
            print(record)
        shard_iterator = records_response["NextShardIterator"]
    except ProvisionedThroughputExceededException:
        pass

В этом примере мы используем KCL для настройки потребительского приложения, которое непрерывно извлекает записи из потока данных «my-data-stream». Метод get_recordsпозволяет нам извлечь из потока пакет записей, которые затем можно обработать и проанализировать по мере необходимости.

Метод 4. Масштабирование потоков данных

Одним из ключевых преимуществ Kinesis Data Streams является возможность легкого масштабирования. По мере роста ваших потребностей в обработке данных вы можете динамически регулировать количество сегментов в потоке данных. Вот пример того, как можно масштабировать поток данных с помощью AWS SDK для.NET:

using Amazon.Kinesis;
var client = new AmazonKinesisClient();
var streamName = "my-data-stream";
var shardCount = 4;
client.UpdateShardCount(new UpdateShardCountRequest
{
    StreamName = streamName,
    TargetShardCount = shardCount,
    ScalingType = ScalingType.UNIFORM_SCALING
});

В этом фрагменте кода мы используем метод UpdateShardCountдля масштабирования потока данных my-data-stream до желаемого количества сегментов. Параметр ScalingType.UNIFORM_SCALINGобеспечивает равномерное распределение операции масштабирования по потоку.

Заключение

Поздравляем! Вы узнали несколько ценных методов работы с потоками данных Kinesis. Создавая потоки данных, помещая в них записи, потребляя записи и масштабируя потоки, вы можете создавать мощные конвейеры данных и обрабатывать данные в реальном времени в любом масштабе. Итак, вперед и изучите возможности Kinesis Data Streams!