diff options
Diffstat (limited to 'cli/src/tunnels/server_bridge_windows.rs')
-rw-r--r-- | cli/src/tunnels/server_bridge_windows.rs | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/cli/src/tunnels/server_bridge_windows.rs b/cli/src/tunnels/server_bridge_windows.rs index fb4b2b321f0..3a4d911d53e 100644 --- a/cli/src/tunnels/server_bridge_windows.rs +++ b/cli/src/tunnels/server_bridge_windows.rs @@ -14,13 +14,11 @@ use tokio::{ use crate::util::errors::{wrap, AnyError}; +use super::socket_signal::{ClientMessageDecoder, ServerMessageSink}; + pub struct ServerBridge { write_tx: mpsc::Sender<Vec<u8>>, -} - -pub trait FromServerMessage { - fn from_server_message(index: u16, message: &[u8]) -> Self; - fn from_closed_server_bridge(i: u16) -> Self; + decoder: ClientMessageDecoder, } const BUFFER_SIZE: usize = 65536; @@ -49,13 +47,14 @@ pub async fn get_socket_rw_stream(path: &Path) -> Result<NamedPipeClient, AnyErr } impl ServerBridge { - pub async fn new<T>(path: &Path, index: u16, target: &mpsc::Sender<T>) -> Result<Self, AnyError> - where - T: 'static + FromServerMessage + Send, - { + pub async fn new( + path: &Path, + index: u16, + mut target: ServerMessageSink, + decoder: ClientMessageDecoder, + ) -> Result<Self, AnyError> { let client = get_socket_rw_stream(path).await?; let (write_tx, mut write_rx) = mpsc::channel(4); - let read_tx = target.clone(); tokio::spawn(async move { let mut read_buf = vec![0; BUFFER_SIZE]; let mut pending_recv: Option<Vec<u8>> = None; @@ -89,9 +88,7 @@ impl ServerBridge { match client.try_read(&mut read_buf) { Ok(0) => return, // EOF Ok(s) => { - let send = read_tx - .send(T::from_server_message(index, &read_buf[..s])) - .await; + let send = target.server_message(index, &read_buf[..s]).await; if send.is_err() { return; } @@ -118,11 +115,14 @@ impl ServerBridge { } }); - Ok(ServerBridge { write_tx }) + Ok(ServerBridge { write_tx, decoder }) } pub async fn write(&self, b: Vec<u8>) -> std::io::Result<()> { - self.write_tx.send(b).await.ok(); + let dec = self.decoder.decode(&b)?; + if !dec.is_empty() { + self.write_tx.send(dec.to_vec()).await.ok(); + } Ok(()) } |