Создание масштабируемой системы доставки вебхуков с помощью Kafka, SQS и S3

Поскольку мир становится все более и более связанным, программные приложения, конечно же, не являются исключением. Приложениям необходим способ обмена информацией между двумя или более полностью разделенными системами.

Webhooks — это автоматические сообщения, отправляемые из приложений, когда что-то происходит. Они содержат сообщение (или полезную нагрузку) и отправляются на уникальный URL. Webhooks почти всегда быстрее, чем опрос, что делает их идеальным решением для передачи событий из системы во внешний мир.

Они широко используются в таких гигантах индустрии, как Shopify, Stripe, Twitter & Twilio. Если вы посмотрите на PayPal, то webhooks — это то, как PayPal сообщает вашему приложению для электронной коммерции, что ваши клиенты заплатили вам.

В этом посте я представлю решение для системы доставки webhooks с использованием Apache Kafka, AWS SQS и S3. В прошлом у моей компании были различные реализации для отправки событий через webhooks в разных продуктах. Мы решили объединить их в одно унифицированное решение.

Итак, если вы подумываете о создании собственной системы доставки событий через webhook, этот блог для вас!


Определение требований

Давайте рассмотрим, что мы хотели получить от новой системы доставки webhook и почему:

  • Масштабируемость — система должна быть способна адаптироваться к изменениям и различным нагрузкам, используя горизонтальную масштабируемость.

  • Поддержка повторных попыток — отправка webhooks иногда может быть неудачной из-за проблем на принимающей стороне (ошибка, система не работает и т.д.). Мы могли бы просто отбрасывать неудачные сообщения, но это привело бы к потере данных для получателя. Нам нужен механизм для повторного выполнения операции с экспоненциальным отступлением в течение 24 часов.

  • Поддержка большой полезной нагрузки — события webhook могут иметь большую полезную нагрузку в несколько мегабайт.

  • Агностичность полезной нагрузки — не требуется (и даже невозможно) знать значение содержимого сообщения. Полезная нагрузка сообщения доставляется как есть. Это очень важно для того, чтобы одно решение могло обслуживать множество различных бизнес-сценариев.

  • Дедупликация доставки — все события webhook будут доставлены по назначению, но, возможно, не один раз. У нас будет уникальный идентификатор события для целей дедупликации.
    Публичный REST API — необходим для регистрации веб-крючков.

Мы позволили себе некоторые послабления:

  • No Order Guarantee — мы не гарантируем никакого порядка для событий вебхуков
  • Обработка только HTTP/S коммуникации — отправка вебхуков в любом другом виде коммуникации (например, Apache Kafka, TCP и т.д.) не будет поддерживаться.

Создание функциональности

С самого начала нам было ясно, что наша система будет состоять из двух основных уровней: API (используется для конфигураций) и Brain (собственно отправка событий webhook). Давайте рассмотрим каждый из них по отдельности…

Уровень API

Используется для регистрации и настройки веб-крючков. Мы решили выбрать наиболее простое решение и создать REST API. API может быть использован конечным пользователем (подписчиком вебхука) или другим приложением.

URL конечных точек имеет структуру [base-path]/webhooks/<entity>.

Определено 3 типа объектов:

  1. Webhook Target — объект, содержащий целевой URL веб-крючка.

    {
        "id": 111111111,
        "url": "http://www.dummy.com",
        "createdAt": "2021-10-03T17:14:23Z",
        "updatedAt": "2021-10-03T17:14:23Z"
    }
    
  2. Webhook Filter — объект, содержащий список типов событий, которые пользователь хочет прослушивать (по сути, группа событий).

    {
        "id": 222222222,
        "events": [
            "core.orders.created.v1",
            "core.orders.updated.v2"
        ],
        "createdAt": "2021-10-03T17:14:23Z",
        "updatedAt": "2021-10-03T17:14:23Z"
    }
    
  3. Webhook Subscription — сущность, содержащая комбинацию цели webhook и фильтра webhook. События Webhooks будут отправляться только активным подпискам.

    {
        "id": 123456789,
        "targetId": 111111111,
        "filterId": 222222222,
        "active": true,
        "createdAt": "2021-10-03T17:14:23Z",
        "updatedAt": "2021-10-03T17:14:23Z"
    }
    

Каждая сущность поддерживает следующие операции:

  1. GET BY ID (GET /webhooks/<entity>/<entity_id>)
  2. GET ALL (GET /webhooks/<entity>)
  3. CREATE (POST /webhooks/<entity>)
  4. UPDATE (PATCH /webhooks/<entity>/<entity_id>)
  5. DELETE (DELETE /webhooks/<entity>/<entity_id>)

Чтобы создать веб-крючок, необходимо выполнить 3 шага:

  1. Создайте Webhook Target, который будет указывать на место, куда вы хотите отправить сообщение webhook.
  2. Создайте фильтр Webhook Filter, который укажет, какие события должны быть отправлены в качестве сообщений webhook.
  3. Создайте подписку Webhook Subscription, чтобы связать цель и фильтр. Только после завершения этого шага будут отправляться фактические сообщения webhook.

Мозговой уровень

Уровень, который будет отвечать за фактическую отправку webhooks. Разумно разбить его на еще более мелкие части, что позволит нам масштабировать различные области системы независимо от других областей, которые не нуждаются в таком же масштабировании.

Сначала давайте определим некоторую простую терминологию:

  1. Webhook Event — событие, полученное от других приложений.
  2. Webhook Delivery — комбинация Webhook Target и Webhook Event. Поскольку каждое событие может интересовать несколько целей, оно дублируется для каждой цели
  3. Webhook Message — фактические данные, отправленные на целевой URL.

Потребитель

Потребитель обрабатывает события webhook из Kafka и решает (на основе данных подписки), какие цели Webhook применимы. Решив это, он генерирует Webhook Delivery для каждой цели, которая должна получить событие, и публикует его в очереди AWS SQS.

Поскольку SQS имеет ограничение в 256 КБ на сообщение, а у нас сообщения больше этого, мы храним полезную нагрузку сообщения в S3 для последующего извлечения.

Кэш

События в системе происходят очень часто, и для каждого из них нам нужно найти все соответствующие подписки, определенные в БД данных о подписках. Чтобы не нагружать БД, мы храним в памяти локальный кэш с конфигурацией подписки.

Диспетчер

Диспетчер — это часть системы, которая выполняет фактическую отправку полезной нагрузки события в виде Webhook-сообщения на соответствующий URL. Он получает доставку, созданную потребителем из очереди AWS SQS, и выполняет вызов HTTP POST на нужный URL с полезной нагрузкой доставки. Это отдельный модуль для того, чтобы можно было решать проблемы, связанные с помехами производительности, но об этом подробнее позже.

Менеджер повторных запросов

Как уже упоминалось в требованиях, отправка webhooks может завершиться неудачей, поэтому мы должны иметь механизм для повторного выполнения операции (экспоненциальный бэкофф в течение 24 часов). Не найдя существующего решения, отвечающего нашим требованиям, мы решили реализовать его самостоятельно, используя возможности SQS для отложенных сообщений и S3 для хранения полезной нагрузки. Для более глубокого погружения в эту тему, пожалуйста, обратитесь к моей предстоящей статье в блоге (ссылка в конце этого сообщения).


Собираем все вместе

Системный поток

  1. Используя API, определяется новый вебхук (цель, фильтр & подписка) и сохраняется в БД данных подписки. Это делается один раз, и с этого момента соответствующие события будут отправляться в виде сообщений webhook.

  2. Как только что-то происходит в системе, которая использует систему доставки webhooks (отмечена как App1 на схеме 2), она должна отправить сообщение в специальную тему Kafka с полезной нагрузкой события и типом события. Вот схема такого сообщения:

    {
      "type": "record",
      "name": "WebhookEvent",
      "namespace": "com.yotpo.platform.webhookdelivery.messages",
      "fields": [
        {
          "name": "event_type",
          "type": "string"
        },
        {
          "name": "payload",
          "type": "string"
        }
      ]
    }
    
  3. Потребитель опрашивает сообщение из темы Kafka и использует определения, которые были сделаны ранее в API и сохранены в локальном кэше. Затем он решает, существует ли подписка, использующая фильтр, в котором определен текущий тип события. Если да, то он собирает все подписки, которые совпали, и создает сообщение о доставке для всех целей, определенных в них. Созданные доставки отправляются в очередь SQS.

  4. Диспетчер опрашивает сообщения webhook из очереди SQS и отправляет их цели. Если диспетчер получает ссылку на S3 для полезной нагрузки, он извлекает ее перед отправкой доставки на URL.

  5. Если отправка не удалась, доставка возвращается менеджеру повторных попыток, который отвечает за повторную отправку диспетчеру. Так продолжается до тех пор, пока доставка не будет успешной или пока не будет исчерпано количество попыток.


Заключительные мысли

Эта статья затрагивает только такие темы, как решение проблем, связанных с реальной жизнью (например, управление повторными попытками) или производительностью (кэширование и операции ввода-вывода).
Кроме того, очень интересными и сложными темами, которые следует рассмотреть, являются использование в многопользовательской SaaS-системе. При интеграции в такую систему необходимо учитывать больше вопросов. Такие вопросы, как голодание и помехи производительности, безопасность (CRC, проверка заголовка подписи), разделение данных и многое другое.
По всем этим вопросам я приглашаю вас ознакомиться с моей предстоящей статьей — Решение проблем системы доставки webhook в многопользовательской SaaS-системе.

Оцените статью
Procodings.ru
Добавить комментарий