2.2.1. Rust MQTT Publisher

Der Client (RaspberryPi 4B) soll Daten eines 3-Pin KY-015 DHT11 Temperatur- und Luftfeuchtigkeitsensors auslesen und an den MQTT Broker unter der Topic tel/dht11 publishen. tel/dht11 steht für den Pfad Telemetrie und DHT11.

Bemerkung

Dieser Publisher wurde in Rust verfasst.

2.2.1.1. Infos zum Start

Der DHT11 Sensor hat drei Pins:

  • GND (rechts)

  • Vcc (3.3 - 5.5V) (mittig)

  • Signal (links)

Diese Pins werden wie folgt an die RaspberryPi GPIO pins angeschlossen:

KY-015 DHT11

RaspberryPI 4

Signal

GPIO 23 (Pin 16)

Vcc

+ 3.3V (Pin 1)

GND

GND (Pin 6)

Hier ist ein kleiner Sketch, wie es aussehen sollte, wenn alles fertig verkabelt ist:

rpi2dht11.svg

Grafik Quelle: https://sensorkit.joy-it.net/en/sensors/ky-015

Hier noch ein Bild, wie es in Wirklichkeit aussieht:

rpianddht11.png

2.2.1.2. Auslesen der Daten

Dieser Abschnitt befasst sich mit dem Auslesen der Daten, welche anschließend gepublished werden. In diesem Beispiel wurden zwei Methoden verwendet, um an Daten zu gelangen:

2.2.1.2.1. Echte Werte

Zum Auslesen der Temperaturwerte wird die sysfs_gpio crate verwendet. Diese Crate benutzt die sysfs API unter Linux, um GPIO Pins anzusteuern. GPIO pins liegen unter /sys/class/gpio/.

Wichtig

Das Programm kompilliert unter MacOS und Windows nicht, da es keine Mocks für sysfs_gpio gibt. Da das sysfs nur unter Linux existiert, muss man hier darauf verzichten.

extern crate sysfs_gpio;
use sysfs_gpio::{Direction, Pin};

struct DHT11Data {
    temperature: f32,
    humidity: f32,
}

fn get_dht11_data(dht11pin: u64) -> Result<DHT11Data, String> {
    let pin: Pin = Pin::new(dht11pin);
    let humidity: f32;
    let temperature: f32;

    // -- snip -- //

    if let Err(err) = pin.with_exported(|| {
        // Berechnung von temperature und humidity
    }) {
        // Error Handling
    };

    // Überprüfung der Werte von temperature und humidity
    // -- snip -- //

    // Falls alles okay ist, wird ein DHT11Data Struct als Ok Typ zurückgegeben
    Ok(DHT11Data { humidity, temperature })
}

Innerhalb der Closure in pin.with_exported kann auf den GPIO Pin zugegriffen werden. Hier werden die Temperatur und Luftfeuchtigkeitswerte ermittelt, ähnlich wie in der Arduino Implementierung aus Adafruits Github.

Der folgende Code zeigt die Initialisierung des DHT11 Sensors:

use std::thread::sleep;
use std::time::Duration;

// -- snip -- //
fn get_dht11_data(dht11pin: u64) -> Result<DHT11Data, String> {
    // -- snip -- //
    if let Err(err) = pin.with_exported(|| {
        // Go into high impedence state to let pull-up raise data line level
        pin.set_direction(Direction::High).unwrap();
        sleep(Duration::new(0, 1_000_000)); // sleep 1 ms
        pin.set_direction(Direction::Out).unwrap();
        pin.set_value(0).unwrap();

        // Sleep 20ms (18ms in datasheet)
        sleep(Duration::new(0, 20 * 1_000_000));

        // End the start signal by setting data line high for 40µs.
        pin.set_direction(Direction::High).unwrap();
        sleep(Duration::new(0, 40 * 1_000));

        // -- snip -- //
    }) {
        // Error Handling
    }
    // -- snip -- //
}
// -- snip -- //

Ist der Sensor initialisiert, können die Daten ausgelesen werden:

const CYCLES: u8 = 80;
const MAX_CYCLES: u16 = 255;

