From d023f5db76fd00c2710fcf75cba93efb51226902 Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Mon, 4 Jul 2022 17:04:09 +0200 Subject: [PATCH] mqtt works ... connect + publish --- src/command.rs | 98 ++++++++++++++++++++++++++++------------ src/main.rs | 71 +++++++++++++++++++++-------- src/modem.rs | 120 +++++++++++++++++++++++++++++++++---------------- 3 files changed, 202 insertions(+), 87 deletions(-) diff --git a/src/command.rs b/src/command.rs index e8e6a49..cfebef1 100644 --- a/src/command.rs +++ b/src/command.rs @@ -151,23 +151,7 @@ impl Command { } } - pub fn http_enable_ssl() -> Command { - Command { - text: "AT+HTTPSSL=1".to_string(), - timeout: Duration::from_millis(3000), - contains: Some("OK".to_string()), - } - } - - pub fn http_disable_ssl() -> Command { - Command { - text: "AT+HTTPSSL=0".to_string(), - timeout: Duration::from_millis(3000), - contains: Some("OK".to_string()), - } - } - - pub fn http_init_url(url: &str) -> Command { + pub fn http_set_url(url: &str) -> Command { Command { text: format!("AT+HTTPPARA=\"URL\",\"{}\"", url), timeout: Duration::from_millis(3000), @@ -199,6 +183,14 @@ impl Command { } } + pub fn http_post() -> Command { + Command { + text: "AT+HTTPACTION=1".to_string(), + timeout: Duration::from_millis(10000), + contains: Some("HTTPACTION".to_string()), + } + } + pub fn http_set_content(content: &str) -> Command { Command { text: format!("AT+HTTPPARA=\"CONTENT\",\"{}\"", content), @@ -207,6 +199,14 @@ impl Command { } } + pub fn http_set_redirect(redirect: bool) -> Command { + Command { + text: format!("AT+HTTPPARA=\"REDIR\",\"{}\"", redirect as u8), + timeout: Duration::from_millis(3000), + contains: Some("OK".to_string()), + } + } + pub fn http_post_len(size: usize, time: usize) -> Command { Command { text: format!("AT+HTTPDATA={},{}", size, time), @@ -215,15 +215,7 @@ impl Command { } } - pub fn http_post() -> Command { - Command { - text: "AT+HTTPACTION=1".to_string(), - timeout: Duration::from_millis(10000), - contains: Some("HTTPACTION".to_string()), - } - } - - pub fn http_response() -> Command { + pub fn http_read_response() -> Command { Command { text: "AT+HTTPREAD".to_string(), timeout: Duration::from_millis(3000), @@ -243,7 +235,7 @@ impl Command { Command { text: "AT".to_string(), timeout: Duration::from_millis(3000), - contains: Some("OK".to_string()), + contains: Some("+CIEV".to_string()), } } @@ -335,9 +327,9 @@ impl Command { } } - pub fn tcp_set_manual_receive() -> Command { + pub fn tcp_set_manual_receive(is_manual: bool) -> Command { Command { - text: "AT+CIPRXGET=1".to_string(), + text: format!("AT+CIPRXGET={}", is_manual as u8), timeout: Duration::from_millis(3000), contains: Some("OK".to_string()), } @@ -390,4 +382,52 @@ impl Command { contains: Some("OK".to_string()), } } + + pub fn ssl_set_client_cert(path: &str, password: &str) -> Command { + Command { + text: format!("AT+SSLSETCERT={},{}", path, password), + timeout: Duration::from_millis(2000), + contains: Some("SSLSETCERT".to_string()), + } + } + + pub fn ssl_set_root_cert(path: &str, size: usize) -> Command { + Command { + text: format!("AT+SSLSETROOT={},{}", path, size), + timeout: Duration::from_millis(2000), + contains: Some("SSLSETCERT".to_string()), + } + } + + pub fn fs_file_create(path: &str) -> Command { + Command { + text: format!("AT+FSCREATE={}", path), + timeout: Duration::from_millis(2000), + contains: Some("OK".to_string()), + } + } + + pub fn fs_file_write(path: &str, append: bool, size: usize, input_time_sec: usize) -> Command { + Command { + text: format!("AT+FSWRITE={},{},{},{}", path, append as u8, size, input_time_sec), + timeout: Duration::from_millis(20000), + contains: Some("OK".to_string()), + } + } + + pub fn fs_list(path: &str) -> Command { + Command { + text: format!("AT+FSLS={}", path), + timeout: Duration::from_millis(2000), + contains: Some("OK".to_string()), + } + } + + pub fn fs_free_size() -> Command { + Command { + text: "AT+FSMEM".to_string(), + timeout: Duration::from_millis(2000), + contains: Some("OK".to_string()), + } + } } diff --git a/src/main.rs b/src/main.rs index a441730..8e70b67 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,13 +6,12 @@ mod command; use anyhow; use std::time::Duration; use std::thread; -use esp_idf_hal::delay; use esp_idf_hal::prelude::*; use esp_idf_hal::peripherals::Peripherals; use esp_idf_hal::serial; use mqtt::control::ConnectReturnCode; -use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacketRef, QoSWithPacketIdentifier}; +use mqtt::packet::{ConnackPacket, ConnectPacket, PublishPacket, QoSWithPacketIdentifier}; use mqtt::{Decodable, Encodable, TopicName}; @@ -59,7 +58,13 @@ fn main() -> anyhow::Result<()> { thread::sleep(Duration::from_millis(2000)); - let _ = mdm.chip_info()?; + //println!("setting up client TLS cert"); + //let client_cert = include_bytes!("../certs/full-bin.p12"); + //let client_cert_path = "C:\\USER\\fullchain.pem"; + + //let _ = mdm.upload_cert(client_cert_path, client_cert)?; + //let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?; + //let _ = mdm.fs_list("C:\\USER\\")?; loop { if mdm.is_gprs_attached()? { @@ -73,26 +78,54 @@ fn main() -> anyhow::Result<()> { } } - let _ = mdm.location()?; - let _ = mdm.ssl_opt()?; - - println!("connecting to server!"); + //println!("connecting to server!"); //if !mdm.tcp_is_ssl_enabled()? { // let _ = mdm.tcp_ssl_enable()?; //} - //if mdm.tcp_is_ssl_enabled()? { - // let _ = mdm.tcp_ssl_disable()?; - //} - // + if mdm.tcp_is_ssl_enabled()? { + let _ = mdm.tcp_ssl_disable()?; + } + + let client_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; let message = "{\"lat\": 20.475370, \"long\": 44.747224}"; - let url = "https://a.tracker.called.quest"; - //let url = "https://rest.iot.fr-par.scw.cloud"; - let token = "eyJOZXRJRCI6ImUwYTc5YzM2LWExZjYtNDNiMC1hMGY3LWE2YTk1OGI3Zjk1ZCIsIk5ldEtleSI6IjZhZGU2ZWZkLThiMGYtNDQ3ZC1hNWY5LThkNDJjNDk4NDQ3MSJ9"; - let reply = mdm.http_post(url, token, message.as_bytes())?; - println!("+++++++++++++++++++++++++++++++++"); - println!("REPLY({}) = {}", reply.len(), reply); - println!("+++++++++++++++++++++++++++++++++"); - let _ = mdm.http_close()?; + + let _ = mdm.tcp_set_quick_mode(false); + let _ = mdm.tcp_set_manual_receive(true); + + let _ = mdm.tcp_connect("51.158.66.64", 1883)?; + + let mut buf = Vec::new(); + + let mut conn = ConnectPacket::new(client_id); + conn.set_clean_session(true); + conn.set_keep_alive(0); + let _ = conn.encode(&mut buf)?; + let _ = mdm.tcp_send(&mut buf)?; + buf.clear(); + + thread::sleep(Duration::from_millis(1000)); + + let size = mdm.tcp_receive_reply_len()?; + let mut reply = vec![0 as u8; size]; + let _ = mdm.tcp_receive(&mut reply); + println!("TCP Received: {}", reply.iter().map(|b| char::from(*b)).collect::()); + + println!("sending publish packet"); + let packet = PublishPacket::new( + TopicName::new("bajsevi/test").unwrap(), + QoSWithPacketIdentifier::Level0, + message.as_bytes(), + ); + let _ = packet.encode(&mut buf)?; + let _ = mdm.tcp_send(&mut buf)?; + buf.clear(); + + let size = mdm.tcp_receive_reply_len()?; + let mut reply = vec![0 as u8; size]; + let _ = mdm.tcp_receive(&mut reply); + println!("TCP Received: {}", reply.iter().map(|b| char::from(*b)).collect::()); + + let _ = mdm.tcp_close_connection()?; break } diff --git a/src/modem.rs b/src/modem.rs index 190eaea..9a12acb 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -140,12 +140,10 @@ impl Modem { Ok(()) } - /// Reads the serial RX until a \\n char is encoutered, or a timeout is reached. The timeout is - /// provided on input via the `timeout` argument. The first argument `contains` is checked - /// against a line in the response, if it's there the reading stops. - /// - /// If `contains` is `None`, the first line only is returned in the response. If it's - /// `Some(match_txt)`, then the end of the response is matched against `match_txt`. + /// Reads the serial RX until the `contains` string is encoutered if `contains` is Some(s), if + /// None, then the first line is returned. If a timeout is reached. The timeout is provided on + /// input via the `timeout` argument. The first argument `contains` is checked against every + /// line in the response. fn read_response(&mut self, contains: Option, timeout: Duration) -> Result { let mut response = String::new(); let start = Instant::now(); @@ -166,7 +164,7 @@ impl Modem { #[inline(always)] fn send_bytes(&mut self, payload: &[u8], eos: Option) -> Result<()> { - self.rx.clear(); + //self.rx.clear(); for b in payload.iter() { nb::block!(self.tx.write(*b)) .map_err(|_| ModemError::CommandError(format!("error writing {} to serial", b)))?; @@ -188,19 +186,13 @@ impl Modem { .map(char::from) .take_while(|c| *c != '>').collect(); - if send_request == "" { + if send_request != "\r\n" { + println!("{:?}", send_request.as_bytes()); return Err(ModemError::SendDataError); } self.send_bytes(buf, Some(26))?; // 26_u8 = Ctrl+z - to end sending data - let _ = self.read_response(Some("DATA ACCEPT".to_string()), Duration::from_millis(3000)); - - let res = self.send_command(Command { - text: "AT+CIPACK".to_string(), - contains: Some("OK".to_string()), - timeout: Duration::from_millis(3000), - })?; - Ok(res) + self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000)) } pub fn gprs_status(&mut self) -> Result { @@ -257,8 +249,8 @@ impl Modem { Ok(()) } - pub fn tcp_set_manual_receive(&mut self) -> Result<()> { - self.send_command(Command::tcp_set_manual_receive())?; + pub fn tcp_set_manual_receive(&mut self, is_manual: bool) -> Result<()> { + self.send_command(Command::tcp_set_manual_receive(is_manual))?; Ok(()) } @@ -289,25 +281,22 @@ impl Modem { pub fn tcp_receive(&mut self, buf: &mut [u8]) -> Result { let mut size = 0; loop { - let reply: usize = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) - .map(|reply: String| { - reply.lines() - .map(|line| { - if !line.starts_with("\r") && !line.contains("+CIPRXGET: 2,") && !line.contains("OK") { - line.chars().enumerate().map(|(idx, c)| { buf[size + idx] = c as u8; }).count() - } - else { - 0 - } - }) - .sum() + let reply = self.send_command(Command::tcp_receive(MAX_TCP_MANUAL_REPLY_SIZE)) + .map(|reply| { + reply + .split("\r\n") + .filter(|line| line.len() > 2 && !line.contains("+CIPRXGET: 2,")) + .next() + .map(|line| line.chars().enumerate().map(|(idx, c)| buf[size + idx] = c as u8).count()) })?; - if reply == 0 { - break Ok(size) - } - else { - size += reply; - continue + match reply { + Some(0) | None => { + break Ok(size) + }, + Some(x) => { + size += x; + continue + }, } } } @@ -318,16 +307,26 @@ impl Modem { pub fn http_post(&mut self, url: &str, token: &str, content: &[u8]) -> Result { let _ = self.send_command(Command::http_init()); - let _ = self.send_command(Command::http_set_ssl(true)); let _ = self.send_command(Command::http_set_cid()); - let _ = self.send_command(Command::http_init_url(url)); + let _ = self.send_command(Command::http_set_url(url)); let _ = self.send_command(Command::http_set_header("X-Secret", token)); let _ = self.send_command(Command::http_set_header("X-Topic", "device-dev")); let _ = self.send_command(Command::http_set_content("application/json")); + let _ = self.send_command(Command::http_set_ssl(true)); let _ = self.send_command(Command::http_post_len(content.len(), 100000)); let _ = self.send_bytes(content, Some(26)); let _ = self.send_command(Command::http_post()); - self.send_command(Command::http_response()) + self.send_command(Command::http_read_response()) + } + + pub fn http_get(&mut self, url: &str) -> Result { + let _ = self.send_command(Command::http_init()); + let _ = self.send_command(Command::http_set_cid()); + let _ = self.send_command(Command::http_set_url(url)); + let _ = self.send_command(Command::http_set_redirect(true)); + let _ = self.send_command(Command::http_set_ssl(true)); + let _ = self.send_command(Command::http_get()); + self.send_command(Command::http_read_response()) } pub fn http_close(&mut self) -> Result<()> { @@ -353,4 +352,47 @@ impl Modem { let _ = self.send_command(Command::ssl_opt())?; Ok(()) } + + fn file_write(&mut self, buf: &[u8], path: &str, append: bool, input_time_sec: usize) -> Result<()> { + let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec); + let _ = self.send_bytes(cmd.text.as_bytes(), Some('\r' as u8))?; + let send_request: String = self.rx.reset(Duration::from_millis(3000)) + .map(char::from) + .take_while(|c| *c != '>').collect(); + + if send_request == "" { + return Err(ModemError::SendDataError); + } + + self.send_bytes(buf, None)?; + let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000)); + + Ok(()) + } + + pub fn upload_cert(&mut self, path: &str, cert: &[u8]) -> Result<()> { + let _ = self.send_command(Command::fs_file_create(path))?; + let _ = self.file_write(cert, path, false, 20000)?; + Ok(()) + } + + pub fn fs_list(&mut self, path: &str) -> Result<()> { + let _ = self.send_command(Command::fs_list(path))?; + Ok(()) + } + + pub fn fs_free_space(&mut self) -> Result<()> { + let _ = self.send_command(Command::fs_free_size())?; + Ok(()) + } + + pub fn ssl_set_client_cert(&mut self, path: &str, password: &str) -> Result<()> { + let _ = self.send_command(Command::ssl_set_client_cert(path, password))?; + Ok(()) + } + + pub fn ssl_set_root_cert(&mut self, path: &str, filesize: usize) -> Result<()> { + let _ = self.send_command(Command::ssl_set_root_cert(path, filesize))?; + Ok(()) + } }