make serial read/write non-blocking
This commit is contained in:
parent
b6e7e64e72
commit
018af71262
2 changed files with 96 additions and 122 deletions
84
src/modem.rs
84
src/modem.rs
|
@ -7,7 +7,7 @@ use std::{
|
||||||
error::Error,
|
error::Error,
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
thread,
|
thread,
|
||||||
time::{Duration, Instant},
|
time::Duration,
|
||||||
sync::mpsc::Receiver,
|
sync::mpsc::Receiver,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -90,31 +90,52 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
/// None, then the first line is returned. If a timeout is reached. The timeout is provided on
|
/// None, then the first line is returned. If a timeout is reached. The timeout is provided on
|
||||||
/// input via the `timeout` argument. The first argument `contains` is checked against every
|
/// input via the `timeout` argument. The first argument `contains` is checked against every
|
||||||
/// line in the response.
|
/// line in the response.
|
||||||
fn read_response(&mut self, contains: Option<String>, timeout: Duration) -> Result<String> {
|
fn command_read_response(&mut self) -> Result<String> {
|
||||||
let mut response = String::new();
|
let mut response = String::new();
|
||||||
let start = Instant::now();
|
|
||||||
let match_text: String = contains.unwrap_or("\n".to_string());
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let timeout = timeout.saturating_sub(start.elapsed());
|
let mut buf = vec![0; 1024];
|
||||||
let line = self.serial.read_line(timeout).map_err(|_| ModemError::ReadError)?;
|
let num_bytes = self.serial
|
||||||
print!("Read {} bytes from serial: {}", line.len(), line);
|
.read(buf.as_mut_slice())
|
||||||
response.push_str(&line);
|
.map_err(|_| ModemError::ReadError)?;
|
||||||
if line.contains("ERROR") || line.contains(&match_text) {
|
|
||||||
println!("Found match {} for line {} ; exiting response reader now ...", match_text, line);
|
response.push_str(std::str::from_utf8(&buf[0..num_bytes]).map_err(|_| ModemError::ReadError)?);
|
||||||
println!("-----------------------------------------------------------");
|
|
||||||
break Ok(response.to_string())
|
if num_bytes < buf.len() {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
print!("Read {} bytes from serial: {}", response.len(), response);
|
||||||
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_command(&mut self, cmd: Command) -> Result<String> {
|
fn send_command(&mut self, cmd: Command) -> Result<String> {
|
||||||
println!("-----------------------------------------------------------");
|
println!("-----------------------------------------------------------");
|
||||||
println!("Sending {} ...", cmd.text);
|
println!("Sending to TX ({}) ...", cmd.text);
|
||||||
|
|
||||||
let _ = self.serial
|
let _ = self.serial
|
||||||
.send_bytes(cmd.text.as_bytes(), Some('\r' as u8))
|
.write_bytes(cmd.text.as_bytes())
|
||||||
.map_err(|_| ModemError::SendDataError(format!("Error in send_command({:?})", cmd)))?;
|
.map_err(|_| ModemError::SendDataError(format!("Error in send_command({})", cmd.text)))?;
|
||||||
self.read_response(cmd.contains, cmd.timeout)
|
|
||||||
|
let _ = self.serial
|
||||||
|
.write(&['\r' as u8])
|
||||||
|
.map_err(|_| ModemError::SendDataError(format!("Error in send_command({})", cmd.text)))?;
|
||||||
|
|
||||||
|
self.command_read_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_prompt(&mut self) -> Result<()> {
|
||||||
|
let mut prompt_buf = vec![0; 3];
|
||||||
|
let prompt_len = self.serial.read(&mut prompt_buf).map_err(|_| ModemError::ReadError)?;
|
||||||
|
let prompt = std::str::from_utf8(prompt_buf.as_slice()).unwrap_or("");
|
||||||
|
|
||||||
|
if prompt_len != 3 && prompt != "\r\n>" {
|
||||||
|
let msg = format!("Prompt error, expected \\r\\n>, got {:?}", prompt);
|
||||||
|
Err(ModemError::SendDataError(msg))
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result<String> {
|
fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result<String> {
|
||||||
|
@ -122,18 +143,15 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
.write("AT+CIPSEND\r".as_bytes())
|
.write("AT+CIPSEND\r".as_bytes())
|
||||||
.map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?;
|
.map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?;
|
||||||
|
|
||||||
let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000))
|
let _ = self.handle_prompt()?;
|
||||||
.map(char::from)
|
|
||||||
.take_while(|c| *c != '>').collect();
|
|
||||||
|
|
||||||
if send_prompt != "\r\n" {
|
|
||||||
let msg = format!("Prompt error, expected \\r\\n, got {:?}", send_prompt.as_bytes());
|
|
||||||
return Err(ModemError::SendDataError(msg));
|
|
||||||
}
|
|
||||||
self.serial
|
self.serial
|
||||||
.send_bytes(buf, Some(26_u8)) // 26_u8 = Ctrl+z - to end sending data
|
.write_bytes(buf)
|
||||||
.map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?;
|
.map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?;
|
||||||
self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000))
|
self.serial
|
||||||
|
.write(&[26_u8]) // 26_u8 = Ctrl+z - to end sending data
|
||||||
|
.map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?;
|
||||||
|
self.command_read_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn gprs_status(&mut self) -> Result<String> {
|
pub fn gprs_status(&mut self) -> Result<String> {
|
||||||
|
@ -280,7 +298,8 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
let _ = self.send_command(Command::http_set_content("application/json"));
|
let _ = self.send_command(Command::http_set_content("application/json"));
|
||||||
let _ = self.send_command(Command::http_set_ssl(true));
|
let _ = self.send_command(Command::http_set_ssl(true));
|
||||||
let _ = self.send_command(Command::http_post_len(content.len(), 100000));
|
let _ = self.send_command(Command::http_post_len(content.len(), 100000));
|
||||||
let _ = self.serial.send_bytes(content, Some(26));
|
let _ = self.serial.write_bytes(content);
|
||||||
|
let _ = self.serial.write(&[26_u8]);
|
||||||
let _ = self.send_command(Command::http_post());
|
let _ = self.send_command(Command::http_post());
|
||||||
self.send_command(Command::http_read_response())
|
self.send_command(Command::http_read_response())
|
||||||
}
|
}
|
||||||
|
@ -322,20 +341,15 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
fn file_write(&mut self, buf: &[u8], path: &str, append: bool, input_time_sec: usize) -> Result<()> {
|
fn file_write(&mut self, buf: &[u8], path: &str, append: bool, input_time_sec: usize) -> Result<()> {
|
||||||
let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec);
|
let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec);
|
||||||
let _ = self.serial
|
let _ = self.serial
|
||||||
.send_bytes(cmd.text.as_bytes(), Some('\r' as u8))
|
.write(cmd.text.as_bytes())
|
||||||
.map_err(|err| ModemError::SendDataError(format!("File write error ({:?})", err)))?;
|
.map_err(|err| ModemError::SendDataError(format!("File write error ({:?})", err)))?;
|
||||||
let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000))
|
|
||||||
.map(char::from)
|
|
||||||
.take_while(|c| *c != '>').collect();
|
|
||||||
|
|
||||||
if send_prompt == "" {
|
let _ = self.handle_prompt()?;
|
||||||
return Err(ModemError::SendDataError("Prompt empty, expected: \\r\\n".to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
self.serial
|
self.serial
|
||||||
.send_bytes(buf, None)
|
.write(buf)
|
||||||
.map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?;
|
.map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?;
|
||||||
let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000));
|
let _ = self.command_read_response();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
128
src/serial.rs
128
src/serial.rs
|
@ -1,14 +1,14 @@
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::Duration;
|
||||||
use embedded_hal::serial::{Read, Write};
|
use embedded_hal::serial::{Read, Write};
|
||||||
use esp_idf_hal::serial::{self, Rx, Tx};
|
use esp_idf_hal::serial::{self, Rx, Tx};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum SerialError {
|
pub enum SerialError {
|
||||||
ReadError,
|
ReadError(String),
|
||||||
WriteError,
|
WriteError(String),
|
||||||
TimeoutError,
|
TimeoutError,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,121 +20,81 @@ impl std::fmt::Display for SerialError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, SerialError>;
|
pub type Result<T> = nb::Result<T, SerialError>;
|
||||||
|
|
||||||
pub struct RxIter<UART: serial::Uart> {
|
|
||||||
inner: Rx<UART>,
|
|
||||||
timeout: Duration,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<UART: serial::Uart> RxIter<UART> {
|
|
||||||
pub fn reset(&mut self, timeout: Duration) -> &mut Self {
|
|
||||||
self.timeout = timeout;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
fn clear(&mut self) -> () {
|
|
||||||
println!("clearing serial rx");
|
|
||||||
self.reset(Duration::from_millis(500)).for_each(drop);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<UART: serial::Uart> Iterator for RxIter<UART> {
|
|
||||||
type Item = u8;
|
|
||||||
|
|
||||||
/// `nb` returns Ok(byte), or one of Err(WouldBlock) and Err(Other) which isn't of anyone's
|
|
||||||
/// interest, so the retry mechanism is triggered on _any_ error every 200ms until a byte is
|
|
||||||
/// received, or the timeout is reached.
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
let start = Instant::now();
|
|
||||||
loop {
|
|
||||||
match self.inner.read() {
|
|
||||||
Ok(b) => {
|
|
||||||
self.timeout = self.timeout.saturating_sub(start.elapsed());
|
|
||||||
break Some(b)
|
|
||||||
},
|
|
||||||
Err(_) => {
|
|
||||||
if start.elapsed() > self.timeout {
|
|
||||||
self.timeout = Duration::ZERO;
|
|
||||||
break None
|
|
||||||
}
|
|
||||||
thread::sleep(Duration::from_millis(200));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SerialIO<UART: serial::Uart> {
|
pub struct SerialIO<UART: serial::Uart> {
|
||||||
pub rx: RxIter<UART>,
|
pub rx: Rx<UART>,
|
||||||
pub tx: Tx<UART>,
|
pub tx: Tx<UART>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<UART: serial::Uart> SerialIO<UART> {
|
impl<UART: serial::Uart> SerialIO<UART> {
|
||||||
pub fn new(tx: Tx<UART>, rx: Rx<UART>) -> Self {
|
pub fn new(tx: Tx<UART>, rx: Rx<UART>) -> Self {
|
||||||
Self {
|
Self { rx, tx }
|
||||||
rx: RxIter { inner: rx, timeout: Duration::from_millis(0) },
|
|
||||||
tx,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_bytes(&mut self, payload: &[u8], eos: Option<u8>) -> Result<usize> {
|
pub fn write_bytes(&mut self, payload: &[u8]) -> Result<usize> {
|
||||||
let mut num_bytes = 0;
|
let mut num_bytes = 0;
|
||||||
for b in payload.iter() {
|
for b in payload.iter() {
|
||||||
nb::block!(self.tx.write(*b))
|
self.tx.write(*b)
|
||||||
.map_err(|err| SerialError::WriteError)?;
|
.map_err(|err| SerialError::WriteError(
|
||||||
|
format!("Error writing in serial port ({:?})", err)))?;
|
||||||
num_bytes += 1;
|
num_bytes += 1;
|
||||||
}
|
}
|
||||||
if num_bytes == payload.len() {
|
if num_bytes == payload.len() {
|
||||||
eos.map(|b| nb::block!(self.tx.write(b)));
|
Ok(num_bytes)
|
||||||
Ok(num_bytes + if eos.is_none() { 0 } else { 1 })
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Err(SerialError::WriteError)
|
Err(nb::Error::Other(
|
||||||
|
SerialError::WriteError(
|
||||||
|
"Written bytes shorter than payload length (write_bytes)".to_string()
|
||||||
|
)
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads a whole line (that ends with \\n) within the given `timeout` passed on input.
|
fn read_bytes(&mut self, buf: &mut [u8]) -> Result<usize> {
|
||||||
pub fn read_line(&mut self, timeout: Duration) -> Result<String> {
|
let mut started_reading = false;
|
||||||
let mut line: String = self.rx.reset(timeout)
|
let mut count = 0;
|
||||||
.map(|b| char::from(b))
|
let mut retries = 0;
|
||||||
.take_while(|c| *c != '\n')
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// TODO: \r\n is true for sim800l, but might not be valud for other devices. Re-implement
|
loop {
|
||||||
// this function so that it can be used on all devices.
|
match self.rx.read() {
|
||||||
// \r must come right before \n on read; take_while excludes the matched element.
|
Ok(b) => {
|
||||||
if line.ends_with('\r') {
|
started_reading = true;
|
||||||
line.push('\n');
|
if count < buf.len() {
|
||||||
Ok(line)
|
buf[count] = b;
|
||||||
}
|
count += 1;
|
||||||
else if self.rx.timeout.as_millis() == 0 {
|
|
||||||
Err(SerialError::TimeoutError)
|
|
||||||
}
|
}
|
||||||
|
else { break }
|
||||||
|
},
|
||||||
|
Err(nb::Error::WouldBlock) => {
|
||||||
|
if started_reading || retries > READ_MAX_RETRIES { break }
|
||||||
else {
|
else {
|
||||||
Err(SerialError::ReadError)
|
thread::sleep(Duration::from_millis(READ_WAIT_TIME));
|
||||||
|
retries += 1;
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
Err(nb::Error::Other(err)) => println!("Serial read error :: {:?}", err),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const READ_MAX_RETRIES: usize = 5;
|
||||||
|
const READ_WAIT_TIME: u64 = 50;
|
||||||
|
|
||||||
impl<UART: serial::Uart> io::Read for SerialIO<UART> {
|
impl<UART: serial::Uart> io::Read for SerialIO<UART> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
let buf_size = buf.len();
|
let count = nb::block!(self.read_bytes(buf))
|
||||||
let count = self.rx.reset(Duration::from_millis(1000))
|
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
|
||||||
.enumerate()
|
|
||||||
.map(|(i, b)| {
|
|
||||||
buf[i] = b;
|
|
||||||
i
|
|
||||||
})
|
|
||||||
.take_while(|i| i < &buf_size)
|
|
||||||
.count();
|
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<UART: serial::Uart> io::Write for SerialIO<UART> {
|
impl<UART: serial::Uart> io::Write for SerialIO<UART> {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
self.send_bytes(buf, None)
|
nb::block!(self.write_bytes(buf))
|
||||||
.map_err(|_| io::Error::from(io::ErrorKind::Other))
|
.map_err(|_| io::Error::from(io::ErrorKind::Other))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue