Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement terminal flow control #114214

Merged
merged 10 commits into from
Jan 14, 2021
8 changes: 6 additions & 2 deletions src/vs/workbench/api/browser/mainThreadTerminalService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*--------------------------------------------------------------------------------------------*/

import { DisposableStore, Disposable, IDisposable } from 'vs/base/common/lifecycle';
import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest } from 'vs/workbench/contrib/terminal/common/terminal';
import { IShellLaunchConfig, ITerminalProcessExtHostProxy, ISpawnExtHostProcessRequest, ITerminalDimensions, IAvailableShellsRequest, IDefaultShellAndArgsRequest, IStartExtensionTerminalRequest, ITerminalConfiguration, TERMINAL_CONFIG_SECTION } from 'vs/workbench/contrib/terminal/common/terminal';
import { ExtHostContext, ExtHostTerminalServiceShape, MainThreadTerminalServiceShape, MainContext, IExtHostContext, IShellLaunchConfigDto, TerminalLaunchConfig, ITerminalDimensionsDto, TerminalIdentifier } from 'vs/workbench/api/common/extHost.protocol';
import { extHostNamedCustomer } from 'vs/workbench/api/common/extHostCustomers';
import { URI } from 'vs/base/common/uri';
Expand All @@ -16,6 +16,7 @@ import { TerminalDataBufferer } from 'vs/workbench/contrib/terminal/common/termi
import { IEnvironmentVariableService, ISerializableEnvironmentVariableCollection } from 'vs/workbench/contrib/terminal/common/environmentVariable';
import { deserializeEnvironmentVariableCollection, serializeEnvironmentVariableCollection } from 'vs/workbench/contrib/terminal/common/environmentVariableShared';
import { ILogService } from 'vs/platform/log/common/log';
import { IConfigurationService } from 'vs/platform/configuration/common/configuration';

@extHostNamedCustomer(MainContext.MainThreadTerminalService)
export class MainThreadTerminalService implements MainThreadTerminalServiceShape {
Expand Down Expand Up @@ -46,6 +47,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape
@IRemoteAgentService private readonly _remoteAgentService: IRemoteAgentService,
@IInstantiationService private readonly _instantiationService: IInstantiationService,
@IEnvironmentVariableService private readonly _environmentVariableService: IEnvironmentVariableService,
@IConfigurationService private readonly _configurationService: IConfigurationService,
@ILogService private readonly _logService: ILogService,
) {
this._proxy = extHostContext.getProxy(ExtHostContext.ExtHostTerminalService);
Expand Down Expand Up @@ -259,7 +261,8 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape
executable: request.shellLaunchConfig.executable,
args: request.shellLaunchConfig.args,
cwd: request.shellLaunchConfig.cwd,
env: request.shellLaunchConfig.env
env: request.shellLaunchConfig.env,
flowControl: this._configurationService.getValue<ITerminalConfiguration>(TERMINAL_CONFIG_SECTION).flowControl
};

this._logService.trace('Spawning ext host process', { terminalId: proxy.terminalId, shellLaunchConfigDto, request });
Expand All @@ -272,6 +275,7 @@ export class MainThreadTerminalService implements MainThreadTerminalServiceShape
request.isWorkspaceShellAllowed
).then(request.callback, request.callback);

proxy.onAcknowledgeDataEvent(charCount => this._proxy.$acceptProcessAckDataEvent(proxy.terminalId, charCount));
proxy.onInput(data => this._proxy.$acceptProcessInput(proxy.terminalId, data));
proxy.onResize(dimensions => this._proxy.$acceptProcessResize(proxy.terminalId, dimensions.cols, dimensions.rows));
proxy.onShutdown(immediate => this._proxy.$acceptProcessShutdown(proxy.terminalId, immediate));
Expand Down
2 changes: 2 additions & 0 deletions src/vs/workbench/api/common/extHost.protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,7 @@ export interface IShellLaunchConfigDto {
cwd?: string | UriComponents;
env?: { [key: string]: string | null; };
hideFromUser?: boolean;
flowControl?: boolean;
}

