From b93b35700773fbc6495fb2569fda1f89120b1010 Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Thu, 30 Jun 2022 23:54:24 +0200 Subject: [PATCH] tcp client stack and some improvements --- Cargo.toml | 3 +- src/command.rs | 12 +++++- src/main.rs | 67 +++++++++------------------- src/modem.rs | 115 +++++++++++++++++++++++++++++++++++++++---------- 4 files changed, 124 insertions(+), 73 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0150732..cf4a828 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,9 @@ anyhow = "1.0.57" embedded-hal = "0.2.7" esp-idf-hal = "0.37.4" esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] } -mqtt-protocol = "0.11.2" +minimq = "0.5.3" nb = "1.0.0" +std-embedded-time = "0.1.0" [build-dependencies] embuild = "0.29" diff --git a/src/command.rs b/src/command.rs index f44a914..b49d945 100644 --- a/src/command.rs +++ b/src/command.rs @@ -327,9 +327,9 @@ impl Command { } } - pub fn tcp_set_manual_receive() -> Command { + pub fn tcp_set_manual_receive(is_manual: bool) -> Command { Command { - text: "AT+CIPRXGET=1".to_string(), + text: format!("AT+CIPRXGET={}", is_manual as u8), timeout: Duration::from_millis(3000), contains: Some("OK".to_string()), } @@ -342,4 +342,12 @@ impl Command { contains: Some("CLOSE OK".to_string()), } } + + pub fn tcp_connection_state() -> Command { + Command { + text: "AT+CIPSTATUS".to_string(), + timeout: Duration::from_millis(2000), + contains: Some("STATE".to_string()), + } + } } diff --git a/src/main.rs b/src/main.rs index 292a6f1..3bd003f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,17 +4,11 @@ mod modem; mod command; use anyhow; -use std::time::Duration; -use std::thread; -use esp_idf_hal::delay; use esp_idf_hal::prelude::*; use esp_idf_hal::peripherals::Peripherals; use esp_idf_hal::serial; -use mqtt::control::ConnectReturnCode; -use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier}; -use mqtt::{Decodable, Encodable, TopicName}; - +use minimq::{Minimq, QoS, Retain}; fn main() -> anyhow::Result<()> { esp_idf_sys::link_patches(); @@ -68,50 +62,29 @@ fn main() -> anyhow::Result<()> { let _ = mdm.tcp_ssl_disable()?; } let _ = mdm.tcp_set_quick_mode(false); - let _ = mdm.tcp_set_manual_receive()?; - let _ = mdm.tcp_connect("51.158.66.64", 9988)?; + let _ = mdm.tcp_set_manual_receive(true)?; - let client_id = "e-bike-tracker"; - let mut conn = ConnectPacket::new(client_id); - conn.set_clean_session(true); - let mut buf = Vec::new(); - let _ = conn.encode(&mut buf)?; + let mut mqtt: Minimq<_, _, 256, 16> = Minimq::new( + "51.158.66.64".parse().unwrap(), + "e-bike-tracker", + mdm, + std_embedded_time::StandardClock::default()).unwrap(); - let _ = mdm.tcp_send(&mut buf)?; - drop(buf); + mqtt.client + .set_will( + "exit", + "Test complete".as_bytes(), + QoS::AtMostOnce, + Retain::NotRetained, + &[], + ) + .unwrap(); - println!("+++++++++++++++++++++++++++++++++"); - let size = mdm.tcp_receive_reply_len()?; - - let mut reply = vec![0 as u8; size]; - let received_size = mdm.tcp_receive(&mut reply)?; - - println!("expected: {} / received: {}", size, received_size); - println!("+++++++++++++++++++++++++++++++++"); - drop(reply); - - let topic = TopicName::new("location")?; + println!("created mqtt client ... "); let message = "{\"lat\": 20, \"long\": 44}"; - let qos = QoSWithPacketIdentifier::Level0; - - let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes()); - let mut buf = Vec::new(); - publish_packet.encode(&mut buf)?; - let _ = mdm.tcp_send(&mut buf)?; - drop(buf); - - thread::sleep(Duration::from_millis(300)); - let size = mdm.tcp_receive_reply_len()?; - - let mut reply = vec![0 as u8; size]; - let received_size = mdm.tcp_receive(&mut reply)?; - println!("expected: {} / received: {}", size, received_size); - println!("+++++++++++++++++++++++++++++++++"); - println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::()); - println!("+++++++++++++++++++++++++++++++++"); - drop(reply); - - let _ = mdm.tcp_close_connection()?; + println!("message = {}", message); + mqtt.client.publish("devices/location", message.as_bytes(), QoS::AtMostOnce, Retain::NotRetained, &[]).unwrap(); + println!("published message ... "); } Ok(()) diff --git a/src/modem.rs b/src/modem.rs index c7f86b7..19cd4b1 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant}; use embedded_hal::serial::{Read, Write}; use embedded_hal::digital::v2::OutputPin; use esp_idf_hal::serial::{self, Rx, Tx}; +use minimq::embedded_nal::{TcpClientStack, SocketAddr}; const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300; @@ -182,7 +183,7 @@ impl Modem { self.read_response(cmd.contains, cmd.timeout) } - fn send_data(&mut self, buf: &[u8]) -> Result { + fn send_data(&mut self, buf: &[u8]) -> Result { self.rx.clear(); let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?; let send_request: String = self.rx.reset(Duration::from_millis(3000)) @@ -194,15 +195,8 @@ impl Modem { } self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data - let _ = self.read_response(Some("DATA ACCEPT".to_string()), Duration::from_millis(3000)); - - self.rx.clear(); - let res = self.send_command(Command { - text: "AT+CIPACK".to_string(), - contains: Some("OK".to_string()), - timeout: Duration::from_millis(3000), - })?; - Ok(res) + let _ = self.read_response(Some("SEND OK".to_string()), Duration::from_millis(1000))?; + Ok(buf.len()) } pub fn get_ip_addr(&mut self) -> Result { @@ -255,14 +249,13 @@ impl Modem { Ok(()) } - pub fn tcp_set_manual_receive(&mut self) -> Result<()> { - self.send_command(Command::tcp_set_manual_receive())?; + pub fn tcp_set_manual_receive(&mut self, is_manual: bool) -> Result<()> { + self.send_command(Command::tcp_set_manual_receive(is_manual))?; Ok(()) } - pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> { - self.send_data(buf)?; - Ok(()) + pub fn tcp_send(&mut self, buf: &[u8]) -> Result { + self.send_data(buf) } fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result { @@ -287,30 +280,106 @@ impl Modem { pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result { let mut size = 0; loop { - let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) + let reply_len: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) .map(|reply: String| { reply.lines() - .map(|line| { + .fold(0, |acc, line| { if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") { - line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count() + acc += line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count() } else { 0 } }) - .sum() })?; - if reply == 0 { + if reply_len == 0 { break Ok(size) } else { - size += reply; + size += reply_len; continue } } } - pub fn tcp_close_connection(&mut self) -> Result { - self.send_command(Command::tcp_close()) + pub fn tcp_close_connection(&mut self) -> Result<()> { + self.send_command(Command::tcp_close())?; + Ok(()) + } + + pub fn tcp_is_connected(&mut self) -> Result { + let response = self.send_command(Command::tcp_connection_state())?; + let state = response.lines().last().and_then(|line| line.split(",").last()); + Ok(state.unwrap_or("CLOSED") == "CONNECTED") + } +} + +pub struct ModemTcpStack; + +pub struct ModemSocket { + state: SocketState, +} + +pub enum SocketState { + Building, + Connected(Modem), +} + +impl SocketState { + fn new() -> Self { + Self::Building + } + + fn get_running(&mut self) -> std::io::Result<&mut T> { + match self { + SocketState::Connected(ref mut s) => Ok(s), + _ => OutOfOrder.into(), + } + } +} + +impl ModemSocket { + fn new() -> Self { + Self { + state: SocketState::new(), + } + } + + fn get_running(s: Modem) -> Self { + Self { + state: SocketState::Connected(s) + } + } +} + +impl TcpClientStack for ModemTcpStack { + type Error = ModemError; + type TcpSocket = ModemSocket; + + fn socket(&mut self) -> Result { + Ok(self.modem) + } + + fn connect(&mut self, socket: &mut Self::TcpSocket, remote: SocketAddr) -> std::result::Result<(), nb::Error> { + self.tcp_connect(&format!("{}", remote.ip()), remote.port()) + .map_err(|err| nb::Error::Other(err)) + } + + fn is_connected(&mut self, _socket: &Self::TcpSocket) -> Result { + self.tcp_is_connected() + } + + fn send(&mut self, _socket: &mut Self::TcpSocket, buffer: &[u8]) -> std::result::Result> { + self.tcp_send(buffer) + .map_err(|err| nb::Error::Other(err)) + } + + fn receive( &mut self, _socket: &mut Self::TcpSocket, buffer: &mut [u8]) -> std::result::Result> { + self.tcp_receive(buffer) + .map_err(|err| nb::Error::Other(err)) + } + + fn close(&mut self, _socket: Self::TcpSocket) -> Result<()> { + self.tcp_close_connection() } }