Привет, ребята! Сегодня мы собираемся погрузиться в захватывающий мир Kinesis Data Streams. Если вы хотите обрабатывать данные в реальном времени в больших масштабах, Kinesis Data Streams — ваше идеальное решение. В этой статье блога мы рассмотрим различные методы использования Kinesis Data Streams и попутно предоставим вам несколько полезных примеров кода. Итак, начнём!
Метод 1: создание потока данных
Чтобы использовать возможности Kinesis Data Streams, вам необходимо создать поток данных. Давайте пройдемся по шагам с использованием AWS SDK для Python (Boto3):
import boto3
client = boto3.client('kinesis')
response = client.create_stream(
StreamName='my-data-stream',
ShardCount=2
)
print(response)
В этом примере мы используем метод create_streamдля создания потока данных с именем «my-data-stream» с двумя сегментами. Шарды – это единицы пропускной способности в Kinesis Data Streams, позволяющие масштабировать возможности обработки данных.
Метод 2: помещение записей в поток данных
После того как вы настроили поток данных, следующим шагом будет добавление в него записей. Вот как этого можно добиться с помощью AWS SDK для Java:
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
KinesisClient client = KinesisClient.create();
PutRecordRequest request = PutRecordRequest.builder()
.streamName("my-data-stream")
.partitionKey("partition-key-1")
.data(SdkBytes.fromUtf8String("Hello, Kinesis!"))
.build();
PutRecordResponse response = client.putRecord(request);
System.out.println(response);
Здесь мы используем метод putRecordдля вставки записи в поток данных my-data-stream. Запись состоит из ключа раздела, который определяет сегмент, которому будет назначена запись, и фактическую полезную нагрузку данных.
Метод 3: использование записей из потока данных
Чтобы обрабатывать и использовать записи из потока данных, вам необходимо настроить потребительское приложение. Давайте рассмотрим пример использования клиентской библиотеки Kinesis (KCL) для Python:
from boto import kinesis
from boto.kinesis.exceptions import ProvisionedThroughputExceededException
kinesis_conn = kinesis.connect_to_region("us-east-1")
stream_name = "my-data-stream"
shard_id = "shardId-000000000001"
shard_iterator = kinesis_conn.get_shard_iterator(stream_name, shard_id, "LATEST")["ShardIterator"]
while True:
try:
records_response = kinesis_conn.get_records(shard_iterator, limit=1000, read_timeout=30)
for record in records_response["Records"]:
print(record)
shard_iterator = records_response["NextShardIterator"]
except ProvisionedThroughputExceededException:
pass
В этом примере мы используем KCL для настройки потребительского приложения, которое непрерывно извлекает записи из потока данных «my-data-stream». Метод get_recordsпозволяет нам извлечь из потока пакет записей, которые затем можно обработать и проанализировать по мере необходимости.
Метод 4. Масштабирование потоков данных
Одним из ключевых преимуществ Kinesis Data Streams является возможность легкого масштабирования. По мере роста ваших потребностей в обработке данных вы можете динамически регулировать количество сегментов в потоке данных. Вот пример того, как можно масштабировать поток данных с помощью AWS SDK для.NET:
using Amazon.Kinesis;
var client = new AmazonKinesisClient();
var streamName = "my-data-stream";
var shardCount = 4;
client.UpdateShardCount(new UpdateShardCountRequest
{
StreamName = streamName,
TargetShardCount = shardCount,
ScalingType = ScalingType.UNIFORM_SCALING
});
В этом фрагменте кода мы используем метод UpdateShardCountдля масштабирования потока данных my-data-stream до желаемого количества сегментов. Параметр ScalingType.UNIFORM_SCALINGобеспечивает равномерное распределение операции масштабирования по потоку.
Заключение
Поздравляем! Вы узнали несколько ценных методов работы с потоками данных Kinesis. Создавая потоки данных, помещая в них записи, потребляя записи и масштабируя потоки, вы можете создавать мощные конвейеры данных и обрабатывать данные в реальном времени в любом масштабе. Итак, вперед и изучите возможности Kinesis Data Streams!