Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAtsushi Eno <atsushieno@gmail.com>2008-12-07 12:55:03 +0300
committerAtsushi Eno <atsushieno@gmail.com>2008-12-07 12:55:03 +0300
commit56f2a5eee8ee484218835cc53663685ac70b06c5 (patch)
tree0d588368b1de72b2f8ee7120e377d3f226d243fb /mcs/class/Mono.Messaging.RabbitMQ
parent571a0e148cce4ff8797f85982b34707e632610e6 (diff)
Apply RabbitMQ support patch by Michael Barker, on bug #457089.
svn path=/branches/messaging-2008/mcs/; revision=120957
Diffstat (limited to 'mcs/class/Mono.Messaging.RabbitMQ')
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources1
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog22
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs31
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs17
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs584
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs140
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs111
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources6
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs130
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs247
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog36
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs66
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs67
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs43
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs187
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs4
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs138
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs467
18 files changed, 2124 insertions, 173 deletions
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources
index 21a917c3dec..193d4813a17 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources
@@ -4,5 +4,6 @@
./Mono.Messaging.RabbitMQ/MessageFactory.cs
./Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
./Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
+./Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
./Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog
index f3f3d3d1c25..3604f63ebf9 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/ChangeLog
@@ -1,3 +1,25 @@
+2008-12-07 Michael Barker <mike@middlesoft.co.uk>
+
+ * RabbitMQMessageQueue.cs: Throw MessageUnavailableException when there are
+ no messages.
+
+2008-11-23 Michael Barker <mike@middlesoft.co.uk>
+
+ * RabbitMQMessageQueue.cs: Added selector support for ReceiveBy{Id,CorrelationId}
+ and support for MessageQueueTransactionType, currently only None and Single
+ are supported. Added PeekBy{Id,CorrelationId} methods.
+ * MessageFactory.cs: Made read/write message methods non-static and requires
+ the MessagingProvider as a constructor parameter.
+
+2008-11-02 Michael Barker <mike@middlesoft.co.uk>
+
+ * RabbitMQMessageQueueTransaction.cs: New, Handles transactional delivery
+ by maintaining the transaction context.
+ * RabbitMQMessageQueue.cs: Added support for transactions, purging, deleting
+ and refactored some of the methods to improve the code reuse.
+ * RabbitMQMessagingProvider.cs: Added methods for queue deletion and
+ creating transactions.
+
2008-10-26 Michael Barker <mike@middlesoft.co.uk>
* MessageFactory.cs: Support all properties defined in the 1.1 version of
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
index aa0f852bf0d..e48da2ca2ba 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
@@ -67,10 +67,18 @@ namespace Mono.Messaging.RabbitMQ {
private static readonly string USE_AUTHENTICATION_KEY = "UseAuthentication";
private static readonly string USE_DEAD_LETTER_QUEUE_KEY = "UseDeadLetterQueue";
private static readonly string USE_ENCRYPTION_KEY = "UseEncryption";
+ private static readonly string TRANSACTION_ID_KEY = "TrandactionId";
private static readonly int PERSISTENT_DELIVERY_MODE = 2;
- public static IMessageBuilder WriteMessage (IModel ch, IMessage msg)
+ private readonly RabbitMQMessagingProvider provider;
+
+ public MessageFactory (RabbitMQMessagingProvider provider)
+ {
+ this.provider = provider;
+ }
+
+ public IMessageBuilder WriteMessage (IModel ch, IMessage msg)
{
BasicMessageBuilder mb = new BasicMessageBuilder (ch);
mb.Properties.MessageId = msg.Id;
@@ -103,6 +111,7 @@ namespace Mono.Messaging.RabbitMQ {
headers[SENDER_CERTIFICATE_KEY] = msg.SenderCertificate;
headers[TIME_TO_BE_RECEIVED_KEY] = msg.TimeToBeReceived.Ticks;
headers[TIME_TO_REACH_QUEUE_KEY] = msg.TimeToReachQueue.Ticks;
+ SetValue (headers, TRANSACTION_ID_KEY, msg.TransactionId);
headers[USE_AUTHENTICATION_KEY] = msg.UseAuthentication;
headers[USE_DEAD_LETTER_QUEUE_KEY] = msg.UseDeadLetterQueue;
headers[USE_ENCRYPTION_KEY] = msg.UseEncryption;
@@ -122,7 +131,7 @@ namespace Mono.Messaging.RabbitMQ {
headers[name] = val;
}
- public static IMessage ReadMessage (QueueReference destination, BasicDeliverEventArgs result)
+ public IMessage ReadMessage (QueueReference destination, BasicDeliverEventArgs result)
{
/*
if (destination == null)
@@ -133,22 +142,24 @@ namespace Mono.Messaging.RabbitMQ {
MessageBase msg = new MessageBase ();
Stream s = new MemoryStream ();
s.Write (result.Body, 0, result.Body.Length);
- Console.WriteLine ("Body.Length Out {0}", result.Body.Length);
DateTime arrivedTime = DateTime.Now;
IDictionary headers = result.BasicProperties.Headers;
long senderVersion = (long) headers[SENDER_VERSION_KEY];
string sourceMachine = GetString (headers, SOURCE_MACHINE_KEY);
DateTime sentTime = AmqpTimestampToDateTime (result.BasicProperties.Timestamp);
+ string transactionId = GetString (headers, TRANSACTION_ID_KEY);
msg.SetDeliveryInfo (Acknowledgment.None,
arrivedTime,
- new RabbitMQMessageQueue (destination),
+ new RabbitMQMessageQueue (provider,
+ destination,
+ true),
result.BasicProperties.MessageId,
MessageType.Normal,
new byte[0],
senderVersion,
sentTime,
sourceMachine,
- null);
+ transactionId);
msg.CorrelationId = result.BasicProperties.CorrelationId;
msg.BodyStream = s;
msg.BodyType = (int) result.BasicProperties.Headers[BODY_TYPE_KEY];
@@ -156,8 +167,12 @@ namespace Mono.Messaging.RabbitMQ {
Enum.ToObject (typeof (AcknowledgeTypes),
headers[ACKNOWLEDGE_TYPE_KEY]);
string adminQueuePath = GetString (headers, ADMINISTRATION_QUEUE_KEY);
- if (adminQueuePath != null)
- msg.AdministrationQueue = new RabbitMQMessageQueue (QueueReference.Parse (adminQueuePath));
+ if (adminQueuePath != null) {
+ QueueReference qRef = QueueReference.Parse (adminQueuePath);
+ msg.AdministrationQueue = new RabbitMQMessageQueue (provider,
+ qRef,
+ true);
+ }
msg.AppSpecific = (int) headers[APP_SPECIFIC_KEY];
msg.AuthenticationProviderName = GetString (headers, AUTHENTICATION_PROVIDER_NAME_KEY);
msg.AuthenticationProviderType = (CryptographicProviderType) Enum.ToObject (typeof (CryptographicProviderType), headers[AUTHENTICATION_PROVIDER_TYPE_KEY]);
@@ -172,7 +187,7 @@ namespace Mono.Messaging.RabbitMQ {
msg.Priority = (MessagePriority) Enum.ToObject (typeof (MessagePriority), result.BasicProperties.Priority);
msg.Recoverable = result.BasicProperties.DeliveryMode == PERSISTENT_DELIVERY_MODE;
if (result.BasicProperties.ReplyTo != null)
- msg.ResponseQueue = new RabbitMQMessageQueue (QueueReference.Parse (result.BasicProperties.ReplyTo));
+ msg.ResponseQueue = new RabbitMQMessageQueue (provider, QueueReference.Parse (result.BasicProperties.ReplyTo), true);
msg.SenderCertificate = (byte[]) headers[SENDER_CERTIFICATE_KEY];
msg.TimeToBeReceived = new TimeSpan((long) headers[TIME_TO_BE_RECEIVED_KEY]);
msg.TimeToReachQueue = new TimeSpan((long) headers[TIME_TO_REACH_QUEUE_KEY]);
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
index 33d6236426d..30d0ea3542b 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
@@ -43,13 +43,16 @@ namespace Mono.Messaging.RabbitMQ {
public class RabbitMQMessageEnumerator : IMessageEnumerator {
+ private readonly MessageFactory helper;
private readonly QueueReference qRef;
private IConnection cn = null;
private BasicDeliverEventArgs current = null;
private IModel model = null;
private Subscription subscription = null;
- public RabbitMQMessageEnumerator (QueueReference qRef) {
+ public RabbitMQMessageEnumerator (MessageFactory helper,
+ QueueReference qRef) {
+ this.helper = helper;
this.qRef = qRef;
}
@@ -140,9 +143,19 @@ namespace Mono.Messaging.RabbitMQ {
return msg;
}
+ public IMessage RemoveCurrent (IMessageQueueTransaction transaction)
+ {
+ throw new NotSupportedException ("Unable to remove messages within a transaction");
+ }
+
+ public IMessage RemoveCurrent (MessageQueueTransactionType transactionType)
+ {
+ throw new NotSupportedException ("Unable to remove messages within a transaction");
+ }
+
private IMessage CreateMessage (BasicDeliverEventArgs result)
{
- return MessageFactory.ReadMessage (qRef, result);
+ return helper.ReadMessage (qRef, result);
}
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
index eacc97675d2..b83eaca15a9 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
@@ -55,14 +55,33 @@ namespace Mono.Messaging.RabbitMQ {
private ISynchronizeInvoke synchronizingObject = null;
private bool useJournalQueue = false;
private QueueReference qRef = QueueReference.DEFAULT;
+ private readonly RabbitMQMessagingProvider provider;
+ private readonly MessageFactory helper;
+ private readonly string realm;
+ private readonly bool transactional;
- public RabbitMQMessageQueue ()
+ public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
+ bool transactional)
+ : this (provider, QueueReference.DEFAULT, transactional)
{
}
- public RabbitMQMessageQueue (QueueReference qRef)
+ public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
+ QueueReference qRef,
+ bool transactional)
+ : this (provider, "/data", qRef, transactional)
{
+ }
+
+ public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
+ string realm, QueueReference qRef,
+ bool transactional)
+ {
+ this.provider = provider;
+ this.helper = new MessageFactory (provider);
+ this.realm = realm;
this.qRef = qRef;
+ this.transactional = transactional;
}
public bool Authenticate {
@@ -130,7 +149,7 @@ namespace Mono.Messaging.RabbitMQ {
}
public bool Transactional {
- get { throw new NotImplementedException (); }
+ get { return transactional; }
}
public bool UseJournalQueue {
@@ -155,23 +174,43 @@ namespace Mono.Messaging.RabbitMQ {
return version;
}
- private void SetDeliveryInfo (IMessage msg, IConnection cn)
+ private void SetDeliveryInfo (IMessage msg, long senderVersion,
+ string transactionId)
{
- long senderVersion = GetVersion (cn);
msg.SetDeliveryInfo (Acknowledgment.None,
DateTime.MinValue,
this,
- Guid.NewGuid ().ToString (),
+ Guid.NewGuid ().ToString () + "\\0",
MessageType.Normal,
new byte[0],
senderVersion,
DateTime.UtcNow,
null,
- null);
+ transactionId);
}
+ public void Close ()
+ {
+ // No-op (Queue are currently stateless)
+ }
+
+ public static void Delete (string realm, QueueReference qRef)
+ {
+ ConnectionFactory cf = new ConnectionFactory ();
+
+ using (IConnection cn = cf.CreateConnection (qRef.Host)) {
+ using (IModel model = cn.CreateModel ()) {
+ ushort ticket = model.AccessRequest (realm);
+ model.QueueDelete (ticket, qRef.Queue, false, false, false);
+ }
+ }
+ }
+
public void Send (IMessage msg)
{
+ if (QRef == QueueReference.DEFAULT)
+ throw new MonoMessagingException ("Path has not been specified");
+
if (msg.BodyStream == null)
throw new ArgumentException ("Message is not serialized properly");
@@ -179,17 +218,9 @@ namespace Mono.Messaging.RabbitMQ {
try {
using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+ SetDeliveryInfo (msg, GetVersion (cn), null);
using (IModel ch = cn.CreateModel ()) {
- ushort ticket = ch.AccessRequest ("/data");
- string finalName = ch.QueueDeclare (ticket, QRef.Queue, false);
- SetDeliveryInfo (msg, cn);
- IMessageBuilder mb = MessageFactory.WriteMessage (ch, msg);
- Console.WriteLine("Body.Length In {0}", mb.GetContentBody ().Length);
-
- ch.BasicPublish (ticket, "",
- finalName,
- (IBasicProperties) mb.GetContentHeader(),
- mb.GetContentBody ());
+ Send (ch, msg);
}
}
} catch (BrokerUnreachableException e) {
@@ -197,32 +228,525 @@ namespace Mono.Messaging.RabbitMQ {
}
}
- public IMessage Receive ()
+ public void Send (IMessage msg, IMessageQueueTransaction transaction)
+ {
+ if (QRef == QueueReference.DEFAULT)
+ throw new MonoMessagingException ("Path has not been specified");
+
+ if (msg.BodyStream == null)
+ throw new ArgumentException ("Message is not serialized properly");
+
+ RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
+
+ tx.RunSend (SendInContext, msg);
+ }
+
+ public void Send (IMessage msg, MessageQueueTransactionType transactionType)
+ {
+ switch (transactionType) {
+ case MessageQueueTransactionType.Single:
+ using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
+ try {
+ Send (msg, tx);
+ tx.Commit ();
+ } catch (Exception e) {
+ tx.Abort ();
+ throw new MonoMessagingException(e.Message, e);
+ }
+ }
+ break;
+
+ case MessageQueueTransactionType.None:
+ Send (msg);
+ break;
+
+ case MessageQueueTransactionType.Automatic:
+ throw new NotSupportedException("Automatic transaction types not supported");
+ }
+ }
+
+ private void SendInContext (ref string host, ref IConnection cn,
+ ref IModel model, IMessage msg, string txId)
+ {
+ if (host == null)
+ host = QRef.Host;
+ else if (host != QRef.Host)
+ throw new MonoMessagingException ("Transactions can not span multiple hosts");
+
+ if (cn == null) {
+ ConnectionFactory cf = new ConnectionFactory ();
+ cn = cf.CreateConnection (host);
+ }
+
+ if (model == null) {
+ model = cn.CreateModel ();
+ model.TxSelect ();
+ }
+
+ SetDeliveryInfo (msg, GetVersion (cn), txId);
+ Send (model, msg);
+ }
+
+ private void Send (IModel model, IMessage msg)
+ {
+ ushort ticket = model.AccessRequest ("/data");
+ string finalName = model.QueueDeclare (ticket, QRef.Queue, true);
+ IMessageBuilder mb = helper.WriteMessage (model, msg);
+
+ model.BasicPublish (ticket, "", finalName,
+ (IBasicProperties) mb.GetContentHeader(),
+ mb.GetContentBody ());
+ }
+
+ public void Purge ()
+ {
+ ConnectionFactory cf = new ConnectionFactory ();
+
+ using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+ using (IModel model = cn.CreateModel ()) {
+ ushort ticket = model.AccessRequest (realm);
+ model.QueuePurge (ticket, QRef.Queue, false);
+ }
+ }
+ }
+
+ public IMessage Peek ()
{
ConnectionFactory cf = new ConnectionFactory ();
using (IConnection cn = cf.CreateConnection (QRef.Host)) {
using (IModel ch = cn.CreateModel ()) {
- ushort ticket = ch.AccessRequest ("/data");
- string finalName = ch.QueueDeclare (ticket, QRef.Queue, false);
+ return Receive (ch, -1, false);
+ }
+ }
+ }
+
+ public IMessage Peek (TimeSpan timeout)
+ {
+ return Run (Peeker (timeout));
+// ConnectionFactory cf = new ConnectionFactory ();
+//
+// using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+// using (IModel ch = cn.CreateModel ()) {
+// if (timeout == TimeSpan.MaxValue) {
+// return Receive (ch, -1, false);
+// } else {
+// return Receive (ch, (int) timeout.TotalMilliseconds, false);
+// }
+// }
+// }
+ }
+
+ public IMessage PeekById (string id)
+ {
+ return Run (Peeker (ById (id)));
+// ConnectionFactory cf = new ConnectionFactory ();
+//
+// using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+// using (IModel ch = cn.CreateModel ()) {
+// return Receive (ch, 500, true, new IdMatcher (id).MatchById);
+// }
+// }
+ }
+
+ public IMessage PeekById (string id, TimeSpan timeout)
+ {
+ return Run (Peeker (timeout, ById (id)));
+ }
+
+ public IMessage PeekByCorrelationId (string id)
+ {
+ return Run (Peeker (ByCorrelationId (id)));
+// ConnectionFactory cf = new ConnectionFactory ();
+//
+// using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+// using (IModel ch = cn.CreateModel ()) {
+// return Receive (ch, 500, false,
+// new CorrelationIdMatcher (id).MatchById);
+// }
+// }
+ }
+
+ public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
+ {
+ return Run (Peeker (timeout, ByCorrelationId (id)));
+ }
+
+ public IMessage Receive ()
+ {
+ return Run (Receiver ());
+ }
+
+ public IMessage Receive (TimeSpan timeout)
+ {
+ return Run (Receiver (timeout));
+ }
+
+ public IMessage Receive (TimeSpan timeout,
+ IMessageQueueTransaction transaction)
+ {
+ return Run (transaction, Receiver (timeout));
+ }
+
+ public IMessage Receive (TimeSpan timeout,
+ MessageQueueTransactionType transactionType)
+ {
+ return Run (transactionType, Receiver (timeout));
+ }
+
+ public IMessage Receive (IMessageQueueTransaction transaction)
+ {
+ return Run (transaction, Receiver());
+ }
+
+ public IMessage Receive (MessageQueueTransactionType transactionType)
+ {
+ return Run (transactionType, Receiver ());
+ }
+
+ public IMessage ReceiveById (string id)
+ {
+ return Run (Receiver (ById (id)));
+ }
+
+ public IMessage ReceiveById (string id, TimeSpan timeout)
+ {
+ return Run (Receiver (timeout, ById (id)));
+ }
+
+ public IMessage ReceiveById (string id,
+ IMessageQueueTransaction transaction)
+ {
+ return Run (transaction, Receiver (ById (id)));
+ }
+
+ public IMessage ReceiveById (string id,
+ MessageQueueTransactionType transactionType)
+ {
+ return Run (transactionType, Receiver (ById (id)));
+ }
+
+ public IMessage ReceiveById (string id, TimeSpan timeout,
+ IMessageQueueTransaction transaction)
+ {
+ return Run (transaction, Receiver (timeout, ById (id)));
+ }
+
+ public IMessage ReceiveById (string id, TimeSpan timeout,
+ MessageQueueTransactionType transactionType)
+ {
+ return Run (transactionType, Receiver (timeout, ById (id)));
+ }
+
+ public IMessage ReceiveByCorrelationId (string id)
+ {
+ return Run (Receiver (ByCorrelationId (id)));
+ }
+
+ public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
+ {
+ return Run (Receiver (timeout, ByCorrelationId (id)));
+ }
+
+ public IMessage ReceiveByCorrelationId (string id,
+ IMessageQueueTransaction transaction)
+ {
+ return Run (transaction, Receiver (ByCorrelationId (id)));
+ }
+
+ public IMessage ReceiveByCorrelationId (string id,
+ MessageQueueTransactionType transactionType)
+ {
+ return Run (transactionType, Receiver (ByCorrelationId (id)));
+ }
+
+ public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
+ IMessageQueueTransaction transaction)
+ {
+ return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
+ }
+
+ public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
+ MessageQueueTransactionType transactionType)
+ {
+ return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
+ }
+
+ public IMessageEnumerator GetMessageEnumerator ()
+ {
+ return new RabbitMQMessageEnumerator (helper, QRef);
+ }
+
+ private IMessage Run (MessageQueueTransactionType transactionType,
+ TxReceiver.DoReceive r)
+ {
+ switch (transactionType) {
+ case MessageQueueTransactionType.Single:
+ using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
+ bool success = false;
+ try {
+ IMessage msg = Run (tx, r);
+ tx.Commit ();
+ success = true;
+ return msg;
+ } finally {
+ if (!success)
+ tx.Abort ();
+ }
+ }
+
+ case MessageQueueTransactionType.None:
+ return Run (r);
+
+ default:
+ throw new NotSupportedException(transactionType + " not supported");
+ }
+ }
+
+ private IMessage Run (IMessageQueueTransaction transaction,
+ TxReceiver.DoReceive r)
+ {
+ TxReceiver txr = new TxReceiver (this, r);
+ RabbitMQMessageQueueTransaction tx =
+ (RabbitMQMessageQueueTransaction) transaction;
+ return tx.RunReceive (txr.ReceiveInContext);
+ }
+
+ private IMessage Run (TxReceiver.DoReceive r)
+ {
+ ConnectionFactory cf = new ConnectionFactory ();
+ using (IConnection cn = cf.CreateConnection (QRef.Host)) {
+ using (IModel model = cn.CreateModel ()) {
+ return r (this, model);
+ }
+ }
+ }
+
+ private IMessage ReceiveInContext (ref string host, ref IConnection cn,
+ ref IModel model, string txId)
+ {
+ if (host == null)
+ host = QRef.Host;
+ else if (host != QRef.Host)
+ throw new MonoMessagingException ("Transactions can not span multiple hosts");
+
+ if (cn == null) {
+ ConnectionFactory cf = new ConnectionFactory ();
+ cn = cf.CreateConnection (host);
+ }
+
+ if (model == null) {
+ model = cn.CreateModel ();
+ model.TxSelect ();
+ }
+
+ return Receive (model, -1, true);
+ }
+
+ private class TxReceiver
+ {
+ private readonly DoReceive doReceive;
+ private readonly RabbitMQMessageQueue q;
+
+ public TxReceiver(RabbitMQMessageQueue q, DoReceive doReceive) {
+ this.q = q;
+ this.doReceive = doReceive;
+ }
+
+ public delegate IMessage DoReceive (RabbitMQMessageQueue q, IModel model);
+
+ public IMessage ReceiveInContext (ref string host, ref IConnection cn,
+ ref IModel model, string txId)
+ {
+ if (host == null)
+ host = q.QRef.Host;
+ else if (host != q.QRef.Host)
+ throw new MonoMessagingException ("Transactions can not span multiple hosts");
+
+ if (cn == null) {
+ ConnectionFactory cf = new ConnectionFactory ();
+ cn = cf.CreateConnection (host);
+ }
+
+ if (model == null) {
+ model = cn.CreateModel ();
+ model.TxSelect ();
+ }
+
+ return doReceive (q, model);
+ }
+ }
+
+ private class DoReceiveWithTimeout
+ {
+ private readonly int timeout;
+ private readonly IsMatch matcher;
+ private readonly bool ack;
+
+ public DoReceiveWithTimeout (int timeout, IsMatch matcher)
+ : this (timeout, matcher, true)
+ {
+ }
+
+ public DoReceiveWithTimeout (int timeout, IsMatch matcher, bool ack)
+ {
+ if (matcher != null && timeout == -1)
+ this.timeout = 500;
+ else
+ this.timeout = timeout;
+ this.matcher = matcher;
+ this.ack = ack;
+ }
+
+ public IMessage DoReceive (RabbitMQMessageQueue q, IModel model)
+ {
+ if (matcher == null)
+ return q.Receive (model, timeout, ack);
+ else
+ return q.Receive (model, timeout, ack, matcher);
+ }
+ }
+
+ private static TxReceiver.DoReceive Receiver (TimeSpan timeout,
+ IsMatch matcher)
+ {
+ int to = TimeSpanToInt32 (timeout);
+ return new DoReceiveWithTimeout (to, matcher).DoReceive;
+ }
+
+ private static TxReceiver.DoReceive Receiver (IsMatch matcher)
+ {
+ return new DoReceiveWithTimeout (-1, matcher).DoReceive;
+ }
+
+ private static TxReceiver.DoReceive Receiver (TimeSpan timeout)
+ {
+ int to = TimeSpanToInt32 (timeout);
+ return new DoReceiveWithTimeout (to, null).DoReceive;
+ }
+
+ private TxReceiver.DoReceive Receiver ()
+ {
+ return new DoReceiveWithTimeout (-1, null).DoReceive;
+ }
+
+ private TxReceiver.DoReceive Peeker ()
+ {
+ return new DoReceiveWithTimeout (-1, null).DoReceive;
+ }
+
+ private TxReceiver.DoReceive Peeker (TimeSpan timeout)
+ {
+ int to = TimeSpanToInt32 (timeout);
+ return new DoReceiveWithTimeout (to, null, false).DoReceive;
+ }
+
+ private TxReceiver.DoReceive Peeker (IsMatch matcher)
+ {
+ return new DoReceiveWithTimeout (-1, matcher, false).DoReceive;
+ }
+
+ private TxReceiver.DoReceive Peeker (TimeSpan timeout, IsMatch matcher)
+ {
+ int to = TimeSpanToInt32 (timeout);
+ return new DoReceiveWithTimeout (to, matcher, false).DoReceive;
+ }
+
+ delegate bool IsMatch (BasicDeliverEventArgs result);
+
+ private class IdMatcher
+ {
+ private readonly string id;
+ public IdMatcher (string id)
+ {
+ this.id = id;
+ }
+
+ public bool MatchById (BasicDeliverEventArgs result)
+ {
+ return result.BasicProperties.MessageId == id;
+ }
+ }
+
+ private static IsMatch ById (string id)
+ {
+ return new IdMatcher (id).MatchById;
+ }
+
+ private class CorrelationIdMatcher
+ {
+ private readonly string correlationId;
+ public CorrelationIdMatcher (string correlationId)
+ {
+ this.correlationId = correlationId;
+ }
+
+ public bool MatchById (BasicDeliverEventArgs result)
+ {
+ return result.BasicProperties.CorrelationId == correlationId;
+ }
+ }
+
+ private static IsMatch ByCorrelationId (string correlationId)
+ {
+ return new CorrelationIdMatcher (correlationId).MatchById;
+ }
+
+ private IMessage Receive (IModel model, int timeout, bool doAck)
+ {
+ Console.WriteLine ("{0}, {1}", timeout, doAck);
+
+ ushort ticket = model.AccessRequest (realm);
+ string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
+
+ using (Subscription sub = new Subscription (model, ticket, finalName)) {
+ BasicDeliverEventArgs result;
+ if (sub.Next (timeout, out result)) {
+ IMessage m = helper.ReadMessage (QRef, result);
+ if (doAck)
+ sub.Ack (result);
+ return m;
+ } else {
+ throw new MonoMessagingException ("No Message Available");
+ }
+ }
+ }
+
+ private IMessage Receive (IModel model, int timeout,
+ bool doAck, IsMatch matcher)
+ {
+ Console.WriteLine ("{0}, {1}", timeout, doAck);
+
+ ushort ticket = model.AccessRequest (realm);
+ string finalName = model.QueueDeclare (ticket, QRef.Queue, false);
+
+ using (Subscription sub = new Subscription (model, ticket, finalName)) {
+ BasicDeliverEventArgs result;
+ while (sub.Next (timeout, out result)) {
- Subscription sub = new Subscription (ch, ticket, finalName);
- BasicDeliverEventArgs result = sub.Next ();
- sub.Ack (result);
- sub.Close ();
- if (result == null) {
- throw new MonoMessagingException ("No Message Available");
- } else {
- IMessage m = MessageFactory.ReadMessage (QRef, result);
+ if (matcher (result)) {
+ IMessage m = helper.ReadMessage (QRef, result);
+ if (doAck)
+ sub.Ack (result);
return m;
}
}
+
+ throw new MessageUnavailableException ("Message not available");
}
}
- public IMessageEnumerator GetMessageEnumerator ()
+ private RabbitMQMessageQueueTransaction GetTx ()
+ {
+ return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
+ }
+
+ private static int TimeSpanToInt32 (TimeSpan timespan)
{
- return new RabbitMQMessageEnumerator (QRef);
+ if (timespan == TimeSpan.MaxValue)
+ return -1;
+ else
+ return (int) timespan.TotalMilliseconds;
}
}
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
new file mode 100644
index 00000000000..52e3a93cf6c
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
@@ -0,0 +1,140 @@
+//
+// Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Collections;
+using System.ComponentModel;
+using System.IO;
+using System.Text;
+
+using Mono.Messaging;
+
+using RabbitMQ.Client;
+using RabbitMQ.Client.Content;
+using RabbitMQ.Client.Events;
+using RabbitMQ.Client.Exceptions;
+using RabbitMQ.Client.MessagePatterns;
+using RabbitMQ.Util;
+
+namespace Mono.Messaging.RabbitMQ {
+
+ public class RabbitMQMessageQueueTransaction : IMessageQueueTransaction {
+
+ private readonly string txId;
+ private MessageQueueTransactionStatus status = MessageQueueTransactionStatus.Initialized;
+ private IConnection cn = null;
+ private IModel model = null;
+ private String host = null;
+ private bool isDisposed = false;
+ private Object syncObj = new Object ();
+
+ public RabbitMQMessageQueueTransaction (string txId)
+ {
+ this.txId = txId;
+ }
+
+ public MessageQueueTransactionStatus Status {
+ get {
+ lock (syncObj)
+ return status;
+ }
+ }
+
+ public void Abort ()
+ {
+ lock (syncObj) {
+ if (model != null)
+ model.TxRollback ();
+ status = MessageQueueTransactionStatus.Aborted;
+ }
+ }
+
+ public void Begin ()
+ {
+ lock (syncObj) {
+ if (status == MessageQueueTransactionStatus.Pending)
+ throw new InvalidOperationException ("Transaction already started");
+ status = MessageQueueTransactionStatus.Pending;
+ }
+ }
+
+ public void Commit ()
+ {
+ lock (syncObj) {
+ model.TxCommit ();
+ status = MessageQueueTransactionStatus.Committed;
+ }
+ }
+
+ public string Id {
+ get { return txId; }
+ }
+
+ public delegate void Send (ref string host, ref IConnection cn,
+ ref IModel model, IMessage msg, string txId);
+
+ public delegate IMessage Receive (ref string host, ref IConnection cn,
+ ref IModel model, string txId);
+
+ public void RunSend (Send sendDelegate, IMessage msg)
+ {
+ lock (syncObj) {
+ sendDelegate (ref host, ref cn, ref model, msg, Id);
+ }
+ }
+
+ public IMessage RunReceive (Receive receiveDelegate)
+ {
+ lock (syncObj) {
+ return receiveDelegate (ref host, ref cn, ref model, Id);
+ }
+ }
+
+ public void Dispose ()
+ {
+ Dispose (true);
+ GC.SuppressFinalize (this);
+ }
+
+ protected virtual void Dispose (bool disposing)
+ {
+ lock (syncObj) {
+ if (!isDisposed && disposing) {
+ if (model != null)
+ model.Dispose ();
+ if (cn != null)
+ cn.Dispose ();
+ isDisposed = true;
+ }
+ }
+ }
+
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
index 88236f635c7..45445659d0e 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
@@ -29,6 +29,10 @@
//
using System;
+using System.Collections;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
using Mono.Messaging;
@@ -36,26 +40,113 @@ namespace Mono.Messaging.RabbitMQ {
public class RabbitMQMessagingProvider : IMessagingProvider {
- public bool Exists (QueueReference qRef)
- {
- // In AMQP all queues exist, because they are declared rather
- // than created.
- return true;
- }
+ private volatile uint txCounter = 0;
+ private readonly uint localIp;
+ private static readonly string DEFAULT_REALM = "/data";
- public IMessageQueue GetMessageQueue ()
+ public RabbitMQMessagingProvider()
{
- return new RabbitMQMessageQueue ();
+ localIp = GetLocalIP ();
}
- public IMessageQueue CreateMessageQueue (QueueReference qRef)
+ private static uint GetLocalIP ()
{
- return new RabbitMQMessageQueue (qRef);
+ //IPHostEntry host = Dns.GetHostEntry (Dns.GetHostName ());
+ String strHostName = Dns.GetHostName ();
+ IPHostEntry ipEntry = Dns.GetHostByName (strHostName);
+ foreach (IPAddress ip in ipEntry.AddressList) {
+ if (AddressFamily.InterNetwork == ip.AddressFamily) {
+ byte[] addr = ip.GetAddressBytes ();
+ uint localIP = 0;
+ for (int i = 0; i < 4 && i < addr.Length; i++) {
+ localIP += (uint) (addr[i] << 8 * (3 - i));
+ }
+ return localIP;
+ }
+ }
+ return 0;
}
public IMessage CreateMessage ()
{
return new MessageBase ();
}
+
+ public IMessageQueueTransaction CreateMessageQueueTransaction ()
+ {
+ string txId = localIp.ToString () + (++txCounter).ToString ();
+ return new RabbitMQMessageQueueTransaction (txId);
+ }
+
+ public void DeleteQueue (QueueReference qRef)
+ {
+ RabbitMQMessageQueue.Delete (DEFAULT_REALM, qRef);
+ }
+
+ private readonly IDictionary queues = new Hashtable ();
+ private readonly ReaderWriterLock qLock = new ReaderWriterLock ();
+ private const int TIMEOUT = 15000;
+
+ public IMessageQueue[] GetPublicQueues ()
+ {
+ IMessageQueue[] qs;
+ qLock.AcquireReaderLock (TIMEOUT);
+ try {
+ ICollection qCollection = queues.Values;
+ qs = new IMessageQueue[qCollection.Count];
+ qCollection.CopyTo (qs, 0);
+ return qs;
+ } finally {
+ qLock.ReleaseReaderLock ();
+ }
+ }
+
+ public bool Exists (QueueReference qRef)
+ {
+ qLock.AcquireReaderLock (TIMEOUT);
+ try {
+ return queues.Contains (qRef);
+ } finally {
+ qLock.ReleaseReaderLock ();
+ }
+ }
+
+ public IMessageQueue CreateMessageQueue (QueueReference qRef,
+ bool transactional)
+ {
+ qLock.AcquireWriterLock (TIMEOUT);
+ try {
+ IMessageQueue mq = new RabbitMQMessageQueue (this, qRef,
+ transactional);
+ queues[qRef] = mq;
+ return mq;
+ } finally {
+ qLock.ReleaseWriterLock ();
+ }
+ }
+
+ public IMessageQueue GetMessageQueue (QueueReference qRef)
+ {
+ qLock.AcquireReaderLock (TIMEOUT);
+ try {
+ if (queues.Contains (qRef))
+ return (IMessageQueue) queues[qRef];
+ else {
+ LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT);
+ try {
+ IMessageQueue mq = new RabbitMQMessageQueue (this, qRef,
+ false);
+ queues[qRef] = mq;
+ return mq;
+ } finally {
+ qLock.DowngradeFromWriterLock (ref lc);
+ }
+ }
+ } finally {
+ qLock.ReleaseReaderLock ();
+ }
+ }
+
+
}
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources
index c2368a61c89..1845e0082bc 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources
@@ -1,3 +1,9 @@
+Mono.Messaging.RabbitMQ/MQUtil.cs
Mono.Messaging.RabbitMQ/BasicMessagingTest.cs
+Mono.Messaging.RabbitMQ/AdminTest.cs
+Mono.Messaging.RabbitMQ/FailuresTest.cs
Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs
Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs
+Mono.Messaging.RabbitMQ/PeekTest.cs
+Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs
+Mono.Messaging.RabbitMQ/SelectorTest.cs \ No newline at end of file
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs
new file mode 100644
index 00000000000..612f401988d
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/AdminTest.cs
@@ -0,0 +1,130 @@
+//
+// Test.Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Messaging;
+using System.Reflection;
+using System.Threading;
+using System.Text.RegularExpressions;
+
+using NUnit.Framework;
+
+namespace MonoTests.Mono.Messaging.RabbitMQ
+{
+ [TestFixture]
+ public class AdminTest {
+
+ [Test]
+ public void CreateNonTransactionalQueue ()
+ {
+ string qName = @".\private$\admin-queue-1";
+ Assert.IsFalse (MessageQueue.Exists (qName), "Queue should not exist");
+ MessageQueue q = MessageQueue.Create (qName);
+ Assert.IsFalse (q.Transactional);
+ Assert.IsTrue (MessageQueue.Exists (qName), "Queue should exist");
+ }
+
+ [Test]
+ public void CreateTransactionalQueue ()
+ {
+ string qName = @".\private$\admin-queue-2";
+ Assert.IsFalse (MessageQueue.Exists (qName), "Queue should not exist");
+ MessageQueue q = MessageQueue.Create (qName, true);
+ Assert.IsTrue (q.Transactional, "Queue should be transactional");
+ Assert.IsTrue (MessageQueue.Exists (qName), "Queue should exist");
+ }
+
+ private bool Contains(MessageQueue[] qs, String qName)
+ {
+ foreach (MessageQueue q in qs)
+ {
+ if (q.QueueName == qName)
+ return true;
+ }
+ return false;
+ }
+
+ [Test]
+ public void GetPublicQueues ()
+ {
+ string qName1 = @".\admin-queue-3";
+ string qName2 = @".\admin-queue-4";
+
+ MessageQueue.Create (qName1);
+ MessageQueue.Create (qName2);
+
+ MessageQueue[] mq = MessageQueue.GetPublicQueues ();
+ Assert.IsTrue (Contains (mq, "admin-queue-3"), qName1 + " not found");
+ Assert.IsTrue (Contains (mq, "admin-queue-4"), qName2 + " not found");
+ }
+
+ [Test]
+ public void GetQueue ()
+ {
+ MessageQueue q1 = MQUtil.GetQueue(@".\private$\admin-queue-5", true);
+ Assert.IsTrue (q1.Transactional, "Queue should be transactional");
+ MessageQueue q2 = MQUtil.GetQueue(@".\private$\admin-queue-5", true);
+ Assert.IsTrue (q2.Transactional, "Queue should be transactional");
+ }
+
+ [Test]
+ [ExpectedException (typeof (MessageQueueException))]
+ public void PurgeQueue ()
+ {
+ MessageQueue q = MQUtil.GetQueue(@".\private$\purge-queue");
+ Message m1 = new Message ("foobar1", new BinaryMessageFormatter ());
+ Message m2 = new Message ("foobar2", new BinaryMessageFormatter ());
+ Message m3 = new Message ("foobar3", new BinaryMessageFormatter ());
+ Message m4 = new Message ("foobar4", new BinaryMessageFormatter ());
+
+ q.Send (m1);
+ q.Send (m2);
+ q.Send (m3);
+ q.Send (m4);
+
+ Message received = q.Receive ();
+ q.Purge ();
+ q.Receive (new TimeSpan (0, 0, 2));
+ }
+
+ [Test]
+ public void DeleteQueue ()
+ {
+ MessageQueue q = MQUtil.GetQueue(@".\private$\delete-queue");
+ Message m1 = new Message ("foobar1", new BinaryMessageFormatter ());
+
+ q.Send (m1);
+
+ Message received = q.Receive ();
+
+ MessageQueue.Delete(@".\private$\delete-queue");
+ }
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs
index 31368cea674..57e02307c41 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BasicMessagingTest.cs
@@ -36,7 +36,7 @@ using System.Text.RegularExpressions;
using NUnit.Framework;
-namespace MonoTests.Mono.Messsaging.RabbitMQ
+namespace MonoTests.Mono.Messaging.RabbitMQ
{
[TestFixture]
public class BasicMessageTest {
@@ -44,12 +44,14 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
[Test]
public void SendReceiveBinaryMessage ()
{
- String qName = "testq";
- MessageQueue mq = new MessageQueue (qName);
- Assert.AreEqual (mq.QueueName, qName, "Queue name not set properly");
+ String qName = @"private$\testq";
+ String qPath = @".\" + qName;
+ MessageQueue mq = MQUtil.GetQueue (qPath);
+ Assert.AreEqual(qName, mq.QueueName, "Queue name not set properly");
String s = "Test: " + DateTime.Now;
Message m = new Message (s, new BinaryMessageFormatter ());
- m.CorrelationId = "foo";
+ m.CorrelationId = Guid.NewGuid () + "\\0";
+ mq.MessageReadPropertyFilter.SetAll ();
mq.Send (m);
@@ -57,28 +59,55 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
m2.Formatter = new BinaryMessageFormatter ();
Assert.AreEqual (s, m2.Body);
- Assert.IsTrue (DateTime.MinValue == m.ArrivedTime);
- Assert.IsNotNull (m2.Id, "Id is null");
+ //Assert.IsTrue (DateTime.MinValue == m.ArrivedTime);
+ Assert.IsNotNull(m2.Id, "Id is null");
Assert.IsTrue (Guid.Empty.ToString () != m2.Id, "Id is Empty");
- Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set");
+ Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set");
Assert.AreEqual (Acknowledgment.None, m2.Acknowledgment, "Acknowledgment");
Assert.AreEqual (m.CorrelationId, m2.CorrelationId, "CorrelationId not set properly");
Assert.IsTrue (0 != m2.SenderVersion);
- Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null");
+ // TODO: This is not support on a workgroup installation.
+ //Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null");
Assert.AreEqual (qName, m2.DestinationQueue.QueueName, "Destination Queue not set");
+
+ mq.Close ();
+ }
+
+ [Test]
+ public void SendMessageWithLabel ()
+ {
+ String qName = @".\private$\testq";
+ String label = "mylabel";
+ MessageQueue mq = MQUtil.GetQueue (qName);
+ Assert.AreEqual (@"private$\testq", mq.QueueName, "Queue name not set properly");
+ String s = "Test: " + DateTime.Now;
+ Message m = new Message (s, new BinaryMessageFormatter ());
+ m.CorrelationId = Guid.NewGuid () + "\\0" ;
+
+ mq.Send (m, label);
+
+ Message m2 = mq.Receive ();
+ m2.Formatter = new BinaryMessageFormatter ();
+ Assert.AreEqual (s, m2.Body, "Message not passed correctly");
+ Assert.AreEqual (label, m2.Label, "Label not passed correctly");
}
+
[Test]
public void CheckDefaults ()
{
Message m = new Message ("Test", new BinaryMessageFormatter ());
Assert.AreEqual (true, m.AttachSenderId, "AttachSenderId has incorrect default");
- Assert.AreEqual ("Microsoft Base Cryptographic Provider version 1.0",
+ Assert.AreEqual (Guid.Empty.ToString () + "\\0", m.Id, "Id has incorrect default");
+ Assert.AreEqual ("Microsoft Base Cryptographic Provider, Ver. 1.0",
m.AuthenticationProviderName,
"AuthenticationProviderName has incorrect default");
Assert.AreEqual (0, m.Extension.Length, "Extension has incorrect default");
Assert.AreEqual ("", m.Label, "Label has incorrect default");
Assert.IsFalse (m.Recoverable, "Recoverable has incorrect default");
+ Assert.IsFalse (m.IsFirstInTransaction, "IsFirstInTransaction has incorrect default");
+ Assert.IsFalse (m.IsLastInTransaction, "IsLastInTransaction has incorrect default");
+ Assert.AreEqual ("", m.TransactionId, "TransactionId has incorrect default");
Assert.AreEqual (MessagePriority.Normal, m.Priority, "MessagePriority has incorrect default");
}
@@ -87,8 +116,8 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
PropertyInfo pi = m.GetType ().GetProperty (property);
try {
Assert.IsNotNull (pi, "Property not defined: " + property);
- pi.GetValue (m, null);
- Assert.Fail (property);
+ object o = pi.GetValue (m, null);
+ Assert.Fail (property + ": " + o);
} catch (InvalidOperationException e) {
} catch (TargetInvocationException e) {
Assert.AreEqual (typeof (InvalidOperationException),
@@ -104,9 +133,9 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
CheckInvalidOperation (m, "ArrivedTime");
CheckInvalidOperation (m, "Authenticated");
CheckInvalidOperation (m, "DestinationQueue");
- CheckInvalidOperation (m, "Id");
- CheckInvalidOperation (m, "IsFirstInTransaction");
- CheckInvalidOperation (m, "IsLastInTransaction");
+ //CheckInvalidOperation (m, "Id");
+ //CheckInvalidOperation (m, "IsFirstInTransaction");
+ //CheckInvalidOperation (m, "IsLastInTransaction");
// TODO: Support 2.0 features.
//CheckInvalidOperation (m, "LookupId");
CheckInvalidOperation (m, "MessageType");
@@ -114,71 +143,75 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
CheckInvalidOperation (m, "SenderVersion");
CheckInvalidOperation (m, "SentTime");
CheckInvalidOperation (m, "SourceMachine");
- CheckInvalidOperation (m, "TransactionId");
+ //CheckInvalidOperation (m, "TransactionId");
}
- private static void CheckArgumentInvalid (Message m, String property)
+ private static void CheckArgumentInvalid(Message m, String property, Type exceptionType)
{
- PropertyInfo pi = m.GetType ().GetProperty (property);
+ PropertyInfo pi = m.GetType().GetProperty(property);
try {
- Assert.IsNotNull (pi, "Property not defined: " + property);
- pi.SetValue (m, null, null);
- Assert.Fail (property);
+ Assert.IsNotNull(pi, "Property not defined: " + property);
+ pi.SetValue(m, null, null);
+ Assert.Fail(property);
} catch (InvalidOperationException e) {
} catch (TargetInvocationException e) {
- Assert.AreEqual (typeof (ArgumentException),
- e.InnerException.GetType ());
+ Assert.AreEqual(exceptionType,
+ e.InnerException.GetType(),
+ property);
}
}
-
+
[Test]
public void CheckArgumentInvalidForProperties ()
{
Message m = new Message ("Stuff");
- CheckArgumentInvalid (m, "DestinationSymmetricKey");
- CheckArgumentInvalid (m, "DigitalSignature");
- CheckArgumentInvalid (m, "Extension");
+ CheckArgumentInvalid (m, "DestinationSymmetricKey", typeof (ArgumentNullException));
+ CheckArgumentInvalid (m, "DigitalSignature", typeof(ArgumentNullException));
+ CheckArgumentInvalid (m, "Extension", typeof(ArgumentNullException));
}
-
+
[Test]
public void SendReceiveBinaryMessageWithAllPropertiesSet ()
- {
- String qName = "testq";
- MessageQueue mq = new MessageQueue (qName);
- Assert.AreEqual (mq.QueueName, qName, "Queue name not set properly");
+ {
+ String qName = @"private$\testq";
+ String qPath = @".\" + qName;
+ MessageQueue mq = MQUtil.GetQueue (qPath);
+ mq.MessageReadPropertyFilter.SetAll ();
+ Assert.AreEqual(qName, mq.QueueName, "Queue name not set properly");
- MessageQueue adminQ = new MessageQueue ("myAdmin");
- MessageQueue responseQ = new MessageQueue ("myResponse");
+ MessageQueue adminQ = MQUtil.GetQueue (@".\private$\myadmin");
+ MessageQueue responseQ = MQUtil.GetQueue (@".\private$\myresponse");
Guid connectorType = Guid.NewGuid ();
String s = "Test: " + DateTime.Now;
Message m = new Message (s, new BinaryMessageFormatter ());
- m.CorrelationId = "foo";
+ m.CorrelationId = Guid.NewGuid () + "\\0";
m.AcknowledgeType = AcknowledgeTypes.PositiveArrival;
m.AdministrationQueue = adminQ;
m.AppSpecific = 5;
- m.AuthenticationProviderName = "Test Provider Name";
- m.AuthenticationProviderType = CryptographicProviderType.None;
- m.ConnectorType = connectorType;
- m.DestinationSymmetricKey = new byte[] { 0x0A, 0x0B, 0x0C };
- m.DigitalSignature = new byte[] { 0x0C, 0x0D, 0x0E };
- m.EncryptionAlgorithm = EncryptionAlgorithm.Rc4;
+ //m.AuthenticationProviderName = "Test Provider Name";
+ //m.AuthenticationProviderType = CryptographicProviderType.None;
+ //m.ConnectorType = connectorType;
+ //m.DestinationSymmetricKey = new byte[] { 0x0A, 0x0B, 0x0C };
+ //m.DigitalSignature = new byte[] { 0x0C, 0x0D, 0x0E };
+ //m.EncryptionAlgorithm = EncryptionAlgorithm.Rc4;
m.Extension = new byte[] { 0x01, 0x02, 0x03 };
- m.HashAlgorithm = HashAlgorithm.Sha;
+ //m.HashAlgorithm = HashAlgorithm.Sha;
m.Label = "MyLabel";
m.Priority = MessagePriority.AboveNormal;
m.Recoverable = true;
m.ResponseQueue = responseQ;
m.SenderCertificate = new byte[] { 0x04, 0x05, 0x06 };
- m.TimeToBeReceived = new TimeSpan(0, 0, 5);
- m.TimeToReachQueue = new TimeSpan(0, 0, 10);
- m.UseAuthentication = true;
+ m.TimeToBeReceived = new TimeSpan(0, 0, 10);
+ m.TimeToReachQueue = new TimeSpan(0, 0, 5);
+ //m.UseAuthentication = true;
m.UseDeadLetterQueue = true;
- m.UseEncryption = true;
+ //m.UseEncryption = true;
mq.Send (m);
-
+
Message m2 = mq.Receive ();
+
m2.Formatter = new BinaryMessageFormatter ();
Assert.AreEqual (s, m2.Body);
@@ -187,23 +220,23 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
Assert.AreEqual (adminQ.QueueName, m2.AdministrationQueue.QueueName,
"AdministrationQueue not passed correctly");
Assert.AreEqual (5, m2.AppSpecific, "AppSpecific not passed correctly");
- Assert.AreEqual (m.AuthenticationProviderName, m2.AuthenticationProviderName,
- "AuthenticationProviderName not passed correctly");
- Assert.AreEqual (m.AuthenticationProviderType, m2.AuthenticationProviderType,
- "AuthenticationProviderType not passed correctly");
- Assert.AreEqual (connectorType, m2.ConnectorType,
- "ConnectorType not passed correctly");
+ //Assert.AreEqual (m.AuthenticationProviderName, m2.AuthenticationProviderName,
+ // "AuthenticationProviderName not passed correctly");
+ //Assert.AreEqual (m.AuthenticationProviderType, m2.AuthenticationProviderType,
+ // "AuthenticationProviderType not passed correctly");
+ //Assert.AreEqual (connectorType, m2.ConnectorType,
+ // "ConnectorType not passed correctly");
Assert.AreEqual (m.CorrelationId, m2.CorrelationId,
"CorrelationId not passed correctly");
- AreEqual (m.DestinationSymmetricKey, m2.DestinationSymmetricKey,
- "DestinationSymmetricKey not passed correctly");
- AreEqual (m.DigitalSignature, m2.DigitalSignature,
- "DigitalSignature not passed properly");
- Assert.AreEqual (EncryptionAlgorithm.Rc4, m2.EncryptionAlgorithm,
- "EncryptionAlgorithm not passed properly");
+ //AreEqual (m.DestinationSymmetricKey, m2.DestinationSymmetricKey,
+ // "DestinationSymmetricKey not passed correctly");
+ //AreEqual (m.DigitalSignature, m2.DigitalSignature,
+ // "DigitalSignature not passed properly");
+ //Assert.AreEqual (EncryptionAlgorithm.Rc4, m2.EncryptionAlgorithm,
+ // "EncryptionAlgorithm not passed properly");
AreEqual (m.Extension, m2.Extension, "Extension not passed properly");
- Assert.AreEqual (m.HashAlgorithm, m2.HashAlgorithm,
- "HashAlgorithm not passed properly");
+ //Assert.AreEqual (m.HashAlgorithm, m2.HashAlgorithm,
+ // "HashAlgorithm not passed properly");
Assert.AreEqual (m.Label, m2.Label, "Label not passed correctly");
Assert.AreEqual (MessagePriority.AboveNormal, m2.Priority,
"Priority not passed properly");
@@ -211,25 +244,26 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
Assert.AreEqual (responseQ.QueueName, m2.ResponseQueue.QueueName,
"ResponseQueue not passed properly");
AreEqual (m.SenderCertificate, m2.SenderCertificate,
- "SenderCertificate not passed properly");
+ "SenderCertificate not passed properly");
Assert.AreEqual (m.TimeToBeReceived, m2.TimeToBeReceived,
"TimeToBeReceived not passed properly");
Assert.AreEqual (m.TimeToReachQueue, m2.TimeToReachQueue,
"TimeToReachQueue not passed properly");
- Assert.IsTrue (m2.UseAuthentication,
- "UseAuthentication not passed properly");
+ //Assert.IsTrue (m2.UseAuthentication,
+ // "UseAuthentication not passed properly");
Assert.IsTrue (m2.UseDeadLetterQueue,
"UseDeadLetterQueue not passed properly");
- Assert.IsTrue (m2.UseEncryption, "UseEncryption not pass properly");
+ //Assert.IsTrue (m2.UseEncryption, "UseEncryption not pass properly");
//Assert.AreEqual ();
- Assert.IsTrue (DateTime.MinValue == m.ArrivedTime);
Assert.IsNotNull (m2.Id, "Id is null");
Assert.IsTrue (Guid.Empty.ToString () != m2.Id, "Id is Empty");
- Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set");
+ Assert.IsTrue (DateTime.MinValue != m2.ArrivedTime, "Arrived Time is not set");
Assert.AreEqual (Acknowledgment.None, m2.Acknowledgment, "Acknowledgment");
Assert.IsTrue (0 != m2.SenderVersion);
- Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null");
+
+ //Assert.IsNotNull (m2.SourceMachine, "SourceMachine is null");
+
}
private static void AreEqual(byte[] expected, byte[] actual, string message)
@@ -239,12 +273,35 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
Assert.AreEqual (expected[i], actual[i], message);
}
+ //[Test]
+ // No supported by Rabbit
+ public void SendPriorityMessages ()
+ {
+ MessageQueue mq = MQUtil.GetQueue ("testpriority");
+ Message sent1 = new Message ("Highest", new BinaryMessageFormatter ());
+ sent1.Priority = MessagePriority.Highest;
+ Message sent2 = new Message ("Lowest", new BinaryMessageFormatter ());
+ sent2.Priority = MessagePriority.Lowest;
+
+ mq.Send (sent1);
+ mq.Send (sent2);
+
+ Message received1 = mq.Receive ();
+ Message received2 = mq.Receive ();
+
+ Assert.AreEqual (MessagePriority.Highest, received2.Priority,
+ "Priority delivery incorrect");
+ Assert.AreEqual (MessagePriority.Lowest, received1.Priority,
+ "Priority delivery incorrect");
+ }
+
[Test]
public void SendReceiveXmlMessage ()
{
- MessageQueue mq = new MessageQueue ("testq");
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\testq");
String s = "Test: " + DateTime.Now;
Message m = new Message (s, new XmlMessageFormatter (new Type[] { typeof (string) }));
+ mq.MessageReadPropertyFilter.SetAll();
mq.Send (m);
Message m2 = mq.Receive ();
@@ -257,26 +314,12 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
}
[Test]
- public void CreateQueue ()
- {
- string path = @".\private$\MyQ";
- string body = "This is a test";
-
- MessageQueue q = MessageQueue.Create (path);
- Assert.IsNotNull (q);
- }
-
- [Test]
public void SendBinaryText ()
{
string path = @".\private$\MyQ";
string body = "This is a test";
- MessageQueue q;
- if (MessageQueue.Exists (path))
- q = new MessageQueue (path);
- else
- q = MessageQueue.Create (path);
+ MessageQueue q = MQUtil.GetQueue (path);
q.Formatter = new BinaryMessageFormatter ();
@@ -295,17 +338,13 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
string path = @".\private$\MyQ";
string body = "This is a test";
- MessageQueue q;
- if (MessageQueue.Exists (path))
- q = new MessageQueue (path);
- else
- q = MessageQueue.Create (path);
+ MessageQueue q = MQUtil.GetQueue (path, new XmlMessageFormatter ());
q.Send (body);
Message m2 = q.Receive ();
XmlMessageFormatter xmlf = (XmlMessageFormatter) q.Formatter;
- Assert.AreEqual (typeof (string), xmlf.TargetTypes[0]);
+ //Assert.AreEqual (typeof (string), xmlf.TargetTypes[0]);
Assert.AreEqual (typeof (XmlMessageFormatter), m2.Formatter.GetType ());
Assert.AreEqual (body, m2.Body);
Assert.AreEqual (0, m2.BodyType);
@@ -320,11 +359,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
body.MyProperty2 = "Something";
body.MyProperty3 = "Something else";
- MessageQueue q;
- if (MessageQueue.Exists (path))
- q = new MessageQueue (path);
- else
- q = MessageQueue.Create (path);
+ MessageQueue q = MQUtil.GetQueue (path);
q.Formatter = new BinaryMessageFormatter ();
@@ -349,15 +384,11 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
body.MyProperty2 = "Something";
body.MyProperty3 = "Something else";
- MessageQueue q;
- if (MessageQueue.Exists (path))
- q = new MessageQueue (path);
- else
- q = MessageQueue.Create (path);
+ MessageQueue q = MQUtil.GetQueue (path, new XmlMessageFormatter ());
q.Send (body);
- MessageQueue q2 = new MessageQueue (path);
+ MessageQueue q2 = MQUtil.GetQueue (path);
q2.Formatter = new XmlMessageFormatter (new Type[] { typeof(Thingy) });
Message m2 = q2.Receive ();
@@ -380,15 +411,9 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
Message m1 = new Message (body);
m1.Formatter = new BinaryMessageFormatter ();
- MessageQueue q;
- if (MessageQueue.Exists (path))
- q = new MessageQueue (path);
- else
- q = MessageQueue.Create (path);
-
+ MessageQueue q = MQUtil.GetQueue (path);
q.Send (m1);
- //Assert.IsNotNull (m1.Id);
Message m2 = q.Receive ();
m2.Formatter = new BinaryMessageFormatter ();
@@ -412,11 +437,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
Message m1 = new Message (body);
Assert.IsNull (m1.Formatter);
- MessageQueue q;
- if (MessageQueue.Exists (path))
- q = new MessageQueue (path);
- else
- q = MessageQueue.Create (path);
+ MessageQueue q = MQUtil.GetQueue (path, new XmlMessageFormatter ());
q.Send (m1);
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog
index fd6cb0f7b13..fea66238c2b 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/ChangeLog
@@ -1,3 +1,39 @@
+2008-12-06 Michael Barker <mike@middlesoft.co.uk>
+
+ * AdminTest.cs: Updated to run against MS.NET
+ * BasicMessagingTest.cs: Updated to run against MS.NET
+ * FailuresTest.cs: Updated to run against MS.NET
+ * MessageEnumeratorTest.cs: Updated to run against MS.NET
+ * MQUtil.cs: Updated to run against MS.NET
+ * PeekTest.cs: Updated to run against MS.NET
+ * SelectorTest.cs: Updated to run against MS.NET
+ * TransactionMessagingTest.cs: Updated to run against MS.NET
+
+2008-12-01 Michael Barker <mike@middlesoft.co.uk>
+
+ * AdminTest.cs: Added tests for queue discovery methods.
+
+2008-11-23 Michael Barker <mike@middlesoft.co.uk>
+
+ * TransactionMessagingTest.cs: Added tests for all methods that the
+ transaction type argument, currently only Single is supported. Added methods
+ with transactions and timeout.
+ * PeekTest.cs: Added PeekBy{Id,CorrelationId} tests.
+
+2008-11-09 Michael Barker <mike@middlesoft.co.uk>
+
+ * SelectorTest.cs: New, tests for ReceiveByXYZ() methods
+
+2008-11-04 Michael Barker <mike@middlesoft.co.uk>
+
+ * PeekTest.cs: New, tests for peeking at messages.
+
+2008-11-02 Michael Barker <mike@middlesoft.co.uk>
+
+ * TransactionMessagingTest.cs: New, tests for transactional messaging.
+ * FailuresTest.cs: New, tests for exceptions.
+ * AdminTest.cs: New, tests for administration functions.
+
2008-09-29 Michael Barker <mike@middlesoft.co.uk>
* BasicMessagingTest.cs: New
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs
new file mode 100644
index 00000000000..c4ca13e1cef
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/FailuresTest.cs
@@ -0,0 +1,66 @@
+//
+// Test.Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Messaging;
+using System.Reflection;
+using System.Threading;
+using System.Text.RegularExpressions;
+
+using NUnit.Framework;
+
+namespace MonoTests.Mono.Messaging.RabbitMQ
+{
+ [TestFixture]
+ public class FailuresTest {
+
+ [Test]
+ [ExpectedException (typeof (MessageQueueException))]
+ public void SendWithPathNotSet ()
+ {
+ MessageQueue q = new MessageQueue ();
+ Message m = new Message ("foobar", new BinaryMessageFormatter ());
+
+ q.Send (m);
+ }
+
+ [Test]
+ [ExpectedException (typeof (MessageQueueException))]
+ public void SendInTransactionWithPathNotSet ()
+ {
+ MessageQueue q = new MessageQueue ();
+ Message m = new Message ("foobar", new BinaryMessageFormatter ());
+ MessageQueueTransaction tx = new MessageQueueTransaction ();
+
+ q.Send (m, tx);
+ }
+
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs
new file mode 100644
index 00000000000..6c32aca5393
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MQUtil.cs
@@ -0,0 +1,67 @@
+//
+// Test.Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Messaging;
+
+namespace MonoTests.Mono.Messaging.RabbitMQ
+{
+ public class MQUtil
+ {
+ public static MessageQueue GetQueue (string path)
+ {
+ return GetQueue (path, false);
+ }
+
+ public static MessageQueue GetQueue (string path, bool isTransactional)
+ {
+ return GetQueue (path, isTransactional,
+ new BinaryMessageFormatter ());
+ }
+
+ public static MessageQueue GetQueue (string path, IMessageFormatter formatter)
+ {
+ return GetQueue (path, false, formatter);
+ }
+
+ public static MessageQueue GetQueue (string path, bool isTransactional,
+ IMessageFormatter formatter)
+ {
+ MessageQueue q;
+ if (MessageQueue.Exists (path)) {
+ q = new MessageQueue (path);
+ } else {
+ q = MessageQueue.Create (path, isTransactional);
+ }
+ q.Formatter = formatter;
+ return q;
+ }
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs
index e918e72df69..5c4c2bf3584 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/MessageEnumeratorTest.cs
@@ -31,22 +31,22 @@
using System;
using System.Messaging;
-using Mono.Messaging;
-using Mono.Messaging.RabbitMQ;
+//using Mono.Messaging;
+//using Mono.Messaging.RabbitMQ;
using NUnit.Framework;
-namespace MonoTests.Mono.Messsaging.RabbitMQ
+namespace MonoTests.Mono.Messaging.RabbitMQ
{
[TestFixture]
public class MessageEnumeratorTest {
- private readonly String qName = "testq2";
+ private readonly String qName = @".\private$\testq2";
private void SendMessage (string s) {
- MessageQueue mq = new MessageQueue (qName);
+ MessageQueue mq = MQUtil.GetQueue (qName);
Message m = new Message (s, new BinaryMessageFormatter ());
- m.CorrelationId = "foo";
+ m.CorrelationId = Guid.NewGuid () + "\\0";
mq.Send (m);
}
@@ -58,7 +58,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
SendMessage ("message 3");
SendMessage ("message 4");
- MessageQueue mq0 = new MessageQueue (qName);
+ MessageQueue mq0 = MQUtil.GetQueue (qName);
MessageEnumerator me0 = mq0.GetMessageEnumerator ();
me0.MoveNext ();
@@ -72,7 +72,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
me0.Dispose ();
mq0.Dispose ();
- MessageQueue mq1 = new MessageQueue (qName);
+ MessageQueue mq1 = MQUtil.GetQueue (qName);
MessageEnumerator me1 = mq1.GetMessageEnumerator ();
me1.MoveNext();
@@ -83,6 +83,33 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
m1.Formatter = new BinaryMessageFormatter ();
Console.WriteLine ("{0}", m1.Body);
Assert.AreEqual ("message 4", (String) m1.Body, "body incorrect");
+
+ mq1.Purge ();
+ MessageQueue.Delete (qName);
+ }
+
+ //[Test]
+ // Not supported with AMQP
+ public void RemoveMessageWithTx ()
+ {
+ MessageQueue q = MQUtil.GetQueue ("testq3");
+
+ q.Formatter = new BinaryMessageFormatter ();
+ q.Send ("foo1");
+ q.Send ("foo2");
+
+ MessageEnumerator me1 = q.GetMessageEnumerator ();
+ MessageQueueTransaction tx = new MessageQueueTransaction ();
+ me1.MoveNext ();
+ Message m1 = me1.Current;
+ me1.RemoveCurrent (tx);
+ tx.Commit ();
+ me1.Close ();
+
+ MessageEnumerator me2 = q.GetMessageEnumerator ();
+ Assert.IsTrue (me1.MoveNext ());
+ me2.RemoveCurrent ();
+ Assert.IsFalse (me2.MoveNext ());
}
}
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs
new file mode 100644
index 00000000000..2c6c60b49b3
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/PeekTest.cs
@@ -0,0 +1,187 @@
+//
+// Test.Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Messaging;
+
+using NUnit.Framework;
+
+namespace MonoTests.Mono.Messaging.RabbitMQ
+{
+ [TestFixture]
+ public class PeekTest {
+
+ [Test]
+ public void PeekMessage ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\peek-queue-1");
+ mq.Send (s1);
+
+ Message r1 = mq.Peek ();
+ Assert.AreEqual (body, r1.Body);
+
+ Message r2 = mq.Receive ();
+ Assert.AreEqual (body, r2.Body);
+ }
+
+ [Test]
+ public void PeekMessageWithTimeout ()
+ {
+ String body = "foo-" + DateTime.Now.ToString();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ MessageQueue mq = MQUtil.GetQueue(@".\private$\peek-queue-2");
+ mq.Send (s1);
+
+ Message r1 = mq.Peek (new TimeSpan (0, 0, 2));
+ Assert.AreEqual (body, r1.Body);
+
+ Message r2 = mq.Receive ();
+ Assert.AreEqual (body, r2.Body);
+ }
+
+ [Test]
+ [ExpectedException (typeof (MessageQueueException))]
+ public void PeekNoMessageWithTimeout ()
+ {
+ MessageQueue mq = MQUtil.GetQueue(@".\private$\peek-queue-3");
+ Message r1 = mq.Peek (new TimeSpan (0, 0, 2));
+ }
+
+ [Test]
+ public void PeekById ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body, new BinaryMessageFormatter());
+ MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-4");
+ q.Send (s1);
+
+ String id = s1.Id;
+ try {
+ Message r1 = q.PeekById (id);
+ Assert.AreEqual (body, r1.Body, "Unable to PeekById correctly");
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ public void PeekByIdWithTimeout ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body, new BinaryMessageFormatter());
+ MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-5");
+ q.Send (s1);
+
+ String id = s1.Id;
+ try {
+ Message r1 = q.PeekById (id, new TimeSpan (0, 0, 2));
+ Assert.AreEqual (body, r1.Body, "Unable to PeekById correctly");
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ [ExpectedException (typeof (InvalidOperationException))]
+ public void PeekByIdNotFound ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body, new BinaryMessageFormatter());
+ MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-6");
+ q.Send (s1);
+
+ String id = "fail!";
+
+ try {
+ Message r1 = q.PeekById (id);
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ public void PeekByCorrelationId ()
+ {
+ String correlationId = Guid.NewGuid () + "\\0";
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body, new BinaryMessageFormatter());
+ s1.CorrelationId = correlationId;
+ MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-7");
+ q.Formatter = new BinaryMessageFormatter ();
+ q.Send (s1);
+
+ try {
+ Message r1 = q.PeekByCorrelationId (correlationId);
+ Assert.AreEqual (body, r1.Body, "Unable to PeekByCorrelationId correctly");
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ [ExpectedException (typeof (InvalidOperationException))]
+ public void PeekByCorrelationIdNotFound ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body);
+ String correlationId = Guid.NewGuid() + "\\0";
+ MessageQueue q = MQUtil.GetQueue(@".\private$\peek-queue-8");
+ q.Formatter = new BinaryMessageFormatter ();
+ q.Send (s1);
+
+ try {
+ Message r1 = q.PeekByCorrelationId ("fail!");
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ public void PeekByCorrelationIdWithTimeout ()
+ {
+ String correlationId = Guid.NewGuid () + "\\0";
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body, new BinaryMessageFormatter());
+ s1.CorrelationId = correlationId;
+ MessageQueue q = MQUtil.GetQueue (@".\private$\peek-queue-9");
+ q.Formatter = new BinaryMessageFormatter ();
+ q.Send (s1);
+
+ try {
+ Message r1 = q.PeekByCorrelationId (correlationId, new TimeSpan (0, 0, 2));
+ Assert.AreEqual (body, r1.Body, "Unable to PeekByCorrelationId correctly");
+ } finally {
+ q.Purge ();
+ }
+ }
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs
index 65305f7c9ef..31fd8f19478 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/RabbitMQMessagingProviderTest.cs
@@ -35,7 +35,7 @@ using Mono.Messaging.RabbitMQ;
using NUnit.Framework;
-namespace MonoTests.Mono.Messsaging.RabbitMQ
+namespace MonoTests.Mono.Messaging.RabbitMQ
{
[TestFixture]
public class RabbitMQMessagingProviderTest {
@@ -52,7 +52,7 @@ namespace MonoTests.Mono.Messsaging.RabbitMQ
public void GetMessageQueue ()
{
IMessagingProvider p = new RabbitMQMessagingProvider ();
- IMessageQueue q = p.GetMessageQueue ();
+ IMessageQueue q = p.CreateMessageQueue (QueueReference.DEFAULT, true);
Assert.IsNotNull (q);
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs
new file mode 100644
index 00000000000..73caaedfc6e
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/SelectorTest.cs
@@ -0,0 +1,138 @@
+//
+// Test.Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Messaging;
+
+using NUnit.Framework;
+
+namespace MonoTests.Mono.Messaging.RabbitMQ
+{
+ [TestFixture]
+ public class SelectorTest
+ {
+
+ [Test]
+ public void SelectById ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString ();
+ Message s1 = new Message (body, new BinaryMessageFormatter());
+ MessageQueue q = MQUtil.GetQueue (@".\private$\selector-queue-1");
+ q.Send (s1);
+
+ String id = s1.Id;
+
+ Message r1 = q.ReceiveById (id);
+
+ Assert.AreEqual (body, r1.Body, "Unable to ReceiveById correctly");
+ }
+
+ [Test]
+ [ExpectedException (typeof (InvalidOperationException))]
+ public void SelectByIdNotFound ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-2");
+ q.Send (s1);
+
+ String id = "fail!";
+
+ try {
+ Message r1 = q.ReceiveById (id);
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ public void SelectByCorrelationId ()
+ {
+ string correlationId = Guid.NewGuid () + "\\0";
+ String body = "Foo-" + DateTime.Now.ToString();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ s1.CorrelationId = correlationId;
+ MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-3");
+ q.Send (s1);
+
+ Message r1 = q.ReceiveByCorrelationId (correlationId);
+
+ Assert.AreEqual (body, r1.Body, "Unable to ReceiveByCorrelationId correctly");
+ }
+
+ [Test]
+ [ExpectedException (typeof (InvalidOperationException))]
+ public void SelectByCorrelationIdNotFound ()
+ {
+ string correlationId = Guid.NewGuid() + "\\0";
+ String body = "Foo-" + DateTime.Now.ToString();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ s1.CorrelationId = correlationId;
+ MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-4");
+ q.Send (s1);
+
+ try {
+ Message r1 = q.ReceiveByCorrelationId ("fail!");
+ } finally {
+ q.Purge ();
+ }
+ }
+
+ [Test]
+ public void SelectByIdWithTimeout ()
+ {
+ String body = "Foo-" + DateTime.Now.ToString();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-5");
+ q.Send (s1);
+
+ String id = s1.Id;
+
+ Message r1 = q.ReceiveById (id, new TimeSpan (0, 0, 2));
+
+ Assert.AreEqual (body, r1.Body, "Unable to ReceiveById correctly");
+ }
+
+ [Test]
+ public void SelectByCorrelationIdWithTimeout ()
+ {
+ string correlationId = Guid.NewGuid() + "\\0";
+ String body = "Foo-" + DateTime.Now.ToString();
+ Message s1 = new Message(body, new BinaryMessageFormatter());
+ s1.CorrelationId = correlationId;
+ MessageQueue q = MQUtil.GetQueue(@".\private$\selector-queue-3");
+ q.Send (s1);
+
+ Message r1 = q.ReceiveByCorrelationId (correlationId, new TimeSpan (0, 0, 2));
+
+ Assert.AreEqual (body, r1.Body, "Unable to ReceiveByCorrelationId correctly");
+ }
+ }
+}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs
new file mode 100644
index 00000000000..e0230952e26
--- /dev/null
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/TransactionMessagingTest.cs
@@ -0,0 +1,467 @@
+//
+// Test.Mono.Messaging.RabbitMQ
+//
+// Authors:
+// Michael Barker (mike@middlesoft.co.uk)
+//
+// (C) 2008 Michael Barker
+//
+
+//
+// Permission is hereby granted, free of charge, to any person obtaining
+// a copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to
+// permit persons to whom the Software is furnished to do so, subject to
+// the following conditions:
+//
+// The above copyright notice and this permission notice shall be
+// included in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+//
+
+using System;
+using System.Messaging;
+using System.Reflection;
+using System.Threading;
+using System.Text.RegularExpressions;
+
+using NUnit.Framework;
+
+namespace MonoTests.Mono.Messaging.RabbitMQ
+{
+ [TestFixture]
+ public class TransactionMessageTest {
+
+ [Test]
+ public void Send2WithTransaction ()
+ {
+ Message sent1 = new Message ("Message 1", new BinaryMessageFormatter ());
+ Message sent2 = new Message ("Message 2", new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-1", true);
+ mq.MessageReadPropertyFilter.SetAll ();
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ mq.Send (sent1, tx);
+ mq.Send (sent2, tx);
+
+ tx.Commit ();
+
+ Message received1 = mq.Receive ();
+ Assert.IsNotNull (received1.TransactionId, "TransactionId not set");
+ Message received2 = mq.Receive ();
+ Assert.IsNotNull (received2.TransactionId, "TransactionId not set");
+
+ Assert.AreEqual (received1.TransactionId, received2.TransactionId, "Messages have differing TransactionIds");
+ Assert.IsTrue (received1.TransactionId.Length > 1);
+ Assert.AreEqual (sent1.Body, received1.Body, "Message 1 not delivered correctly");
+ Assert.AreEqual (sent2.Body, received2.Body, "Message 2 not delivered correctly");
+ }
+ }
+
+ [Test]
+ public void Send2WithLabelWithTransaction ()
+ {
+ String label1 = "label1";
+ String label2 = "label2";
+ Message sent1 = new Message ("Message 1", new BinaryMessageFormatter ());
+ Message sent2 = new Message ("Message 2", new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-2", true);
+ mq.MessageReadPropertyFilter.SetAll ();
+ Assert.IsTrue(mq.Transactional, "Message Queue should be transactional");
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ mq.Send (sent1, label1, tx);
+ mq.Send (sent2, label2, tx);
+
+ tx.Commit ();
+
+ Message received1 = mq.Receive ();
+ Assert.IsNotNull (received1.TransactionId, "TransactionId not set");
+ Message received2 = mq.Receive ();
+ Assert.IsNotNull (received2.TransactionId, "TransactionId not set");
+
+ Assert.AreEqual (received1.TransactionId, received2.TransactionId, "Messages have differing TransactionIds");
+ Assert.IsTrue (received1.TransactionId.Length > 1);
+ Assert.AreEqual (sent1.Body, received1.Body, "Message 1 not delivered correctly");
+ Assert.AreEqual (sent2.Body, received2.Body, "Message 2 not delivered correctly");
+ Assert.AreEqual (label1, received1.Label, "Label 1 not passed correctly");
+ Assert.AreEqual (label2, received2.Label, "Label 2 not passed correctly");
+ }
+ }
+
+ [Test]
+ [ExpectedException (typeof (MessageQueueException))]
+ public void Send2WithTransactionAbort ()
+ {
+ Message sent1 = new Message ("Message 1", new BinaryMessageFormatter ());
+ Message sent2 = new Message ("Message 2", new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-1", true);
+ mq.MessageReadPropertyFilter.SetAll ();
+ Assert.IsTrue(mq.Transactional, "Message Queue should be transactional");
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ mq.Send (sent1, tx);
+ mq.Send (sent2, tx);
+
+ tx.Abort ();
+ mq.Receive (new TimeSpan (0, 0, 2));
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTransaction ()
+ {
+ String body = "Message 4";
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-4", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.Receive (tx);
+
+ tx.Commit ();
+
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTransactionAbort ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-5", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.Receive (tx);
+
+ tx.Abort ();
+ }
+
+ Message received2 = mq.Receive ();
+ Assert.AreEqual (body, received2.Body);
+ }
+
+ [Test]
+ public void ReceiveWithTransactionType ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-6", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ Message received1 = mq.Receive (MessageQueueTransactionType.Single);
+
+ Assert.AreEqual (body, received1.Body);
+ }
+
+ [Test]
+ public void SendWithTransactionType ()
+ {
+ Message sent1 = new Message ("Message 1");
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-7", true);
+ mq.MessageReadPropertyFilter.SetAll();
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ Message received1 = mq.Receive ();
+ Assert.IsNotNull (received1.TransactionId, "TransactionId not set");
+ }
+
+ [Test]
+ public void SendWithTransactionTypeAndLabel ()
+ {
+ Message sent1 = new Message ("Message 1");
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-8", true);
+ mq.MessageReadPropertyFilter.SetAll();
+ String label = "mylabel";
+
+ mq.Send (sent1, label, MessageQueueTransactionType.Single);
+
+ Message received1 = mq.Receive ();
+ Assert.IsNotNull (received1.TransactionId, "TransactionId not set");
+ Assert.AreEqual (label, received1.Label, "Label not set");
+ }
+
+ [Test]
+ public void ReceiveByIdWithTransaction ()
+ {
+ String body = "Message 4";
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-9", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.ReceiveById (id, tx);
+
+ tx.Commit ();
+
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+
+ [Test]
+ public void ReceiveByIdWithTransactionAbort ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-10", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.ReceiveById (id, tx);
+
+ tx.Abort ();
+ }
+
+ Message received2 = mq.Receive ();
+ Assert.AreEqual (body, received2.Body);
+ }
+
+ [Test]
+ public void ReceiveByIdWithTransactionType ()
+ {
+ String body = "Message 4";
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-11", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ Message received1 = mq.ReceiveById (id, MessageQueueTransactionType.Single);
+ Assert.AreEqual (body, received1.Body);
+ }
+
+ [Test]
+ public void ReceiveByCorrelationIdWithTransaction ()
+ {
+ string correlationId = Guid.NewGuid() + "\\0";
+ String body = "Message 4";
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ sent1.CorrelationId = correlationId;
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-12", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.ReceiveByCorrelationId(correlationId, tx);
+
+ tx.Commit ();
+
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+
+ [Test]
+ public void ReceiveByCorrelationIdWithTransactionAbort ()
+ {
+ string correlationId = Guid.NewGuid() + "\\0";
+ String body = "foo-" + DateTime.Now.ToString();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ sent1.CorrelationId = correlationId;
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-13", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.ReceiveByCorrelationId (correlationId, tx);
+
+ tx.Abort ();
+ }
+
+ Message received2 = mq.Receive ();
+ Assert.AreEqual (body, received2.Body);
+ }
+
+ [Test]
+ public void ReceiveByCorrelationIdWithTransactionType ()
+ {
+ string correlationId = Guid.NewGuid() + "\\0";
+ String body = "Message 10";
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ sent1.CorrelationId = correlationId;
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-14", true);
+ mq.Formatter = new BinaryMessageFormatter ();
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ Message received1 = mq.ReceiveByCorrelationId (correlationId, MessageQueueTransactionType.Single);
+ Assert.AreEqual (body, received1.Body);
+ }
+
+ [Test]
+ public void ReceiveWithTransactionAndTimeout ()
+ {
+ String body = "Message 11";
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-15", true);
+ mq.Formatter = new BinaryMessageFormatter ();
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.Receive (new TimeSpan (0, 0, 2), tx);
+
+ tx.Commit ();
+
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTransactionAndTimeoutAndAbort ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-16", true);
+ mq.Formatter = new BinaryMessageFormatter ();
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.Receive (new TimeSpan (0, 0, 2), tx);
+
+ tx.Abort ();
+ }
+
+ Message received2 = mq.Receive ();
+ Assert.AreEqual (body, received2.Body);
+ }
+
+ [Test]
+ public void ReceiveWithTransactionTypeAndTimeout ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-17", true);
+ mq.Formatter = new BinaryMessageFormatter ();
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+
+ Message received1 = mq.Receive (new TimeSpan (0, 0, 5), MessageQueueTransactionType.Single);
+
+ Assert.AreEqual (body, received1.Body);
+ }
+
+ [Test]
+ [ExpectedException (typeof (MessageQueueException))]
+ public void ReceiveWithTransactionTypeAndTimeoutFailure ()
+ {
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-18", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ Message received1 = mq.Receive (new TimeSpan (0, 0, 2), MessageQueueTransactionType.Single);
+ }
+
+ [Test]
+ public void ReceiveByIdWithTransactionAndTimeout ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-19", true);
+ mq.Formatter = new BinaryMessageFormatter ();
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.ReceiveById (id, new TimeSpan (0, 0, 2), tx);
+
+ tx.Commit ();
+
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+
+ [Test]
+ public void ReceiveByIdWithTransactionTypeAndTimeout ()
+ {
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-20", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ Message received1 = mq.ReceiveById (id, new TimeSpan (0, 0, 2), MessageQueueTransactionType.Single);
+ Assert.AreEqual (body, received1.Body);
+ }
+
+ [Test]
+ public void ReceiveByCorrelationIdWithTransactionAndTimeout ()
+ {
+ string correlationId = Guid.NewGuid () + "\\0";
+ String body = "foo-" + DateTime.Now.ToString ();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ sent1.CorrelationId = correlationId;
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-21", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ using (MessageQueueTransaction tx = new MessageQueueTransaction ()) {
+ tx.Begin ();
+
+ Message received1 = mq.ReceiveByCorrelationId (correlationId, new TimeSpan (0, 0, 2), tx);
+ tx.Commit ();
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+
+ [Test]
+ public void ReceiveByCorrelationIdWithTransactionTypeAndTimeout ()
+ {
+ string correlationId = Guid.NewGuid() + "\\0";
+ String body = "foo-" + DateTime.Now.ToString();
+ Message sent1 = new Message (body, new BinaryMessageFormatter ());
+ sent1.CorrelationId = correlationId;
+ MessageQueue mq = MQUtil.GetQueue (@".\private$\tx-queue-22", true);
+ Assert.IsTrue (mq.Transactional, "Message Queue should be transactional");
+ mq.Send (sent1, MessageQueueTransactionType.Single);
+ string id = sent1.Id;
+
+ Message received1 = mq.ReceiveByCorrelationId (correlationId, new TimeSpan (0, 0, 2), MessageQueueTransactionType.Single);
+ Assert.AreEqual (body, received1.Body);
+ }
+ }
+}