From b6e7e64e729d123c7c683ca24d2b58433be992a0 Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sat, 26 Nov 2022 18:59:33 +0100 Subject: [PATCH] refactor and make mqtt work --- src/main.rs | 9 ++-- src/modem.rs | 149 ++++++++++++++++++++++++++------------------------- 2 files changed, 80 insertions(+), 78 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0505ed7..72c2d8f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,7 +33,7 @@ fn main() -> anyhow::Result<()> { // UART interface for the GSM modem let modem_uart = dp.uart1; - // let mut threads: Vec>> = vec![]; + let mut threads: Vec>> = vec![]; // Rx/Tx pins for the GPS modem let gps_rx = dp.pins.gpio13; @@ -43,16 +43,15 @@ fn main() -> anyhow::Result<()> { let (gps_sender, receiver) = std::sync::mpsc::sync_channel::(1); - let accel_sender = gps_sender.clone(); - let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?; + // let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?; // threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender))); //thread::sleep(Duration::from_millis(1000)); - //threads.push(thread::spawn(move || accel::main(accel_sender))); + threads.push(thread::spawn(move || accel::main(accel_sender))); //thread::sleep(Duration::from_millis(1000)); - //let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?; + let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?; Ok(()) } diff --git a/src/modem.rs b/src/modem.rs index 35f567f..09141bf 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -31,7 +31,7 @@ pub struct Modem { pub enum ModemError { CommandError(String), SetupError(String), - SendDataError, + SendDataError(String), ReadError, TimeoutError, } @@ -113,26 +113,26 @@ impl Modem { println!("Sending {} ...", cmd.text); let _ = self.serial .send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) - .map_err(|_| ModemError::SendDataError)?; + .map_err(|_| ModemError::SendDataError(format!("Error in send_command({:?})", cmd)))?; self.read_response(cmd.contains, cmd.timeout) } - fn tcp_send_data(&mut self, buf: &[u8]) -> Result { + fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result { let _ = self.serial .write("AT+CIPSEND\r".as_bytes()) - .map_err(|_| ModemError::SendDataError)?; + .map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?; let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) .map(char::from) .take_while(|c| *c != '>').collect(); if send_prompt != "\r\n" { - println!("{:?}", send_prompt.as_bytes()); - return Err(ModemError::SendDataError); + let msg = format!("Prompt error, expected \\r\\n, got {:?}", send_prompt.as_bytes()); + return Err(ModemError::SendDataError(msg)); } self.serial - .send_bytes(buf, Some(26)) // 26_u8 = Ctrl+z - to end sending data - .map_err(|_| ModemError::SendDataError)?; + .send_bytes(buf, Some(26_u8)) // 26_u8 = Ctrl+z - to end sending data + .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?; self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000)) } @@ -164,6 +164,26 @@ impl Modem { Ok(res.contains("+CGATT: 1")) } + fn try_connect_gprs(&mut self) -> Result<()> { + let mut retries = 0; + loop { + if self.is_gprs_attached()? { + let _ = self.gprs_connect()?; + thread::sleep(Duration::from_millis(1000)); + let ip_addr = self.gprs_status()?; + if ip_addr.contains("0.0.0.0") && retries < 5 { + thread::sleep(Duration::from_millis(1000)); + retries += 1; + continue + } else if retries < 5 { + break Ok(()) + } else { + break Err(ModemError::SetupError(format!("Cannot connect to GPRS after {} retries ... bailing!", retries))); + } + } + } + } + pub fn tcp_is_ssl_enabled(&mut self) -> Result { let res = self.send_command(Command::tcp_ssl_check())?; Ok(res.contains("+CIPSSL: (1)")) @@ -194,8 +214,8 @@ impl Modem { .map(|_| ()) } - pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> { - self.tcp_send_data(buf) + pub fn tcp_manual_send(&mut self, buf: &[u8]) -> Result<()> { + self.tcp_manual_send_data(buf) .map(|_| ()) } @@ -303,18 +323,18 @@ impl Modem { let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec); let _ = self.serial .send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) - .map_err(|_| ModemError::SendDataError)?; + .map_err(|err| ModemError::SendDataError(format!("File write error ({:?})", err)))?; let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) .map(char::from) .take_while(|c| *c != '>').collect(); if send_prompt == "" { - return Err(ModemError::SendDataError); + return Err(ModemError::SendDataError("Prompt empty, expected: \\r\\n".to_string())); } self.serial .send_bytes(buf, None) - .map_err(|_| ModemError::SendDataError)?; + .map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?; let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000)); Ok(()) @@ -347,7 +367,7 @@ impl Modem { } fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> { - println!("entered receiving modem reply ..."); + println!("receiving mqtt reply from modem ..."); let size = self.tcp_receive_reply_len()?; println!("receiving reply len({}) ...", size); let mut reply = vec![0 as u8; size]; @@ -361,9 +381,9 @@ impl Modem { let mut buf = Vec::new(); let mut conn = ConnectPacket::new(device_id); conn.set_clean_session(true); - conn.set_keep_alive(0); + conn.set_keep_alive(100); let _ = conn.encode(&mut buf)?; - let _ = self.tcp_send(&mut buf)?; + let _ = self.tcp_manual_send(&mut buf)?; thread::sleep(Duration::from_millis(2000)); drop(buf); @@ -383,7 +403,7 @@ impl Modem { println!("created mqtt publish packet ..."); let _ = packet.encode(&mut buf)?; println!("modem tcp send publish pakage ..."); - let _ = self.tcp_send(&mut buf)?; + let _ = self.tcp_manual_send(&mut buf)?; thread::sleep(Duration::from_millis(2000)); drop(buf); @@ -402,7 +422,7 @@ impl std::io::Read for Modem { impl std::io::Write for Modem { fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tcp_send(buf) + self.tcp_manual_send(buf) .map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?; Ok(buf.len()) } @@ -439,15 +459,7 @@ pub fn main( mdm.init(pwrkey, rst, power)?; - if !mdm.is_gprs_attached()? { - let _ = mdm.gprs_attach_ap( - crate::config::A1_GPRS_AP.apn, - crate::config::A1_GPRS_AP.username, - crate::config::A1_GPRS_AP.password, - )?; - } - - thread::sleep(Duration::from_millis(1000)); + // thread::sleep(Duration::from_millis(500)); //println!("setting up client TLS cert"); //let client_cert = include_bytes!("../certs/full-bin.p12"); @@ -457,59 +469,50 @@ pub fn main( //let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?; //let _ = mdm.fs_list("C:\\USER\\")?; - let mut retries = 0; - let is_connected: bool = loop { - if mdm.is_gprs_attached()? { - let _ = mdm.gprs_connect()?; - thread::sleep(Duration::from_millis(1000)); - let ip_addr = mdm.gprs_status()?; - if ip_addr.contains("0.0.0.0") && retries < 5 { - thread::sleep(Duration::from_millis(1000)); - retries += 1; - continue - } else if retries < 5 { - break true - } else { - break false - } + loop { + if !mdm.is_gprs_attached()? { + let _ = mdm.gprs_attach_ap( + crate::config::A1_GPRS_AP.apn, + crate::config::A1_GPRS_AP.username, + crate::config::A1_GPRS_AP.password, + )?; } - }; + if let Ok(()) = mdm.try_connect_gprs() { + let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; - if is_connected { - let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; + let _ = mdm.tcp_set_quick_mode(false); + let _ = mdm.tcp_set_manual_receive(true); + let _ = mdm.tcp_connect("51.158.66.64", 7887)?; - let _ = mdm.tcp_set_quick_mode(false); - let _ = mdm.tcp_set_manual_receive(true); - let _ = mdm.tcp_connect("51.158.66.64", 1883)?; + let _ = mdm.mqtt_connect(device_id)?; - let _ = mdm.mqtt_connect(device_id)?; - - println!("entering queue receive loop ..."); - let mut err_count = 0; - loop { - match receiver.recv() { - Ok(Msg::Gps(solution)) => { - println!("received GPS solution {:?} | sending to mqtt ...", solution); - let _ = mdm.mqtt_publish(device_id, &format!("{:?}", solution))?; - }, - Ok(Msg::Accelerometer(acc)) => { - println!("received message {} | sending to mqtt ...", acc); - let _ = mdm.mqtt_publish(device_id, &format!("{:?}", acc))?; - } - Err(e) => { - println!("received error {} | NOT sending to mqtt ...", e); - if err_count < 10 { - err_count += 1; + println!("entering queue receive loop ..."); + let mut err_count = 0; + let _ = loop { + match receiver.recv() { + Ok(Msg::Gps(solution)) => { + println!("received GPS solution {:?} | sending to mqtt ...", solution); + let _ = mdm.mqtt_publish(device_id, &format!("{:?}", solution))?; + err_count = 0; + }, + Ok(Msg::Accelerometer(acc)) => { + println!("received accel {} | sending to mqtt ...", acc); + let _ = mdm.mqtt_publish(device_id, &format!("{:?}", acc))?; + err_count = 0; } - else { - break + Err(e) => { + if err_count < 10 { + err_count += 1; + println!("received error {} | NOT sending to mqtt ...", e); + } + else { + break + } } } - } - }; + }; - let _ = mdm.tcp_close_connection()?; + let _ = mdm.tcp_close_connection()?; + } } - - Ok(()) }