2.2.1. Rust MQTT Publisher
Der Client (RaspberryPi 4B) soll Daten eines 3-Pin KY-015 DHT11 Temperatur- und Luftfeuchtigkeitsensors auslesen und an den MQTT Broker unter der Topic tel/dht11 publishen. tel/dht11 steht für den Pfad Telemetrie und DHT11.
Bemerkung
Dieser Publisher wurde in Rust verfasst.
2.2.1.1. Infos zum Start
Der DHT11 Sensor hat drei Pins:
GND (rechts)
Vcc (3.3 - 5.5V) (mittig)
Signal (links)
Diese Pins werden wie folgt an die RaspberryPi GPIO pins angeschlossen:
KY-015 DHT11 |
RaspberryPI 4 |
---|---|
Signal |
GPIO 23 (Pin 16) |
Vcc |
+ 3.3V (Pin 1) |
GND |
GND (Pin 6) |
Hier ist ein kleiner Sketch, wie es aussehen sollte, wenn alles fertig verkabelt ist:
Grafik Quelle: https://sensorkit.joy-it.net/en/sensors/ky-015
Hier noch ein Bild, wie es in Wirklichkeit aussieht:
2.2.1.2. Auslesen der Daten
Dieser Abschnitt befasst sich mit dem Auslesen der Daten, welche anschließend gepublished werden. In diesem Beispiel wurden zwei Methoden verwendet, um an Daten zu gelangen:
2.2.1.2.1. Echte Werte
Zum Auslesen der Temperaturwerte wird die sysfs_gpio crate verwendet. Diese Crate benutzt die sysfs API unter Linux, um GPIO Pins anzusteuern. GPIO pins liegen unter /sys/class/gpio/.
Wichtig
Das Programm kompilliert unter MacOS und Windows nicht, da es keine Mocks für sysfs_gpio gibt. Da das sysfs nur unter Linux existiert, muss man hier darauf verzichten.
extern crate sysfs_gpio;
use sysfs_gpio::{Direction, Pin};
struct DHT11Data {
temperature: f32,
humidity: f32,
}
fn get_dht11_data(dht11pin: u64) -> Result<DHT11Data, String> {
let pin: Pin = Pin::new(dht11pin);
let humidity: f32;
let temperature: f32;
// -- snip -- //
if let Err(err) = pin.with_exported(|| {
// Berechnung von temperature und humidity
}) {
// Error Handling
};
// Überprüfung der Werte von temperature und humidity
// -- snip -- //
// Falls alles okay ist, wird ein DHT11Data Struct als Ok Typ zurückgegeben
Ok(DHT11Data { humidity, temperature })
}
Innerhalb der Closure
in pin.with_exported
kann auf den GPIO Pin
zugegriffen werden.
Hier werden die Temperatur und Luftfeuchtigkeitswerte ermittelt, ähnlich wie
in der
Arduino Implementierung
aus Adafruits Github.
Der folgende Code zeigt die Initialisierung des DHT11 Sensors:
use std::thread::sleep;
use std::time::Duration;
// -- snip -- //
fn get_dht11_data(dht11pin: u64) -> Result<DHT11Data, String> {
// -- snip -- //
if let Err(err) = pin.with_exported(|| {
// Go into high impedence state to let pull-up raise data line level
pin.set_direction(Direction::High).unwrap();
sleep(Duration::new(0, 1_000_000)); // sleep 1 ms
pin.set_direction(Direction::Out).unwrap();
pin.set_value(0).unwrap();
// Sleep 20ms (18ms in datasheet)
sleep(Duration::new(0, 20 * 1_000_000));
// End the start signal by setting data line high for 40µs.
pin.set_direction(Direction::High).unwrap();
sleep(Duration::new(0, 40 * 1_000));
// -- snip -- //
}) {
// Error Handling
}
// -- snip -- //
}
// -- snip -- //
Ist der Sensor initialisiert, können die Daten ausgelesen werden:
const CYCLES: u8 = 80;
const MAX_CYCLES: u16 = 255;
fn get_dht11_data(dht11pin: u64) -> Result<DHT11Data, String> {
let pin: Pin = Pin::new(dht11pin);
let mut laststate: u8 = 1;
let mut counter: u16 = 0;
let mut j: usize = 0;
let humidity: f32;
let temperature: f32;
let fahrenheit: f32;
let mut dht11_raw_data: Vec<u8> = vec![0; 5];
if let Err(err) = pin.with_exported(|| {
// -- snip -- //
// Populate dht11_raw_data
for i in 0..CYCLES {
counter = 0;
// wait for start sequence
while pin.get_value().unwrap() == laststate {
counter += 1;
sleep(Duration::new(0, 1_000)); // Sleep 1µs
if counter == MAX_CYCLES {
break;
}
}
laststate = pin.get_value().unwrap();
if counter == MAX_CYCLES {
break;
}
// populate data vector
if (i >= 4) && (i % 2 == 0) {
dht11_raw_data[j] <<= 1;
if counter > 16 {
dht11_raw_data[j] |= 1;
}
j += 1;
}
}
// Everything went fine; dht11_raw_data is populated
Ok(())
}) {
// Something went wrong
return Err(format!("Defaulting humidity to {}%. Could not read from GPIO: {}", "-1".bold(), err));
};
// if more than 40 values have been read, then the data can be valid
if j >= 40 {
humidity = dht11_raw_data[0] as f32 + (dht11_raw_data[1] as f32) / 100.0;
temperature = dht11_raw_data[2] as f32 + (dht11_raw_data[3] as f32) / 100.0;
fahrenheit = temperature * 9.0 / 5.0 + 32.0;
println!(
"Humidity = {} % Temperature = {} °C ({} °F)",
humidity.to_string().green(),
temperature.to_string().green(),
fahrenheit.to_string().green()
);
} else {
// Faulty data
return Err(format!("Bad data from DHT11 Sensor. {}", "Defaulting to -1%".red()));
}
// Return DHT11Data Struct if everything went as planned
Ok(DHT11Data { humidity, temperature })
}
2.2.1.2.2. Stubbing
Da der Client auch auf anderen Geräten als einem RaspberryPi gestartet werden sollen kann, ist ein Stub zum Sensordaten auslesen ebenfalls wichtig.
Der Stub generiert Zufallsdaten für Temperaturen zwischen 20.0 und 21.0°C und Luftfeuchtigkeit zwischen 45.0 und 46.0%.
use rand::{Rng, rngs::ThreadRng};
// -- snip -- //
fn get_dht11_data_random() -> DHT11Data {
let mut rng: ThreadRng = rand::thread_rng();
// Produce humidity between 45 and 46%
let humidity: f32 = rng.gen_range(45.0..46.0);
let temperature: f32 = rng.gen_range(20.0..21.0);
DHT11Data { temperature, humidity }
}
// -- snip -- //
2.2.1.3. Erstellen eines MQTT Clients
In diesem Beispiel wurde die paho_mqtt Crate verwendet.
Diese Crate enthält Structs
zum erstellen eines Publisher und Subscriber
Clients und macht den folgenden Code besser lesbar als eigene
Implementierungen.
Zunächst werden argumente von der Kommandozeile gelesen, um den Publisher Client zu konfigurieren:
#[derive(Clone)]
pub struct MqttOptions {
pub mqtt_broker: String,
pub mqtt_user: String,
pub mqtt_password: String,
pub topic: String,
pub client_id: String,
pub dht11_pin: Option<u64>,
}
pub fn get_options() -> MqttOptions {
// Define arguments and parse them
// -- snip -- //
// return parsed arguments
MqttOptions { mqtt_broker, mqtt_user, mqtt_password, topic, client_id, dht11_pin }
}
Diese MqttOptions können dann verwendet werden, um den Client zu konstruieren:
use paho_mqtt::{
CreateOptionsBuilder,
Client,
ConnectOptionsBuilder,
CreateOptions,
ConnectOptions
};
use std::process;
// -- snip -- //
pub fn get_client(options: &MqttOptions) -> Client {
// Build Create options from MqttOptions
let create_opts: CreateOptions = CreateOptionsBuilder::new()
.server_uri(String::from("tcp://") + &options.mqtt_broker)
.client_id(&options.client_id)
.finalize();
// Create Client with Create Options
let client: Client = Client::new(create_opts).unwrap_or_else(|err: paho_mqtt::Error| {
println!("{}: {:?}", "Error creating the client".red(), err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts: ConnectOptions = ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(10))
.clean_session(true)
.user_name(&options.mqtt_user)
.password(&options.mqtt_password)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = client.connect(conn_opts) {
println!("{}:\n\t{:?}", "Unable to connect".red(), e);
process::exit(1);
}
// Return the client
client
}
2.2.1.4. Nachrichten generieren
Alles in allem müssen jetzt nur noch die gesammelten Daten mithilfe des
erstellen Clients gepublished werden.
Dazu hat paho_mqtt das Struct Message
.
Mithilfe von Messages kann dies realisiert werden.
Messages bekommen einen String als payload mit, der an eine Topic gepublished
werden soll.
Dieser String sollte in unserem Fall in JSON Syntax vorliegen, weshalb die
Crate serde_json verwendet
wird.
In dieser Crate existiert ein Macro json!
, dass dynamisch einen JSON String
erzeugen kann.
Hier ist eine Funktion, die mithilfe der Optionen eine paho_mqtt::Message
baut:
use paho_mqtt::{Message, QOS_1};
use serde_json::json;
use tokio::time;
// -- snip -- //
pub fn get_humidity_payload(options: &MqttOptions) -> Message {
// Decide which method of getting DHT11Data is used here
let dht11_data: DHT11Data = if let Some(dht11_pin) = options.dht11_pin {
match get_dht11_data(dht11_pin) {
Ok(data) => data,
Err(err) => {
println!("{err}");
// return -1% humidity and 0 K if something went wrong
DHT11Data { humidity: -1.0, temperature: -273.15 }
},
}
} else {
get_dht11_data_random()
};
// Generate Timestamp for payload
let timestamp: u128 = get_sys_time_in_millis();
// Generate JSON String for message payload
let data: String = json!({
"timestamp": timestamp,
"humidity": dht11_data.humidity,
"temperature_c": dht11_data.temperature,
"temperature_f": dht11_data.get_fahrenheit()
}).to_string();
// Create and return new MQTT Message with given topic, payload and QOS
Message::new(&options.topic, data, QOS_1)
}
Die verwendete Funktion get_sys_time_in_millis ist eine Eigenimplementierung
und macht im python subscriber das decoden von Zeitstempeln einfacher, da es
die Millisekunden seit der ersten UNIX_EPOCH zurück gibt, wie Pythons
time.time
:
use std::time::{SystemTime, UNIX_EPOCH};
fn get_sys_time_in_millis() -> u128 {
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_millis(),
Err(_) => {panic!("SystemTime before UNIX EPOCH!")},
}
}
2.2.1.5. Nachrichten Publishen
Nachrichten sollen periodisch / zyklisch gepublished werden. Um das zu erzielen, wird in diesem Beispiel die Crate tokio verwendet, welche Asynchronitat in Rust bringt.
use paho_mqtt::Client;
use rust_mqtt_pub::MqttOptions;
use tokio::{
time::{
sleep,
Duration
},
signal::ctrl_c,
task
};
// 1s between MQTT Messages
const TIME_BETWEEN_MQTT_MSGS: u64 = 1000;
#[tokio::main]
async fn main() {
// read commandline options
let options: MqttOptions = rust_mqtt_pub::get_options();
// Create Client
let client: Client = rust_mqtt_pub::get_client(&options);
task::spawn(async || {
// Signal handler that stops everything on SIGINT (ctrl + c)
});
// Main loop, where MQTT Messages are generated and published
while client.is_connected() {
task::spawn(async || {
// Publish MQTT Messages
});
sleep(Duration::from_millis(TIME_BETWEEN_MQTT_MSGS)).await;
}
}
Durch die Asynchronität lässt sich zu großer Datenstau vermeiden, da das Lesen von Daten des DHT11 somit nicht blockiert.
Die beiden Closures, welche asynchron ablaufen sollten, wurden hier als eigene Funktionen definiert, um den MQTT Client clonen zu können:
// --snip -- //
async fn on_sigint(client: Client) {
loop {
// await ctrl c
ctrl_c().await.unwrap();
// Disconnect from the broker.
println!("Got SIGINT, disconnecting from the Broker ...");
match client.clone().disconnect(None) {
Ok(_) => {
println!("Disconnected from the MQTT Broker.")
},
Err(err) => {
println!("Disconnect from the broker failed: {err:?}");
},
}
}
}
async fn get_and_publish(client: Client, options: MqttOptions) {
// Generate Payload
let payload: Message = rust_mqtt_pub::get_humidity_payload(&options);
// Publish payload
let _ = client.publish(payload);
}
Zuletzt müssen nur noch die beiden Funktionen in den Threads verwendet werden:
use paho_mqtt::{Client, Message};
use rust_mqtt_pub::MqttOptions;
use tokio::{
time::{
sleep,
Duration
},
signal::ctrl_c,
task
};
use colored::Colorize;
const TIME_BETWEEN_MQTT_MSGS: u64 = 1000;
#[tokio::main]
async fn main() {
let options: MqttOptions = rust_mqtt_pub::get_options();
let client: Client = rust_mqtt_pub::get_client(&options);
task::spawn(on_sigint(client.clone()));
while client.is_connected() {
task::spawn(get_and_publish(client.clone(), options.clone()));
sleep(Duration::from_millis(TIME_BETWEEN_MQTT_MSGS)).await;
}
}
async fn on_sigint(client: Client) {
loop {
ctrl_c().await.unwrap();
// Disconnect from the broker.
println!("Got {}, disconnecting from the Broker ...", "SIGINT".bold());
match client.clone().disconnect(None) {
Ok(_) => {
println!("{} from the MQTT Broker.", "Disconnected".green().bold())
},
Err(err) => {
println!("{}: {:?}", "Disconnect from the broker failed".red(), err);
},
}
}
}
async fn get_and_publish(client: Client, options: MqttOptions) {
let payload: Message = rust_mqtt_pub::get_humidity_payload(&options);
let _ = client.publish(payload);
}
Und das wars! Damit ist der Rust MQTT Publisher vollständig.
Die sourcen befinden sich im Hochschule Gitlab unter https://gitlab.informatik.hs-augsburg.de/dva/berichte-2023/56/-/tree/main/projects/mqtt/rust_mqtt_pub