diff --git a/src/main.rs b/src/main.rs index 206eecb..62fb868 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,84 +1,111 @@ -use futures_util::{ - StreamExt, - stream::{ - SplitSink, - SplitStream, - Stream, - }, -}; -use tokio::{ - time::{ - Duration, - interval, - }, - net::{ - TcpListener, - TcpStream, - }, -}; -use tokio_tungstenite::{ - accept_async_with_config, - WebSocketStream, - tungstenite::{ - Message, - Result, - Error as WsError, - protocol::WebSocketConfig, - }, -}; -use tokio_stream::wrappers::IntervalStream; +use tokio::net::{TcpListener, TcpStream}; +use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; +use tokio_tungstenite::tungstenite::protocol::Message; +use tokio_tungstenite::WebSocketStream; +use futures_util::{StreamExt, SinkExt}; +use futures_util::stream::SplitSink; +use serde::Serialize; -type WsTx = SplitSink, Message>; -type WsRx = SplitStream>; - -async fn accept(tcp_stream: TcpStream) -> Result, WsError> { - let wsconfig = WebSocketConfig { - max_send_queue: Some(5), - max_message_size: None, - max_frame_size: None, - accept_unmasked_frames: false, - }; - accept_async_with_config(tcp_stream, Some(wsconfig)).await +#[derive(Debug, Serialize)] +pub enum Hemisphere { + North, + South, + East, + West, } -async fn handle(mut rx: WsRx) -> anyhow::Result<()> { - println!("enter incoming loop"); - while let Some(msg) = rx.next().await { - match msg { - Ok(Message::Text(x)) => { println!("Got Text Message: {x:?}"); }, - Ok(x) => { println!("Got another Message: {x:?}"); }, - Err(x) => { println!("Got ERROR: {x:?}"); }, - } +#[derive(Debug, Serialize)] +pub struct Latitude { + degrees: u8, + minutes: u8, + seconds: f32, + hemisphere: Hemisphere, +} + +#[derive(Debug, Serialize)] +pub struct Longitude { + degrees: u8, + minutes: u8, + seconds: f32, + hemisphere: Hemisphere, +} + +pub struct Solution { + pub latitude: Latitude, + pub longitude: Longitude, + pub altitude: Option, + pub speed: Option, + pub direction: Option, +} + +struct DeviceLocation { + device_id: String, + solution: Solution, +} + +async fn stream_locations(mut writer: SplitSink, Message>) -> anyhow::Result<()> { + let device_id = "TESTTEST6355432da4242TESTTEST"; + let mut lat = 44.76027; + let mut long = 20.47723; + + for i in 1..100 { + lat = lat + if i > 50 { -0.001 } else { 0.001 }; + long = long + if i > 50 { -0.001 } else { 0.001 }; + let msg = Message::Text( + format!("{{\"device_id\":\"{}\", \"lat\":{}, \"long\":{}}}", + device_id, lat, long, + ) + ); + let _ = writer.send(msg).await?; + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } - Ok(()) -} -async fn serve>(tx: WsTx, input: S) -> Result<(), WsError> { - input - .map(Message::Text) - .map(Ok) - .forward(tx).await + Ok(()) } #[tokio::main] async fn main() -> anyhow::Result<()> { - let listener = TcpListener::bind("127.0.0.1:8080").await?; + let addr = "127.0.0.1:19999"; - while let Ok((tcp_stream, addr)) = listener.accept().await { - println!("Got a connection from: {addr}"); + let tcp_server = TcpListener::bind(&addr).await?; - let ws_result = accept(tcp_stream).await; - let (tx, rx) = ws_result.map(|ws| ws.split())?; + println!("listening on {}", addr); - // in reality this is an mqtt stream - let interval_stream = IntervalStream::new(interval(Duration::from_secs(1))); - let stream = interval_stream.zip(futures_util::stream::repeat("djshjh".to_string())) - .map(|(i, txt)| format!("{i:?}: {txt}")); + while let Ok((tcp_stream, addr)) = tcp_server.accept().await { + println!("Got a connection from: {}", addr); - tokio::select! { - res = handle(rx) => println!("Done waiting for messages. Reason: {res:?}"), - _ = serve(tx, stream) => println!("Done serving locations.") + let wsconfig = WebSocketConfig { + max_send_queue: Some(10), + max_message_size: None, + max_frame_size: None, + accept_unmasked_frames: true, + }; + + let ws_stream = tokio_tungstenite::accept_async_with_config(tcp_stream, Some(wsconfig)) + .await + .expect("Error during the websocket handshake occurred"); + + let (writer, mut reader) = ws_stream.split(); + + let token_optres = reader.next().await; + + if let Some(Ok(token)) = token_optres { + let local_token = include_str!("../secret/token").trim(); + println!("{}", local_token); + if token.into_text()? == local_token { + println!("streaming locations"); + match stream_locations(writer).await { + Ok(_) => println!("closing connection to client ..."), + Err(error) => println!("connection to client raised an error: ({})", error), + } + } + else { + println!("Closing client connection due to bad token!"); + let mut s = writer.reunite(reader)?; + s.close(None).await?; + } } + } Ok(())