From 09402bbf83e6e5a45a1b84ce19a342f8d55e1ddb Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sat, 18 Feb 2023 15:42:11 +0100 Subject: [PATCH] gps and mqtt pub/sub works! --- Cargo.toml | 4 +- sdkconfig.defaults | 2 +- src/gps.rs | 195 ++++++++++----------------------------------- src/main.rs | 15 ++-- src/modem.rs | 20 ++++- src/types.rs | 103 ++++++++++++++++++++++-- 6 files changed, 168 insertions(+), 171 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4ff3999..5eaa44d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,9 @@ esp-idf-hal = "0.37.4" esp-idf-sys = { version = "0.31.5", features = ["binstart", "native"] } mqtt-protocol = "0.11.2" nb = "1.0.0" -ublox = "0.4.2" +nmea0183 = "0.3.0" +serde-json-core = "0.5.0" +serde = "*" [build-dependencies] embuild = "0.29" diff --git a/sdkconfig.defaults b/sdkconfig.defaults index cd6dab8..6e54cee 100644 --- a/sdkconfig.defaults +++ b/sdkconfig.defaults @@ -1,5 +1,5 @@ # Rust often needs a bit of an extra main task stack size compared to C (the default is 3K) -CONFIG_ESP_MAIN_TASK_STACK_SIZE=64000 +CONFIG_ESP_MAIN_TASK_STACK_SIZE=128000 # Use this to set FreeRTOS kernel tick frequency to 1000 Hz (100 Hz by default). # This allows to use 1 ms granuality for thread sleeps (10 ms by default). diff --git a/src/gps.rs b/src/gps.rs index 2b0b943..40e84af 100644 --- a/src/gps.rs +++ b/src/gps.rs @@ -3,96 +3,41 @@ use anyhow; use std::{ sync::mpsc::SyncSender, thread, - time::{Duration, Instant}, - io::{Read, Write}, + time::Duration, + io::Read, }; use esp_idf_hal::prelude::*; use esp_idf_hal::serial::{self, Rx, Tx}; -use ublox::*; +use nmea0183::{Parser, ParseResult, Sentence, Source}; -use crate::types::*; +use crate::types::Msg; use crate::serial::SerialIO; struct GpsModule { port: SerialIO, - parser: Parser>, } impl GpsModule { pub fn new(tx: Tx, rx: Rx) -> Self { - let parser = Parser::default(); - GpsModule { port: SerialIO::new(tx, rx), parser } - } - - pub fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> { - println!("WRITE: {:?}", data); - self.port.write(data).map(|_| ()) - } - - pub fn update(&mut self, mut cb: T) -> std::io::Result<()> { - println!("UPDATING ... ... ..."); - loop { - let mut local_buf = [0; 1024]; - let nbytes = self.read_port(&mut local_buf)?; - if nbytes == 0 { - println!("no bytes to read :("); - break; - } - // parser.consume adds the buffer to its internal buffer, and - // returns an iterator-like object we can use to process the packets - let mut it = self.parser.consume(&local_buf[..nbytes]); - while let Some(Ok(packet)) = it.next() { - println!("READ: {:?}", packet); - cb(packet); - } - } - Ok(()) - } - - pub fn wait_for_ack(&mut self, timeout: Duration) -> std::io::Result { - let mut found_packet = false; - println!("LOOKING FOR ACK ..."); - let start = Instant::now(); - while !found_packet && start.elapsed() < timeout { - self.update(|packet| { - if let PacketRef::AckAck(ack) = packet { - if ack.class() == T::CLASS && ack.msg_id() == T::ID { - println!("FOUND PACKET: {} {}", ack.class(), ack.msg_id()); - found_packet = true; - } - } - else if let PacketRef::AckNak(nak) = packet { - println!("NAK PACKET: {} {}", nak.class(), nak.msg_id()); - } - })?; - } - println!("exiting wait_for_ack"); - Ok(found_packet) - } - - /// Reads the serial port, converting timeouts into "no data received" - fn read_port(&mut self, output: &mut [u8]) -> std::io::Result { - match self.port.read(output) { - Ok(b) => Ok(b), - Err(e) => { - if e.kind() == std::io::ErrorKind::TimedOut { - Ok(0) - } else { - Err(e) - } - } + GpsModule { + port: SerialIO::new(tx, rx), } } } -pub fn main( - tx: esp_idf_hal::gpio::Gpio12, - rx: esp_idf_hal::gpio::Gpio13, +pub fn main +( + tx: PTx, + rx: PRx, uart: serial::UART2, sender: SyncSender, -) -> Result<(), anyhow::Error> { +) -> std::result::Result<(), anyhow::Error> +where + PRx: esp_idf_hal::gpio::Pin + esp_idf_hal::gpio::InputPin + esp_idf_hal::gpio::OutputPin, + PTx: esp_idf_hal::gpio::Pin + esp_idf_hal::gpio::InputPin + esp_idf_hal::gpio::OutputPin, +{ let serial_pins = serial::Pins { tx, rx, @@ -100,100 +45,46 @@ pub fn main( rts: None, }; - let serial_config = serial::config::Config::default() - .baudrate(Hertz(9600)) - .data_bits(serial::config::DataBits::DataBits8) - .parity_none() - .flow_control(serial::config::FlowControl::None) - .stop_bits(serial::config::StopBits::STOP1); - let serial: serial::Serial = serial::Serial::new( uart, serial_pins, - serial_config, + serial::config::Config::default().baudrate(Hertz(9600)), )?; let (tx, rx) = serial.split(); let mut device = GpsModule::new(tx, rx); - // Configure the device to talk UBX - device - .write_all( - &CfgPrtUartBuilder { - portid: UartPortId::Uart1, - reserved0: 0, - tx_ready: 0, - mode: UartMode::new(DataBits::Eight, Parity::None, StopBits::One), - baud_rate: 9600, - in_proto_mask: InProtoMask::all(), - out_proto_mask: OutProtoMask::UBOX, - flags: 0, - reserved5: 0, - } - .into_packet_bytes(), - )?; - device.wait_for_ack::(Duration::from_millis(3000)).unwrap(); - println!("CfgPrtUart acked!"); + let mut parser = Parser::new() + .source_only(Source::GPS) + .sentence_filter(Sentence::GLL | Sentence::GGA); - // Set interval for the NavPosVelTime packet - println!("Sending set_rate_for:: ..."); - for i in 1..5 { - device - .write_all( - &CfgMsgAllPortsBuilder::set_rate_for::([0, 1, 1, 0, 0, 0]) - .into_packet_bytes(), - ) - .unwrap(); - println!("SENT set_rate_for::({}) !!!", i); - if let Ok(true) = device.wait_for_ack::(Duration::from_millis(3000)) { - println!("Setting rate for NavPosVelTime acked! Exiting loop ..."); - break - } - } - - // Send a packet request for the MonVer packet - //device - // .write_all(&UbxPacketRequest::request_for::().into_packet_bytes()) - // .unwrap(); - - // Start reading data - println!("Opened u-blox device, waiting for solutions..."); - for _ in 0..20 { - device - .update(|packet| match packet { - PacketRef::MonVer(packet) => { - println!( - "SW version: {} HW version: {}", - packet.software_version(), - packet.hardware_version() - ); - println!("{:?}", packet); - } - PacketRef::NavPosVelTime(sol) => { - let has_posvel = sol.fix_type() == GpsFix::Fix3D - || sol.fix_type() == GpsFix::GPSPlusDeadReckoning; - - if has_posvel { - let pos: Position = (&sol).into(); - let vel: Velocity = (&sol).into(); - let solution = Solution { - latitude: pos.lat, - longitude: pos.lon, - altitude: pos.alt, - speed: vel.speed, - direction: vel.heading, - }; - println!("Sol: {:?}", solution); - sender.send(Msg::Gps(solution)); + let mut c = 0; + let mut nmea = [0_u8; 1024]; + loop { + if let Ok(_) = device.port.read(nmea.as_mut_slice()) { + println!("\r\n\r\n\r\n\r\n"); + for result in parser.parse_from_bytes(&nmea[..]) { + match result { + Ok(ParseResult::GLL(Some(gll))) => { + sender.send(Msg::Gps(gll.into()))?; + }, + Ok(ParseResult::GGA(Some(gga))) => { + sender.send(Msg::Gps(gga.into()))?; } + _ => { } } - _ => { - println!("{:?}", packet); - } - })?; - thread::sleep(Duration::from_millis(1000)); + } + c = 0; + } else { + println!("nothing to read after {} tries ...", c); + if c > 100 { + println!("reached {} retries ... bailing!", c); + break; + } + } + thread::sleep(Duration::from_millis(5000)); + c += 1; } - println!("exiting GPS sender loop :)"); Ok(()) } diff --git a/src/main.rs b/src/main.rs index e4449fd..948a843 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod command; mod modem; mod serial; mod types; +mod gps; use anyhow; use std::{thread::{self, JoinHandle}, time::Duration}; @@ -31,19 +32,19 @@ fn main() -> anyhow::Result<()> { let mut threads: Vec>> = vec![]; // // Rx/Tx pins for the GPS modem - // let gps_rx = dp.pins.gpio13; - // let gps_tx = dp.pins.gpio12; + let gps_rx = dp.pins.gpio32; + let gps_tx = dp.pins.gpio33; // // UART interface for the GPS modem - // let gps_uart = dp.uart2; + let gps_uart = dp.uart2; let (gps_sender, receiver) = std::sync::mpsc::sync_channel::(1); - let accel_sender = gps_sender.clone(); + //let accel_sender = gps_sender.clone(); - // let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?; - // threads.push(thread::spawn(move || gps::main(gps_rx, gps_tx, gps_uart, gps_sender))); + //let _ = gps::main(gps_tx, gps_rx, gps_uart, gps_sender)?; + threads.push(thread::spawn(move || gps::main(gps_tx, gps_rx, gps_uart, gps_sender))); thread::sleep(Duration::from_millis(1000)); - threads.push(thread::spawn(move || accel::main(accel_sender))); + //threads.push(thread::spawn(move || accel::main(accel_sender))); let _ = modem::main(modem_rx, modem_tx, modem_uart, modem_pwrkey, modem_rst, modem_power, receiver)?; diff --git a/src/modem.rs b/src/modem.rs index 4594936..db49dd8 100644 --- a/src/modem.rs +++ b/src/modem.rs @@ -18,8 +18,18 @@ use esp_idf_hal::serial::{self, Rx, Tx}; use embedded_hal::digital::v2::OutputPin; -use mqtt::packet::{ConnectPacket, PublishPacket, QoSWithPacketIdentifier, VariablePacket}; -use mqtt::{Encodable, Decodable, TopicName}; +use mqtt::{ + Encodable, + Decodable, + TopicName, + packet::{ + ConnectPacket, + PublishPacket, + QoSWithPacketIdentifier, + VariablePacket, + }, +}; +use serde_json_core; pub type Result = std::result::Result; @@ -173,7 +183,7 @@ impl Modem { let _ = self.handle_prompt()?; println!("Handled prompt OK!!"); - println!("Writing bytes in serial TX! ({:?})", String::from_utf8(buf.to_vec())); + println!("Writing bytes in serial TX! ({:?})", buf.into_iter().map(|b| char::from(*b)).collect::()); self.serial .write_bytes(buf) .map_err(|err| ModemError::SendDataError(format!("{:?}", err)))?; @@ -587,7 +597,9 @@ where match receiver.recv() { Ok(Msg::Gps(solution)) => { println!("received GPS solution {:?} | sending to mqtt ...", solution); - let _ = mdm.mqtt_publish(device_id, &format!("{:?}", solution))?; + serde_json_core::ser::to_string::(&solution) + .map_err(|e| anyhow::Error::new(e)) + .and_then(|sol| mdm.mqtt_publish(device_id, &sol))?; err_count = 0; }, Ok(Msg::Accelerometer(acc)) => { diff --git a/src/types.rs b/src/types.rs index ff52839..89c186d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,12 +1,103 @@ #![allow(dead_code)] +use nmea0183::{ + GGA, + GLL, + coords::{ + Latitude as NMEALatitude, + Longitude as NMEALongitude, + Hemisphere as NMEAHemisphere, + }, +}; +use serde::Serialize; -#[derive(Debug)] +#[derive(Debug, Serialize)] +pub enum Hemisphere { + North, + South, + East, + West, +} + +#[derive(Debug, Serialize)] +pub struct Latitude { + degrees: u8, + minutes: u8, + seconds: f32, + hemisphere: Hemisphere, +} + +#[derive(Debug, Serialize)] +pub struct Longitude { + degrees: u8, + minutes: u8, + seconds: f32, + hemisphere: Hemisphere, +} + +impl From for Latitude { + fn from(lat: NMEALatitude) -> Self { + Self { + degrees: lat.degrees, + minutes: lat.minutes, + seconds: lat.seconds, + hemisphere: lat.hemisphere.into(), + } + } +} + +impl From for Longitude { + fn from(lon: NMEALongitude) -> Self { + Self { + degrees: lon.degrees, + minutes: lon.minutes, + seconds: lon.seconds, + hemisphere: lon.hemisphere.into(), + } + } +} + +impl From for Hemisphere { + fn from(hem: NMEAHemisphere) -> Self { + match hem { + NMEAHemisphere::North => Self::North, + NMEAHemisphere::South => Self::South, + NMEAHemisphere::East => Self::East, + NMEAHemisphere::West => Self::West, + } + } +} + +#[derive(Debug, Serialize)] pub struct Solution { - pub latitude: f64, - pub longitude: f64, - pub altitude: f64, - pub speed: f64, - pub direction: f64, + pub latitude: Latitude, + pub longitude: Longitude, + pub altitude: Option, + pub speed: Option, + pub direction: Option, +} + +impl From for Solution { + fn from(gga: GGA) -> Self { + Self { + latitude: gga.latitude.into(), + longitude: gga.longitude.into(), + altitude: Some(gga.altitude.meters), + speed: None, + direction: None, + } + } +} + +impl From for Solution { + fn from(gll: GLL) -> Self { + Self { + latitude: gll.latitude.into(), + longitude: gll.longitude.into(), + altitude: None, + speed: None, + direction: None, + } + } } pub enum Msg {