export interface IShellDefinitionDto {
Expand Down Expand Up @@ -1545,6 +1546,7 @@ export interface ExtHostTerminalServiceShape {
$acceptTerminalMaximumDimensions(id: number, cols: number, rows: number): void;
$spawnExtHostProcess(id: number, shellLaunchConfig: IShellLaunchConfigDto, activeWorkspaceRootUri: UriComponents | undefined, cols: number, rows: number, isWorkspaceShellAllowed: boolean): Promise<ITerminalLaunchError | undefined>;
$startExtensionTerminal(id: number, initialDimensions: ITerminalDimensionsDto | undefined): Promise<ITerminalLaunchError | undefined>;
$acceptProcessAckDataEvent(id: number, charCount: number): void;
$acceptProcessInput(id: number, data: string): void;
$acceptProcessResize(id: number, cols: number, rows: number): void;
$acceptProcessShutdown(id: number, immediate: boolean): void;
Expand Down
9 changes: 9 additions & 0 deletions src/vs/workbench/api/common/extHostTerminalService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ export class ExtHostPseudoterminal implements ITerminalChildProcess {
}
}

acknowledgeDataEvent(charCount: number): void {
// No-op, flow control is not supported in extension owned terminals. If this is ever
// implemented it will need new pause and resume VS Code APIs.
}

getInitialCwd(): Promise<string> {
return Promise.resolve('');
}
Expand Down Expand Up @@ -488,6 +493,10 @@ export abstract class BaseExtHostTerminalService extends Disposable implements I
return disposables;
}

public $acceptProcessAckDataEvent(id: number, charCount: number): void {
this._terminalProcesses.get(id)?.acknowledgeDataEvent(charCount);
}

public $acceptProcessInput(id: number, data: string): void {
this._terminalProcesses.get(id)?.input(data);
}
Expand Down
3 changes: 2 additions & 1 deletion src/vs/workbench/api/node/extHostTerminalService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ export class ExtHostTerminalService extends BaseExtHostTerminalService {
executable: shellLaunchConfigDto.executable,
args: shellLaunchConfigDto.args,
cwd: typeof shellLaunchConfigDto.cwd === 'string' ? shellLaunchConfigDto.cwd : URI.revive(shellLaunchConfigDto.cwd),
env: shellLaunchConfigDto.env
env: shellLaunchConfigDto.env,
flowControl: shellLaunchConfigDto.flowControl
};

// Merge in shell and args from settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ export class RemoteTerminalProcess extends Disposable implements ITerminalChildP
});
}

public acknowledgeDataEvent(charCount: number): void {
// TODO: Support flow control for server spawned processes
}

