Службы обмена сообщениями на AWS — Amazon SQS

Фото Will Truettner on Unsplash

Обзор

При рассмотрении вопроса масштабирования сервисов в производстве обычно используется такой подход: определить, какие рабочие процессы могут быть перестроены в асинхронные рабочие процессы, что позволит вам разделить сервисы, которые не обязательно должны работать последовательно. Это позволяет вам масштабировать сервисы независимо друг от друга путем передачи сообщений между сервисами.

Один из подходов к этому в облаке AWS заключается в использовании Amazon SQS (Simple Queue Service). Как следует из названия, SQS предлагает безопасную, долговечную и доступную размещенную очередь, которая позволяет интегрировать и разделять распределенные программные системы и компоненты.

Покажите мне код!

Следующие примеры кода написаны на Java, но вы можете использовать любой из поддерживаемых языков, для которых есть AWS SDK. Список можно посмотреть здесь Давайте погрузимся в работу!

Инициализация очереди

Используя aws cli, можно создать стандартную очередь SQS с помощью команды aws sqs create-queue, как показано ниже:

aws sqs create-queue --queue-name my-sqs-using-cli
Вход в полноэкранный режим Выход из полноэкранного режима

Вы также можете прикрепить теги к очереди при ее создании, как показано ниже:

aws sqs create-queue --queue-name my-sqs-using-cli-with-tag --tags "env"="test"

Войти в полноэкранный режим Выйти из полноэкранного режима

Запись сообщений в очередь

Итак, когда очередь создана, давайте опубликуем в ней наше первое сообщение!

Импортируйте следующие библиотеки:

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
Войти в полноэкранный режим Выйти из полноэкранного режима

Нам понадобится URL очереди при выполнении любых операций с очередью. Давайте найдем его, используя имя очереди:

AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
Войти в полноэкранный режим Выйти из полноэкранного режима

Имея под рукой URL очереди, давайте добавим в очередь одно сообщение:

SendMessageRequest send_msg_request = new SendMessageRequest()
        .withQueueUrl(queueUrl)
        .withMessageBody("hello world")
        .withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
Войти в полноэкранный режим Выйти из полноэкранного режима

Вы также можете отправить несколько сообщений в одном запросе, используя метод sendMessageBatch:

SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()
        .withQueueUrl(queueUrl)
        .withEntries(
                new SendMessageBatchRequestEntry(
                        "msg_1", "Hello from message 1"),
                new SendMessageBatchRequestEntry(
                        "msg_2", "Hello from message 2")
                        .withDelaySeconds(10));
sqs.sendMessageBatch(send_batch_request);
Войти в полноэкранный режим Выйти из полноэкранного режима

Чтение сообщений из очереди

Когда наша очередь заполнена сообщениями, давайте прочитаем их!

List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
Вход в полноэкранный режим Выход из полноэкранного режима

Что происходит, если сообщение не может быть прочитано? На помощь приходят очереди с мертвыми буквами!

SQS обеспечивает поддержку очередей с мертвыми буквами. Очередь мертвых букв — это очередь, которую другие очереди (исходные) могут использовать для сообщений, которые не могут быть успешно обработаны. Вы можете отложить и изолировать эти сообщения в очереди мертвых букв, чтобы определить, почему их обработка не увенчалась успехом.

Давайте начнем с создания исходной очереди и очереди мертвых букв:

final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

// Create source queue
try {
    sqs.createQueue(src_queue_name);
} catch (AmazonSQSException e) {
    if (!e.getErrorCode().equals("QueueAlreadyExists")) {
        throw e;
    }
}

// Create dead-letter queue
try {
    sqs.createQueue(dl_queue_name);
} catch (AmazonSQSException e) {
    if (!e.getErrorCode().equals("QueueAlreadyExists")) {
        throw e;
    }
}
Войти в полноэкранный режим Выйти из полноэкранного режима

Чтобы назначить очередь мертвых писем, необходимо сначала создать политику redrive, а затем задать эту политику в атрибутах очереди. Политика redrive задается в JSON и определяет ARN очереди мертвых писем и максимальное количество раз, которое сообщение может быть получено и не обработано, прежде чем оно будет отправлено в очередь мертвых писем.

String dl_queue_url = sqs.getQueueUrl(dl_queue_name)
                         .getQueueUrl();

GetQueueAttributesResult queue_attrs = sqs.getQueueAttributes(
        new GetQueueAttributesRequest(dl_queue_url)
            .withAttributeNames("QueueArn"));

String dl_queue_arn = queue_attrs.getAttributes().get("QueueArn");

// Set dead letter queue with redrive policy on source queue.
String src_queue_url = sqs.getQueueUrl(src_queue_name)
                          .getQueueUrl();

SetQueueAttributesRequest request = new SetQueueAttributesRequest()
        .withQueueUrl(src_queue_url)
        .addAttributesEntry("RedrivePolicy",
                "{"maxReceiveCount":"5", "deadLetterTargetArn":""
                + dl_queue_arn + ""}");

sqs.setQueueAttributes(request);
Вход в полноэкранный режим Выход из полноэкранного режима

Теперь очередь мертвых писем готова принять все сообщения, которые не были прочитаны из-за ошибки или тайм-аута. Вы можете прочитать сообщения, оставшиеся в очереди мертвых писем, запустив отдельное задание и либо повторив попытку отправки сообщений в исходную очередь для обработки, либо сохранив их в постоянном хранилище, например в базе данных, для дальнейшего анализа.

Подведение итогов

Итак, как вы видели из приведенных выше шагов, мы смогли инициализировать очередь на SQS, читать сообщения из нее, а также использовать очередь Dead letter queue для хранения сообщений, которые не удалось обработать.

Ждите будущих статей о других службах обмена сообщениями на AWS, которые можно использовать для разделения ваших служб для лучшего масштабирования и долговечности.

Спасибо за чтение!

Оцените статью
Procodings.ru
Добавить комментарий