tcp client stack and some improvements

This commit is contained in:
Vladan Popovic 2022-06-30 23:54:24 +02:00
parent edf427dcb1
commit b93b357007
4 changed files with 124 additions and 73 deletions

View file

@ -17,8 +17,9 @@ anyhow = "1.0.57"
embedded-hal = "0.2.7" embedded-hal = "0.2.7"
esp-idf-hal = "0.37.4" esp-idf-hal = "0.37.4"
esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] } esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] }
mqtt-protocol = "0.11.2" minimq = "0.5.3"
nb = "1.0.0" nb = "1.0.0"
std-embedded-time = "0.1.0"
[build-dependencies] [build-dependencies]
embuild = "0.29" embuild = "0.29"

View file

@ -327,9 +327,9 @@ impl Command {
} }
} }
pub fn tcp_set_manual_receive() -> Command { pub fn tcp_set_manual_receive(is_manual: bool) -> Command {
Command { Command {
text: "AT+CIPRXGET=1".to_string(), text: format!("AT+CIPRXGET={}", is_manual as u8),
timeout: Duration::from_millis(3000), timeout: Duration::from_millis(3000),
contains: Some("OK".to_string()), contains: Some("OK".to_string()),
} }
@ -342,4 +342,12 @@ impl Command {
contains: Some("CLOSE OK".to_string()), contains: Some("CLOSE OK".to_string()),
} }
} }
pub fn tcp_connection_state() -> Command {
Command {
text: "AT+CIPSTATUS".to_string(),
timeout: Duration::from_millis(2000),
contains: Some("STATE".to_string()),
}
}
} }

View file

@ -4,17 +4,11 @@ mod modem;
mod command; mod command;
use anyhow; use anyhow;
use std::time::Duration;
use std::thread;
use esp_idf_hal::delay;
use esp_idf_hal::prelude::*; use esp_idf_hal::prelude::*;
use esp_idf_hal::peripherals::Peripherals; use esp_idf_hal::peripherals::Peripherals;
use esp_idf_hal::serial; use esp_idf_hal::serial;
use mqtt::control::ConnectReturnCode; use minimq::{Minimq, QoS, Retain};
use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier};
use mqtt::{Decodable, Encodable, TopicName};
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
esp_idf_sys::link_patches(); esp_idf_sys::link_patches();
@ -68,50 +62,29 @@ fn main() -> anyhow::Result<()> {
let _ = mdm.tcp_ssl_disable()?; let _ = mdm.tcp_ssl_disable()?;
} }
let _ = mdm.tcp_set_quick_mode(false); let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive()?; let _ = mdm.tcp_set_manual_receive(true)?;
let _ = mdm.tcp_connect("51.158.66.64", 9988)?;
let client_id = "e-bike-tracker"; let mut mqtt: Minimq<_, _, 256, 16> = Minimq::new(
let mut conn = ConnectPacket::new(client_id); "51.158.66.64".parse().unwrap(),
conn.set_clean_session(true); "e-bike-tracker",
let mut buf = Vec::new(); mdm,
let _ = conn.encode(&mut buf)?; std_embedded_time::StandardClock::default()).unwrap();
let _ = mdm.tcp_send(&mut buf)?; mqtt.client
drop(buf); .set_will(
"exit",
"Test complete".as_bytes(),
QoS::AtMostOnce,
Retain::NotRetained,
&[],
)
.unwrap();
println!("+++++++++++++++++++++++++++++++++"); println!("created mqtt client ... ");
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
drop(reply);
let topic = TopicName::new("location")?;
let message = "{\"lat\": 20, \"long\": 44}"; let message = "{\"lat\": 20, \"long\": 44}";
let qos = QoSWithPacketIdentifier::Level0; println!("message = {}", message);
mqtt.client.publish("devices/location", message.as_bytes(), QoS::AtMostOnce, Retain::NotRetained, &[]).unwrap();
let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes()); println!("published message ... ");
let mut buf = Vec::new();
publish_packet.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
drop(buf);
thread::sleep(Duration::from_millis(300));
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::<String>());
println!("+++++++++++++++++++++++++++++++++");
drop(reply);
let _ = mdm.tcp_close_connection()?;
} }
Ok(()) Ok(())

View file

