Compare commits
1 commit
main
...
mqtt-stack
Author | SHA1 | Date | |
---|---|---|---|
b93b357007 |
4 changed files with 124 additions and 73 deletions
|
@ -17,8 +17,9 @@ anyhow = "1.0.57"
|
||||||
embedded-hal = "0.2.7"
|
embedded-hal = "0.2.7"
|
||||||
esp-idf-hal = "0.37.4"
|
esp-idf-hal = "0.37.4"
|
||||||
esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] }
|
esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] }
|
||||||
mqtt-protocol = "0.11.2"
|
minimq = "0.5.3"
|
||||||
nb = "1.0.0"
|
nb = "1.0.0"
|
||||||
|
std-embedded-time = "0.1.0"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
embuild = "0.29"
|
embuild = "0.29"
|
||||||
|
|
|
@ -327,9 +327,9 @@ impl Command {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tcp_set_manual_receive() -> Command {
|
pub fn tcp_set_manual_receive(is_manual: bool) -> Command {
|
||||||
Command {
|
Command {
|
||||||
text: "AT+CIPRXGET=1".to_string(),
|
text: format!("AT+CIPRXGET={}", is_manual as u8),
|
||||||
timeout: Duration::from_millis(3000),
|
timeout: Duration::from_millis(3000),
|
||||||
contains: Some("OK".to_string()),
|
contains: Some("OK".to_string()),
|
||||||
}
|
}
|
||||||
|
@ -342,4 +342,12 @@ impl Command {
|
||||||
contains: Some("CLOSE OK".to_string()),
|
contains: Some("CLOSE OK".to_string()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tcp_connection_state() -> Command {
|
||||||
|
Command {
|
||||||
|
text: "AT+CIPSTATUS".to_string(),
|
||||||
|
timeout: Duration::from_millis(2000),
|
||||||
|
contains: Some("STATE".to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
67
src/main.rs
67
src/main.rs
|
@ -4,17 +4,11 @@ mod modem;
|
||||||
mod command;
|
mod command;
|
||||||
|
|
||||||
use anyhow;
|
use anyhow;
|
||||||
use std::time::Duration;
|
|
||||||
use std::thread;
|
|
||||||
use esp_idf_hal::delay;
|
|
||||||
use esp_idf_hal::prelude::*;
|
use esp_idf_hal::prelude::*;
|
||||||
use esp_idf_hal::peripherals::Peripherals;
|
use esp_idf_hal::peripherals::Peripherals;
|
||||||
use esp_idf_hal::serial;
|
use esp_idf_hal::serial;
|
||||||
|
|
||||||
use mqtt::control::ConnectReturnCode;
|
use minimq::{Minimq, QoS, Retain};
|
||||||
use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier};
|
|
||||||
use mqtt::{Decodable, Encodable, TopicName};
|
|
||||||
|
|
||||||
|
|
||||||
fn main() -> anyhow::Result<()> {
|
fn main() -> anyhow::Result<()> {
|
||||||
esp_idf_sys::link_patches();
|
esp_idf_sys::link_patches();
|
||||||
|
@ -68,50 +62,29 @@ fn main() -> anyhow::Result<()> {
|
||||||
let _ = mdm.tcp_ssl_disable()?;
|
let _ = mdm.tcp_ssl_disable()?;
|
||||||
}
|
}
|
||||||
let _ = mdm.tcp_set_quick_mode(false);
|
let _ = mdm.tcp_set_quick_mode(false);
|
||||||
let _ = mdm.tcp_set_manual_receive()?;
|
let _ = mdm.tcp_set_manual_receive(true)?;
|
||||||
let _ = mdm.tcp_connect("51.158.66.64", 9988)?;
|
|
||||||
|
|
||||||
let client_id = "e-bike-tracker";
|
let mut mqtt: Minimq<_, _, 256, 16> = Minimq::new(
|
||||||
let mut conn = ConnectPacket::new(client_id);
|
"51.158.66.64".parse().unwrap(),
|
||||||
conn.set_clean_session(true);
|
"e-bike-tracker",
|
||||||
let mut buf = Vec::new();
|
mdm,
|
||||||
let _ = conn.encode(&mut buf)?;
|
std_embedded_time::StandardClock::default()).unwrap();
|
||||||
|
|
||||||
let _ = mdm.tcp_send(&mut buf)?;
|
mqtt.client
|
||||||
drop(buf);
|
.set_will(
|
||||||
|
"exit",
|
||||||
|
"Test complete".as_bytes(),
|
||||||
|
QoS::AtMostOnce,
|
||||||
|
Retain::NotRetained,
|
||||||
|
&[],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
println!("+++++++++++++++++++++++++++++++++");
|
println!("created mqtt client ... ");
|
||||||
let size = mdm.tcp_receive_reply_len()?;
|
|
||||||
|
|
||||||
let mut reply = vec![0 as u8; size];
|
|
||||||
let received_size = mdm.tcp_receive(&mut reply)?;
|
|
||||||
|
|
||||||
println!("expected: {} / received: {}", size, received_size);
|
|
||||||
println!("+++++++++++++++++++++++++++++++++");
|
|
||||||
drop(reply);
|
|
||||||
|
|
||||||
let topic = TopicName::new("location")?;
|
|
||||||
let message = "{\"lat\": 20, \"long\": 44}";
|
let message = "{\"lat\": 20, \"long\": 44}";
|
||||||
let qos = QoSWithPacketIdentifier::Level0;
|
println!("message = {}", message);
|
||||||
|
mqtt.client.publish("devices/location", message.as_bytes(), QoS::AtMostOnce, Retain::NotRetained, &[]).unwrap();
|
||||||
let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes());
|
println!("published message ... ");
|
||||||
let mut buf = Vec::new();
|
|
||||||
publish_packet.encode(&mut buf)?;
|
|
||||||
let _ = mdm.tcp_send(&mut buf)?;
|
|
||||||
drop(buf);
|
|
||||||
|
|
||||||
thread::sleep(Duration::from_millis(300));
|
|
||||||
let size = mdm.tcp_receive_reply_len()?;
|
|
||||||
|
|
||||||
let mut reply = vec![0 as u8; size];
|
|
||||||
let received_size = mdm.tcp_receive(&mut reply)?;
|
|
||||||
println!("expected: {} / received: {}", size, received_size);
|
|
||||||
println!("+++++++++++++++++++++++++++++++++");
|
|
||||||
println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::<String>());
|
|
||||||
println!("+++++++++++++++++++++++++++++++++");
|
|
||||||
drop(reply);
|
|
||||||
|
|
||||||
let _ = mdm.tcp_close_connection()?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
115
src/modem.rs
115
src/modem.rs
|
@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
|
||||||
use embedded_hal::serial::{Read, Write};
|
use embedded_hal::serial::{Read, Write};
|
||||||
use embedded_hal::digital::v2::OutputPin;
|
use embedded_hal::digital::v2::OutputPin;
|
||||||
use esp_idf_hal::serial::{self, Rx, Tx};
|
use esp_idf_hal::serial::{self, Rx, Tx};
|
||||||
|
use minimq::embedded_nal::{TcpClientStack, SocketAddr};
|
||||||
|
|
||||||
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
|
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
|
||||||
|
|
||||||
|
@ -182,7 +183,7 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
self.read_response(cmd.contains, cmd.timeout)
|
self.read_response(cmd.contains, cmd.timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_data(&mut self, buf: &[u8]) -> Result<String> {
|
fn send_data(&mut self, buf: &[u8]) -> Result<usize> {
|
||||||
self.rx.clear();
|
self.rx.clear();
|
||||||
let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?;
|
let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?;
|
||||||
let send_request: String = self.rx.reset(Duration::from_millis(3000))
|
let send_request: String = self.rx.reset(Duration::from_millis(3000))
|
||||||
|
@ -194,15 +195,8 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data
|
self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data
|
||||||
let _ = self.read_response(Some("DATA ACCEPT".to_string()), Duration::from_millis(3000));
|
let _ = self.read_response(Some("SEND OK".to_string()), Duration::from_millis(1000))?;
|
||||||
|
Ok(buf.len())
|
||||||
self.rx.clear();
|
|
||||||
let res = self.send_command(Command {
|
|
||||||
text: "AT+CIPACK".to_string(),
|
|
||||||
contains: Some("OK".to_string()),
|
|
||||||
timeout: Duration::from_millis(3000),
|
|
||||||
})?;
|
|
||||||
Ok(res)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_ip_addr(&mut self) -> Result<String> {
|
pub fn get_ip_addr(&mut self) -> Result<String> {
|
||||||
|
@ -255,14 +249,13 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tcp_set_manual_receive(&mut self) -> Result<()> {
|
pub fn tcp_set_manual_receive(&mut self, is_manual: bool) -> Result<()> {
|
||||||
self.send_command(Command::tcp_set_manual_receive())?;
|
self.send_command(Command::tcp_set_manual_receive(is_manual))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> {
|
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<usize> {
|
||||||
self.send_data(buf)?;
|
self.send_data(buf)
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result<usize> {
|
fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result<usize> {
|
||||||
|
@ -287,30 +280,106 @@ impl<UART: serial::Uart> Modem<UART> {
|
||||||
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
|
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
|
||||||
let mut size = 0;
|
let mut size = 0;
|
||||||
loop {
|
loop {
|
||||||
let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
|
let reply_len: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
|
||||||
.map(|reply: String| {
|
.map(|reply: String| {
|
||||||
reply.lines()
|
reply.lines()
|
||||||
.map(|line| {
|
.fold(0, |acc, line| {
|
||||||
if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") {
|
if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") {
|
||||||
line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
|
acc += line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.sum()
|
|
||||||
})?;
|
})?;
|
||||||
if reply == 0 {
|
if reply_len == 0 {
|
||||||
break Ok(size)
|
break Ok(size)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
size += reply;
|
size += reply_len;
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tcp_close_connection(&mut self) -> Result<String> {
|
pub fn tcp_close_connection(&mut self) -> Result<()> {
|
||||||
self.send_command(Command::tcp_close())
|
self.send_command(Command::tcp_close())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tcp_is_connected(&mut self) -> Result<bool> {
|
||||||
|
let response = self.send_command(Command::tcp_connection_state())?;
|
||||||
|
let state = response.lines().last().and_then(|line| line.split(",").last());
|
||||||
|
Ok(state.unwrap_or("CLOSED") == "CONNECTED")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ModemTcpStack;
|
||||||
|
|
||||||
|
pub struct ModemSocket<T> {
|
||||||
|
state: SocketState<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum SocketState<UART: serial::Uart> {
|
||||||
|
Building,
|
||||||
|
Connected(Modem<UART>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: serial::Uart> SocketState<T> {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self::Building
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_running(&mut self) -> std::io::Result<&mut T> {
|
||||||
|
match self {
|
||||||
|
SocketState::Connected(ref mut s) => Ok(s),
|
||||||
|
_ => OutOfOrder.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: serial::Uart> ModemSocket<T> {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
state: SocketState::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_running(s: Modem<T>) -> Self {
|
||||||
|
Self {
|
||||||
|
state: SocketState::Connected(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<UART: serial::Uart> TcpClientStack for ModemTcpStack {
|
||||||
|
type Error = ModemError;
|
||||||
|
type TcpSocket = ModemSocket<UART>;
|
||||||
|
|
||||||
|
fn socket(&mut self) -> Result<Self::TcpSocket> {
|
||||||
|
Ok(self.modem)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect(&mut self, socket: &mut Self::TcpSocket, remote: SocketAddr) -> std::result::Result<(), nb::Error<Self::Error>> {
|
||||||
|
self.tcp_connect(&format!("{}", remote.ip()), remote.port())
|
||||||
|
.map_err(|err| nb::Error::Other(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_connected(&mut self, _socket: &Self::TcpSocket) -> Result<bool> {
|
||||||
|
self.tcp_is_connected()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send(&mut self, _socket: &mut Self::TcpSocket, buffer: &[u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
|
||||||
|
self.tcp_send(buffer)
|
||||||
|
.map_err(|err| nb::Error::Other(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn receive( &mut self, _socket: &mut Self::TcpSocket, buffer: &mut [u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
|
||||||
|
self.tcp_receive(buffer)
|
||||||
|
.map_err(|err| nb::Error::Other(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&mut self, _socket: Self::TcpSocket) -> Result<()> {
|
||||||
|
self.tcp_close_connection()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue