Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/ikvm-fork.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjfrijters <jfrijters>2011-08-24 13:07:19 +0400
committerjfrijters <jfrijters>2011-08-24 13:07:19 +0400
commiteaaccdd4e95adb03091adfb4e7fc91bd12d72e94 (patch)
treecdb224b7a4e958fd16a3084134b6ab24292215ce /openjdk/sun
parent60adc319391d03eba308e29fb8d95ebe00a82afa (diff)
Implemented Asynchronous[Server]SocketChannel.
Diffstat (limited to 'openjdk/sun')
-rw-r--r--openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java43
-rw-r--r--openjdk/sun/nio/ch/SocketDispatcher.java4
-rw-r--r--openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java67
-rw-r--r--openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java190
4 files changed, 56 insertions, 248 deletions
diff --git a/openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java b/openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java
deleted file mode 100644
index e2310f73..00000000
--- a/openjdk/sun/nio/ch/DefaultAsynchronousChannelProvider.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package sun.nio.ch;
-
-import java.nio.channels.spi.AsynchronousChannelProvider;
-
-/**
- * Creates this platform's default asynchronous channel provider
- */
-
-public class DefaultAsynchronousChannelProvider {
- private DefaultAsynchronousChannelProvider() { }
-
- /**
- * Returns the default AsynchronousChannelProvider.
- */
- public static AsynchronousChannelProvider create() {
- throw new ikvm.internal.NotYetImplementedError();
- }
-}
diff --git a/openjdk/sun/nio/ch/SocketDispatcher.java b/openjdk/sun/nio/ch/SocketDispatcher.java
index 62409143..7d408326 100644
--- a/openjdk/sun/nio/ch/SocketDispatcher.java
+++ b/openjdk/sun/nio/ch/SocketDispatcher.java
@@ -99,6 +99,10 @@ class SocketDispatcher extends NativeDispatcher
}
void preClose(FileDescriptor fd) throws IOException {
+ closeImpl(fd);
+ }
+
+ static void closeImpl(FileDescriptor fd) throws IOException {
try
{
if (false) throw new cli.System.Net.Sockets.SocketException();
diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
index 00d6c463..b95f4279 100644
--- a/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
+++ b/openjdk/sun/nio/ch/WindowsAsynchronousServerSocketChannelImpl.java
@@ -29,38 +29,21 @@ import java.nio.channels.*;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.FileDescriptor;
import java.io.IOException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import sun.misc.Unsafe;
/**
* Windows implementation of AsynchronousServerSocketChannel using overlapped I/O.
*/
class WindowsAsynchronousServerSocketChannelImpl
- extends AsynchronousServerSocketChannelImpl implements Iocp.OverlappedChannel
+ extends AsynchronousServerSocketChannelImpl
{
- private static final Unsafe unsafe = Unsafe.getUnsafe();
-
- // 2 * (sizeof(SOCKET_ADDRESS) + 16)
- private static final int DATA_BUFFER_SIZE = 88;
-
- private final long handle;
- private final int completionKey;
private final Iocp iocp;
- // typically there will be zero, or one I/O operations pending. In rare
- // cases there may be more. These rare cases arise when a sequence of accept
- // operations complete immediately and handled by the initiating thread.
- // The corresponding OVERLAPPED cannot be reused/released until the completion
- // event has been posted.
- private final PendingIoCache ioCache;
-
- // the data buffer to receive the local/remote socket address
- private final long dataBuffer;
-
// flag to indicate that an accept operation is outstanding
private AtomicBoolean accepting = new AtomicBoolean();
@@ -68,41 +51,13 @@ class WindowsAsynchronousServerSocketChannelImpl
WindowsAsynchronousServerSocketChannelImpl(Iocp iocp) throws IOException {
super(iocp);
- // associate socket with given completion port
- long h = IOUtil.fdVal(fd);
- int key;
- try {
- key = iocp.associate(this, h);
- } catch (IOException x) {
- closesocket0(h); // prevent leak
- throw x;
- }
-
- this.handle = h;
- this.completionKey = key;
this.iocp = iocp;
- this.ioCache = new PendingIoCache();
- this.dataBuffer = unsafe.allocateMemory(DATA_BUFFER_SIZE);
- }
-
- @Override
- public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
- return ioCache.remove(overlapped);
}
@Override
void implClose() throws IOException {
// close socket (which may cause outstanding accept to be aborted).
- closesocket0(handle);
-
- // waits until the accept operations have completed
- ioCache.close();
-
- // finally disassociate from the completion port
- iocp.disassociate(completionKey);
-
- // release other resources
- unsafe.freeMemory(dataBuffer);
+ SocketDispatcher.closeImpl(fd);
}
@Override
@@ -144,7 +99,7 @@ class WindowsAsynchronousServerSocketChannelImpl
* in that it requires 2 calls to getsockname and 2 calls to getpeername.
* (should change this to use GetAcceptExSockaddrs)
*/
- updateAcceptContext(handle, channel.handle());
+ updateAcceptContext(fd, channel.fd);
InetSocketAddress local = Net.localAddress(channel.fd);
final InetSocketAddress remote = Net.remoteAddress(channel.fd);
@@ -168,7 +123,6 @@ class WindowsAsynchronousServerSocketChannelImpl
*/
@Override
public void run() {
- long overlapped = 0L;
try {
// begin usage of listener socket
@@ -180,9 +134,8 @@ class WindowsAsynchronousServerSocketChannelImpl
channel.begin();
synchronized (result) {
- overlapped = ioCache.add(result);
- int n = accept0(handle, channel.handle(), overlapped, dataBuffer);
+ int n = accept0(fd, channel.fd, this);
if (n == IOStatus.UNAVAILABLE) {
return;
}
@@ -200,8 +153,6 @@ class WindowsAsynchronousServerSocketChannelImpl
}
} catch (Throwable x) {
// failed to initiate accept so release resources
- if (overlapped != 0L)
- ioCache.remove(overlapped);
closeChildChannel();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
@@ -354,11 +305,11 @@ class WindowsAsynchronousServerSocketChannelImpl
private static native void initIDs();
- private static native int accept0(long listenSocket, long acceptSocket,
- long overlapped, long dataBuffer) throws IOException;
+ private static native int accept0(FileDescriptor listenSocket, FileDescriptor acceptSocket,
+ Iocp.ResultHandler handler) throws IOException;
- private static native void updateAcceptContext(long listenSocket,
- long acceptSocket) throws IOException;
+ private static native void updateAcceptContext(FileDescriptor listenSocket,
+ FileDescriptor acceptSocket) throws IOException;
private static native void closesocket0(long socket) throws IOException;
diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
index a55e7a01..e82c1d9b 100644
--- a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
+++ b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
@@ -30,87 +30,29 @@ import java.nio.ByteBuffer;
import java.nio.BufferOverflowException;
import java.net.*;
import java.util.concurrent.*;
+import java.io.FileDescriptor;
import java.io.IOException;
-import sun.misc.Unsafe;
/**
* Windows implementation of AsynchronousSocketChannel using overlapped I/O.
*/
class WindowsAsynchronousSocketChannelImpl
- extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
+ extends AsynchronousSocketChannelImpl
{
- private static final Unsafe unsafe = Unsafe.getUnsafe();
- private static int addressSize = unsafe.addressSize();
-
- private static int dependsArch(int value32, int value64) {
- return (addressSize == 4) ? value32 : value64;
- }
-
- /*
- * typedef struct _WSABUF {
- * u_long len;
- * char FAR * buf;
- * } WSABUF;
- */
- private static final int SIZEOF_WSABUF = dependsArch(8, 16);
- private static final int OFFSETOF_LEN = 0;
- private static final int OFFSETOF_BUF = dependsArch(4, 8);
-
// maximum vector size for scatter/gather I/O
private static final int MAX_WSABUF = 16;
- private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
-
-
- // socket handle. Use begin()/end() around each usage of this handle.
- final long handle;
-
// I/O completion port that the socket is associated with
private final Iocp iocp;
- // completion key to identify channel when I/O completes
- private final int completionKey;
-
- // Pending I/O operations are tied to an OVERLAPPED structure that can only
- // be released when the I/O completion event is posted to the completion
- // port. Where I/O operations complete immediately then it is possible
- // there may be more than two OVERLAPPED structures in use.
- private final PendingIoCache ioCache;
-
- // per-channel arrays of WSABUF structures
- private final long readBufferArray;
- private final long writeBufferArray;
-
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
throws IOException
{
super(iocp);
- // associate socket with default completion port
- long h = IOUtil.fdVal(fd);
- int key = 0;
- try {
- key = iocp.associate(this, h);
- } catch (ShutdownChannelGroupException x) {
- if (failIfGroupShutdown) {
- closesocket0(h);
- throw x;
- }
- } catch (IOException x) {
- closesocket0(h);
- throw x;
- }
-
- this.handle = h;
this.iocp = iocp;
- this.completionKey = key;
- this.ioCache = new PendingIoCache();
-
- // allocate WSABUF arrays
- this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
- this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
}
WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
@@ -122,19 +64,6 @@ class WindowsAsynchronousSocketChannelImpl
return iocp;
}
- /**
- * Invoked by Iocp when an I/O operation competes.
- */
- @Override
- public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
- return ioCache.remove(overlapped);
- }
-
- // invoked by WindowsAsynchronousServerSocketChannelImpl
- long handle() {
- return handle;
- }
-
// invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
// accept
void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
@@ -148,19 +77,7 @@ class WindowsAsynchronousSocketChannelImpl
@Override
void implClose() throws IOException {
// close socket (may cause outstanding async I/O operations to fail).
- closesocket0(handle);
-
- // waits until all I/O operations have completed
- ioCache.close();
-
- // release arrays of WSABUF structures
- unsafe.freeMemory(readBufferArray);
- unsafe.freeMemory(writeBufferArray);
-
- // finally disassociate from the completion port (key can be 0 if
- // channel created when group is shutdown)
- if (completionKey != 0)
- iocp.disassociate(completionKey);
+ SocketDispatcher.closeImpl(fd);
}
@Override
@@ -205,7 +122,7 @@ class WindowsAsynchronousSocketChannelImpl
* Invoke after a connection is successfully established.
*/
private void afterConnect() throws IOException {
- updateConnectContext(handle);
+ updateConnectContext(fd);
synchronized (stateLock) {
state = ST_CONNECTED;
remoteAddress = remote;
@@ -217,7 +134,6 @@ class WindowsAsynchronousSocketChannelImpl
*/
@Override
public void run() {
- long overlapped = 0L;
Throwable exc = null;
try {
begin();
@@ -225,10 +141,9 @@ class WindowsAsynchronousSocketChannelImpl
// synchronize on result to allow this thread handle the case
// where the connection is established immediately.
synchronized (result) {
- overlapped = ioCache.add(result);
// initiate the connection
- int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
- remote.getPort(), overlapped);
+ int n = connect0(fd, Net.isIPv6Available(), remote.getAddress(),
+ remote.getPort(), this);
if (n == IOStatus.UNAVAILABLE) {
// connection is pending
return;
@@ -239,8 +154,6 @@ class WindowsAsynchronousSocketChannelImpl
result.setResult(null);
}
} catch (Throwable x) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
exc = x;
} finally {
end();
@@ -388,30 +301,23 @@ class WindowsAsynchronousSocketChannelImpl
/**
* Invoked prior to read to prepare the WSABUF array. Where necessary,
- * it substitutes non-direct buffers with direct buffers.
+ * it substitutes direct buffers with managed buffers.
*/
void prepareBuffers() {
shadow = new ByteBuffer[numBufs];
- long address = readBufferArray;
for (int i=0; i<numBufs; i++) {
ByteBuffer dst = bufs[i];
int pos = dst.position();
int lim = dst.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
- long a;
- if (!(dst instanceof DirectBuffer)) {
+ if (!dst.hasArray()) {
// substitute with direct buffer
- ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
+ ByteBuffer bb = ByteBuffer.allocate(rem);
shadow[i] = bb;
- a = ((DirectBuffer)bb).address();
} else {
shadow[i] = dst;
- a = ((DirectBuffer)dst).address() + pos;
}
- unsafe.putAddress(address + OFFSETOF_BUF, a);
- unsafe.putInt(address + OFFSETOF_LEN, rem);
- address += SIZEOF_WSABUF;
}
}
@@ -448,7 +354,7 @@ class WindowsAsynchronousSocketChannelImpl
// Put results from shadow into the slow buffers
for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
+ if (!bufs[i].hasArray()) {
shadow[i].flip();
try {
bufs[i].put(shadow[i]);
@@ -460,32 +366,23 @@ class WindowsAsynchronousSocketChannelImpl
}
void releaseBuffers() {
- for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
- Util.releaseTemporaryDirectBuffer(shadow[i]);
- }
- }
}
@Override
@SuppressWarnings("unchecked")
public void run() {
- long overlapped = 0L;
boolean prepared = false;
boolean pending = false;
try {
begin();
- // substitute non-direct buffers
+ // substitute direct buffers
prepareBuffers();
prepared = true;
- // get an OVERLAPPED structure (from the cache or allocate)
- overlapped = ioCache.add(result);
-
// initiate read
- int n = read0(handle, numBufs, readBufferArray, overlapped);
+ int n = read0(fd, shadow, this);
if (n == IOStatus.UNAVAILABLE) {
// I/O is pending
pending = true;
@@ -499,8 +396,19 @@ class WindowsAsynchronousSocketChannelImpl
} else {
result.setResult((V)Integer.valueOf(-1));
}
+ }
+ // read completed immediately
+ if (n == 0) {
+ n = -1; // EOF
} else {
- throw new InternalError("Read completed immediately");
+ updateBuffers(n);
+ }
+ releaseBuffers();
+ enableReading();
+ if (scatteringRead) {
+ result.setResult((V)Long.valueOf(n));
+ } else {
+ result.setResult((V)Integer.valueOf(n));
}
} catch (Throwable x) {
// failed to initiate read
@@ -514,8 +422,6 @@ class WindowsAsynchronousSocketChannelImpl
} finally {
// release resources if I/O not pending
if (!pending) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
if (prepared)
releaseBuffers();
}
@@ -662,33 +568,26 @@ class WindowsAsynchronousSocketChannelImpl
/**
* Invoked prior to write to prepare the WSABUF array. Where necessary,
- * it substitutes non-direct buffers with direct buffers.
+ * it substitutes direct buffers with managed buffers.
*/
void prepareBuffers() {
shadow = new ByteBuffer[numBufs];
- long address = writeBufferArray;
for (int i=0; i<numBufs; i++) {
ByteBuffer src = bufs[i];
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
- long a;
- if (!(src instanceof DirectBuffer)) {
+ if (!src.hasArray()) {
// substitute with direct buffer
- ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
+ ByteBuffer bb = ByteBuffer.allocate(rem);
bb.put(src);
bb.flip();
src.position(pos); // leave heap buffer untouched for now
shadow[i] = bb;
- a = ((DirectBuffer)bb).address();
} else {
shadow[i] = src;
- a = ((DirectBuffer)src).address() + pos;
}
- unsafe.putAddress(address + OFFSETOF_BUF, a);
- unsafe.putInt(address + OFFSETOF_LEN, rem);
- address += SIZEOF_WSABUF;
}
}
@@ -727,17 +626,11 @@ class WindowsAsynchronousSocketChannelImpl
}
void releaseBuffers() {
- for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
- Util.releaseTemporaryDirectBuffer(shadow[i]);
- }
- }
}
@Override
//@SuppressWarnings("unchecked")
public void run() {
- long overlapped = 0L;
boolean prepared = false;
boolean pending = false;
boolean shutdown = false;
@@ -745,13 +638,11 @@ class WindowsAsynchronousSocketChannelImpl
try {
begin();
- // substitute non-direct buffers
+ // substitute direct buffers
prepareBuffers();
prepared = true;
- // get an OVERLAPPED structure (from the cache or allocate)
- overlapped = ioCache.add(result);
- int n = write0(handle, numBufs, writeBufferArray, overlapped);
+ int n = write0(fd, shadow, this);
if (n == IOStatus.UNAVAILABLE) {
// I/O is pending
pending = true;
@@ -763,7 +654,14 @@ class WindowsAsynchronousSocketChannelImpl
throw new ClosedChannelException();
}
// write completed immediately
- throw new InternalError("Write completed immediately");
+ updateBuffers(n);
+ releaseBuffers();
+ enableWriting();
+ if (gatheringWrite) {
+ result.setResult((V)Long.valueOf(n));
+ } else {
+ result.setResult((V)Integer.valueOf(n));
+ }
} catch (Throwable x) {
// write failed. Enable writing before releasing waiters.
enableWriting();
@@ -775,8 +673,6 @@ class WindowsAsynchronousSocketChannelImpl
} finally {
// release resources if I/O not pending
if (!pending) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
if (prepared)
releaseBuffers();
}
@@ -899,16 +795,16 @@ class WindowsAsynchronousSocketChannelImpl
private static native void initIDs();
- private static native int connect0(long socket, boolean preferIPv6,
- InetAddress remote, int remotePort, long overlapped) throws IOException;
+ private static native int connect0(FileDescriptor fd, boolean preferIPv6,
+ InetAddress remote, int remotePort, Iocp.ResultHandler handler) throws IOException;
- private static native void updateConnectContext(long socket) throws IOException;
+ private static native void updateConnectContext(FileDescriptor fd) throws IOException;
- private static native int read0(long socket, int count, long addres, long overlapped)
+ private static native int read0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
throws IOException;
- private static native int write0(long socket, int count, long address,
- long overlapped) throws IOException;
+ private static native int write0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
+ throws IOException;
private static native void shutdown0(long socket, int how) throws IOException;