working threads with mpsc::channel

This commit is contained in:
Vladan Popovic 2022-07-06 20:33:43 +02:00
parent d023f5db76
commit 7bfd37b799
5 changed files with 221 additions and 105 deletions

View file

@ -1,5 +1,5 @@
# Rust often needs a bit of an extra main task stack size compared to C (the default is 3K) # Rust often needs a bit of an extra main task stack size compared to C (the default is 3K)
CONFIG_ESP_MAIN_TASK_STACK_SIZE=17000 CONFIG_ESP_MAIN_TASK_STACK_SIZE=64000
# Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default). # Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default).
# This allows to use 1 ms granuality for thread sleeps (10 ms by default). # This allows to use 1 ms granuality for thread sleeps (10 ms by default).

17
src/accel.rs Normal file
View file

@ -0,0 +1,17 @@
use anyhow;
use std::sync::mpsc::SyncSender;
use std::thread;
use std::time::Duration;
use crate::modem::Msg;
pub fn main(sender: SyncSender<Msg>) -> Result<(), anyhow::Error> {
println!("entering ACCELERATOR sender loop ...");
for i in 0..20 {
println!("sending ACCELERATOR message ({}) of 20 ...", i);
let _ = sender.send(Msg::Location("{\"velocity\": 21.43, \"altitude\": 367}".to_string()))?;
thread::sleep(Duration::from_millis(2000));
}
Ok(())
}

17
src/gps.rs Normal file
View file

@ -0,0 +1,17 @@
use anyhow;
use std::sync::mpsc::SyncSender;
use std::thread;
use std::time::Duration;
use crate::modem::Msg;
pub fn main(sender: SyncSender<Msg>) -> Result<(), anyhow::Error> {
println!("entering GPS sender loop ...");
for i in 0..20 {
println!("sending GPS message ({}) of 20 ...", i);
let _ = sender.send(Msg::Location("{\"lat\": 20.4322, \"long\": 44.5432}".to_string()))?;
thread::sleep(Duration::from_millis(2000));
}
Ok(())
}

View file

