From edf427dcb16f81e2828731ccab1c5a803a5f106d Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sun, 26 Jun 2022 00:56:59 +0200 Subject: [PATCH] mqtt continues --- sdkconfig.defaults | 2 +- src/main.rs | 90 ++++++++++++++++++++++++++++------------------ src/modem.rs | 46 ++++++++++++------------ 3 files changed, 79 insertions(+), 59 deletions(-) diff --git a/sdkconfig.defaults b/sdkconfig.defaults index e896453..1016968 100644 --- a/sdkconfig.defaults +++ b/sdkconfig.defaults @@ -1,5 +1,5 @@ # Rust often needs a bit of an extra main task stack size compared to C (the default is 3K) -CONFIG_ESP_MAIN_TASK_STACK_SIZE=7000 +CONFIG_ESP_MAIN_TASK_STACK_SIZE=17000 # Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default). # This allows to use 1 ms granuality for thread sleeps (10 ms by default). diff --git a/src/main.rs b/src/main.rs index 0ff7c55..292a6f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod command; use anyhow; use std::time::Duration; use std::thread; +use esp_idf_hal::delay; use esp_idf_hal::prelude::*; use esp_idf_hal::peripherals::Peripherals; use esp_idf_hal::serial; @@ -14,6 +15,7 @@ use mqtt::control::ConnectReturnCode; use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier}; use mqtt::{Decodable, Encodable, TopicName}; + fn main() -> anyhow::Result<()> { esp_idf_sys::link_patches(); @@ -55,43 +57,61 @@ fn main() -> anyhow::Result<()> { )?; } - loop { - if mdm.is_gprs_attached()? { - let _ = mdm.get_ip_addr()?; + if mdm.is_gprs_attached()? { + let _ = mdm.get_ip_addr()?; - //println!("connecting to server!"); - //if !mdm.tcp_is_ssl_enabled()? { - // let _ = mdm.tcp_ssl_enable()?; - //} - if mdm.tcp_is_ssl_enabled()? { - let _ = mdm.tcp_ssl_disable()?; - } - let _ = mdm.tcp_set_quick_mode(false); - let _ = mdm.tcp_set_manual_receive()?; - let _ = mdm.tcp_connect("51.158.66.64", 9988)?; - - let client_id = "e-bike-tracker"; - let mut conn = ConnectPacket::new(client_id); - conn.set_clean_session(true); - let mut buf = Vec::new(); - let _ = conn.encode(&mut buf)?; - - let _ = mdm.tcp_send(&mut buf)?; - thread::sleep(Duration::from_millis(1000)); - println!("+++++++++++++++++++++++++++++++++"); - let size = mdm.tcp_receive_reply_len()?; - - let mut reply = vec![0 as u8; size]; - let received_size = mdm.tcp_receive(&mut reply)?; - - println!("expected: {} / received: {}", size, received_size); - println!("+++++++++++++++++++++++++++++++++"); - println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::()); - println!("+++++++++++++++++++++++++++++++++"); - let _ = mdm.tcp_close_connection()?; - break; + //println!("connecting to server!"); + //if !mdm.tcp_is_ssl_enabled()? { + // let _ = mdm.tcp_ssl_enable()?; + //} + if mdm.tcp_is_ssl_enabled()? { + let _ = mdm.tcp_ssl_disable()?; } - println!("!!!!!!!!!!!!GPRS NOT ATTACHED!!!!!!!!!!!"); + let _ = mdm.tcp_set_quick_mode(false); + let _ = mdm.tcp_set_manual_receive()?; + let _ = mdm.tcp_connect("51.158.66.64", 9988)?; + + let client_id = "e-bike-tracker"; + let mut conn = ConnectPacket::new(client_id); + conn.set_clean_session(true); + let mut buf = Vec::new(); + let _ = conn.encode(&mut buf)?; + + let _ = mdm.tcp_send(&mut buf)?; + drop(buf); + + println!("+++++++++++++++++++++++++++++++++"); + let size = mdm.tcp_receive_reply_len()?; + + let mut reply = vec![0 as u8; size]; + let received_size = mdm.tcp_receive(&mut reply)?; + + println!("expected: {} / received: {}", size, received_size); + println!("+++++++++++++++++++++++++++++++++"); + drop(reply); + + let topic = TopicName::new("location")?; + let message = "{\"lat\": 20, \"long\": 44}"; + let qos = QoSWithPacketIdentifier::Level0; + + let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes()); + let mut buf = Vec::new(); + publish_packet.encode(&mut buf)?; + let _ = mdm.tcp_send(&mut buf)?; + drop(buf); + + thread::sleep(Duration::from_millis(300)); + let size = mdm.tcp_receive_reply_len()?; + + let mut reply = vec![0 as u8; size]; + let received_size = mdm.tcp_receive(&mut reply)?; + println!("expected: {} / received: {}", size, received_size); + println!("+++++++++++++++++++++++++++++++++"); + println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::()); + println!("+++++++++++++++++++++++++++++++++"); + drop(reply); + + let _ = mdm.tcp_close_connection()?; } Ok(()) diff --git a/src/modem.rs b/src/modem.rs index 44644b0..c7f86b7 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -164,17 +164,14 @@ impl Modem { } } - fn send(&mut self, b: u8) -> Result<()> { - nb::block!(self.tx.write(b)) - .map_err(|_| ModemError::CommandError(format!("error writing {} to serial", b)))?; - Ok(()) - } - + #[inline(always)] fn send_bytes(&mut self, payload: &[u8], eos: char) -> Result<()> { for b in payload.iter() { - self.send(*b)?; + nb::block!(self.tx.write(*b)) + .map_err(|_| ModemError::CommandError(format!("error writing {} to serial", b)))?; } - self.send(eos as u8)?; + nb::block!(self.tx.write(eos as u8)) + .map_err(|_| ModemError::CommandError(format!("error writing {} to serial", eos)))?; Ok(()) } @@ -290,22 +287,25 @@ impl Modem { pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result { let mut size = 0; loop { - let reply = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) - .map(|reply| { - reply - .split("\r\n") - .filter(|line| line.len() > 2 && !line.contains("+CIPRXGET: 2,")) - .next() - .map(|line| line.chars().enumerate().map(|(idx, c)| buf[size + idx] = c as u8).count()) + let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) + .map(|reply: String| { + reply.lines() + .map(|line| { + if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") { + line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count() + } + else { + 0 + } + }) + .sum() })?; - match reply { - Some(0) | None => { - break Ok(size) - }, - Some(x) => { - size += x; - continue - }, + if reply == 0 { + break Ok(size) + } + else { + size += reply; + continue } } }