В этом посте я подведу итог всему, что я узнал о выполнении Lambda-функций с помощью Rust за последние шесть месяцев, выделив хорошие, производительные и плохие моменты, с которыми я столкнулся, используя Rust в продакшене.
Оптимизация для выполнения Lambda
Прежде чем начать, давайте вкратце расскажем о том, как ускорить выполнение Lambda-функции.
Во-первых, каждая среда выполнения может иметь некоторые дополнительные настройки, как, например, Node.js:
AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
HTTP/HTTPS-агент Node.js по умолчанию создает новое TCP-соединение для каждого нового запроса. Чтобы избежать затрат на создание нового соединения, вы можете повторно использовать существующее соединение.
Для Java вы можете использовать JAVA_TOOL_OPTIONS. С помощью этой переменной среды можно изменить различные аспекты конфигурации JVM, включая функциональность сборки мусора, параметры памяти и конфигурацию многоуровневой компиляции. Например, чтобы изменить уровень многоуровневой компиляции на 1, необходимо установить значение:
JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1
Я хочу кратко изложить основы:
Как говорится в статье AWS:
Когда служба Lambda получает запрос на выполнение функции через Lambda API, служба сначала подготавливает среду выполнения. На этом этапе служба загружает код функции. Затем он создает среду с указанной памятью, временем выполнения и конфигурацией. После завершения Lambda запускает любой код инициализации вне обработчика событий, прежде чем окончательно запустить код обработчика. Служба Lambda сохраняет среду выполнения вместо того, чтобы уничтожить ее сразу после выполнения.
Как от разработчика, от меня требуется только оптимизация:
- Bootstrap среды выполнения
- Выполнение кода
- Использовать архитектуру arm64
Это означает, что я должен убедиться:
- Мой пакет как можно меньше
- Инициализировать мои классы, SDK-клиенты и соединения с базой данных вне обработчика функций
- Кэшировать статические активы локально в каталоге /tmp.
Выполнение всего этого позволит сэкономить время выполнения и затраты на последующие вызовы (теплый старт).
В дополнение к этим основным правилам, возможно, будет полезно упомянуть:
- Избегание жирных лямбда-функций
- Выполнение кода параллельно, когда это возможно
Параллелизм: Параллелизм означает, что приложение разбивает свои задачи на более мелкие подзадачи, которые могут обрабатываться параллельно, например, на нескольких процессорах одновременно.
let mut tasks = Vec::with_capacity(event.records.len());
....
for record in event.records.into_iter() {
tasks.push(tokio::spawn(async move {
.....
do_something(&shared_client, &request)
.await
.map_or_else(|e| log::error!("Error {:?}", e), |_| ());
}))
}
join_all(tasks).await;
....
}
Параллелизм: Параллелизм означает, что приложение выполняет несколько задач одновременно (параллельно).
// convert array in streams to use async
let mut records = stream::iter(event.records);
// processing one element at a time
while let Some(record) = records.next().await {
Rust
Мое путешествие с Rust началось 6 января 2022 года, и с тех пор я написал немного о его настройке и показал несколько сравнений кода. Все эти посты относятся к серии Serverless Rust.
Я пытался показать, что Rust можно использовать не только для тяжелых сетевых задач, операционной системы или игр, но и для гораздо более высокоуровневых целей. На самом деле, после использования .NET и в основном Node.js для AWS Lambda, я решил расширить границы скорости, используя другую среду выполнения.
Время выполнения AWS Lambda может быть написано на любом языке программирования. Время выполнения – это программа, которая запускает метод-обработчик функции Lambda при вызове функции. Можно включить runtime в пакет развертывания функции в виде исполняемого файла с именем bootstrap. В этом случае крейт, поддерживаемый AWS, позволяет очень просто указать службе Lambda, как запускать приложения, написанные на языке Rust.
lambda_runtime = "0.5.1"
Существует множество примеров Lambda, написанных на Rust, но я хотел бы остановиться на основных зависимостях, необходимых для запуска проекта:
[dependencies]
aws-config = "0.12.0" // AWS SDK config and credential provider implementations.
lambda_runtime = "0.5.1" // Lambda runtime
serde = {version = "1.0", features = ["derive"] } // framework for serializing and deserializing
serde_json = "1.0.68" // framework for serializing and deserializing
tokio = "1.13.0" // A runtime for writing asynchronous required for the lambda_runtime
tracing-subscriber = "0.3" // required to enable CloudWatch error logging by the runtime
Как и в случае почти со всеми рунтимами, шаблон следует тем же правилам:
- Установите ваши зависимости
- Импортируйте их
- Объявите свой обработчик
Результат будет выглядеть следующим образом:
// imports my libraries
use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde_json::Value;
// expected by lambda_runtime to make the main function asynchronous
#[tokio::main]
// Result<(), Error> is the idiomatic way to handle errors in Rust instead of try catch them
// https://doc.rust-lang.org/rust-by-example/error/result.html#using-result-in-main
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
tracing_subscriber::fmt()
// this needs to be set to false, otherwise ANSI color codes will
// show up in a confusing manner in CloudWatch logs.
.with_ansi(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
// some custom code to init all my classes, SDK etc outside of the handler
let app_client = MyAppClient::builder()
.init_something(xxx)
.init_something_else()
.build();
// this is necessary because rust is not supported as a provided runtime
lambda_runtime::run(service_fn(|event: LambdaEvent<Value>| {
execute(&app_client, event)
}))
.await?; //block until the results come back. The ? is to handle the propagation of the error.
Ok(()) // tell the caller that things were successful
}
// our handler maybe in the future will be the only thing that we will write
// app_client is my custom code that contains all the initialisation outside of the handler
// _event is the payload and it is going to be deserialised into Value
pub async fn execute(app_client: &dyn MyAppInitialisation, _event: LambdaEvent<Value>) -> Result<(), Error> {
//do something
Ok(())
}
Не нужно бояться Rust, и он не так сложен, как другие языки. Мне понадобилось более или менее недели, чтобы написать свою первую лямбду, которая подключалась к DynamoDB, и, честно говоря, столько же времени я писал ее впервые на Node.js, .NET или Golang.
Мой изначальный опыт – это/был .NET и Java, так что, возможно, мне было гораздо проще разобраться с владением ими. По сути, это идея о том, что две части кода не могут одновременно обращаться к одной и той же части памяти.
Это уникальная особенность, которая позволяет Rust гарантировать безопасность памяти без сборщика мусора. Независимо от языка программирования, жизненный цикл памяти практически всегда одинаков:
- Выделить необходимую память
- Использовать выделенную память
- Освободить выделенную память, когда она больше не нужна.
Такие языки, как Javascript, являются неявными. Я объявляю свою переменную, а время выполнения языка заботится об остальном с помощью сборщика мусора. В языках со сборщиком мусора мне не нужно беспокоиться о том, что попадает в стек, а что в кучу. Данные в стеке удаляются, как только выходят за пределы области видимости. О данных, которые живут на куче, позаботится сборщик мусора, как только они перестанут быть нужными.
Короче говоря, все это хорошо и значительно облегчает жизнь разработчика.
В Rust нет сборщика мусора, и поэтому, как разработчик, я могу сделать одно из трех:
- Переместить сами данные и отказаться от права собственности на них.
- Создать копию данных и передать ее.
- Передать ссылку на данные и сохранить право собственности, позволив получателю взять их на время.
Это может быть сложно переварить, но ничего страшного в этом нет. Я привык и да:
- Сделать код более многословным
- Усложнить все параллелизмом
Поскольку я хочу быть хорошим бессерверным программистом, я держу свой код Lambda как можно меньше, не создавая большой сложности. Тем не менее, я уверен, что сообщество Rust будет радо пообщаться и поделиться советом, когда возникнет сложная проблема. Мне нравится использовать:
- Форум по языку программирования Rust
- Сообщество разработчиков языка программирования Rust на Slack!
Юнит-тесты Rust
В наше время, я думаю, все пишут модульные тесты, и у всех языков есть официальная библиотека или библиотека, на которую можно ссылаться. Однако, Rust поставляется с очень базовой поддержкой, которую я видел только > 10 лет назад, когда юнит-тесты не были вещью, и поэтому, когда я исследовал, я просто нашел что-то не очень полезное:
fn prints_and_returns_10(a: i32) -> i32 {
println!("I got the value {}", a);
10
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn this_test_will_pass() {
let value = prints_and_returns_10(4);
assert_eq!(10, value);
}
#[test]
fn this_test_will_fail() {
let value = prints_and_returns_10(8);
assert_eq!(5, value);
}
}
Во время написания тестов мне часто нужно что-то настроить перед запуском тестов и, возможно, что-то после завершения теста. Всего этого не существует, если только вы не пойдете и не найдете какую-нибудь библиотеку, которая обычно не совершенна, активно поддерживается и т.д. Еще одна вещь, с которой я боролся, – это мокинг. Так как я использую библиотеки AWS SDK, такие как DynamoDB, SQS и т.д., мне нужно все подражать, чтобы тестировать свою логику в полной изоляции.
К счастью для меня, существует отличный крейт Mockall, поддерживаемый ангелами, а его владелец Алан Сомерс довольно активно помогает сообществу при возникновении вопросов.
Как вы можете прочитать в руководстве:
Есть два способа использования Mockall. Самый простой – использовать #[automock]. Он может подражать большинству признаков или структур, используя только один блок impl. Для вещей, с которыми он не может справиться, есть mock!
Какой бы метод не использовался, основная идея одна и та же.
- Создайте имитируемую структуру. Его имя будет таким же, как у оригинала, с приставкой “Mock”.
- В своем тесте инстанцируйте имитационную структуру с помощью ее метода new или метода по умолчанию.
- Установите ожидания для макета структуры. Каждое ожидание может иметь требуемые совпадения аргументов, количество необходимых вызовов и требуемую позицию в последовательности. Каждое ожидание также должно иметь возвращаемое значение.
- Передайте объект-макет в код, который вы тестируете. Он вернет запрограммированные значения возврата, заданные в предыдущем шаге. Любой доступ, противоречащий вашим ожиданиям, вызовет панику.
Пример
Недавно я заинтересовался многорегиональной конфигурацией serverless, и оказалось, что существует множество сервисов AWS, которые помогут вам добиться мультирегиональности:
- Конфигурация отказоустойчивости CloudFront
- Amazon API Gateway
- Глобальные таблицы DynamoDB
- Бессерверный кэш
Как только у меня появится время, я опубликую полный пример проекта, а пока давайте рассмотрим случай аннулирования CloudFront в сценарии Serverless cache.
use aws_lambda_events::event::sqs::SqsEvent;
use aws_sdk_cloudfront::model::{invalidation_batch, paths};
use lambda_runtime::{service_fn, Error, LambdaEvent};
utils::injections::invalidation_di::{InvalidationAppClient, InvalidationAppInitialisation}};
use my_app::{dtos::invalidation_request::InvalidationRequest, utils::injections::invalidation_di::{InvalidationAppClient, InvalidationAppInitialisation}};
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_ansi(false)
.without_time()
.init();
let config = aws_config::load_from_env().await;
let cloudfront_client = aws_sdk_cloudfront::Client::new(&config);
let distribution_name = std::env::var("DISTRIBUTION_NAME").expect("DISTRIBUTION_NAME must be set");
let app_client = InvalidationAppClient::builder()
.cloudfront_client(cloudfront_client)
.distribution_id(distribution_name)
.build();
lambda_runtime::run(service_fn(|event: LambdaEvent<SqsEvent>| {
execute(&app_client, event)
}))
.await?;
Ok(())
}
pub async fn execute(app_client: &dyn InvalidationAppInitialisation, event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
println!("{:?}", event);
let mut items: Vec<String> = vec![];
for record in event.payload.records.into_iter() {
if let Some(body) = &record.body {
let request = serde_json::from_str::<InvalidationRequest>(&body)?;
items.push(format!("/?key1={}&key2={}", request.my_key1, request.my_key2));
}
}
let paths = paths::Builder::default()
.quantity(items.len().try_into().unwrap())
.set_items(Some(items))
.build();
let invalidation_batch = invalidation_batch::Builder::default()
.paths(paths)
.set_caller_reference(Some(format!("{}", app_client.get_timestamp())))
.build();
app_client.get_cloudfront_client()
.create_invalidation()
.distribution_id(app_client.get_distribution_id())
.invalidation_batch(invalidation_batch)
.send()
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use aws_sdk_cloudfront::{Client, Config, Credentials, Region};
use aws_smithy_client::{test_connection::TestConnection, erase::DynConnector};
use aws_smithy_http::body::SdkBody;
use lambda_http::Context;
use mockall::mock;
use my_app::error::ApplicationError;
async fn get_mock_config() -> aws_sdk_cloudfront::Config {
let cfg = aws_config::from_env()
.region(Region::new("eu-central-1"))
.credentials_provider(Credentials::new(
"accesskey",
"privatekey",
None,
None,
"dummy",
))
.load()
.await;
Config::new(&cfg)
}
fn get_request_builder() -> http::request::Builder {
http::Request::builder()
.header("content-type", "application/xml")
.uri(http::uri::Uri::from_static(
"https://cloudfront.amazonaws.com/2020-05-31/distribution/some_distribution/invalidation",
))
}
#[tokio::test]
async fn invalidate_key() -> Result<(), ApplicationError> {
// ARRANGE
let conn = TestConnection::new(vec![(
get_request_builder()
.body(SdkBody::from(
r#"<InvalidationBatch xmlns="http://cloudfront.amazonaws.com/doc/2020-05-31/"><Paths><Quantity>1</Quantity><Items><Path>/?key1=something&key2=something</Path></Items></Paths><CallerReference>1336468263</CallerReference></InvalidationBatch>"#,
))
.unwrap(),
http::Response::builder()
.status(200)
.body(SdkBody::from(r#"
<Invalidation>
<CreateTime>2019-12-05T18:40:49.413Z</CreateTime>
<Id>I2J0I21PCUYOIK</Id>
<InvalidationBatch>
<CallerReference>1336468263</CallerReference>
<Paths>
<Items>
<Path>/?key1=something&key2=something</Path>
</Items>
<Quantity>1</Quantity>
</Paths>
</InvalidationBatch>
<Status>InProgress</Status>
</Invalidation>
"#))
.unwrap(),
)]);
let client = Client::from_conf_conn(get_mock_config().await, DynConnector::new(conn.clone()));
let data = r#"{
"Records": [
{
"messageId": "059f34b4-87a3-46ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHDfsdgvbUMZj6rYigCgxlaS3SLy0a",
"body": "{"key1":"something","key2":"something"}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENHSNOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"aws_region": "us-east-2"
}
]
}"#;
let sqs = serde_json::from_str::<SqsEvent>(&data).unwrap();
let context = Context::default();
let event = LambdaEvent::new(sqs, context);
mock! {
pub InvalidationAppClient {}
impl InvalidationAppInitialisation for InvalidationAppClient {
fn get_timestamp(&self) -> i64 {
1336468263
}
fn get_cloudfront_client(&self) -> aws_sdk_cloudfront::Client {
client
}
fn get_distribution_id(&self) -> String {
"something".clone()
}
}
}
let mut mock = MockInvalidationAppClient::new();
mock.expect_get_timestamp().times(1).return_const(1336468263);
mock.expect_get_cloudfront_client().times(1).returning(move || client.clone());
mock.expect_get_distribution_id().times(1).return_const("some_distribution");
// ACT
execute(&mock, event).await?;
// ASSERT
assert_eq!(conn.requests().len(), 1);
conn.assert_requests_match(&vec![]);
Ok(())
}
}
Этот код очень прост:
- Лямбда запускается событием SQS.
- Последовательно обрабатываем SQS Batch, чтобы собрать все ключи для аннулирования в CloudFront.
- Мы аннулируем партию ключей для конкретной рассылки.
В части модульных тестов очень много всего, и я постараюсь выделить каждый фрагмент.
Здесь нет “BeforeEach” или “OneTimeSetUp”, поэтому вам нужно написать код для имитации. Скорее всего, я напишу что-то для эмуляции работы моих тестов и напишу какую-то общую реализацию для имитации AWS SDK.
Подражание AWS SDK
В тесте два метода вызываются в части ARRANGE модульного теста:
async fn get_mock_config() -> aws_sdk_cloudfront::Config {}
fn get_request_builder() -> http::request::Builder {}
Это методы для “передразнивания” сервиса AWS SDK, и для выполнения такой операции я могу использовать TestConnection, предоставляемый в каждом сервисе, что позволяет мне имитировать запрос и ответ.
Имейте в виду, что если в вашей логике есть несколько сервисов, вам нужно инстанцировать TestConnection для каждого сервиса, создать конкретный запрос для каждого сервиса и выяснить, какой “content-type” использовать и какой “URI” использовать.
Мокинг лямбда-событий
В Rust у нас есть два ящика для Lambda, и они могут требовать разную полезную нагрузку.
С помощью lambda_runtime:
lambda_runtime::run(service_fn(|event: LambdaEvent<SqsEvent>| {
execute(&app_client, event)
})).await?;
}
pub async fn execute(app_client: &dyn InvalidationAppInitialisation, event: LambdaEvent<SqsEvent>) -> Result<(), Error> { }
.....
#[tokio::test]
async fn my_test() -> Result<(), ApplicationError> {
let data = r#"{
"Records": [
{
"messageId": "059f34b4-87a3-46ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHDfsdgvbUMZj6rYigCgxlaS3SLy0a",
"body": "{"key1":"something","key2":"something"}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENHSNOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"aws_region": "us-east-2"
}
]
}"#;
let sqs = serde_json::from_str::<SqsEvent>(&data).unwrap();
let context = Context::default();
let event = LambdaEvent::new(sqs, context);
...
}
при интеграции лямбда API Gateway REST и HTTP API с помощью lambda_http:
lambda_http::run(service_fn(|event: Request| execute(&app_client, event))).await?;
...
pub async fn execute(app_client: &dyn InvalidationAppInitialisation, event: Request) > Result<impl IntoResponse, Error> { }
.....
#[tokio::test]
async fn my_test() -> Result<(), ApplicationError> {
let mut item = HashMap::new();
item.insert("xxxx".into(), vec!["key1".into()]);
let request = Request::default().with_query_string_parameters(item.clone());
//or in case of a post
let request = http::Request::builder()
.header("Content-Type", "application/json")
.body(Body::from(
r#"{"key1":"something", "key2": "something"}"#,
))
.unwrap();
...
}
Инициализация функции
Чтобы достичь максимальной скорости, я инициализировал свои классы, SDK-клиенты, соединения с базой данных и т.д. вне обработчика функции с помощью некоторого пользовательского кода, и передал его в метод обработчика вместе с полезной нагрузкой Lambda:
let app_client = ListAppClient::builder()
.list_by_brand_query(query)
.dynamo_db_client(dynamodb_client.clone())
.build();
lambda_runtime::run(service_fn(|event: LambdaEvent<SqsEvent>| {
execute(&app_client, event)
})).await?;
MockAll может подражать признакам, в данном случае InvalidationAppInitialisation.
pub async fn execute(app_client: &dyn InvalidationAppInitialisation, event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
Внутри модульного теста в части ARRANGE присутствует синтаксис MockAll:
mock! {
pub InvalidationAppClient {}
impl InvalidationAppInitialisation for InvalidationAppClient {
fn get_timestamp(&self) -> i64 {
1336468263
}
fn get_cloudfront_client(&self) -> aws_sdk_cloudfront::Client {
client
}
fn get_distribution_id(&self) -> String {
"something".clone()
}
}
}
let mut mock = MockInvalidationAppClient::new();
mock.expect_get_timestamp().times(1).return_const(1336468263);
mock.expect_get_cloudfront_client().times(1).returning(move || client.clone());
mock.expect_get_distribution_id().times(1).return_const("some_distribution");
Мне нужно переопределить методы, над которыми я хочу поиздеваться, и вернуть необходимую информацию, чтобы тест прошел без создания реального соединения или беспокойства о реализации зависимостей моего класса.
Издевательство, когда проверка заемщика кусает вас в ответ
В начале этого поста я упомянул об использовании параллелизма, когда это возможно.
Если я хочу обрабатывать все SQS параллельно, я напишу что-то вроде:
pub async fn execute(app_client: &dyn InvalidationAppInitialisation, event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let mut tasks = Vec::with_capacity(event.records.len());
....
for record in event.records.into_iter() {
// This will be the concrete init and it will not be possible to mock do_something()
// let shared_app_client = app_client.clone();
let shared_app_client = app_client.clone_to_arc();
tasks.push(tokio::spawn(async move {
if let Some(body) = &record.body {
let request = serde_json::from_str::<MyStruct>(&body);
if let Ok(request) = request {
shared_app_client.do_something(&request).await
.map_or_else(|e| {
println!("ERROR {:?}", e);
}, |_| ());
}
}
}));
}
join_all(tasks).await;
....
}
Я использую The Atomic Reference Counter (Arc) для обмена неизменяемыми данными между потоками потокобезопасным способом. В данном случае мы хотим поделиться shared_app_client, который хранит ссылки на мои классы, клиенты SDK, подключения к базе данных и т.д.
Чтобы сделать возможным мокинг, мне нужно реализовать обертку для моего трейта:
fn clone_to_arc(&self) -> Arc<dyn InvalidationAppInitialisation> {
Arc::new(self.clone())
}
Имея такую обертку, я могу сделать следующее:
mock! {
pub InvalidationAppClient {}
impl InvalidationAppInitialisation for InvalidationAppClient {
....
fn clone_to_arc(&self) -> Arc<dyn InvalidationAppInitialisation> {
Arc::new(self.clone())
}
}
}
let mut mock = MockInvalidationAppClient::new();
mock.expect_clone_to_arc().times(1).returning(move || {
let mut m = MockInvalidationAppClient::new();
m.expect_do_something().times(1).returning(|_| Ok(()));
Arc::new(m)
});
Производительность
Все знают, что Rust и Golang – самые быстрые, за ними следуют .NET и Java (только теплое состояние) и, в конце концов, Python и Node.js.
Я запускаю функцию AWS Lambda, загружающую материал из S3 или DynamoDB, и добился следующих результатов:
count | p50 | p90 | p99 | максимум | |
---|---|---|---|---|---|
теплый | > 20M | 1.19мс | 1.29мс | 1.74мс | 108.42мс |
холод | < 400 | 312.12мс | 416.21мс | 534.23мс | 1.11s |
теплый | > 12M | 13.78ms | 19.39ms | 36.49мс | 148.42мс |
холод | < 200 | 102.56мс | 125.18 | 145.54мс | 157,13 мс |
По моему опыту, задержка происходит в сети AWS, поэтому цифры всегда могут отличаться, но я думаю, что они довольно хорошие.
Если вас интересует сравнение производительности, AWS опубликовал одно приложение с несколькими языками:
- Graavlm
- Rust
- TypeScript
- Golang
- Kotlin
- .NET
- Groovy
- Python