// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive.Linq { #if !NO_PERF using Observαble; #endif internal partial class QueryLanguage { #region + Subscribe + public virtual IDisposable Subscribe(IEnumerable source, IObserver observer) { return Subscribe_(source, observer, SchedulerDefaults.Iteration); } public virtual IDisposable Subscribe(IEnumerable source, IObserver observer, IScheduler scheduler) { return Subscribe_(source, observer, scheduler); } private static IDisposable Subscribe_(IEnumerable source, IObserver observer, IScheduler scheduler) { #if !NO_PERF // // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation. // return new ToObservable(source, scheduler).Subscribe/*Unsafe*/(observer); #else var e = source.GetEnumerator(); var flag = new BooleanDisposable(); scheduler.Schedule(self => { var hasNext = false; var ex = default(Exception); var current = default(TSource); if (flag.IsDisposed) { e.Dispose(); return; } try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; } catch (Exception exception) { ex = exception; } if (!hasNext || ex != null) { e.Dispose(); } if (ex != null) { observer.OnError(ex); return; } if (!hasNext) { observer.OnCompleted(); return; } observer.OnNext(current); self(); }); return flag; #endif } #endregion #region + ToEnumerable + public virtual IEnumerable ToEnumerable(IObservable source) { return new AnonymousEnumerable(() => source.GetEnumerator()); } #endregion #region ToEvent public virtual IEventSource ToEvent(IObservable source) { return new EventSource(source, (h, _) => h(Unit.Default)); } public virtual IEventSource ToEvent(IObservable source) { return new EventSource(source, (h, value) => h(value)); } #endregion #region ToEventPattern public virtual IEventPatternSource ToEventPattern(IObservable> source) #if !NO_EVENTARGS_CONSTRAINT where TEventArgs : EventArgs #endif { return new EventPatternSource( #if !NO_VARIANCE source, #else source.Select(x => (EventPattern)x), #endif (h, evt) => h(evt.Sender, evt.EventArgs) ); } #endregion #region + ToObservable + public virtual IObservable ToObservable(IEnumerable source) { #if !NO_PERF return new ToObservable(source, SchedulerDefaults.Iteration); #else return ToObservable_(source, SchedulerDefaults.Iteration); #endif } public virtual IObservable ToObservable(IEnumerable source, IScheduler scheduler) { #if !NO_PERF return new ToObservable(source, scheduler); #else return ToObservable_(source, scheduler); #endif } #if NO_PERF private static IObservable ToObservable_(IEnumerable source, IScheduler scheduler) { return new AnonymousObservable(observer => source.Subscribe(observer, scheduler)); } #endif #endregion } }