Как использовать MQTT в Rust

Rust – это мультипарадигмальный язык программирования, разработанный для обеспечения производительности и безопасности, особенно безопасного параллелизма. Rust синтаксически похож на C++, но может гарантировать безопасность памяти за счет использования средства проверки заимствований для проверки ссылок. В Rust безопасность памяти достигается без сборки мусора, а подсчет ссылок необязателен.1

MQTT – это облегченный протокол обмена сообщениями IoT, основанный на модели публикации/подписки. Он может использовать очень мало кода и пропускной способности для обеспечения надежной службы сообщений в реальном времени для сетевого оборудования. Кроме того, он широко используется в IoT, мобильном Интернете, интеллектуальном оборудовании, IoV, энергетике и промышленности.

Эта статья в основном рассказывает о том, как использовать клиентскую библиотеку paho-mqtt в проекте Rust, и как реализовать подключение, подписку, обмен сообщениями, отписку и т.д. между клиентом и брокером MQTT.

Инициализация проекта

Этот проект использует Rust 1.44.0 для разработки и тестирования, и управляется с помощью инструмента управления пакетами Cargo 1.44.0. Читатель может проверить текущую версию Rust с помощью следующей команды.

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)
Войти в полноэкранный режим Выйти из полноэкранного режима

Выбор клиентской библиотеки MQTT

paho-mqtt является наиболее универсальным и широко используемым MQTT-клиентом в текущей версии Rust. Последняя версия 0.7.1 поддерживает MQTT v5, 3.1.1, 3.1, а также поддерживает передачу данных через стандартный TCP, SSL / TLS, WebSockets, и поддержку QoS 0, 1, 2 и т.д.

Проект инициализации

Выполните следующую команду для создания нового проекта Rust под названием mqtt-example.

~ cargo new mqtt-example
    Created binary (application) `mqtt-example` package
Войти в полноэкранный режим Выйдите из полноэкранного режима

Отредактируйте файл Cargo.toml в проекте, добавьте адрес библиотеки paho-mqtt в dependencies и укажите двоичный файл, соответствующий файлу кода subscribe, publish.

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"
Вход в полноэкранный режим Выход из полноэкранного режима

Использование Rust MQTT

Создание клиентского соединения

В этой статье в качестве MQTT-брокера тестового соединения будет использоваться бесплатный публичный MQTT-брокер, предоставляемый EMQX. Этот сервис основан на облачной платформе EMQX для создания MQTT IoT. Информация о доступе к серверу выглядит следующим образом:

  • Брокер: broker.emqx.io
  • TCP-порт: 1883
  • Порт Websocket: 8083

Настройка параметров подключения MQTT Broker

Настройте адрес подключения MQTT Broker (включая порт), тему (здесь мы настроили две темы) и идентификатор клиента.

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
Вход в полноэкранный режим Выход из полноэкранного режима

Записать код подключения MQTT

Напишите код MQTT-соединения, адрес соединения можно передать в качестве аргумента командной строки при выполнении двоичного файла для улучшения работы пользователя. Обычно нам нужно создать клиента, а затем подключить его к broker.emqx.io.

let host = env::args().nth(1).unwrap_or_else(||
    DFLT_BROKER.to_string()
);

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri(host)
    .client_id(DFLT_CLIENT.to_string())
    .finalize();

// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(true)
    .finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
    println!("Unable to connect:nt{:?}", e);
    process::exit(1);
}
Войти в полноэкранный режим Выход из полноэкранного режима

Публикация сообщений

Здесь мы публикуем в общей сложности пять сообщений в две темы rust/mqtt и rust/test, в зависимости от четности цикла.

for num in 0..5 {
    let content =  "Hello world! ".to_string() + &num.to_string();
    let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
    if num % 2 == 0 {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
        msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
    } else {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
    }
    let tok = cli.publish(msg);

            if let Err(e) = tok {
                    println!("Error sending message: {:?}", e);
                    break;
            }
}
Вход в полноэкранный режим Выход из полноэкранного режима

Подписаться

Прежде чем клиент подключился, потребитель должен быть инициализирован. Здесь мы в цикле обрабатываем очередь сообщений в потребителе и выводим имя подписавшейся темы и содержание полученных сообщений.

fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    ...
    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();
    ...
    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }
    ...
}
Вход в полноэкранный режим Выход из полноэкранного режима

Полный код

Код для публикации сообщений

use std::{
    env,
    process,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Define the set of options for the connection.
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:nt{:?}", e);
        process::exit(1);
    }

    // Create a message and publish it.
    // Publish message to 'test' and 'hello' topics.
    for num in 0..5 {
        let content =  "Hello world! ".to_string() + &num.to_string();
        let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
        if num % 2 == 0 {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
            msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
        } else {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
        }
        let tok = cli.publish(msg);

                if let Err(e) = tok {
                        println!("Error sending message: {:?}", e);
                        break;
                }
    }


    // Disconnect from the broker.
    let tok = cli.disconnect(None);
    println!("Disconnect from the broker");
    tok.unwrap();
}
Вход в полноэкранный режим Выйти из полноэкранного режима

Код для подписки

Для улучшения удобства пользователей подписка на сообщения отключается, а темы подписываются заново после восстановления соединения.

use std::{
    env,
    process,
    thread,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();

    // Define the set of options for the connection.
    let lwt = mqtt::MessageBuilder::new()
        .topic("test")
        .payload("Consumer lost connection")
        .finalize();
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .will_message(lwt)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:nt{:?}", e);
        process::exit(1);
    }

    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }

    // If still connected, then disconnect now.
    if cli.is_connected() {
        println!("Disconnecting");
        cli.unsubscribe_many(DFLT_TOPICS).unwrap();
        cli.disconnect(None).unwrap();
    }
    println!("Exiting");
}
Вход в полноэкранный режим Выход из полноэкранного режима

Запуск и тестирование

Компиляция двоичных файлов

Следующая команда генерирует двоичный файл sub, pub в каталоге mqtt-example/target/debug.

cargo build
Вход в полноэкранный режим Выход из полноэкранного режима

Подписка сообщений

Выполните двоичный файл sub и дождитесь публикации сообщения.

Публикация сообщения

Выполнив двоичный файл pub, вы увидите, что сообщения были опубликованы в темах rust/test и rust/mqtt соответственно.


Между тем, опубликованные сообщения также видны в подписке на сообщения.

На данный момент мы завершили использование клиента paho-mqtt для подключения к публичному MQTT брокеру, а также реализовали подключение, публикацию сообщений и подписку между тестовым клиентом и MQTT брокером.


  1. https://en.wikipedia.org/wiki/Rust_(язык_программирования)

Оцените статью
Procodings.ru
Добавить комментарий