Compare commits

...

1 Commits

Author SHA1 Message Date
Vladan Popovic b93b357007 tcp client stack and some improvements 2022-06-30 23:54:24 +02:00
4 changed files with 124 additions and 73 deletions

View File

@ -17,8 +17,9 @@ anyhow = "1.0.57"
embedded-hal = "0.2.7"
esp-idf-hal = "0.37.4"
esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] }
mqtt-protocol = "0.11.2"
minimq = "0.5.3"
nb = "1.0.0"
std-embedded-time = "0.1.0"
[build-dependencies]
embuild = "0.29"

View File

@ -327,9 +327,9 @@ impl Command {
}
}
pub fn tcp_set_manual_receive() -> Command {
pub fn tcp_set_manual_receive(is_manual: bool) -> Command {
Command {
text: "AT+CIPRXGET=1".to_string(),
text: format!("AT+CIPRXGET={}", is_manual as u8),
timeout: Duration::from_millis(3000),
contains: Some("OK".to_string()),
}
@ -342,4 +342,12 @@ impl Command {
contains: Some("CLOSE OK".to_string()),
}
}
pub fn tcp_connection_state() -> Command {
Command {
text: "AT+CIPSTATUS".to_string(),
timeout: Duration::from_millis(2000),
contains: Some("STATE".to_string()),
}
}
}

View File

@ -4,17 +4,11 @@ mod modem;
mod command;
use anyhow;
use std::time::Duration;
use std::thread;
use esp_idf_hal::delay;
use esp_idf_hal::prelude::*;
use esp_idf_hal::peripherals::Peripherals;
use esp_idf_hal::serial;
use mqtt::control::ConnectReturnCode;
use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier};
use mqtt::{Decodable, Encodable, TopicName};
use minimq::{Minimq, QoS, Retain};
fn main() -> anyhow::Result<()> {
esp_idf_sys::link_patches();
@ -68,50 +62,29 @@ fn main() -> anyhow::Result<()> {
let _ = mdm.tcp_ssl_disable()?;
}
let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive()?;
let _ = mdm.tcp_connect("51.158.66.64", 9988)?;
let _ = mdm.tcp_set_manual_receive(true)?;
let client_id = "e-bike-tracker";
let mut conn = ConnectPacket::new(client_id);
conn.set_clean_session(true);
let mut buf = Vec::new();
let _ = conn.encode(&mut buf)?;
let mut mqtt: Minimq<_, _, 256, 16> = Minimq::new(
"51.158.66.64".parse().unwrap(),
"e-bike-tracker",
mdm,
std_embedded_time::StandardClock::default()).unwrap();
let _ = mdm.tcp_send(&mut buf)?;
drop(buf);
mqtt.client
.set_will(
"exit",
"Test complete".as_bytes(),
QoS::AtMostOnce,
Retain::NotRetained,
&[],
)
.unwrap();
println!("+++++++++++++++++++++++++++++++++");
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
drop(reply);
let topic = TopicName::new("location")?;
println!("created mqtt client ... ");
let message = "{\"lat\": 20, \"long\": 44}";
let qos = QoSWithPacketIdentifier::Level0;
let publish_packet = PublishPacketRef::new(&topic, qos, message.as_bytes());
let mut buf = Vec::new();
publish_packet.encode(&mut buf)?;
let _ = mdm.tcp_send(&mut buf)?;
drop(buf);
thread::sleep(Duration::from_millis(300));
let size = mdm.tcp_receive_reply_len()?;
let mut reply = vec![0 as u8; size];
let received_size = mdm.tcp_receive(&mut reply)?;
println!("expected: {} / received: {}", size, received_size);
println!("+++++++++++++++++++++++++++++++++");
println!("REPLY({}) = {}", reply.len(), reply.iter().map(|b| char::from(*b)).collect::<String>());
println!("+++++++++++++++++++++++++++++++++");
drop(reply);
let _ = mdm.tcp_close_connection()?;
println!("message = {}", message);
mqtt.client.publish("devices/location", message.as_bytes(), QoS::AtMostOnce, Retain::NotRetained, &[]).unwrap();
println!("published message ... ");
}
Ok(())

View File

