// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
/// </summary>
/// <typeparam name="TSource">Type of the resulting sequence's elements.</typeparam>
/// <remarks>
/// Implementations of sinks are responsible to enforce the message grammar on the associated observer.
/// Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of
/// related resources and to mute the outgoing observer.
/// </remarks>
internal abstract class Sink<TSource> : IDisposable
{
    /// <summary>
    /// Observer that currently receives outgoing messages. Replaced by
    /// <c>NopObserver&lt;TSource&gt;.Instance</c> when the sink is disposed.
    /// </summary>
    protected internal volatile IObserver<TSource> _observer;

    // Resource to dispose when the sink is disposed; set to null by Dispose.
    private IDisposable _cancel;

    /// <summary>
    /// Creates a sink that sends messages to <paramref name="observer"/> and disposes
    /// <paramref name="cancel"/> when the sink itself is disposed.
    /// </summary>
    /// <param name="observer">Observer to receive outgoing messages.</param>
    /// <param name="cancel">Resource to dispose together with the sink; a null value is skipped by <see cref="Dispose"/>.</param>
    public Sink(IObserver<TSource> observer, IDisposable cancel)
    {
        _observer = observer;
        _cancel = cancel;
    }

    /// <summary>
    /// Swaps the outgoing observer for <c>NopObserver&lt;TSource&gt;.Instance</c> and disposes the
    /// cancellation resource passed to the constructor.
    /// </summary>
    public virtual void Dispose()
    {
        // NOTE(review): NopObserver is declared elsewhere; presumably it ignores all messages - confirm.
        _observer = NopObserver<TSource>.Instance;

        // The atomic exchange hands the non-null resource to a single caller only, so repeated
        // or concurrent Dispose calls do not dispose it more than once.
        var cancel = Interlocked.Exchange(ref _cancel, null);
        if (cancel != null)
        {
            cancel.Dispose();
        }
    }

    /// <summary>
    /// Creates an observer that relays messages to this sink's current observer and disposes
    /// the sink after relaying a terminal message.
    /// </summary>
    /// <returns>A new forwarding observer bound to this sink.</returns>
    public IObserver<TSource> GetForwarder()
    {
        return new _(this);
    }

    /// <summary>
    /// Observer that relays every message to the owning sink's current observer.
    /// </summary>
    private sealed class _ : IObserver<TSource>
    {
        private readonly Sink<TSource> _forward;

        public _(Sink<TSource> forward)
        {
            _forward = forward;
        }

        public void OnNext(TSource value)
        {
            _forward._observer.OnNext(value);
        }

        public void OnError(Exception error)
        {
            // Terminal message: relay it first, then dispose the sink.
            _forward._observer.OnError(error);
            _forward.Dispose();
        }

        public void OnCompleted()
        {
            // Terminal message: relay it first, then dispose the sink.
            _forward._observer.OnCompleted();
            _forward.Dispose();
        }
    }
}
}
#endif