public async getInitialCwd(): Promise<string> {
await this._startBarrier.wait();
return this._remoteTerminalChannel.getTerminalInitialCwd(this._remoteTerminalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,12 @@ export class TerminalInstance extends Disposable implements ITerminalInstance {
this._xtermCore?.writeSync(ev.data);
} else {
const messageId = ++this._latestXtermWriteData;
this._xterm?.write(ev.data, () => this._latestXtermParseData = messageId);
this._xterm?.write(ev.data, () => {
this._latestXtermParseData = messageId;
if (this._shellLaunchConfig.flowControl) {
this._processManager.acknowledgeDataEvent(ev.data.length);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal
public readonly onInput: Event<string> = this._onInput.event;
private readonly _onResize: Emitter<{ cols: number, rows: number }> = this._register(new Emitter<{ cols: number, rows: number }>());
public readonly onResize: Event<{ cols: number, rows: number }> = this._onResize.event;
private readonly _onAcknowledgeDataEvent = this._register(new Emitter<number>());
public readonly onAcknowledgeDataEvent: Event<number> = this._onAcknowledgeDataEvent.event;
private readonly _onShutdown = this._register(new Emitter<boolean>());
public readonly onShutdown: Event<boolean> = this._onShutdown.event;
private readonly _onRequestInitialCwd = this._register(new Emitter<void>());
Expand Down Expand Up @@ -139,6 +141,10 @@ export class TerminalProcessExtHostProxy extends Disposable implements ITerminal
this._onResize.fire({ cols, rows });
}

public acknowledgeDataEvent(charCount: number): void {
this._onAcknowledgeDataEvent.fire(charCount);
}

public getInitialCwd(): Promise<string> {
return new Promise<string>(resolve => {
this._onRequestInitialCwd.fire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import * as platform from 'vs/base/common/platform';
import * as terminalEnvironment from 'vs/workbench/contrib/terminal/common/terminalEnvironment';
import { env as processEnv } from 'vs/base/common/process';
import { ProcessState, ITerminalProcessManager, IShellLaunchConfig, ITerminalConfigHelper, ITerminalChildProcess, IBeforeProcessDataEvent, ITerminalEnvironment, ITerminalLaunchError, IProcessDataEvent, ITerminalDimensionsOverride } from 'vs/workbench/contrib/terminal/common/terminal';
import { ProcessState, ITerminalProcessManager, IShellLaunchConfig, ITerminalConfigHelper, ITerminalChildProcess, IBeforeProcessDataEvent, ITerminalEnvironment, ITerminalLaunchError, IProcessDataEvent, ITerminalDimensionsOverride, FlowControlConstants } from 'vs/workbench/contrib/terminal/common/terminal';
import { ILogService } from 'vs/platform/log/common/log';
import { Emitter, Event } from 'vs/base/common/event';
import { IHistoryService } from 'vs/workbench/services/history/common/history';
Expand Down Expand Up @@ -64,6 +64,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
private _initialCwd: string | undefined;
private _extEnvironmentVariableCollection: IMergedEnvironmentVariableCollection | undefined;
private _environmentVariableInfo: IEnvironmentVariableInfo | undefined;
private _ackDataBufferer: AckDataBufferer;

private readonly _onProcessReady = this._register(new Emitter<void>());
public get onProcessReady(): Event<void> { return this._onProcessReady.event; }
Expand Down Expand Up @@ -111,6 +112,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
});
});
this.ptyProcessReady.then(async () => await this.getLatency());
this._ackDataBufferer = new AckDataBufferer(e => this._process?.acknowledgeDataEvent(e));
}

public dispose(immediate: boolean = false): void {
Expand All @@ -131,7 +133,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
rows: number,
isScreenReaderModeEnabled: boolean
): Promise<ITerminalLaunchError | undefined> {
shellLaunchConfig.flowControl = this._configHelper.config.flowControl;
if (shellLaunchConfig.isExtensionTerminal) {
// Flow control is not supported for extension terminals
shellLaunchConfig.flowControl = false;
this._processType = ProcessType.ExtensionTerminal;
this._process = this._instantiationService.createInstance(TerminalProcessExtHostProxy, this._terminalId, shellLaunchConfig, undefined, cols, rows, this._configHelper);
} else {
Expand Down Expand Up @@ -167,7 +172,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
this._process = this._instantiationService.createInstance(TerminalProcessExtHostProxy, this._terminalId, shellLaunchConfig, activeWorkspaceRootUri, cols, rows, this._configHelper);
}
} else {
this._process = await this._launchProcess(shellLaunchConfig, cols, rows, this.userHome, isScreenReaderModeEnabled);
// Flow control is not needed for ptys hosted in the same process (ie. the electron
// renderer).
shellLaunchConfig.flowControl = false;
this._process = await this._launchLocalProcess(shellLaunchConfig, cols, rows, this.userHome, isScreenReaderModeEnabled);
}
}

Expand Down Expand Up @@ -221,7 +229,7 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
return undefined;
}

private async _launchProcess(
private async _launchLocalProcess(
shellLaunchConfig: IShellLaunchConfig,
cols: number,
rows: number,
Expand Down Expand Up @@ -331,6 +339,10 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
return Promise.resolve(this._latency);
}

public acknowledgeDataEvent(charCount: number): void {
this._ackDataBufferer.ack(charCount);
}

private _onExit(exitCode: number | undefined): void {
this._process = null;

Expand Down Expand Up @@ -359,3 +371,20 @@ export class TerminalProcessManager extends Disposable implements ITerminalProce
this._onEnvironmentVariableInfoChange.fire(this._environmentVariableInfo);
}
}

class AckDataBufferer {
private _unsentCharCount: number = 0;

constructor(
private readonly _callback: (charCount: number) => void
) {
}

ack(charCount: number) {
this._unsentCharCount += charCount;
while (this._unsentCharCount > FlowControlConstants.CharCountAckSize) {
this._unsentCharCount -= FlowControlConstants.CharCountAckSize;
this._callback(FlowControlConstants.CharCountAckSize);
}
}
}
39 changes: 39 additions & 0 deletions src/vs/workbench/contrib/terminal/common/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export interface ITerminalConfiguration {
localEchoStyle: 'bold' | 'dim' | 'italic' | 'underlined' | 'inverted' | string;
serverSpawn: boolean;
enablePersistentSessions: boolean;
flowControl: boolean;
}

export const DEFAULT_LOCAL_ECHO_EXCLUDE: ReadonlyArray<string> = ['vim', 'vi', 'nano', 'tmux'];
Expand Down Expand Up @@ -287,6 +288,11 @@ export interface IShellLaunchConfig {
* a terminal used to drive some VS Code feature.
*/
isFeatureTerminal?: boolean;

/**
* Whether flow control is enabled for this terminal.
*/
flowControl?: boolean;
}

/**
Expand Down Expand Up @@ -379,6 +385,7 @@ export interface ITerminalProcessManager extends IDisposable {
createProcess(shellLaunchConfig: IShellLaunchConfig, cols: number, rows: number, isScreenReaderModeEnabled: boolean): Promise<ITerminalLaunchError | undefined>;
write(data: string): void;
setDimensions(cols: number, rows: number): void;
acknowledgeDataEvent(charCount: number): void;

getInitialCwd(): Promise<string>;
getCwd(): Promise<string>;
Expand Down Expand Up @@ -419,6 +426,7 @@ export interface ITerminalProcessExtHostProxy extends IDisposable {

onInput: Event<string>;
onResize: Event<{ cols: number, rows: number }>;
onAcknowledgeDataEvent: Event<number>;
onShutdown: Event<boolean>;
onRequestInitialCwd: Event<void>;
onRequestCwd: Event<void>;
Expand Down Expand Up @@ -507,11 +515,42 @@ export interface ITerminalChildProcess {
input(data: string): void;
resize(cols: number, rows: number): void;

/**
* Acknowledge a data event has been parsed by the terminal, this is used to implement flow
* control to ensure remote processes to not get too far ahead of the client and flood the
* connection.
* @param charCount The number of characters being acknowledged.
*/
acknowledgeDataEvent(charCount: number): void;

getInitialCwd(): Promise<string>;
getCwd(): Promise<string>;
getLatency(): Promise<number>;
}

export const enum FlowControlConstants {
/**
* The number of _unacknowledged_ chars to have been sent before the pty is paused in order for
* the client to catch up.
*/
HighWatermarkChars = 100000,
/**
* After flow control pauses the pty for the client the catch up, this is the number of
* _unacknowledged_ chars to have been caught up to on the client before resuming the pty again.
* This is used to attempt to prevent pauses in the flowing data; ideally while the pty is
* paused the number of unacknowledged chars would always be greater than 0 or the client will
* appear to stutter. In reality this balance is hard to accomplish though so heavy commands
* will likely pause as latency grows, not flooding the connection is the important thing as
* it's shared with other core functionality.
*/
LowWatermarkChars = 5000,
/**
* The number characters that are accumulated on the client side before sending an ack event.
* This must be less than or equal to LowWatermarkChars or the terminal max never unpause.
*/
CharCountAckSize = 5000
}

export const enum TERMINAL_COMMAND_ID {
FIND_NEXT = 'workbench.action.terminal.findNext',
FIND_PREVIOUS = 'workbench.action.terminal.findPrevious',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ export const terminalConfiguration: IConfigurationNode = {
description: localize('terminal.integrated.enablePersistentSessions', "Experimental: persist terminal sessions for the workspace across window reloads. Currently only supported in VS Code Remote workspaces."),
type: 'boolean',
default: true
},
'terminal.integrated.flowControl': {
description: localize('terminal.integrated.flowControl', "Experimental: whether to enable flow control which will slow the program on the remote side to avoid flooding remote connections with terminal output. This setting has no effect for local terminals and terminals where the output/input is controlled by an extension. Changing this will only affect new terminals."),
type: 'boolean',
default: false
}
}
};
Expand Down
31 changes: 30 additions & 1 deletion src/vs/workbench/contrib/terminal/node/terminalProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as os from 'os';
import { Event, Emitter } from 'vs/base/common/event';
import { getWindowsBuildNumber } from 'vs/workbench/contrib/terminal/node/terminal';
import { Disposable } from 'vs/base/common/lifecycle';
import { IShellLaunchConfig, ITerminalChildProcess, ITerminalLaunchError } from 'vs/workbench/contrib/terminal/common/terminal';
import { IShellLaunchConfig, ITerminalChildProcess, ITerminalDimensionsOverride, ITerminalLaunchError, FlowControlConstants } from 'vs/workbench/contrib/terminal/common/terminal';
import { exec } from 'child_process';
import { ILogService } from 'vs/platform/log/common/log';
import { stat } from 'vs/base/node/pfs';
Expand Down Expand Up @@ -41,6 +41,9 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess
private readonly _initialCwd: string;
private readonly _ptyOptions: pty.IPtyForkOptions | pty.IWindowsPtyForkOptions;

private _isPtyPaused: boolean = false;
private _unacknowledgedCharCount: number = 0;

public get exitMessage(): string | undefined { return this._exitMessage; }

private readonly _onProcessData = this._register(new Emitter<string>());
Expand Down Expand Up @@ -98,6 +101,8 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess
}));
}
}
onProcessOverrideDimensions?: Event<ITerminalDimensionsOverride | undefined> | undefined;
onProcessResolvedShellLaunchConfig?: Event<IShellLaunchConfig> | undefined;

public async start(): Promise<ITerminalLaunchError | undefined> {
const results = await Promise.all([this._validateCwd(), this._validateExecutable()]);
Expand Down Expand Up @@ -162,6 +167,15 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess
this.onProcessReady(() => c());
});
ptyProcess.onData(data => {
if (this._shellLaunchConfig.flowControl) {
this._unacknowledgedCharCount += data.length;
if (!this._isPtyPaused && this._unacknowledgedCharCount > FlowControlConstants.HighWatermarkChars) {
this._logService.trace(`Flow control: Pause (${this._unacknowledgedCharCount} > ${FlowControlConstants.HighWatermarkChars})`);
this._isPtyPaused = true;
// TODO: Expose as public API in node-pty
(ptyProcess as any).pause();
}
}
this._onProcessData.fire(data);
if (this._closeTimeout) {
clearTimeout(this._closeTimeout);
Expand Down Expand Up @@ -324,6 +338,21 @@ export class TerminalProcess extends Disposable implements ITerminalChildProcess
}
}

public acknowledgeDataEvent(charCount: number): void {
if (!this._shellLaunchConfig.flowControl) {
return;
}
// Prevent lower than 0 to heal from errors
this._unacknowledgedCharCount = Math.max(this._unacknowledgedCharCount - charCount, 0);
this._logService.trace(`Flow control: Ack ${charCount} chars (unacknowledged: ${this._unacknowledgedCharCount})`);
if (this._isPtyPaused && this._unacknowledgedCharCount < FlowControlConstants.LowWatermarkChars) {
this._logService.trace(`Flow control: Resume (${this._unacknowledgedCharCount} < ${FlowControlConstants.LowWatermarkChars})`);
// TODO: Expose as public API in node-pty
(this._ptyProcess as any).resume();
this._isPtyPaused = false;
}
}

public getInitialCwd(): Promise<string> {
return Promise.resolve(this._initialCwd);
}
Expand Down