diff options
author | barkerm <michael.barker@lmax.com> | 2011-01-16 16:32:24 +0300 |
---|---|---|
committer | Miguel de Icaza <miguel@gnome.org> | 2011-01-16 20:06:56 +0300 |
commit | 2b516ade6dfef6edb139fc2d5b936f00c31e9b99 (patch) | |
tree | 0cadffcd70db29a67d9d95e53ab32d150f5749bc /mcs/class/Mono.Messaging.RabbitMQ | |
parent | 8566c056b285ac433fa0d00b82935d50350b99a4 (diff) |
- Refactoring, code clean-up and reduced garbage - Introduce connection pooling for improved performance
Diffstat (limited to 'mcs/class/Mono.Messaging.RabbitMQ')
8 files changed, 172 insertions, 297 deletions
diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources index 193d4813a17..07619307d66 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ.dll.sources @@ -1,7 +1,10 @@ ./Assembly/AssemblyInfo.cs ../../build/common/Consts.cs ../../build/common/Locale.cs +./Mono.Messaging.RabbitMQ/IMessagingContext.cs ./Mono.Messaging.RabbitMQ/MessageFactory.cs +./Mono.Messaging.RabbitMQ/MessagingContext.cs +./Mono.Messaging.RabbitMQ/MessagingContextPool.cs ./Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs ./Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs ./Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs index fa9d44d5f4c..228b3ec0824 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/MessageFactory.cs @@ -220,7 +220,7 @@ namespace Mono.Messaging.RabbitMQ { return epoch.AddSeconds (ats.UnixTime).ToLocalTime (); } - public static int TimeSpanToInt32 (TimeSpan timespan) + public static int TimeSpanToMillis (TimeSpan timespan) { if (timespan == TimeSpan.MaxValue) return -1; diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs index c96c62f82fa..bf4b0580364 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageEnumerator.cs @@ -139,8 +139,8 @@ namespace Mono.Messaging.RabbitMQ { public bool MoveNext (TimeSpan timeout) { - int to = MessageFactory.TimeSpanToInt32 (timeout); - return Subscription.Next (to, out current); + int timeoutMillis = MessageFactory.TimeSpanToMillis (timeout); + return Subscription.Next (timeoutMillis, out current); } public IMessage RemoveCurrent () @@ -178,12 +178,12 @@ namespace Mono.Messaging.RabbitMQ { public IMessage RemoveCurrent (TimeSpan timeout, IMessageQueueTransaction transaction) { - throw new NotImplementedException (); + throw new NotSupportedException ("Unable to remove messages within a transaction"); } public IMessage RemoveCurrent (TimeSpan timeout, MessageQueueTransactionType transactionType) { - throw new NotImplementedException (); + throw new NotSupportedException ("Unable to remove messages within a transaction"); } private IMessage CreateMessage (BasicDeliverEventArgs result) diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs index 5c742d1ad0b..23d5a28df23 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueue.cs @@ -50,6 +50,11 @@ namespace Mono.Messaging.RabbitMQ { /// </summary> public class RabbitMQMessageQueue : MessageQueueBase, IMessageQueue { + private readonly RabbitMQMessagingProvider provider; + private readonly MessageFactory helper; + private readonly bool transactional; + private readonly TimeSpan noTime = new TimeSpan(0, 0, 0, 0, 500); + private bool authenticate = false; private short basePriority = 0; private Guid category = Guid.Empty; @@ -60,9 +65,6 @@ namespace Mono.Messaging.RabbitMQ { private ISynchronizeInvoke synchronizingObject = null; private bool useJournalQueue = false; private QueueReference qRef = QueueReference.DEFAULT; - private readonly RabbitMQMessagingProvider provider; - private readonly MessageFactory helper; - private readonly bool transactional; public RabbitMQMessageQueue (RabbitMQMessagingProvider provider, bool transactional) @@ -166,16 +168,7 @@ namespace Mono.Messaging.RabbitMQ { set { qRef = value; } } - private static long GetVersion (IConnection cn) - { - long version = cn.Protocol.MajorVersion; - version = version << 32; - version += cn.Protocol.MinorVersion; - return version; - } - - private void SetDeliveryInfo (IMessage msg, long senderVersion, - string transactionId) + private void SetDeliveryInfo (IMessage msg, string transactionId) { msg.SetDeliveryInfo (Acknowledgment.None, DateTime.MinValue, @@ -183,7 +176,7 @@ namespace Mono.Messaging.RabbitMQ { Guid.NewGuid ().ToString () + "\\0", MessageType.Normal, new byte[0], - senderVersion, + 0, DateTime.UtcNow, null, transactionId); @@ -191,15 +184,13 @@ namespace Mono.Messaging.RabbitMQ { public void Close () { - // No-op (Queue are currently stateless) } public static void Delete (QueueReference qRef) { - using (IConnection cn = CreateConnection (qRef)) { - using (IModel model = cn.CreateModel ()) { - model.QueueDelete (qRef.Queue, false, false, false); - } + RabbitMQMessagingProvider provider = (RabbitMQMessagingProvider) MessagingProviderLocator.GetProvider (); + using (IMessagingContext context = provider.CreateContext (qRef.Host)) { + context.Delete (qRef); } } @@ -209,17 +200,15 @@ namespace Mono.Messaging.RabbitMQ { throw new MonoMessagingException ("Path has not been specified"); if (msg.BodyStream == null) - throw new ArgumentException ("Message is not serialized properly"); - - try { - using (IConnection cn = CreateConnection (QRef)) { - SetDeliveryInfo (msg, GetVersion (cn), null); - using (IModel ch = cn.CreateModel ()) { - Send (ch, msg); - } + throw new ArgumentException ("BodyStream is null, Message is not serialized properly"); + + using (IMessagingContext context = CurrentContext) { + try { + SetDeliveryInfo (msg, null); + context.Send (QRef, msg); + } catch (BrokerUnreachableException e) { + throw new ConnectionException (QRef, e); } - } catch (BrokerUnreachableException e) { - throw new ConnectionException (QRef, e); } } @@ -233,14 +222,15 @@ namespace Mono.Messaging.RabbitMQ { RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction; - tx.RunSend (SendInContext, msg); + SetDeliveryInfo (msg, tx.Id); + tx.Send (QRef, msg); } public void Send (IMessage msg, MessageQueueTransactionType transactionType) { switch (transactionType) { case MessageQueueTransactionType.Single: - using (IMessageQueueTransaction tx = provider.CreateMessageQueueTransaction ()) { + using (IMessageQueueTransaction tx = NewTx ()) { try { Send (msg, tx); tx.Commit (); @@ -260,177 +250,141 @@ namespace Mono.Messaging.RabbitMQ { } } - private void SendInContext (ref string host, ref IConnection cn, - ref IModel model, IMessage msg, string txId) - { - if (host == null) - host = QRef.Host; - else if (host != QRef.Host) - throw new MonoMessagingException ("Transactions can not span multiple hosts"); - - if (cn == null) - cn = CreateConnection (QRef); - - if (model == null) { - model = cn.CreateModel (); - model.TxSelect (); - } - - SetDeliveryInfo (msg, GetVersion (cn), txId); - Send (model, msg); - } - - private void Send (IModel model, IMessage msg) - { - string finalName = model.QueueDeclare (QRef.Queue, false); - IMessageBuilder mb = helper.WriteMessage (model, msg); - - model.BasicPublish ("", finalName, - (IBasicProperties) mb.GetContentHeader(), - mb.GetContentBody ()); - } - public void Purge () { - using (IConnection cn = CreateConnection (QRef)) { - using (IModel model = cn.CreateModel ()) { - model.QueuePurge (QRef.Queue, false); - } + using (IMessagingContext context = CurrentContext) { + context.Purge (QRef); } } public IMessage Peek () { - using (IConnection cn = CreateConnection (QRef)) { - using (IModel ch = cn.CreateModel ()) { - return Receive (ch, -1, false); - } - } + return DoReceive (TimeSpan.MaxValue, null, false); } public IMessage Peek (TimeSpan timeout) { - return Run (Peeker (timeout)); + return DoReceive (timeout, null, false); } public IMessage PeekById (string id) { - return Run (Peeker (ById (id))); + return DoReceive (noTime, ById (id), false); } public IMessage PeekById (string id, TimeSpan timeout) { - return Run (Peeker (timeout, ById (id))); + return DoReceive (timeout, ById (id), false); } public IMessage PeekByCorrelationId (string id) { - return Run (Peeker (ByCorrelationId (id))); + return DoReceive (noTime, ByCorrelationId (id), false); } public IMessage PeekByCorrelationId (string id, TimeSpan timeout) { - return Run (Peeker (timeout, ByCorrelationId (id))); + return DoReceive (timeout, ByCorrelationId (id), false); } public IMessage Receive () { - return Run (Receiver ()); + return DoReceive (TimeSpan.MaxValue, null, true); } public IMessage Receive (TimeSpan timeout) { - return Run (Receiver (timeout)); + return DoReceive (timeout, null, true); } public IMessage Receive (TimeSpan timeout, IMessageQueueTransaction transaction) { - return Run (transaction, Receiver (timeout)); + return DoReceive (transaction, timeout, null, true); } public IMessage Receive (TimeSpan timeout, MessageQueueTransactionType transactionType) { - return Run (transactionType, Receiver (timeout)); + return DoReceive (transactionType, timeout, null, true); } public IMessage Receive (IMessageQueueTransaction transaction) { - return Run (transaction, Receiver()); + return DoReceive (transaction, TimeSpan.MaxValue, null, true); } public IMessage Receive (MessageQueueTransactionType transactionType) { - return Run (transactionType, Receiver ()); - } + return DoReceive (transactionType, TimeSpan.MaxValue, null, true); + } public IMessage ReceiveById (string id) { - return Run (Receiver (ById (id))); + return DoReceive (noTime, ById (id), true); } public IMessage ReceiveById (string id, TimeSpan timeout) { - return Run (Receiver (timeout, ById (id))); + return DoReceive (timeout, ById (id), true); } public IMessage ReceiveById (string id, IMessageQueueTransaction transaction) { - return Run (transaction, Receiver (ById (id))); + return DoReceive (transaction, noTime, ById (id), true); } public IMessage ReceiveById (string id, MessageQueueTransactionType transactionType) { - return Run (transactionType, Receiver (ById (id))); + return DoReceive (transactionType, noTime, ById (id), true); } public IMessage ReceiveById (string id, TimeSpan timeout, IMessageQueueTransaction transaction) { - return Run (transaction, Receiver (timeout, ById (id))); + return DoReceive (transaction, timeout, ById (id), true); } public IMessage ReceiveById (string id, TimeSpan timeout, MessageQueueTransactionType transactionType) { - return Run (transactionType, Receiver (timeout, ById (id))); + return DoReceive (transactionType, timeout, ById (id), true); } public IMessage ReceiveByCorrelationId (string id) { - return Run (Receiver (ByCorrelationId (id))); + return DoReceive (noTime, ByCorrelationId (id), true); } public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout) { - return Run (Receiver (timeout, ByCorrelationId (id))); + return DoReceive (timeout, ByCorrelationId (id), true); } public IMessage ReceiveByCorrelationId (string id, IMessageQueueTransaction transaction) { - return Run (transaction, Receiver (ByCorrelationId (id))); + return DoReceive (transaction, noTime, ByCorrelationId (id), true); } public IMessage ReceiveByCorrelationId (string id, MessageQueueTransactionType transactionType) { - return Run (transactionType, Receiver (ByCorrelationId (id))); + return DoReceive (transactionType, noTime, ByCorrelationId (id), true); } public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout, IMessageQueueTransaction transaction) { - return Run (transaction, Receiver (timeout, ByCorrelationId (id))); + return DoReceive (transaction, timeout, ByCorrelationId (id), true); } public IMessage ReceiveByCorrelationId (string id, TimeSpan timeout, MessageQueueTransactionType transactionType) { - return Run (transactionType, Receiver (timeout, ByCorrelationId (id))); + return DoReceive (transactionType, timeout, ByCorrelationId (id), true); } public IMessageEnumerator GetMessageEnumerator () @@ -438,18 +392,15 @@ namespace Mono.Messaging.RabbitMQ { return new RabbitMQMessageEnumerator (helper, QRef); } - private delegate IMessage RecieveDelegate (RabbitMQMessageQueue q, - IModel model); - - private IMessage Run (MessageQueueTransactionType transactionType, - RecieveDelegate r) + private IMessage DoReceive (MessageQueueTransactionType transactionType, + TimeSpan timeout, IsMatch matcher, bool ack) { switch (transactionType) { case MessageQueueTransactionType.Single: - using (RabbitMQMessageQueueTransaction tx = GetTx ()) { + using (RabbitMQMessageQueueTransaction tx = NewTx ()) { bool success = false; try { - IMessage msg = Run (tx, r); + IMessage msg = DoReceive ((IMessagingContext) tx, timeout, matcher, ack); tx.Commit (); success = true; return msg; @@ -460,133 +411,39 @@ namespace Mono.Messaging.RabbitMQ { } case MessageQueueTransactionType.None: - return Run (r); + return DoReceive (timeout, matcher, true); default: - throw new NotSupportedException(transactionType + " not supported"); - } - } - - private IMessage Run (IMessageQueueTransaction transaction, - RecieveDelegate r) - { - TxReceiver txr = new TxReceiver (this, r); - RabbitMQMessageQueueTransaction tx = - (RabbitMQMessageQueueTransaction) transaction; - return tx.RunReceive (txr.ReceiveInContext); - } - - private IMessage Run (RecieveDelegate r) - { - using (IConnection cn = CreateConnection (QRef)) { - using (IModel model = cn.CreateModel ()) { - return r (this, model); - } + throw new NotSupportedException (transactionType + " not supported"); } } - private class TxReceiver + private IMessage DoReceive (IMessageQueueTransaction transaction, + TimeSpan timeout, IsMatch matcher, bool ack) { - private readonly RecieveDelegate doReceive; - private readonly RabbitMQMessageQueue q; - - public TxReceiver(RabbitMQMessageQueue q, RecieveDelegate doReceive) { - this.q = q; - this.doReceive = doReceive; - } - - public IMessage ReceiveInContext (ref string host, ref IConnection cn, - ref IModel model, string txId) - { - if (host == null) - host = q.QRef.Host; - else if (host != q.QRef.Host) - throw new MonoMessagingException ("Transactions can not span multiple hosts"); - - if (cn == null) - cn = CreateConnection (q.QRef); - - if (model == null) { - model = cn.CreateModel (); - model.TxSelect (); - } - - return doReceive (q, model); - } + RabbitMQMessageQueueTransaction tx = (RabbitMQMessageQueueTransaction) transaction; + return DoReceive ((IMessagingContext) tx, timeout, matcher, ack); } - private class RecieveDelegateFactory + private IMessage DoReceive (TimeSpan timeout, IsMatch matcher, bool ack) { - private readonly int timeout; - private readonly IsMatch matcher; - private readonly bool ack; - - public RecieveDelegateFactory (int timeout, IsMatch matcher) - : this (timeout, matcher, true) - { - } - - public RecieveDelegateFactory (int timeout, IsMatch matcher, bool ack) - { - if (matcher != null && timeout == -1) - this.timeout = 500; - else - this.timeout = timeout; - this.matcher = matcher; - this.ack = ack; - } - - public IMessage RecieveDelegate (RabbitMQMessageQueue q, IModel model) - { - if (matcher == null) - return q.Receive (model, timeout, ack); - else - return q.Receive (model, timeout, ack, matcher); + using (IMessagingContext context = CurrentContext) { + return DoReceive (context, timeout, matcher, ack); } } - private static RecieveDelegate Receiver (TimeSpan timeout, - IsMatch matcher) - { - int to = MessageFactory.TimeSpanToInt32 (timeout); - return new RecieveDelegateFactory (to, matcher).RecieveDelegate; - } - - private static RecieveDelegate Receiver (IsMatch matcher) - { - return new RecieveDelegateFactory (-1, matcher).RecieveDelegate; - } - - private static RecieveDelegate Receiver (TimeSpan timeout) + private IMessage DoReceive (IMessagingContext context, TimeSpan timeout, + IsMatch matcher, bool ack) { - int to = MessageFactory.TimeSpanToInt32 (timeout); - return new RecieveDelegateFactory (to, null).RecieveDelegate; + return context.Receive (QRef, timeout, matcher, ack); } - - private RecieveDelegate Receiver () - { - return new RecieveDelegateFactory (-1, null).RecieveDelegate; - } - - private RecieveDelegate Peeker (TimeSpan timeout) - { - int to = MessageFactory.TimeSpanToInt32 (timeout); - return new RecieveDelegateFactory (to, null, false).RecieveDelegate; - } - - private RecieveDelegate Peeker (IsMatch matcher) - { - return new RecieveDelegateFactory (-1, matcher, false).RecieveDelegate; - } - private RecieveDelegate Peeker (TimeSpan timeout, IsMatch matcher) - { - int to = MessageFactory.TimeSpanToInt32 (timeout); - return new RecieveDelegateFactory (to, matcher, false).RecieveDelegate; + private IMessagingContext CurrentContext { + get { + return provider.CreateContext (qRef.Host); + } } - delegate bool IsMatch (BasicDeliverEventArgs result); - private class IdMatcher { private readonly string id; @@ -625,54 +482,9 @@ namespace Mono.Messaging.RabbitMQ { return new CorrelationIdMatcher (correlationId).MatchById; } - private IMessage Receive (IModel model, int timeout, bool doAck) - { - string finalName = model.QueueDeclare (QRef.Queue, false); - - using (Subscription sub = new Subscription (model, finalName)) { - BasicDeliverEventArgs result; - if (sub.Next (timeout, out result)) { - IMessage m = helper.ReadMessage (QRef, result); - if (doAck) - sub.Ack (result); - return m; - } else { - throw new MonoMessagingException ("No Message Available"); - } - } - } - - private IMessage Receive (IModel model, int timeout, - bool doAck, IsMatch matcher) - { - string finalName = model.QueueDeclare (QRef.Queue, false); - - using (Subscription sub = new Subscription (model, finalName)) { - BasicDeliverEventArgs result; - while (sub.Next (timeout, out result)) { - - if (matcher (result)) { - IMessage m = helper.ReadMessage (QRef, result); - if (doAck) - sub.Ack (result); - return m; - } - } - - throw new MessageUnavailableException ("Message not available"); - } - } - - private RabbitMQMessageQueueTransaction GetTx () + private RabbitMQMessageQueueTransaction NewTx () { return (RabbitMQMessageQueueTransaction) provider.CreateMessageQueueTransaction (); } - - private static IConnection CreateConnection (QueueReference qRef) - { - ConnectionFactory cf = new ConnectionFactory (); - cf.Address = qRef.Host; - return cf.CreateConnection (); - } } } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs index 52e3a93cf6c..5834bedb391 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessageQueueTransaction.cs @@ -45,19 +45,73 @@ using RabbitMQ.Util; namespace Mono.Messaging.RabbitMQ { - public class RabbitMQMessageQueueTransaction : IMessageQueueTransaction { + public class RabbitMQMessageQueueTransaction : IMessageQueueTransaction, IMessagingContext { private readonly string txId; + private readonly MessagingContextPool contextPool; + private MessageQueueTransactionStatus status = MessageQueueTransactionStatus.Initialized; - private IConnection cn = null; - private IModel model = null; private String host = null; private bool isDisposed = false; private Object syncObj = new Object (); + private bool isActive = false; + private MessagingContext context = null; - public RabbitMQMessageQueueTransaction (string txId) + public RabbitMQMessageQueueTransaction (string txId, MessagingContextPool contextPool) { this.txId = txId; + this.contextPool = contextPool; + } + + private IModel Model { + get { return Context.Model; } + } + + private IConnection Connection { + get { return Context.Connection; } + } + + private MessagingContext Context { + get { + if (null == context) { + context = contextPool.GetContext (host); + } + return context; + } + } + + public IMessage Receive (QueueReference qRef, TimeSpan timeout, IsMatch matcher, bool ack) + { + lock (syncObj) { + ValidateHost (qRef); + + Model.TxSelect (); + isActive = true; + + return Context.Receive (qRef, timeout, matcher, ack); + } + } + + public void Send (QueueReference qRef, IMessage msg) + { + lock (syncObj) { + ValidateHost (qRef); + + Model.TxSelect (); + isActive = true; + + + Context.Send (qRef, msg); + } + } + + private void ValidateHost (QueueReference qRef) + { + if (null == host) { + host = qRef.Host; + } else if (host != qRef.Host) { + throw new MonoMessagingException ("Transactions can not span multiple hosts"); + } } public MessageQueueTransactionStatus Status { @@ -70,8 +124,8 @@ namespace Mono.Messaging.RabbitMQ { public void Abort () { lock (syncObj) { - if (model != null) - model.TxRollback (); + if (isActive) + Context.Model.TxRollback (); status = MessageQueueTransactionStatus.Aborted; } } @@ -88,35 +142,29 @@ namespace Mono.Messaging.RabbitMQ { public void Commit () { lock (syncObj) { - model.TxCommit (); + Context.Model.TxCommit (); status = MessageQueueTransactionStatus.Committed; } } - public string Id { - get { return txId; } - } - - public delegate void Send (ref string host, ref IConnection cn, - ref IModel model, IMessage msg, string txId); - - public delegate IMessage Receive (ref string host, ref IConnection cn, - ref IModel model, string txId); - - public void RunSend (Send sendDelegate, IMessage msg) + public void Delete (QueueReference qRef) { lock (syncObj) { - sendDelegate (ref host, ref cn, ref model, msg, Id); + Context.Delete (qRef); } } - public IMessage RunReceive (Receive receiveDelegate) + public void Purge (QueueReference qRef) { lock (syncObj) { - return receiveDelegate (ref host, ref cn, ref model, Id); + Context.Purge (qRef); } } + public string Id { + get { return txId; } + } + public void Dispose () { Dispose (true); @@ -127,14 +175,11 @@ namespace Mono.Messaging.RabbitMQ { { lock (syncObj) { if (!isDisposed && disposing) { - if (model != null) - model.Dispose (); - if (cn != null) - cn.Dispose (); + if (context != null) + context.Dispose (); isDisposed = true; } } } - } } diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs index 90aaf7101a5..41644a3505a 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs @@ -36,21 +36,25 @@ using System.Threading; using Mono.Messaging; +using RabbitMQ.Client; + namespace Mono.Messaging.RabbitMQ { public class RabbitMQMessagingProvider : IMessagingProvider { private int txCounter = 0; private readonly uint localIp; + private readonly MessagingContextPool contextPool; - public RabbitMQMessagingProvider() + public RabbitMQMessagingProvider () { localIp = GetLocalIP (); + contextPool = new MessagingContextPool (new MessageFactory (this), + CreateConnection); } private static uint GetLocalIP () { - //IPHostEntry host = Dns.GetHostEntry (Dns.GetHostName ()); String strHostName = Dns.GetHostName (); IPHostEntry ipEntry = Dns.GetHostByName (strHostName); foreach (IPAddress ip in ipEntry.AddressList) { @@ -74,8 +78,21 @@ namespace Mono.Messaging.RabbitMQ { public IMessageQueueTransaction CreateMessageQueueTransaction () { Interlocked.Increment (ref txCounter); - string txId = localIp.ToString () + txCounter.ToString (); - return new RabbitMQMessageQueueTransaction (txId); + string txId = localIp.ToString () + txCounter.ToString (); + + return new RabbitMQMessageQueueTransaction (txId, contextPool); + } + + public IMessagingContext CreateContext (string host) + { + return contextPool.GetContext (host); + } + + private IConnection CreateConnection (string host) + { + ConnectionFactory cf = new ConnectionFactory (); + cf.Address = host; + return cf.CreateConnection (); } public void DeleteQueue (QueueReference qRef) @@ -116,8 +133,7 @@ namespace Mono.Messaging.RabbitMQ { { qLock.AcquireWriterLock (TIMEOUT); try { - IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, - transactional); + IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, transactional); queues[qRef] = mq; return mq; } finally { @@ -134,8 +150,7 @@ namespace Mono.Messaging.RabbitMQ { else { LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT); try { - IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, - false); + IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, false); queues[qRef] = mq; return mq; } finally { diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources index c0ad4d9bcaf..ec10f2b5102 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources +++ b/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ_test.dll.sources @@ -4,5 +4,6 @@ Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs Mono.Messaging.RabbitMQ/XmlMessageFormatterTest.cs Mono.Messaging.RabbitMQ/TestUtils.cs Mono.Messaging.RabbitMQ/MessageTest.cs +Mono.Messaging.RabbitMQ/MessagingContextPoolTest.cs Mono.Messaging.RabbitMQ/MessageBaseTest.cs Mono.Messaging.RabbitMQ/MessageEnumeratorExceptionTest.cs diff --git a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs index 345a4cd3bd4..13a22251856 100644 --- a/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs +++ b/mcs/class/Mono.Messaging.RabbitMQ/Test/Mono.Messaging.RabbitMQ/BinaryMessageFormatterTest.cs @@ -41,7 +41,6 @@ namespace MonoTests.Mono.Messaging.RabbitMQ [TestFixture] public class BinaryMessageFormatterTest { - DynamicMock mock1; IMessage msg1; DynamicMock mock2; |