move connecting and sending to func

This commit is contained in:
Vladan Popovic 2023-02-20 14:55:00 +01:00
parent 09402bbf83
commit 9d536bdce9

View file

@ -483,7 +483,7 @@ impl<UART: serial::Uart> Modem<UART> {
conn.set_user_name(Some(username.to_string())); conn.set_user_name(Some(username.to_string()));
conn.set_password(Some(password.to_string())); conn.set_password(Some(password.to_string()));
let _ = conn.encode(&mut buf)?; let _ = conn.encode(&mut buf)?;
self.tcp_manual_send(&mut buf).ok(); let _ = self.tcp_manual_send(&mut buf)?;
let reply = self.mqtt_receive_reply()?; let reply = self.mqtt_receive_reply()?;
println!("mqtt decoded packet: ({:?})", reply); println!("mqtt decoded packet: ({:?})", reply);
@ -503,9 +503,7 @@ impl<UART: serial::Uart> Modem<UART> {
message.as_bytes(), message.as_bytes(),
); );
let _ = packet.encode(&mut buf)?; let _ = packet.encode(&mut buf)?;
println!("created mqtt publish packet ... ({})", self.tcp_manual_send(&mut buf)?;
std::str::from_utf8(buf.as_slice()).unwrap_or(""));
self.tcp_manual_send(&mut buf).ok();
Ok(()) Ok(())
} }
} }
@ -552,8 +550,6 @@ where
let mqtt_username = include_str!("../secret/username").trim(); let mqtt_username = include_str!("../secret/username").trim();
let mqtt_password = include_str!("../secret/password").trim(); let mqtt_password = include_str!("../secret/password").trim();
mdm.init(pwrkey, rst, power)?;
// thread::sleep(Duration::from_millis(500)); // thread::sleep(Duration::from_millis(500));
//println!("setting up client TLS cert"); //println!("setting up client TLS cert");
@ -564,61 +560,63 @@ where
//let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?; //let _ = mdm.ssl_set_client_cert(client_cert_path, "t")?;
//let _ = mdm.fs_list("C:\\USER\\")?; //let _ = mdm.fs_list("C:\\USER\\")?;
let _ = mdm.echo(false); fn start_sending(mdm: &mut Modem<serial::UART1>, mqtt_username: &str, mqtt_password: &str, receiver: Receiver<Msg>) -> anyhow::Result<()> {
// Retry 5 times to open a TCP connection, otherwise fail and wait for reboot.
let mut retries = 0;
loop {
if !mdm.is_gprs_attached()? { if !mdm.is_gprs_attached()? {
let _ = mdm.gprs_attach_ap(crate::config::MTS)?; let _ = mdm.gprs_attach_ap(crate::config::MTS)?;
let _ = mdm.try_connect_gprs()?;
} }
if let Ok(()) = mdm.try_connect_gprs() { // When command AT+CIPQSEND=0, it is in normal sending mode. In this mode, after user
// When command AT+CIPQSEND=0, it is in normal sending mode. In this mode, after user // sends data by AT+CIPSEND, if the server receives TCP data, it will give ACK message
// sends data by AT+CIPSEND, if the server receives TCP data, it will give ACK message // to module, and the module will respond SEND OK.
// to module, and the module will respond SEND OK. let _ = mdm.send("AT+CIPQSEND=0", "OK");
let _ = mdm.send("AT+CIPQSEND=0", "OK"); // Enables getting data from network manually.
// Enables getting data from network manually. let _ = mdm.send("AT+CIPRXGET=1", "OK");
let _ = mdm.send("AT+CIPRXGET=1", "OK");
if let Err(_) = mdm.tcp_connect("51.158.66.64", 7887) { for _ in 0..5 {
if retries < 5 { if let Ok(_) = mdm.tcp_connect("51.158.66.64", 7887) {
retries += 1; break
continue; }
}
let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267";
println!("connecting to MQTT with ({}:{})", mqtt_username, mqtt_password);
let _ = mdm.mqtt_connect(device_id, mqtt_username, mqtt_password)?;
println!("entering queue receive loop ...");
let mut err_count = 0;
let _ = loop {
match receiver.recv() {
Ok(Msg::Gps(solution)) => {
println!("received GPS solution {:?} | sending to mqtt ...", solution);
serde_json_core::ser::to_string::<Solution, 512>(&solution)
.map_err(|e| anyhow::Error::new(e))
.and_then(|sol| mdm.mqtt_publish(device_id, &sol))?;
err_count = 0;
},
Ok(Msg::Accelerometer(acc)) => {
println!("received accel {} | sending to mqtt ...", acc);
let _ = mdm.mqtt_publish(device_id, &format!("{:?}", acc))?;
err_count = 0;
}
Err(e) => {
if err_count < 5 {
err_count += 1;
println!("received error {} | NOT sending to mqtt ...", e);
}
else {
break
}
} }
} }
};
let device_id = "c36a72df-5bd6-4f9b-995d-059433bc3267"; Ok(())
println!("connecting to MQTT with ({}:{})", mqtt_username, mqtt_password);
let _ = mdm.mqtt_connect(device_id, mqtt_username, mqtt_password)?;
println!("entering queue receive loop ...");
let mut err_count = 0;
let _ = loop {
match receiver.recv() {
Ok(Msg::Gps(solution)) => {
println!("received GPS solution {:?} | sending to mqtt ...", solution);
serde_json_core::ser::to_string::<Solution, 512>(&solution)
.map_err(|e| anyhow::Error::new(e))
.and_then(|sol| mdm.mqtt_publish(device_id, &sol))?;
err_count = 0;
},
Ok(Msg::Accelerometer(acc)) => {
println!("received accel {} | sending to mqtt ...", acc);
let _ = mdm.mqtt_publish(device_id, &format!("{:?}", acc))?;
err_count = 0;
}
Err(e) => {
if err_count < 5 {
err_count += 1;
println!("received error {} | NOT sending to mqtt ...", e);
}
else {
break
}
}
}
};
let _ = mdm.tcp_close_connection()?;
}
} }
mdm.init(pwrkey, rst, power)?;
let _ = mdm.echo(false)?;
let _ = start_sending(&mut mdm, mqtt_username, mqtt_password, receiver)?;
let _ = mdm.tcp_close_connection()?;
Ok(())
} }