diff --git a/src/modem.rs b/src/modem.rs index 09141bf..11cca89 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -7,7 +7,7 @@ use std::{ error::Error, io::{Read, Write}, thread, - time::{Duration, Instant}, + time::Duration, sync::mpsc::Receiver, }; @@ -90,31 +90,52 @@ impl Modem { /// None, then the first line is returned. If a timeout is reached. The timeout is provided on /// input via the `timeout` argument. The first argument `contains` is checked against every /// line in the response. - fn read_response(&mut self, contains: Option, timeout: Duration) -> Result { + fn command_read_response(&mut self) -> Result { let mut response = String::new(); - let start = Instant::now(); - let match_text: String = contains.unwrap_or("\n".to_string()); loop { - let timeout = timeout.saturating_sub(start.elapsed()); - let line = self.serial.read_line(timeout).map_err(|_| ModemError::ReadError)?; - print!("Read {} bytes from serial: {}", line.len(), line); - response.push_str(&line); - if line.contains("ERROR") || line.contains(&match_text) { - println!("Found match {} for line {} ; exiting response reader now ...", match_text, line); - println!("-----------------------------------------------------------"); - break Ok(response.to_string()) + let mut buf = vec![0; 1024]; + let num_bytes = self.serial + .read(buf.as_mut_slice()) + .map_err(|_| ModemError::ReadError)?; + + response.push_str(std::str::from_utf8(&buf[0..num_bytes]).map_err(|_| ModemError::ReadError)?); + + if num_bytes < buf.len() { + break } } + + print!("Read {} bytes from serial: {}", response.len(), response); + Ok(response) } fn send_command(&mut self, cmd: Command) -> Result { println!("-----------------------------------------------------------"); - println!("Sending {} ...", cmd.text); + println!("Sending to TX ({}) ...", cmd.text); + let _ = self.serial - .send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) - .map_err(|_| ModemError::SendDataError(format!("Error in send_command({:?})", cmd)))?; - self.read_response(cmd.contains, cmd.timeout) + .write_bytes(cmd.text.as_bytes()) + .map_err(|_| ModemError::SendDataError(format!("Error in send_command({})", cmd.text)))?; + + let _ = self.serial + .write(&['\r' as u8]) + .map_err(|_| ModemError::SendDataError(format!("Error in send_command({})", cmd.text)))?; + + self.command_read_response() + } + + fn handle_prompt(&mut self) -> Result<()> { + let mut prompt_buf = vec![0; 3]; + let prompt_len = self.serial.read(&mut prompt_buf).map_err(|_| ModemError::ReadError)?; + let prompt = std::str::from_utf8(prompt_buf.as_slice()).unwrap_or(""); + + if prompt_len != 3 && prompt != "\r\n>" { + let msg = format!("Prompt error, expected \\r\\n>, got {:?}", prompt); + Err(ModemError::SendDataError(msg)) + } else { + Ok(()) + } } fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result { @@ -122,18 +143,15 @@ impl Modem { .write("AT+CIPSEND\r".as_bytes()) .map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?; - let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) - .map(char::from) - .take_while(|c| *c != '>').collect(); + let _ = self.handle_prompt()?; - if send_prompt != "\r\n" { - let msg = format!("Prompt error, expected \\r\\n, got {:?}", send_prompt.as_bytes()); - return Err(ModemError::SendDataError(msg)); - } self.serial - .send_bytes(buf, Some(26_u8)) // 26_u8 = Ctrl+z - to end sending data + .write_bytes(buf) .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?; - self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000)) + self.serial + .write(&[26_u8]) // 26_u8 = Ctrl+z - to end sending data + .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?; + self.command_read_response() } pub fn gprs_status(&mut self) -> Result { @@ -280,7 +298,8 @@ impl Modem { let _ = self.send_command(Command::http_set_content("application/json")); let _ = self.send_command(Command::http_set_ssl(true)); let _ = self.send_command(Command::http_post_len(content.len(), 100000)); - let _ = self.serial.send_bytes(content, Some(26)); + let _ = self.serial.write_bytes(content); + let _ = self.serial.write(&[26_u8]); let _ = self.send_command(Command::http_post()); self.send_command(Command::http_read_response()) } @@ -322,20 +341,15 @@ impl Modem { fn file_write(&mut self, buf: &[u8], path: &str, append: bool, input_time_sec: usize) -> Result<()> { let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec); let _ = self.serial - .send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) + .write(cmd.text.as_bytes()) .map_err(|err| ModemError::SendDataError(format!("File write error ({:?})", err)))?; - let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) - .map(char::from) - .take_while(|c| *c != '>').collect(); - if send_prompt == "" { - return Err(ModemError::SendDataError("Prompt empty, expected: \\r\\n".to_string())); - } + let _ = self.handle_prompt()?; self.serial - .send_bytes(buf, None) + .write(buf) .map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?; - let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000)); + let _ = self.command_read_response(); Ok(()) } diff --git a/src/serial.rs b/src/serial.rs index 79c6b12..d18efee 100644 --- a/src/serial.rs +++ b/src/serial.rs @@ -1,14 +1,14 @@ use std::error::Error; use std::io; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Duration; use embedded_hal::serial::{Read, Write}; use esp_idf_hal::serial::{self, Rx, Tx}; #[derive(Debug)] pub enum SerialError { - ReadError, - WriteError, + ReadError(String), + WriteError(String), TimeoutError, } @@ -20,121 +20,81 @@ impl std::fmt::Display for SerialError { } } -pub type Result = std::result::Result; - -pub struct RxIter { - inner: Rx, - timeout: Duration, -} - -impl RxIter { - pub fn reset(&mut self, timeout: Duration) -> &mut Self { - self.timeout = timeout; - self - } - - fn clear(&mut self) -> () { - println!("clearing serial rx"); - self.reset(Duration::from_millis(500)).for_each(drop); - } -} - -impl Iterator for RxIter { - type Item = u8; - - /// `nb` returns Ok(byte), or one of Err(WouldBlock) and Err(Other) which isn't of anyone's - /// interest, so the retry mechanism is triggered on _any_ error every 200ms until a byte is - /// received, or the timeout is reached. - fn next(&mut self) -> Option { - let start = Instant::now(); - loop { - match self.inner.read() { - Ok(b) => { - self.timeout = self.timeout.saturating_sub(start.elapsed()); - break Some(b) - }, - Err(_) => { - if start.elapsed() > self.timeout { - self.timeout = Duration::ZERO; - break None - } - thread::sleep(Duration::from_millis(200)); - } - } - } - } -} +pub type Result = nb::Result; pub struct SerialIO { - pub rx: RxIter, + pub rx: Rx, pub tx: Tx, } impl SerialIO { pub fn new(tx: Tx, rx: Rx) -> Self { - Self { - rx: RxIter { inner: rx, timeout: Duration::from_millis(0) }, - tx, - } + Self { rx, tx } } - pub fn send_bytes(&mut self, payload: &[u8], eos: Option) -> Result { + pub fn write_bytes(&mut self, payload: &[u8]) -> Result { let mut num_bytes = 0; for b in payload.iter() { - nb::block!(self.tx.write(*b)) - .map_err(|err| SerialError::WriteError)?; + self.tx.write(*b) + .map_err(|err| SerialError::WriteError( + format!("Error writing in serial port ({:?})", err)))?; num_bytes += 1; } if num_bytes == payload.len() { - eos.map(|b| nb::block!(self.tx.write(b))); - Ok(num_bytes + if eos.is_none() { 0 } else { 1 }) + Ok(num_bytes) } else { - Err(SerialError::WriteError) + Err(nb::Error::Other( + SerialError::WriteError( + "Written bytes shorter than payload length (write_bytes)".to_string() + ) + )) } } - /// Reads a whole line (that ends with \\n) within the given `timeout` passed on input. - pub fn read_line(&mut self, timeout: Duration) -> Result { - let mut line: String = self.rx.reset(timeout) - .map(|b| char::from(b)) - .take_while(|c| *c != '\n') - .collect(); + fn read_bytes(&mut self, buf: &mut [u8]) -> Result { + let mut started_reading = false; + let mut count = 0; + let mut retries = 0; - // TODO: \r\n is true for sim800l, but might not be valud for other devices. Re-implement - // this function so that it can be used on all devices. - // \r must come right before \n on read; take_while excludes the matched element. - if line.ends_with('\r') { - line.push('\n'); - Ok(line) - } - else if self.rx.timeout.as_millis() == 0 { - Err(SerialError::TimeoutError) - } - else { - Err(SerialError::ReadError) - } + loop { + match self.rx.read() { + Ok(b) => { + started_reading = true; + if count < buf.len() { + buf[count] = b; + count += 1; + } + else { break } + }, + Err(nb::Error::WouldBlock) => { + if started_reading || retries > READ_MAX_RETRIES { break } + else { + thread::sleep(Duration::from_millis(READ_WAIT_TIME)); + retries += 1; + } + }, + Err(nb::Error::Other(err)) => println!("Serial read error :: {:?}", err), + } + }; + Ok(count) } } +const READ_MAX_RETRIES: usize = 5; +const READ_WAIT_TIME: u64 = 50; + impl io::Read for SerialIO { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let buf_size = buf.len(); - let count = self.rx.reset(Duration::from_millis(1000)) - .enumerate() - .map(|(i, b)| { - buf[i] = b; - i - }) - .take_while(|i| i < &buf_size) - .count(); + let count = nb::block!(self.read_bytes(buf)) + .map_err(|_| io::Error::from(io::ErrorKind::Other))?; Ok(count) } } impl io::Write for SerialIO { fn write(&mut self, buf: &[u8]) -> io::Result { - self.send_bytes(buf, None) + nb::block!(self.write_bytes(buf)) .map_err(|_| io::Error::from(io::ErrorKind::Other)) }