@ -1,18 +1,20 @@
mod accel;
mod config; mod config;
mod gps;
mod modem; mod modem;
#[allow(dead_code)] #[allow(dead_code)]
mod command; mod command;
use anyhow; use anyhow;
use std::thread::{self, JoinHandle};
use std::cell::RefCell;
use std::time::Duration; use std::time::Duration;
use std::thread;
use esp_idf_hal::prelude::*;
use esp_idf_hal::peripherals::Peripherals;
use esp_idf_hal::serial;
use mqtt::control::ConnectReturnCode; use esp_idf_hal::peripherals::Peripherals;
use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacket, QoSWithPacketIdentifier};
use mqtt::{Decodable, Encodable, TopicName}; thread_local! {
static TLS: RefCell<u32> = RefCell::new(13);
}
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
@ -20,116 +22,43 @@ fn main() -> anyhow::Result<()> {
let dp = Peripherals::take().expect("error taking peripherals"); let dp = Peripherals::take().expect("error taking peripherals");
println!("Rust main thread: {:?}", thread::current());
// LilyGo TTGO T-Call sim800l board serial pins. // LilyGo TTGO T-Call sim800l board serial pins.
let serial_rx = dp.pins.gpio26; let modem_rx = dp.pins.gpio26;
let serial_tx = dp.pins.gpio27; let modem_tx = dp.pins.gpio27;
// LilyGo TTGO T-Call sim800l board power / reset pins.
let serial_pins = serial::Pins {
tx: serial_tx,
rx: serial_rx,
cts: None,
rts: None,
};
// Create the serial and panic with a message ... if we can't create the serial port, then we
// can't communicate with the sim800l module, hence we don't run anymore.
let serial: serial::Serial<serial::UART1, _, _> = serial::Serial::new(
dp.uart1,
serial_pins,
serial::config::Config::default().baudrate(Hertz(115200)),
)?;
let (tx, rx) = serial.split();
let mut mdm = modem::Modem::new(tx, rx);
let modem_pwrkey = dp.pins.gpio4.into_output()?; let modem_pwrkey = dp.pins.gpio4.into_output()?;
let modem_rst = dp.pins.gpio5.into_output()?; let modem_rst = dp.pins.gpio5.into_output()?;
let modem_power = dp.pins.gpio23.into_output()?; let modem_power = dp.pins.gpio23.into_output()?;
// UART interface for the GSM modem
let modem_uart = dp.uart1;
mdm.init(modem_pwrkey, modem_rst, modem_power)?; let mut threads: Vec<JoinHandle<anyhow::Result<_>>> = vec![];
if !mdm.is_gprs_attached()? { println!("Rust main thread: {:?}", thread::current());
let _ = mdm.gprs_attach_ap(
config::A1_GPRS_AP.apn,
config::A1_GPRS_AP.username,
config::A1_GPRS_AP.password,
)?;
}
thread::sleep(Duration::from_millis(2000));
//println!("setting up client TLS cert"); TLS.with(|tls| {
//let client_cert = include_bytes!("../certs/full-bin.p12"); println!("Main TLS before change: {}", *tls.borrow());
//let client_cert_path = "C:\\USER\\fullchain.pem"; });
//let _ = mdm.upload_cert(client_cert_path, client_cert)?; TLS.with(|tls| *tls.borrow_mut() = 42);
//let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?;
//let _ = mdm.fs_list("C:\\USER\\")?;
loop { TLS.with(|tls| {
if mdm.is_gprs_attached()? { println!("Main TLS after change: {}", *tls.borrow());
let _ = mdm.gprs_connect()?; });
for _ in 0..3 {
let ip_addr = mdm.gprs_status()?;
if ip_addr.contains("0.0.0.0") {
thread::sleep(Duration::from_millis(2000));
} else {
break
}
}
//println!("connecting to server!"); let (gps_sender, receiver) = std::sync::mpsc::sync_channel::<modem::Msg>(1);
//if !mdm.tcp_is_ssl_enabled()? {
// let _ = mdm.tcp_ssl_enable()?;
//}
if mdm.tcp_is_ssl_enabled()? {
let _ = mdm.tcp_ssl_disable()?;
}
let client_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; let accel_sender = gps_sender.clone();
let message = "{\"lat\": 20.475370, \"long\": 44.747224}";
let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive(true);
let _ = mdm.tcp_connect("51.158.66.64", 1883)?;
let mut buf = Vec::new();
let mut conn = ConnectPacket::new(client_id);
conn.set_clean_session(true);
conn.set_keep_alive(0);
let _ = conn.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
buf.clear();
threads.push(thread::spawn(move || gps::main(gps_sender)));
thread::sleep(Duration::from_millis(1000));
threads.push(thread::spawn(move || accel::main(accel_sender)));
thread::sleep(Duration::from_millis(1000)); thread::sleep(Duration::from_millis(1000));
let size = mdm.tcp_receive_reply_len()?; let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?;
let mut reply = vec![0 as u8; size];
let _ = mdm.tcp_receive(&mut reply);
println!("TCP Received: {}", reply.iter().map(|b| char::from(*b)).collect::<String>());
println!("sending publish packet");
let packet = PublishPacket::new(
TopicName::new("bajsevi/test").unwrap(),
QoSWithPacketIdentifier::Level0,
message.as_bytes(),
);
let _ = packet.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
buf.clear();
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let _ = mdm.tcp_receive(&mut reply);
println!("TCP Received: {}", reply.iter().map(|b| char::from(*b)).collect::<String>());
let _ = mdm.tcp_close_connection()?;
break
}
}
Ok(()) Ok(())
} }

View file

