🔰 Начинающие пользователи AWS CDK, пожалуйста, ознакомьтесь с моими предыдущими статьями в этом цикле.
Если вы пропустили предыдущую статью, найдите ее по ссылкам ниже.
🔁 Оригинальный предыдущий пост 🔗 Dev Post
🔁 Reposted previous post at 🔗 dev to @aravindvcyber
В этой статье мы рефакторим одну из наших предыдущих шаговых функций, которая вызывала лямбду, помещающую записи в dynamo, используя полное сообщение из данных события, в ту, которая может читать данные сообщения из s3 с ключами из данных события. Кроме того, это прямое продолжение моей предыдущей статьи, упомянутой выше.
- Преимущества использования данных S3 ⌛
- План построения 🏗️
- Рефакторинг операции put внутри лямбды в вспомогательную функцию 🎊
- Рефакторинг помощника sendSuccess 🔮
- Рефакторинг помощника sendFailure 🏄.
- Предоставление доступа к объекту чтения для лямбды 🍃
- Изменения внутри лямбды 🍌
- Удаление содержимого события из полезной нагрузки вызова statemachine 🍭
- Интеграционное тестирование 🍁
- Keypoint async-await 🔔
Преимущества использования данных S3 ⌛
Это не единственные преимущества, это только то, что я наблюдал в своих развертываниях.
-
Прямое использование данных S3 сделает полезную нагрузку детализации события всегда одинакового размера и очень легкой.
-
В то время как огромное сообщение перекачивается в шлюз api, S3 будет действовать как место для захвата полной полезной нагрузки сообщения, и только идентификатор объекта s3 будет перекачиваться в детали события, так что мы не перекачиваем много в фактические движущиеся части.
-
Это также помогает просто предварительно обработать фактически полученное сообщение путем проверки формата, целостности данных, преобразования и даже антивирусной проверки.
-
S3 сам по себе может запускать различные управляемые событиями действия асинхронно с помощью своих уведомлений о событиях.
-
Перекачка информации о событиях в функции шага теперь будет намного эффективнее, поскольку мы больше не беспокоимся о размере данных.
-
Также здесь мы использовали S3 в качестве staging, если нам нужна высокая производительность и высокая пропускная способность, мы можем заменить его на dynamodb для хранения данных staging.
План построения 🏗️
Как мы уже говорили, мы пытаемся прочитать JSON из s3, из нашей лямбда-функции message recorder, которую мы доработали в наших предыдущих статьях для записи в dynamodb.
import { S3 } from "aws-sdk";
const s3 = new S3();
Рефакторинг операции put внутри лямбды в вспомогательную функцию 🎊
Мы рефакторизовали ранее используемую функцию записи сообщений для лучшего повторного использования компонентов, как показано ниже.
const dbPut: any = async (Record: any, msg: any) => {
const dynamo = new DynamoDB();
const crt_time: number = new Date(msg.createdAt).getTime();
const putData: PutItemInput = {
TableName: process.env.MESSAGES_TABLE_NAME || "",
Item: {
messageId: { S: msg.messageId },
createdAt: { N: `${crt_time}` },
event: { S: msg.event },
},
ReturnConsumedCapacity: "TOTAL",
};
console.log("putData", JSON.stringify(putData, undefined, 2));
await dynamo.putItem(putData).promise();
};
Рефакторинг помощника sendSuccess 🔮
Мы также воспользуемся этой возможностью, чтобы рефакторить существующий оператор SendTaskSuccess в специальную функцию, которую мы могли бы эффективно использовать.
const funcSuccess: any = (res: any, mid: string, token: string) => {
console.log("sending success ", { res });
const sendSuccess: StepFunctions.SendTaskSuccessInput = {
output: JSON.stringify({
statusCode: 200,
headers: { "Content-Type": "text/json" },
putStatus: {
messageId: mid,
ProcessorResult: res,
},
}),
taskToken: token,
};
const resultStatus = sfn.sendTaskSuccess(
sendSuccess,
(err: any, data: any) => {
if (err) console.log(err, err.stack);
else console.log(data);
}
);
console.log("sent success: ", { resultStatus, sendSuccess });
};
Рефакторинг помощника sendFailure 🏄.
Аналогично, мы также рефакторим и создадим модуль sendTaskFailure в новую вспомогательную функцию.
const funcFailure: any = (err: any, mid: string, token: string) => {
console.log("sending failure ", { err });
const sendFailure: StepFunctions.SendTaskFailureInput = {
error: JSON.stringify(err),
cause: JSON.stringify({
statusCode: 500,
headers: { "Content-Type": "text/json" },
putStatus: {
messageId: mid,
ProcessorResult: err,
},
}),
taskToken: token,
};
const resultStatus = sfn.sendTaskFailure(
sendFailure,
(err: any, data: any) => {
if (err) console.log(err, err.stack);
else console.log(data);
}
);
console.log("sent failure: ", { resultStatus, sendFailure });
};
Предоставление доступа к объекту чтения для лямбды 🍃
В нашей предыдущей статье мы предоставили доступ обработчику входа к putObject, здесь мы предоставим доступ функции записи для чтения данных следующим образом.
stgMsgBucket.grantWrite(eventCounterBus.handler);
stgMsgBucket.grantRead(messageRecorder);
Изменения внутри лямбды 🍌
Приведенные ниже изменения будут использоваться внутри обработчика лямбды для чтения каждого сообщения с помощью event.detail
содержимое которого теперь содержит имя ведра и ключ из события.
Здесь видно, что имя ведра и ключ объекта извлекаются из сообщения и используются для получения объекта из S3, затем он помещается в dynamodb для записи с помощью различных вспомогательных функций, созданных выше.
await Promise.all(
event.Records.map(async (Record: any) => {
console.log("Received message:", JSON.stringify(event, undefined, 2));
const msg = JSON.parse(Record.body).Record;
const s3Get = await s3
.getObject({
Bucket: msg.bucket,
Key: msg.key,
})
.promise();
const data = s3Get.Body?.toString("utf-8");
if (data) {
msg.event = data;
const token = JSON.parse(Record.body).MyTaskToken;
await dbPut(Record, msg)
.then(async (data: any) => {
await funcSuccess(data, msg.messageId, token);
})
.catch(async (err: any) => {
await funcFailure(err, msg.messageId, token);
});
}
})
);
Получив сообщение, мы обновляем объект message содержимым сообщения и вставляем его в dynamodb с помощью dbPut
, который мы преломили ранее.
Удаление содержимого события из полезной нагрузки вызова statemachine 🍭
Теперь мы можем убрать использование фактического тела сообщения во всем конвейере, что позволяет уменьшить объем памяти, используемой при передаче вызовов во время различных вызовов.
const sfnTaskPayload = sfn.TaskInput.fromObject({
MyTaskToken: sfn.JsonPath.taskToken,
Record: {
"messageId.$": "$.id",
"createdAt.$": "$.time",
// "event.$": "States.StringToJson($.detail.message)",
// "event.$": "$.detail.message",
"bucket.$": "$.detail.message.bucket",
"key.$": "$.detail.message.key"
},
});
Также в функции обработчика входа, рассмотренной в прошлой статье, как показано ниже
const message = JSON.parse(event.body);
message.uuid = getUuid();
message.handler = context.awsRequestId;
message.key = `uploads/${message.uuid}.json`;
message.bucket = process.env.BucketName || "";
console.log("Initial request:", JSON.stringify(message, undefined, 2));
delete message.message; //new line added, since s3 will have the data
Интеграционное тестирование 🍁
Keypoint async-await 🔔
Одна важная вещь, которую вам, возможно, придется узнать здесь, это то, как мы обработали async-await, чтобы сначала получить данные из S3, а затем записать их в dynamodb, среди других операций async.
Новички, скорее всего, могут ошибиться при использовании async-await и получить утечку данных при плохой реализации обратных вызовов.
В этом случае трассировка, приведенная ниже, поможет вам понять, в чем заключается проблема.
В следующих статьях мы будем добавлять больше соединений в наш стек и делать его более удобным в использовании, создавая новые конструкции, поэтому рассмотрите возможность следовать за мной и подписаться на мою рассылку.
⏭ У нас есть следующая статья по serverless, загляните на
https://dev.to/aravindvcyber/aws-cdk-101-fetching-json-from-dynamodb-vs-s3-through-stepfunction-1mb7
🎉 Спасибо за поддержку! 🙏
Будет здорово, если вы захотите ☕ Купить мне кофе, чтобы поддержать мои усилия.
🔁 Оригинальный пост 🔗 Dev Post
🔁 Reposted at 🔗 dev to @aravindvcyber
🚁 AWS CDK 101 — 🐬 Получение JSON из dynamodb vs S3 через stepfunction @hashnode
Ознакомьтесь с полной коллекциейhttps://t.co/CuYxnKr0Ig
%[https://t.co/wBjjaPapaN]#TheHashnodeWriteathon#serverless #awscdk #dynamodb #amazons3 #typescript
— Aravind V (@Aravind_V7) 8 мая 2022 г.