Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/mono.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbarkerm <michael.barker@lmax.com>2011-01-16 16:32:24 +0300
committerMiguel de Icaza <miguel@gnome.org>2011-01-16 20:06:56 +0300
commit2b516ade6dfef6edb139fc2d5b936f00c31e9b99 (patch)
tree0cadffcd70db29a67d9d95e53ab32d150f5749bc /mcs/class/Mono.Messaging.RabbitMQ
parent8566c056b285ac433fa0d00b82935d50350b99a4 (diff)
- Refactoring, code clean-up and reduced garbage - Introduce connection pooling for improved performance
Diffstat (limited to 'mcs/class/Mono.Messaging.RabbitMQ')
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources3
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs2
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs8
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs326
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs97
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs31
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources1
-rw-r--r--mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs1
8 files changed, 172 insertions, 297 deletions
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources
index 193d4813a17..07619307d66 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources
@@ -1,7 +1,10 @@
./Assembly/AssemblyInfo.cs
../../build/common/Consts.cs
../../build/common/Locale.cs
+./Mono.Messaging.RabbitMQ/IMessagingContext.cs
./Mono.Messaging.RabbitMQ/MessageFactory.cs
+./Mono.Messaging.RabbitMQ/MessagingContext.cs
+./Mono.Messaging.RabbitMQ/MessagingContextPool.cs
./Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
./Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
./Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
index fa9d44d5f4c..228b3ec0824 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs
@@ -220,7 +220,7 @@ namespace Mono.Messaging.RabbitMQ {
return epoch.AddSeconds (ats.UnixTime).ToLocalTime ();
}
- public static int TimeSpanToInt32 (TimeSpan timespan)
+ public static int TimeSpanToMillis (TimeSpan timespan)
{
if (timespan == TimeSpan.MaxValue)
return -1;
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
index c96c62f82fa..bf4b0580364 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs
@@ -139,8 +139,8 @@ namespace Mono.Messaging.RabbitMQ {
public bool MoveNext (TimeSpan timeout)
{
- int to = MessageFactory.TimeSpanToInt32 (timeout);
- return Subscription.Next (to, out current);
+ int timeoutMillis = MessageFactory.TimeSpanToMillis (timeout);
+ return Subscription.Next (timeoutMillis, out current);
}
public IMessage RemoveCurrent ()
@@ -178,12 +178,12 @@ namespace Mono.Messaging.RabbitMQ {
public IMessage RemoveCurrent (TimeSpan timeout, IMessageQueueTransaction transaction)
{
- throw new NotImplementedException ();
+ throw new NotSupportedException ("Unable to remove messages within a transaction");
}
public IMessage RemoveCurrent (TimeSpan timeout, MessageQueueTransactionType transactionType)
{
- throw new NotImplementedException ();
+ throw new NotSupportedException ("Unable to remove messages within a transaction");
}
private IMessage CreateMessage (BasicDeliverEventArgs result)
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
index 5c742d1ad0b..23d5a28df23 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs
@@ -50,6 +50,11 @@ namespace Mono.Messaging.RabbitMQ {
/// </summary>
public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue {
+ private readonly RabbitMQMessagingProvider provider;
+ private readonly MessageFactory helper;
+ private readonly bool transactional;
+ private readonly TimeSpan noTime = new TimeSpan(0, 0, 0, 0, 500);
+
private bool authenticate = false;
private short basePriority = 0;
private Guid category = Guid.Empty;
@@ -60,9 +65,6 @@ namespace Mono.Messaging.RabbitMQ {
private ISynchronizeInvoke synchronizingObject = null;
private bool useJournalQueue = false;
private QueueReference qRef = QueueReference.DEFAULT;
- private readonly RabbitMQMessagingProvider provider;
- private readonly MessageFactory helper;
- private readonly bool transactional;
public RabbitMQMessageQueue (RabbitMQMessagingProvider provider,
bool transactional)
@@ -166,16 +168,7 @@ namespace Mono.Messaging.RabbitMQ {
set { qRef = value; }
}
- private static long GetVersion (IConnection cn)
- {
- long version = cn.Protocol.MajorVersion;
- version = version << 32;
- version += cn.Protocol.MinorVersion;
- return version;
- }
-
- private void SetDeliveryInfo (IMessage msg, long senderVersion,
- string transactionId)
+ private void SetDeliveryInfo (IMessage msg, string transactionId)
{
msg.SetDeliveryInfo (Acknowledgment.None,
DateTime.MinValue,
@@ -183,7 +176,7 @@ namespace Mono.Messaging.RabbitMQ {
Guid.NewGuid ().ToString () + "\\0",
MessageType.Normal,
new byte[0],
- senderVersion,
+ 0,
DateTime.UtcNow,
null,
transactionId);
@@ -191,15 +184,13 @@ namespace Mono.Messaging.RabbitMQ {
public void Close ()
{
- // No-op (Queue are currently stateless)
}
public static void Delete (QueueReference qRef)
{
- using (IConnection cn = CreateConnection (qRef)) {
- using (IModel model = cn.CreateModel ()) {
- model.QueueDelete (qRef.Queue, false, false, false);
- }
+ RabbitMQMessagingProvider provider = (RabbitMQMessagingProvider) MessagingProviderLocator.GetProvider ();
+ using (IMessagingContext context = provider.CreateContext (qRef.Host)) {
+ context.Delete (qRef);
}
}
@@ -209,17 +200,15 @@ namespace Mono.Messaging.RabbitMQ {
throw new MonoMessagingException ("Path has not been specified");
if (msg.BodyStream == null)
- throw new ArgumentException ("Message is not serialized properly");
-
- try {
- using (IConnection cn = CreateConnection (QRef)) {
- SetDeliveryInfo (msg, GetVersion (cn), null);
- using (IModel ch = cn.CreateModel ()) {
- Send (ch, msg);
- }
+ throw new ArgumentException ("BodyStream is null, Message is not serialized properly");
+
+ using (IMessagingContext context = CurrentContext) {
+ try {
+ SetDeliveryInfo (msg, null);
+ context.Send (QRef, msg);
+ } catch (BrokerUnreachableException e) {
+ throw new ConnectionException (QRef, e);
}
- } catch (BrokerUnreachableException e) {
- throw new ConnectionException (QRef, e);
}
}
@@ -233,14 +222,15 @@ namespace Mono.Messaging.RabbitMQ {
RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
- tx.RunSend (SendInContext, msg);
+ SetDeliveryInfo (msg, tx.Id);
+ tx.Send (QRef, msg);
}
public void Send (IMessage msg, MessageQueueTransactionType transactionType)
{
switch (transactionType) {
case MessageQueueTransactionType.Single:
- using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) {
+ using (IMessageQueueTransaction tx = NewTx ()) {
try {
Send (msg, tx);
tx.Commit ();
@@ -260,177 +250,141 @@ namespace Mono.Messaging.RabbitMQ {
}
}
- private void SendInContext (ref string host, ref IConnection cn,
- ref IModel model, IMessage msg, string txId)
- {
- if (host == null)
- host = QRef.Host;
- else if (host != QRef.Host)
- throw new MonoMessagingException ("Transactions can not span multiple hosts");
-
- if (cn == null)
- cn = CreateConnection (QRef);
-
- if (model == null) {
- model = cn.CreateModel ();
- model.TxSelect ();
- }
-
- SetDeliveryInfo (msg, GetVersion (cn), txId);
- Send (model, msg);
- }
-
- private void Send (IModel model, IMessage msg)
- {
- string finalName = model.QueueDeclare (QRef.Queue, false);
- IMessageBuilder mb = helper.WriteMessage (model, msg);
-
- model.BasicPublish ("", finalName,
- (IBasicProperties) mb.GetContentHeader(),
- mb.GetContentBody ());
- }
-
public void Purge ()
{
- using (IConnection cn = CreateConnection (QRef)) {
- using (IModel model = cn.CreateModel ()) {
- model.QueuePurge (QRef.Queue, false);
- }
+ using (IMessagingContext context = CurrentContext) {
+ context.Purge (QRef);
}
}
public IMessage Peek ()
{
- using (IConnection cn = CreateConnection (QRef)) {
- using (IModel ch = cn.CreateModel ()) {
- return Receive (ch, -1, false);
- }
- }
+ return DoReceive (TimeSpan.MaxValue, null, false);
}
public IMessage Peek (TimeSpan timeout)
{
- return Run (Peeker (timeout));
+ return DoReceive (timeout, null, false);
}
public IMessage PeekById (string id)
{
- return Run (Peeker (ById (id)));
+ return DoReceive (noTime, ById (id), false);
}
public IMessage PeekById (string id, TimeSpan timeout)
{
- return Run (Peeker (timeout, ById (id)));
+ return DoReceive (timeout, ById (id), false);
}
public IMessage PeekByCorrelationId (string id)
{
- return Run (Peeker (ByCorrelationId (id)));
+ return DoReceive (noTime, ByCorrelationId (id), false);
}
public IMessage PeekByCorrelationId (string id, TimeSpan timeout)
{
- return Run (Peeker (timeout, ByCorrelationId (id)));
+ return DoReceive (timeout, ByCorrelationId (id), false);
}
public IMessage Receive ()
{
- return Run (Receiver ());
+ return DoReceive (TimeSpan.MaxValue, null, true);
}
public IMessage Receive (TimeSpan timeout)
{
- return Run (Receiver (timeout));
+ return DoReceive (timeout, null, true);
}
public IMessage Receive (TimeSpan timeout,
IMessageQueueTransaction transaction)
{
- return Run (transaction, Receiver (timeout));
+ return DoReceive (transaction, timeout, null, true);
}
public IMessage Receive (TimeSpan timeout,
MessageQueueTransactionType transactionType)
{
- return Run (transactionType, Receiver (timeout));
+ return DoReceive (transactionType, timeout, null, true);
}
public IMessage Receive (IMessageQueueTransaction transaction)
{
- return Run (transaction, Receiver());
+ return DoReceive (transaction, TimeSpan.MaxValue, null, true);
}
public IMessage Receive (MessageQueueTransactionType transactionType)
{
- return Run (transactionType, Receiver ());
- }
+ return DoReceive (transactionType, TimeSpan.MaxValue, null, true);
+ }
public IMessage ReceiveById (string id)
{
- return Run (Receiver (ById (id)));
+ return DoReceive (noTime, ById (id), true);
}
public IMessage ReceiveById (string id, TimeSpan timeout)
{
- return Run (Receiver (timeout, ById (id)));
+ return DoReceive (timeout, ById (id), true);
}
public IMessage ReceiveById (string id,
IMessageQueueTransaction transaction)
{
- return Run (transaction, Receiver (ById (id)));
+ return DoReceive (transaction, noTime, ById (id), true);
}
public IMessage ReceiveById (string id,
MessageQueueTransactionType transactionType)
{
- return Run (transactionType, Receiver (ById (id)));
+ return DoReceive (transactionType, noTime, ById (id), true);
}
public IMessage ReceiveById (string id, TimeSpan timeout,
IMessageQueueTransaction transaction)
{
- return Run (transaction, Receiver (timeout, ById (id)));
+ return DoReceive (transaction, timeout, ById (id), true);
}
public IMessage ReceiveById (string id, TimeSpan timeout,
MessageQueueTransactionType transactionType)
{
- return Run (transactionType, Receiver (timeout, ById (id)));
+ return DoReceive (transactionType, timeout, ById (id), true);
}
public IMessage ReceiveByCorrelationId (string id)
{
- return Run (Receiver (ByCorrelationId (id)));
+ return DoReceive (noTime, ByCorrelationId (id), true);
}
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout)
{
- return Run (Receiver (timeout, ByCorrelationId (id)));
+ return DoReceive (timeout, ByCorrelationId (id), true);
}
public IMessage ReceiveByCorrelationId (string id,
IMessageQueueTransaction transaction)
{
- return Run (transaction, Receiver (ByCorrelationId (id)));
+ return DoReceive (transaction, noTime, ByCorrelationId (id), true);
}
public IMessage ReceiveByCorrelationId (string id,
MessageQueueTransactionType transactionType)
{
- return Run (transactionType, Receiver (ByCorrelationId (id)));
+ return DoReceive (transactionType, noTime, ByCorrelationId (id), true);
}
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
IMessageQueueTransaction transaction)
{
- return Run (transaction, Receiver (timeout, ByCorrelationId (id)));
+ return DoReceive (transaction, timeout, ByCorrelationId (id), true);
}
public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout,
MessageQueueTransactionType transactionType)
{
- return Run (transactionType, Receiver (timeout, ByCorrelationId (id)));
+ return DoReceive (transactionType, timeout, ByCorrelationId (id), true);
}
public IMessageEnumerator GetMessageEnumerator ()
@@ -438,18 +392,15 @@ namespace Mono.Messaging.RabbitMQ {
return new RabbitMQMessageEnumerator (helper, QRef);
}
- private delegate IMessage RecieveDelegate (RabbitMQMessageQueue q,
- IModel model);
-
- private IMessage Run (MessageQueueTransactionType transactionType,
- RecieveDelegate r)
+ private IMessage DoReceive (MessageQueueTransactionType transactionType,
+ TimeSpan timeout, IsMatch matcher, bool ack)
{
switch (transactionType) {
case MessageQueueTransactionType.Single:
- using (RabbitMQMessageQueueTransaction tx = GetTx ()) {
+ using (RabbitMQMessageQueueTransaction tx = NewTx ()) {
bool success = false;
try {
- IMessage msg = Run (tx, r);
+ IMessage msg = DoReceive ((IMessagingContext) tx, timeout, matcher, ack);
tx.Commit ();
success = true;
return msg;
@@ -460,133 +411,39 @@ namespace Mono.Messaging.RabbitMQ {
}
case MessageQueueTransactionType.None:
- return Run (r);
+ return DoReceive (timeout, matcher, true);
default:
- throw new NotSupportedException(transactionType + " not supported");
- }
- }
-
- private IMessage Run (IMessageQueueTransaction transaction,
- RecieveDelegate r)
- {
- TxReceiver txr = new TxReceiver (this, r);
- RabbitMQMessageQueueTransaction tx =
- (RabbitMQMessageQueueTransaction) transaction;
- return tx.RunReceive (txr.ReceiveInContext);
- }
-
- private IMessage Run (RecieveDelegate r)
- {
- using (IConnection cn = CreateConnection (QRef)) {
- using (IModel model = cn.CreateModel ()) {
- return r (this, model);
- }
+ throw new NotSupportedException (transactionType + " not supported");
}
}
- private class TxReceiver
+ private IMessage DoReceive (IMessageQueueTransaction transaction,
+ TimeSpan timeout, IsMatch matcher, bool ack)
{
- private readonly RecieveDelegate doReceive;
- private readonly RabbitMQMessageQueue q;
-
- public TxReceiver(RabbitMQMessageQueue q, RecieveDelegate doReceive) {
- this.q = q;
- this.doReceive = doReceive;
- }
-
- public IMessage ReceiveInContext (ref string host, ref IConnection cn,
- ref IModel model, string txId)
- {
- if (host == null)
- host = q.QRef.Host;
- else if (host != q.QRef.Host)
- throw new MonoMessagingException ("Transactions can not span multiple hosts");
-
- if (cn == null)
- cn = CreateConnection (q.QRef);
-
- if (model == null) {
- model = cn.CreateModel ();
- model.TxSelect ();
- }
-
- return doReceive (q, model);
- }
+ RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction;
+ return DoReceive ((IMessagingContext) tx, timeout, matcher, ack);
}
- private class RecieveDelegateFactory
+ private IMessage DoReceive (TimeSpan timeout, IsMatch matcher, bool ack)
{
- private readonly int timeout;
- private readonly IsMatch matcher;
- private readonly bool ack;
-
- public RecieveDelegateFactory (int timeout, IsMatch matcher)
- : this (timeout, matcher, true)
- {
- }
-
- public RecieveDelegateFactory (int timeout, IsMatch matcher, bool ack)
- {
- if (matcher != null && timeout == -1)
- this.timeout = 500;
- else
- this.timeout = timeout;
- this.matcher = matcher;
- this.ack = ack;
- }
-
- public IMessage RecieveDelegate (RabbitMQMessageQueue q, IModel model)
- {
- if (matcher == null)
- return q.Receive (model, timeout, ack);
- else
- return q.Receive (model, timeout, ack, matcher);
+ using (IMessagingContext context = CurrentContext) {
+ return DoReceive (context, timeout, matcher, ack);
}
}
- private static RecieveDelegate Receiver (TimeSpan timeout,
- IsMatch matcher)
- {
- int to = MessageFactory.TimeSpanToInt32 (timeout);
- return new RecieveDelegateFactory (to, matcher).RecieveDelegate;
- }
-
- private static RecieveDelegate Receiver (IsMatch matcher)
- {
- return new RecieveDelegateFactory (-1, matcher).RecieveDelegate;
- }
-
- private static RecieveDelegate Receiver (TimeSpan timeout)
+ private IMessage DoReceive (IMessagingContext context, TimeSpan timeout,
+ IsMatch matcher, bool ack)
{
- int to = MessageFactory.TimeSpanToInt32 (timeout);
- return new RecieveDelegateFactory (to, null).RecieveDelegate;
+ return context.Receive (QRef, timeout, matcher, ack);
}
-
- private RecieveDelegate Receiver ()
- {
- return new RecieveDelegateFactory (-1, null).RecieveDelegate;
- }
-
- private RecieveDelegate Peeker (TimeSpan timeout)
- {
- int to = MessageFactory.TimeSpanToInt32 (timeout);
- return new RecieveDelegateFactory (to, null, false).RecieveDelegate;
- }
-
- private RecieveDelegate Peeker (IsMatch matcher)
- {
- return new RecieveDelegateFactory (-1, matcher, false).RecieveDelegate;
- }
- private RecieveDelegate Peeker (TimeSpan timeout, IsMatch matcher)
- {
- int to = MessageFactory.TimeSpanToInt32 (timeout);
- return new RecieveDelegateFactory (to, matcher, false).RecieveDelegate;
+ private IMessagingContext CurrentContext {
+ get {
+ return provider.CreateContext (qRef.Host);
+ }
}
- delegate bool IsMatch (BasicDeliverEventArgs result);
-
private class IdMatcher
{
private readonly string id;
@@ -625,54 +482,9 @@ namespace Mono.Messaging.RabbitMQ {
return new CorrelationIdMatcher (correlationId).MatchById;
}
- private IMessage Receive (IModel model, int timeout, bool doAck)
- {
- string finalName = model.QueueDeclare (QRef.Queue, false);
-
- using (Subscription sub = new Subscription (model, finalName)) {
- BasicDeliverEventArgs result;
- if (sub.Next (timeout, out result)) {
- IMessage m = helper.ReadMessage (QRef, result);
- if (doAck)
- sub.Ack (result);
- return m;
- } else {
- throw new MonoMessagingException ("No Message Available");
- }
- }
- }
-
- private IMessage Receive (IModel model, int timeout,
- bool doAck, IsMatch matcher)
- {
- string finalName = model.QueueDeclare (QRef.Queue, false);
-
- using (Subscription sub = new Subscription (model, finalName)) {
- BasicDeliverEventArgs result;
- while (sub.Next (timeout, out result)) {
-
- if (matcher (result)) {
- IMessage m = helper.ReadMessage (QRef, result);
- if (doAck)
- sub.Ack (result);
- return m;
- }
- }
-
- throw new MessageUnavailableException ("Message not available");
- }
- }
-
- private RabbitMQMessageQueueTransaction GetTx ()
+ private RabbitMQMessageQueueTransaction NewTx ()
{
return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction ();
}
-
- private static IConnection CreateConnection (QueueReference qRef)
- {
- ConnectionFactory cf = new ConnectionFactory ();
- cf.Address = qRef.Host;
- return cf.CreateConnection ();
- }
}
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
index 52e3a93cf6c..5834bedb391 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs
@@ -45,19 +45,73 @@ using RabbitMQ.Util;
namespace Mono.Messaging.RabbitMQ {
- public class RabbitMQMessageQueueTransaction : IMessageQueueTransaction {
+ public class RabbitMQMessageQueueTransaction : IMessageQueueTransaction, IMessagingContext {
private readonly string txId;
+ private readonly MessagingContextPool contextPool;
+
private MessageQueueTransactionStatus status = MessageQueueTransactionStatus.Initialized;
- private IConnection cn = null;
- private IModel model = null;
private String host = null;
private bool isDisposed = false;
private Object syncObj = new Object ();
+ private bool isActive = false;
+ private MessagingContext context = null;
- public RabbitMQMessageQueueTransaction (string txId)
+ public RabbitMQMessageQueueTransaction (string txId, MessagingContextPool contextPool)
{
this.txId = txId;
+ this.contextPool = contextPool;
+ }
+
+ private IModel Model {
+ get { return Context.Model; }
+ }
+
+ private IConnection Connection {
+ get { return Context.Connection; }
+ }
+
+ private MessagingContext Context {
+ get {
+ if (null == context) {
+ context = contextPool.GetContext (host);
+ }
+ return context;
+ }
+ }
+
+ public IMessage Receive (QueueReference qRef, TimeSpan timeout, IsMatch matcher, bool ack)
+ {
+ lock (syncObj) {
+ ValidateHost (qRef);
+
+ Model.TxSelect ();
+ isActive = true;
+
+ return Context.Receive (qRef, timeout, matcher, ack);
+ }
+ }
+
+ public void Send (QueueReference qRef, IMessage msg)
+ {
+ lock (syncObj) {
+ ValidateHost (qRef);
+
+ Model.TxSelect ();
+ isActive = true;
+
+
+ Context.Send (qRef, msg);
+ }
+ }
+
+ private void ValidateHost (QueueReference qRef)
+ {
+ if (null == host) {
+ host = qRef.Host;
+ } else if (host != qRef.Host) {
+ throw new MonoMessagingException ("Transactions can not span multiple hosts");
+ }
}
public MessageQueueTransactionStatus Status {
@@ -70,8 +124,8 @@ namespace Mono.Messaging.RabbitMQ {
public void Abort ()
{
lock (syncObj) {
- if (model != null)
- model.TxRollback ();
+ if (isActive)
+ Context.Model.TxRollback ();
status = MessageQueueTransactionStatus.Aborted;
}
}
@@ -88,35 +142,29 @@ namespace Mono.Messaging.RabbitMQ {
public void Commit ()
{
lock (syncObj) {
- model.TxCommit ();
+ Context.Model.TxCommit ();
status = MessageQueueTransactionStatus.Committed;
}
}
- public string Id {
- get { return txId; }
- }
-
- public delegate void Send (ref string host, ref IConnection cn,
- ref IModel model, IMessage msg, string txId);
-
- public delegate IMessage Receive (ref string host, ref IConnection cn,
- ref IModel model, string txId);
-
- public void RunSend (Send sendDelegate, IMessage msg)
+ public void Delete (QueueReference qRef)
{
lock (syncObj) {
- sendDelegate (ref host, ref cn, ref model, msg, Id);
+ Context.Delete (qRef);
}
}
- public IMessage RunReceive (Receive receiveDelegate)
+ public void Purge (QueueReference qRef)
{
lock (syncObj) {
- return receiveDelegate (ref host, ref cn, ref model, Id);
+ Context.Purge (qRef);
}
}
+ public string Id {
+ get { return txId; }
+ }
+
public void Dispose ()
{
Dispose (true);
@@ -127,14 +175,11 @@ namespace Mono.Messaging.RabbitMQ {
{
lock (syncObj) {
if (!isDisposed && disposing) {
- if (model != null)
- model.Dispose ();
- if (cn != null)
- cn.Dispose ();
+ if (context != null)
+ context.Dispose ();
isDisposed = true;
}
}
}
-
}
}
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
index 90aaf7101a5..41644a3505a 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
@@ -36,21 +36,25 @@ using System.Threading;
using Mono.Messaging;
+using RabbitMQ.Client;
+
namespace Mono.Messaging.RabbitMQ {
public class RabbitMQMessagingProvider : IMessagingProvider {
private int txCounter = 0;
private readonly uint localIp;
+ private readonly MessagingContextPool contextPool;
- public RabbitMQMessagingProvider()
+ public RabbitMQMessagingProvider ()
{
localIp = GetLocalIP ();
+ contextPool = new MessagingContextPool (new MessageFactory (this),
+ CreateConnection);
}
private static uint GetLocalIP ()
{
- //IPHostEntry host = Dns.GetHostEntry (Dns.GetHostName ());
String strHostName = Dns.GetHostName ();
IPHostEntry ipEntry = Dns.GetHostByName (strHostName);
foreach (IPAddress ip in ipEntry.AddressList) {
@@ -74,8 +78,21 @@ namespace Mono.Messaging.RabbitMQ {
public IMessageQueueTransaction CreateMessageQueueTransaction ()
{
Interlocked.Increment (ref txCounter);
- string txId = localIp.ToString () + txCounter.ToString ();
- return new RabbitMQMessageQueueTransaction (txId);
+ string txId = localIp.ToString () + txCounter.ToString ();
+
+ return new RabbitMQMessageQueueTransaction (txId, contextPool);
+ }
+
+ public IMessagingContext CreateContext (string host)
+ {
+ return contextPool.GetContext (host);
+ }
+
+ private IConnection CreateConnection (string host)
+ {
+ ConnectionFactory cf = new ConnectionFactory ();
+ cf.Address = host;
+ return cf.CreateConnection ();
}
public void DeleteQueue (QueueReference qRef)
@@ -116,8 +133,7 @@ namespace Mono.Messaging.RabbitMQ {
{
qLock.AcquireWriterLock (TIMEOUT);
try {
- IMessageQueue mq = new RabbitMQMessageQueue (this, qRef,
- transactional);
+ IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, transactional);
queues[qRef] = mq;
return mq;
} finally {
@@ -134,8 +150,7 @@ namespace Mono.Messaging.RabbitMQ {
else {
LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT);
try {
- IMessageQueue mq = new RabbitMQMessageQueue (this, qRef,
- false);
+ IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, false);
queues[qRef] = mq;
return mq;
} finally {
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources
index c0ad4d9bcaf..ec10f2b5102 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources
@@ -4,5 +4,6 @@ Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs
Mono.Messaging.RabbitMQ/XmlMessageFormatterTest.cs
Mono.Messaging.RabbitMQ/TestUtils.cs
Mono.Messaging.RabbitMQ/MessageTest.cs
+Mono.Messaging.RabbitMQ/MessagingContextPoolTest.cs
Mono.Messaging.RabbitMQ/MessageBaseTest.cs
Mono.Messaging.RabbitMQ/MessageEnumeratorExceptionTest.cs
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs
index 345a4cd3bd4..13a22251856 100644
--- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs
+++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs
@@ -41,7 +41,6 @@ namespace MonoTests.Mono.Messaging.RabbitMQ
[TestFixture]
public class BinaryMessageFormatterTest
{
-
DynamicMock mock1;
IMessage msg1;
DynamicMock mock2;