diff --git a/Cargo.toml b/Cargo.toml index 0150732..4ff3999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ esp-idf-hal = "0.37.4" esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] } mqtt-protocol = "0.11.2" nb = "1.0.0" +ublox = "0.4.2" [build-dependencies] embuild = "0.29" diff --git a/src/gps.rs b/src/gps.rs index 8bf4812..03115c6 100644 --- a/src/gps.rs +++ b/src/gps.rs @@ -1,8 +1,11 @@ use anyhow; -use std::sync::mpsc::SyncSender; -use std::thread; -use std::time::Duration; +use std::{ + sync::mpsc::SyncSender, + thread, + time::Duration, + io::Read, +}; use esp_idf_hal::prelude::*; use esp_idf_hal::serial; @@ -10,10 +13,11 @@ use esp_idf_hal::serial; use ublox::*; use crate::modem::Msg; +use crate::serial::SerialIO; pub fn main( - rx: esp_idf_hal::gpio::Gpio32, tx: esp_idf_hal::gpio::Gpio33, + rx: esp_idf_hal::gpio::Gpio32, uart: serial::UART2, sender: SyncSender, ) -> Result<(), anyhow::Error> { @@ -23,35 +27,46 @@ pub fn main( cts: None, rts: None, }; - let _serial: serial::Serial = serial::Serial::new( + let serial: serial::Serial = serial::Serial::new( uart, serial_pins, serial::config::Config::default().baudrate(Hertz(9600)), )?; + let (tx, rx) = serial.split(); + let mut serial_io = SerialIO::new(tx, rx); let mut parser = Parser::default(); - let mut it = parser.consume(&raw_packet); - - loop { - match it.next() { - Some(Ok(packet)) => { - println!("We've received a &PacketRef, we can handle it ... {:?}", packet); - } - Some(Err(err)) => { - println!("Received a malformed packet {:?}", err); - } - None => { - println!("The internal buffer is now empty"); - break; - } - } - } println!("entering GPS sender loop ..."); - for i in 0..20 { - println!("sending GPS message ({}) of 20 ...", i); - let _ = sender.send(Msg::Location("{\"lat\": 20.4322, \"long\": 44.5432}".to_string()))?; + loop { + let mut local_buf = [0; 100]; + println!("reading 100 bytes from serial ..."); + let nbytes = serial_io.read(&mut local_buf)?; + println!("READ: {}", local_buf.iter().map(|&b| char::from(b)).collect::()); + if nbytes == 0 { + println!("received 0 bytes, exiting ..."); + break; + } + let mut it = parser.consume(&local_buf); + + loop { + match it.next() { + Some(Ok(packet)) => { + let msg = format!("We've received a &PacketRef, we can handle it ... {:?}", packet); + println!("{}", msg); + sender.send(Msg::Location(msg))?; + } + Some(Err(err)) => { + println!("Received a malformed packet {:?}", err); + } + None => { + println!("The internal buffer is now empty"); + break; + } + } + } thread::sleep(Duration::from_millis(2000)); } + println!("exiting GPS sender loop :)"); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 08d6e94..c5fbf50 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,19 @@ mod accel; mod config; -mod gps; -mod modem; #[allow(dead_code)] mod command; +mod gps; +mod modem; +mod serial; use anyhow; -use std::thread::{self, JoinHandle}; -use std::cell::RefCell; -use std::time::Duration; +use std::{ + thread::{self, JoinHandle}, + time::Duration, +}; use esp_idf_hal::peripherals::Peripherals; -thread_local! { - static TLS: RefCell = RefCell::new(13); -} - - fn main() -> anyhow::Result<()> { esp_idf_sys::link_patches(); @@ -36,24 +33,19 @@ fn main() -> anyhow::Result<()> { let mut threads: Vec>> = vec![]; - println!("Rust main thread: {:?}", thread::current()); + // Rx/Tx pins for the GPS modem + let gps_rx = dp.pins.gpio32; + let gps_tx = dp.pins.gpio33; + // UART interface for the GPS modem + let gps_uart = dp.uart2; - TLS.with(|tls| { - println!("Main TLS before change: {}", *tls.borrow()); - }); - - TLS.with(|tls| *tls.borrow_mut() = 42); - - TLS.with(|tls| { - println!("Main TLS after change: {}", *tls.borrow()); - }); - let (gps_sender, receiver) = std::sync::mpsc::sync_channel::(1); let accel_sender = gps_sender.clone(); - threads.push(thread::spawn(move || gps::main(gps_sender))); + let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?; + // threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender))); thread::sleep(Duration::from_millis(1000)); threads.push(thread::spawn(move || accel::main(accel_sender))); thread::sleep(Duration::from_millis(1000)); diff --git a/src/modem.rs b/src/modem.rs index b9a59d8..d209211 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -1,15 +1,18 @@ use crate::command::Command; +use crate::serial::SerialIO; use anyhow; -use std::thread; -use std::error::Error; -use std::time::{Duration, Instant}; -use std::sync::mpsc::Receiver; +use std::{ + error::Error, + io::{Read, Write}, + thread, + time::{Duration, Instant}, + sync::mpsc::Receiver, +}; use esp_idf_hal::prelude::*; use esp_idf_hal::serial::{self, Rx, Tx}; -use embedded_hal::serial::{Read, Write}; use embedded_hal::digital::v2::OutputPin; use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier}; @@ -20,8 +23,7 @@ const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300; pub type Result = std::result::Result; pub struct Modem { - rx: RxIter, - tx: Tx, + serial: SerialIO, } #[derive(Debug)] @@ -41,74 +43,10 @@ impl std::fmt::Display for ModemError { } } -pub struct RxIter { - inner: Rx, - timeout: Duration, -} - -impl RxIter { - fn reset(&mut self, timeout: Duration) -> &mut Self { - self.timeout = timeout; - self - } - - fn clear(&mut self) -> () { - println!("clearing serial rx"); - self.reset(Duration::from_millis(500)).for_each(drop); - } - - /// Reads a whole line (that ends with \\n) within the given `timeout` passed on input. - fn read_line(&mut self, timeout: Duration) -> Result { - let mut line: String = self.reset(timeout) - .map(|b| char::from(b)) - .take_while(|c| *c != '\n') - .collect(); - - // \r must come right before \n on read; take_while excludes the matched element. - if line.ends_with('\r') { - line.push('\n'); - Ok(line) - } - else if self.timeout.as_millis() == 0 { - Err(ModemError::TimeoutError) - } - else { - Err(ModemError::ReadError) - } - } -} - -impl Iterator for RxIter { - type Item = u8; - - /// `nb` returns Ok(byte), or one of Err(WouldBlock) and Err(Other) which isn't of anyone's - /// interest, so the retry mechanism is triggered on _any_ error every 200ms until a byte is - /// received, or the timeout is reached. - fn next(&mut self) -> Option { - let start = Instant::now(); - loop { - match self.inner.read() { - Ok(b) => { - self.timeout = self.timeout.saturating_sub(start.elapsed()); - break Some(b) - }, - Err(_) => { - if start.elapsed() > self.timeout { - self.timeout = Duration::ZERO; - break None - } - thread::sleep(Duration::from_millis(200)); - } - } - } - } -} - impl Modem { pub fn new(tx: Tx, rx: Rx) -> Self { Self { - rx: RxIter { inner: rx, timeout: Duration::from_millis(0) }, - tx, + serial: SerialIO::new(tx, rx), } } @@ -158,7 +96,7 @@ impl Modem { loop { let timeout = timeout.saturating_sub(start.elapsed()); - let line = self.rx.read_line(timeout)?; + let line = self.serial.read_line(timeout).map_err(|_| ModemError::ReadError)?; print!("Read {} bytes from serial: {}", line.len(), line); response.push_str(&line); if line.contains("ERROR") || line.contains(&match_text) { @@ -169,36 +107,31 @@ impl Modem { } } - #[inline(always)] - fn send_bytes(&mut self, payload: &[u8], eos: Option) -> Result<()> { - //self.rx.clear(); - for b in payload.iter() { - nb::block!(self.tx.write(*b)) - .map_err(|_| ModemError::CommandError(format!("error writing {} to serial", b)))?; - } - eos.map(|b| nb::block!(self.tx.write(b))); - Ok(()) - } - fn send_command(&mut self, cmd: Command) -> Result { println!("-----------------------------------------------------------"); println!("Sending {} ...", cmd.text); - let _ = self.send_bytes(cmd.text.as_bytes(), Some('\r' as u8))?; + let _ = self.serial + .send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) + .map_err(|_| ModemError::SendDataError)?; self.read_response(cmd.contains, cmd.timeout) } fn tcp_send_data(&mut self, buf: &[u8]) -> Result { - let _ = self.send_bytes("AT+CIPSEND".as_bytes(), Some('\r' as u8))?; - let send_request: String = self.rx.reset(Duration::from_millis(3000)) + let _ = self.serial + .write("AT+CIPSEND\r".as_bytes()) + .map_err(|_| ModemError::SendDataError)?; + + let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) .map(char::from) .take_while(|c| *c != '>').collect(); - if send_request != "\r\n" { - println!("{:?}", send_request.as_bytes()); + if send_prompt != "\r\n" { + println!("{:?}", send_prompt.as_bytes()); return Err(ModemError::SendDataError); } - - self.send_bytes(buf, Some(26))?; // 26_u8 = Ctrl+z - to end sending data + self.serial + .send_bytes(buf, Some(26)) // 26_u8 = Ctrl+z - to end sending data + .map_err(|_| ModemError::SendDataError)?; self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000)) } @@ -321,7 +254,7 @@ impl Modem { let _ = self.send_command(Command::http_set_content("application/json")); let _ = self.send_command(Command::http_set_ssl(true)); let _ = self.send_command(Command::http_post_len(content.len(), 100000)); - let _ = self.send_bytes(content, Some(26)); + let _ = self.serial.send_bytes(content, Some(26)); let _ = self.send_command(Command::http_post()); self.send_command(Command::http_read_response()) } @@ -362,16 +295,20 @@ impl Modem { fn file_write(&mut self, buf: &[u8], path: &str, append: bool, input_time_sec: usize) -> Result<()> { let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec); - let _ = self.send_bytes(cmd.text.as_bytes(), Some('\r' as u8))?; - let send_request: String = self.rx.reset(Duration::from_millis(3000)) + let _ = self.serial + .send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) + .map_err(|_| ModemError::SendDataError)?; + let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) .map(char::from) .take_while(|c| *c != '>').collect(); - if send_request == "" { + if send_prompt == "" { return Err(ModemError::SendDataError); } - self.send_bytes(buf, None)?; + self.serial + .send_bytes(buf, None) + .map_err(|_| ModemError::SendDataError)?; let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000)); Ok(()) @@ -451,9 +388,27 @@ impl Modem { } } +impl std::io::Read for Modem { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.tcp_receive(buf).map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted)) + } +} + +impl std::io::Write for Modem { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.tcp_send(buf) + .map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + pub enum Msg { Location(String), -// Movement(String), + Movement(String), } pub fn main( @@ -472,8 +427,6 @@ pub fn main( rts: None, }; - // Create the serial and panic with a message ... if we can't create the serial port, then we - // can't communicate with the sim800l module, hence we don't run anymore. let serial: serial::Serial = serial::Serial::new( uart, serial_pins, @@ -522,13 +475,6 @@ pub fn main( }; if is_connected { - //println!("connecting to server!"); - //if !mdm.tcp_is_ssl_enabled()? { - // let _ = mdm.tcp_ssl_enable()?; - //} - //if mdm.tcp_is_ssl_enabled()? { - // let _ = mdm.tcp_ssl_disable()?; - //} let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; let _ = mdm.tcp_set_quick_mode(false); @@ -543,7 +489,6 @@ pub fn main( let _ = mdm.mqtt_publish(device_id, &msg)?; } - let _ = mdm.tcp_close_connection()?; } diff --git a/src/serial.rs b/src/serial.rs new file mode 100644 index 0000000..1fbc629 --- /dev/null +++ b/src/serial.rs @@ -0,0 +1,137 @@ +use std::error::Error; +use std::io; +use std::thread; +use std::time::{Duration, Instant}; +use embedded_hal::serial::{Read, Write}; +use esp_idf_hal::serial::{self, Rx, Tx}; + +#[derive(Debug)] +pub enum SerialError { + ReadError, + WriteError(String), + TimeoutError, +} + +impl Error for SerialError {} + +impl std::fmt::Display for SerialError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +pub type Result = std::result::Result; + +pub struct RxIter { + inner: Rx, + timeout: Duration, +} + +impl RxIter { + pub fn reset(&mut self, timeout: Duration) -> &mut Self { + self.timeout = timeout; + self + } + + fn clear(&mut self) -> () { + println!("clearing serial rx"); + self.reset(Duration::from_millis(500)).for_each(drop); + } +} + +impl Iterator for RxIter { + type Item = u8; + + /// `nb` returns Ok(byte), or one of Err(WouldBlock) and Err(Other) which isn't of anyone's + /// interest, so the retry mechanism is triggered on _any_ error every 200ms until a byte is + /// received, or the timeout is reached. + fn next(&mut self) -> Option { + let start = Instant::now(); + loop { + match self.inner.read() { + Ok(b) => { + self.timeout = self.timeout.saturating_sub(start.elapsed()); + break Some(b) + }, + Err(_) => { + if start.elapsed() > self.timeout { + self.timeout = Duration::ZERO; + break None + } + thread::sleep(Duration::from_millis(200)); + } + } + } + } +} + +pub struct SerialIO { + pub rx: RxIter, + pub tx: Tx, +} + +impl SerialIO { + pub fn new(tx: Tx, rx: Rx) -> Self { + Self { + rx: RxIter { inner: rx, timeout: Duration::from_millis(0) }, + tx, + } + } + + pub fn send_bytes(&mut self, payload: &[u8], eos: Option) -> Result { + //self.rx.clear(); + for b in payload.iter() { + nb::block!(self.tx.write(*b)) + .map_err(|err| SerialError::WriteError(format!("Error writing '{}' to serial, Original error {}", b, err)))?; + } + eos.map(|b| nb::block!(self.tx.write(b))); + Ok(payload.len() + if eos.is_none() { 0 } else { 1 }) + } + + /// Reads a whole line (that ends with \\n) within the given `timeout` passed on input. + pub fn read_line(&mut self, timeout: Duration) -> Result { + let mut line: String = self.rx.reset(timeout) + .map(|b| char::from(b)) + .take_while(|c| *c != '\n') + .collect(); + + // \r must come right before \n on read; take_while excludes the matched element. + if line.ends_with('\r') { + line.push('\n'); + Ok(line) + } + else if self.rx.timeout.as_millis() == 0 { + Err(SerialError::TimeoutError) + } + else { + Err(SerialError::ReadError) + } + } +} + +impl io::Read for SerialIO { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let buf_size = buf.len(); + let count = self.rx.reset(Duration::from_millis(1000)) + .enumerate() + .map(|(i, b)| { + buf[i] = b; + i + }) + .take_while(|i| i < &buf_size) + .count(); + Ok(count) + } +} + +impl io::Write for SerialIO { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.send_bytes(buf, None) + .map_err(|_| io::Error::from(io::ErrorKind::Other)) + } + + fn flush(&mut self) -> io::Result<()> { + self.tx.flush() + .map_err(|err| io::Error::from(io::ErrorKind::Other)) + } +}