📱 AWS CDK 101 — 👯 Получение JSON из S3 через stepfunction

🔰 Начинающие пользователи AWS CDK, пожалуйста, ознакомьтесь с моими предыдущими статьями в этом цикле.

Если вы пропустили предыдущую статью, найдите ее по ссылкам ниже.

🔁 Оригинальный предыдущий пост 🔗 Dev Post

🔁 Reposted previous post at 🔗 dev to @aravindvcyber

В этой статье мы рефакторим одну из наших предыдущих шаговых функций, которая вызывала лямбду, помещающую записи в dynamo, используя полное сообщение из данных события, в ту, которая может читать данные сообщения из s3 с ключами из данных события. Кроме того, это прямое продолжение моей предыдущей статьи, упомянутой выше.

Преимущества использования данных S3 ⌛

Это не единственные преимущества, это только то, что я наблюдал в своих развертываниях.

  • Прямое использование данных S3 сделает полезную нагрузку детализации события всегда одинакового размера и очень легкой.

  • В то время как огромное сообщение перекачивается в шлюз api, S3 будет действовать как место для захвата полной полезной нагрузки сообщения, и только идентификатор объекта s3 будет перекачиваться в детали события, так что мы не перекачиваем много в фактические движущиеся части.

  • Это также помогает просто предварительно обработать фактически полученное сообщение путем проверки формата, целостности данных, преобразования и даже антивирусной проверки.

  • S3 сам по себе может запускать различные управляемые событиями действия асинхронно с помощью своих уведомлений о событиях.

  • Перекачка информации о событиях в функции шага теперь будет намного эффективнее, поскольку мы больше не беспокоимся о размере данных.

  • Также здесь мы использовали S3 в качестве staging, если нам нужна высокая производительность и высокая пропускная способность, мы можем заменить его на dynamodb для хранения данных staging.

План построения 🏗️

Как мы уже говорили, мы пытаемся прочитать JSON из s3, из нашей лямбда-функции message recorder, которую мы доработали в наших предыдущих статьях для записи в dynamodb.

import { S3 } from "aws-sdk";

const s3 = new S3();
Вход в полноэкранный режим Выход из полноэкранного режима

Рефакторинг операции put внутри лямбды в вспомогательную функцию 🎊

Мы рефакторизовали ранее используемую функцию записи сообщений для лучшего повторного использования компонентов, как показано ниже.