@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
use embedded_hal::serial::{Read, Write}; use embedded_hal::serial::{Read, Write};
use embedded_hal::digital::v2::OutputPin; use embedded_hal::digital::v2::OutputPin;
use esp_idf_hal::serial::{self, Rx, Tx}; use esp_idf_hal::serial::{self, Rx, Tx};
use minimq::embedded_nal::{TcpClientStack, SocketAddr};
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300; const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
@ -182,7 +183,7 @@ impl<UART: serial::Uart> Modem<UART> {
self.read_response(cmd.contains, cmd.timeout) self.read_response(cmd.contains, cmd.timeout)
} }
fn send_data(&mut self, buf: &[u8]) -> Result<String> { fn send_data(&mut self, buf: &[u8]) -> Result<usize> {
self.rx.clear(); self.rx.clear();
let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?; let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?;
let send_request: String = self.rx.reset(Duration::from_millis(3000)) let send_request: String = self.rx.reset(Duration::from_millis(3000))
@ -194,15 +195,8 @@ impl<UART: serial::Uart> Modem<UART> {
} }
self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data
let _ = self.read_response(Some("DATA ACCEPT".to_string()), Duration::from_millis(3000)); let _ = self.read_response(Some("SEND OK".to_string()), Duration::from_millis(1000))?;
Ok(buf.len())
self.rx.clear();
let res = self.send_command(Command {
text: "AT+CIPACK".to_string(),
contains: Some("OK".to_string()),
timeout: Duration::from_millis(3000),
})?;
Ok(res)
} }
pub fn get_ip_addr(&mut self) -> Result<String> { pub fn get_ip_addr(&mut self) -> Result<String> {
@ -255,14 +249,13 @@ impl<UART: serial::Uart> Modem<UART> {
Ok(()) Ok(())
} }
pub fn tcp_set_manual_receive(&mut self) -> Result<()> { pub fn tcp_set_manual_receive(&mut self, is_manual: bool) -> Result<()> {
self.send_command(Command::tcp_set_manual_receive())?; self.send_command(Command::tcp_set_manual_receive(is_manual))?;
Ok(()) Ok(())
} }
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> { pub fn tcp_send(&mut self, buf: &[u8]) -> Result<usize> {
self.send_data(buf)?; self.send_data(buf)
Ok(())
} }
fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result<usize> { fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result<usize> {
@ -287,30 +280,106 @@ impl<UART: serial::Uart> Modem<UART> {
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> { pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut size = 0; let mut size = 0;
loop { loop {
let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) let reply_len: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
.map(|reply: String| { .map(|reply: String| {
reply.lines() reply.lines()
.map(|line| { .fold(0, |acc, line| {
if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") { if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") {
line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count() acc += line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
} }
else { else {
0 0
} }
}) })
.sum()
})?; })?;
if reply == 0 { if reply_len == 0 {
break Ok(size) break Ok(size)
} }
else { else {
size += reply; size += reply_len;
continue continue
} }
} }
} }
pub fn tcp_close_connection(&mut self) -> Result<String> { pub fn tcp_close_connection(&mut self) -> Result<()> {
self.send_command(Command::tcp_close()) self.send_command(Command::tcp_close())?;
Ok(())
}
pub fn tcp_is_connected(&mut self) -> Result<bool> {
let response = self.send_command(Command::tcp_connection_state())?;
let state = response.lines().last().and_then(|line| line.split(",").last());
Ok(state.unwrap_or("CLOSED") == "CONNECTED")
}
}
pub struct ModemTcpStack;
pub struct ModemSocket<T> {
state: SocketState<T>,
}
pub enum SocketState<UART: serial::Uart> {
Building,
Connected(Modem<UART>),
}
impl<T: serial::Uart> SocketState<T> {
fn new() -> Self {
Self::Building
}
fn get_running(&mut self) -> std::io::Result<&mut T> {
match self {
SocketState::Connected(ref mut s) => Ok(s),
_ => OutOfOrder.into(),
}
}
}
impl<T: serial::Uart> ModemSocket<T> {
fn new() -> Self {
Self {
state: SocketState::new(),
}
}
fn get_running(s: Modem<T>) -> Self {
Self {
state: SocketState::Connected(s)
}
}
}
impl<UART: serial::Uart> TcpClientStack for ModemTcpStack {
type Error = ModemError;
type TcpSocket = ModemSocket<UART>;
fn socket(&mut self) -> Result<Self::TcpSocket> {
Ok(self.modem)
}
fn connect(&mut self, socket: &mut Self::TcpSocket, remote: SocketAddr) -> std::result::Result<(), nb::Error<Self::Error>> {
self.tcp_connect(&format!("{}", remote.ip()), remote.port())
.map_err(|err| nb::Error::Other(err))
}
fn is_connected(&mut self, _socket: &Self::TcpSocket) -> Result<bool> {
self.tcp_is_connected()
}
fn send(&mut self, _socket: &mut Self::TcpSocket, buffer: &[u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
self.tcp_send(buffer)
.map_err(|err| nb::Error::Other(err))
}
fn receive( &mut self, _socket: &mut Self::TcpSocket, buffer: &mut [u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
self.tcp_receive(buffer)
.map_err(|err| nb::Error::Other(err))
}
fn close(&mut self, _socket: Self::TcpSocket) -> Result<()> {
self.tcp_close_connection()
} }
} }