ws server with hardcoded coordinates

This commit is contained in:
Vladan Popovic 2023-05-31 15:55:42 +02:00
parent 0e56905932
commit e83b853925
1 changed files with 94 additions and 67 deletions

View File

@ -1,85 +1,112 @@
use futures_util::{ use tokio::net::{TcpListener, TcpStream};
StreamExt, use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
stream::{ use tokio_tungstenite::tungstenite::protocol::Message;
SplitSink, use tokio_tungstenite::WebSocketStream;
SplitStream, use futures_util::{StreamExt, SinkExt};
Stream, use futures_util::stream::SplitSink;
}, use serde::Serialize;
};
use tokio::{
time::{
Duration,
interval,
},
net::{
TcpListener,
TcpStream,
},
};
use tokio_tungstenite::{
accept_async_with_config,
WebSocketStream,
tungstenite::{
Message,
Result,
Error as WsError,
protocol::WebSocketConfig,
},
};
use tokio_stream::wrappers::IntervalStream;
type WsTx = SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>; #[derive(Debug, Serialize)]
type WsRx = SplitStream<WebSocketStream<tokio::net::TcpStream>>; pub enum Hemisphere {
North,
async fn accept(tcp_stream: TcpStream) -> Result<WebSocketStream<TcpStream>, WsError> { South,
let wsconfig = WebSocketConfig { East,
max_send_queue: Some(5), West,
max_message_size: None,
max_frame_size: None,
accept_unmasked_frames: false,
};
accept_async_with_config(tcp_stream, Some(wsconfig)).await
} }
async fn handle(mut rx: WsRx) -> anyhow::Result<()> { #[derive(Debug, Serialize)]
println!("enter incoming loop"); pub struct Latitude {
while let Some(msg) = rx.next().await { degrees: u8,
match msg { minutes: u8,
Ok(Message::Text(x)) => { println!("Got Text Message: {x:?}"); }, seconds: f32,
Ok(x) => { println!("Got another Message: {x:?}"); }, hemisphere: Hemisphere,
Err(x) => { println!("Got ERROR: {x:?}"); },
} }
#[derive(Debug, Serialize)]
pub struct Longitude {
degrees: u8,
minutes: u8,
seconds: f32,
hemisphere: Hemisphere,
} }
pub struct Solution {
pub latitude: Latitude,
pub longitude: Longitude,
pub altitude: Option<f32>,
pub speed: Option<f32>,
pub direction: Option<f32>,
}
struct DeviceLocation {
device_id: String,
solution: Solution,
}
async fn stream_locations(mut writer: SplitSink<WebSocketStream<TcpStream>, Message>) -> anyhow::Result<()> {
let device_id = "TESTTEST6355432da4242TESTTEST";
let mut lat = 44.76027;
let mut long = 20.47723;
for i in 1..100 {
lat = lat + if i > 50 { -0.001 } else { 0.001 };
long = long + if i > 50 { -0.001 } else { 0.001 };
let msg = Message::Text(
format!("{{\"device_id\":\"{}\", \"lat\":{}, \"long\":{}}}",
device_id, lat, long,
)
);
let _ = writer.send(msg).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Ok(()) Ok(())
} }
async fn serve<S: Stream<Item = String>>(tx: WsTx, input: S) -> Result<(), WsError> {
input
.map(Message::Text)
.map(Ok)
.forward(tx).await
}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?; let addr = "127.0.0.1:19999";
while let Ok((tcp_stream, addr)) = listener.accept().await { let tcp_server = TcpListener::bind(&addr).await?;
println!("Got a connection from: {addr}");
let ws_result = accept(tcp_stream).await; println!("listening on {}", addr);
let (tx, rx) = ws_result.map(|ws| ws.split())?;
// in reality this is an mqtt stream while let Ok((tcp_stream, addr)) = tcp_server.accept().await {
let interval_stream = IntervalStream::new(interval(Duration::from_secs(1))); println!("Got a connection from: {}", addr);
let stream = interval_stream.zip(futures_util::stream::repeat("djshjh".to_string()))
.map(|(i, txt)| format!("{i:?}: {txt}"));
tokio::select! { let wsconfig = WebSocketConfig {
res = handle(rx) => println!("Done waiting for messages. Reason: {res:?}"), max_send_queue: Some(10),
_ = serve(tx, stream) => println!("Done serving locations.") max_message_size: None,
max_frame_size: None,
accept_unmasked_frames: true,
};
let ws_stream = tokio_tungstenite::accept_async_with_config(tcp_stream, Some(wsconfig))
.await
.expect("Error during the websocket handshake occurred");
let (writer, mut reader) = ws_stream.split();
let token_optres = reader.next().await;
if let Some(Ok(token)) = token_optres {
let local_token = include_str!("../secret/token").trim();
println!("{}", local_token);
if token.into_text()? == local_token {
println!("streaming locations");
match stream_locations(writer).await {
Ok(_) => println!("closing connection to client ..."),
Err(error) => println!("connection to client raised an error: ({})", error),
} }
} }
else {
println!("Closing client connection due to bad token!");
let mut s = writer.reunite(reader)?;
s.close(None).await?;
}
}
}
Ok(()) Ok(())
} }