diff options
author | jfrijters <jfrijters> | 2011-08-24 13:07:19 +0400 |
---|---|---|
committer | jfrijters <jfrijters> | 2011-08-24 13:07:19 +0400 |
commit | eaaccdd4e95adb03091adfb4e7fc91bd12d72e94 (patch) | |
tree | cdb224b7a4e958fd16a3084134b6ab24292215ce /openjdk/sun/nio/ch | |
parent | 60adc319391d03eba308e29fb8d95ebe00a82afa (diff) |
Implemented Asynchronous[Server]SocketChannel.
Diffstat (limited to 'openjdk/sun/nio/ch')
4 files changed, 56 insertions, 248 deletions
diff --git a/openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java b/openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java deleted file mode 100644 index e2310f73..00000000 --- a/openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package sun.nio.ch; - -import java.nio.channels.spi.AsynchronousChannelProvider; - -/** - * Creates this platform's default asynchronous channel provider - */ - -public class DefaultAsynchronousChannelProvider { - private DefaultAsynchronousChannelProvider() { } - - /** - * Returns the default AsynchronousChannelProvider. - */ - public static AsynchronousChannelProvider create() { - throw new ikvm.internal.NotYetImplementedError(); - } -} diff --git a/openjdk/sun/nio/ch/SocketDispatcher.java b/openjdk/sun/nio/ch/SocketDispatcher.java index 62409143..7d408326 100644 --- a/openjdk/sun/nio/ch/SocketDispatcher.java +++ b/openjdk/sun/nio/ch/SocketDispatcher.java @@ -99,6 +99,10 @@ class SocketDispatcher extends NativeDispatcher } void preClose(FileDescriptor fd) throws IOException { + closeImpl(fd); + } + + static void closeImpl(FileDescriptor fd) throws IOException { try { if (false) throw new cli.System.Net.Sockets.SocketException(); diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java index 00d6c463..b95f4279 100644 --- a/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java +++ b/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java @@ -29,38 +29,21 @@ import java.nio.channels.*; import java.net.InetSocketAddress; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.io.FileDescriptor; import java.io.IOException; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; -import sun.misc.Unsafe; /** * Windows implementation of AsynchronousServerSocketChannel using overlapped I/O. */ class WindowsAsynchronousServerSocketChannelImpl - extends AsynchronousServerSocketChannelImpl implements Iocp.OverlappedChannel + extends AsynchronousServerSocketChannelImpl { - private static final Unsafe unsafe = Unsafe.getUnsafe(); - - // 2 * (sizeof(SOCKET_ADDRESS) + 16) - private static final int DATA_BUFFER_SIZE = 88; - - private final long handle; - private final int completionKey; private final Iocp iocp; - // typically there will be zero, or one I/O operations pending. In rare - // cases there may be more. These rare cases arise when a sequence of accept - // operations complete immediately and handled by the initiating thread. - // The corresponding OVERLAPPED cannot be reused/released until the completion - // event has been posted. - private final PendingIoCache ioCache; - - // the data buffer to receive the local/remote socket address - private final long dataBuffer; - // flag to indicate that an accept operation is outstanding private AtomicBoolean accepting = new AtomicBoolean(); @@ -68,41 +51,13 @@ class WindowsAsynchronousServerSocketChannelImpl WindowsAsynchronousServerSocketChannelImpl(Iocp iocp) throws IOException { super(iocp); - // associate socket with given completion port - long h = IOUtil.fdVal(fd); - int key; - try { - key = iocp.associate(this, h); - } catch (IOException x) { - closesocket0(h); // prevent leak - throw x; - } - - this.handle = h; - this.completionKey = key; this.iocp = iocp; - this.ioCache = new PendingIoCache(); - this.dataBuffer = unsafe.allocateMemory(DATA_BUFFER_SIZE); - } - - @Override - public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { - return ioCache.remove(overlapped); } @Override void implClose() throws IOException { // close socket (which may cause outstanding accept to be aborted). - closesocket0(handle); - - // waits until the accept operations have completed - ioCache.close(); - - // finally disassociate from the completion port - iocp.disassociate(completionKey); - - // release other resources - unsafe.freeMemory(dataBuffer); + SocketDispatcher.closeImpl(fd); } @Override @@ -144,7 +99,7 @@ class WindowsAsynchronousServerSocketChannelImpl * in that it requires 2 calls to getsockname and 2 calls to getpeername. * (should change this to use GetAcceptExSockaddrs) */ - updateAcceptContext(handle, channel.handle()); + updateAcceptContext(fd, channel.fd); InetSocketAddress local = Net.localAddress(channel.fd); final InetSocketAddress remote = Net.remoteAddress(channel.fd); @@ -168,7 +123,6 @@ class WindowsAsynchronousServerSocketChannelImpl */ @Override public void run() { - long overlapped = 0L; try { // begin usage of listener socket @@ -180,9 +134,8 @@ class WindowsAsynchronousServerSocketChannelImpl channel.begin(); synchronized (result) { - overlapped = ioCache.add(result); - int n = accept0(handle, channel.handle(), overlapped, dataBuffer); + int n = accept0(fd, channel.fd, this); if (n == IOStatus.UNAVAILABLE) { return; } @@ -200,8 +153,6 @@ class WindowsAsynchronousServerSocketChannelImpl } } catch (Throwable x) { // failed to initiate accept so release resources - if (overlapped != 0L) - ioCache.remove(overlapped); closeChildChannel(); if (x instanceof ClosedChannelException) x = new AsynchronousCloseException(); @@ -354,11 +305,11 @@ class WindowsAsynchronousServerSocketChannelImpl private static native void initIDs(); - private static native int accept0(long listenSocket, long acceptSocket, - long overlapped, long dataBuffer) throws IOException; + private static native int accept0(FileDescriptor listenSocket, FileDescriptor acceptSocket, + Iocp.ResultHandler handler) throws IOException; - private static native void updateAcceptContext(long listenSocket, - long acceptSocket) throws IOException; + private static native void updateAcceptContext(FileDescriptor listenSocket, + FileDescriptor acceptSocket) throws IOException; private static native void closesocket0(long socket) throws IOException; diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java index a55e7a01..e82c1d9b 100644 --- a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java +++ b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java @@ -30,87 +30,29 @@ import java.nio.ByteBuffer; import java.nio.BufferOverflowException; import java.net.*; import java.util.concurrent.*; +import java.io.FileDescriptor; import java.io.IOException; -import sun.misc.Unsafe; /** * Windows implementation of AsynchronousSocketChannel using overlapped I/O. */ class WindowsAsynchronousSocketChannelImpl - extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel + extends AsynchronousSocketChannelImpl { - private static final Unsafe unsafe = Unsafe.getUnsafe(); - private static int addressSize = unsafe.addressSize(); - - private static int dependsArch(int value32, int value64) { - return (addressSize == 4) ? value32 : value64; - } - - /* - * typedef struct _WSABUF { - * u_long len; - * char FAR * buf; - * } WSABUF; - */ - private static final int SIZEOF_WSABUF = dependsArch(8, 16); - private static final int OFFSETOF_LEN = 0; - private static final int OFFSETOF_BUF = dependsArch(4, 8); - // maximum vector size for scatter/gather I/O private static final int MAX_WSABUF = 16; - private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; - - - // socket handle. Use begin()/end() around each usage of this handle. - final long handle; - // I/O completion port that the socket is associated with private final Iocp iocp; - // completion key to identify channel when I/O completes - private final int completionKey; - - // Pending I/O operations are tied to an OVERLAPPED structure that can only - // be released when the I/O completion event is posted to the completion - // port. Where I/O operations complete immediately then it is possible - // there may be more than two OVERLAPPED structures in use. - private final PendingIoCache ioCache; - - // per-channel arrays of WSABUF structures - private final long readBufferArray; - private final long writeBufferArray; - WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) throws IOException { super(iocp); - // associate socket with default completion port - long h = IOUtil.fdVal(fd); - int key = 0; - try { - key = iocp.associate(this, h); - } catch (ShutdownChannelGroupException x) { - if (failIfGroupShutdown) { - closesocket0(h); - throw x; - } - } catch (IOException x) { - closesocket0(h); - throw x; - } - - this.handle = h; this.iocp = iocp; - this.completionKey = key; - this.ioCache = new PendingIoCache(); - - // allocate WSABUF arrays - this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); - this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); } WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { @@ -122,19 +64,6 @@ class WindowsAsynchronousSocketChannelImpl return iocp; } - /** - * Invoked by Iocp when an I/O operation competes. - */ - @Override - public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { - return ioCache.remove(overlapped); - } - - // invoked by WindowsAsynchronousServerSocketChannelImpl - long handle() { - return handle; - } - // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection // accept void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) { @@ -148,19 +77,7 @@ class WindowsAsynchronousSocketChannelImpl @Override void implClose() throws IOException { // close socket (may cause outstanding async I/O operations to fail). - closesocket0(handle); - - // waits until all I/O operations have completed - ioCache.close(); - - // release arrays of WSABUF structures - unsafe.freeMemory(readBufferArray); - unsafe.freeMemory(writeBufferArray); - - // finally disassociate from the completion port (key can be 0 if - // channel created when group is shutdown) - if (completionKey != 0) - iocp.disassociate(completionKey); + SocketDispatcher.closeImpl(fd); } @Override @@ -205,7 +122,7 @@ class WindowsAsynchronousSocketChannelImpl * Invoke after a connection is successfully established. */ private void afterConnect() throws IOException { - updateConnectContext(handle); + updateConnectContext(fd); synchronized (stateLock) { state = ST_CONNECTED; remoteAddress = remote; @@ -217,7 +134,6 @@ class WindowsAsynchronousSocketChannelImpl */ @Override public void run() { - long overlapped = 0L; Throwable exc = null; try { begin(); @@ -225,10 +141,9 @@ class WindowsAsynchronousSocketChannelImpl // synchronize on result to allow this thread handle the case // where the connection is established immediately. synchronized (result) { - overlapped = ioCache.add(result); // initiate the connection - int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), - remote.getPort(), overlapped); + int n = connect0(fd, Net.isIPv6Available(), remote.getAddress(), + remote.getPort(), this); if (n == IOStatus.UNAVAILABLE) { // connection is pending return; @@ -239,8 +154,6 @@ class WindowsAsynchronousSocketChannelImpl result.setResult(null); } } catch (Throwable x) { - if (overlapped != 0L) - ioCache.remove(overlapped); exc = x; } finally { end(); @@ -388,30 +301,23 @@ class WindowsAsynchronousSocketChannelImpl /** * Invoked prior to read to prepare the WSABUF array. Where necessary, - * it substitutes non-direct buffers with direct buffers. + * it substitutes direct buffers with managed buffers. */ void prepareBuffers() { shadow = new ByteBuffer[numBufs]; - long address = readBufferArray; for (int i=0; i<numBufs; i++) { ByteBuffer dst = bufs[i]; int pos = dst.position(); int lim = dst.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); - long a; - if (!(dst instanceof DirectBuffer)) { + if (!dst.hasArray()) { // substitute with direct buffer - ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); + ByteBuffer bb = ByteBuffer.allocate(rem); shadow[i] = bb; - a = ((DirectBuffer)bb).address(); } else { shadow[i] = dst; - a = ((DirectBuffer)dst).address() + pos; } - unsafe.putAddress(address + OFFSETOF_BUF, a); - unsafe.putInt(address + OFFSETOF_LEN, rem); - address += SIZEOF_WSABUF; } } @@ -448,7 +354,7 @@ class WindowsAsynchronousSocketChannelImpl // Put results from shadow into the slow buffers for (int i=0; i<numBufs; i++) { - if (!(bufs[i] instanceof DirectBuffer)) { + if (!bufs[i].hasArray()) { shadow[i].flip(); try { bufs[i].put(shadow[i]); @@ -460,32 +366,23 @@ class WindowsAsynchronousSocketChannelImpl } void releaseBuffers() { - for (int i=0; i<numBufs; i++) { - if (!(bufs[i] instanceof DirectBuffer)) { - Util.releaseTemporaryDirectBuffer(shadow[i]); - } - } } @Override @SuppressWarnings("unchecked") public void run() { - long overlapped = 0L; boolean prepared = false; boolean pending = false; try { begin(); - // substitute non-direct buffers + // substitute direct buffers prepareBuffers(); prepared = true; - // get an OVERLAPPED structure (from the cache or allocate) - overlapped = ioCache.add(result); - // initiate read - int n = read0(handle, numBufs, readBufferArray, overlapped); + int n = read0(fd, shadow, this); if (n == IOStatus.UNAVAILABLE) { // I/O is pending pending = true; @@ -499,8 +396,19 @@ class WindowsAsynchronousSocketChannelImpl } else { result.setResult((V)Integer.valueOf(-1)); } + } + // read completed immediately + if (n == 0) { + n = -1; // EOF } else { - throw new InternalError("Read completed immediately"); + updateBuffers(n); + } + releaseBuffers(); + enableReading(); + if (scatteringRead) { + result.setResult((V)Long.valueOf(n)); + } else { + result.setResult((V)Integer.valueOf(n)); } } catch (Throwable x) { // failed to initiate read @@ -514,8 +422,6 @@ class WindowsAsynchronousSocketChannelImpl } finally { // release resources if I/O not pending if (!pending) { - if (overlapped != 0L) - ioCache.remove(overlapped); if (prepared) releaseBuffers(); } @@ -662,33 +568,26 @@ class WindowsAsynchronousSocketChannelImpl /** * Invoked prior to write to prepare the WSABUF array. Where necessary, - * it substitutes non-direct buffers with direct buffers. + * it substitutes direct buffers with managed buffers. */ void prepareBuffers() { shadow = new ByteBuffer[numBufs]; - long address = writeBufferArray; for (int i=0; i<numBufs; i++) { ByteBuffer src = bufs[i]; int pos = src.position(); int lim = src.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); - long a; - if (!(src instanceof DirectBuffer)) { + if (!src.hasArray()) { // substitute with direct buffer - ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); + ByteBuffer bb = ByteBuffer.allocate(rem); bb.put(src); bb.flip(); src.position(pos); // leave heap buffer untouched for now shadow[i] = bb; - a = ((DirectBuffer)bb).address(); } else { shadow[i] = src; - a = ((DirectBuffer)src).address() + pos; } - unsafe.putAddress(address + OFFSETOF_BUF, a); - unsafe.putInt(address + OFFSETOF_LEN, rem); - address += SIZEOF_WSABUF; } } @@ -727,17 +626,11 @@ class WindowsAsynchronousSocketChannelImpl } void releaseBuffers() { - for (int i=0; i<numBufs; i++) { - if (!(bufs[i] instanceof DirectBuffer)) { - Util.releaseTemporaryDirectBuffer(shadow[i]); - } - } } @Override //@SuppressWarnings("unchecked") public void run() { - long overlapped = 0L; boolean prepared = false; boolean pending = false; boolean shutdown = false; @@ -745,13 +638,11 @@ class WindowsAsynchronousSocketChannelImpl try { begin(); - // substitute non-direct buffers + // substitute direct buffers prepareBuffers(); prepared = true; - // get an OVERLAPPED structure (from the cache or allocate) - overlapped = ioCache.add(result); - int n = write0(handle, numBufs, writeBufferArray, overlapped); + int n = write0(fd, shadow, this); if (n == IOStatus.UNAVAILABLE) { // I/O is pending pending = true; @@ -763,7 +654,14 @@ class WindowsAsynchronousSocketChannelImpl throw new ClosedChannelException(); } // write completed immediately - throw new InternalError("Write completed immediately"); + updateBuffers(n); + releaseBuffers(); + enableWriting(); + if (gatheringWrite) { + result.setResult((V)Long.valueOf(n)); + } else { + result.setResult((V)Integer.valueOf(n)); + } } catch (Throwable x) { // write failed. Enable writing before releasing waiters. enableWriting(); @@ -775,8 +673,6 @@ class WindowsAsynchronousSocketChannelImpl } finally { // release resources if I/O not pending if (!pending) { - if (overlapped != 0L) - ioCache.remove(overlapped); if (prepared) releaseBuffers(); } @@ -899,16 +795,16 @@ class WindowsAsynchronousSocketChannelImpl private static native void initIDs(); - private static native int connect0(long socket, boolean preferIPv6, - InetAddress remote, int remotePort, long overlapped) throws IOException; + private static native int connect0(FileDescriptor fd, boolean preferIPv6, + InetAddress remote, int remotePort, Iocp.ResultHandler handler) throws IOException; - private static native void updateConnectContext(long socket) throws IOException; + private static native void updateConnectContext(FileDescriptor fd) throws IOException; - private static native int read0(long socket, int count, long addres, long overlapped) + private static native int read0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler) throws IOException; - private static native int write0(long socket, int count, long address, - long overlapped) throws IOException; + private static native int write0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler) + throws IOException; private static native void shutdown0(long socket, int how) throws IOException; |