Compare commits
1 commit
main
...
mqtt-stack
Author | SHA1 | Date | |
---|---|---|---|
b93b357007 |
4 changed files with 124 additions and 73 deletions
|
@ -17,8 +17,9 @@ anyhow = "1.0.57"
|
|||
embedded-hal = "0.2.7"
|
||||
esp-idf-hal = "0.37.4"
|
||||
esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] }
|
||||
mqtt-protocol = "0.11.2"
|
||||
minimq = "0.5.3"
|
||||
nb = "1.0.0"
|
||||
std-embedded-time = "0.1.0"
|
||||
|
||||
[build-dependencies]
|
||||
embuild = "0.29"
|
||||
|
|
|
@ -327,9 +327,9 @@ impl Command {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn tcp_set_manual_receive() -> Command {
|
||||
pub fn tcp_set_manual_receive(is_manual: bool) -> Command {
|
||||
Command {
|
||||
text: "AT+CIPRXGET=1".to_string(),
|
||||
text: format!("AT+CIPRXGET={}", is_manual as u8),
|
||||
timeout: Duration::from_millis(3000),
|
||||
contains: Some("OK".to_string()),
|
||||
}
|
||||
|
@ -342,4 +342,12 @@ impl Command {
|
|||
contains: Some("CLOSE OK".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tcp_connection_state() -> Command {
|
||||
Command {
|
||||
text: "AT+CIPSTATUS".to_string(),
|
||||
timeout: Duration::from_millis(2000),
|
||||
contains: Some("STATE".to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
67
src/main.rs
67
src/main.rs
|
@ -4,17 +4,11 @@ mod modem;
|
|||
mod command;
|
||||
|
||||
use anyhow;
|
||||
use std::time::Duration;
|
||||
use std::thread;
|
||||
use esp_idf_hal::delay;
|
||||
use esp_idf_hal::prelude::*;
|
||||
use esp_idf_hal::peripherals::Peripherals;
|
||||
use esp_idf_hal::serial;
|
||||
|
||||
use mqtt::control::ConnectReturnCode;
|
||||
use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier};
|
||||
use mqtt::{Decodable, Encodable, TopicName};
|
||||
|
||||
use minimq::{Minimq, QoS, Retain};
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
esp_idf_sys::link_patches();
|
||||
|
@ -68,50 +62,29 @@ fn main() -> anyhow::Result<()> {
|
|||
let _ = mdm.tcp_ssl_disable()?;
|
||||
}
|
||||
let _ = mdm.tcp_set_quick_mode(false);
|
||||
let _ = mdm.tcp_set_manual_receive()?;
|
||||
let _ = mdm.tcp_connect("51.158.66.64", 9988)?;
|
||||
let _ = mdm.tcp_set_manual_receive(true)?;
|
||||
|
||||
let client_id = "e-bike-tracker";
|
||||
let mut conn = ConnectPacket::new(client_id);
|
||||
conn.set_clean_session(true);
|
||||
let mut buf = Vec::new();
|
||||
let _ = conn.encode(&mut buf)?;
|
||||
let mut mqtt: Minimq<_, _, 256, 16> = Minimq::new(
|
||||
"51.158.66.64".parse().unwrap(),
|
||||
"e-bike-tracker",
|
||||
mdm,
|
||||
std_embedded_time::StandardClock::default()).unwrap();
|
||||
|
||||
let _ = mdm.tcp_send(&mut buf)?;
|
||||
drop(buf);
|
||||
mqtt.client
|
||||
.set_will(
|
||||
"exit",
|
||||
"Test complete".as_bytes(),
|
||||
QoS::AtMostOnce,
|
||||
Retain::NotRetained,
|
||||
&[],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
println!("+++++++++++++++++++++++++++++++++");
|
||||
let size = mdm.tcp_receive_reply_len()?;
|
||||
|
||||
let mut reply = vec![0 as u8; size];
|
||||
let received_size = mdm.tcp_receive(&mut reply)?;
|
||||
|
||||
println!("expected: {} / received: {}", size, received_size);
|
||||
println!("+++++++++++++++++++++++++++++++++");
|
||||
drop(reply);
|
||||
|
||||
let topic = TopicName::new("location")?;
|
||||
println!("created mqtt client ... ");
|
||||
let message = "{\"lat\": 20, \"long\": 44}";
|
||||
let qos = QoSWithPacketIdentifier::Level0;
|
||||
|
||||
let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes());
|
||||
let mut buf = Vec::new();
|
||||
publish_packet.encode(&mut buf)?;
|
||||
let _ = mdm.tcp_send(&mut buf)?;
|
||||
drop(buf);
|
||||
|
||||
thread::sleep(Duration::from_millis(300));
|
||||
let size = mdm.tcp_receive_reply_len()?;
|
||||
|
||||
let mut reply = vec![0 as u8; size];
|
||||
let received_size = mdm.tcp_receive(&mut reply)?;
|
||||
println!("expected: {} / received: {}", size, received_size);
|
||||
println!("+++++++++++++++++++++++++++++++++");
|
||||
println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::<String>());
|
||||
println!("+++++++++++++++++++++++++++++++++");
|
||||
drop(reply);
|
||||
|
||||
let _ = mdm.tcp_close_connection()?;
|
||||
println!("message = {}", message);
|
||||
mqtt.client.publish("devices/location", message.as_bytes(), QoS::AtMostOnce, Retain::NotRetained, &[]).unwrap();
|
||||
println!("published message ... ");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
115
src/modem.rs
115
src/modem.rs
|
@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
|
|||
use embedded_hal::serial::{Read, Write};
|
||||
use embedded_hal::digital::v2::OutputPin;
|
||||
use esp_idf_hal::serial::{self, Rx, Tx};
|
||||
use minimq::embedded_nal::{TcpClientStack, SocketAddr};
|
||||
|
||||
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
|
||||
|
||||
|
@ -182,7 +183,7 @@ impl<UART: serial::Uart> Modem<UART> {
|
|||
self.read_response(cmd.contains, cmd.timeout)
|
||||
}
|
||||
|
||||
fn send_data(&mut self, buf: &[u8]) -> Result<String> {
|
||||
fn send_data(&mut self, buf: &[u8]) -> Result<usize> {
|
||||
self.rx.clear();
|
||||
let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?;
|
||||
let send_request: String = self.rx.reset(Duration::from_millis(3000))
|
||||
|
@ -194,15 +195,8 @@ impl<UART: serial::Uart> Modem<UART> {
|
|||
}
|
||||
|
||||
self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data
|
||||
let _ = self.read_response(Some("DATA ACCEPT".to_string()), Duration::from_millis(3000));
|
||||
|
||||
self.rx.clear();
|
||||
let res = self.send_command(Command {
|
||||
text: "AT+CIPACK".to_string(),
|
||||
contains: Some("OK".to_string()),
|
||||
timeout: Duration::from_millis(3000),
|
||||
})?;
|
||||
Ok(res)
|
||||
let _ = self.read_response(Some("SEND OK".to_string()), Duration::from_millis(1000))?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
pub fn get_ip_addr(&mut self) -> Result<String> {
|
||||
|
@ -255,14 +249,13 @@ impl<UART: serial::Uart> Modem<UART> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tcp_set_manual_receive(&mut self) -> Result<()> {
|
||||
self.send_command(Command::tcp_set_manual_receive())?;
|
||||
pub fn tcp_set_manual_receive(&mut self, is_manual: bool) -> Result<()> {
|
||||
self.send_command(Command::tcp_set_manual_receive(is_manual))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> {
|
||||
self.send_data(buf)?;
|
||||
Ok(())
|
||||
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<usize> {
|
||||
self.send_data(buf)
|
||||
}
|
||||
|
||||
fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result<usize> {
|
||||
|
@ -287,30 +280,106 @@ impl<UART: serial::Uart> Modem<UART> {
|
|||
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
|
||||
let mut size = 0;
|
||||
loop {
|
||||
let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
|
||||
let reply_len: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
|
||||
.map(|reply: String| {
|
||||
reply.lines()
|
||||
.map(|line| {
|
||||
.fold(0, |acc, line| {
|
||||
if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") {
|
||||
line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
|
||||
acc += line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
|
||||
}
|
||||
else {
|
||||
0
|
||||
}
|
||||
})
|
||||
.sum()
|
||||
})?;
|
||||
if reply == 0 {
|
||||
if reply_len == 0 {
|
||||
break Ok(size)
|
||||
}
|
||||
else {
|
||||
size += reply;
|
||||
size += reply_len;
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tcp_close_connection(&mut self) -> Result<String> {
|
||||
self.send_command(Command::tcp_close())
|
||||
pub fn tcp_close_connection(&mut self) -> Result<()> {
|
||||
self.send_command(Command::tcp_close())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn tcp_is_connected(&mut self) -> Result<bool> {
|
||||
let response = self.send_command(Command::tcp_connection_state())?;
|
||||
let state = response.lines().last().and_then(|line| line.split(",").last());
|
||||
Ok(state.unwrap_or("CLOSED") == "CONNECTED")
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ModemTcpStack;
|
||||
|
||||
pub struct ModemSocket<T> {
|
||||
state: SocketState<T>,
|
||||
}
|
||||
|
||||
pub enum SocketState<UART: serial::Uart> {
|
||||
Building,
|
||||
Connected(Modem<UART>),
|
||||
}
|
||||
|
||||
impl<T: serial::Uart> SocketState<T> {
|
||||
fn new() -> Self {
|
||||
Self::Building
|
||||
}
|
||||
|
||||
fn get_running(&mut self) -> std::io::Result<&mut T> {
|
||||
match self {
|
||||
SocketState::Connected(ref mut s) => Ok(s),
|
||||
_ => OutOfOrder.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: serial::Uart> ModemSocket<T> {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
state: SocketState::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_running(s: Modem<T>) -> Self {
|
||||
Self {
|
||||
state: SocketState::Connected(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<UART: serial::Uart> TcpClientStack for ModemTcpStack {
|
||||
type Error = ModemError;
|
||||
type TcpSocket = ModemSocket<UART>;
|
||||
|
||||
fn socket(&mut self) -> Result<Self::TcpSocket> {
|
||||
Ok(self.modem)
|
||||
}
|
||||
|
||||
fn connect(&mut self, socket: &mut Self::TcpSocket, remote: SocketAddr) -> std::result::Result<(), nb::Error<Self::Error>> {
|
||||
self.tcp_connect(&format!("{}", remote.ip()), remote.port())
|
||||
.map_err(|err| nb::Error::Other(err))
|
||||
}
|
||||
|
||||
fn is_connected(&mut self, _socket: &Self::TcpSocket) -> Result<bool> {
|
||||
self.tcp_is_connected()
|
||||
}
|
||||
|
||||
fn send(&mut self, _socket: &mut Self::TcpSocket, buffer: &[u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
|
||||
self.tcp_send(buffer)
|
||||
.map_err(|err| nb::Error::Other(err))
|
||||
}
|
||||
|
||||
fn receive( &mut self, _socket: &mut Self::TcpSocket, buffer: &mut [u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
|
||||
self.tcp_receive(buffer)
|
||||
.map_err(|err| nb::Error::Other(err))
|
||||
}
|
||||
|
||||
fn close(&mut self, _socket: Self::TcpSocket) -> Result<()> {
|
||||
self.tcp_close_connection()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue