diff options
Diffstat (limited to 'Rx/NET/Samples/EventCorrelationSample')
5 files changed, 382 insertions, 0 deletions
diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln new file mode 100644 index 0000000..3617a83 --- /dev/null +++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln @@ -0,0 +1,29 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 2012 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventCorrelationSample", "EventCorrelationSample\EventCorrelationSample.csproj", "{268FE559-08B0-4258-A4FD-48DDD7B167B8}" +EndProject +Global + GlobalSection(TeamFoundationVersionControl) = preSolution + SccNumberOfProjects = 2 + SccEnterpriseProvider = {4CA58AB2-18FA-4F8D-95D4-32DDF27D184C} + SccTeamFoundationServer = http://sqlbuvsts01:8080/main + SccLocalPath0 = . + SccProjectUniqueName1 = EventCorrelationSample\\EventCorrelationSample.csproj + SccProjectName1 = EventCorrelationSample + SccLocalPath1 = EventCorrelationSample + EndGlobalSection + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config new file mode 100644 index 0000000..8e15646 --- /dev/null +++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="utf-8" ?> +<configuration> + <startup> + <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" /> + </startup> +</configuration>
\ No newline at end of file diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj new file mode 100644 index 0000000..16aaa03 --- /dev/null +++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{268FE559-08B0-4258-A4FD-48DDD7B167B8}</ProjectGuid> + <OutputType>Exe</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>EventCorrelationSample</RootNamespace> + <AssemblyName>EventCorrelationSample</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <SccProjectName>SAK</SccProjectName> + <SccLocalPath>SAK</SccLocalPath> + <SccAuxPath>SAK</SccAuxPath> + <SccProvider>SAK</SccProvider> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\Debug\</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\Release\</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Reactive.Core, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.Core.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Interfaces, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.Interfaces.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.Linq, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.Linq.dll</HintPath> + </Reference> + <Reference Include="System.Reactive.PlatformServices, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.PlatformServices.dll</HintPath> + </Reference> + <Reference Include="Microsoft.CSharp" /> + </ItemGroup> + <ItemGroup> + <Compile Include="Program.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="App.config" /> + </ItemGroup> + <ItemGroup> + <WCFMetadata Include="Service References\" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project>
\ No newline at end of file diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs new file mode 100644 index 0000000..3117e8f --- /dev/null +++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs @@ -0,0 +1,234 @@ +/* + * Example showing the use of Rx for monitoring correlated activity event tracing streams. + * + * bartde - 10/10/2012 + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +namespace EventCorrelationSample +{ + class Program + { + // + // Those fields represent the ETW infrastructure. + // + private static Subject<BeginActivity> _beginActivities = new Subject<BeginActivity>(); + private static Subject<EndActivity> _endActivities = new Subject<EndActivity>(); + private static Subject<BeginTaskA> _beginAs = new Subject<BeginTaskA>(); + private static Subject<EndTaskA> _endAs = new Subject<EndTaskA>(); + private static Subject<BeginTaskB> _beginBs = new Subject<BeginTaskB>(); + private static Subject<EndTaskB> _endBs = new Subject<EndTaskB>(); + + static void Main(string[] args) + { + // + // Timestamps would really be inserted by ETW, but we'll add them ourselves using Rx's Timestamp operator. + // + var beginActivities = _beginActivities.Timestamp(); + var endActivities = _endActivities.Timestamp(); + var beginAs = _beginAs.Timestamp(); + var endAs = _endAs.Timestamp(); + var beginBs = _beginBs.Timestamp(); + var endBs = _endBs.Timestamp(); + + // + // Analyze length of requests. + // + var requests = from b in beginActivities + from e in endActivities.Where(e => e.Value.Id == b.Value.Id) + select new { Id = b.Value.Id, Length = e.Timestamp - b.Timestamp }; + + requests.Subscribe(Print(ConsoleColor.Yellow)); + + // + // Correlate task information. + // + var info = from beginAct in beginActivities + let activityId = beginAct.Value.Id /* improve readability */ + // + // Obtain correlated streams. Subscription doesn't happen here yet. + // + let endAct = endActivities.FirstAsync(e => e.Value.Id == activityId) /* correlated end event */ + let taskAs = /* correlated task A events; will contain payload and start/end times */ + from beginA in beginAs.TakeUntil(endAct).Where(a => a.Value.ActivityId == activityId) + from endA in endAs.FirstAsync(e => e.Value.Id == beginA.Value.Id) /* correlation between the task's begin/end events */ + select new { Value = (int?)beginA.Value.Value, Start = beginA.Timestamp, End = endA.Timestamp } + let taskBs = /* correlated task B events; will contain payload and start/end times */ + from beginB in beginBs.TakeUntil(endAct).Where(b => b.Value.ActivityId == activityId) + from endB in endBs.FirstAsync(e => e.Value.Id == beginB.Value.Id) /* correlation between the task's begin/end events */ + select new { Value = beginB.Value.Value, Start = beginB.Timestamp, End = endB.Timestamp } + // + // Parallel observation of all of the above, using CombineLatest with seed values for events that can be absent. + // The SelectMany operator, bound to by from...from... syntax causes the flattening of beginActivities and the result of parallel observation. + // + from res in Observable.CombineLatest( + endAct, + taskAs.StartWith(new { Value = default(int?), Start = DateTimeOffset.MinValue, End = DateTimeOffset.MinValue }), + taskBs.StartWith(new { Value = default(string), Start = DateTimeOffset.MinValue, End = DateTimeOffset.MinValue }), + (e, a, b) => new { e, a, b }).LastAsync() /* the stream will end because all substreams end (due to FirstAsync and TakeUntil use); only forward the last combined result */ + select new { + Activity = activityId, StartTime = beginAct.Timestamp, EndTime = res.e.Timestamp, + PayloadA = res.a.Value != null ? res.a.Value.ToString() : "(none)", DurationA = res.a.End - res.a.Start, + PayloadB = res.b.Value ?? "(none)", DurationB = res.b.End - res.b.Start + }; + + info.Subscribe(Print(ConsoleColor.Cyan)); + + StartService(); + } + + static void StartService() + { + // + // Mimics talking to ETW. + // + var beginActivitiesObserver = Observer.Synchronize(_beginActivities); + var endActivitiesObserver = Observer.Synchronize(_endActivities); + var beginAsObserver = Observer.Synchronize(_beginAs); + var endAsObserver = Observer.Synchronize(_endAs); + var beginBsObserver = Observer.Synchronize(_beginBs); + var endBsObserver = Observer.Synchronize(_endBs); + + // + // Master random number generator + throttle to max 10 requests at the same time. + // + var rnd = new Random(); + var semaphore = new Semaphore(10, 10); + + while (true) + { + semaphore.WaitOne(); + + var seed = rnd.Next(); + Task.Run(async () => + { + var rand = new Random(seed); + + var requestId = Guid.NewGuid(); + + Print(ConsoleColor.Green)("Starting request " + requestId); + + var measure = Stopwatch.StartNew(); + beginActivitiesObserver.OnNext(new BeginActivity { Id = requestId }); + + Thread.Sleep(rand.Next(50, 300)); + + var tossA = rand.Next(2); + var tossB = rand.Next(2); + + var tasks = new List<Task>(tossA + tossB); + var diag = new List<string>(); + + if (tossA == 1) + { + var aDelay = rand.Next(20, 70); + var aLength = rand.Next(100, 2000); + var aValue = rand.Next(0, 43); + + diag.Add(string.Format("A(d = {0}, t = {1}, x = {2})", aDelay, aLength, aValue)); + + tasks.Add(Task.Run(() => + { + var taskId = Guid.NewGuid(); + + Thread.Sleep(aDelay); + beginAsObserver.OnNext(new BeginTaskA { Id = taskId, ActivityId = requestId, Value = aValue }); + Thread.Sleep(aLength); + endAsObserver.OnNext(new EndTaskA { Id = taskId }); + })); + } + + if (tossB == 1) + { + var bDelay = rand.Next(10, 40); + var bLength = rand.Next(100, 2000); + var alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"; + var bValue = new string(Enumerable.Range(0, rand.Next(0, 10)).Select(x => alphabet[rand.Next(alphabet.Length)]).ToArray()); + + diag.Add(string.Format("B(d = {0}, t = {1}, x = {2})", bDelay, bLength, bValue)); + + tasks.Add(Task.Run(() => + { + var taskId = Guid.NewGuid(); + + Thread.Sleep(bDelay); + beginBsObserver.OnNext(new BeginTaskB { Id = taskId, ActivityId = requestId, Value = bValue }); + Thread.Sleep(bLength); + endBsObserver.OnNext(new EndTaskB { Id = taskId }); + })); + } + + await Task.WhenAll(tasks); + + Thread.Sleep(rand.Next(50, 100)); + + measure.Stop(); + Print(ConsoleColor.Red)(string.Format("Ending request {0} (d = {1}) {2}", requestId, measure.Elapsed, string.Join(" ", diag))); + endActivitiesObserver.OnNext(new EndActivity { Id = requestId }); + + semaphore.Release(); + }); + } + } + + static object s_gate = new object(); + + static void WriteLine(ConsoleColor color, object message) + { + lock (s_gate) + { + Console.ForegroundColor = color; + Console.WriteLine(message); + Console.ResetColor(); + } + } + + static Action<object> Print(ConsoleColor color) + { + return s => WriteLine(color, s); + } + } + + class Event + { + public Guid Id { get; set; } + } + + class BeginActivity : Event + { + } + + class EndActivity : Event + { + } + + class BeginTaskA : Event + { + public Guid ActivityId { get; set; } + public int Value { get; set; } + } + + class EndTaskA : Event + { + } + + class BeginTaskB : Event + { + public Guid ActivityId { get; set; } + public string Value { get; set; } + } + + class EndTaskB : Event + { + public string Value { get; set; } + } +} diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..88d0be4 --- /dev/null +++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("EventCorrelationSample")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("EventCorrelationSample")] +[assembly: AssemblyCopyright("Copyright © 2012")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("d6d7c0d7-c065-4f58-b078-a1ab8dd23449")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] |