From 98d0a66cf3aac669e393808eec48b14e318da6b9 Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Tue, 29 Nov 2022 21:34:38 +0100 Subject: [PATCH] debug and fix a bunch of modem read/write errors --- src/accel.rs | 11 ++-- src/command.rs | 2 +- src/main.rs | 18 +++--- src/modem.rs | 162 ++++++++++++++++++++++++++++++------------------- src/serial.rs | 12 ++-- 5 files changed, 121 insertions(+), 84 deletions(-) diff --git a/src/accel.rs b/src/accel.rs index 50c0e56..1936c7a 100644 --- a/src/accel.rs +++ b/src/accel.rs @@ -6,12 +6,13 @@ use std::time::Duration; use crate::types::*; -pub fn main(sender: SyncSender) -> Result<(), anyhow::Error> { +pub fn main(sender: SyncSender) -> anyhow::Result<()> { + let mut c = 1_usize; println!("entering ACCELERATOR sender loop ..."); - for i in 0..20 { - println!("sending ACCELERATOR message ({}) of 20 ...", i); + loop { + println!("sending ACCELERATOR message No. {}", c); let _ = sender.send(Msg::Accelerometer("{\"velocity\": 21.43, \"altitude\": 367}".to_string()))?; - thread::sleep(Duration::from_millis(2000)); + thread::sleep(Duration::from_secs(5)); + c += 1; } - Ok(()) } diff --git a/src/command.rs b/src/command.rs index ce95655..00fb9ed 100644 --- a/src/command.rs +++ b/src/command.rs @@ -268,7 +268,7 @@ impl Command { Command { text: format!("AT+CIPSTART=\"TCP\",\"{}\",\"{}\"", addr, port), timeout: Duration::from_millis(5000), - contains: Some("CONNECT OK".to_string()), + contains: Some("OK".to_string()), } } diff --git a/src/main.rs b/src/main.rs index 72c2d8f..cd55843 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,11 +8,7 @@ mod serial; mod types; use anyhow; -use std::{ - thread::{self, JoinHandle}, - time::Duration, -}; - +use std::{thread::{self, JoinHandle}, time::Duration}; use esp_idf_hal::peripherals::Peripherals; use types::*; @@ -35,11 +31,11 @@ fn main() -> anyhow::Result<()> { let mut threads: Vec>> = vec![]; - // Rx/Tx pins for the GPS modem - let gps_rx = dp.pins.gpio13; - let gps_tx = dp.pins.gpio12; - // UART interface for the GPS modem - let gps_uart = dp.uart2; + // // Rx/Tx pins for the GPS modem + // let gps_rx = dp.pins.gpio13; + // let gps_tx = dp.pins.gpio12; + // // UART interface for the GPS modem + // let gps_uart = dp.uart2; let (gps_sender, receiver) = std::sync::mpsc::sync_channel::(1); @@ -49,7 +45,7 @@ fn main() -> anyhow::Result<()> { // threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender))); //thread::sleep(Duration::from_millis(1000)); threads.push(thread::spawn(move || accel::main(accel_sender))); - //thread::sleep(Duration::from_millis(1000)); + thread::sleep(Duration::from_millis(3000)); let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?; diff --git a/src/modem.rs b/src/modem.rs index 11cca89..1ad179a 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -16,10 +16,8 @@ use esp_idf_hal::serial::{self, Rx, Tx}; use embedded_hal::digital::v2::OutputPin; -use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier}; -use mqtt::{Encodable, TopicName}; - -const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300; +use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier, VariablePacket}; +use mqtt::{Encodable, Decodable, TopicName}; pub type Result = std::result::Result; @@ -32,7 +30,7 @@ pub enum ModemError { CommandError(String), SetupError(String), SendDataError(String), - ReadError, + ReadError(String), TimeoutError, } @@ -77,10 +75,14 @@ impl Modem { thread::sleep(Duration::from_millis(1000)); pwrkey.set_high().map_err(|_| ModemError::SetupError("Error setting PWRKEY to high.".to_string()))?; println!("Waiting for sim module to come online ..."); + thread::sleep(Duration::from_millis(3000)); loop { match self.send_command(Command::probe()) { Ok(_) => break, - _ => continue, + _ => { + thread::sleep(Duration::from_millis(2000)); + continue + }, } } Ok(()) @@ -90,16 +92,17 @@ impl Modem { /// None, then the first line is returned. If a timeout is reached. The timeout is provided on /// input via the `timeout` argument. The first argument `contains` is checked against every /// line in the response. - fn command_read_response(&mut self) -> Result { + fn command_read_response(&mut self, contains: Option) -> Result { let mut response = String::new(); loop { let mut buf = vec![0; 1024]; let num_bytes = self.serial .read(buf.as_mut_slice()) - .map_err(|_| ModemError::ReadError)?; + .map_err(|err| ModemError::ReadError(format!("Error in serial.read(buf) ({:?})", err)))?; - response.push_str(std::str::from_utf8(&buf[0..num_bytes]).map_err(|_| ModemError::ReadError)?); + response.push_str(std::str::from_utf8(&buf[0..num_bytes]) + .map_err(|err| ModemError::ReadError(format!("Error in str::from_utf8 ({:?})", err)))?); if num_bytes < buf.len() { break @@ -107,12 +110,20 @@ impl Modem { } print!("Read {} bytes from serial: {}", response.len(), response); - Ok(response) + if let Some(c) = contains { + if response.contains(&c) { + Ok(response) + } else { + Err(ModemError::CommandError(format!("Didn't get expected ({}) from modem.", c))) + } + } else { + Ok(response) + } } fn send_command(&mut self, cmd: Command) -> Result { println!("-----------------------------------------------------------"); - println!("Sending to TX ({}) ...", cmd.text); + println!("Sending {} ...", cmd.text); let _ = self.serial .write_bytes(cmd.text.as_bytes()) @@ -122,16 +133,23 @@ impl Modem { .write(&['\r' as u8]) .map_err(|_| ModemError::SendDataError(format!("Error in send_command({})", cmd.text)))?; - self.command_read_response() + self.command_read_response(cmd.contains) } fn handle_prompt(&mut self) -> Result<()> { - let mut prompt_buf = vec![0; 3]; - let prompt_len = self.serial.read(&mut prompt_buf).map_err(|_| ModemError::ReadError)?; - let prompt = std::str::from_utf8(prompt_buf.as_slice()).unwrap_or(""); + let mut prompt_buf = vec![0; 256]; + let prompt_len = self.serial.read(&mut prompt_buf) + .map_err(|err| ModemError::ReadError(format!("Error in handle_prompt() ({:?})", err)))?; - if prompt_len != 3 && prompt != "\r\n>" { - let msg = format!("Prompt error, expected \\r\\n>, got {:?}", prompt); + let prompt = String::from_utf8(prompt_buf[0..prompt_len].to_vec()) + .unwrap_or("".to_string()) + .trim() + .to_string(); + + println!("Prompt is: ({})", prompt); + + if prompt != ">" { + let msg = format!("Prompt error, expected (>), got ({})", prompt); Err(ModemError::SendDataError(msg)) } else { Ok(()) @@ -139,19 +157,34 @@ impl Modem { } fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result { + println!("Sending AT+CIPSEND to serial TX!"); let _ = self.serial .write("AT+CIPSEND\r".as_bytes()) .map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?; let _ = self.handle_prompt()?; + println!("Handled prompt OK!!"); + println!("Writing bytes in serial TX! ({:?})", String::from_utf8(buf.to_vec())); self.serial .write_bytes(buf) .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?; self.serial .write(&[26_u8]) // 26_u8 = Ctrl+z - to end sending data .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?; - self.command_read_response() + println!("DONE Writing bytes in serial TX!"); + + thread::sleep(Duration::from_millis(500)); + + println!("Reading bytes in serial RX!"); + for _ in 0..3 { + let res = self.command_read_response(Some("SEND OK".into())); + if res.is_ok() { + return res; + } + thread::sleep(Duration::from_millis(1000)) + } + Err(ModemError::ReadError(format!("ReadError: cannot read serial RX!"))) } pub fn gprs_status(&mut self) -> Result { @@ -218,8 +251,15 @@ impl Modem { } pub fn tcp_connect(&mut self, addr: &str, port: u16) -> Result<()> { - self.send_command(Command::tcp_connect(addr, port)) - .map(|_| ()) + let _ = self.send_command(Command::tcp_connect(addr, port))?; + for _ in 0..3 { + if let Ok(reply) = self.command_read_response(Some("CONNECT_OK".to_string())) { + println!("TCP connect replied with {}", reply); + break + } + thread::sleep(Duration::from_millis(500)); + } + Ok(()) } pub fn tcp_set_quick_mode(&mut self, mode: bool) -> Result<()> { @@ -248,18 +288,21 @@ impl Modem { pub fn tcp_receive_reply_len(&mut self) -> Result { let reply = self.send_command(Command::tcp_receive_reply_len())?; - reply.lines() + println!("Receiving TCP reply length!"); + let res = reply.lines() .filter(|line| line.contains("+CIPRXGET: 4")) .next() - .ok_or(ModemError::CommandError("reply not found :/".to_string())) - .map(|line| self.tcp_parse_response_size(line)) - .unwrap_or(Err(ModemError::CommandError(format!("received 0 elements from parsing")))) + .ok_or(ModemError::CommandError("reply body missing :/".to_string())) + .and_then(|line| self.tcp_parse_response_size(line)) + .map_err(|_| ModemError::CommandError(format!("received 0 elements from parsing"))); + println!("Received ({:?})", res); + res } pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result { let mut size = 0; loop { - let reply = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) + let reply = self.send_command(Command::tcp_receive(buf.len())) .map(|reply| { // TODO: parse the response properly // 1. the first line is \r\n @@ -349,7 +392,7 @@ impl Modem { self.serial .write(buf) .map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?; - let _ = self.command_read_response(); + let _ = self.command_read_response(None); Ok(()) } @@ -380,33 +423,44 @@ impl Modem { .map(|_| ()) } - fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> { - println!("receiving mqtt reply from modem ..."); - let size = self.tcp_receive_reply_len()?; - println!("receiving reply len({}) ...", size); - let mut reply = vec![0 as u8; size]; - println!("receiving tcp reply ..."); - let _ = self.tcp_receive(&mut reply); - println!("received tcp reply ..."); - Ok(()) + fn mqtt_receive_reply(&mut self) -> Result { + for _ in 0..3 { + let size = self.tcp_receive_reply_len()?; + println!("received reply len({}) ...", size); + if size == 0 { + println!("retrying ..."); + continue + } else { + let mut reply = vec![0 as u8; size]; + println!("receiving mqtt reply ..."); + let _ = self.tcp_receive(&mut reply); + let reply = std::str::from_utf8(&reply).unwrap_or(""); + println!("received mqtt reply ({})", reply); + return VariablePacket::decode(&mut reply.as_bytes()) + .map_err(|err| ModemError::CommandError(format!("Undecodable MQTT message. ({:?})", err))); + } + } + Err(ModemError::ReadError("TCP server didn't respond!".into())) } - fn mqtt_connect(&mut self, device_id: &str) -> std::result::Result<(), anyhow::Error> { + fn mqtt_connect(&mut self, device_id: &str) -> anyhow::Result<()> { let mut buf = Vec::new(); let mut conn = ConnectPacket::new(device_id); conn.set_clean_session(true); conn.set_keep_alive(100); let _ = conn.encode(&mut buf)?; - let _ = self.tcp_manual_send(&mut buf)?; + self.tcp_manual_send(&mut buf).ok(); - thread::sleep(Duration::from_millis(2000)); - drop(buf); + let reply = self.mqtt_receive_reply()?; + println!("mqtt decoded packet: ({:?})", reply); - let _ = self.mqtt_receive_reply()?; - Ok(()) + match reply { + VariablePacket::ConnackPacket(_) => Ok(()), + _ => Err(anyhow::Error::msg("Invalid MQTT reply ... expected CONNACK!")) + } } - fn mqtt_publish(&mut self, _device_id: &str, message: &str) -> std::result::Result<(), anyhow::Error> { + fn mqtt_publish(&mut self, _device_id: &str, message: &str) -> anyhow::Result<()> { println!("entered mqtt publish ..."); let mut buf = Vec::new(); let packet = PublishPacket::new( @@ -414,16 +468,10 @@ impl Modem { QoSWithPacketIdentifier::Level0, message.as_bytes(), ); - println!("created mqtt publish packet ..."); let _ = packet.encode(&mut buf)?; - println!("modem tcp send publish pakage ..."); - let _ = self.tcp_manual_send(&mut buf)?; - - thread::sleep(Duration::from_millis(2000)); - drop(buf); - - println!("receiving modem publish reply ..."); - let _ = self.mqtt_receive_reply()?; + println!("created mqtt publish packet ... ({})", + std::str::from_utf8(buf.as_slice()).unwrap_or("")); + self.tcp_manual_send(&mut buf).ok(); Ok(()) } } @@ -434,18 +482,6 @@ impl std::io::Read for Modem { } } -impl std::io::Write for Modem { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tcp_manual_send(buf) - .map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?; - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - pub fn main( rx: esp_idf_hal::gpio::Gpio26, tx: esp_idf_hal::gpio::Gpio27, diff --git a/src/serial.rs b/src/serial.rs index d18efee..95d05ea 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -77,7 +77,11 @@ impl SerialIO { Err(nb::Error::Other(err)) => println!("Serial read error :: {:?}", err), } }; - Ok(count) + if count > 0 { + Ok(count) + } else { + Err(nb::Error::Other(SerialError::ReadError("Rx buffer empty.".to_string()))) + } } } @@ -87,7 +91,7 @@ const READ_WAIT_TIME: u64 = 50; impl io::Read for SerialIO { fn read(&mut self, buf: &mut [u8]) -> io::Result { let count = nb::block!(self.read_bytes(buf)) - .map_err(|_| io::Error::from(io::ErrorKind::Other))?; + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; Ok(count) } } @@ -95,11 +99,11 @@ impl io::Read for SerialIO { impl io::Write for SerialIO { fn write(&mut self, buf: &[u8]) -> io::Result { nb::block!(self.write_bytes(buf)) - .map_err(|_| io::Error::from(io::ErrorKind::Other)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } fn flush(&mut self) -> io::Result<()> { self.tx.flush() - .map_err(|_| io::Error::from(io::ErrorKind::Other)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, SerialError::ReadError(format!("Flush error ({:?})", err)))) } }