forward an actual stream to the client

This commit is contained in:
Vladan Popovic 2023-02-25 03:49:25 +01:00
parent b4c746ed55
commit 0e56905932
3 changed files with 38 additions and 23 deletions

12
Cargo.lock generated
View file

@ -111,6 +111,7 @@ dependencies = [
"anyhow", "anyhow",
"futures-util", "futures-util",
"tokio", "tokio",
"tokio-stream",
"tokio-tungstenite", "tokio-tungstenite",
] ]
@ -515,6 +516,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.18.0" version = "0.18.0"

View file

@ -7,4 +7,5 @@ edition = "2021"
anyhow = "1.0.69" anyhow = "1.0.69"
futures-util = { version = "0.3.26", features = ["sink", "std"] } futures-util = { version = "0.3.26", features = ["sink", "std"] }
tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread", "time"] } tokio = { version = "1.25.0", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1.12"
tokio-tungstenite = { version = "0.18.0", features = ["rustls-native-certs"] } tokio-tungstenite = { version = "0.18.0", features = ["rustls-native-certs"] }

View file

@ -1,25 +1,21 @@
use std::io;
use futures_util::{ use futures_util::{
SinkExt,
StreamExt, StreamExt,
stream::{ stream::{
SplitSink, SplitSink,
SplitStream, SplitStream,
Stream,
}, },
}; };
use tokio::{ use tokio::{
time::{ time::{
sleep,
Duration, Duration,
interval,
}, },
net::{ net::{
TcpListener, TcpListener,
TcpStream, TcpStream,
}, },
}; };
use tokio_tungstenite::{ use tokio_tungstenite::{
accept_async_with_config, accept_async_with_config,
WebSocketStream, WebSocketStream,
@ -30,9 +26,10 @@ use tokio_tungstenite::{
protocol::WebSocketConfig, protocol::WebSocketConfig,
}, },
}; };
use tokio_stream::wrappers::IntervalStream;
type Tx = SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>; type WsTx = SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>;
type Rx = SplitStream<WebSocketStream<tokio::net::TcpStream>>; type WsRx = SplitStream<WebSocketStream<tokio::net::TcpStream>>;
async fn accept(tcp_stream: TcpStream) -> Result<WebSocketStream<TcpStream>, WsError> { async fn accept(tcp_stream: TcpStream) -> Result<WebSocketStream<TcpStream>, WsError> {
let wsconfig = WebSocketConfig { let wsconfig = WebSocketConfig {
@ -44,38 +41,43 @@ async fn accept(tcp_stream: TcpStream) -> Result<WebSocketStream<TcpStream>, WsE
accept_async_with_config(tcp_stream, Some(wsconfig)).await accept_async_with_config(tcp_stream, Some(wsconfig)).await
} }
async fn handle(mut rx: Rx) -> anyhow::Result<()> { async fn handle(mut rx: WsRx) -> anyhow::Result<()> {
println!("enter incoming loop"); println!("enter incoming loop");
while let Some(msg) = rx.next().await { while let Some(msg) = rx.next().await {
msg.map(|x| { println!("Got: {x:?}"); })?; match msg {
Ok(Message::Text(x)) => { println!("Got Text Message: {x:?}"); },
Ok(x) => { println!("Got another Message: {x:?}"); },
Err(x) => { println!("Got ERROR: {x:?}"); },
}
} }
Ok(()) Ok(())
} }
async fn serve(mut tx: Tx) -> anyhow::Result<()> { async fn serve<S: Stream<Item = String>>(tx: WsTx, input: S) -> Result<(), WsError> {
println!("enter send loop"); input
let mut c = 0; .map(Message::Text)
loop { .map(Ok)
sleep(Duration::from_millis(1000)).await; .forward(tx).await
c += 1;
let txt = format!("bla bla {c}");
println!("{txt}");
tx.send(Message::Text(txt)).await?;
}
} }
#[tokio::main] #[tokio::main]
async fn main() -> io::Result<()> { async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?; let listener = TcpListener::bind("127.0.0.1:8080").await?;
while let Ok((tcp_stream, addr)) = listener.accept().await { while let Ok((tcp_stream, addr)) = listener.accept().await {
println!("Got a connection from: {addr}"); println!("Got a connection from: {addr}");
let ws_result = accept(tcp_stream).await; let ws_result = accept(tcp_stream).await;
let (tx, rx) = ws_result.map(|ws| ws.split()).unwrap(); let (tx, rx) = ws_result.map(|ws| ws.split())?;
// in reality this is an mqtt stream
let interval_stream = IntervalStream::new(interval(Duration::from_secs(1)));
let stream = interval_stream.zip(futures_util::stream::repeat("djshjh".to_string()))
.map(|(i, txt)| format!("{i:?}: {txt}"));
tokio::select! { tokio::select! {
res = handle(rx) => println!("Done waiting for messages. Reason: {res:?}"), res = handle(rx) => println!("Done waiting for messages. Reason: {res:?}"),
_ = serve(tx) => println!("Done serving locations.") _ = serve(tx, stream) => println!("Done serving locations.")
} }
} }