// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System.Threading;

namespace System.Reactive
{
    /// <summary>
    /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
    /// </summary>
    /// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
    /// <remarks>
    /// Implementations of sinks are responsible to enforce the message grammar on the associated observer.
    /// Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of
    /// related resources and to mute the outgoing observer.
    /// </remarks>
    internal abstract class Sink<TSource> : IDisposable
    {
        /// <summary>
        /// Observer that currently receives this sink's output. <see cref="Dispose"/> overwrites it with
        /// <c>NopObserver&lt;TSource&gt;.Instance</c>, which is how the outgoing observer gets muted.
        /// </summary>
        protected internal volatile IObserver<TSource> _observer;

        /// <summary>
        /// Resource to dispose together with the sink; set to <c>null</c> once <see cref="Dispose"/> has claimed it.
        /// </summary>
        private IDisposable _cancel;

        /// <summary>
        /// Creates a sink that sends its output to <paramref name="observer"/>.
        /// </summary>
        /// <param name="observer">Observer to receive the sink's messages.</param>
        /// <param name="cancel">Resource disposed when the sink is disposed; a <c>null</c> value is tolerated by <see cref="Dispose"/>.</param>
        public Sink(IObserver<TSource> observer, IDisposable cancel)
        {
            _observer = observer;
            _cancel = cancel;
        }

        /// <summary>
        /// Mutes the outgoing observer and disposes the associated cancellation resource.
        /// </summary>
        public virtual void Dispose()
        {
            // Swap the observer first so messages forwarded after this point no longer reach the original observer.
            _observer = NopObserver<TSource>.Instance;

            // The atomic exchange hands the resource to exactly one caller, so it is disposed at most once
            // even when Dispose is invoked repeatedly or concurrently.
            var cancel = Interlocked.Exchange(ref _cancel, null);
            if (cancel != null)
            {
                cancel.Dispose();
            }
        }

        /// <summary>
        /// Creates an observer that relays messages to this sink's current observer and disposes the sink
        /// after relaying a terminal message.
        /// </summary>
        /// <returns>A new forwarding observer bound to this sink.</returns>
        public IObserver<TSource> GetForwarder()
        {
            return new _(this);
        }

        /// <summary>
        /// Forwarding observer handed out by <see cref="GetForwarder"/>.
        /// </summary>
        private sealed class _ : IObserver<TSource>
        {
            private readonly Sink<TSource> _forward;

            public _(Sink<TSource> forward)
            {
                _forward = forward;
            }

            public void OnNext(TSource value)
            {
                // Reads the sink's observer on every call, so a disposed sink routes to the replacement observer.
                _forward._observer.OnNext(value);
            }

            public void OnError(Exception error)
            {
                // Terminal message: relay it, then dispose the sink as the pairing Dispose call.
                _forward._observer.OnError(error);
                _forward.Dispose();
            }

            public void OnCompleted()
            {
                // Terminal message: relay it, then dispose the sink as the pairing Dispose call.
                _forward._observer.OnCompleted();
                _forward.Dispose();
            }
        }
    }
}
#endif