const dbPut: any = async (Record: any, msg: any) => {
  const dynamo = new DynamoDB();
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${crt_time}` },
      event: { S: msg.event },
    },
    ReturnConsumedCapacity: "TOTAL",
  };

  console.log("putData", JSON.stringify(putData, undefined, 2));

  await dynamo.putItem(putData).promise();
};

Вход в полноэкранный режим Выход из полноэкранного режима

Рефакторинг помощника sendSuccess 🔮

Мы также воспользуемся этой возможностью, чтобы рефакторить существующий оператор SendTaskSuccess в специальную функцию, которую мы могли бы эффективно использовать.


const funcSuccess: any = (res: any, mid: string, token: string) => {
  console.log("sending success ", { res });
  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: res,
      },
    }),
    taskToken: token,
  };
  const resultStatus = sfn.sendTaskSuccess(
    sendSuccess,
    (err: any, data: any) => {
      if (err) console.log(err, err.stack);
      else console.log(data);
    }
  );
  console.log("sent success: ", { resultStatus, sendSuccess });
};
Вход в полноэкранный режим Выход из полноэкранного режима

Рефакторинг помощника sendFailure 🏄.

Аналогично, мы также рефакторим и создадим модуль sendTaskFailure в новую вспомогательную функцию.

const funcFailure: any = (err: any, mid: string, token: string) => {
  console.log("sending failure ", { err });
  const sendFailure: StepFunctions.SendTaskFailureInput = {
    error: JSON.stringify(err),
    cause: JSON.stringify({
      statusCode: 500,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: err,
      },
    }),
    taskToken: token,
  };
  const resultStatus = sfn.sendTaskFailure(
    sendFailure,
    (err: any, data: any) => {
      if (err) console.log(err, err.stack);
      else console.log(data);
    }
  );
  console.log("sent failure: ", { resultStatus, sendFailure });
};
Вход в полноэкранный режим Выход из полноэкранного режима

Предоставление доступа к объекту чтения для лямбды 🍃

В нашей предыдущей статье мы предоставили доступ обработчику входа к putObject, здесь мы предоставим доступ функции записи для чтения данных следующим образом.


stgMsgBucket.grantWrite(eventCounterBus.handler);
stgMsgBucket.grantRead(messageRecorder);

Вход в полноэкранный режим Выход из полноэкранного режима

Изменения внутри лямбды 🍌

Приведенные ниже изменения будут использоваться внутри обработчика лямбды для чтения каждого сообщения с помощью event.detail
содержимое которого теперь содержит имя ведра и ключ из события.

Здесь видно, что имя ведра и ключ объекта извлекаются из сообщения и используются для получения объекта из S3, затем он помещается в dynamodb для записи с помощью различных вспомогательных функций, созданных выше.

await Promise.all(
    event.Records.map(async (Record: any) => {
      console.log("Received message:", JSON.stringify(event, undefined, 2));
      const msg = JSON.parse(Record.body).Record;
      const s3Get = await s3
        .getObject({
          Bucket: msg.bucket,
          Key: msg.key,
        })
        .promise();
      const data = s3Get.Body?.toString("utf-8");
      if (data) {
        msg.event = data;
        const token = JSON.parse(Record.body).MyTaskToken;
        await dbPut(Record, msg)
          .then(async (data: any) => {
            await funcSuccess(data, msg.messageId, token);
          })
          .catch(async (err: any) => {
            await funcFailure(err, msg.messageId, token);
          });
      }
    })
  );
Вход в полноэкранный режим Выход из полноэкранного режима

Получив сообщение, мы обновляем объект message содержимым сообщения и вставляем его в dynamodb с помощью dbPut, который мы преломили ранее.

Удаление содержимого события из полезной нагрузки вызова statemachine 🍭

Теперь мы можем убрать использование фактического тела сообщения во всем конвейере, что позволяет уменьшить объем памяти, используемой при передаче вызовов во время различных вызовов.

const sfnTaskPayload = sfn.TaskInput.fromObject({
      MyTaskToken: sfn.JsonPath.taskToken,
      Record: {
        "messageId.$": "$.id",
        "createdAt.$": "$.time",
        // "event.$": "States.StringToJson($.detail.message)",
       // "event.$": "$.detail.message",
        "bucket.$": "$.detail.message.bucket",
        "key.$": "$.detail.message.key"
      },
    });

Вход в полноэкранный режим Выход из полноэкранного режима

Также в функции обработчика входа, рассмотренной в прошлой статье, как показано ниже

   const message = JSON.parse(event.body);
  message.uuid = getUuid();
  message.handler = context.awsRequestId;
  message.key = `uploads/${message.uuid}.json`;
  message.bucket = process.env.BucketName || "";
  console.log("Initial request:", JSON.stringify(message, undefined, 2));
  delete message.message;  //new line added, since s3 will have the data
Вход в полноэкранный режим Выход из полноэкранного режима

Интеграционное тестирование 🍁

Keypoint async-await 🔔

Одна важная вещь, которую вам, возможно, придется узнать здесь, это то, как мы обработали async-await, чтобы сначала получить данные из S3, а затем записать их в dynamodb, среди других операций async.

Новички, скорее всего, могут ошибиться при использовании async-await и получить утечку данных при плохой реализации обратных вызовов.

В этом случае трассировка, приведенная ниже, поможет вам понять, в чем заключается проблема.

В следующих статьях мы будем добавлять больше соединений в наш стек и делать его более удобным в использовании, создавая новые конструкции, поэтому рассмотрите возможность следовать за мной и подписаться на мою рассылку.

⏭ У нас есть следующая статья по serverless, загляните на

https://dev.to/aravindvcyber/aws-cdk-101-fetching-json-from-dynamodb-vs-s3-through-stepfunction-1mb7

🎉 Спасибо за поддержку! 🙏

Будет здорово, если вы захотите ☕ Купить мне кофе, чтобы поддержать мои усилия.

🔁 Оригинальный пост 🔗 Dev Post

🔁 Reposted at 🔗 dev to @aravindvcyber

🚁 AWS CDK 101 — 🐬 Получение JSON из dynamodb vs S3 через stepfunction @hashnode

Ознакомьтесь с полной коллекциейhttps://t.co/CuYxnKr0Ig

%[https://t.co/wBjjaPapaN]#TheHashnodeWriteathon#serverless #awscdk #dynamodb #amazons3 #typescript

— Aravind V (@Aravind_V7) 8 мая 2022 г.

Оцените статью
Procodings.ru
Добавить комментарий