debug and fix a bunch of modem read/write errors

This commit is contained in:
Vladan Popovic 2022-11-29 21:34:38 +01:00
parent 018af71262
commit 98d0a66cf3
5 changed files with 121 additions and 84 deletions

View File

@ -6,12 +6,13 @@ use std::time::Duration;
use crate::types::*;
pub fn main(sender: SyncSender<Msg>) -> Result<(), anyhow::Error> {
pub fn main(sender: SyncSender<Msg>) -> anyhow::Result<()> {
let mut c = 1_usize;
println!("entering ACCELERATOR sender loop ...");
for i in 0..20 {
println!("sending ACCELERATOR message ({}) of 20 ...", i);
loop {
println!("sending ACCELERATOR message No. {}", c);
let _ = sender.send(Msg::Accelerometer("{\"velocity\": 21.43, \"altitude\": 367}".to_string()))?;
thread::sleep(Duration::from_millis(2000));
thread::sleep(Duration::from_secs(5));
c += 1;
}
Ok(())
}

View File

@ -268,7 +268,7 @@ impl Command {
Command {
text: format!("AT+CIPSTART=\"TCP\",\"{}\",\"{}\"", addr, port),
timeout: Duration::from_millis(5000),
contains: Some("CONNECT OK".to_string()),
contains: Some("OK".to_string()),
}
}

View File

@ -8,11 +8,7 @@ mod serial;
mod types;
use anyhow;
use std::{
thread::{self, JoinHandle},
time::Duration,
};
use std::{thread::{self, JoinHandle}, time::Duration};
use esp_idf_hal::peripherals::Peripherals;
use types::*;
@ -35,11 +31,11 @@ fn main() -> anyhow::Result<()> {
let mut threads: Vec<JoinHandle<anyhow::Result<_>>> = vec![];
// Rx/Tx pins for the GPS modem
let gps_rx = dp.pins.gpio13;
let gps_tx = dp.pins.gpio12;
// UART interface for the GPS modem
let gps_uart = dp.uart2;
// // Rx/Tx pins for the GPS modem
// let gps_rx = dp.pins.gpio13;
// let gps_tx = dp.pins.gpio12;
// // UART interface for the GPS modem
// let gps_uart = dp.uart2;
let (gps_sender, receiver) = std::sync::mpsc::sync_channel::<Msg>(1);
@ -49,7 +45,7 @@ fn main() -> anyhow::Result<()> {
// threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender)));
//thread::sleep(Duration::from_millis(1000));
threads.push(thread::spawn(move || accel::main(accel_sender)));
//thread::sleep(Duration::from_millis(1000));
thread::sleep(Duration::from_millis(3000));
let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?;

View File

@ -16,10 +16,8 @@ use esp_idf_hal::serial::{self, Rx, Tx};
use embedded_hal::digital::v2::OutputPin;
use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier};
use mqtt::{Encodable, TopicName};
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier, VariablePacket};
use mqtt::{Encodable, Decodable, TopicName};
pub type Result<T> = std::result::Result<T, ModemError>;
@ -32,7 +30,7 @@ pub enum ModemError {
CommandError(String),
SetupError(String),
SendDataError(String),
ReadError,
ReadError(String),
TimeoutError,
}
@ -77,10 +75,14 @@ impl<UART: serial::Uart> Modem<UART> {
thread::sleep(Duration::from_millis(1000));
pwrkey.set_high().map_err(|_| ModemError::SetupError("Error setting PWRKEY to high.".to_string()))?;
println!("Waiting for sim module to come online ...");
thread::sleep(Duration::from_millis(3000));
loop {
match self.send_command(Command::probe()) {
Ok(_) => break,
_ => continue,
_ => {
thread::sleep(Duration::from_millis(2000));
continue
},
}
}
Ok(())
@ -90,16 +92,17 @@ impl<UART: serial::Uart> Modem<UART> {
/// None, then the first line is returned. If a timeout is reached. The timeout is provided on
/// input via the `timeout` argument. The first argument `contains` is checked against every
/// line in the response.
fn command_read_response(&mut self) -> Result<String> {
fn command_read_response(&mut self, contains: Option<String>) -> Result<String> {
let mut response = String::new();
loop {
let mut buf = vec![0; 1024];
let num_bytes = self.serial
.read(buf.as_mut_slice())
.map_err(|_| ModemError::ReadError)?;
.map_err(|err| ModemError::ReadError(format!("Error in serial.read(buf) ({:?})", err)))?;
response.push_str(std::str::from_utf8(&buf[0..num_bytes]).map_err(|_| ModemError::ReadError)?);
response.push_str(std::str::from_utf8(&buf[0..num_bytes])
.map_err(|err| ModemError::ReadError(format!("Error in str::from_utf8 ({:?})", err)))?);
if num_bytes < buf.len() {
break
@ -107,12 +110,20 @@ impl<UART: serial::Uart> Modem<UART> {
}
print!("Read {} bytes from serial: {}", response.len(), response);
Ok(response)
if let Some(c) = contains {
if response.contains(&c) {
Ok(response)
} else {
Err(ModemError::CommandError(format!("Didn't get expected ({}) from modem.", c)))
}
} else {
Ok(response)
}
}
fn send_command(&mut self, cmd: Command) -> Result<String> {
println!("-----------------------------------------------------------");
println!("Sending to TX ({}) ...", cmd.text);
println!("Sending {} ...", cmd.text);
let _ = self.serial
.write_bytes(cmd.text.as_bytes())
@ -122,16 +133,23 @@ impl<UART: serial::Uart> Modem<UART> {
.write(&['\r' as u8])
.map_err(|_| ModemError::SendDataError(format!("Error in send_command({})", cmd.text)))?;
self.command_read_response()
self.command_read_response(cmd.contains)
}
fn handle_prompt(&mut self) -> Result<()> {
let mut prompt_buf = vec![0; 3];
let prompt_len = self.serial.read(&mut prompt_buf).map_err(|_| ModemError::ReadError)?;
let prompt = std::str::from_utf8(prompt_buf.as_slice()).unwrap_or("");
let mut prompt_buf = vec![0; 256];
let prompt_len = self.serial.read(&mut prompt_buf)
.map_err(|err| ModemError::ReadError(format!("Error in handle_prompt() ({:?})", err)))?;
if prompt_len != 3 && prompt != "\r\n>" {
let msg = format!("Prompt error, expected \\r\\n>, got {:?}", prompt);
let prompt = String::from_utf8(prompt_buf[0..prompt_len].to_vec())
.unwrap_or("".to_string())
.trim()
.to_string();
println!("Prompt is: ({})", prompt);
if prompt != ">" {
let msg = format!("Prompt error, expected (>), got ({})", prompt);
Err(ModemError::SendDataError(msg))
} else {
Ok(())
@ -139,19 +157,34 @@ impl<UART: serial::Uart> Modem<UART> {
}
fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result<String> {
println!("Sending AT+CIPSEND to serial TX!");
let _ = self.serial
.write("AT+CIPSEND\r".as_bytes())
.map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?;
let _ = self.handle_prompt()?;
println!("Handled prompt OK!!");
println!("Writing bytes in serial TX! ({:?})", String::from_utf8(buf.to_vec()));
self.serial
.write_bytes(buf)
.map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?;
self.serial
.write(&[26_u8]) // 26_u8 = Ctrl+z - to end sending data
.map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?;
self.command_read_response()
println!("DONE Writing bytes in serial TX!");
thread::sleep(Duration::from_millis(500));
println!("Reading bytes in serial RX!");
for _ in 0..3 {
let res = self.command_read_response(Some("SEND OK".into()));
if res.is_ok() {
return res;
}
thread::sleep(Duration::from_millis(1000))
}
Err(ModemError::ReadError(format!("ReadError: cannot read serial RX!")))
}
pub fn gprs_status(&mut self) -> Result<String> {
@ -218,8 +251,15 @@ impl<UART: serial::Uart> Modem<UART> {
}
pub fn tcp_connect(&mut self, addr: &str, port: u16) -> Result<()> {
self.send_command(Command::tcp_connect(addr, port))
.map(|_| ())
let _ = self.send_command(Command::tcp_connect(addr, port))?;
for _ in 0..3 {
if let Ok(reply) = self.command_read_response(Some("CONNECT_OK".to_string())) {
println!("TCP connect replied with {}", reply);
break
}
thread::sleep(Duration::from_millis(500));
}
Ok(())
}
pub fn tcp_set_quick_mode(&mut self, mode: bool) -> Result<()> {
@ -248,18 +288,21 @@ impl<UART: serial::Uart> Modem<UART> {
pub fn tcp_receive_reply_len(&mut self) -> Result<usize> {
let reply = self.send_command(Command::tcp_receive_reply_len())?;
reply.lines()
println!("Receiving TCP reply length!");
let res = reply.lines()
.filter(|line| line.contains("+CIPRXGET: 4"))
.next()
.ok_or(ModemError::CommandError("reply not found :/".to_string()))
.map(|line| self.tcp_parse_response_size(line))
.unwrap_or(Err(ModemError::CommandError(format!("received 0 elements from parsing"))))
.ok_or(ModemError::CommandError("reply body missing :/".to_string()))
.and_then(|line| self.tcp_parse_response_size(line))
.map_err(|_| ModemError::CommandError(format!("received 0 elements from parsing")));
println!("Received ({:?})", res);
res
}
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut size = 0;
loop {
let reply = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
let reply = self.send_command(Command::tcp_receive(buf.len()))
.map(|reply| {
// TODO: parse the response properly
// 1. the first line is \r\n
@ -349,7 +392,7 @@ impl<UART: serial::Uart> Modem<UART> {
self.serial
.write(buf)
.map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?;
let _ = self.command_read_response();
let _ = self.command_read_response(None);
Ok(())
}
@ -380,33 +423,44 @@ impl<UART: serial::Uart> Modem<UART> {
.map(|_| ())
}
fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> {
println!("receiving mqtt reply from modem ...");
let size = self.tcp_receive_reply_len()?;
println!("receiving reply len({}) ...", size);
let mut reply = vec![0 as u8; size];
println!("receiving tcp reply ...");
let _ = self.tcp_receive(&mut reply);
println!("received tcp reply ...");
Ok(())
fn mqtt_receive_reply(&mut self) -> Result<VariablePacket> {
for _ in 0..3 {
let size = self.tcp_receive_reply_len()?;
println!("received reply len({}) ...", size);
if size == 0 {
println!("retrying ...");
continue
} else {
let mut reply = vec![0 as u8; size];
println!("receiving mqtt reply ...");
let _ = self.tcp_receive(&mut reply);
let reply = std::str::from_utf8(&reply).unwrap_or("");
println!("received mqtt reply ({})", reply);
return VariablePacket::decode(&mut reply.as_bytes())
.map_err(|err| ModemError::CommandError(format!("Undecodable MQTT message. ({:?})", err)));
}
}
Err(ModemError::ReadError("TCP server didn't respond!".into()))
}
fn mqtt_connect(&mut self, device_id: &str) -> std::result::Result<(), anyhow::Error> {
fn mqtt_connect(&mut self, device_id: &str) -> anyhow::Result<()> {
let mut buf = Vec::new();
let mut conn = ConnectPacket::new(device_id);
conn.set_clean_session(true);
conn.set_keep_alive(100);
let _ = conn.encode(&mut buf)?;
let _ = self.tcp_manual_send(&mut buf)?;
self.tcp_manual_send(&mut buf).ok();
thread::sleep(Duration::from_millis(2000));
drop(buf);
let reply = self.mqtt_receive_reply()?;
println!("mqtt decoded packet: ({:?})", reply);
let _ = self.mqtt_receive_reply()?;
Ok(())
match reply {
VariablePacket::ConnackPacket(_) => Ok(()),
_ => Err(anyhow::Error::msg("Invalid MQTT reply ... expected CONNACK!"))
}
}
fn mqtt_publish(&mut self, _device_id: &str, message: &str) -> std::result::Result<(), anyhow::Error> {
fn mqtt_publish(&mut self, _device_id: &str, message: &str) -> anyhow::Result<()> {
println!("entered mqtt publish ...");
let mut buf = Vec::new();
let packet = PublishPacket::new(
@ -414,16 +468,10 @@ impl<UART: serial::Uart> Modem<UART> {
QoSWithPacketIdentifier::Level0,
message.as_bytes(),
);
println!("created mqtt publish packet ...");
let _ = packet.encode(&mut buf)?;
println!("modem tcp send publish pakage ...");
let _ = self.tcp_manual_send(&mut buf)?;
thread::sleep(Duration::from_millis(2000));
drop(buf);
println!("receiving modem publish reply ...");
let _ = self.mqtt_receive_reply()?;
println!("created mqtt publish packet ... ({})",
std::str::from_utf8(buf.as_slice()).unwrap_or(""));
self.tcp_manual_send(&mut buf).ok();
Ok(())
}
}
@ -434,18 +482,6 @@ impl<UART: serial::Uart> std::io::Read for Modem<UART> {
}
}
impl<UART: serial::Uart> std::io::Write for Modem<UART> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.tcp_manual_send(buf)
.map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?;
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
pub fn main<T: Sync + Send>(
rx: esp_idf_hal::gpio::Gpio26<T>,
tx: esp_idf_hal::gpio::Gpio27<T>,

View File

@ -77,7 +77,11 @@ impl<UART: serial::Uart> SerialIO<UART> {
Err(nb::Error::Other(err)) => println!("Serial read error :: {:?}", err),
}
};
Ok(count)
if count > 0 {
Ok(count)
} else {
Err(nb::Error::Other(SerialError::ReadError("Rx buffer empty.".to_string())))
}
}
}
@ -87,7 +91,7 @@ const READ_WAIT_TIME: u64 = 50;
impl<UART: serial::Uart> io::Read for SerialIO<UART> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let count = nb::block!(self.read_bytes(buf))
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok(count)
}
}
@ -95,11 +99,11 @@ impl<UART: serial::Uart> io::Read for SerialIO<UART> {
impl<UART: serial::Uart> io::Write for SerialIO<UART> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
nb::block!(self.write_bytes(buf))
.map_err(|_| io::Error::from(io::ErrorKind::Other))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}
fn flush(&mut self) -> io::Result<()> {
self.tx.flush()
.map_err(|_| io::Error::from(io::ErrorKind::Other))
.map_err(|err| io::Error::new(io::ErrorKind::Other, SerialError::ReadError(format!("Flush error ({:?})", err))))
}
}