refactor and make mqtt work

This commit is contained in:
Vladan Popovic 2022-11-26 18:59:33 +01:00
parent e42302988a
commit b6e7e64e72
2 changed files with 80 additions and 78 deletions

View file

@ -33,7 +33,7 @@ fn main() -> anyhow::Result<()> {
// UART interface for the GSM modem // UART interface for the GSM modem
let modem_uart = dp.uart1; let modem_uart = dp.uart1;
// let mut threads: Vec<JoinHandle<anyhow::Result<_>>> = vec![]; let mut threads: Vec<JoinHandle<anyhow::Result<_>>> = vec![];
// Rx/Tx pins for the GPS modem // Rx/Tx pins for the GPS modem
let gps_rx = dp.pins.gpio13; let gps_rx = dp.pins.gpio13;
@ -43,16 +43,15 @@ fn main() -> anyhow::Result<()> {
let (gps_sender, receiver) = std::sync::mpsc::sync_channel::<Msg>(1); let (gps_sender, receiver) = std::sync::mpsc::sync_channel::<Msg>(1);
let accel_sender = gps_sender.clone(); let accel_sender = gps_sender.clone();
let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?; // let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?;
// threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender))); // threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender)));
//thread::sleep(Duration::from_millis(1000)); //thread::sleep(Duration::from_millis(1000));
//threads.push(thread::spawn(move || accel::main(accel_sender))); threads.push(thread::spawn(move || accel::main(accel_sender)));
//thread::sleep(Duration::from_millis(1000)); //thread::sleep(Duration::from_millis(1000));
//let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?; let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?;
Ok(()) Ok(())
} }

View file

