mqtt continues

This commit is contained in:
Vladan Popovic 2022-06-26 00:56:59 +02:00
parent 8d7d4d898d
commit edf427dcb1
3 changed files with 79 additions and 59 deletions

View File

@ -1,5 +1,5 @@
# Rust often needs a bit of an extra main task stack size compared to C (the default is 3K) # Rust often needs a bit of an extra main task stack size compared to C (the default is 3K)
CONFIG_ESP_MAIN_TASK_STACK_SIZE=7000 CONFIG_ESP_MAIN_TASK_STACK_SIZE=17000
# Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default). # Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default).
# This allows to use 1 ms granuality for thread sleeps (10 ms by default). # This allows to use 1 ms granuality for thread sleeps (10 ms by default).

View File

@ -6,6 +6,7 @@ mod command;
use anyhow; use anyhow;
use std::time::Duration; use std::time::Duration;
use std::thread; use std::thread;
use esp_idf_hal::delay;
use esp_idf_hal::prelude::*; use esp_idf_hal::prelude::*;
use esp_idf_hal::peripherals::Peripherals; use esp_idf_hal::peripherals::Peripherals;
use esp_idf_hal::serial; use esp_idf_hal::serial;
@ -14,6 +15,7 @@ use mqtt::control::ConnectReturnCode;
use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier}; use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier};
use mqtt::{Decodable, Encodable, TopicName}; use mqtt::{Decodable, Encodable, TopicName};
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
esp_idf_sys::link_patches(); esp_idf_sys::link_patches();
@ -55,43 +57,61 @@ fn main() -> anyhow::Result<()> {
)?; )?;
} }
loop { if mdm.is_gprs_attached()? {
if mdm.is_gprs_attached()? { let _ = mdm.get_ip_addr()?;
let _ = mdm.get_ip_addr()?;
//println!("connecting to server!"); //println!("connecting to server!");
//if !mdm.tcp_is_ssl_enabled()? { //if !mdm.tcp_is_ssl_enabled()? {
// let _ = mdm.tcp_ssl_enable()?; // let _ = mdm.tcp_ssl_enable()?;
//} //}
if mdm.tcp_is_ssl_enabled()? { if mdm.tcp_is_ssl_enabled()? {
let _ = mdm.tcp_ssl_disable()?; let _ = mdm.tcp_ssl_disable()?;
}
let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive()?;
let _ = mdm.tcp_connect("51.158.66.64", 9988)?;
let client_id = "e-bike-tracker";
let mut conn = ConnectPacket::new(client_id);
conn.set_clean_session(true);
let mut buf = Vec::new();
let _ = conn.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
thread::sleep(Duration::from_millis(1000));
println!("+++++++++++++++++++++++++++++++++");
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::<String>());
println!("+++++++++++++++++++++++++++++++++");
let _ = mdm.tcp_close_connection()?;
break;
} }
println!("!!!!!!!!!!!!GPRS NOT ATTACHED!!!!!!!!!!!"); let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive()?;
let _ = mdm.tcp_connect("51.158.66.64", 9988)?;
let client_id = "e-bike-tracker";
let mut conn = ConnectPacket::new(client_id);
conn.set_clean_session(true);
let mut buf = Vec::new();
let _ = conn.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
drop(buf);
println!("+++++++++++++++++++++++++++++++++");
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
drop(reply);
let topic = TopicName::new("location")?;
let message = "{\"lat\": 20, \"long\": 44}";
let qos = QoSWithPacketIdentifier::Level0;
let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes());
let mut buf = Vec::new();
publish_packet.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
drop(buf);
thread::sleep(Duration::from_millis(300));
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::<String>());
println!("+++++++++++++++++++++++++++++++++");
drop(reply);
let _ = mdm.tcp_close_connection()?;
} }
Ok(()) Ok(())

View File

@ -164,17 +164,14 @@ impl<UART: serial::Uart> Modem<UART> {
} }
} }
fn send(&mut self, b: u8) -> Result<()> { #[inline(always)]
nb::block!(self.tx.write(b))
.map_err(|_| ModemError::CommandError(format!("error writing {} to serial", b)))?;
Ok(())
}
fn send_bytes(&mut self, payload: &[u8], eos: char) -> Result<()> { fn send_bytes(&mut self, payload: &[u8], eos: char) -> Result<()> {
for b in payload.iter() { for b in payload.iter() {
self.send(*b)?; nb::block!(self.tx.write(*b))
.map_err(|_| ModemError::CommandError(format!("error writing {} to serial", b)))?;
} }
self.send(eos as u8)?; nb::block!(self.tx.write(eos as u8))
.map_err(|_| ModemError::CommandError(format!("error writing {} to serial", eos)))?;
Ok(()) Ok(())
} }
@ -290,22 +287,25 @@ impl<UART: serial::Uart> Modem<UART> {
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> { pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut size = 0; let mut size = 0;
loop { loop {
let reply = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
.map(|reply| { .map(|reply: String| {
reply reply.lines()
.split("\r\n") .map(|line| {
.filter(|line| line.len() > 2 && !line.contains("+CIPRXGET: 2,")) if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") {
.next() line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
.map(|line| line.chars().enumerate().map(|(idx, c)| buf[size + idx] = c as u8).count()) }
else {
0
}
})
.sum()
})?; })?;
match reply { if reply == 0 {
Some(0) | None => { break Ok(size)
break Ok(size) }
}, else {
Some(x) => { size += reply;
size += x; continue
continue
},
} }
} }
} }