diff options
author | Lluis Sanchez <lluis@novell.com> | 2002-12-29 15:36:37 +0300 |
---|---|---|
committer | Lluis Sanchez <lluis@novell.com> | 2002-12-29 15:36:37 +0300 |
commit | 0792fc1a6d8ba527c40e7e93d7138bbf64bf24cc (patch) | |
tree | e8e5f1425ac9c5cc76531c94a9a69ccba5da4679 /mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp | |
parent | b936987419152bc1a94a0931fe235f0d7c213535 (diff) |
no message
svn path=/trunk/mcs/; revision=9963
Diffstat (limited to 'mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp')
8 files changed, 937 insertions, 255 deletions
diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpChannel.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpChannel.cs index 28965f364da..1b169d51250 100644 --- a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpChannel.cs +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpChannel.cs @@ -2,6 +2,7 @@ // System.Runtime.Remoting.Channels.Tcp.TcpChannel.cs // // Author: Rodrigo Moya (rodrigo@ximian.com) +// Lluis Sanchez Gual (lsg@ctv.es) // // 2002 (C) Copyright, Ximian, Inc. // @@ -12,86 +13,98 @@ using System.Text.RegularExpressions; namespace System.Runtime.Remoting.Channels.Tcp { - public class TcpChannel : IChannelReceiver, IChannel, - IChannelSender + public class TcpChannel : IChannelReceiver, IChannel, IChannelSender { - private int tcp_port; - - public TcpChannel () - { - tcp_port = 0; + private TcpClientChannel _clientChannel;
+ private TcpServerChannel _serverChannel = null;
+ private string _name;
+ + public TcpChannel (): this (0) + { } public TcpChannel (int port) { - tcp_port = port; + Hashtable ht = new Hashtable();
+ ht["port"] = port.ToString();
+ Init(ht, null, null);
} - [MonoTODO] + public void Init(IDictionary properties, IClientChannelSinkProvider clientSink, IServerChannelSinkProvider serverSink)
+ {
+ _clientChannel = new TcpClientChannel(properties,clientSink);
+
+ string port = properties["port"] as string;
+ if (port != null && port != string.Empty)
+ {
+ _serverChannel = new TcpServerChannel(properties, serverSink);
+ }
+
+ _name = properties["name"] as string;
+ }
+
+ public TcpChannel (IDictionary properties, IClientChannelSinkProvider clientSinkProvider, IServerChannelSinkProvider serverSinkProvider) { - throw new NotImplementedException (); - } - - public object ChannelData - { - [MonoTODO] - get { - throw new NotImplementedException (); - } + Init (properties, clientSinkProvider, serverSinkProvider); } - public string ChannelName - { - [MonoTODO] - get { - throw new NotImplementedException (); - } - } - - public int ChannelPriority - { - [MonoTODO] - get { - throw new NotImplementedException (); - } - } - - [MonoTODO] - public IMessageSink CreateMessageSink (string url, - object remoteChannelData, - out string objectURI) - { - throw new NotImplementedException (); - } + public IMessageSink CreateMessageSink(string url, object remoteChannelData, out string objectURI)
+ {
+ return _clientChannel.CreateMessageSink(url, remoteChannelData, out objectURI);
+ }
+
+ public string ChannelName
+ {
+ get { return _name; }
+ }
+
+ public int ChannelPriority
+ {
+ get { return 1; }
+ }
+
+ public void StartListening (object data)
+ {
+ if (_serverChannel != null) _serverChannel.StartListening(data);
+ }
+
+ public void StopListening (object data)
+ {
+ if (_serverChannel != null) _serverChannel.StopListening(data);
+ }
+
+ public string[] GetUrlsForUri (string uri)
+ {
+ if (_serverChannel != null) return _serverChannel.GetUrlsForUri(uri);
+ else return null;
+ }
+
+ public object ChannelData
+ {
+ get
+ {
+ if (_serverChannel != null) return _serverChannel.ChannelData;
+ else return null;
+ }
+ }
- [MonoTODO] - public string[] GetUrlsForUri (string objectURI) + public string Parse (string url, out string objectURI) { - throw new NotImplementedException (); + return TcpChannel.ParseChannelUrl (url, out objectURI); } - public string Parse (string url, out string objectURI) + internal static string ParseChannelUrl (string url, out string objectURI) { int port; string host = ParseTcpURL (url, out objectURI, out port); - - return "tcp://" + host + ":" + port; - } - - [MonoTODO] - public void StartListening (object data) - { - throw new NotImplementedException (); - } - - [MonoTODO] - public void StopListening (object data) - { - throw new NotImplementedException (); + if (host != null) + return "tcp://" + host + ":" + port; + else + return null; } internal static string ParseTcpURL (string url, out string objectURI, out int port) @@ -101,7 +114,7 @@ namespace System.Runtime.Remoting.Channels.Tcp objectURI = null; port = 0; - Match m = Regex.Match (url, "tcp://([^:]+):([0-9]+)(/.*)"); + Match m = Regex.Match (url, "tcp://([^:]+):([0-9]+)[/](.*)"); if (!m.Success) return null; diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientChannel.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientChannel.cs index 7844e4d9297..7e78a9d95dd 100644 --- a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientChannel.cs +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientChannel.cs @@ -2,6 +2,7 @@ // System.Runtime.Remoting.Channels.Tcp.TcpClientChannel.cs // // Author: Dietmar Maurer (dietmar@ximian.com) +// Lluis Sanchez Gual (lsg@ctv.es) // // 2002 (C) Copyright, Ximian, Inc. // @@ -11,132 +12,51 @@ using System.IO; using System.Net.Sockets; using System.Runtime.Remoting.Messaging; using System.Runtime.Remoting.Channels; +using System.Runtime.Remoting.Channels.Simple; +using System.Threading; namespace System.Runtime.Remoting.Channels.Tcp { - - public class TcpClientTransportSink : IClientChannelSink - { - string host; - string object_uri; - int port; - - TcpClient tcpclient; - Stream stream = null; - - public TcpClientTransportSink (string url) - { - host = TcpChannel.ParseTcpURL (url, out object_uri, out port); - tcpclient = new TcpClient (); - } - - public IDictionary Properties - { - get { - return null; - } - } - - public IClientChannelSink NextChannelSink - { - get { - // we are the last one - return null; - } - } - - public void AsyncProcessRequest (IClientChannelSinkStack sinkStack, IMessage msg, - ITransportHeaders headers, Stream stream) - { - throw new NotImplementedException (); - } - - public void AsyncProcessResponse (IClientResponseChannelSinkStack sinkStack, - object state, ITransportHeaders headers, - Stream stream) - { - throw new NotImplementedException (); - } - - public Stream GetRequestStream (IMessage msg, ITransportHeaders headers) - { - // no acces to stream? - return null; - } - - public void ProcessMessage (IMessage msg, - ITransportHeaders requestHeaders, - Stream requestStream, - out ITransportHeaders responseHeaders, - out Stream responseStream) - { - if (stream == null) { - tcpclient.Connect (host, port); - stream = tcpclient.GetStream (); - } - - Console.WriteLine ("Client ProcessMessage"); - - responseHeaders = null; - responseStream = null; - //throw new NotImplementedException (); - } - - } - - public class TcpClientTransportSinkProvider : IClientChannelSinkProvider - { - public TcpClientTransportSinkProvider () - { - // what should we do here ? - } - - public IClientChannelSinkProvider Next - { - get { - return null; - } - - set { - // ignore, we are always the last in the chain - } - } - - public IClientChannelSink CreateSink (IChannelSender channel, string url, - object remoteChannelData) - { - return new TcpClientTransportSink (url); - } - } - public class TcpClientChannel : IChannelSender, IChannel { int priority = 1; string name = "tcp"; - IClientChannelSinkProvider sink_provider; + IClientChannelSinkProvider _sinkProvider; public TcpClientChannel () - { - sink_provider = new BinaryClientFormatterSinkProvider (); - sink_provider.Next = new TcpClientTransportSinkProvider (); + { } public TcpClientChannel (IDictionary properties, IClientChannelSinkProvider sinkProvider) { priority = 1; - sink_provider = sinkProvider; - // add the tcp provider at the end of the chain - IClientChannelSinkProvider prov = sinkProvider; - while (prov.Next != null) prov = prov.Next; - prov.Next = new TcpClientTransportSinkProvider (); + if (_sinkProvider != null) + { + _sinkProvider = sinkProvider; + + // add the tcp provider at the end of the chain + IClientChannelSinkProvider prov = sinkProvider; + while (prov.Next != null) prov = prov.Next; + prov.Next = new TcpClientTransportSinkProvider (); + + // Note: a default formatter is added only when + // no sink providers are specified in the config file. + } + else + { + // FIXME: change soap to binary + _sinkProvider = new SimpleClientFormatterSinkProvider (); + _sinkProvider.Next = new TcpClientTransportSinkProvider (); + } + } public TcpClientChannel (string name, IClientChannelSinkProvider sinkProvider) { priority = 1; this.name = name; - sink_provider = sinkProvider; + _sinkProvider = sinkProvider; // add the tcp provider at the end of the chain IClientChannelSinkProvider prov = sinkProvider; @@ -161,7 +81,7 @@ namespace System.Runtime.Remoting.Channels.Tcp public IMessageSink CreateMessageSink (string url, object remoteChannelData, out string objectURI) - { + { if (url == null && remoteChannelData != null) { IChannelDataStore ds = remoteChannelData as IChannelDataStore; if (ds != null) @@ -171,16 +91,12 @@ namespace System.Runtime.Remoting.Channels.Tcp if (Parse (url, out objectURI) == null) return null; - return (IMessageSink) sink_provider.CreateSink (this, url, remoteChannelData); + return (IMessageSink) _sinkProvider.CreateSink (this, url, remoteChannelData); } public string Parse (string url, out string objectURI) { - int port; - - string host = TcpChannel.ParseTcpURL (url, out objectURI, out port); - - return "tcp://" + host + ":" + port; + return TcpChannel.ParseChannelUrl (url, out objectURI); } } } diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientTransportSink.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientTransportSink.cs new file mode 100644 index 00000000000..ad26ec4371f --- /dev/null +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientTransportSink.cs @@ -0,0 +1,152 @@ +// +// System.Runtime.Remoting.Channels.Tcp.TcpClientTransportSink.cs +// +// Author: Dietmar Maurer (dietmar@ximian.com) +// Lluis Sanchez (lsg@ctv.es) +// +// 2002 (C) Copyright, Ximian, Inc. +//
+
+using System;
+using System.Runtime.Remoting.Channels;
+using System.Runtime.Remoting.Messaging;
+using System.Collections;
+using System.IO;
+using System.Threading;
+
+namespace System.Runtime.Remoting.Channels.Tcp
+{
+ public class TcpClientTransportSink : IClientChannelSink + { + string _host; + string _objectUri; + int _port; + + public TcpClientTransportSink (string url) + { + _host = TcpChannel.ParseTcpURL (url, out _objectUri, out _port); + } + + public IDictionary Properties + { + get
+ { + return null; + } + } + + public IClientChannelSink NextChannelSink + { + get
+ { + // we are the last one + return null; + } + } + + public void AsyncProcessRequest (IClientChannelSinkStack sinkStack, IMessage msg, + ITransportHeaders headers, Stream requestStream) + { + TcpConnection connection = null; + try + { + // Sends the stream using a connection from the pool + // and creates a WorkItem that will wait for the + // response of the server + + connection = TcpConnectionPool.GetConnection (_host, _port); + TcpMessageIO.SendMessageStream (connection.Stream, requestStream, headers, connection.Buffer); + sinkStack.Push (this, connection); + ThreadPool.QueueUserWorkItem (new WaitCallback(ReadAsyncTcpMessage), sinkStack); + } + catch + { + if (connection != null) connection.Release(); + throw; + } + } + + private void ReadAsyncTcpMessage(object data) + { + // This method is called by a new thread to asynchronously + // read the response to a request + + // The stack was provided as state data in QueueUserWorkItem + IClientChannelSinkStack stack = (IClientChannelSinkStack)data; + + // The first sink in the stack is this sink. Pop it and + // get the status data, which is the TcpConnection used to send + // the request + TcpConnection connection = (TcpConnection)stack.Pop(this); + + try + { + ITransportHeaders responseHeaders; + + // Read the response, blocking if necessary + MessageType type = TcpMessageIO.ReceiveMessageType (connection.Stream); + + if (type != MessageType.MethodMessage) + throw new RemotingException ("Unknown response message from server"); + + Stream responseStream = TcpMessageIO.ReceiveMessageStream (connection.Stream, out responseHeaders, connection.Buffer); + + // Free the connection, so it can be reused + connection.Release(); + connection = null; + + // Ok, proceed with the other sinks + stack.AsyncProcessResponse (responseHeaders, responseStream); + } + catch + { + if (connection != null) connection.Release(); + throw; + } + } + + public void AsyncProcessResponse (IClientResponseChannelSinkStack sinkStack, + object state, ITransportHeaders headers, + Stream stream) + { + // Should never be called + throw new NotSupportedException(); + } + + public Stream GetRequestStream (IMessage msg, ITransportHeaders headers) + { + return null; + } + + public void ProcessMessage (IMessage msg, + ITransportHeaders requestHeaders, + Stream requestStream, + out ITransportHeaders responseHeaders, + out Stream responseStream) + { + TcpConnection connection = null; + try + { + // Sends the message + connection = TcpConnectionPool.GetConnection (_host, _port); + TcpMessageIO.SendMessageStream (connection.Stream, requestStream, requestHeaders, connection.Buffer); + + // Reads the response + MessageType type = TcpMessageIO.ReceiveMessageType (connection.Stream); + + if (type != MessageType.MethodMessage) + throw new RemotingException ("Unknown response message from server"); + + responseStream = TcpMessageIO.ReceiveMessageStream (connection.Stream, out responseHeaders, connection.Buffer); + } + finally + { + if (connection != null) + connection.Release(); + } + } + + } + +
+}
diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientTransportSinkProvider.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientTransportSinkProvider.cs new file mode 100644 index 00000000000..bacdfd5576a --- /dev/null +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpClientTransportSinkProvider.cs @@ -0,0 +1,41 @@ +// +// System.Runtime.Remoting.Channels.Tcp.TcpClientTransportSinkProvider.cs +// +// Author: Dietmar Maurer (dietmar@ximian.com) +// Lluis Sanchez (lsg@ctv.es) +// +// 2002 (C) Copyright, Ximian, Inc. +//
+
+using System;
+using System.Runtime.Remoting.Channels;
+
+namespace System.Runtime.Remoting.Channels.Tcp
+{
+ public class TcpClientTransportSinkProvider : IClientChannelSinkProvider + { + public TcpClientTransportSinkProvider () + { + // what should we do here ? + } + + public IClientChannelSinkProvider Next + { + get
+ { + return null; + } + + set
+ { + // ignore, we are always the last in the chain + } + } + + public IClientChannelSink CreateSink (IChannelSender channel, string url, + object remoteChannelData) + { + return new TcpClientTransportSink (url); + } + }
+}
diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpConnectionPool.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpConnectionPool.cs new file mode 100644 index 00000000000..26a567dbcfb --- /dev/null +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpConnectionPool.cs @@ -0,0 +1,246 @@ +// +// System.Runtime.Remoting.Channels.Tcp.TcpConnectionPool.cs +// +// Author: Lluis Sanchez (lsg@ctv.es) +// +// 2002 (C) Lluis Sanchez Gual +// + +using System;
+using System.Collections;
+using System.Threading;
+using System.IO;
+using System.Net.Sockets;
+
+namespace System.Runtime.Remoting.Channels.Tcp
+{
+ // This is a pool of Tcp connections. Connections requested
+ // by the TCP channel are pooled after their use, and can
+ // be reused later. Connections are automaticaly closed
+ // if not used after some time, specified in KeepAliveSeconds.
+ // The number of allowed open connections can also be specified
+ // in MaxOpenConnections. The limit is per host.
+ // If a thread requests a connection and the limit has been
+ // reached, the thread is suspended until one is released.
+
+ internal class TcpConnectionPool
+ {
+ // Table of pools. There is a HostConnectionPool
+ // instance for each host
+ static Hashtable _pools = new Hashtable();
+
+ static int _maxOpenConnections = 2;
+ static int _keepAliveSeconds = 15;
+
+ static TcpConnectionPool()
+ {
+ // This thread will close unused connections
+ Thread t = new Thread (new ThreadStart (ConnectionCollector));
+ t.Start();
+ t.IsBackground = true;
+ }
+
+ public static int MaxOpenConnections
+ {
+ get { return _maxOpenConnections; }
+ set
+ {
+ if (value < 1) throw new RemotingException ("MaxOpenConnections must be greater than zero");
+ _maxOpenConnections = value;
+ }
+ }
+
+ public static int KeepAliveSeconds
+ {
+ get { return _keepAliveSeconds; }
+ set { _keepAliveSeconds = value; }
+ }
+
+ public static TcpConnection GetConnection (string host, int port)
+ {
+ HostConnectionPool hostPool;
+
+ lock (_pools)
+ {
+ string key = host + ":" + port;
+ hostPool = (HostConnectionPool) _pools[key];
+ if (hostPool == null)
+ {
+ hostPool = new HostConnectionPool(host, port);
+ _pools[key] = hostPool;
+ }
+ }
+
+ return hostPool.GetConnection();
+ }
+
+ private static void ConnectionCollector ()
+ {
+ while (true)
+ {
+ Thread.Sleep(3000);
+ lock (_pools)
+ {
+ ICollection values = _pools.Values;
+ foreach (HostConnectionPool pool in values)
+ pool.PurgeConnections();
+ }
+ }
+ }
+ }
+
+ internal class TcpConnection
+ {
+ DateTime _controlTime;
+ Stream _stream;
+ TcpClient _client;
+ HostConnectionPool _pool;
+ byte[] _buffer;
+
+ public TcpConnection (HostConnectionPool pool, TcpClient client)
+ {
+ _pool = pool;
+ _client = client;
+ _stream = client.GetStream();
+ _controlTime = DateTime.Now;
+ _buffer = new byte[TcpMessageIO.DefaultStreamBufferSize];
+ }
+
+ public Stream Stream
+ {
+ get { return _stream; }
+ }
+
+ public DateTime ControlTime
+ {
+ get { return _controlTime; }
+ set { _controlTime = value; }
+ }
+
+ // This is a "thread safe" buffer that can be used by
+ // TcpClientTransportSink to read or send data to the stream.
+ // The buffer is "thread safe" since only one thread can
+ // use a connection at a given time.
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ // Returns the connection to the pool
+ public void Release()
+ {
+ _pool.ReleaseConnection (this);
+ }
+
+ public void Close()
+ {
+ _client.Close();
+ }
+ }
+
+ internal class HostConnectionPool
+ {
+ ArrayList _pool = new ArrayList();
+ int _activeConnections = 0;
+
+ string _host;
+ int _port;
+
+ public HostConnectionPool (string host, int port)
+ {
+ _host = host;
+ _port = port;
+ }
+
+ public TcpConnection GetConnection ()
+ {
+ lock (_pool)
+ {
+ TcpConnection connection = null;
+
+ do
+ {
+ if (_pool.Count > 0)
+ {
+ // There are available connections
+ connection = (TcpConnection)_pool[_pool.Count - 1];
+ _pool.RemoveAt(_pool.Count - 1);
+ }
+
+ if (connection == null && _activeConnections < TcpConnectionPool.MaxOpenConnections)
+ {
+ // No connections available, but the max connections
+ // has not been reached yet, so a new one can be created
+ connection = CreateConnection();
+ }
+
+ // No available connections in the pool
+ // Wait for somewone to release one.
+
+ if (connection == null)
+ Monitor.Wait(_pool);
+ }
+ while (connection == null);
+
+ return connection;
+ }
+ }
+
+ private TcpConnection CreateConnection()
+ {
+ try
+ {
+ TcpClient client = new TcpClient(_host, _port);
+ TcpConnection entry = new TcpConnection(this, client);
+ _activeConnections++;
+ return entry;
+ }
+ catch (Exception ex)
+ {
+ throw new RemotingException (ex.Message);
+ }
+ }
+
+ public void ReleaseConnection (TcpConnection entry)
+ {
+ lock (_pool)
+ {
+ entry.ControlTime = DateTime.Now; // Initialize timeout
+ _pool.Add (entry);
+ Monitor.Pulse (_pool);
+ }
+ }
+
+ private void CancelConnection(TcpConnection entry)
+ {
+ try
+ {
+ entry.Stream.Close();
+ _activeConnections--;
+ }
+ catch
+ {
+ }
+ }
+
+ public void PurgeConnections()
+ {
+ lock (_pool)
+ {
+ for (int n=0; n < _pool.Count; n++)
+ {
+ TcpConnection entry = (TcpConnection)_pool[n];
+ if ( (DateTime.Now - entry.ControlTime).TotalSeconds > TcpConnectionPool.KeepAliveSeconds)
+ {
+ CancelConnection (entry);
+ _pool.RemoveAt(n);
+ n--;
+ }
+ }
+ }
+ }
+
+ }
+
+
+}
diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpMessageIO.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpMessageIO.cs new file mode 100644 index 00000000000..c167f7e2f40 --- /dev/null +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpMessageIO.cs @@ -0,0 +1,205 @@ +// System.Runtime.Remoting.Channels.Tcp.TcpMessageIO.cs +// +// Author: Lluis Sanchez (lsg@ctv.es) +// +// (C) 2002 Lluis Sanchez Gual + +using System.Runtime.Serialization; +using System.Collections; +using System.IO; +using System.Text; +using System.Net.Sockets; + +namespace System.Runtime.Remoting.Channels.Tcp +{ + enum MessageType { MethodMessage = 0, CancelSignal = 1, Unknown = 10} + + internal class TcpMessageIO + { + static byte[][] _msgHeaders = + {
+ new byte[] { (byte)'.', (byte)'N', (byte)'E', (byte)'T', 0 }, + new byte[] { 255, 255, 255, 255, 255 } + }; + + public static int DefaultStreamBufferSize = 1000; + + // Identifies an incoming message + public static MessageType ReceiveMessageType (Stream networkStream) + { + try + { + bool[] isOnTrack = new bool[_msgHeaders.Length]; + bool atLeastOneOnTrack = true; + int i = 0; + + while (atLeastOneOnTrack) + { + atLeastOneOnTrack = false; + byte c = (byte)networkStream.ReadByte(); + for (int n = 0; n<_msgHeaders.Length; n++) + { + if (i > 0 && !isOnTrack[n]) continue; + + isOnTrack[n] = (c == _msgHeaders[n][i]); + if (isOnTrack[n] && (i == _msgHeaders[n].Length-1)) return (MessageType)n; + atLeastOneOnTrack = atLeastOneOnTrack || isOnTrack[n]; + } + i++; + } + return MessageType.Unknown; + } + catch (IOException ex) + { + // Stream closed + return MessageType.CancelSignal; + } + } + + public static void SendMessageStream (Stream networkStream, Stream data, ITransportHeaders requestHeaders, byte[] buffer) + { + if (buffer == null) buffer = new byte[DefaultStreamBufferSize]; + + // Writes the message start header + byte[] dotnetHeader = _msgHeaders[(int)MessageType.MethodMessage]; + networkStream.Write(dotnetHeader, 0, dotnetHeader.Length); + + // Writes the length of the stream being sent (not including the headers) + int num = (int)data.Length; + buffer [0] = (byte) num; + buffer [1] = (byte) (num >> 8); + buffer [2] = (byte) (num >> 16); + buffer [3] = (byte) (num >> 24); + networkStream.Write(buffer, 0, 4); + + // Writes the message headers + SendHeaders (networkStream, requestHeaders, buffer); + + // Writes the stream + if (data is MemoryStream) + { + // The copy of the stream can be optimized. The internal + // buffer of MemoryStream can be used. + MemoryStream memStream = (MemoryStream)data; + networkStream.Write (memStream.GetBuffer(), 0, (int)memStream.Length); + } + else + { + int nread = data.Read (buffer, 0, buffer.Length); + while (nread > 0) + { + networkStream.Write (buffer, 0, nread); + nread = data.Read (buffer, 0, buffer.Length); + } + } + } + + private static void SendHeaders(Stream networkStream, ITransportHeaders requestHeaders, byte[] buffer) + { + if (requestHeaders == null) + SendString (networkStream, "", buffer); + else + { + // Writes the headers as a sequence of strings + IEnumerator e = requestHeaders.GetEnumerator(); + while (e.MoveNext()) + { + DictionaryEntry hdr = (DictionaryEntry)e.Current; + SendString (networkStream, hdr.Key.ToString(), buffer); + SendString (networkStream, hdr.Value.ToString(), buffer); + } + SendString (networkStream, "", buffer); + } + } + + public static ITransportHeaders ReceiveHeaders (Stream networkStream, byte[] buffer) + { + TransportHeaders headers = new TransportHeaders(); + + string key = ReceiveString (networkStream, buffer); + while (key != string.Empty) + { + headers[key] = ReceiveString (networkStream, buffer); + key = ReceiveString (networkStream, buffer); + } + return headers; + } + + public static Stream ReceiveMessageStream (Stream networkStream, out ITransportHeaders headers, byte[] buffer) + { + if (buffer == null) buffer = new byte[DefaultStreamBufferSize]; + + // Gets the length of the data stream + int nr = 0; + while (nr < 4) + nr += networkStream.Read (buffer, nr, 4 - nr); + + int byteCount = (buffer [0] | (buffer [1] << 8) | + (buffer [2] << 16) | (buffer [3] << 24)); + + // Reads the headers + + headers = ReceiveHeaders (networkStream, buffer); + + byte[] resultBuffer = new byte[byteCount]; + + nr = 0; + while (nr < byteCount) + nr += networkStream.Read (resultBuffer, nr, byteCount - nr); + + return new MemoryStream(resultBuffer); + } + + private static void SendString (Stream networkStream, string str, byte[] buffer) + { + // Allocates a buffer. Use the internal buffer if it is + // big enough. If not, create a new one. + + int maxBytes = Encoding.UTF8.GetMaxByteCount(str.Length)+4; //+4 bytes for storing the string length + if (maxBytes > buffer.Length) + buffer = new byte[maxBytes]; + + int num = Encoding.UTF8.GetBytes (str, 0, str.Length, buffer, 4); + + // store number of bytes (not number of chars!) + + buffer [0] = (byte) num; + buffer [1] = (byte) (num >> 8); + buffer [2] = (byte) (num >> 16); + buffer [3] = (byte) (num >> 24); + + // Write the string bytes + networkStream.Write (buffer, 0, num + 4); + } + + private static string ReceiveString (Stream networkStream, byte[] buffer) + { + int nr = 0; + while (nr < 4) + nr += networkStream.Read (buffer, nr, 4 - nr); + + // Reads the number of bytes (not chars!) + + int byteCount = (buffer [0] | (buffer [1] << 8) | + (buffer [2] << 16) | (buffer [3] << 24)); + + if (byteCount == 0) return string.Empty; + + // Allocates a buffer of the correct size. Use the + // internal buffer if it is big enough + + if (byteCount > buffer.Length) + buffer = new byte[byteCount]; + + // Reads the string + + nr = 0; + while (nr < byteCount) + nr += networkStream.Read (buffer, nr, byteCount - nr); + + char[] chars = Encoding.UTF8.GetChars(buffer, 0, byteCount); + return new string(chars); + } + + } +} diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerChannel.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerChannel.cs index 2f3a0ee7598..7b491bdf3ec 100644 --- a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerChannel.cs +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerChannel.cs @@ -2,6 +2,7 @@ // System.Runtime.Remoting.Channels.Tcp.TcpServerChannel.cs // // Author: Rodrigo Moya (rodrigo@ximian.com) +// Lluis Sanchez Gual (lsg@ctv.es) // // 2002 (C) Copyright, Ximian, Inc. // @@ -13,63 +14,10 @@ using System.Net.Sockets; using System.Net; using System.Threading; using System.IO; +using System.Runtime.Remoting.Channels.Simple; namespace System.Runtime.Remoting.Channels.Tcp { - public class TcpServerTransportSink : IServerChannelSink, IChannelSinkBase - { - IServerChannelSink next_sink; - - public TcpServerTransportSink (IServerChannelSink next) - { - next_sink = next; - } - - public IServerChannelSink NextChannelSink { - get { - return next_sink; - } - } - - [MonoTODO] - public IDictionary Properties { - get { - throw new NotImplementedException (); - } - } - - [MonoTODO] - public void AsyncProcessResponse (IServerResponseChannelSinkStack sinkStack, object state, - IMessage msg, ITransportHeaders headers, Stream stream) - { - throw new NotImplementedException (); - } - - [MonoTODO] - public Stream GetResponseStream (IServerResponseChannelSinkStack sinkStack, object state, - IMessage msg, ITransportHeaders headers) - { - throw new NotImplementedException (); - } - - public ServerProcessing ProcessMessage (IServerChannelSinkStack sinkStack, - IMessage requestMsg, - ITransportHeaders requestHeaders, - Stream requestStream, - out IMessage responseMsg, - out ITransportHeaders responseHeaders, - out Stream responseStream) - { - // this is the first sink, and TcpServerChannel does not call it. - throw new NotSupportedException (); - } - - internal void InternalProcessMessage (Stream requestStream) - { - Console.WriteLine ("ProcessMessageInternal"); - } - } - public class TcpServerChannel : IChannelReceiver, IChannel { int port = 0; @@ -81,11 +29,12 @@ namespace System.Runtime.Remoting.Channels.Tcp TcpServerTransportSink sink; ChannelDataStore channel_data; - void Init (IServerChannelSinkProvider provider) { - if (provider == null) { - provider = new BinaryServerFormatterSinkProvider (); + void Init (IServerChannelSinkProvider serverSinkProvider) + { + if (serverSinkProvider == null) { + // FIXME: change soap for binary + serverSinkProvider = new SimpleServerFormatterSinkProvider (); } - IServerChannelSink next_sink = provider.CreateSink (this); host = Dns.GetHostByName(Dns.GetHostName()).HostName; @@ -95,9 +44,20 @@ namespace System.Runtime.Remoting.Channels.Tcp uris = new String [1]; uris [0] = GetChannelUri (); } - - channel_data = new ChannelDataStore (uris);; + // Gets channel data from the chain of channel providers + + channel_data = new ChannelDataStore (uris); + IServerChannelSinkProvider provider = serverSinkProvider;
+ while (provider != null)
+ {
+ provider.GetChannelData(channel_data);
+ provider = provider.Next;
+ }
+ + // Creates the sink chain that will process all incoming messages + + IServerChannelSink next_sink = ChannelServices.CreateServerChannelSinkChain (serverSinkProvider, this);
sink = new TcpServerTransportSink (next_sink); listener = new TcpListener (port); @@ -113,21 +73,21 @@ namespace System.Runtime.Remoting.Channels.Tcp public TcpServerChannel (IDictionary properties, IServerChannelSinkProvider serverSinkProvider) { - port = (int)properties ["port"]; + port = Int32.Parse ((string)properties ["port"]); Init (serverSinkProvider); } public TcpServerChannel (string name, int port, IServerChannelSinkProvider serverSinkProvider) { - name = name; + this.name = name; this.port = port; Init (serverSinkProvider); } public TcpServerChannel (string name, int port) { - name = name; + this.name = name; this.port = port; Init (null); } @@ -158,35 +118,32 @@ namespace System.Runtime.Remoting.Channels.Tcp return "tcp://" + host + ":" + port; } - public string[] GetUrlsForUri (string uri) - { - string [] result = new String [1]; - - if (uri.IndexOf ('/') != 0) - result [0] = GetChannelUri () + "/" + uri; - else - result [0] = GetChannelUri () + uri; - - return result; - } + public string[] GetUrlsForUri (string uri)
+ {
+ if (!uri.StartsWith ("/")) uri = "/" + uri; +
+ string [] chnl_uris = channel_data.ChannelUris;
+ string [] result = new String [chnl_uris.Length];
+
+ for (int i = 0; i < chnl_uris.Length; i++)
+ result [i] = chnl_uris [i] + uri;
+
+ return result;
+ }
public string Parse (string url, out string objectURI) { - int port; - - string host = TcpChannel.ParseTcpURL (url, out objectURI, out port); - - return "tcp://" + host + ":" + port; + return TcpChannel.ParseChannelUrl (url, out objectURI); } void WaitForConnections () { - while (true) { + while (true) + { TcpClient client = listener.AcceptTcpClient (); - sink.InternalProcessMessage (client.GetStream ()); - - client.Close (); + ClientConnection reader = new ClientConnection (client, sink); + ThreadPool.QueueUserWorkItem ( new WaitCallback( reader.ProcessMessages)); } } @@ -201,6 +158,7 @@ namespace System.Runtime.Remoting.Channels.Tcp } server_thread = new Thread (new ThreadStart (WaitForConnections)); + server_thread.IsBackground = true; server_thread.Start (); } } @@ -214,4 +172,53 @@ namespace System.Runtime.Remoting.Channels.Tcp } } } + + class ClientConnection + { + TcpClient _client; + TcpServerTransportSink _sink; + Stream _stream; + + byte[] _buffer = new byte[TcpMessageIO.DefaultStreamBufferSize]; + + public ClientConnection (TcpClient client, TcpServerTransportSink sink) + { + _client = client; + _sink = sink; + } + + public Stream Stream + { + get { return _stream; } + } + + public byte[] Buffer + { + get { return _buffer; } + } + + public void ProcessMessages(object data) + { + _stream = _client.GetStream(); + + bool end = false; + while (!end) + { + MessageType type = TcpMessageIO.ReceiveMessageType (_stream); + + switch (type) + { + case MessageType.MethodMessage: + _sink.InternalProcessMessage (this); + break; + + case MessageType.CancelSignal: + end = true; + break; + } + } + + _stream.Close(); + } + } } diff --git a/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerTransportSink.cs b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerTransportSink.cs new file mode 100644 index 00000000000..409ecc81a71 --- /dev/null +++ b/mcs/class/System.Runtime.Remoting/System.Runtime.Remoting.Channels.Tcp/TcpServerTransportSink.cs @@ -0,0 +1,102 @@ +// +// System.Runtime.Remoting.Channels.Tcp.TcpServerTransportSink.cs +// +// Author: Rodrigo Moya (rodrigo@ximian.com) +// Lluis Sanchez Gual (lsg@ctv.es) +// +// 2002 (C) Copyright, Ximian, Inc. +// + +using System;
+using System.Collections;
+using System.Runtime.Remoting.Messaging;
+using System.IO;
+
+namespace System.Runtime.Remoting.Channels.Tcp
+{
+ public class TcpServerTransportSink : IServerChannelSink, IChannelSinkBase + { + IServerChannelSink next_sink; + + public TcpServerTransportSink (IServerChannelSink next) + { + next_sink = next; + } + + public IServerChannelSink NextChannelSink
+ { + get
+ { + return next_sink; + } + } + + public IDictionary Properties
+ { + get
+ { + if (next_sink != null) return next_sink.Properties; + else return null; + } + } + + public void AsyncProcessResponse (IServerResponseChannelSinkStack sinkStack, object state, + IMessage msg, ITransportHeaders headers, Stream responseStream) + { + ClientConnection connection = (ClientConnection)state; + TcpMessageIO.SendMessageStream (connection.Stream, responseStream, headers, connection.Buffer);
+ } + + public Stream GetResponseStream (IServerResponseChannelSinkStack sinkStack, object state, + IMessage msg, ITransportHeaders headers) + { + return null; + } + + public ServerProcessing ProcessMessage (IServerChannelSinkStack sinkStack, + IMessage requestMsg, + ITransportHeaders requestHeaders, + Stream requestStream, + out IMessage responseMsg, + out ITransportHeaders responseHeaders, + out Stream responseStream) + { + // this is the first sink, and TcpServerChannel does not call it. + throw new NotSupportedException (); + } + + internal void InternalProcessMessage (ClientConnection connection) + { + // Reads the headers and the request stream + + Stream requestStream; + ITransportHeaders requestHeaders;
+ + requestStream = TcpMessageIO.ReceiveMessageStream (connection.Stream, out requestHeaders, connection.Buffer);
+
+ // Pushes the connection object together with the sink. This information
+ // will be used for sending the response in an async call.
+
+ ServerChannelSinkStack sinkStack = new ServerChannelSinkStack();
+ sinkStack.Push(this, connection);
+
+ ITransportHeaders responseHeaders;
+ Stream responseStream;
+ IMessage responseMsg;
+
+ ServerProcessing proc = next_sink.ProcessMessage(sinkStack, null, requestHeaders, requestStream, out responseMsg, out responseHeaders, out responseStream);
+
+ switch (proc)
+ {
+ case ServerProcessing.Complete:
+ TcpMessageIO.SendMessageStream (connection.Stream, responseStream, responseHeaders, connection.Buffer);
+ break;
+
+ case ServerProcessing.Async:
+ case ServerProcessing.OneWay:
+ break;
+ }
+ } + }
+}
+
|