@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
use embedded_hal::serial::{Read, Write};
use embedded_hal::digital::v2::OutputPin;
use esp_idf_hal::serial::{self, Rx, Tx};
use minimq::embedded_nal::{TcpClientStack, SocketAddr};
const MAX_TCP_MANUAL_REPLY_SIZE: usize = 300;
@ -182,7 +183,7 @@ impl<UART: serial::Uart> Modem<UART> {
self.read_response(cmd.contains, cmd.timeout)
}
fn send_data(&mut self, buf: &[u8]) -> Result<String> {
fn send_data(&mut self, buf: &[u8]) -> Result<usize> {
self.rx.clear();
let _ = self.send_bytes("AT+CIPSEND".as_bytes(), '\r')?;
let send_request: String = self.rx.reset(Duration::from_millis(3000))
@ -194,15 +195,8 @@ impl<UART: serial::Uart> Modem<UART> {
}
self.send_bytes(buf, 26 as char)?; // 26_u8 = Ctrl+z - to end sending data
let _ = self.read_response(Some("DATA ACCEPT".to_string()), Duration::from_millis(3000));
self.rx.clear();
let res = self.send_command(Command {
text: "AT+CIPACK".to_string(),
contains: Some("OK".to_string()),
timeout: Duration::from_millis(3000),
})?;
Ok(res)
let _ = self.read_response(Some("SEND OK".to_string()), Duration::from_millis(1000))?;
Ok(buf.len())
}
pub fn get_ip_addr(&mut self) -> Result<String> {
@ -255,14 +249,13 @@ impl<UART: serial::Uart> Modem<UART> {
Ok(())
}
pub fn tcp_set_manual_receive(&mut self) -> Result<()> {
self.send_command(Command::tcp_set_manual_receive())?;
pub fn tcp_set_manual_receive(&mut self, is_manual: bool) -> Result<()> {
self.send_command(Command::tcp_set_manual_receive(is_manual))?;
Ok(())
}
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> {
self.send_data(buf)?;
Ok(())
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<usize> {
self.send_data(buf)
}
fn tcp_parse_response_size(&mut self, reply_line: &str) -> Result<usize> {
@ -287,30 +280,106 @@ impl<UART: serial::Uart> Modem<UART> {
pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut size = 0;
loop {
let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
let reply_len: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE))
.map(|reply: String| {
reply.lines()
.map(|line| {
.fold(0, |acc, line| {
if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") {
line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
acc += line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count()
}
else {
0
}
})
.sum()
})?;
if reply == 0 {
if reply_len == 0 {
break Ok(size)
}
else {
size += reply;
size += reply_len;
continue
}
}
}
pub fn tcp_close_connection(&mut self) -> Result<String> {
self.send_command(Command::tcp_close())
pub fn tcp_close_connection(&mut self) -> Result<()> {
self.send_command(Command::tcp_close())?;
Ok(())
}
pub fn tcp_is_connected(&mut self) -> Result<bool> {
let response = self.send_command(Command::tcp_connection_state())?;
let state = response.lines().last().and_then(|line| line.split(",").last());
Ok(state.unwrap_or("CLOSED") == "CONNECTED")
}
}
pub struct ModemTcpStack;
pub struct ModemSocket<T> {
state: SocketState<T>,
}
pub enum SocketState<UART: serial::Uart> {
Building,
Connected(Modem<UART>),
}
impl<T: serial::Uart> SocketState<T> {
fn new() -> Self {
Self::Building
}
fn get_running(&mut self) -> std::io::Result<&mut T> {
match self {
SocketState::Connected(ref mut s) => Ok(s),
_ => OutOfOrder.into(),
}
}
}
impl<T: serial::Uart> ModemSocket<T> {
fn new() -> Self {
Self {
state: SocketState::new(),
}
}
fn get_running(s: Modem<T>) -> Self {
Self {
state: SocketState::Connected(s)
}
}
}
impl<UART: serial::Uart> TcpClientStack for ModemTcpStack {
type Error = ModemError;
type TcpSocket = ModemSocket<UART>;
fn socket(&mut self) -> Result<Self::TcpSocket> {
Ok(self.modem)
}
fn connect(&mut self, socket: &mut Self::TcpSocket, remote: SocketAddr) -> std::result::Result<(), nb::Error<Self::Error>> {
self.tcp_connect(&format!("{}", remote.ip()), remote.port())
.map_err(|err| nb::Error::Other(err))
}
fn is_connected(&mut self, _socket: &Self::TcpSocket) -> Result<bool> {
self.tcp_is_connected()
}
fn send(&mut self, _socket: &mut Self::TcpSocket, buffer: &[u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
self.tcp_send(buffer)
.map_err(|err| nb::Error::Other(err))
}
fn receive( &mut self, _socket: &mut Self::TcpSocket, buffer: &mut [u8]) -> std::result::Result<usize, nb::Error<Self::Error>> {
self.tcp_receive(buffer)
.map_err(|err| nb::Error::Other(err))
}
fn close(&mut self, _socket: Self::TcpSocket) -> Result<()> {
self.tcp_close_connection()
}
}