diff --git a/sdkconfig.defaults b/sdkconfig.defaults index 1016968..a0f1158 100644 --- a/sdkconfig.defaults +++ b/sdkconfig.defaults @@ -1,5 +1,5 @@ # Rust often needs a bit of an extra main task stack size compared to C (the default is 3K) -CONFIG_ESP_MAIN_TASK_STACK_SIZE=17000 +CONFIG_ESP_MAIN_TASK_STACK_SIZE=64000 # Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default). # This allows to use 1 ms granuality for thread sleeps (10 ms by default). diff --git a/src/accel.rs b/src/accel.rs new file mode 100644 index 0000000..772ed41 --- /dev/null +++ b/src/accel.rs @@ -0,0 +1,17 @@ +use anyhow; + +use std::sync::mpsc::SyncSender; +use std::thread; +use std::time::Duration; + +use crate::modem::Msg; + +pub fn main(sender: SyncSender) -> Result<(), anyhow::Error> { + println!("entering ACCELERATOR sender loop ..."); + for i in 0..20 { + println!("sending ACCELERATOR message ({}) of 20 ...", i); + let _ = sender.send(Msg::Location("{\"velocity\": 21.43, \"altitude\": 367}".to_string()))?; + thread::sleep(Duration::from_millis(2000)); + } + Ok(()) +} diff --git a/src/gps.rs b/src/gps.rs new file mode 100644 index 0000000..9a2460f --- /dev/null +++ b/src/gps.rs @@ -0,0 +1,17 @@ +use anyhow; + +use std::sync::mpsc::SyncSender; +use std::thread; +use std::time::Duration; + +use crate::modem::Msg; + +pub fn main(sender: SyncSender) -> Result<(), anyhow::Error> { + println!("entering GPS sender loop ..."); + for i in 0..20 { + println!("sending GPS message ({}) of 20 ...", i); + let _ = sender.send(Msg::Location("{\"lat\": 20.4322, \"long\": 44.5432}".to_string()))?; + thread::sleep(Duration::from_millis(2000)); + } + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 8e70b67..08d6e94 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,20 @@ +mod accel; mod config; +mod gps; mod modem; #[allow(dead_code)] mod command; use anyhow; +use std::thread::{self, JoinHandle}; +use std::cell::RefCell; use std::time::Duration; -use std::thread; -use esp_idf_hal::prelude::*; -use esp_idf_hal::peripherals::Peripherals; -use esp_idf_hal::serial; -use mqtt::control::ConnectReturnCode; -use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacket, QoSWithPacketIdentifier}; -use mqtt::{Decodable, Encodable, TopicName}; +use esp_idf_hal::peripherals::Peripherals; + +thread_local! { + static TLS: RefCell = RefCell::new(13); +} fn main() -> anyhow::Result<()> { @@ -20,116 +22,43 @@ fn main() -> anyhow::Result<()> { let dp = Peripherals::take().expect("error taking peripherals"); + println!("Rust main thread: {:?}", thread::current()); + // LilyGo TTGO T-Call sim800l board serial pins. - let serial_rx = dp.pins.gpio26; - let serial_tx = dp.pins.gpio27; - - let serial_pins = serial::Pins { - tx: serial_tx, - rx: serial_rx, - cts: None, - rts: None, - }; - - // Create the serial and panic with a message ... if we can't create the serial port, then we - // can't communicate with the sim800l module, hence we don't run anymore. - let serial: serial::Serial = serial::Serial::new( - dp.uart1, - serial_pins, - serial::config::Config::default().baudrate(Hertz(115200)), - )?; - - let (tx, rx) = serial.split(); - let mut mdm = modem::Modem::new(tx, rx); - + let modem_rx = dp.pins.gpio26; + let modem_tx = dp.pins.gpio27; + // LilyGo TTGO T-Call sim800l board power / reset pins. let modem_pwrkey = dp.pins.gpio4.into_output()?; let modem_rst = dp.pins.gpio5.into_output()?; let modem_power = dp.pins.gpio23.into_output()?; + // UART interface for the GSM modem + let modem_uart = dp.uart1; - mdm.init(modem_pwrkey, modem_rst, modem_power)?; + let mut threads: Vec>> = vec![]; - if !mdm.is_gprs_attached()? { - let _ = mdm.gprs_attach_ap( - config::A1_GPRS_AP.apn, - config::A1_GPRS_AP.username, - config::A1_GPRS_AP.password, - )?; - } + println!("Rust main thread: {:?}", thread::current()); - thread::sleep(Duration::from_millis(2000)); - //println!("setting up client TLS cert"); - //let client_cert = include_bytes!("../certs/full-bin.p12"); - //let client_cert_path = "C:\\USER\\fullchain.pem"; + TLS.with(|tls| { + println!("Main TLS before change: {}", *tls.borrow()); + }); - //let _ = mdm.upload_cert(client_cert_path, client_cert)?; - //let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?; - //let _ = mdm.fs_list("C:\\USER\\")?; + TLS.with(|tls| *tls.borrow_mut() = 42); - loop { - if mdm.is_gprs_attached()? { - let _ = mdm.gprs_connect()?; - for _ in 0..3 { - let ip_addr = mdm.gprs_status()?; - if ip_addr.contains("0.0.0.0") { - thread::sleep(Duration::from_millis(2000)); - } else { - break - } - } + TLS.with(|tls| { + println!("Main TLS after change: {}", *tls.borrow()); + }); - //println!("connecting to server!"); - //if !mdm.tcp_is_ssl_enabled()? { - // let _ = mdm.tcp_ssl_enable()?; - //} - if mdm.tcp_is_ssl_enabled()? { - let _ = mdm.tcp_ssl_disable()?; - } - - let client_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; - let message = "{\"lat\": 20.475370, \"long\": 44.747224}"; + let (gps_sender, receiver) = std::sync::mpsc::sync_channel::(1); - let _ = mdm.tcp_set_quick_mode(false); - let _ = mdm.tcp_set_manual_receive(true); + let accel_sender = gps_sender.clone(); - let _ = mdm.tcp_connect("51.158.66.64", 1883)?; + threads.push(thread::spawn(move || gps::main(gps_sender))); + thread::sleep(Duration::from_millis(1000)); + threads.push(thread::spawn(move || accel::main(accel_sender))); + thread::sleep(Duration::from_millis(1000)); - let mut buf = Vec::new(); - - let mut conn = ConnectPacket::new(client_id); - conn.set_clean_session(true); - conn.set_keep_alive(0); - let _ = conn.encode(&mut buf)?; - let _ = mdm.tcp_send(&mut buf)?; - buf.clear(); - - thread::sleep(Duration::from_millis(1000)); - - let size = mdm.tcp_receive_reply_len()?; - let mut reply = vec![0 as u8; size]; - let _ = mdm.tcp_receive(&mut reply); - println!("TCP Received: {}", reply.iter().map(|b| char::from(*b)).collect::()); - - println!("sending publish packet"); - let packet = PublishPacket::new( - TopicName::new("bajsevi/test").unwrap(), - QoSWithPacketIdentifier::Level0, - message.as_bytes(), - ); - let _ = packet.encode(&mut buf)?; - let _ = mdm.tcp_send(&mut buf)?; - buf.clear(); - - let size = mdm.tcp_receive_reply_len()?; - let mut reply = vec![0 as u8; size]; - let _ = mdm.tcp_receive(&mut reply); - println!("TCP Received: {}", reply.iter().map(|b| char::from(*b)).collect::()); - - let _ = mdm.tcp_close_connection()?; - - break - } - } + let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?; Ok(()) } diff --git a/src/modem.rs b/src/modem.rs index 9a12acb..b9a59d8 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -1,12 +1,19 @@ use crate::command::Command; +use anyhow; use std::thread; use std::error::Error; use std::time::{Duration, Instant}; +use std::sync::mpsc::Receiver; + +use esp_idf_hal::prelude::*; +use esp_idf_hal::serial::{self, Rx, Tx}; use embedded_hal::serial::{Read, Write}; use embedded_hal::digital::v2::OutputPin; -use esp_idf_hal::serial::{self, Rx, Tx}; + +use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier}; +use mqtt::{Encodable, TopicName}; const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300; @@ -395,4 +402,150 @@ impl Modem { let _ = self.send_command(Command::ssl_set_root_cert(path, filesize))?; Ok(()) } + + fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> { + println!("entered receiving modem reply ..."); + let size = self.tcp_receive_reply_len()?; + println!("receiving reply len({}) ...", size); + let mut reply = vec![0 as u8; size]; + println!("receiving tcp reply ..."); + let _ = self.tcp_receive(&mut reply); + println!("received tcp reply ..."); + Ok(()) + } + + fn mqtt_connect(&mut self, device_id: &str) -> std::result::Result<(), anyhow::Error> { + let mut buf = Vec::new(); + let mut conn = ConnectPacket::new(device_id); + conn.set_clean_session(true); + conn.set_keep_alive(0); + let _ = conn.encode(&mut buf)?; + let _ = self.tcp_send(&mut buf)?; + + thread::sleep(Duration::from_millis(2000)); + drop(buf); + + let _ = self.mqtt_receive_reply()?; + Ok(()) + } + + fn mqtt_publish(&mut self, _device_id: &str, message: &str) -> std::result::Result<(), anyhow::Error> { + println!("entered mqtt publish ..."); + let mut buf = Vec::new(); + let packet = PublishPacket::new( + TopicName::new(format!("bajsevi/location")).unwrap(), + QoSWithPacketIdentifier::Level0, + message.as_bytes(), + ); + println!("created mqtt publish packet ..."); + let _ = packet.encode(&mut buf)?; + println!("modem tcp send publish pakage ..."); + let _ = self.tcp_send(&mut buf)?; + + thread::sleep(Duration::from_millis(2000)); + drop(buf); + + println!("receiving modem publish reply ..."); + let _ = self.mqtt_receive_reply()?; + Ok(()) + } +} + +pub enum Msg { + Location(String), +// Movement(String), +} + +pub fn main( + rx: esp_idf_hal::gpio::Gpio26, + tx: esp_idf_hal::gpio::Gpio27, + uart: serial::UART1, + pwrkey: esp_idf_hal::gpio::Gpio4, + rst: esp_idf_hal::gpio::Gpio5, + power: esp_idf_hal::gpio::Gpio23, + receiver: Receiver, +) -> std::result::Result<(), anyhow::Error> { + let serial_pins = serial::Pins { + tx, + rx, + cts: None, + rts: None, + }; + + // Create the serial and panic with a message ... if we can't create the serial port, then we + // can't communicate with the sim800l module, hence we don't run anymore. + let serial: serial::Serial = serial::Serial::new( + uart, + serial_pins, + serial::config::Config::default().baudrate(Hertz(115200)), + )?; + + let (tx, rx) = serial.split(); + let mut mdm = Modem::new(tx, rx); + + mdm.init(pwrkey, rst, power)?; + + if !mdm.is_gprs_attached()? { + let _ = mdm.gprs_attach_ap( + crate::config::A1_GPRS_AP.apn, + crate::config::A1_GPRS_AP.username, + crate::config::A1_GPRS_AP.password, + )?; + } + + thread::sleep(Duration::from_millis(1000)); + + //println!("setting up client TLS cert"); + //let client_cert = include_bytes!("../certs/full-bin.p12"); + //let client_cert_path = "C:\\USER\\fullchain.pem"; + + //let _ = mdm.upload_cert(client_cert_path, client_cert)?; + //let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?; + //let _ = mdm.fs_list("C:\\USER\\")?; + + let mut retries = 0; + let is_connected: bool = loop { + if mdm.is_gprs_attached()? { + let _ = mdm.gprs_connect()?; + thread::sleep(Duration::from_millis(1000)); + let ip_addr = mdm.gprs_status()?; + if ip_addr.contains("0.0.0.0") && retries < 5 { + thread::sleep(Duration::from_millis(1000)); + retries += 1; + continue + } else if retries < 5 { + break true + } else { + break false + } + } + }; + + if is_connected { + //println!("connecting to server!"); + //if !mdm.tcp_is_ssl_enabled()? { + // let _ = mdm.tcp_ssl_enable()?; + //} + //if mdm.tcp_is_ssl_enabled()? { + // let _ = mdm.tcp_ssl_disable()?; + //} + let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; + + let _ = mdm.tcp_set_quick_mode(false); + let _ = mdm.tcp_set_manual_receive(true); + let _ = mdm.tcp_connect("51.158.66.64", 1883)?; + + let _ = mdm.mqtt_connect(device_id)?; + + println!("entering queue receive loop ..."); + while let Ok(Msg::Location(msg)) = receiver.recv() { + println!("received message {} | sending to mqtt ...", msg); + let _ = mdm.mqtt_publish(device_id, &msg)?; + } + + + let _ = mdm.tcp_close_connection()?; + } + + Ok(()) }