diff options
author | Atsushi Eno <atsushieno@gmail.com> | 2008-12-07 12:55:03 +0300 |
---|---|---|
committer | Atsushi Eno <atsushieno@gmail.com> | 2008-12-07 12:55:03 +0300 |
commit | 56f2a5eee8ee484218835cc53663685ac70b06c5 (patch) | |
tree | 0d588368b1de72b2f8ee7120e377d3f226d243fb /mcs/class/Mono.Messaging.RabbitMQ | |
parent | 571a0e148cce4ff8797f85982b34707e632610e6 (diff) |
Apply RabbitMQ support patch by Michael Barker, on bug #457089.
svn path=/branches/messaging-2008/mcs/; revision=120957
Diffstat (limited to 'mcs/class/Mono.Messaging.RabbitMQ')
18 files changed, 2124 insertions, 173 deletions
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources index 21a917c3dec..193d4813a17 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources @@ -4,5 +4,6 @@ ./Mono.Messaging.RabbitMQ/MessageFactory.cs ./Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs ./Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs +./Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs ./Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog index f3f3d3d1c25..3604f63ebf9 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog @@ -1,3 +1,25 @@ +2008-12-07 Michael Barker <mike@middlesoft.co.uk> + + * RabbitMQMessageQueue.cs: Throw MessageUnavailableException when there are + no messages. + +2008-11-23 Michael Barker <mike@middlesoft.co.uk> + + * RabbitMQMessageQueue.cs: Added selector support for ReceiveBy{Id,CorrelationId} + and support for MessageQueueTransactionType, currently only None and Single + are supported. Added PeekBy{Id,CorrelationId} methods. + * MessageFactory.cs: Made read/write message methods non-static and requires + the MessagingProvider as a constructor parameter. + +2008-11-02 Michael Barker <mike@middlesoft.co.uk> + + * RabbitMQMessageQueueTransaction.cs: New, Handles transactional delivery + by maintaining the transaction context. + * RabbitMQMessageQueue.cs: Added support for transactions, purging, deleting + and refactored some of the methods to improve the code reuse. + * RabbitMQMessagingProvider.cs: Added methods for queue deletion and + creating transactions. + 2008-10-26 Michael Barker <mike@middlesoft.co.uk> * MessageFactory.cs: Support all properties defined in the 1.1 version of diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs index aa0f852bf0d..e48da2ca2ba 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs @@ -67,10 +67,18 @@ namespace Mono.Messaging.RabbitMQ { private static readonly string USE_AUTHENTICATION_KEY = "UseAuthentication"; private static readonly string USE_DEAD_LETTER_QUEUE_KEY = "UseDeadLetterQueue"; private static readonly string USE_ENCRYPTION_KEY = "UseEncryption"; + private static readonly string TRANSACTION_ID_KEY = "TrandactionId"; private static readonly int PERSISTENT_DELIVERY_MODE = 2; - public static IMessageBuilder WriteMessage (IModel ch, IMessage msg) + private readonly RabbitMQMessagingProvider provider; + + public MessageFactory (RabbitMQMessagingProvider provider) + { + this.provider = provider; + } + + public IMessageBuilder WriteMessage (IModel ch, IMessage msg) { BasicMessageBuilder mb = new BasicMessageBuilder (ch); mb.Properties.MessageId = msg.Id; @@ -103,6 +111,7 @@ namespace Mono.Messaging.RabbitMQ { headers[SENDER_CERTIFICATE_KEY] = msg.SenderCertificate; headers[TIME_TO_BE_RECEIVED_KEY] = msg.TimeToBeReceived.Ticks; headers[TIME_TO_REACH_QUEUE_KEY] = msg.TimeToReachQueue.Ticks; + SetValue (headers, TRANSACTION_ID_KEY, msg.TransactionId); headers[USE_AUTHENTICATION_KEY] = msg.UseAuthentication; headers[USE_DEAD_LETTER_QUEUE_KEY] = msg.UseDeadLetterQueue; headers[USE_ENCRYPTION_KEY] = msg.UseEncryption; @@ -122,7 +131,7 @@ namespace Mono.Messaging.RabbitMQ { headers[name] = val; } - public static IMessage ReadMessage (QueueReference destination, BasicDeliverEventArgs result) + public IMessage ReadMessage (QueueReference destination, BasicDeliverEventArgs result) { /* if (destination == null) @@ -133,22 +142,24 @@ namespace Mono.Messaging.RabbitMQ { MessageBase msg = new MessageBase (); Stream s = new MemoryStream (); s.Write (result.Body, 0, result.Body.Length); - Console.WriteLine ("Body.Length Out {0}", result.Body.Length); DateTime arrivedTime = DateTime.Now; IDictionary headers = result.BasicProperties.Headers; long senderVersion = (long) headers[SENDER_VERSION_KEY]; string sourceMachine = GetString (headers, SOURCE_MACHINE_KEY); DateTime sentTime = AmqpTimestampToDateTime (result.BasicProperties.Timestamp); + string transactionId = GetString (headers, TRANSACTION_ID_KEY); msg.SetDeliveryInfo (Acknowledgment.None, arrivedTime, - new RabbitMQMessageQueue (destination), + new RabbitMQMessageQueue (provider, + destination, + true), result.BasicProperties.MessageId, MessageType.Normal, new byte[0], senderVersion, sentTime, sourceMachine, - null); + transactionId); msg.CorrelationId = result.BasicProperties.CorrelationId; msg.BodyStream = s; msg.BodyType = (int) result.BasicProperties.Headers[BODY_TYPE_KEY]; @@ -156,8 +167,12 @@ namespace Mono.Messaging.RabbitMQ { Enum.ToObject (typeof (AcknowledgeTypes), headers[ACKNOWLEDGE_TYPE_KEY]); string adminQueuePath = GetString (headers, ADMINISTRATION_QUEUE_KEY); - if (adminQueuePath != null) - msg.AdministrationQueue = new RabbitMQMessageQueue (QueueReference.Parse (adminQueuePath)); + if (adminQueuePath != null) { + QueueReference qRef = QueueReference.Parse (adminQueuePath); + msg.AdministrationQueue = new RabbitMQMessageQueue (provider, + qRef, + true); + } msg.AppSpecific = (int) headers[APP_SPECIFIC_KEY]; msg.AuthenticationProviderName = GetString (headers, AUTHENTICATION_PROVIDER_NAME_KEY); msg.AuthenticationProviderType = (CryptographicProviderType) Enum.ToObject (typeof (CryptographicProviderType), headers[AUTHENTICATION_PROVIDER_TYPE_KEY]); @@ -172,7 +187,7 @@ namespace Mono.Messaging.RabbitMQ { msg.Priority = (MessagePriority) Enum.ToObject (typeof (MessagePriority), result.BasicProperties.Priority); msg.Recoverable = result.BasicProperties.DeliveryMode == PERSISTENT_DELIVERY_MODE; if (result.BasicProperties.ReplyTo != null) - msg.ResponseQueue = new RabbitMQMessageQueue (QueueReference.Parse (result.BasicProperties.ReplyTo)); + msg.ResponseQueue = new RabbitMQMessageQueue (provider, QueueReference.Parse (result.BasicProperties.ReplyTo), true); msg.SenderCertificate = (byte[]) headers[SENDER_CERTIFICATE_KEY]; msg.TimeToBeReceived = new TimeSpan((long) headers[TIME_TO_BE_RECEIVED_KEY]); msg.TimeToReachQueue = new TimeSpan((long) headers[TIME_TO_REACH_QUEUE_KEY]); diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs index 33d6236426d..30d0ea3542b 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs @@ -43,13 +43,16 @@ namespace Mono.Messaging.RabbitMQ { public class RabbitMQMessageEnumerator : IMessageEnumerator { + private readonly MessageFactory helper; private readonly QueueReference qRef; private IConnection cn = null; private BasicDeliverEventArgs current = null; private IModel model = null; private Subscription subscription = null; - public RabbitMQMessageEnumerator (QueueReference qRef) { + public RabbitMQMessageEnumerator (MessageFactory helper, + QueueReference qRef) { + this.helper = helper; this.qRef = qRef; } @@ -140,9 +143,19 @@ namespace Mono.Messaging.RabbitMQ { return msg; } + public IMessage RemoveCurrent (IMessageQueueTransaction transaction) + { + throw new NotSupportedException ("Unable to remove messages within a transaction"); + } + + public IMessage RemoveCurrent (MessageQueueTransactionType transactionType) + { + throw new NotSupportedException ("Unable to remove messages within a transaction"); + } + private IMessage CreateMessage (BasicDeliverEventArgs result) { - return MessageFactory.ReadMessage (qRef, result); + return helper.ReadMessage (qRef, result); } } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs index eacc97675d2..b83eaca15a9 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs @@ -55,14 +55,33 @@ namespace Mono.Messaging.RabbitMQ { private ISynchronizeInvoke synchronizingObject = null; private bool useJournalQueue = false; private QueueReference qRef = QueueReference.DEFAULT; + private readonly RabbitMQMessagingProvider provider; + private readonly MessageFactory helper; + private readonly string realm; + private readonly bool transactional; - public RabbitMQMessageQueue () + public RabbitMQMessageQueue (RabbitMQMessagingProvider provider, + bool transactional) + : this (provider, QueueReference.DEFAULT, transactional) { } - public RabbitMQMessageQueue (QueueReference qRef) + public RabbitMQMessageQueue (RabbitMQMessagingProvider provider, + QueueReference qRef, + bool transactional) + : this (provider, "/data", qRef, transactional) { + } + + public RabbitMQMessageQueue (RabbitMQMessagingProvider provider, + string realm, QueueReference qRef, + bool transactional) + { + this.provider = provider; + this.helper = new MessageFactory (provider); + this.realm = realm; this.qRef = qRef; + this.transactional = transactional; } public bool Authenticate { @@ -130,7 +149,7 @@ namespace Mono.Messaging.RabbitMQ { } public bool Transactional { - get { throw new NotImplementedException (); } + get { return transactional; } } public bool UseJournalQueue { @@ -155,23 +174,43 @@ namespace Mono.Messaging.RabbitMQ { return version; } - private void SetDeliveryInfo (IMessage msg, IConnection cn) + private void SetDeliveryInfo (IMessage msg, long senderVersion, + string transactionId) { - long senderVersion = GetVersion (cn); msg.SetDeliveryInfo (Acknowledgment.None, DateTime.MinValue, this, - Guid.NewGuid ().ToString (), + Guid.NewGuid ().ToString () + "\\0", MessageType.Normal, new byte[0], senderVersion, DateTime.UtcNow, null, - null); + transactionId); } + public void Close () + { + // No-op (Queue are currently stateless) + } + + public static void Delete (string realm, QueueReference qRef) + { + ConnectionFactory cf = new ConnectionFactory (); + + using (IConnection cn = cf.CreateConnection (qRef.Host)) { + using (IModel model = cn.CreateModel ()) { + ushort ticket = model.AccessRequest (realm); + model.QueueDelete (ticket, qRef.Queue, false, false, false); + } + } + } + public void Send (IMessage msg) { + if (QRef == QueueReference.DEFAULT) + throw new MonoMessagingException ("Path has not been specified"); + if (msg.BodyStream == null) throw new ArgumentException ("Message is not serialized properly"); @@ -179,17 +218,9 @@ namespace Mono.Messaging.RabbitMQ { try { using (IConnection cn = cf.CreateConnection (QRef.Host)) { + SetDeliveryInfo (msg, GetVersion (cn), null); using (IModel ch = cn.CreateModel ()) { - ushort ticket = ch.AccessRequest ("/data"); - string finalName = ch.QueueDeclare (ticket, QRef.Queue, false); - SetDeliveryInfo (msg, cn); - IMessageBuilder mb = MessageFactory.WriteMessage (ch, msg); - Console.WriteLine("Body.Length In {0}", mb.GetContentBody ().Length); - - ch.BasicPublish (ticket, "", - finalName, - (IBasicProperties) mb.GetContentHeader(), - mb.GetContentBody ()); + Send (ch, msg); } } } catch (BrokerUnreachableException e) { @@ -197,32 +228,525 @@ namespace Mono.Messaging.RabbitMQ { } } - public IMessage Receive () + public void Send (IMessage msg, IMessageQueueTransaction transaction) + { + if (QRef == QueueReference.DEFAULT) + throw new MonoMessagingException ("Path has not been specified"); + + if (msg.BodyStream == null) + throw new ArgumentException ("Message is not serialized properly"); + + RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction; + + tx.RunSend (SendInContext, msg); + } + + public void Send (IMessage msg, MessageQueueTransactionType transactionType) + { + switch (transactionType) { + case MessageQueueTransactionType.Single: + using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) { + try { + Send (msg, tx); + tx.Commit (); + } catch (Exception e) { + tx.Abort (); + throw new MonoMessagingException(e.Message, e); + } + } + break; + + case MessageQueueTransactionType.None: + Send (msg); + break; + + case MessageQueueTransactionType.Automatic: + throw new NotSupportedException("Automatic transaction types not supported"); + } + } + + private void SendInContext (ref string host, ref IConnection cn, + ref IModel model, IMessage msg, string txId) + { + if (host == null) + host = QRef.Host; + else if (host != QRef.Host) + throw new MonoMessagingException ("Transactions can not span multiple hosts"); + + if (cn == null) { + ConnectionFactory cf = new ConnectionFactory (); + cn = cf.CreateConnection (host); + } + + if (model == null) { + model = cn.CreateModel (); + model.TxSelect (); + } + + SetDeliveryInfo (msg, GetVersion (cn), txId); + Send (model, msg); + } + + private void Send (IModel model, IMessage msg) + { + ushort ticket = model.AccessRequest ("/data"); + string finalName = model.QueueDeclare (ticket, QRef.Queue, true); + IMessageBuilder mb = helper.WriteMessage (model, msg); + + model.BasicPublish (ticket, "", finalName, + (IBasicProperties) mb.GetContentHeader(), + mb.GetContentBody ()); + } + + public void Purge () + { + ConnectionFactory cf = new ConnectionFactory (); + + using (IConnection cn = cf.CreateConnection (QRef.Host)) { + using (IModel model = cn.CreateModel ()) { + ushort ticket = model.AccessRequest (realm); + model.QueuePurge (ticket, QRef.Queue, false); + } + } + } + + public IMessage Peek () { ConnectionFactory cf = new ConnectionFactory (); using (IConnection cn = cf.CreateConnection (QRef.Host)) { using (IModel ch = cn.CreateModel ()) { - ushort ticket = ch.AccessRequest ("/data"); - string finalName = ch.QueueDeclare (ticket, QRef.Queue, false); + return Receive (ch, -1, false); + } + } + } + + public IMessage Peek (TimeSpan timeout) + { + return Run (Peeker (timeout)); +// ConnectionFactory cf = new ConnectionFactory (); +// +// using (IConnection cn = cf.CreateConnection (QRef.Host)) { +// using (IModel ch = cn.CreateModel ()) { +// if (timeout == TimeSpan.MaxValue) { +// return Receive (ch, -1, false); +// } else { +// return Receive (ch, (int) timeout.TotalMilliseconds, false); +// } +// } +// } + } + + public IMessage PeekById (string id) + { + return Run (Peeker (ById (id))); +// ConnectionFactory cf = new ConnectionFactory (); +// +// using (IConnection cn = cf.CreateConnection (QRef.Host)) { +// using (IModel ch = cn.CreateModel ()) { +// return Receive (ch, 500, true, new IdMatcher (id).MatchById); +// } +// } + } + + public IMessage PeekById (string id, TimeSpan timeout) + { + return Run (Peeker (timeout, ById (id))); + } + + public IMessage PeekByCorrelationId (string id) + { + return Run (Peeker (ByCorrelationId (id))); +// ConnectionFactory cf = new ConnectionFactory (); +// +// using (IConnection cn = cf.CreateConnection (QRef.Host)) { +// using (IModel ch = cn.CreateModel ()) { +// return Receive (ch, 500, false, +// new CorrelationIdMatcher (id).MatchById); +// } +// } + } + + public IMessage PeekByCorrelationId (string id, TimeSpan timeout) + { + return Run (Peeker (timeout, ByCorrelationId (id))); + } + + public IMessage Receive () + { + return Run (Receiver ()); + } + + public IMessage Receive (TimeSpan timeout) + { + return Run (Receiver (timeout)); + } + + public IMessage Receive (TimeSpan timeout, + IMessageQueueTransaction transaction) + { + return Run (transaction, Receiver (timeout)); + } + + public IMessage Receive (TimeSpan timeout, + MessageQueueTransactionType transactionType) + { + return Run (transactionType, Receiver (timeout)); + } + + public IMessage Receive (IMessageQueueTransaction transaction) + { + return Run (transaction, Receiver()); + } + + public IMessage Receive (MessageQueueTransactionType transactionType) + { + return Run (transactionType, Receiver ()); + } + + public IMessage ReceiveById (string id) + { + return Run (Receiver (ById (id))); + } + + public IMessage ReceiveById (string id, TimeSpan timeout) + { + return Run (Receiver (timeout, ById (id))); + } + + public IMessage ReceiveById (string id, + IMessageQueueTransaction transaction) + { + return Run (transaction, Receiver (ById (id))); + } + + public IMessage ReceiveById (string id, + MessageQueueTransactionType transactionType) + { + return Run (transactionType, Receiver (ById (id))); + } + + public IMessage ReceiveById (string id, TimeSpan timeout, + IMessageQueueTransaction transaction) + { + return Run (transaction, Receiver (timeout, ById (id))); + } + + public IMessage ReceiveById (string id, TimeSpan timeout, + MessageQueueTransactionType transactionType) + { + return Run (transactionType, Receiver (timeout, ById (id))); + } + + public IMessage ReceiveByCorrelationId (string id) + { + return Run (Receiver (ByCorrelationId (id))); + } + + public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout) + { + return Run (Receiver (timeout, ByCorrelationId (id))); + } + + public IMessage ReceiveByCorrelationId (string id, + IMessageQueueTransaction transaction) + { + return Run (transaction, Receiver (ByCorrelationId (id))); + } + + public IMessage ReceiveByCorrelationId (string id, + MessageQueueTransactionType transactionType) + { + return Run (transactionType, Receiver (ByCorrelationId (id))); + } + + public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout, + IMessageQueueTransaction transaction) + { + return Run (transaction, Receiver (timeout, ByCorrelationId (id))); + } + + public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout, + MessageQueueTransactionType transactionType) + { + return Run (transactionType, Receiver (timeout, ByCorrelationId (id))); + } + + public IMessageEnumerator GetMessageEnumerator () + { + return new RabbitMQMessageEnumerator (helper, QRef); + } + + private IMessage Run (MessageQueueTransactionType transactionType, + TxReceiver.DoReceive r) + { + switch (transactionType) { + case MessageQueueTransactionType.Single: + using (RabbitMQMessageQueueTransaction tx = GetTx ()) { + bool success = false; + try { + IMessage msg = Run (tx, r); + tx.Commit (); + success = true; + return msg; + } finally { + if (!success) + tx.Abort (); + } + } + + case MessageQueueTransactionType.None: + return Run (r); + + default: + throw new NotSupportedException(transactionType + " not supported"); + } + } + + private IMessage Run (IMessageQueueTransaction transaction, + TxReceiver.DoReceive r) + { + TxReceiver txr = new TxReceiver (this, r); + RabbitMQMessageQueueTransaction tx = + (RabbitMQMessageQueueTransaction) transaction; + return tx.RunReceive (txr.ReceiveInContext); + } + + private IMessage Run (TxReceiver.DoReceive r) + { + ConnectionFactory cf = new ConnectionFactory (); + using (IConnection cn = cf.CreateConnection (QRef.Host)) { + using (IModel model = cn.CreateModel ()) { + return r (this, model); + } + } + } + + private IMessage ReceiveInContext (ref string host, ref IConnection cn, + ref IModel model, string txId) + { + if (host == null) + host = QRef.Host; + else if (host != QRef.Host) + throw new MonoMessagingException ("Transactions can not span multiple hosts"); + + if (cn == null) { + ConnectionFactory cf = new ConnectionFactory (); + cn = cf.CreateConnection (host); + } + + if (model == null) { + model = cn.CreateModel (); + model.TxSelect (); + } + + return Receive (model, -1, true); + } + + private class TxReceiver + { + private readonly DoReceive doReceive; + private readonly RabbitMQMessageQueue q; + + public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) { + this.q = q; + this.doReceive = doReceive; + } + + public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model); + + public IMessage ReceiveInContext (ref string host, ref IConnection cn, + ref IModel model, string txId) + { + if (host == null) + host = q.QRef.Host; + else if (host != q.QRef.Host) + throw new MonoMessagingException ("Transactions can not span multiple hosts"); + + if (cn == null) { + ConnectionFactory cf = new ConnectionFactory (); + cn = cf.CreateConnection (host); + } + + if (model == null) { + model = cn.CreateModel (); + model.TxSelect (); + } + + return doReceive (q, model); + } + } + + private class DoReceiveWithTimeout + { + private readonly int timeout; + private readonly IsMatch matcher; + private readonly bool ack; + + public DoReceiveWithTimeout (int timeout, IsMatch matcher) + : this (timeout, matcher, true) + { + } + + public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack) + { + if (matcher != null && timeout == -1) + this.timeout = 500; + else + this.timeout = timeout; + this.matcher = matcher; + this.ack = ack; + } + + public IMessage DoReceive (RabbitMQMessageQueue q, IModel model) + { + if (matcher == null) + return q.Receive (model, timeout, ack); + else + return q.Receive (model, timeout, ack, matcher); + } + } + + private static TxReceiver.DoReceive Receiver (TimeSpan timeout, + IsMatch matcher) + { + int to = TimeSpanToInt32 (timeout); + return new DoReceiveWithTimeout (to, matcher).DoReceive; + } + + private static TxReceiver.DoReceive Receiver (IsMatch matcher) + { + return new DoReceiveWithTimeout (-1, matcher).DoReceive; + } + + private static TxReceiver.DoReceive Receiver (TimeSpan timeout) + { + int to = TimeSpanToInt32 (timeout); + return new DoReceiveWithTimeout (to, null).DoReceive; + } + + private TxReceiver.DoReceive Receiver () + { + return new DoReceiveWithTimeout (-1, null).DoReceive; + } + + private TxReceiver.DoReceive Peeker () + { + return new DoReceiveWithTimeout (-1, null).DoReceive; + } + + private TxReceiver.DoReceive Peeker (TimeSpan timeout) + { + int to = TimeSpanToInt32 (timeout); + return new DoReceiveWithTimeout (to, null, false).DoReceive; + } + + private TxReceiver.DoReceive Peeker (IsMatch matcher) + { + return new DoReceiveWithTimeout (-1, matcher, false).DoReceive; + } + + private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher) + { + int to = TimeSpanToInt32 (timeout); + return new DoReceiveWithTimeout (to, matcher, false).DoReceive; + } + + delegate bool IsMatch (BasicDeliverEventArgs result); + + private class IdMatcher + { + private readonly string id; + public IdMatcher (string id) + { + this.id = id; + } + + public bool MatchById (BasicDeliverEventArgs result) + { + return result.BasicProperties.MessageId == id; + } + } + + private static IsMatch ById (string id) + { + return new IdMatcher (id).MatchById; + } + + private class CorrelationIdMatcher + { + private readonly string correlationId; + public CorrelationIdMatcher (string correlationId) + { + this.correlationId = correlationId; + } + + public bool MatchById (BasicDeliverEventArgs result) + { + return result.BasicProperties.CorrelationId == correlationId; + } + } + + private static IsMatch ByCorrelationId (string correlationId) + { + return new CorrelationIdMatcher (correlationId).MatchById; + } + + private IMessage Receive (IModel model, int timeout, bool doAck) + { + Console.WriteLine ("{0}, {1}", timeout, doAck); + + ushort ticket = model.AccessRequest (realm); + string finalName = model.QueueDeclare (ticket, QRef.Queue, false); + + using (Subscription sub = new Subscription (model, ticket, finalName)) { + BasicDeliverEventArgs result; + if (sub.Next (timeout, out result)) { + IMessage m = helper.ReadMessage (QRef, result); + if (doAck) + sub.Ack (result); + return m; + } else { + throw new MonoMessagingException ("No Message Available"); + } + } + } + + private IMessage Receive (IModel model, int timeout, + bool doAck, IsMatch matcher) + { + Console.WriteLine ("{0}, {1}", timeout, doAck); + + ushort ticket = model.AccessRequest (realm); + string finalName = model.QueueDeclare (ticket, QRef.Queue, false); + + using (Subscription sub = new Subscription (model, ticket, finalName)) { + BasicDeliverEventArgs result; + while (sub.Next (timeout, out result)) { - Subscription sub = new Subscription (ch, ticket, finalName); - BasicDeliverEventArgs result = sub.Next (); - sub.Ack (result); - sub.Close (); - if (result == null) { - throw new MonoMessagingException ("No Message Available"); - } else { - IMessage m = MessageFactory.ReadMessage (QRef, result); + if (matcher (result)) { + IMessage m = helper.ReadMessage (QRef, result); + if (doAck) + sub.Ack (result); return m; } } + + throw new MessageUnavailableException ("Message not available"); } } - public IMessageEnumerator GetMessageEnumerator () + private RabbitMQMessageQueueTransaction GetTx () + { + return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction (); + } + + private static int TimeSpanToInt32 (TimeSpan timespan) { - return new RabbitMQMessageEnumerator (QRef); + if (timespan == TimeSpan.MaxValue) + return -1; + else + return (int) timespan.TotalMilliseconds; } } } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs new file mode 100644 index 00000000000..52e3a93cf6c --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs @@ -0,0 +1,140 @@ +// +// Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Collections; +using System.ComponentModel; +using System.IO; +using System.Text; + +using Mono.Messaging; + +using RabbitMQ.Client; +using RabbitMQ.Client.Content; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.MessagePatterns; +using RabbitMQ.Util; + +namespace Mono.Messaging.RabbitMQ { + + public class RabbitMQMessageQueueTransaction : IMessageQueueTransaction { + + private readonly string txId; + private MessageQueueTransactionStatus status = MessageQueueTransactionStatus.Initialized; + private IConnection cn = null; + private IModel model = null; + private String host = null; + private bool isDisposed = false; + private Object syncObj = new Object (); + + public RabbitMQMessageQueueTransaction (string txId) + { + this.txId = txId; + } + + public MessageQueueTransactionStatus Status { + get { + lock (syncObj) + return status; + } + } + + public void Abort () + { + lock (syncObj) { + if (model != null) + model.TxRollback (); + status = MessageQueueTransactionStatus.Aborted; + } + } + + public void Begin () + { + lock (syncObj) { + if (status == MessageQueueTransactionStatus.Pending) + throw new InvalidOperationException ("Transaction already started"); + status = MessageQueueTransactionStatus.Pending; + } + } + + public void Commit () + { + lock (syncObj) { + model.TxCommit (); + status = MessageQueueTransactionStatus.Committed; + } + } + + public string Id { + get { return txId; } + } + + public delegate void Send (ref string host, ref IConnection cn, + ref IModel model, IMessage msg, string txId); + + public delegate IMessage Receive (ref string host, ref IConnection cn, + ref IModel model, string txId); + + public void RunSend (Send sendDelegate, IMessage msg) + { + lock (syncObj) { + sendDelegate (ref host, ref cn, ref model, msg, Id); + } + } + + public IMessage RunReceive (Receive receiveDelegate) + { + lock (syncObj) { + return receiveDelegate (ref host, ref cn, ref model, Id); + } + } + + public void Dispose () + { + Dispose (true); + GC.SuppressFinalize (this); + } + + protected virtual void Dispose (bool disposing) + { + lock (syncObj) { + if (!isDisposed && disposing) { + if (model != null) + model.Dispose (); + if (cn != null) + cn.Dispose (); + isDisposed = true; + } + } + } + + } +} diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs index 88236f635c7..45445659d0e 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs @@ -29,6 +29,10 @@ // using System; +using System.Collections; +using System.Net; +using System.Net.Sockets; +using System.Threading; using Mono.Messaging; @@ -36,26 +40,113 @@ namespace Mono.Messaging.RabbitMQ { public class RabbitMQMessagingProvider : IMessagingProvider { - public bool Exists (QueueReference qRef) - { - // In AMQP all queues exist, because they are declared rather - // than created. - return true; - } + private volatile uint txCounter = 0; + private readonly uint localIp; + private static readonly string DEFAULT_REALM = "/data"; - public IMessageQueue GetMessageQueue () + public RabbitMQMessagingProvider() { - return new RabbitMQMessageQueue (); + localIp = GetLocalIP (); } - public IMessageQueue CreateMessageQueue (QueueReference qRef) + private static uint GetLocalIP () { - return new RabbitMQMessageQueue (qRef); + //IPHostEntry host = Dns.GetHostEntry (Dns.GetHostName ()); + String strHostName = Dns.GetHostName (); + IPHostEntry ipEntry = Dns.GetHostByName (strHostName); + foreach (IPAddress ip in ipEntry.AddressList) { + if (AddressFamily.InterNetwork == ip.AddressFamily) { + byte[] addr = ip.GetAddressBytes (); + uint localIP = 0; + for (int i = 0; i < 4 && i < addr.Length; i++) { + localIP += (uint) (addr[i] << 8 * (3 - i)); + } + return localIP; + } + } + return 0; } public IMessage CreateMessage () { return new MessageBase (); } + + public IMessageQueueTransaction CreateMessageQueueTransaction () + { + string txId = localIp.ToString () + (++txCounter).ToString (); + return new RabbitMQMessageQueueTransaction (txId); + } + + public void DeleteQueue (QueueReference qRef) + { + RabbitMQMessageQueue.Delete (DEFAULT_REALM, qRef); + } + + private readonly IDictionary queues = new Hashtable (); + private readonly ReaderWriterLock qLock = new ReaderWriterLock (); + private const int TIMEOUT = 15000; + + public IMessageQueue[] GetPublicQueues () + { + IMessageQueue[] qs; + qLock.AcquireReaderLock (TIMEOUT); + try { + ICollection qCollection = queues.Values; + qs = new IMessageQueue[qCollection.Count]; + qCollection.CopyTo (qs, 0); + return qs; + } finally { + qLock.ReleaseReaderLock (); + } + } + + public bool Exists (QueueReference qRef) + { + qLock.AcquireReaderLock (TIMEOUT); + try { + return queues.Contains (qRef); + } finally { + qLock.ReleaseReaderLock (); + } + } + + public IMessageQueue CreateMessageQueue (QueueReference qRef, + bool transactional) + { + qLock.AcquireWriterLock (TIMEOUT); + try { + IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, + transactional); + queues[qRef] = mq; + return mq; + } finally { + qLock.ReleaseWriterLock (); + } + } + + public IMessageQueue GetMessageQueue (QueueReference qRef) + { + qLock.AcquireReaderLock (TIMEOUT); + try { + if (queues.Contains (qRef)) + return (IMessageQueue) queues[qRef]; + else { + LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT); + try { + IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, + false); + queues[qRef] = mq; + return mq; + } finally { + qLock.DowngradeFromWriterLock (ref lc); + } + } + } finally { + qLock.ReleaseReaderLock (); + } + } + + } } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources index c2368a61c89..1845e0082bc 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources @@ -1,3 +1,9 @@ +Mono.Messaging.RabbitMQ/MQUtil.cs Mono.Messaging.RabbitMQ/BasicMessagingTest.cs +Mono.Messaging.RabbitMQ/AdminTest.cs +Mono.Messaging.RabbitMQ/FailuresTest.cs Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs +Mono.Messaging.RabbitMQ/PeekTest.cs +Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs +Mono.Messaging.RabbitMQ/SelectorTest.cs
\ No newline at end of file diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs new file mode 100644 index 00000000000..612f401988d --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs @@ -0,0 +1,130 @@ +// +// Test.Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Messaging; +using System.Reflection; +using System.Threading; +using System.Text.RegularExpressions; + +using NUnit.Framework; + +namespace MonoTests.Mono.Messaging.RabbitMQ +{ + [TestFixture] + public class AdminTest { + + [Test] + public void CreateNonTransactionalQueue () + { + string qName = @".\private$\admin-queue-1"; + Assert.IsFalse (MessageQueue.Exists (qName), "Queue should not exist"); + MessageQueue q = MessageQueue.Create (qName); + Assert.IsFalse (q.Transactional); + Assert.IsTrue (MessageQueue.Exists (qName), "Queue should exist"); + } + + [Test] + public void CreateTransactionalQueue () + { + string qName = @".\private$\admin-queue-2"; + Assert.IsFalse (MessageQueue.Exists (qName), "Queue should not exist"); + MessageQueue q = MessageQueue.Create (qName, true); + Assert.IsTrue (q.Transactional, "Queue should be transactional"); + Assert.IsTrue (MessageQueue.Exists (qName), "Queue should exist"); + } + + private bool Contains(MessageQueue[] qs, String qName) + { + foreach (MessageQueue q in qs) + { + if (q.QueueName == qName) + return true; + } + return false; + } + + [Test] + public void GetPublicQueues () + { + string qName1 = @".\admin-queue-3"; + string qName2 = @".\admin-queue-4"; + + MessageQueue.Create (qName1); + MessageQueue.Create (qName2); + + MessageQueue[] mq = MessageQueue.GetPublicQueues (); + Assert.IsTrue (Contains (mq, "admin-queue-3"), qName1 + " not found"); + Assert.IsTrue (Contains (mq, "admin-queue-4"), qName2 + " not found"); + } + + [Test] + public void GetQueue () + { + MessageQueue q1 = MQUtil.GetQueue(@".\private$\admin-queue-5", true); + Assert.IsTrue (q1.Transactional, "Queue should be transactional"); + MessageQueue q2 = MQUtil.GetQueue(@".\private$\admin-queue-5", true); + Assert.IsTrue (q2.Transactional, "Queue should be transactional"); + } + + [Test] + [ExpectedException (typeof (MessageQueueException))] + public void PurgeQueue () + { + MessageQueue q = MQUtil.GetQueue(@".\private$\purge-queue"); + Message m1 = new Message ("foobar1", new BinaryMessageFormatter ()); + Message m2 = new Message ("foobar2", new BinaryMessageFormatter ()); + Message m3 = new Message ("foobar3", new BinaryMessageFormatter ()); + Message m4 = new Message ("foobar4", new BinaryMessageFormatter ()); + + q.Send (m1); + q.Send (m2); + q.Send (m3); + q.Send (m4); + + Message received = q.Receive (); + q.Purge (); + q.Receive (new TimeSpan (0, 0, 2)); + } + + [Test] + public void DeleteQueue () + { + MessageQueue q = MQUtil.GetQueue(@".\private$\delete-queue"); + Message m1 = new Message ("foobar1", new BinaryMessageFormatter ()); + + q.Send (m1); + + Message received = q.Receive (); + + MessageQueue.Delete(@".\private$\delete-queue"); + } + } +} diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs index 31368cea674..57e02307c41 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs @@ -36,7 +36,7 @@ using System.Text.RegularExpressions; using NUnit.Framework; -namespace MonoTests.Mono.Messsaging.RabbitMQ +namespace MonoTests.Mono.Messaging.RabbitMQ { [TestFixture] public class BasicMessageTest { @@ -44,12 +44,14 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ [Test] public void SendReceiveBinaryMessage () { - String qName = "testq"; - MessageQueue mq = new MessageQueue (qName); - Assert.AreEqual (mq.QueueName, qName, "Queue name not set properly"); + String qName = @"private$\testq"; + String qPath = @".\" + qName; + MessageQueue mq = MQUtil.GetQueue (qPath); + Assert.AreEqual(qName, mq.QueueName, "Queue name not set properly"); String s = "Test: " + DateTime.Now; Message m = new Message (s, new BinaryMessageFormatter ()); - m.CorrelationId = "foo"; + m.CorrelationId = Guid.NewGuid () + "\\0"; + mq.MessageReadPropertyFilter.SetAll (); mq.Send (m); @@ -57,28 +59,55 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ m2.Formatter = new BinaryMessageFormatter (); Assert.AreEqual (s, m2.Body); - Assert.IsTrue (DateTime.MinValue == m.ArrivedTime); - Assert.IsNotNull (m2.Id, "Id is null"); + //Assert.IsTrue (DateTime.MinValue == m.ArrivedTime); + Assert.IsNotNull(m2.Id, "Id is null"); Assert.IsTrue (Guid.Empty.ToString () != m2.Id, "Id is Empty"); - Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set"); + Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set"); Assert.AreEqual (Acknowledgment.None, m2.Acknowledgment, "Acknowledgment"); Assert.AreEqual (m.CorrelationId, m2.CorrelationId, "CorrelationId not set properly"); Assert.IsTrue (0 != m2.SenderVersion); - Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null"); + // TODO: This is not support on a workgroup installation. + //Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null"); Assert.AreEqual (qName, m2.DestinationQueue.QueueName, "Destination Queue not set"); + + mq.Close (); + } + + [Test] + public void SendMessageWithLabel () + { + String qName = @".\private$\testq"; + String label = "mylabel"; + MessageQueue mq = MQUtil.GetQueue (qName); + Assert.AreEqual (@"private$\testq", mq.QueueName, "Queue name not set properly"); + String s = "Test: " + DateTime.Now; + Message m = new Message (s, new BinaryMessageFormatter ()); + m.CorrelationId = Guid.NewGuid () + "\\0" ; + + mq.Send (m, label); + + Message m2 = mq.Receive (); + m2.Formatter = new BinaryMessageFormatter (); + Assert.AreEqual (s, m2.Body, "Message not passed correctly"); + Assert.AreEqual (label, m2.Label, "Label not passed correctly"); } + [Test] public void CheckDefaults () { Message m = new Message ("Test", new BinaryMessageFormatter ()); Assert.AreEqual (true, m.AttachSenderId, "AttachSenderId has incorrect default"); - Assert.AreEqual ("Microsoft Base Cryptographic Provider version 1.0", + Assert.AreEqual (Guid.Empty.ToString () + "\\0", m.Id, "Id has incorrect default"); + Assert.AreEqual ("Microsoft Base Cryptographic Provider, Ver. 1.0", m.AuthenticationProviderName, "AuthenticationProviderName has incorrect default"); Assert.AreEqual (0, m.Extension.Length, "Extension has incorrect default"); Assert.AreEqual ("", m.Label, "Label has incorrect default"); Assert.IsFalse (m.Recoverable, "Recoverable has incorrect default"); + Assert.IsFalse (m.IsFirstInTransaction, "IsFirstInTransaction has incorrect default"); + Assert.IsFalse (m.IsLastInTransaction, "IsLastInTransaction has incorrect default"); + Assert.AreEqual ("", m.TransactionId, "TransactionId has incorrect default"); Assert.AreEqual (MessagePriority.Normal, m.Priority, "MessagePriority has incorrect default"); } @@ -87,8 +116,8 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ PropertyInfo pi = m.GetType ().GetProperty (property); try { Assert.IsNotNull (pi, "Property not defined: " + property); - pi.GetValue (m, null); - Assert.Fail (property); + object o = pi.GetValue (m, null); + Assert.Fail (property + ": " + o); } catch (InvalidOperationException e) { } catch (TargetInvocationException e) { Assert.AreEqual (typeof (InvalidOperationException), @@ -104,9 +133,9 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ CheckInvalidOperation (m, "ArrivedTime"); CheckInvalidOperation (m, "Authenticated"); CheckInvalidOperation (m, "DestinationQueue"); - CheckInvalidOperation (m, "Id"); - CheckInvalidOperation (m, "IsFirstInTransaction"); - CheckInvalidOperation (m, "IsLastInTransaction"); + //CheckInvalidOperation (m, "Id"); + //CheckInvalidOperation (m, "IsFirstInTransaction"); + //CheckInvalidOperation (m, "IsLastInTransaction"); // TODO: Support 2.0 features. //CheckInvalidOperation (m, "LookupId"); CheckInvalidOperation (m, "MessageType"); @@ -114,71 +143,75 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ CheckInvalidOperation (m, "SenderVersion"); CheckInvalidOperation (m, "SentTime"); CheckInvalidOperation (m, "SourceMachine"); - CheckInvalidOperation (m, "TransactionId"); + //CheckInvalidOperation (m, "TransactionId"); } - private static void CheckArgumentInvalid (Message m, String property) + private static void CheckArgumentInvalid(Message m, String property, Type exceptionType) { - PropertyInfo pi = m.GetType ().GetProperty (property); + PropertyInfo pi = m.GetType().GetProperty(property); try { - Assert.IsNotNull (pi, "Property not defined: " + property); - pi.SetValue (m, null, null); - Assert.Fail (property); + Assert.IsNotNull(pi, "Property not defined: " + property); + pi.SetValue(m, null, null); + Assert.Fail(property); } catch (InvalidOperationException e) { } catch (TargetInvocationException e) { - Assert.AreEqual (typeof (ArgumentException), - e.InnerException.GetType ()); + Assert.AreEqual(exceptionType, + e.InnerException.GetType(), + property); } } - + [Test] public void CheckArgumentInvalidForProperties () { Message m = new Message ("Stuff"); - CheckArgumentInvalid (m, "DestinationSymmetricKey"); - CheckArgumentInvalid (m, "DigitalSignature"); - CheckArgumentInvalid (m, "Extension"); + CheckArgumentInvalid (m, "DestinationSymmetricKey", typeof (ArgumentNullException)); + CheckArgumentInvalid (m, "DigitalSignature", typeof(ArgumentNullException)); + CheckArgumentInvalid (m, "Extension", typeof(ArgumentNullException)); } - + [Test] public void SendReceiveBinaryMessageWithAllPropertiesSet () - { - String qName = "testq"; - MessageQueue mq = new MessageQueue (qName); - Assert.AreEqual (mq.QueueName, qName, "Queue name not set properly"); + { + String qName = @"private$\testq"; + String qPath = @".\" + qName; + MessageQueue mq = MQUtil.GetQueue (qPath); + mq.MessageReadPropertyFilter.SetAll (); + Assert.AreEqual(qName, mq.QueueName, "Queue name not set properly"); - MessageQueue adminQ = new MessageQueue ("myAdmin"); - MessageQueue responseQ = new MessageQueue ("myResponse"); + MessageQueue adminQ = MQUtil.GetQueue (@".\private$\myadmin"); + MessageQueue responseQ = MQUtil.GetQueue (@".\private$\myresponse"); Guid connectorType = Guid.NewGuid (); String s = "Test: " + DateTime.Now; Message m = new Message (s, new BinaryMessageFormatter ()); - m.CorrelationId = "foo"; + m.CorrelationId = Guid.NewGuid () + "\\0"; m.AcknowledgeType = AcknowledgeTypes.PositiveArrival; m.AdministrationQueue = adminQ; m.AppSpecific = 5; - m.AuthenticationProviderName = "Test Provider Name"; - m.AuthenticationProviderType = CryptographicProviderType.None; - m.ConnectorType = connectorType; - m.DestinationSymmetricKey = new byte[] { 0x0A, 0x0B, 0x0C }; - m.DigitalSignature = new byte[] { 0x0C, 0x0D, 0x0E }; - m.EncryptionAlgorithm = EncryptionAlgorithm.Rc4; + //m.AuthenticationProviderName = "Test Provider Name"; + //m.AuthenticationProviderType = CryptographicProviderType.None; + //m.ConnectorType = connectorType; + //m.DestinationSymmetricKey = new byte[] { 0x0A, 0x0B, 0x0C }; + //m.DigitalSignature = new byte[] { 0x0C, 0x0D, 0x0E }; + //m.EncryptionAlgorithm = EncryptionAlgorithm.Rc4; m.Extension = new byte[] { 0x01, 0x02, 0x03 }; - m.HashAlgorithm = HashAlgorithm.Sha; + //m.HashAlgorithm = HashAlgorithm.Sha; m.Label = "MyLabel"; m.Priority = MessagePriority.AboveNormal; m.Recoverable = true; m.ResponseQueue = responseQ; m.SenderCertificate = new byte[] { 0x04, 0x05, 0x06 }; - m.TimeToBeReceived = new TimeSpan(0, 0, 5); - m.TimeToReachQueue = new TimeSpan(0, 0, 10); - m.UseAuthentication = true; + m.TimeToBeReceived = new TimeSpan(0, 0, 10); + m.TimeToReachQueue = new TimeSpan(0, 0, 5); + //m.UseAuthentication = true; m.UseDeadLetterQueue = true; - m.UseEncryption = true; + //m.UseEncryption = true; mq.Send (m); - + Message m2 = mq.Receive (); + m2.Formatter = new BinaryMessageFormatter (); Assert.AreEqual (s, m2.Body); @@ -187,23 +220,23 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ Assert.AreEqual (adminQ.QueueName, m2.AdministrationQueue.QueueName, "AdministrationQueue not passed correctly"); Assert.AreEqual (5, m2.AppSpecific, "AppSpecific not passed correctly"); - Assert.AreEqual (m.AuthenticationProviderName, m2.AuthenticationProviderName, - "AuthenticationProviderName not passed correctly"); - Assert.AreEqual (m.AuthenticationProviderType, m2.AuthenticationProviderType, - "AuthenticationProviderType not passed correctly"); - Assert.AreEqual (connectorType, m2.ConnectorType, - "ConnectorType not passed correctly"); + //Assert.AreEqual (m.AuthenticationProviderName, m2.AuthenticationProviderName, + // "AuthenticationProviderName not passed correctly"); + //Assert.AreEqual (m.AuthenticationProviderType, m2.AuthenticationProviderType, + // "AuthenticationProviderType not passed correctly"); + //Assert.AreEqual (connectorType, m2.ConnectorType, + // "ConnectorType not passed correctly"); Assert.AreEqual (m.CorrelationId, m2.CorrelationId, "CorrelationId not passed correctly"); - AreEqual (m.DestinationSymmetricKey, m2.DestinationSymmetricKey, - "DestinationSymmetricKey not passed correctly"); - AreEqual (m.DigitalSignature, m2.DigitalSignature, - "DigitalSignature not passed properly"); - Assert.AreEqual (EncryptionAlgorithm.Rc4, m2.EncryptionAlgorithm, - "EncryptionAlgorithm not passed properly"); + //AreEqual (m.DestinationSymmetricKey, m2.DestinationSymmetricKey, + // "DestinationSymmetricKey not passed correctly"); + //AreEqual (m.DigitalSignature, m2.DigitalSignature, + // "DigitalSignature not passed properly"); + //Assert.AreEqual (EncryptionAlgorithm.Rc4, m2.EncryptionAlgorithm, + // "EncryptionAlgorithm not passed properly"); AreEqual (m.Extension, m2.Extension, "Extension not passed properly"); - Assert.AreEqual (m.HashAlgorithm, m2.HashAlgorithm, - "HashAlgorithm not passed properly"); + //Assert.AreEqual (m.HashAlgorithm, m2.HashAlgorithm, + // "HashAlgorithm not passed properly"); Assert.AreEqual (m.Label, m2.Label, "Label not passed correctly"); Assert.AreEqual (MessagePriority.AboveNormal, m2.Priority, "Priority not passed properly"); @@ -211,25 +244,26 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ Assert.AreEqual (responseQ.QueueName, m2.ResponseQueue.QueueName, "ResponseQueue not passed properly"); AreEqual (m.SenderCertificate, m2.SenderCertificate, - "SenderCertificate not passed properly"); + "SenderCertificate not passed properly"); Assert.AreEqual (m.TimeToBeReceived, m2.TimeToBeReceived, "TimeToBeReceived not passed properly"); Assert.AreEqual (m.TimeToReachQueue, m2.TimeToReachQueue, "TimeToReachQueue not passed properly"); - Assert.IsTrue (m2.UseAuthentication, - "UseAuthentication not passed properly"); + //Assert.IsTrue (m2.UseAuthentication, + // "UseAuthentication not passed properly"); Assert.IsTrue (m2.UseDeadLetterQueue, "UseDeadLetterQueue not passed properly"); - Assert.IsTrue (m2.UseEncryption, "UseEncryption not pass properly"); + //Assert.IsTrue (m2.UseEncryption, "UseEncryption not pass properly"); //Assert.AreEqual (); - Assert.IsTrue (DateTime.MinValue == m.ArrivedTime); Assert.IsNotNull (m2.Id, "Id is null"); Assert.IsTrue (Guid.Empty.ToString () != m2.Id, "Id is Empty"); - Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set"); + Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set"); Assert.AreEqual (Acknowledgment.None, m2.Acknowledgment, "Acknowledgment"); Assert.IsTrue (0 != m2.SenderVersion); - Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null"); + + //Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null"); + } private static void AreEqual(byte[] expected, byte[] actual, string message) @@ -239,12 +273,35 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ Assert.AreEqual (expected[i], actual[i], message); } + //[Test] + // No supported by Rabbit + public void SendPriorityMessages () + { + MessageQueue mq = MQUtil.GetQueue ("testpriority"); + Message sent1 = new Message ("Highest", new BinaryMessageFormatter ()); + sent1.Priority = MessagePriority.Highest; + Message sent2 = new Message ("Lowest", new BinaryMessageFormatter ()); + sent2.Priority = MessagePriority.Lowest; + + mq.Send (sent1); + mq.Send (sent2); + + Message received1 = mq.Receive (); + Message received2 = mq.Receive (); + + Assert.AreEqual (MessagePriority.Highest, received2.Priority, + "Priority delivery incorrect"); + Assert.AreEqual (MessagePriority.Lowest, received1.Priority, + "Priority delivery incorrect"); + } + [Test] public void SendReceiveXmlMessage () { - MessageQueue mq = new MessageQueue ("testq"); + MessageQueue mq = MQUtil.GetQueue (@".\private$\testq"); String s = "Test: " + DateTime.Now; Message m = new Message (s, new XmlMessageFormatter (new Type[] { typeof (string) })); + mq.MessageReadPropertyFilter.SetAll(); mq.Send (m); Message m2 = mq.Receive (); @@ -257,26 +314,12 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ } [Test] - public void CreateQueue () - { - string path = @".\private$\MyQ"; - string body = "This is a test"; - - MessageQueue q = MessageQueue.Create (path); - Assert.IsNotNull (q); - } - - [Test] public void SendBinaryText () { string path = @".\private$\MyQ"; string body = "This is a test"; - MessageQueue q; - if (MessageQueue.Exists (path)) - q = new MessageQueue (path); - else - q = MessageQueue.Create (path); + MessageQueue q = MQUtil.GetQueue (path); q.Formatter = new BinaryMessageFormatter (); @@ -295,17 +338,13 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ string path = @".\private$\MyQ"; string body = "This is a test"; - MessageQueue q; - if (MessageQueue.Exists (path)) - q = new MessageQueue (path); - else - q = MessageQueue.Create (path); + MessageQueue q = MQUtil.GetQueue (path, new XmlMessageFormatter ()); q.Send (body); Message m2 = q.Receive (); XmlMessageFormatter xmlf = (XmlMessageFormatter) q.Formatter; - Assert.AreEqual (typeof (string), xmlf.TargetTypes[0]); + //Assert.AreEqual (typeof (string), xmlf.TargetTypes[0]); Assert.AreEqual (typeof (XmlMessageFormatter), m2.Formatter.GetType ()); Assert.AreEqual (body, m2.Body); Assert.AreEqual (0, m2.BodyType); @@ -320,11 +359,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ body.MyProperty2 = "Something"; body.MyProperty3 = "Something else"; - MessageQueue q; - if (MessageQueue.Exists (path)) - q = new MessageQueue (path); - else - q = MessageQueue.Create (path); + MessageQueue q = MQUtil.GetQueue (path); q.Formatter = new BinaryMessageFormatter (); @@ -349,15 +384,11 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ body.MyProperty2 = "Something"; body.MyProperty3 = "Something else"; - MessageQueue q; - if (MessageQueue.Exists (path)) - q = new MessageQueue (path); - else - q = MessageQueue.Create (path); + MessageQueue q = MQUtil.GetQueue (path, new XmlMessageFormatter ()); q.Send (body); - MessageQueue q2 = new MessageQueue (path); + MessageQueue q2 = MQUtil.GetQueue (path); q2.Formatter = new XmlMessageFormatter (new Type[] { typeof(Thingy) }); Message m2 = q2.Receive (); @@ -380,15 +411,9 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ Message m1 = new Message (body); m1.Formatter = new BinaryMessageFormatter (); - MessageQueue q; - if (MessageQueue.Exists (path)) - q = new MessageQueue (path); - else - q = MessageQueue.Create (path); - + MessageQueue q = MQUtil.GetQueue (path); q.Send (m1); - //Assert.IsNotNull (m1.Id); Message m2 = q.Receive (); m2.Formatter = new BinaryMessageFormatter (); @@ -412,11 +437,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ Message m1 = new Message (body); Assert.IsNull (m1.Formatter); - MessageQueue q; - if (MessageQueue.Exists (path)) - q = new MessageQueue (path); - else - q = MessageQueue.Create (path); + MessageQueue q = MQUtil.GetQueue (path, new XmlMessageFormatter ()); q.Send (m1); diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog index fd6cb0f7b13..fea66238c2b 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog @@ -1,3 +1,39 @@ +2008-12-06 Michael Barker <mike@middlesoft.co.uk> + + * AdminTest.cs: Updated to run against MS.NET + * BasicMessagingTest.cs: Updated to run against MS.NET + * FailuresTest.cs: Updated to run against MS.NET + * MessageEnumeratorTest.cs: Updated to run against MS.NET + * MQUtil.cs: Updated to run against MS.NET + * PeekTest.cs: Updated to run against MS.NET + * SelectorTest.cs: Updated to run against MS.NET + * TransactionMessagingTest.cs: Updated to run against MS.NET + +2008-12-01 Michael Barker <mike@middlesoft.co.uk> + + * AdminTest.cs: Added tests for queue discovery methods. + +2008-11-23 Michael Barker <mike@middlesoft.co.uk> + + * TransactionMessagingTest.cs: Added tests for all methods that the + transaction type argument, currently only Single is supported. Added methods + with transactions and timeout. + * PeekTest.cs: Added PeekBy{Id,CorrelationId} tests. + +2008-11-09 Michael Barker <mike@middlesoft.co.uk> + + * SelectorTest.cs: New, tests for ReceiveByXYZ() methods + +2008-11-04 Michael Barker <mike@middlesoft.co.uk> + + * PeekTest.cs: New, tests for peeking at messages. + +2008-11-02 Michael Barker <mike@middlesoft.co.uk> + + * TransactionMessagingTest.cs: New, tests for transactional messaging. + * FailuresTest.cs: New, tests for exceptions. + * AdminTest.cs: New, tests for administration functions. + 2008-09-29 Michael Barker <mike@middlesoft.co.uk> * BasicMessagingTest.cs: New diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs new file mode 100644 index 00000000000..c4ca13e1cef --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs @@ -0,0 +1,66 @@ +// +// Test.Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Messaging; +using System.Reflection; +using System.Threading; +using System.Text.RegularExpressions; + +using NUnit.Framework; + +namespace MonoTests.Mono.Messaging.RabbitMQ +{ + [TestFixture] + public class FailuresTest { + + [Test] + [ExpectedException (typeof (MessageQueueException))] + public void SendWithPathNotSet () + { + MessageQueue q = new MessageQueue (); + Message m = new Message ("foobar", new BinaryMessageFormatter ()); + + q.Send (m); + } + + [Test] + [ExpectedException (typeof (MessageQueueException))] + public void SendInTransactionWithPathNotSet () + { + MessageQueue q = new MessageQueue (); + Message m = new Message ("foobar", new BinaryMessageFormatter ()); + MessageQueueTransaction tx = new MessageQueueTransaction (); + + q.Send (m, tx); + } + + } +} diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs new file mode 100644 index 00000000000..6c32aca5393 --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs @@ -0,0 +1,67 @@ +// +// Test.Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Messaging; + +namespace MonoTests.Mono.Messaging.RabbitMQ +{ + public class MQUtil + { + public static MessageQueue GetQueue (string path) + { + return GetQueue (path, false); + } + + public static MessageQueue GetQueue (string path, bool isTransactional) + { + return GetQueue (path, isTransactional, + new BinaryMessageFormatter ()); + } + + public static MessageQueue GetQueue (string path, IMessageFormatter formatter) + { + return GetQueue (path, false, formatter); + } + + public static MessageQueue GetQueue (string path, bool isTransactional, + IMessageFormatter formatter) + { + MessageQueue q; + if (MessageQueue.Exists (path)) { + q = new MessageQueue (path); + } else { + q = MessageQueue.Create (path, isTransactional); + } + q.Formatter = formatter; + return q; + } + } +} diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs index e918e72df69..5c4c2bf3584 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs @@ -31,22 +31,22 @@ using System; using System.Messaging; -using Mono.Messaging; -using Mono.Messaging.RabbitMQ; +//using Mono.Messaging; +//using Mono.Messaging.RabbitMQ; using NUnit.Framework; -namespace MonoTests.Mono.Messsaging.RabbitMQ +namespace MonoTests.Mono.Messaging.RabbitMQ { [TestFixture] public class MessageEnumeratorTest { - private readonly String qName = "testq2"; + private readonly String qName = @".\private$\testq2"; private void SendMessage (string s) { - MessageQueue mq = new MessageQueue (qName); + MessageQueue mq = MQUtil.GetQueue (qName); Message m = new Message (s, new BinaryMessageFormatter ()); - m.CorrelationId = "foo"; + m.CorrelationId = Guid.NewGuid () + "\\0"; mq.Send (m); } @@ -58,7 +58,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ SendMessage ("message 3"); SendMessage ("message 4"); - MessageQueue mq0 = new MessageQueue (qName); + MessageQueue mq0 = MQUtil.GetQueue (qName); MessageEnumerator me0 = mq0.GetMessageEnumerator (); me0.MoveNext (); @@ -72,7 +72,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ me0.Dispose (); mq0.Dispose (); - MessageQueue mq1 = new MessageQueue (qName); + MessageQueue mq1 = MQUtil.GetQueue (qName); MessageEnumerator me1 = mq1.GetMessageEnumerator (); me1.MoveNext(); @@ -83,6 +83,33 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ m1.Formatter = new BinaryMessageFormatter (); Console.WriteLine ("{0}", m1.Body); Assert.AreEqual ("message 4", (String) m1.Body, "body incorrect"); + + mq1.Purge (); + MessageQueue.Delete (qName); + } + + //[Test] + // Not supported with AMQP + public void RemoveMessageWithTx () + { + MessageQueue q = MQUtil.GetQueue ("testq3"); + + q.Formatter = new BinaryMessageFormatter (); + q.Send ("foo1"); + q.Send ("foo2"); + + MessageEnumerator me1 = q.GetMessageEnumerator (); + MessageQueueTransaction tx = new MessageQueueTransaction (); + me1.MoveNext (); + Message m1 = me1.Current; + me1.RemoveCurrent (tx); + tx.Commit (); + me1.Close (); + + MessageEnumerator me2 = q.GetMessageEnumerator (); + Assert.IsTrue (me1.MoveNext ()); + me2.RemoveCurrent (); + Assert.IsFalse (me2.MoveNext ()); } } } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs new file mode 100644 index 00000000000..2c6c60b49b3 --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs @@ -0,0 +1,187 @@ +// +// Test.Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Messaging; + +using NUnit.Framework; + +namespace MonoTests.Mono.Messaging.RabbitMQ +{ + [TestFixture] + public class PeekTest { + + [Test] + public void PeekMessage () + { + String body = "foo-" + DateTime.Now.ToString (); + Message s1 = new Message(body, new BinaryMessageFormatter()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\peek-queue-1"); + mq.Send (s1); + + Message r1 = mq.Peek (); + Assert.AreEqual (body, r1.Body); + + Message r2 = mq.Receive (); + Assert.AreEqual (body, r2.Body); + } + + [Test] + public void PeekMessageWithTimeout () + { + String body = "foo-" + DateTime.Now.ToString(); + Message s1 = new Message(body, new BinaryMessageFormatter()); + MessageQueue mq = MQUtil.GetQueue(@".\private$\peek-queue-2"); + mq.Send (s1); + + Message r1 = mq.Peek (new TimeSpan (0, 0, 2)); + Assert.AreEqual (body, r1.Body); + + Message r2 = mq.Receive (); + Assert.AreEqual (body, r2.Body); + } + + [Test] + [ExpectedException (typeof (MessageQueueException))] + public void PeekNoMessageWithTimeout () + { + MessageQueue mq = MQUtil.GetQueue(@".\private$\peek-queue-3"); + Message r1 = mq.Peek (new TimeSpan (0, 0, 2)); + } + + [Test] + public void PeekById () + { + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body, new BinaryMessageFormatter()); + MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-4"); + q.Send (s1); + + String id = s1.Id; + try { + Message r1 = q.PeekById (id); + Assert.AreEqual (body, r1.Body, "Unable to PeekById correctly"); + } finally { + q.Purge (); + } + } + + [Test] + public void PeekByIdWithTimeout () + { + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body, new BinaryMessageFormatter()); + MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-5"); + q.Send (s1); + + String id = s1.Id; + try { + Message r1 = q.PeekById (id, new TimeSpan (0, 0, 2)); + Assert.AreEqual (body, r1.Body, "Unable to PeekById correctly"); + } finally { + q.Purge (); + } + } + + [Test] + [ExpectedException (typeof (InvalidOperationException))] + public void PeekByIdNotFound () + { + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body, new BinaryMessageFormatter()); + MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-6"); + q.Send (s1); + + String id = "fail!"; + + try { + Message r1 = q.PeekById (id); + } finally { + q.Purge (); + } + } + + [Test] + public void PeekByCorrelationId () + { + String correlationId = Guid.NewGuid () + "\\0"; + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body, new BinaryMessageFormatter()); + s1.CorrelationId = correlationId; + MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-7"); + q.Formatter = new BinaryMessageFormatter (); + q.Send (s1); + + try { + Message r1 = q.PeekByCorrelationId (correlationId); + Assert.AreEqual (body, r1.Body, "Unable to PeekByCorrelationId correctly"); + } finally { + q.Purge (); + } + } + + [Test] + [ExpectedException (typeof (InvalidOperationException))] + public void PeekByCorrelationIdNotFound () + { + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body); + String correlationId = Guid.NewGuid() + "\\0"; + MessageQueue q = MQUtil.GetQueue(@".\private$\peek-queue-8"); + q.Formatter = new BinaryMessageFormatter (); + q.Send (s1); + + try { + Message r1 = q.PeekByCorrelationId ("fail!"); + } finally { + q.Purge (); + } + } + + [Test] + public void PeekByCorrelationIdWithTimeout () + { + String correlationId = Guid.NewGuid () + "\\0"; + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body, new BinaryMessageFormatter()); + s1.CorrelationId = correlationId; + MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-9"); + q.Formatter = new BinaryMessageFormatter (); + q.Send (s1); + + try { + Message r1 = q.PeekByCorrelationId (correlationId, new TimeSpan (0, 0, 2)); + Assert.AreEqual (body, r1.Body, "Unable to PeekByCorrelationId correctly"); + } finally { + q.Purge (); + } + } + } +} diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs index 65305f7c9ef..31fd8f19478 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs @@ -35,7 +35,7 @@ using Mono.Messaging.RabbitMQ; using NUnit.Framework; -namespace MonoTests.Mono.Messsaging.RabbitMQ +namespace MonoTests.Mono.Messaging.RabbitMQ { [TestFixture] public class RabbitMQMessagingProviderTest { @@ -52,7 +52,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ public void GetMessageQueue () { IMessagingProvider p = new RabbitMQMessagingProvider (); - IMessageQueue q = p.GetMessageQueue (); + IMessageQueue q = p.CreateMessageQueue (QueueReference.DEFAULT, true); Assert.IsNotNull (q); } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs new file mode 100644 index 00000000000..73caaedfc6e --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs @@ -0,0 +1,138 @@ +// +// Test.Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Messaging; + +using NUnit.Framework; + +namespace MonoTests.Mono.Messaging.RabbitMQ +{ + [TestFixture] + public class SelectorTest + { + + [Test] + public void SelectById () + { + String body = "Foo-" + DateTime.Now.ToString (); + Message s1 = new Message (body, new BinaryMessageFormatter()); + MessageQueue q = MQUtil.GetQueue (@".\private$\selector-queue-1"); + q.Send (s1); + + String id = s1.Id; + + Message r1 = q.ReceiveById (id); + + Assert.AreEqual (body, r1.Body, "Unable to ReceiveById correctly"); + } + + [Test] + [ExpectedException (typeof (InvalidOperationException))] + public void SelectByIdNotFound () + { + String body = "Foo-" + DateTime.Now.ToString(); + Message s1 = new Message(body, new BinaryMessageFormatter()); + MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-2"); + q.Send (s1); + + String id = "fail!"; + + try { + Message r1 = q.ReceiveById (id); + } finally { + q.Purge (); + } + } + + [Test] + public void SelectByCorrelationId () + { + string correlationId = Guid.NewGuid () + "\\0"; + String body = "Foo-" + DateTime.Now.ToString(); + Message s1 = new Message(body, new BinaryMessageFormatter()); + s1.CorrelationId = correlationId; + MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-3"); + q.Send (s1); + + Message r1 = q.ReceiveByCorrelationId (correlationId); + + Assert.AreEqual (body, r1.Body, "Unable to ReceiveByCorrelationId correctly"); + } + + [Test] + [ExpectedException (typeof (InvalidOperationException))] + public void SelectByCorrelationIdNotFound () + { + string correlationId = Guid.NewGuid() + "\\0"; + String body = "Foo-" + DateTime.Now.ToString(); + Message s1 = new Message(body, new BinaryMessageFormatter()); + s1.CorrelationId = correlationId; + MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-4"); + q.Send (s1); + + try { + Message r1 = q.ReceiveByCorrelationId ("fail!"); + } finally { + q.Purge (); + } + } + + [Test] + public void SelectByIdWithTimeout () + { + String body = "Foo-" + DateTime.Now.ToString(); + Message s1 = new Message(body, new BinaryMessageFormatter()); + MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-5"); + q.Send (s1); + + String id = s1.Id; + + Message r1 = q.ReceiveById (id, new TimeSpan (0, 0, 2)); + + Assert.AreEqual (body, r1.Body, "Unable to ReceiveById correctly"); + } + + [Test] + public void SelectByCorrelationIdWithTimeout () + { + string correlationId = Guid.NewGuid() + "\\0"; + String body = "Foo-" + DateTime.Now.ToString(); + Message s1 = new Message(body, new BinaryMessageFormatter()); + s1.CorrelationId = correlationId; + MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-3"); + q.Send (s1); + + Message r1 = q.ReceiveByCorrelationId (correlationId, new TimeSpan (0, 0, 2)); + + Assert.AreEqual (body, r1.Body, "Unable to ReceiveByCorrelationId correctly"); + } + } +} diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs new file mode 100644 index 00000000000..e0230952e26 --- /dev/null +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs @@ -0,0 +1,467 @@ +// +// Test.Mono.Messaging.RabbitMQ +// +// Authors: +// Michael Barker (mike@middlesoft.co.uk) +// +// (C) 2008 Michael Barker +// + +// +// Permission is hereby granted, free of charge, to any person obtaining +// a copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to +// permit persons to whom the Software is furnished to do so, subject to +// the following conditions: +// +// The above copyright notice and this permission notice shall be +// included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// + +using System; +using System.Messaging; +using System.Reflection; +using System.Threading; +using System.Text.RegularExpressions; + +using NUnit.Framework; + +namespace MonoTests.Mono.Messaging.RabbitMQ +{ + [TestFixture] + public class TransactionMessageTest { + + [Test] + public void Send2WithTransaction () + { + Message sent1 = new Message ("Message 1", new BinaryMessageFormatter ()); + Message sent2 = new Message ("Message 2", new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-1", true); + mq.MessageReadPropertyFilter.SetAll (); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + mq.Send (sent1, tx); + mq.Send (sent2, tx); + + tx.Commit (); + + Message received1 = mq.Receive (); + Assert.IsNotNull (received1.TransactionId, "TransactionId not set"); + Message received2 = mq.Receive (); + Assert.IsNotNull (received2.TransactionId, "TransactionId not set"); + + Assert.AreEqual (received1.TransactionId, received2.TransactionId, "Messages have differing TransactionIds"); + Assert.IsTrue (received1.TransactionId.Length > 1); + Assert.AreEqual (sent1.Body, received1.Body, "Message 1 not delivered correctly"); + Assert.AreEqual (sent2.Body, received2.Body, "Message 2 not delivered correctly"); + } + } + + [Test] + public void Send2WithLabelWithTransaction () + { + String label1 = "label1"; + String label2 = "label2"; + Message sent1 = new Message ("Message 1", new BinaryMessageFormatter ()); + Message sent2 = new Message ("Message 2", new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-2", true); + mq.MessageReadPropertyFilter.SetAll (); + Assert.IsTrue(mq.Transactional, "Message Queue should be transactional"); + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + mq.Send (sent1, label1, tx); + mq.Send (sent2, label2, tx); + + tx.Commit (); + + Message received1 = mq.Receive (); + Assert.IsNotNull (received1.TransactionId, "TransactionId not set"); + Message received2 = mq.Receive (); + Assert.IsNotNull (received2.TransactionId, "TransactionId not set"); + + Assert.AreEqual (received1.TransactionId, received2.TransactionId, "Messages have differing TransactionIds"); + Assert.IsTrue (received1.TransactionId.Length > 1); + Assert.AreEqual (sent1.Body, received1.Body, "Message 1 not delivered correctly"); + Assert.AreEqual (sent2.Body, received2.Body, "Message 2 not delivered correctly"); + Assert.AreEqual (label1, received1.Label, "Label 1 not passed correctly"); + Assert.AreEqual (label2, received2.Label, "Label 2 not passed correctly"); + } + } + + [Test] + [ExpectedException (typeof (MessageQueueException))] + public void Send2WithTransactionAbort () + { + Message sent1 = new Message ("Message 1", new BinaryMessageFormatter ()); + Message sent2 = new Message ("Message 2", new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-1", true); + mq.MessageReadPropertyFilter.SetAll (); + Assert.IsTrue(mq.Transactional, "Message Queue should be transactional"); + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + mq.Send (sent1, tx); + mq.Send (sent2, tx); + + tx.Abort (); + mq.Receive (new TimeSpan (0, 0, 2)); + } + } + + [Test] + public void ReceiveWithTransaction () + { + String body = "Message 4"; + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-4", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.Receive (tx); + + tx.Commit (); + + Assert.AreEqual (body, received1.Body); + } + } + + [Test] + public void ReceiveWithTransactionAbort () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-5", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.Receive (tx); + + tx.Abort (); + } + + Message received2 = mq.Receive (); + Assert.AreEqual (body, received2.Body); + } + + [Test] + public void ReceiveWithTransactionType () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-6", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + + Message received1 = mq.Receive (MessageQueueTransactionType.Single); + + Assert.AreEqual (body, received1.Body); + } + + [Test] + public void SendWithTransactionType () + { + Message sent1 = new Message ("Message 1"); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-7", true); + mq.MessageReadPropertyFilter.SetAll(); + mq.Send (sent1, MessageQueueTransactionType.Single); + + Message received1 = mq.Receive (); + Assert.IsNotNull (received1.TransactionId, "TransactionId not set"); + } + + [Test] + public void SendWithTransactionTypeAndLabel () + { + Message sent1 = new Message ("Message 1"); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-8", true); + mq.MessageReadPropertyFilter.SetAll(); + String label = "mylabel"; + + mq.Send (sent1, label, MessageQueueTransactionType.Single); + + Message received1 = mq.Receive (); + Assert.IsNotNull (received1.TransactionId, "TransactionId not set"); + Assert.AreEqual (label, received1.Label, "Label not set"); + } + + [Test] + public void ReceiveByIdWithTransaction () + { + String body = "Message 4"; + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-9", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.ReceiveById (id, tx); + + tx.Commit (); + + Assert.AreEqual (body, received1.Body); + } + } + + [Test] + public void ReceiveByIdWithTransactionAbort () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-10", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.ReceiveById (id, tx); + + tx.Abort (); + } + + Message received2 = mq.Receive (); + Assert.AreEqual (body, received2.Body); + } + + [Test] + public void ReceiveByIdWithTransactionType () + { + String body = "Message 4"; + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-11", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + Message received1 = mq.ReceiveById (id, MessageQueueTransactionType.Single); + Assert.AreEqual (body, received1.Body); + } + + [Test] + public void ReceiveByCorrelationIdWithTransaction () + { + string correlationId = Guid.NewGuid() + "\\0"; + String body = "Message 4"; + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + sent1.CorrelationId = correlationId; + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-12", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.ReceiveByCorrelationId(correlationId, tx); + + tx.Commit (); + + Assert.AreEqual (body, received1.Body); + } + } + + [Test] + public void ReceiveByCorrelationIdWithTransactionAbort () + { + string correlationId = Guid.NewGuid() + "\\0"; + String body = "foo-" + DateTime.Now.ToString(); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + sent1.CorrelationId = correlationId; + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-13", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.ReceiveByCorrelationId (correlationId, tx); + + tx.Abort (); + } + + Message received2 = mq.Receive (); + Assert.AreEqual (body, received2.Body); + } + + [Test] + public void ReceiveByCorrelationIdWithTransactionType () + { + string correlationId = Guid.NewGuid() + "\\0"; + String body = "Message 10"; + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + sent1.CorrelationId = correlationId; + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-14", true); + mq.Formatter = new BinaryMessageFormatter (); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + Message received1 = mq.ReceiveByCorrelationId (correlationId, MessageQueueTransactionType.Single); + Assert.AreEqual (body, received1.Body); + } + + [Test] + public void ReceiveWithTransactionAndTimeout () + { + String body = "Message 11"; + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-15", true); + mq.Formatter = new BinaryMessageFormatter (); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.Receive (new TimeSpan (0, 0, 2), tx); + + tx.Commit (); + + Assert.AreEqual (body, received1.Body); + } + } + + [Test] + public void ReceiveWithTransactionAndTimeoutAndAbort () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-16", true); + mq.Formatter = new BinaryMessageFormatter (); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.Receive (new TimeSpan (0, 0, 2), tx); + + tx.Abort (); + } + + Message received2 = mq.Receive (); + Assert.AreEqual (body, received2.Body); + } + + [Test] + public void ReceiveWithTransactionTypeAndTimeout () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-17", true); + mq.Formatter = new BinaryMessageFormatter (); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + + Message received1 = mq.Receive (new TimeSpan (0, 0, 5), MessageQueueTransactionType.Single); + + Assert.AreEqual (body, received1.Body); + } + + [Test] + [ExpectedException (typeof (MessageQueueException))] + public void ReceiveWithTransactionTypeAndTimeoutFailure () + { + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-18", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + Message received1 = mq.Receive (new TimeSpan (0, 0, 2), MessageQueueTransactionType.Single); + } + + [Test] + public void ReceiveByIdWithTransactionAndTimeout () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-19", true); + mq.Formatter = new BinaryMessageFormatter (); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.ReceiveById (id, new TimeSpan (0, 0, 2), tx); + + tx.Commit (); + + Assert.AreEqual (body, received1.Body); + } + } + + [Test] + public void ReceiveByIdWithTransactionTypeAndTimeout () + { + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-20", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + Message received1 = mq.ReceiveById (id, new TimeSpan (0, 0, 2), MessageQueueTransactionType.Single); + Assert.AreEqual (body, received1.Body); + } + + [Test] + public void ReceiveByCorrelationIdWithTransactionAndTimeout () + { + string correlationId = Guid.NewGuid () + "\\0"; + String body = "foo-" + DateTime.Now.ToString (); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + sent1.CorrelationId = correlationId; + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-21", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + using (MessageQueueTransaction tx = new MessageQueueTransaction ()) { + tx.Begin (); + + Message received1 = mq.ReceiveByCorrelationId (correlationId, new TimeSpan (0, 0, 2), tx); + tx.Commit (); + Assert.AreEqual (body, received1.Body); + } + } + + [Test] + public void ReceiveByCorrelationIdWithTransactionTypeAndTimeout () + { + string correlationId = Guid.NewGuid() + "\\0"; + String body = "foo-" + DateTime.Now.ToString(); + Message sent1 = new Message (body, new BinaryMessageFormatter ()); + sent1.CorrelationId = correlationId; + MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-22", true); + Assert.IsTrue (mq.Transactional, "Message Queue should be transactional"); + mq.Send (sent1, MessageQueueTransactionType.Single); + string id = sent1.Id; + + Message received1 = mq.ReceiveByCorrelationId (correlationId, new TimeSpan (0, 0, 2), MessageQueueTransactionType.Single); + Assert.AreEqual (body, received1.Body); + } + } +} |