diff options
Diffstat (limited to 'cli/src/tunnels/control_server.rs')
-rw-r--r-- | cli/src/tunnels/control_server.rs | 72 |
1 files changed, 31 insertions, 41 deletions
diff --git a/cli/src/tunnels/control_server.rs b/cli/src/tunnels/control_server.rs index c04257f715f..5d62cd57d65 100644 --- a/cli/src/tunnels/control_server.rs +++ b/cli/src/tunnels/control_server.rs @@ -7,6 +7,8 @@ use crate::constants::{CONTROL_PORT, PROTOCOL_VERSION, VSCODE_CLI_VERSION}; use crate::log; use crate::self_update::SelfUpdate; use crate::state::LauncherPaths; +use crate::tunnels::protocol::HttpRequestParams; +use crate::tunnels::socket_signal::CloseReason; use crate::update_service::{Platform, UpdateService}; use crate::util::errors::{ wrap, AnyError, MismatchedLaunchModeError, NoAttachedServerError, ServerWriteError, @@ -18,7 +20,6 @@ use crate::util::io::SilentCopyProgress; use crate::util::sync::{new_barrier, Barrier}; use opentelemetry::trace::SpanKind; use opentelemetry::KeyValue; -use serde::Serialize; use std::collections::HashMap; use std::convert::Infallible; use std::env; @@ -38,12 +39,12 @@ use super::paths::prune_stopped_servers; use super::port_forwarder::{PortForwarding, PortForwardingProcessor}; use super::protocol::{ CallServerHttpParams, CallServerHttpResult, ClientRequestMethod, EmptyResult, ErrorResponse, - ForwardParams, ForwardResult, GetHostnameResponse, HttpRequestParams, RefServerMessageParams, - ResponseError, ServeParams, ServerLog, ServerMessageParams, ServerRequestMethod, - SuccessResponse, ToClientRequest, ToServerRequest, UnforwardParams, UpdateParams, UpdateResult, - VersionParams, + ForwardParams, ForwardResult, GetHostnameResponse, ResponseError, ServeParams, ServerLog, + ServerMessageParams, ServerRequestMethod, SuccessResponse, ToClientRequest, ToServerRequest, + UnforwardParams, UpdateParams, UpdateResult, VersionParams, }; -use super::server_bridge::{get_socket_rw_stream, FromServerMessage, ServerBridge}; +use super::server_bridge::{get_socket_rw_stream, ServerBridge}; +use super::socket_signal::{ClientMessageDecoder, ServerMessageSink, SocketSignal}; type ServerBridgeList = Option<Vec<(u16, ServerBridge)>>; type ServerBridgeListLock = Arc<Mutex<ServerBridgeList>>; @@ -122,39 +123,6 @@ enum ServerSignal { Respawn, } -struct CloseReason(String); - -enum SocketSignal { - /// Signals bytes to send to the socket. - Send(Vec<u8>), - /// Closes the socket (e.g. as a result of an error) - CloseWith(CloseReason), - /// Disposes ServerBridge corresponding to an ID - CloseServerBridge(u16), -} - -impl SocketSignal { - fn from_message<T>(msg: &T) -> Self - where - T: Serialize + ?Sized, - { - SocketSignal::Send(rmp_serde::to_vec_named(msg).unwrap()) - } -} - -impl FromServerMessage for SocketSignal { - fn from_server_message(i: u16, body: &[u8]) -> Self { - SocketSignal::from_message(&ToClientRequest { - id: None, - params: ClientRequestMethod::servermsg(RefServerMessageParams { i, body }), - }) - } - - fn from_closed_server_bridge(i: u16) -> Self { - SocketSignal::CloseServerBridge(i) - } -} - pub struct ServerTermination { /// Whether the server should be respawned in a new binary (see ServerSignal.Respawn). pub respawn: bool, @@ -719,7 +687,15 @@ async fn handle_serve( } }; - attach_server_bridge(&log, server, socket_tx, server_bridges, params.socket_id).await?; + attach_server_bridge( + &log, + server, + socket_tx, + server_bridges, + params.socket_id, + params.compress, + ) + .await?; Ok(EmptyResult {}) } @@ -729,8 +705,22 @@ async fn attach_server_bridge( socket_tx: mpsc::Sender<SocketSignal>, server_bridges: ServerBridgeListLock, socket_id: u16, + compress: bool, ) -> Result<u16, AnyError> { - let attached_fut = ServerBridge::new(&code_server.socket, socket_id, &socket_tx).await; + let (server_messages, decoder) = if compress { + ( + ServerMessageSink::new_compressed(socket_tx), + ClientMessageDecoder::new_compressed(), + ) + } else { + ( + ServerMessageSink::new_plain(socket_tx), + ClientMessageDecoder::new_plain(), + ) + }; + + let attached_fut = + ServerBridge::new(&code_server.socket, socket_id, server_messages, decoder).await; match attached_fut { Ok(a) => { |