Welcome to mirror list, hosted at ThFree Co, Russian Federation.

ipc.cp.ts « node « ipc « parts « base « vs « src - github.com/microsoft/vscode.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 127d0082d11882085de085c3de456b3e58764431 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { ChildProcess, fork, ForkOptions } from 'child_process';
import { createCancelablePromise, Delayer } from 'vs/base/common/async';
import { VSBuffer } from 'vs/base/common/buffer';
import { CancellationToken } from 'vs/base/common/cancellation';
import { isRemoteConsoleLog, log } from 'vs/base/common/console';
import * as errors from 'vs/base/common/errors';
import { Emitter, Event } from 'vs/base/common/event';
import { dispose, IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { deepClone } from 'vs/base/common/objects';
import { createQueuedSender, removeDangerousEnvVariables } from 'vs/base/node/processes';
import { ChannelClient as IPCClient, ChannelServer as IPCServer, IChannel, IChannelClient } from 'vs/base/parts/ipc/common/ipc';

/**
 * This implementation doesn't perform well since it uses base64 encoding for buffers.
 * We should move all implementations to use named ipc.net, so we stop depending on cp.fork.
 */

export class Server<TContext extends string> extends IPCServer<TContext> {
	constructor(ctx: TContext) {
		super({
			send: r => {
				try {
					if (process.send) {
						process.send((<Buffer>r.buffer).toString('base64'));
					}
				} catch (e) { /* not much to do */ }
			},
			onMessage: Event.fromNodeEventEmitter(process, 'message', msg => VSBuffer.wrap(Buffer.from(msg, 'base64')))
		}, ctx);

		process.once('disconnect', () => this.dispose());
	}
}

export interface IIPCOptions {

	/**
	 * A descriptive name for the server this connection is to. Used in logging.
	 */
	serverName: string;

	/**
	 * Time in millies before killing the ipc process. The next request after killing will start it again.
	 */
	timeout?: number;

	/**
	 * Arguments to the module to execute.
	 */
	args?: string[];

	/**
	 * Environment key-value pairs to be passed to the process that gets spawned for the ipc.
	 */
	env?: any;

	/**
	 * Allows to assign a debug port for debugging the application executed.
	 */
	debug?: number;

	/**
	 * Allows to assign a debug port for debugging the application and breaking it on the first line.
	 */
	debugBrk?: number;

	/**
	 * If set, starts the fork with empty execArgv. If not set, execArgv from the parent process are inherited,
	 * except --inspect= and --inspect-brk= which are filtered as they would result in a port conflict.
	 */
	freshExecArgv?: boolean;

	/**
	 * Enables our createQueuedSender helper for this Client. Uses a queue when the internal Node.js queue is
	 * full of messages - see notes on that method.
	 */
	useQueue?: boolean;
}

export class Client implements IChannelClient, IDisposable {

	private disposeDelayer: Delayer<void> | undefined;
	private activeRequests = new Set<IDisposable>();
	private child: ChildProcess | null;
	private _client: IPCClient | null;
	private channels = new Map<string, IChannel>();

	private readonly _onDidProcessExit = new Emitter<{ code: number; signal: string }>();
	readonly onDidProcessExit = this._onDidProcessExit.event;

	constructor(private modulePath: string, private options: IIPCOptions) {
		const timeout = options && options.timeout ? options.timeout : 60000;
		this.disposeDelayer = new Delayer<void>(timeout);
		this.child = null;
		this._client = null;
	}

	getChannel<T extends IChannel>(channelName: string): T {
		const that = this;

		return {
			call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
				return that.requestPromise<T>(channelName, command, arg, cancellationToken);
			},
			listen(event: string, arg?: any) {
				return that.requestEvent(channelName, event, arg);
			}
		} as T;
	}

	protected requestPromise<T>(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<T> {
		if (!this.disposeDelayer) {
			return Promise.reject(new Error('disposed'));
		}

		if (cancellationToken.isCancellationRequested) {
			return Promise.reject(errors.canceled());
		}

		this.disposeDelayer.cancel();

		const channel = this.getCachedChannel(channelName);
		const result = createCancelablePromise(token => channel.call<T>(name, arg, token));
		const cancellationTokenListener = cancellationToken.onCancellationRequested(() => result.cancel());

		const disposable = toDisposable(() => result.cancel());
		this.activeRequests.add(disposable);

		result.finally(() => {
			cancellationTokenListener.dispose();
			this.activeRequests.delete(disposable);

			if (this.activeRequests.size === 0 && this.disposeDelayer) {
				this.disposeDelayer.trigger(() => this.disposeClient());
			}
		});

		return result;
	}

	protected requestEvent<T>(channelName: string, name: string, arg?: any): Event<T> {
		if (!this.disposeDelayer) {
			return Event.None;
		}

		this.disposeDelayer.cancel();

		let listener: IDisposable;
		const emitter = new Emitter<any>({
			onFirstListenerAdd: () => {
				const channel = this.getCachedChannel(channelName);
				const event: Event<T> = channel.listen(name, arg);

				listener = event(emitter.fire, emitter);
				this.activeRequests.add(listener);
			},
			onLastListenerRemove: () => {
				this.activeRequests.delete(listener);
				listener.dispose();

				if (this.activeRequests.size === 0 && this.disposeDelayer) {
					this.disposeDelayer.trigger(() => this.disposeClient());
				}
			}
		});

		return emitter.event;
	}

	private get client(): IPCClient {
		if (!this._client) {
			const args = this.options && this.options.args ? this.options.args : [];
			const forkOpts: ForkOptions = Object.create(null);

			forkOpts.env = { ...deepClone(process.env), 'VSCODE_PARENT_PID': String(process.pid) };

			if (this.options && this.options.env) {
				forkOpts.env = { ...forkOpts.env, ...this.options.env };
			}

			if (this.options && this.options.freshExecArgv) {
				forkOpts.execArgv = [];
			}

			if (this.options && typeof this.options.debug === 'number') {
				forkOpts.execArgv = ['--nolazy', '--inspect=' + this.options.debug];
			}

			if (this.options && typeof this.options.debugBrk === 'number') {
				forkOpts.execArgv = ['--nolazy', '--inspect-brk=' + this.options.debugBrk];
			}

			if (forkOpts.execArgv === undefined) {
				// if not set, the forked process inherits the execArgv of the parent process
				// --inspect and --inspect-brk can not be inherited as the port would conflict
				forkOpts.execArgv = process.execArgv.filter(a => !/^--inspect(-brk)?=/.test(a)); // remove
			}

			removeDangerousEnvVariables(forkOpts.env);

			this.child = fork(this.modulePath, args, forkOpts);

			const onMessageEmitter = new Emitter<VSBuffer>();
			const onRawMessage = Event.fromNodeEventEmitter(this.child, 'message', msg => msg);

			onRawMessage(msg => {

				// Handle remote console logs specially
				if (isRemoteConsoleLog(msg)) {
					log(msg, `IPC Library: ${this.options.serverName}`);
					return;
				}

				// Anything else goes to the outside
				onMessageEmitter.fire(VSBuffer.wrap(Buffer.from(msg, 'base64')));
			});

			const sender = this.options.useQueue ? createQueuedSender(this.child) : this.child;
			const send = (r: VSBuffer) => this.child && this.child.connected && sender.send((<Buffer>r.buffer).toString('base64'));
			const onMessage = onMessageEmitter.event;
			const protocol = { send, onMessage };

			this._client = new IPCClient(protocol);

			const onExit = () => this.disposeClient();
			process.once('exit', onExit);

			this.child.on('error', err => console.warn('IPC "' + this.options.serverName + '" errored with ' + err));

			this.child.on('exit', (code: any, signal: any) => {
				process.removeListener('exit' as 'loaded', onExit); // https://github.com/electron/electron/issues/21475

				this.activeRequests.forEach(r => dispose(r));
				this.activeRequests.clear();

				if (code !== 0 && signal !== 'SIGTERM') {
					console.warn('IPC "' + this.options.serverName + '" crashed with exit code ' + code + ' and signal ' + signal);
				}

				if (this.disposeDelayer) {
					this.disposeDelayer.cancel();
				}
				this.disposeClient();
				this._onDidProcessExit.fire({ code, signal });
			});
		}

		return this._client;
	}

	private getCachedChannel(name: string): IChannel {
		let channel = this.channels.get(name);

		if (!channel) {
			channel = this.client.getChannel(name);
			this.channels.set(name, channel);
		}

		return channel;
	}

	private disposeClient() {
		if (this._client) {
			if (this.child) {
				this.child.kill();
				this.child = null;
			}
			this._client = null;
			this.channels.clear();
		}
	}

	dispose() {
		this._onDidProcessExit.dispose();
		if (this.disposeDelayer) {
			this.disposeDelayer.cancel();
			this.disposeDelayer = undefined;
		}
		this.disposeClient();
		this.activeRequests.clear();
	}
}