Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/rx.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/NET/Samples/EventCorrelationSample')
-rw-r--r--Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln29
-rw-r--r--Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config6
-rw-r--r--Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj77
-rw-r--r--Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs234
-rw-r--r--Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs36
5 files changed, 382 insertions, 0 deletions
diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln
new file mode 100644
index 0000000..3617a83
--- /dev/null
+++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample.sln
@@ -0,0 +1,29 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio 2012
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventCorrelationSample", "EventCorrelationSample\EventCorrelationSample.csproj", "{268FE559-08B0-4258-A4FD-48DDD7B167B8}"
+EndProject
+Global
+ GlobalSection(TeamFoundationVersionControl) = preSolution
+ SccNumberOfProjects = 2
+ SccEnterpriseProvider = {4CA58AB2-18FA-4F8D-95D4-32DDF27D184C}
+ SccTeamFoundationServer = http://sqlbuvsts01:8080/main
+ SccLocalPath0 = .
+ SccProjectUniqueName1 = EventCorrelationSample\\EventCorrelationSample.csproj
+ SccProjectName1 = EventCorrelationSample
+ SccLocalPath1 = EventCorrelationSample
+ EndGlobalSection
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {268FE559-08B0-4258-A4FD-48DDD7B167B8}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config
new file mode 100644
index 0000000..8e15646
--- /dev/null
+++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/App.config
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+ <startup>
+ <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
+ </startup>
+</configuration> \ No newline at end of file
diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj
new file mode 100644
index 0000000..16aaa03
--- /dev/null
+++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/EventCorrelationSample.csproj
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProjectGuid>{268FE559-08B0-4258-A4FD-48DDD7B167B8}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>EventCorrelationSample</RootNamespace>
+ <AssemblyName>EventCorrelationSample</AssemblyName>
+ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <SccProjectName>SAK</SccProjectName>
+ <SccLocalPath>SAK</SccLocalPath>
+ <SccAuxPath>SAK</SccAuxPath>
+ <SccProvider>SAK</SccProvider>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="System.Reactive.Core, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Interfaces, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.Interfaces.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.Linq, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.Linq.dll</HintPath>
+ </Reference>
+ <Reference Include="System.Reactive.PlatformServices, Version=2.0.20905.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\..\..\..\Program Files (x86)\Microsoft SDKs\Reactive Extensions\v2.0\Binaries\.NETFramework\v4.5\System.Reactive.PlatformServices.dll</HintPath>
+ </Reference>
+ <Reference Include="Microsoft.CSharp" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Program.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="App.config" />
+ </ItemGroup>
+ <ItemGroup>
+ <WCFMetadata Include="Service References\" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project> \ No newline at end of file
diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs
new file mode 100644
index 0000000..3117e8f
--- /dev/null
+++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Program.cs
@@ -0,0 +1,234 @@
+/*
+ * Example showing the use of Rx for monitoring correlated activity event tracing streams.
+ *
+ * bartde - 10/10/2012
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Reactive;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace EventCorrelationSample
+{
+ class Program
+ {
+ //
+ // Those fields represent the ETW infrastructure.
+ //
+ private static Subject<BeginActivity> _beginActivities = new Subject<BeginActivity>();
+ private static Subject<EndActivity> _endActivities = new Subject<EndActivity>();
+ private static Subject<BeginTaskA> _beginAs = new Subject<BeginTaskA>();
+ private static Subject<EndTaskA> _endAs = new Subject<EndTaskA>();
+ private static Subject<BeginTaskB> _beginBs = new Subject<BeginTaskB>();
+ private static Subject<EndTaskB> _endBs = new Subject<EndTaskB>();
+
+ static void Main(string[] args)
+ {
+ //
+ // Timestamps would really be inserted by ETW, but we'll add them ourselves using Rx's Timestamp operator.
+ //
+ var beginActivities = _beginActivities.Timestamp();
+ var endActivities = _endActivities.Timestamp();
+ var beginAs = _beginAs.Timestamp();
+ var endAs = _endAs.Timestamp();
+ var beginBs = _beginBs.Timestamp();
+ var endBs = _endBs.Timestamp();
+
+ //
+ // Analyze length of requests.
+ //
+ var requests = from b in beginActivities
+ from e in endActivities.Where(e => e.Value.Id == b.Value.Id)
+ select new { Id = b.Value.Id, Length = e.Timestamp - b.Timestamp };
+
+ requests.Subscribe(Print(ConsoleColor.Yellow));
+
+ //
+ // Correlate task information.
+ //
+ var info = from beginAct in beginActivities
+ let activityId = beginAct.Value.Id /* improve readability */
+ //
+ // Obtain correlated streams. Subscription doesn't happen here yet.
+ //
+ let endAct = endActivities.FirstAsync(e => e.Value.Id == activityId) /* correlated end event */
+ let taskAs = /* correlated task A events; will contain payload and start/end times */
+ from beginA in beginAs.TakeUntil(endAct).Where(a => a.Value.ActivityId == activityId)
+ from endA in endAs.FirstAsync(e => e.Value.Id == beginA.Value.Id) /* correlation between the task's begin/end events */
+ select new { Value = (int?)beginA.Value.Value, Start = beginA.Timestamp, End = endA.Timestamp }
+ let taskBs = /* correlated task B events; will contain payload and start/end times */
+ from beginB in beginBs.TakeUntil(endAct).Where(b => b.Value.ActivityId == activityId)
+ from endB in endBs.FirstAsync(e => e.Value.Id == beginB.Value.Id) /* correlation between the task's begin/end events */
+ select new { Value = beginB.Value.Value, Start = beginB.Timestamp, End = endB.Timestamp }
+ //
+ // Parallel observation of all of the above, using CombineLatest with seed values for events that can be absent.
+ // The SelectMany operator, bound to by from...from... syntax causes the flattening of beginActivities and the result of parallel observation.
+ //
+ from res in Observable.CombineLatest(
+ endAct,
+ taskAs.StartWith(new { Value = default(int?), Start = DateTimeOffset.MinValue, End = DateTimeOffset.MinValue }),
+ taskBs.StartWith(new { Value = default(string), Start = DateTimeOffset.MinValue, End = DateTimeOffset.MinValue }),
+ (e, a, b) => new { e, a, b }).LastAsync() /* the stream will end because all substreams end (due to FirstAsync and TakeUntil use); only forward the last combined result */
+ select new {
+ Activity = activityId, StartTime = beginAct.Timestamp, EndTime = res.e.Timestamp,
+ PayloadA = res.a.Value != null ? res.a.Value.ToString() : "(none)", DurationA = res.a.End - res.a.Start,
+ PayloadB = res.b.Value ?? "(none)", DurationB = res.b.End - res.b.Start
+ };
+
+ info.Subscribe(Print(ConsoleColor.Cyan));
+
+ StartService();
+ }
+
+ static void StartService()
+ {
+ //
+ // Mimics talking to ETW.
+ //
+ var beginActivitiesObserver = Observer.Synchronize(_beginActivities);
+ var endActivitiesObserver = Observer.Synchronize(_endActivities);
+ var beginAsObserver = Observer.Synchronize(_beginAs);
+ var endAsObserver = Observer.Synchronize(_endAs);
+ var beginBsObserver = Observer.Synchronize(_beginBs);
+ var endBsObserver = Observer.Synchronize(_endBs);
+
+ //
+ // Master random number generator + throttle to max 10 requests at the same time.
+ //
+ var rnd = new Random();
+ var semaphore = new Semaphore(10, 10);
+
+ while (true)
+ {
+ semaphore.WaitOne();
+
+ var seed = rnd.Next();
+ Task.Run(async () =>
+ {
+ var rand = new Random(seed);
+
+ var requestId = Guid.NewGuid();
+
+ Print(ConsoleColor.Green)("Starting request " + requestId);
+
+ var measure = Stopwatch.StartNew();
+ beginActivitiesObserver.OnNext(new BeginActivity { Id = requestId });
+
+ Thread.Sleep(rand.Next(50, 300));
+
+ var tossA = rand.Next(2);
+ var tossB = rand.Next(2);
+
+ var tasks = new List<Task>(tossA + tossB);
+ var diag = new List<string>();
+
+ if (tossA == 1)
+ {
+ var aDelay = rand.Next(20, 70);
+ var aLength = rand.Next(100, 2000);
+ var aValue = rand.Next(0, 43);
+
+ diag.Add(string.Format("A(d = {0}, t = {1}, x = {2})", aDelay, aLength, aValue));
+
+ tasks.Add(Task.Run(() =>
+ {
+ var taskId = Guid.NewGuid();
+
+ Thread.Sleep(aDelay);
+ beginAsObserver.OnNext(new BeginTaskA { Id = taskId, ActivityId = requestId, Value = aValue });
+ Thread.Sleep(aLength);
+ endAsObserver.OnNext(new EndTaskA { Id = taskId });
+ }));
+ }
+
+ if (tossB == 1)
+ {
+ var bDelay = rand.Next(10, 40);
+ var bLength = rand.Next(100, 2000);
+ var alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
+ var bValue = new string(Enumerable.Range(0, rand.Next(0, 10)).Select(x => alphabet[rand.Next(alphabet.Length)]).ToArray());
+
+ diag.Add(string.Format("B(d = {0}, t = {1}, x = {2})", bDelay, bLength, bValue));
+
+ tasks.Add(Task.Run(() =>
+ {
+ var taskId = Guid.NewGuid();
+
+ Thread.Sleep(bDelay);
+ beginBsObserver.OnNext(new BeginTaskB { Id = taskId, ActivityId = requestId, Value = bValue });
+ Thread.Sleep(bLength);
+ endBsObserver.OnNext(new EndTaskB { Id = taskId });
+ }));
+ }
+
+ await Task.WhenAll(tasks);
+
+ Thread.Sleep(rand.Next(50, 100));
+
+ measure.Stop();
+ Print(ConsoleColor.Red)(string.Format("Ending request {0} (d = {1}) {2}", requestId, measure.Elapsed, string.Join(" ", diag)));
+ endActivitiesObserver.OnNext(new EndActivity { Id = requestId });
+
+ semaphore.Release();
+ });
+ }
+ }
+
+ static object s_gate = new object();
+
+ static void WriteLine(ConsoleColor color, object message)
+ {
+ lock (s_gate)
+ {
+ Console.ForegroundColor = color;
+ Console.WriteLine(message);
+ Console.ResetColor();
+ }
+ }
+
+ static Action<object> Print(ConsoleColor color)
+ {
+ return s => WriteLine(color, s);
+ }
+ }
+
+ class Event
+ {
+ public Guid Id { get; set; }
+ }
+
+ class BeginActivity : Event
+ {
+ }
+
+ class EndActivity : Event
+ {
+ }
+
+ class BeginTaskA : Event
+ {
+ public Guid ActivityId { get; set; }
+ public int Value { get; set; }
+ }
+
+ class EndTaskA : Event
+ {
+ }
+
+ class BeginTaskB : Event
+ {
+ public Guid ActivityId { get; set; }
+ public string Value { get; set; }
+ }
+
+ class EndTaskB : Event
+ {
+ public string Value { get; set; }
+ }
+}
diff --git a/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..88d0be4
--- /dev/null
+++ b/Rx/NET/Samples/EventCorrelationSample/EventCorrelationSample/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("EventCorrelationSample")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("EventCorrelationSample")]
+[assembly: AssemblyCopyright("Copyright © 2012")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("d6d7c0d7-c065-4f58-b078-a1ab8dd23449")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]