Реализация шаблона разветвления SNS + SQS: методы и примеры

Под «SNS + SQS Fan Out» подразумевается шаблон распределенного обмена сообщениями с использованием Amazon Simple Notification Service (SNS) и Amazon Simple Queue Service (SQS) для широковещательной рассылки сообщений множеству потребителей. Вот несколько способов реализации этого шаблона вместе с примерами кода:

Метод 1. Использование AWS SDK для Python (Boto3)

import boto3
def fan_out_message_sns_sqs(topic_arn, message):
    sns = boto3.client('sns')
    sqs = boto3.client('sqs')
    # Publish message to SNS topic
    sns.publish(
        TopicArn=topic_arn,
        Message=message
    )
    # Retrieve SQS queues subscribed to the SNS topic
    response = sns.list_subscriptions_by_topic(TopicArn=topic_arn)
    queues = [subscription['Endpoint'] for subscription in response['Subscriptions']]
    # Send message to each subscribed SQS queue
    for queue_url in queues:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=message
        )

Метод 2. Использование AWS SDK для Java (AWS SDK для Java 2.x)

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.ListSubscriptionsByTopicRequest;
import software.amazon.awssdk.services.sns.model.ListSubscriptionsByTopicResponse;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
public class SnsSqsFanOut {
    public void fanOutMessageSnsSqs(String topicArn, String message) {
        SnsClient snsClient = SnsClient.builder()
                .region(Region.US_EAST_1)
                .build();
        SqsClient sqsClient = SqsClient.builder()
                .region(Region.US_EAST_1)
                .build();
        // Publish message to SNS topic
        PublishResponse publishResponse = snsClient.publish(
                PublishRequest.builder()
                        .topicArn(topicArn)
                        .message(message)
                        .build()
        );
        // Retrieve SQS queues subscribed to the SNS topic
        ListSubscriptionsByTopicResponse response = snsClient.listSubscriptionsByTopic(
                ListSubscriptionsByTopicRequest.builder()
                        .topicArn(topicArn)
                        .build()
        );
        List<String> queues = response.subscriptions().stream()
                .map(subscription -> subscription.endpoint())
                .collect(Collectors.toList());
        // Send message to each subscribed SQS queue
        for (String queueUrl : queues) {
            sqsClient.sendMessage(
                    SendMessageRequest.builder()
                            .queueUrl(queueUrl)
                            .messageBody(message)
                            .build()
            );
        }
    }
}

Это всего лишь примеры использования пакетов AWS SDK для Python и Java. Существуют также SDK для других языков программирования, таких как JavaScript,.NET, Ruby и т. д. Вы можете выбрать SDK, соответствующий предпочитаемому вами языку программирования.