fn get_dht11_data(dht11pin: u64) -> Result<DHT11Data, String> {
    let pin: Pin = Pin::new(dht11pin);
    let mut laststate: u8 = 1;
	let mut counter: u16 = 0;
	let mut j: usize = 0;
    let humidity: f32;
    let temperature: f32;
	let fahrenheit: f32;
    let mut dht11_raw_data: Vec<u8> = vec![0; 5];

    if let Err(err) = pin.with_exported(|| {
        // -- snip -- //

        // Populate dht11_raw_data
        for i in 0..CYCLES {
            counter = 0;
            // wait for start sequence
            while pin.get_value().unwrap() == laststate {
                counter += 1;
                sleep(Duration::new(0, 1_000)); // Sleep 1µs
                if counter == MAX_CYCLES {
                    break;
                }
            }

            laststate = pin.get_value().unwrap();

            if counter == MAX_CYCLES {
                break;
            }

            // populate data vector
            if (i >= 4) && (i % 2 == 0) {
                dht11_raw_data[j] <<= 1;
                if counter > 16 {
                    dht11_raw_data[j] |= 1;
                }
                j += 1;
            }
        }
        // Everything went fine; dht11_raw_data is populated
        Ok(())
    }) {
        // Something went wrong
        return Err(format!("Defaulting humidity to {}%. Could not read from GPIO: {}", "-1".bold(), err));
    };

    // if more than 40 values have been read, then the data can be valid
    if j >= 40 {
        humidity = dht11_raw_data[0] as f32 + (dht11_raw_data[1] as f32) / 100.0;
        temperature = dht11_raw_data[2] as f32 + (dht11_raw_data[3] as f32) / 100.0;
        fahrenheit = temperature * 9.0 / 5.0 + 32.0;
        println!(
            "Humidity = {} % Temperature = {} °C ({} °F)",
            humidity.to_string().green(),
            temperature.to_string().green(),
            fahrenheit.to_string().green()
        );
    } else  {
        // Faulty data
        return Err(format!("Bad data from DHT11 Sensor. {}", "Defaulting to -1%".red()));
    }
    // Return DHT11Data Struct if everything went as planned
    Ok(DHT11Data { humidity, temperature })
}

2.2.1.2.2. Stubbing

Da der Client auch auf anderen Geräten als einem RaspberryPi gestartet werden sollen kann, ist ein Stub zum Sensordaten auslesen ebenfalls wichtig.

Der Stub generiert Zufallsdaten für Temperaturen zwischen 20.0 und 21.0°C und Luftfeuchtigkeit zwischen 45.0 und 46.0%.

use rand::{Rng, rngs::ThreadRng};
// -- snip -- //
fn get_dht11_data_random() -> DHT11Data {
    let mut rng: ThreadRng = rand::thread_rng();
    // Produce humidity between 45 and 46%
    let humidity: f32 = rng.gen_range(45.0..46.0);
    let temperature: f32 = rng.gen_range(20.0..21.0);
    DHT11Data { temperature, humidity }
}
// -- snip -- //

2.2.1.3. Erstellen eines MQTT Clients

In diesem Beispiel wurde die paho_mqtt Crate verwendet.

Diese Crate enthält Structs zum erstellen eines Publisher und Subscriber Clients und macht den folgenden Code besser lesbar als eigene Implementierungen.

Zunächst werden argumente von der Kommandozeile gelesen, um den Publisher Client zu konfigurieren:

#[derive(Clone)]
pub struct MqttOptions {
    pub mqtt_broker: String,
    pub mqtt_user: String,
    pub mqtt_password: String,
    pub topic: String,
    pub client_id: String,
    pub dht11_pin: Option<u64>,
}

pub fn get_options() -> MqttOptions {
    // Define arguments and parse them
    // -- snip -- //

    // return parsed arguments
    MqttOptions { mqtt_broker, mqtt_user, mqtt_password, topic, client_id, dht11_pin }
}

Diese MqttOptions können dann verwendet werden, um den Client zu konstruieren:

use paho_mqtt::{
    CreateOptionsBuilder,
    Client,
    ConnectOptionsBuilder,
    CreateOptions,
    ConnectOptions
};
use std::process;

// -- snip -- //
pub fn get_client(options: &MqttOptions) -> Client {
    // Build Create options from MqttOptions
    let create_opts: CreateOptions = CreateOptionsBuilder::new()
        .server_uri(String::from("tcp://") + &options.mqtt_broker)
        .client_id(&options.client_id)
        .finalize();

    // Create Client with Create Options
    let client: Client = Client::new(create_opts).unwrap_or_else(|err: paho_mqtt::Error| {
        println!("{}: {:?}", "Error creating the client".red(), err);
        process::exit(1);
    });

    // Define the set of options for the connection.
    let conn_opts: ConnectOptions = ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(10))
        .clean_session(true)
        .user_name(&options.mqtt_user)
        .password(&options.mqtt_password)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = client.connect(conn_opts) {
        println!("{}:\n\t{:?}", "Unable to connect".red(), e);
        process::exit(1);
    }

    // Return the client
    client
}

2.2.1.4. Nachrichten generieren

Alles in allem müssen jetzt nur noch die gesammelten Daten mithilfe des erstellen Clients gepublished werden. Dazu hat paho_mqtt das Struct Message. Mithilfe von Messages kann dies realisiert werden.

Messages bekommen einen String als payload mit, der an eine Topic gepublished werden soll. Dieser String sollte in unserem Fall in JSON Syntax vorliegen, weshalb die Crate serde_json verwendet wird. In dieser Crate existiert ein Macro json!, dass dynamisch einen JSON String erzeugen kann.

Hier ist eine Funktion, die mithilfe der Optionen eine paho_mqtt::Message baut:

use paho_mqtt::{Message, QOS_1};
use serde_json::json;
use tokio::time;

// -- snip -- //

