Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2008-10-20 12:06:16 +0400
committerAtsushi Eno <atsushieno@gmail.com>2008-10-20 12:06:16 +0400
commit6abd64b91631e8c478d237a0c8670f9ae65a0c10 (patch)
treefaa0e3ff00a33f0ff393ad6d64e6d98bb96e595f /mcs/class/Mono.Messaging.RabbitMQ
parent7b8ca08adfbb964ac42340f983267b031d618eae (diff)
add missing files (see bug #432471).
svn path=/branches/messaging-2008/mcs/; revision=116496
Diffstat (limited to 'mcs/class/Mono.Messaging.RabbitMQ')
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs122
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs149
2 files changed, 271 insertions, 0 deletions
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
new file mode 100644
index 00000000000..65c1da8d900
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
@@ -0,0 +1,122 @@
+//
+// Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+using Mono.Messaging;
+
+using RabbitMQ.Client;
+using RabbitMQ.Client.Content;
+using RabbitMQ.Client.Events;
+using RabbitMQ.Client.Exceptions;
+using RabbitMQ.Client.MessagePatterns;
+using RabbitMQ.Util;
+
+namespace Mono.Messaging.RabbitMQ {
+
+ public class MessageFactory {
+
+ private static readonly string SENDER_VERSION_KEY = "SenderVersion";
+ private static readonly string SOURCE_MACHINE_KEY = "SourceMachine";
+ private static readonly string BODY_TYPE_KEY = "BodyType";
+
+ public static IMessageBuilder WriteMessage (IModel ch, IMessage msg)
+ {
+ BasicMessageBuilder mb = new BasicMessageBuilder (ch);
+ mb.Properties.MessageId = msg.Id;
+ if (msg.CorrelationId != null)
+ mb.Properties.CorrelationId = msg.CorrelationId;
+ mb.Properties.Timestamp = MessageFactory.DateTimeToAmqpTimestamp (DateTime.Now);
+ Hashtable headers = new Hashtable ();
+ headers[SENDER_VERSION_KEY] = msg.SenderVersion;
+ headers[SOURCE_MACHINE_KEY] = (string) System.Environment.MachineName;
+ headers[BODY_TYPE_KEY] = msg.BodyType;
+
+ mb.Properties.Headers = headers;
+ Stream s = msg.BodyStream;
+ s.Seek (0, SeekOrigin.Begin);
+ byte[] buf = new byte[s.Length];
+ int numRead = msg.BodyStream.Read (buf, 0, buf.Length);
+ mb.BodyStream.Write (buf, 0, buf.Length);
+ return mb;
+ }
+
+ public static IMessage ReadMessage (BasicDeliverEventArgs result)
+ {
+ MessageBase msg = new MessageBase ();
+ Stream s = new MemoryStream ();
+ s.Write (result.Body, 0, result.Body.Length);
+ Console.WriteLine ("Body.Length Out {0}", result.Body.Length);
+ DateTime arrivedTime = DateTime.Now;
+ long senderVersion = (long) result.BasicProperties.Headers[SENDER_VERSION_KEY];
+ string sourceMachine = GetString (result.BasicProperties, SOURCE_MACHINE_KEY);
+ DateTime sentTime = AmqpTimestampToDateTime (result.BasicProperties.Timestamp);
+ msg.SetDeliveryInfo (Acknowledgment.None,
+ arrivedTime,
+ null,
+ result.BasicProperties.MessageId,
+ MessageType.Normal,
+ new byte[0],
+ senderVersion,
+ sourceMachine,
+ null);
+ msg.CorrelationId = result.BasicProperties.CorrelationId;
+ msg.BodyStream = s;
+ msg.BodyType = (int) result.BasicProperties.Headers[BODY_TYPE_KEY];
+ return msg;
+ }
+
+ public static string GetString (IBasicProperties properties, String key)
+ {
+ byte[] b = (byte[]) properties.Headers[key];
+ if (b == null)
+ return null;
+
+ return Encoding.UTF8.GetString (b);
+ }
+
+ public static AmqpTimestamp DateTimeToAmqpTimestamp (DateTime t)
+ {
+ DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
+ TimeSpan ts = t.ToUniversalTime () - epoch;
+ return new AmqpTimestamp((long) ts.TotalSeconds);
+ }
+
+ public static DateTime AmqpTimestampToDateTime (AmqpTimestamp ats)
+ {
+ DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
+ return epoch.AddSeconds (ats.UnixTime).ToLocalTime ();
+ }
+
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
new file mode 100644
index 00000000000..653ae97f093
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
@@ -0,0 +1,149 @@
+//
+// Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+
+using Mono.Messaging;
+
+using RabbitMQ.Client;
+using RabbitMQ.Client.Content;
+using RabbitMQ.Client.Events;
+using RabbitMQ.Client.Exceptions;
+using RabbitMQ.Client.MessagePatterns;
+using RabbitMQ.Util;
+
+namespace Mono.Messaging.RabbitMQ {
+
+ public class RabbitMQMessageEnumerator : IMessageEnumerator {
+
+ private readonly QueueReference qRef;
+ private IConnection cn = null;
+ private BasicDeliverEventArgs current = null;
+ private IModel model = null;
+ private Subscription subscription = null;
+
+ public RabbitMQMessageEnumerator (QueueReference qRef) {
+ this.qRef = qRef;
+ }
+
+ public IMessage Current {
+ get {
+ if (current == null)
+ throw new InvalidOperationException ();
+
+ return CreateMessage (current);
+ }
+ }
+
+ public void Close ()
+ {
+ if (subscription != null) {
+ subscription.Close ();
+ subscription = null;
+ }
+
+ if (model != null) {
+ model.Dispose ();
+ model = null;
+ }
+
+ if (cn != null) {
+ cn.Dispose ();
+ cn = null;
+ }
+ }
+
+ public void Dispose (bool disposing)
+ {
+ }
+
+ public void Dispose ()
+ {
+ Close ();
+ }
+
+ public void Reset ()
+ {
+ Close ();
+ }
+
+ private IModel Model {
+ get {
+ if (cn == null) {
+ ConnectionFactory cf = new ConnectionFactory ();
+ cn = cf.CreateConnection (qRef.Host);
+ }
+
+ if (model == null) {
+ model = cn.CreateModel ();
+ }
+
+ return model;
+ }
+ }
+
+ private Subscription Subscription {
+ get {
+ if (subscription == null) {
+ IModel ch = Model;
+
+ ushort ticket = ch.AccessRequest ("/data");
+ string finalName = ch.QueueDeclare (ticket, qRef.Queue, false);
+
+ subscription = new Subscription (ch, ticket, finalName);
+ }
+
+ return subscription;
+ }
+ }
+
+ public bool MoveNext ()
+ {
+ Subscription sub = Subscription;
+ return sub.Next (500, out current);
+ }
+
+ public IMessage RemoveCurrent ()
+ {
+ if (current == null)
+ throw new InvalidOperationException ();
+
+ IMessage msg = CreateMessage (current);
+ Subscription.Ack (current);
+ return msg;
+ }
+
+ private IMessage CreateMessage (BasicDeliverEventArgs result)
+ {
+ return MessageFactory.ReadMessage (result);
+ }
+
+ }
+}