@ -1,12 +1,19 @@
use crate::command::Command; use crate::command::Command;
use anyhow;
use std::thread; use std::thread;
use std::error::Error; use std::error::Error;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::sync::mpsc::Receiver;
use esp_idf_hal::prelude::*;
use esp_idf_hal::serial::{self, Rx, Tx};
use embedded_hal::serial::{Read, Write}; use embedded_hal::serial::{Read, Write};
use embedded_hal::digital::v2::OutputPin; use embedded_hal::digital::v2::OutputPin;
use esp_idf_hal::serial::{self, Rx, Tx};
use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier};
use mqtt::{Encodable, TopicName};
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300; const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
@ -395,4 +402,150 @@ impl<UART: serial::Uart> Modem<UART> {
let _ = self.send_command(Command::ssl_set_root_cert(path, filesize))?; let _ = self.send_command(Command::ssl_set_root_cert(path, filesize))?;
Ok(()) Ok(())
} }
fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> {
println!("entered receiving modem reply ...");
let size = self.tcp_receive_reply_len()?;
println!("receiving reply len({}) ...", size);
let mut reply = vec![0 as u8; size];
println!("receiving tcp reply ...");
let _ = self.tcp_receive(&mut reply);
println!("received tcp reply ...");
Ok(())
}
fn mqtt_connect(&mut self, device_id: &str) -> std::result::Result<(), anyhow::Error> {
let mut buf = Vec::new();
let mut conn = ConnectPacket::new(device_id);
conn.set_clean_session(true);
conn.set_keep_alive(0);
let _ = conn.encode(&mut buf)?;
let _ = self.tcp_send(&mut buf)?;
thread::sleep(Duration::from_millis(2000));
drop(buf);
let _ = self.mqtt_receive_reply()?;
Ok(())
}
fn mqtt_publish(&mut self, _device_id: &str, message: &str) -> std::result::Result<(), anyhow::Error> {
println!("entered mqtt publish ...");
let mut buf = Vec::new();
let packet = PublishPacket::new(
TopicName::new(format!("bajsevi/location")).unwrap(),
QoSWithPacketIdentifier::Level0,
message.as_bytes(),
);
println!("created mqtt publish packet ...");
let _ = packet.encode(&mut buf)?;
println!("modem tcp send publish pakage ...");
let _ = self.tcp_send(&mut buf)?;
thread::sleep(Duration::from_millis(2000));
drop(buf);
println!("receiving modem publish reply ...");
let _ = self.mqtt_receive_reply()?;
Ok(())
}
}
pub enum Msg {
Location(String),
// Movement(String),
}
pub fn main<T: Sync + Send>(
rx: esp_idf_hal::gpio::Gpio26<T>,
tx: esp_idf_hal::gpio::Gpio27<T>,
uart: serial::UART1,
pwrkey: esp_idf_hal::gpio::Gpio4<esp_idf_hal::gpio::Output>,
rst: esp_idf_hal::gpio::Gpio5<esp_idf_hal::gpio::Output>,
power: esp_idf_hal::gpio::Gpio23<esp_idf_hal::gpio::Output>,
receiver: Receiver<Msg>,
) -> std::result::Result<(), anyhow::Error> {
let serial_pins = serial::Pins {
tx,
rx,
cts: None,
rts: None,
};
// Create the serial and panic with a message ... if we can't create the serial port, then we
// can't communicate with the sim800l module, hence we don't run anymore.
let serial: serial::Serial<serial::UART1, _, _> = serial::Serial::new(
uart,
serial_pins,
serial::config::Config::default().baudrate(Hertz(115200)),
)?;
let (tx, rx) = serial.split();
let mut mdm = Modem::new(tx, rx);
mdm.init(pwrkey, rst, power)?;
if !mdm.is_gprs_attached()? {
let _ = mdm.gprs_attach_ap(
crate::config::A1_GPRS_AP.apn,
crate::config::A1_GPRS_AP.username,
crate::config::A1_GPRS_AP.password,
)?;
}
thread::sleep(Duration::from_millis(1000));
//println!("setting up client TLS cert");
//let client_cert = include_bytes!("../certs/full-bin.p12");
//let client_cert_path = "C:\\USER\\fullchain.pem";
//let _ = mdm.upload_cert(client_cert_path, client_cert)?;
//let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?;
//let _ = mdm.fs_list("C:\\USER\\")?;
let mut retries = 0;
let is_connected: bool = loop {
if mdm.is_gprs_attached()? {
let _ = mdm.gprs_connect()?;
thread::sleep(Duration::from_millis(1000));
let ip_addr = mdm.gprs_status()?;
if ip_addr.contains("0.0.0.0") && retries < 5 {
thread::sleep(Duration::from_millis(1000));
retries += 1;
continue
} else if retries < 5 {
break true
} else {
break false
}
}
};
if is_connected {
//println!("connecting to server!");
//if !mdm.tcp_is_ssl_enabled()? {
// let _ = mdm.tcp_ssl_enable()?;
//}
//if mdm.tcp_is_ssl_enabled()? {
// let _ = mdm.tcp_ssl_disable()?;
//}
let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267";
let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive(true);
let _ = mdm.tcp_connect("51.158.66.64", 1883)?;
let _ = mdm.mqtt_connect(device_id)?;
println!("entering queue receive loop ...");
while let Ok(Msg::Location(msg)) = receiver.recv() {
println!("received message {} | sending to mqtt ...", msg);
let _ = mdm.mqtt_publish(device_id, &msg)?;
}
let _ = mdm.tcp_close_connection()?;
}
Ok(())
} }