pub fn get_humidity_payload(options: &MqttOptions) -> Message {
    // Decide which method of getting DHT11Data is used here
    let dht11_data: DHT11Data = if let Some(dht11_pin) = options.dht11_pin {
        match get_dht11_data(dht11_pin) {
            Ok(data) => data,
            Err(err) => {
                println!("{err}");
                // return -1% humidity and 0 K if something went wrong
                DHT11Data { humidity: -1.0, temperature: -273.15 }
            },
        }
    } else {
        get_dht11_data_random()
    };
    // Generate Timestamp for payload
    let timestamp: u128 = get_sys_time_in_millis();

    // Generate JSON String for message payload
    let data: String = json!({
        "timestamp": timestamp,
        "humidity": dht11_data.humidity,
        "temperature_c": dht11_data.temperature,
        "temperature_f": dht11_data.get_fahrenheit()
    }).to_string();

    // Create and return new MQTT Message with given topic, payload and QOS
    Message::new(&options.topic, data, QOS_1)
}

Die verwendete Funktion get_sys_time_in_millis ist eine Eigenimplementierung und macht im python subscriber das decoden von Zeitstempeln einfacher, da es die Millisekunden seit der ersten UNIX_EPOCH zurück gibt, wie Pythons time.time:

use std::time::{SystemTime, UNIX_EPOCH};

fn get_sys_time_in_millis() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(n) => n.as_millis(),
        Err(_) => {panic!("SystemTime before UNIX EPOCH!")},
    }
}

2.2.1.5. Nachrichten Publishen

Nachrichten sollen periodisch / zyklisch gepublished werden. Um das zu erzielen, wird in diesem Beispiel die Crate tokio verwendet, welche Asynchronitat in Rust bringt.

use paho_mqtt::Client;
use rust_mqtt_pub::MqttOptions;
use tokio::{
    time::{
        sleep,
        Duration
    },
    signal::ctrl_c,
    task
};

// 1s between MQTT Messages
const TIME_BETWEEN_MQTT_MSGS: u64 = 1000;

#[tokio::main]
async fn main() {
    // read commandline options
    let options: MqttOptions = rust_mqtt_pub::get_options();

    // Create Client
    let client: Client = rust_mqtt_pub::get_client(&options);

    task::spawn(async || {
        // Signal handler that stops everything on SIGINT (ctrl + c)
    });

    // Main loop, where MQTT Messages are generated and published
    while client.is_connected() {
        task::spawn(async || {
            // Publish MQTT Messages
        });
        sleep(Duration::from_millis(TIME_BETWEEN_MQTT_MSGS)).await;
    }
}

Durch die Asynchronität lässt sich zu großer Datenstau vermeiden, da das Lesen von Daten des DHT11 somit nicht blockiert.

Die beiden Closures, welche asynchron ablaufen sollten, wurden hier als eigene Funktionen definiert, um den MQTT Client clonen zu können:

// --snip -- //
async fn on_sigint(client: Client) {
    loop {
        // await ctrl c
        ctrl_c().await.unwrap();

        // Disconnect from the broker.
        println!("Got SIGINT, disconnecting from the Broker ...");
        match client.clone().disconnect(None) {
            Ok(_) => {
                println!("Disconnected from the MQTT Broker.")
            },
            Err(err) => {
                println!("Disconnect from the broker failed: {err:?}");
            },
        }
    }
}

async fn get_and_publish(client: Client, options: MqttOptions) {
    // Generate Payload
    let payload: Message = rust_mqtt_pub::get_humidity_payload(&options);
    // Publish payload
    let _  = client.publish(payload);
}

Zuletzt müssen nur noch die beiden Funktionen in den Threads verwendet werden:

use paho_mqtt::{Client, Message};
use rust_mqtt_pub::MqttOptions;
use tokio::{
    time::{
        sleep,
        Duration
    },
    signal::ctrl_c,
    task
};
use colored::Colorize;

const TIME_BETWEEN_MQTT_MSGS: u64 = 1000;

#[tokio::main]
async fn main() {
    let options: MqttOptions = rust_mqtt_pub::get_options();
    let client: Client = rust_mqtt_pub::get_client(&options);

    task::spawn(on_sigint(client.clone()));
    while client.is_connected() {
        task::spawn(get_and_publish(client.clone(), options.clone()));
        sleep(Duration::from_millis(TIME_BETWEEN_MQTT_MSGS)).await;
    }
}

async fn on_sigint(client: Client) {
    loop {
        ctrl_c().await.unwrap();

        // Disconnect from the broker.
        println!("Got {}, disconnecting from the Broker ...", "SIGINT".bold());
        match client.clone().disconnect(None) {
            Ok(_) => {
                println!("{} from the MQTT Broker.", "Disconnected".green().bold())
            },
            Err(err) => {
                println!("{}: {:?}", "Disconnect from the broker failed".red(), err);
            },
        }
    }
}

async fn get_and_publish(client: Client, options: MqttOptions) {
    let payload: Message = rust_mqtt_pub::get_humidity_payload(&options);
    let _  = client.publish(payload);
}

Und das wars! Damit ist der Rust MQTT Publisher vollständig.

Die sourcen befinden sich im Hochschule Gitlab unter https://gitlab.informatik.hs-augsburg.de/dva/berichte-2023/56/-/tree/main/projects/mqtt/rust_mqtt_pub