@ -31,7 +31,7 @@ pub struct Modem<UART: serial::Uart> {
pub enum ModemError { pub enum ModemError {
CommandError(String), CommandError(String),
SetupError(String), SetupError(String),
SendDataError, SendDataError(String),
ReadError, ReadError,
TimeoutError, TimeoutError,
} }
@ -113,26 +113,26 @@ impl<UART: serial::Uart> Modem<UART> {
println!("Sending {} ...", cmd.text); println!("Sending {} ...", cmd.text);
let _ = self.serial let _ = self.serial
.send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) .send_bytes(cmd.text.as_bytes(), Some('\r' as u8))
.map_err(|_| ModemError::SendDataError)?; .map_err(|_| ModemError::SendDataError(format!("Error in send_command({:?})", cmd)))?;
self.read_response(cmd.contains, cmd.timeout) self.read_response(cmd.contains, cmd.timeout)
} }
fn tcp_send_data(&mut self, buf: &[u8]) -> Result<String> { fn tcp_manual_send_data(&mut self, buf: &[u8]) -> Result<String> {
let _ = self.serial let _ = self.serial
.write("AT+CIPSEND\r".as_bytes()) .write("AT+CIPSEND\r".as_bytes())
.map_err(|_| ModemError::SendDataError)?; .map_err(|_| ModemError::SendDataError("Error in tcp_manual_send_data ... AT_CIPSEND\\r".to_string()))?;
let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000))
.map(char::from) .map(char::from)
.take_while(|c| *c != '>').collect(); .take_while(|c| *c != '>').collect();
if send_prompt != "\r\n" { if send_prompt != "\r\n" {
println!("{:?}", send_prompt.as_bytes()); let msg = format!("Prompt error, expected \\r\\n, got {:?}", send_prompt.as_bytes());
return Err(ModemError::SendDataError); return Err(ModemError::SendDataError(msg));
} }
self.serial self.serial
.send_bytes(buf, Some(26)) // 26_u8 = Ctrl+z - to end sending data .send_bytes(buf, Some(26_u8)) // 26_u8 = Ctrl+z - to end sending data
.map_err(|_| ModemError::SendDataError)?; .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?;
self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000)) self.read_response(Some("SEND OK".to_string()), Duration::from_millis(3000))
} }
@ -164,6 +164,26 @@ impl<UART: serial::Uart> Modem<UART> {
Ok(res.contains("+CGATT: 1")) Ok(res.contains("+CGATT: 1"))
} }
fn try_connect_gprs(&mut self) -> Result<()> {
let mut retries = 0;
loop {
if self.is_gprs_attached()? {
let _ = self.gprs_connect()?;
thread::sleep(Duration::from_millis(1000));
let ip_addr = self.gprs_status()?;
if ip_addr.contains("0.0.0.0") && retries < 5 {
thread::sleep(Duration::from_millis(1000));
retries += 1;
continue
} else if retries < 5 {
break Ok(())
} else {
break Err(ModemError::SetupError(format!("Cannot connect to GPRS after {} retries ... bailing!", retries)));
}
}
}
}
pub fn tcp_is_ssl_enabled(&mut self) -> Result<bool> { pub fn tcp_is_ssl_enabled(&mut self) -> Result<bool> {
let res = self.send_command(Command::tcp_ssl_check())?; let res = self.send_command(Command::tcp_ssl_check())?;
Ok(res.contains("+CIPSSL: (1)")) Ok(res.contains("+CIPSSL: (1)"))
@ -194,8 +214,8 @@ impl<UART: serial::Uart> Modem<UART> {
.map(|_| ()) .map(|_| ())
} }
pub fn tcp_send(&mut self, buf: &[u8]) -> Result<()> { pub fn tcp_manual_send(&mut self, buf: &[u8]) -> Result<()> {
self.tcp_send_data(buf) self.tcp_manual_send_data(buf)
.map(|_| ()) .map(|_| ())
} }
@ -303,18 +323,18 @@ impl<UART: serial::Uart> Modem<UART> {
let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec); let cmd = Command::fs_file_write(path, append, buf.len(), input_time_sec);
let _ = self.serial let _ = self.serial
.send_bytes(cmd.text.as_bytes(), Some('\r' as u8)) .send_bytes(cmd.text.as_bytes(), Some('\r' as u8))
.map_err(|_| ModemError::SendDataError)?; .map_err(|err| ModemError::SendDataError(format!("File write error ({:?})", err)))?;
let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000)) let send_prompt: String = self.serial.rx.reset(Duration::from_millis(3000))
.map(char::from) .map(char::from)
.take_while(|c| *c != '>').collect(); .take_while(|c| *c != '>').collect();
if send_prompt == "" { if send_prompt == "" {
return Err(ModemError::SendDataError); return Err(ModemError::SendDataError("Prompt empty, expected: \\r\\n".to_string()));
} }
self.serial self.serial
.send_bytes(buf, None) .send_bytes(buf, None)
.map_err(|_| ModemError::SendDataError)?; .map_err(|err| ModemError::SendDataError(format!("Error sending bytes via serial ({:?})", err)))?;
let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000)); let _ = self.read_response(Some("OK".to_string()), Duration::from_millis(3000));
Ok(()) Ok(())
@ -347,7 +367,7 @@ impl<UART: serial::Uart> Modem<UART> {
} }
fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> { fn mqtt_receive_reply(&mut self) -> std::result::Result<(), anyhow::Error> {
println!("entered receiving modem reply ..."); println!("receiving mqtt reply from modem ...");
let size = self.tcp_receive_reply_len()?; let size = self.tcp_receive_reply_len()?;
println!("receiving reply len({}) ...", size); println!("receiving reply len({}) ...", size);
let mut reply = vec![0 as u8; size]; let mut reply = vec![0 as u8; size];
@ -361,9 +381,9 @@ impl<UART: serial::Uart> Modem<UART> {
let mut buf = Vec::new(); let mut buf = Vec::new();
let mut conn = ConnectPacket::new(device_id); let mut conn = ConnectPacket::new(device_id);
conn.set_clean_session(true); conn.set_clean_session(true);
conn.set_keep_alive(0); conn.set_keep_alive(100);
let _ = conn.encode(&mut buf)?; let _ = conn.encode(&mut buf)?;
let _ = self.tcp_send(&mut buf)?; let _ = self.tcp_manual_send(&mut buf)?;
thread::sleep(Duration::from_millis(2000)); thread::sleep(Duration::from_millis(2000));
drop(buf); drop(buf);
@ -383,7 +403,7 @@ impl<UART: serial::Uart> Modem<UART> {
println!("created mqtt publish packet ..."); println!("created mqtt publish packet ...");
let _ = packet.encode(&mut buf)?; let _ = packet.encode(&mut buf)?;
println!("modem tcp send publish pakage ..."); println!("modem tcp send publish pakage ...");
let _ = self.tcp_send(&mut buf)?; let _ = self.tcp_manual_send(&mut buf)?;
thread::sleep(Duration::from_millis(2000)); thread::sleep(Duration::from_millis(2000));
drop(buf); drop(buf);
@ -402,7 +422,7 @@ impl<UART: serial::Uart> std::io::Read for Modem<UART> {
impl<UART: serial::Uart> std::io::Write for Modem<UART> { impl<UART: serial::Uart> std::io::Write for Modem<UART> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.tcp_send(buf) self.tcp_manual_send(buf)
.map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?; .map_err(|_| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?;
Ok(buf.len()) Ok(buf.len())
} }
@ -439,15 +459,7 @@ pub fn main<T: Sync + Send>(
mdm.init(pwrkey, rst, power)?; mdm.init(pwrkey, rst, power)?;
if !mdm.is_gprs_attached()? { // thread::sleep(Duration::from_millis(500));
let _ = mdm.gprs_attach_ap(
crate::config::A1_GPRS_AP.apn,
crate::config::A1_GPRS_AP.username,
crate::config::A1_GPRS_AP.password,
)?;
}
thread::sleep(Duration::from_millis(1000));
//println!("setting up client TLS cert"); //println!("setting up client TLS cert");
//let client_cert = include_bytes!("../certs/full-bin.p12"); //let client_cert = include_bytes!("../certs/full-bin.p12");
@ -457,49 +469,41 @@ pub fn main<T: Sync + Send>(
//let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?; //let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?;
//let _ = mdm.fs_list("C:\\USER\\")?; //let _ = mdm.fs_list("C:\\USER\\")?;
let mut retries = 0; loop {
let is_connected: bool = loop { if !mdm.is_gprs_attached()? {
if mdm.is_gprs_attached()? { let _ = mdm.gprs_attach_ap(
let _ = mdm.gprs_connect()?; crate::config::A1_GPRS_AP.apn,
thread::sleep(Duration::from_millis(1000)); crate::config::A1_GPRS_AP.username,
let ip_addr = mdm.gprs_status()?; crate::config::A1_GPRS_AP.password,
if ip_addr.contains("0.0.0.0") && retries < 5 { )?;
thread::sleep(Duration::from_millis(1000));
retries += 1;
continue
} else if retries < 5 {
break true
} else {
break false
} }
} if let Ok(()) = mdm.try_connect_gprs() {
};
if is_connected {
let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267";
let _ = mdm.tcp_set_quick_mode(false); let _ = mdm.tcp_set_quick_mode(false);
let _ = mdm.tcp_set_manual_receive(true); let _ = mdm.tcp_set_manual_receive(true);
let _ = mdm.tcp_connect("51.158.66.64", 1883)?; let _ = mdm.tcp_connect("51.158.66.64", 7887)?;
let _ = mdm.mqtt_connect(device_id)?; let _ = mdm.mqtt_connect(device_id)?;
println!("entering queue receive loop ..."); println!("entering queue receive loop ...");
let mut err_count = 0; let mut err_count = 0;
loop { let _ = loop {
match receiver.recv() { match receiver.recv() {
Ok(Msg::Gps(solution)) => { Ok(Msg::Gps(solution)) => {
println!("received GPS solution {:?} | sending to mqtt ...", solution); println!("received GPS solution {:?} | sending to mqtt ...", solution);
let _ = mdm.mqtt_publish(device_id, &format!("{:?}", solution))?; let _ = mdm.mqtt_publish(device_id, &format!("{:?}", solution))?;
err_count = 0;
}, },
Ok(Msg::Accelerometer(acc)) => { Ok(Msg::Accelerometer(acc)) => {
println!("received message {} | sending to mqtt ...", acc); println!("received accel {} | sending to mqtt ...", acc);
let _ = mdm.mqtt_publish(device_id, &format!("{:?}", acc))?; let _ = mdm.mqtt_publish(device_id, &format!("{:?}", acc))?;
err_count = 0;
} }
Err(e) => { Err(e) => {
println!("received error {} | NOT sending to mqtt ...", e);
if err_count < 10 { if err_count < 10 {
err_count += 1; err_count += 1;
println!("received error {} | NOT sending to mqtt ...", e);
} }
else { else {
break break
@ -510,6 +514,5 @@ pub fn main<T: Sync + Send>(
let _ = mdm.tcp_close_connection()?; let _ = mdm.tcp_close_connection()?;
} }
}
Ok(())
} }