From 0e56905932b511e66d9a44398cf14b010456fb18 Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sat, 25 Feb 2023 03:49:25 +0100 Subject: [PATCH] forward an actual stream to the client --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + src/main.rs | 48 +++++++++++++++++++++++++----------------------- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 683b5b0..0b5158d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,6 +111,7 @@ dependencies = [ "anyhow", "futures-util", "tokio", + "tokio-stream", "tokio-tungstenite", ] @@ -515,6 +516,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.18.0" diff --git a/Cargo.toml b/Cargo.toml index 4823b3c..34516be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" anyhow = "1.0.69" futures-util = { version = "0.3.26", features = ["sink", "std"] } tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread", "time"] } +tokio-stream = "0.1.12" tokio-tungstenite = { version = "0.18.0", features = ["rustls-native-certs"] } diff --git a/src/main.rs b/src/main.rs index 613ab41..206eecb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,21 @@ -use std::io; - use futures_util::{ - SinkExt, StreamExt, stream::{ SplitSink, SplitStream, + Stream, }, }; - use tokio::{ time::{ - sleep, Duration, + interval, }, net::{ TcpListener, TcpStream, }, }; - use tokio_tungstenite::{ accept_async_with_config, WebSocketStream, @@ -30,9 +26,10 @@ use tokio_tungstenite::{ protocol::WebSocketConfig, }, }; +use tokio_stream::wrappers::IntervalStream; -type Tx = SplitSink, Message>; -type Rx = SplitStream>; +type WsTx = SplitSink, Message>; +type WsRx = SplitStream>; async fn accept(tcp_stream: TcpStream) -> Result, WsError> { let wsconfig = WebSocketConfig { @@ -44,38 +41,43 @@ async fn accept(tcp_stream: TcpStream) -> Result, WsE accept_async_with_config(tcp_stream, Some(wsconfig)).await } -async fn handle(mut rx: Rx) -> anyhow::Result<()> { +async fn handle(mut rx: WsRx) -> anyhow::Result<()> { println!("enter incoming loop"); while let Some(msg) = rx.next().await { - msg.map(|x| { println!("Got: {x:?}"); })?; + match msg { + Ok(Message::Text(x)) => { println!("Got Text Message: {x:?}"); }, + Ok(x) => { println!("Got another Message: {x:?}"); }, + Err(x) => { println!("Got ERROR: {x:?}"); }, + } } Ok(()) } -async fn serve(mut tx: Tx) -> anyhow::Result<()> { - println!("enter send loop"); - let mut c = 0; - loop { - sleep(Duration::from_millis(1000)).await; - c += 1; - let txt = format!("bla bla {c}"); - println!("{txt}"); - tx.send(Message::Text(txt)).await?; - } +async fn serve>(tx: WsTx, input: S) -> Result<(), WsError> { + input + .map(Message::Text) + .map(Ok) + .forward(tx).await } #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> anyhow::Result<()> { let listener = TcpListener::bind("127.0.0.1:8080").await?; while let Ok((tcp_stream, addr)) = listener.accept().await { println!("Got a connection from: {addr}"); let ws_result = accept(tcp_stream).await; - let (tx, rx) = ws_result.map(|ws| ws.split()).unwrap(); + let (tx, rx) = ws_result.map(|ws| ws.split())?; + + // in reality this is an mqtt stream + let interval_stream = IntervalStream::new(interval(Duration::from_secs(1))); + let stream = interval_stream.zip(futures_util::stream::repeat("djshjh".to_string())) + .map(|(i, txt)| format!("{i:?}: {txt}")); + tokio::select! { res = handle(rx) => println!("Done waiting for messages. Reason: {res:?}"), - _ = serve(tx) => println!("Done serving locations.") + _ = serve(tx, stream) => println!("Done serving locations.") } }