Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/ikvm-fork.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjfrijters <jfrijters>2011-08-23 08:58:36 +0400
committerjfrijters <jfrijters>2011-08-23 08:58:36 +0400
commit60adc319391d03eba308e29fb8d95ebe00a82afa (patch)
tree6cd965681e7aa466996fbf9170207ebd607b76c0 /openjdk/sun/nio/ch
parente94efd47457eee97c32cc75e94e70e521bc90114 (diff)
Prepare for forking.
Diffstat (limited to 'openjdk/sun/nio/ch')
-rw-r--r--openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java369
-rw-r--r--openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java921
2 files changed, 1290 insertions, 0 deletions
diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
new file mode 100644
index 00000000..00d6c463
--- /dev/null
+++ b/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
@@ -0,0 +1,369 @@
+/*
+ * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.*;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import sun.misc.Unsafe;
+
+/**
+ * Windows implementation of AsynchronousServerSocketChannel using overlapped I/O.
+ */
+
+class WindowsAsynchronousServerSocketChannelImpl
+ extends AsynchronousServerSocketChannelImpl implements Iocp.OverlappedChannel
+{
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+
+ // 2 * (sizeof(SOCKET_ADDRESS) + 16)
+ private static final int DATA_BUFFER_SIZE = 88;
+
+ private final long handle;
+ private final int completionKey;
+ private final Iocp iocp;
+
+ // typically there will be zero, or one I/O operations pending. In rare
+ // cases there may be more. These rare cases arise when a sequence of accept
+ // operations complete immediately and handled by the initiating thread.
+ // The corresponding OVERLAPPED cannot be reused/released until the completion
+ // event has been posted.
+ private final PendingIoCache ioCache;
+
+ // the data buffer to receive the local/remote socket address
+ private final long dataBuffer;
+
+ // flag to indicate that an accept operation is outstanding
+ private AtomicBoolean accepting = new AtomicBoolean();
+
+
+ WindowsAsynchronousServerSocketChannelImpl(Iocp iocp) throws IOException {
+ super(iocp);
+
+ // associate socket with given completion port
+ long h = IOUtil.fdVal(fd);
+ int key;
+ try {
+ key = iocp.associate(this, h);
+ } catch (IOException x) {
+ closesocket0(h); // prevent leak
+ throw x;
+ }
+
+ this.handle = h;
+ this.completionKey = key;
+ this.iocp = iocp;
+ this.ioCache = new PendingIoCache();
+ this.dataBuffer = unsafe.allocateMemory(DATA_BUFFER_SIZE);
+ }
+
+ @Override
+ public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
+ return ioCache.remove(overlapped);
+ }
+
+ @Override
+ void implClose() throws IOException {
+ // close socket (which may cause outstanding accept to be aborted).
+ closesocket0(handle);
+
+ // waits until the accept operations have completed
+ ioCache.close();
+
+ // finally disassociate from the completion port
+ iocp.disassociate(completionKey);
+
+ // release other resources
+ unsafe.freeMemory(dataBuffer);
+ }
+
+ @Override
+ public AsynchronousChannelGroupImpl group() {
+ return iocp;
+ }
+
+ /**
+ * Task to initiate accept operation and to handle result.
+ */
+ private class AcceptTask implements Runnable, Iocp.ResultHandler {
+ private final WindowsAsynchronousSocketChannelImpl channel;
+ private final AccessControlContext acc;
+ private final PendingFuture<AsynchronousSocketChannel,Object> result;
+
+ AcceptTask(WindowsAsynchronousSocketChannelImpl channel,
+ AccessControlContext acc,
+ PendingFuture<AsynchronousSocketChannel,Object> result)
+ {
+ this.channel = channel;
+ this.acc = acc;
+ this.result = result;
+ }
+
+ void enableAccept() {
+ accepting.set(false);
+ }
+
+ void closeChildChannel() {
+ try {
+ channel.close();
+ } catch (IOException ignore) { }
+ }
+
+ // caller must have acquired read lock for the listener and child channel.
+ void finishAccept() throws IOException {
+ /**
+ * Set local/remote addresses. This is currently very inefficient
+ * in that it requires 2 calls to getsockname and 2 calls to getpeername.
+ * (should change this to use GetAcceptExSockaddrs)
+ */
+ updateAcceptContext(handle, channel.handle());
+
+ InetSocketAddress local = Net.localAddress(channel.fd);
+ final InetSocketAddress remote = Net.remoteAddress(channel.fd);
+ channel.setConnected(local, remote);
+
+ // permission check (in context of initiating thread)
+ if (acc != null) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ public Void run() {
+ SecurityManager sm = System.getSecurityManager();
+ sm.checkAccept(remote.getAddress().getHostAddress(),
+ remote.getPort());
+ return null;
+ }
+ }, acc);
+ }
+ }
+
+ /**
+ * Initiates the accept operation.
+ */
+ @Override
+ public void run() {
+ long overlapped = 0L;
+
+ try {
+ // begin usage of listener socket
+ begin();
+ try {
+ // begin usage of child socket (as it is registered with
+ // completion port and so may be closed in the event that
+ // the group is forcefully closed).
+ channel.begin();
+
+ synchronized (result) {
+ overlapped = ioCache.add(result);
+
+ int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
+ if (n == IOStatus.UNAVAILABLE) {
+ return;
+ }
+
+ // connection accepted immediately
+ finishAccept();
+
+ // allow another accept before the result is set
+ enableAccept();
+ result.setResult(channel);
+ }
+ } finally {
+ // end usage on child socket
+ channel.end();
+ }
+ } catch (Throwable x) {
+ // failed to initiate accept so release resources
+ if (overlapped != 0L)
+ ioCache.remove(overlapped);
+ closeChildChannel();
+ if (x instanceof ClosedChannelException)
+ x = new AsynchronousCloseException();
+ if (!(x instanceof IOException) && !(x instanceof SecurityException))
+ x = new IOException(x);
+ enableAccept();
+ result.setFailure(x);
+ } finally {
+ // end of usage of listener socket
+ end();
+ }
+
+ // accept completed immediately but may not have executed on
+ // initiating thread in which case the operation may have been
+ // cancelled.
+ if (result.isCancelled()) {
+ closeChildChannel();
+ }
+
+ // invoke completion handler
+ Invoker.invokeIndirectly(result);
+ }
+
+ /**
+ * Executed when the I/O has completed
+ */
+ @Override
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
+ try {
+ // connection accept after group has shutdown
+ if (iocp.isShutdown()) {
+ throw new IOException(new ShutdownChannelGroupException());
+ }
+
+ // finish the accept
+ try {
+ begin();
+ try {
+ channel.begin();
+ finishAccept();
+ } finally {
+ channel.end();
+ }
+ } finally {
+ end();
+ }
+
+ // allow another accept before the result is set
+ enableAccept();
+ result.setResult(channel);
+ } catch (Throwable x) {
+ enableAccept();
+ closeChildChannel();
+ if (x instanceof ClosedChannelException)
+ x = new AsynchronousCloseException();
+ if (!(x instanceof IOException) && !(x instanceof SecurityException))
+ x = new IOException(x);
+ result.setFailure(x);
+ }
+
+ // if an async cancel has already cancelled the operation then
+ // close the new channel so as to free resources
+ if (result.isCancelled()) {
+ closeChildChannel();
+ }
+
+ // invoke handler (but not directly)
+ Invoker.invokeIndirectly(result);
+ }
+
+ @Override
+ public void failed(int error, IOException x) {
+ enableAccept();
+ closeChildChannel();
+
+ // release waiters
+ if (isOpen()) {
+ result.setFailure(x);
+ } else {
+ result.setFailure(new AsynchronousCloseException());
+ }
+ Invoker.invokeIndirectly(result);
+ }
+ }
+
+ @Override
+ Future<AsynchronousSocketChannel> implAccept(Object attachment,
+ final CompletionHandler<AsynchronousSocketChannel,Object> handler)
+ {
+ if (!isOpen()) {
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invokeIndirectly(this, handler, attachment, null, exc);
+ return null;
+ }
+ if (isAcceptKilled())
+ throw new RuntimeException("Accept not allowed due to cancellation");
+
+ // ensure channel is bound to local address
+ if (localAddress == null)
+ throw new NotYetBoundException();
+
+ // create the socket that will be accepted. The creation of the socket
+ // is enclosed by a begin/end for the listener socket to ensure that
+ // we check that the listener is open and also to prevent the I/O
+ // port from being closed as the new socket is registered.
+ WindowsAsynchronousSocketChannelImpl ch = null;
+ IOException ioe = null;
+ try {
+ begin();
+ ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
+ } catch (IOException x) {
+ ioe = x;
+ } finally {
+ end();
+ }
+ if (ioe != null) {
+ if (handler == null)
+ return CompletedFuture.withFailure(ioe);
+ Invoker.invokeIndirectly(this, handler, attachment, null, ioe);
+ return null;
+ }
+
+ // need calling context when there is security manager as
+ // permission check may be done in a different thread without
+ // any application call frames on the stack
+ AccessControlContext acc = (System.getSecurityManager() == null) ?
+ null : AccessController.getContext();
+
+ PendingFuture<AsynchronousSocketChannel,Object> result =
+ new PendingFuture<AsynchronousSocketChannel,Object>(this, handler, attachment);
+ AcceptTask task = new AcceptTask(ch, acc, result);
+ result.setContext(task);
+
+ // check and set flag to prevent concurrent accepting
+ if (!accepting.compareAndSet(false, true))
+ throw new AcceptPendingException();
+
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ task.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, task);
+ }
+ return result;
+ }
+
+ // -- Native methods --
+
+ private static native void initIDs();
+
+ private static native int accept0(long listenSocket, long acceptSocket,
+ long overlapped, long dataBuffer) throws IOException;
+
+ private static native void updateAcceptContext(long listenSocket,
+ long acceptSocket) throws IOException;
+
+ private static native void closesocket0(long socket) throws IOException;
+
+ static {
+ Util.load();
+ initIDs();
+ }
+}
diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
new file mode 100644
index 00000000..a55e7a01
--- /dev/null
+++ b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
@@ -0,0 +1,921 @@
+/*
+ * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.*;
+import java.nio.ByteBuffer;
+import java.nio.BufferOverflowException;
+import java.net.*;
+import java.util.concurrent.*;
+import java.io.IOException;
+import sun.misc.Unsafe;
+
+/**
+ * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
+ */
+
+class WindowsAsynchronousSocketChannelImpl
+ extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
+{
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static int addressSize = unsafe.addressSize();
+
+ private static int dependsArch(int value32, int value64) {
+ return (addressSize == 4) ? value32 : value64;
+ }
+
+ /*
+ * typedef struct _WSABUF {
+ * u_long len;
+ * char FAR * buf;
+ * } WSABUF;
+ */
+ private static final int SIZEOF_WSABUF = dependsArch(8, 16);
+ private static final int OFFSETOF_LEN = 0;
+ private static final int OFFSETOF_BUF = dependsArch(4, 8);
+
+ // maximum vector size for scatter/gather I/O
+ private static final int MAX_WSABUF = 16;
+
+ private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
+
+
+ // socket handle. Use begin()/end() around each usage of this handle.
+ final long handle;
+
+ // I/O completion port that the socket is associated with
+ private final Iocp iocp;
+
+ // completion key to identify channel when I/O completes
+ private final int completionKey;
+
+ // Pending I/O operations are tied to an OVERLAPPED structure that can only
+ // be released when the I/O completion event is posted to the completion
+ // port. Where I/O operations complete immediately then it is possible
+ // there may be more than two OVERLAPPED structures in use.
+ private final PendingIoCache ioCache;
+
+ // per-channel arrays of WSABUF structures
+ private final long readBufferArray;
+ private final long writeBufferArray;
+
+
+ WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
+ throws IOException
+ {
+ super(iocp);
+
+ // associate socket with default completion port
+ long h = IOUtil.fdVal(fd);
+ int key = 0;
+ try {
+ key = iocp.associate(this, h);
+ } catch (ShutdownChannelGroupException x) {
+ if (failIfGroupShutdown) {
+ closesocket0(h);
+ throw x;
+ }
+ } catch (IOException x) {
+ closesocket0(h);
+ throw x;
+ }
+
+ this.handle = h;
+ this.iocp = iocp;
+ this.completionKey = key;
+ this.ioCache = new PendingIoCache();
+
+ // allocate WSABUF arrays
+ this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
+ this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
+ }
+
+ WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
+ this(iocp, true);
+ }
+
+ @Override
+ public AsynchronousChannelGroupImpl group() {
+ return iocp;
+ }
+
+ /**
+ * Invoked by Iocp when an I/O operation competes.
+ */
+ @Override
+ public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
+ return ioCache.remove(overlapped);
+ }
+
+ // invoked by WindowsAsynchronousServerSocketChannelImpl
+ long handle() {
+ return handle;
+ }
+
+ // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
+ // accept
+ void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
+ synchronized (stateLock) {
+ state = ST_CONNECTED;
+ this.localAddress = localAddress;
+ this.remoteAddress = remoteAddress;
+ }
+ }
+
+ @Override
+ void implClose() throws IOException {
+ // close socket (may cause outstanding async I/O operations to fail).
+ closesocket0(handle);
+
+ // waits until all I/O operations have completed
+ ioCache.close();
+
+ // release arrays of WSABUF structures
+ unsafe.freeMemory(readBufferArray);
+ unsafe.freeMemory(writeBufferArray);
+
+ // finally disassociate from the completion port (key can be 0 if
+ // channel created when group is shutdown)
+ if (completionKey != 0)
+ iocp.disassociate(completionKey);
+ }
+
+ @Override
+ public void onCancel(PendingFuture<?,?> task) {
+ if (task.getContext() instanceof ConnectTask)
+ killConnect();
+ if (task.getContext() instanceof ReadTask)
+ killReading();
+ if (task.getContext() instanceof WriteTask)
+ killWriting();
+ }
+
+ /**
+ * Implements the task to initiate a connection and the handler to
+ * consume the result when the connection is established (or fails).
+ */
+ private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
+ private final InetSocketAddress remote;
+ private final PendingFuture<Void,A> result;
+
+ ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
+ this.remote = remote;
+ this.result = result;
+ }
+
+ private void closeChannel() {
+ try {
+ close();
+ } catch (IOException ignore) { }
+ }
+
+ private IOException toIOException(Throwable x) {
+ if (x instanceof IOException) {
+ if (x instanceof ClosedChannelException)
+ x = new AsynchronousCloseException();
+ return (IOException)x;
+ }
+ return new IOException(x);
+ }
+
+ /**
+ * Invoke after a connection is successfully established.
+ */
+ private void afterConnect() throws IOException {
+ updateConnectContext(handle);
+ synchronized (stateLock) {
+ state = ST_CONNECTED;
+ remoteAddress = remote;
+ }
+ }
+
+ /**
+ * Task to initiate a connection.
+ */
+ @Override
+ public void run() {
+ long overlapped = 0L;
+ Throwable exc = null;
+ try {
+ begin();
+
+ // synchronize on result to allow this thread handle the case
+ // where the connection is established immediately.
+ synchronized (result) {
+ overlapped = ioCache.add(result);
+ // initiate the connection
+ int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
+ remote.getPort(), overlapped);
+ if (n == IOStatus.UNAVAILABLE) {
+ // connection is pending
+ return;
+ }
+
+ // connection established immediately
+ afterConnect();
+ result.setResult(null);
+ }
+ } catch (Throwable x) {
+ if (overlapped != 0L)
+ ioCache.remove(overlapped);
+ exc = x;
+ } finally {
+ end();
+ }
+
+ if (exc != null) {
+ closeChannel();
+ result.setFailure(toIOException(exc));
+ }
+ Invoker.invoke(result);
+ }
+
+ /**
+ * Invoked by handler thread when connection established.
+ */
+ @Override
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
+ Throwable exc = null;
+ try {
+ begin();
+ afterConnect();
+ result.setResult(null);
+ } catch (Throwable x) {
+ // channel is closed or unable to finish connect
+ exc = x;
+ } finally {
+ end();
+ }
+
+ // can't close channel while in begin/end block
+ if (exc != null) {
+ closeChannel();
+ result.setFailure(toIOException(exc));
+ }
+
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
+ }
+
+ /**
+ * Invoked by handler thread when failed to establish connection.
+ */
+ @Override
+ public void failed(int error, IOException x) {
+ if (isOpen()) {
+ closeChannel();
+ result.setFailure(x);
+ } else {
+ result.setFailure(new AsynchronousCloseException());
+ }
+ Invoker.invoke(result);
+ }
+ }
+
+ @Override
+ <A> Future<Void> implConnect(SocketAddress remote,
+ A attachment,
+ CompletionHandler<Void,? super A> handler)
+ {
+ if (!isOpen()) {
+ Throwable exc = new ClosedChannelException();
+ if (handler == null)
+ return CompletedFuture.withFailure(exc);
+ Invoker.invoke(this, handler, attachment, null, exc);
+ return null;
+ }
+
+ InetSocketAddress isa = Net.checkAddress(remote);
+
+ // permission check
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
+
+ // check and update state
+ // ConnectEx requires the socket to be bound to a local address
+ IOException bindException = null;
+ synchronized (stateLock) {
+ if (state == ST_CONNECTED)
+ throw new AlreadyConnectedException();
+ if (state == ST_PENDING)
+ throw new ConnectionPendingException();
+ if (localAddress == null) {
+ try {
+ bind(new InetSocketAddress(0));
+ } catch (IOException x) {
+ bindException = x;
+ }
+ }
+ if (bindException == null)
+ state = ST_PENDING;
+ }
+
+ // handle bind failure
+ if (bindException != null) {
+ try {
+ close();
+ } catch (IOException ignore) { }
+ if (handler == null)
+ return CompletedFuture.withFailure(bindException);
+ Invoker.invoke(this, handler, attachment, null, bindException);
+ return null;
+ }
+
+ // setup task
+ PendingFuture<Void,A> result =
+ new PendingFuture<Void,A>(this, handler, attachment);
+ ConnectTask task = new ConnectTask<A>(isa, result);
+ result.setContext(task);
+
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ task.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, task);
+ }
+ return result;
+ }
+
+ /**
+ * Implements the task to initiate a read and the handler to consume the
+ * result when the read completes.
+ */
+ private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
+ private final ByteBuffer[] bufs;
+ private final int numBufs;
+ private final boolean scatteringRead;
+ private final PendingFuture<V,A> result;
+
+ // set by run method
+ private ByteBuffer[] shadow;
+
+ ReadTask(ByteBuffer[] bufs,
+ boolean scatteringRead,
+ PendingFuture<V,A> result)
+ {
+ this.bufs = bufs;
+ this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
+ this.scatteringRead = scatteringRead;
+ this.result = result;
+ }
+
+ /**
+ * Invoked prior to read to prepare the WSABUF array. Where necessary,
+ * it substitutes non-direct buffers with direct buffers.
+ */
+ void prepareBuffers() {
+ shadow = new ByteBuffer[numBufs];
+ long address = readBufferArray;
+ for (int i=0; i<numBufs; i++) {
+ ByteBuffer dst = bufs[i];
+ int pos = dst.position();
+ int lim = dst.limit();
+ assert (pos <= lim);
+ int rem = (pos <= lim ? lim - pos : 0);
+ long a;
+ if (!(dst instanceof DirectBuffer)) {
+ // substitute with direct buffer
+ ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
+ shadow[i] = bb;
+ a = ((DirectBuffer)bb).address();
+ } else {
+ shadow[i] = dst;
+ a = ((DirectBuffer)dst).address() + pos;
+ }
+ unsafe.putAddress(address + OFFSETOF_BUF, a);
+ unsafe.putInt(address + OFFSETOF_LEN, rem);
+ address += SIZEOF_WSABUF;
+ }
+ }
+
+ /**
+ * Invoked after a read has completed to update the buffer positions
+ * and release any substituted buffers.
+ */
+ void updateBuffers(int bytesRead) {
+ for (int i=0; i<numBufs; i++) {
+ ByteBuffer nextBuffer = shadow[i];
+ int pos = nextBuffer.position();
+ int len = nextBuffer.remaining();
+ if (bytesRead >= len) {
+ bytesRead -= len;
+ int newPosition = pos + len;
+ try {
+ nextBuffer.position(newPosition);
+ } catch (IllegalArgumentException x) {
+ // position changed by another
+ }
+ } else { // Buffers not completely filled
+ if (bytesRead > 0) {
+ assert(pos + bytesRead < (long)Integer.MAX_VALUE);
+ int newPosition = pos + bytesRead;
+ try {
+ nextBuffer.position(newPosition);
+ } catch (IllegalArgumentException x) {
+ // position changed by another
+ }
+ }
+ break;
+ }
+ }
+
+ // Put results from shadow into the slow buffers
+ for (int i=0; i<numBufs; i++) {
+ if (!(bufs[i] instanceof DirectBuffer)) {
+ shadow[i].flip();
+ try {
+ bufs[i].put(shadow[i]);
+ } catch (BufferOverflowException x) {
+ // position changed by another
+ }
+ }
+ }
+ }
+
+ void releaseBuffers() {
+ for (int i=0; i<numBufs; i++) {
+ if (!(bufs[i] instanceof DirectBuffer)) {
+ Util.releaseTemporaryDirectBuffer(shadow[i]);
+ }
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ long overlapped = 0L;
+ boolean prepared = false;
+ boolean pending = false;
+
+ try {
+ begin();
+
+ // substitute non-direct buffers
+ prepareBuffers();
+ prepared = true;
+
+ // get an OVERLAPPED structure (from the cache or allocate)
+ overlapped = ioCache.add(result);
+
+ // initiate read
+ int n = read0(handle, numBufs, readBufferArray, overlapped);
+ if (n == IOStatus.UNAVAILABLE) {
+ // I/O is pending
+ pending = true;
+ return;
+ }
+ if (n == IOStatus.EOF) {
+ // input shutdown
+ enableReading();
+ if (scatteringRead) {
+ result.setResult((V)Long.valueOf(-1L));
+ } else {
+ result.setResult((V)Integer.valueOf(-1));
+ }
+ } else {
+ throw new InternalError("Read completed immediately");
+ }
+ } catch (Throwable x) {
+ // failed to initiate read
+ // reset read flag before releasing waiters
+ enableReading();
+ if (x instanceof ClosedChannelException)
+ x = new AsynchronousCloseException();
+ if (!(x instanceof IOException))
+ x = new IOException(x);
+ result.setFailure(x);
+ } finally {
+ // release resources if I/O not pending
+ if (!pending) {
+ if (overlapped != 0L)
+ ioCache.remove(overlapped);
+ if (prepared)
+ releaseBuffers();
+ }
+ end();
+ }
+
+ // invoke completion handler
+ Invoker.invoke(result);
+ }
+
+ /**
+ * Executed when the I/O has completed
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
+ if (bytesTransferred == 0) {
+ bytesTransferred = -1; // EOF
+ } else {
+ updateBuffers(bytesTransferred);
+ }
+
+ // return direct buffer to cache if substituted
+ releaseBuffers();
+
+ // release waiters if not already released by timeout
+ synchronized (result) {
+ if (result.isDone())
+ return;
+ enableReading();
+ if (scatteringRead) {
+ result.setResult((V)Long.valueOf(bytesTransferred));
+ } else {
+ result.setResult((V)Integer.valueOf(bytesTransferred));
+ }
+ }
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
+ }
+
+ @Override
+ public void failed(int error, IOException x) {
+ // return direct buffer to cache if substituted
+ releaseBuffers();
+
+ // release waiters if not already released by timeout
+ if (!isOpen())
+ x = new AsynchronousCloseException();
+
+ synchronized (result) {
+ if (result.isDone())
+ return;
+ enableReading();
+ result.setFailure(x);
+ }
+ Invoker.invoke(result);
+ }
+
+ /**
+ * Invoked if timeout expires before it is cancelled
+ */
+ void timeout() {
+ // synchronize on result as the I/O could complete/fail
+ synchronized (result) {
+ if (result.isDone())
+ return;
+
+ // kill further reading before releasing waiters
+ enableReading(true);
+ result.setFailure(new InterruptedByTimeoutException());
+ }
+
+ // invoke handler without any locks
+ Invoker.invoke(result);
+ }
+ }
+
+ @Override
+ <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
+ ByteBuffer dst,
+ ByteBuffer[] dsts,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<V,? super A> handler)
+ {
+ // setup task
+ PendingFuture<V,A> result =
+ new PendingFuture<V,A>(this, handler, attachment);
+ ByteBuffer[] bufs;
+ if (isScatteringRead) {
+ bufs = dsts;
+ } else {
+ bufs = new ByteBuffer[1];
+ bufs[0] = dst;
+ }
+ final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
+ result.setContext(readTask);
+
+ // schedule timeout
+ if (timeout > 0L) {
+ Future<?> timeoutTask = iocp.schedule(new Runnable() {
+ public void run() {
+ readTask.timeout();
+ }
+ }, timeout, unit);
+ result.setTimeoutTask(timeoutTask);
+ }
+
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ readTask.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, readTask);
+ }
+ return result;
+ }
+
+ /**
+ * Implements the task to initiate a write and the handler to consume the
+ * result when the write completes.
+ */
+ private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
+ private final ByteBuffer[] bufs;
+ private final int numBufs;
+ private final boolean gatheringWrite;
+ private final PendingFuture<V,A> result;
+
+ // set by run method
+ private ByteBuffer[] shadow;
+
+ WriteTask(ByteBuffer[] bufs,
+ boolean gatheringWrite,
+ PendingFuture<V,A> result)
+ {
+ this.bufs = bufs;
+ this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
+ this.gatheringWrite = gatheringWrite;
+ this.result = result;
+ }
+
+ /**
+ * Invoked prior to write to prepare the WSABUF array. Where necessary,
+ * it substitutes non-direct buffers with direct buffers.
+ */
+ void prepareBuffers() {
+ shadow = new ByteBuffer[numBufs];
+ long address = writeBufferArray;
+ for (int i=0; i<numBufs; i++) {
+ ByteBuffer src = bufs[i];
+ int pos = src.position();
+ int lim = src.limit();
+ assert (pos <= lim);
+ int rem = (pos <= lim ? lim - pos : 0);
+ long a;
+ if (!(src instanceof DirectBuffer)) {
+ // substitute with direct buffer
+ ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
+ bb.put(src);
+ bb.flip();
+ src.position(pos); // leave heap buffer untouched for now
+ shadow[i] = bb;
+ a = ((DirectBuffer)bb).address();
+ } else {
+ shadow[i] = src;
+ a = ((DirectBuffer)src).address() + pos;
+ }
+ unsafe.putAddress(address + OFFSETOF_BUF, a);
+ unsafe.putInt(address + OFFSETOF_LEN, rem);
+ address += SIZEOF_WSABUF;
+ }
+ }
+
+ /**
+ * Invoked after a write has completed to update the buffer positions
+ * and release any substituted buffers.
+ */
+ void updateBuffers(int bytesWritten) {
+ // Notify the buffers how many bytes were taken
+ for (int i=0; i<numBufs; i++) {
+ ByteBuffer nextBuffer = bufs[i];
+ int pos = nextBuffer.position();
+ int lim = nextBuffer.limit();
+ int len = (pos <= lim ? lim - pos : lim);
+ if (bytesWritten >= len) {
+ bytesWritten -= len;
+ int newPosition = pos + len;
+ try {
+ nextBuffer.position(newPosition);
+ } catch (IllegalArgumentException x) {
+ // position changed by someone else
+ }
+ } else { // Buffers not completely filled
+ if (bytesWritten > 0) {
+ assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
+ int newPosition = pos + bytesWritten;
+ try {
+ nextBuffer.position(newPosition);
+ } catch (IllegalArgumentException x) {
+ // position changed by someone else
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ void releaseBuffers() {
+ for (int i=0; i<numBufs; i++) {
+ if (!(bufs[i] instanceof DirectBuffer)) {
+ Util.releaseTemporaryDirectBuffer(shadow[i]);
+ }
+ }
+ }
+
+ @Override
+ //@SuppressWarnings("unchecked")
+ public void run() {
+ long overlapped = 0L;
+ boolean prepared = false;
+ boolean pending = false;
+ boolean shutdown = false;
+
+ try {
+ begin();
+
+ // substitute non-direct buffers
+ prepareBuffers();
+ prepared = true;
+
+ // get an OVERLAPPED structure (from the cache or allocate)
+ overlapped = ioCache.add(result);
+ int n = write0(handle, numBufs, writeBufferArray, overlapped);
+ if (n == IOStatus.UNAVAILABLE) {
+ // I/O is pending
+ pending = true;
+ return;
+ }
+ if (n == IOStatus.EOF) {
+ // special case for shutdown output
+ shutdown = true;
+ throw new ClosedChannelException();
+ }
+ // write completed immediately
+ throw new InternalError("Write completed immediately");
+ } catch (Throwable x) {
+ // write failed. Enable writing before releasing waiters.
+ enableWriting();
+ if (!shutdown && (x instanceof ClosedChannelException))
+ x = new AsynchronousCloseException();
+ if (!(x instanceof IOException))
+ x = new IOException(x);
+ result.setFailure(x);
+ } finally {
+ // release resources if I/O not pending
+ if (!pending) {
+ if (overlapped != 0L)
+ ioCache.remove(overlapped);
+ if (prepared)
+ releaseBuffers();
+ }
+ end();
+ }
+
+ // invoke completion handler
+ Invoker.invoke(result);
+ }
+
+ /**
+ * Executed when the I/O has completed
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void completed(int bytesTransferred, boolean canInvokeDirect) {
+ updateBuffers(bytesTransferred);
+
+ // return direct buffer to cache if substituted
+ releaseBuffers();
+
+ // release waiters if not already released by timeout
+ synchronized (result) {
+ if (result.isDone())
+ return;
+ enableWriting();
+ if (gatheringWrite) {
+ result.setResult((V)Long.valueOf(bytesTransferred));
+ } else {
+ result.setResult((V)Integer.valueOf(bytesTransferred));
+ }
+ }
+ if (canInvokeDirect) {
+ Invoker.invokeUnchecked(result);
+ } else {
+ Invoker.invoke(result);
+ }
+ }
+
+ @Override
+ public void failed(int error, IOException x) {
+ // return direct buffer to cache if substituted
+ releaseBuffers();
+
+ // release waiters if not already released by timeout
+ if (!isOpen())
+ x = new AsynchronousCloseException();
+
+ synchronized (result) {
+ if (result.isDone())
+ return;
+ enableWriting();
+ result.setFailure(x);
+ }
+ Invoker.invoke(result);
+ }
+
+ /**
+ * Invoked if timeout expires before it is cancelled
+ */
+ void timeout() {
+ // synchronize on result as the I/O could complete/fail
+ synchronized (result) {
+ if (result.isDone())
+ return;
+
+ // kill further writing before releasing waiters
+ enableWriting(true);
+ result.setFailure(new InterruptedByTimeoutException());
+ }
+
+ // invoke handler without any locks
+ Invoker.invoke(result);
+ }
+ }
+
+ @Override
+ <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
+ ByteBuffer src,
+ ByteBuffer[] srcs,
+ long timeout,
+ TimeUnit unit,
+ A attachment,
+ CompletionHandler<V,? super A> handler)
+ {
+ // setup task
+ PendingFuture<V,A> result =
+ new PendingFuture<V,A>(this, handler, attachment);
+ ByteBuffer[] bufs;
+ if (gatheringWrite) {
+ bufs = srcs;
+ } else {
+ bufs = new ByteBuffer[1];
+ bufs[0] = src;
+ }
+ final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
+ result.setContext(writeTask);
+
+ // schedule timeout
+ if (timeout > 0L) {
+ Future<?> timeoutTask = iocp.schedule(new Runnable() {
+ public void run() {
+ writeTask.timeout();
+ }
+ }, timeout, unit);
+ result.setTimeoutTask(timeoutTask);
+ }
+
+ // initiate I/O (can only be done from thread in thread pool)
+ // initiate I/O
+ if (Iocp.supportsThreadAgnosticIo()) {
+ writeTask.run();
+ } else {
+ Invoker.invokeOnThreadInThreadPool(this, writeTask);
+ }
+ return result;
+ }
+
+ // -- Native methods --
+
+ private static native void initIDs();
+
+ private static native int connect0(long socket, boolean preferIPv6,
+ InetAddress remote, int remotePort, long overlapped) throws IOException;
+
+ private static native void updateConnectContext(long socket) throws IOException;
+
+ private static native int read0(long socket, int count, long addres, long overlapped)
+ throws IOException;
+
+ private static native int write0(long socket, int count, long address,
+ long overlapped) throws IOException;
+
+ private static native void shutdown0(long socket, int how) throws IOException;
+
+ private static native void closesocket0(long socket) throws IOException;
+
+ static {
+ Util.load();
+ initIDs();
+ }
+}