Welcome to mirror list, hosted at ThFree Co, Russian Federation.

server_bridge_windows.rs « tunnels « src « cli - github.com/microsoft/vscode.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: fb4b2b321f0c7fb8a85570a015584757a4c07e2f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

use std::{path::Path, time::Duration};

use tokio::{
	io::{self, Interest},
	net::windows::named_pipe::{ClientOptions, NamedPipeClient},
	sync::mpsc,
	time::sleep,
};

use crate::util::errors::{wrap, AnyError};

pub struct ServerBridge {
	write_tx: mpsc::Sender<Vec<u8>>,
}

pub trait FromServerMessage {
	fn from_server_message(index: u16, message: &[u8]) -> Self;
	fn from_closed_server_bridge(i: u16) -> Self;
}

const BUFFER_SIZE: usize = 65536;

pub async fn get_socket_rw_stream(path: &Path) -> Result<NamedPipeClient, AnyError> {
	// Tokio says we can need to try in a loop. Do so.
	// https://docs.rs/tokio/latest/tokio/net/windows/named_pipe/struct.NamedPipeClient.html
	let client = loop {
		match ClientOptions::new().open(path) {
			Ok(client) => break client,
			// ERROR_PIPE_BUSY https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
			Err(e) if e.raw_os_error() == Some(231) => sleep(Duration::from_millis(100)).await,
			Err(e) => {
				return Err(AnyError::WrappedError(wrap(
					e,
					format!(
						"error connecting to vscode server socket in {}",
						path.display()
					),
				)))
			}
		}
	};

	Ok(client)
}

impl ServerBridge {
	pub async fn new<T>(path: &Path, index: u16, target: &mpsc::Sender<T>) -> Result<Self, AnyError>
	where
		T: 'static + FromServerMessage + Send,
	{
		let client = get_socket_rw_stream(path).await?;
		let (write_tx, mut write_rx) = mpsc::channel(4);
		let read_tx = target.clone();
		tokio::spawn(async move {
			let mut read_buf = vec![0; BUFFER_SIZE];
			let mut pending_recv: Option<Vec<u8>> = None;

			// See https://docs.rs/tokio/1.17.0/tokio/net/windows/named_pipe/struct.NamedPipeClient.html#method.ready
			// With additional complications. If there's nothing queued to write, we wait for the
			// pipe to be readable, or for something to come in. If there is something to
			// write, wait until the pipe is either readable or writable.
			loop {
				let ready_result = if pending_recv.is_none() {
					tokio::select! {
					  msg = write_rx.recv() => match msg {
						Some(msg) => {
						  pending_recv = Some(msg);
						  client.ready(Interest::READABLE | Interest::WRITABLE).await
						},
						None => return
					  },
					  r = client.ready(Interest::READABLE) => r,
					}
				} else {
					client.ready(Interest::READABLE | Interest::WRITABLE).await
				};

				let ready = match ready_result {
					Ok(r) => r,
					Err(_) => return,
				};

				if ready.is_readable() {
					match client.try_read(&mut read_buf) {
						Ok(0) => return, // EOF
						Ok(s) => {
							let send = read_tx
								.send(T::from_server_message(index, &read_buf[..s]))
								.await;
							if send.is_err() {
								return;
							}
						}
						Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
							continue;
						}
						Err(_) => return,
					}
				}

				if let Some(msg) = &pending_recv {
					if ready.is_writable() {
						match client.try_write(msg) {
							Ok(n) if n == msg.len() => pending_recv = None,
							Ok(n) => pending_recv = Some(msg[n..].to_vec()),
							Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
								continue;
							}
							Err(_) => return,
						}
					}
				}
			}
		});

		Ok(ServerBridge { write_tx })
	}

	pub async fn write(&self, b: Vec<u8>) -> std::io::Result<()> {
		self.write_tx.send(b).await.ok();
		Ok(())
	}

	pub async fn close(self) -> std::io::Result<()> {
		drop(self.write_tx);
		Ok(())
	}
}