Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mono/ikvm-fork.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjfrijters <jfrijters>2011-08-24 13:07:19 +0400
committerjfrijters <jfrijters>2011-08-24 13:07:19 +0400
commiteaaccdd4e95adb03091adfb4e7fc91bd12d72e94 (patch)
treecdb224b7a4e958fd16a3084134b6ab24292215ce /openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
parent60adc319391d03eba308e29fb8d95ebe00a82afa (diff)
Implemented Asynchronous[Server]SocketChannel.
Diffstat (limited to 'openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java')
-rw-r--r--openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java190
1 files changed, 43 insertions, 147 deletions
diff --git a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
index a55e7a01..e82c1d9b 100644
--- a/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
+++ b/openjdk/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
@@ -30,87 +30,29 @@ import java.nio.ByteBuffer;
import java.nio.BufferOverflowException;
import java.net.*;
import java.util.concurrent.*;
+import java.io.FileDescriptor;
import java.io.IOException;
-import sun.misc.Unsafe;
/**
* Windows implementation of AsynchronousSocketChannel using overlapped I/O.
*/
class WindowsAsynchronousSocketChannelImpl
- extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
+ extends AsynchronousSocketChannelImpl
{
- private static final Unsafe unsafe = Unsafe.getUnsafe();
- private static int addressSize = unsafe.addressSize();
-
- private static int dependsArch(int value32, int value64) {
- return (addressSize == 4) ? value32 : value64;
- }
-
- /*
- * typedef struct _WSABUF {
- * u_long len;
- * char FAR * buf;
- * } WSABUF;
- */
- private static final int SIZEOF_WSABUF = dependsArch(8, 16);
- private static final int OFFSETOF_LEN = 0;
- private static final int OFFSETOF_BUF = dependsArch(4, 8);
-
// maximum vector size for scatter/gather I/O
private static final int MAX_WSABUF = 16;
- private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
-
-
- // socket handle. Use begin()/end() around each usage of this handle.
- final long handle;
-
// I/O completion port that the socket is associated with
private final Iocp iocp;
- // completion key to identify channel when I/O completes
- private final int completionKey;
-
- // Pending I/O operations are tied to an OVERLAPPED structure that can only
- // be released when the I/O completion event is posted to the completion
- // port. Where I/O operations complete immediately then it is possible
- // there may be more than two OVERLAPPED structures in use.
- private final PendingIoCache ioCache;
-
- // per-channel arrays of WSABUF structures
- private final long readBufferArray;
- private final long writeBufferArray;
-
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
throws IOException
{
super(iocp);
- // associate socket with default completion port
- long h = IOUtil.fdVal(fd);
- int key = 0;
- try {
- key = iocp.associate(this, h);
- } catch (ShutdownChannelGroupException x) {
- if (failIfGroupShutdown) {
- closesocket0(h);
- throw x;
- }
- } catch (IOException x) {
- closesocket0(h);
- throw x;
- }
-
- this.handle = h;
this.iocp = iocp;
- this.completionKey = key;
- this.ioCache = new PendingIoCache();
-
- // allocate WSABUF arrays
- this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
- this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
}
WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
@@ -122,19 +64,6 @@ class WindowsAsynchronousSocketChannelImpl
return iocp;
}
- /**
- * Invoked by Iocp when an I/O operation competes.
- */
- @Override
- public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
- return ioCache.remove(overlapped);
- }
-
- // invoked by WindowsAsynchronousServerSocketChannelImpl
- long handle() {
- return handle;
- }
-
// invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
// accept
void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
@@ -148,19 +77,7 @@ class WindowsAsynchronousSocketChannelImpl
@Override
void implClose() throws IOException {
// close socket (may cause outstanding async I/O operations to fail).
- closesocket0(handle);
-
- // waits until all I/O operations have completed
- ioCache.close();
-
- // release arrays of WSABUF structures
- unsafe.freeMemory(readBufferArray);
- unsafe.freeMemory(writeBufferArray);
-
- // finally disassociate from the completion port (key can be 0 if
- // channel created when group is shutdown)
- if (completionKey != 0)
- iocp.disassociate(completionKey);
+ SocketDispatcher.closeImpl(fd);
}
@Override
@@ -205,7 +122,7 @@ class WindowsAsynchronousSocketChannelImpl
* Invoke after a connection is successfully established.
*/
private void afterConnect() throws IOException {
- updateConnectContext(handle);
+ updateConnectContext(fd);
synchronized (stateLock) {
state = ST_CONNECTED;
remoteAddress = remote;
@@ -217,7 +134,6 @@ class WindowsAsynchronousSocketChannelImpl
*/
@Override
public void run() {
- long overlapped = 0L;
Throwable exc = null;
try {
begin();
@@ -225,10 +141,9 @@ class WindowsAsynchronousSocketChannelImpl
// synchronize on result to allow this thread handle the case
// where the connection is established immediately.
synchronized (result) {
- overlapped = ioCache.add(result);
// initiate the connection
- int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
- remote.getPort(), overlapped);
+ int n = connect0(fd, Net.isIPv6Available(), remote.getAddress(),
+ remote.getPort(), this);
if (n == IOStatus.UNAVAILABLE) {
// connection is pending
return;
@@ -239,8 +154,6 @@ class WindowsAsynchronousSocketChannelImpl
result.setResult(null);
}
} catch (Throwable x) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
exc = x;
} finally {
end();
@@ -388,30 +301,23 @@ class WindowsAsynchronousSocketChannelImpl
/**
* Invoked prior to read to prepare the WSABUF array. Where necessary,
- * it substitutes non-direct buffers with direct buffers.
+ * it substitutes direct buffers with managed buffers.
*/
void prepareBuffers() {
shadow = new ByteBuffer[numBufs];
- long address = readBufferArray;
for (int i=0; i<numBufs; i++) {
ByteBuffer dst = bufs[i];
int pos = dst.position();
int lim = dst.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
- long a;
- if (!(dst instanceof DirectBuffer)) {
+ if (!dst.hasArray()) {
// substitute with direct buffer
- ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
+ ByteBuffer bb = ByteBuffer.allocate(rem);
shadow[i] = bb;
- a = ((DirectBuffer)bb).address();
} else {
shadow[i] = dst;
- a = ((DirectBuffer)dst).address() + pos;
}
- unsafe.putAddress(address + OFFSETOF_BUF, a);
- unsafe.putInt(address + OFFSETOF_LEN, rem);
- address += SIZEOF_WSABUF;
}
}
@@ -448,7 +354,7 @@ class WindowsAsynchronousSocketChannelImpl
// Put results from shadow into the slow buffers
for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
+ if (!bufs[i].hasArray()) {
shadow[i].flip();
try {
bufs[i].put(shadow[i]);
@@ -460,32 +366,23 @@ class WindowsAsynchronousSocketChannelImpl
}
void releaseBuffers() {
- for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
- Util.releaseTemporaryDirectBuffer(shadow[i]);
- }
- }
}
@Override
@SuppressWarnings("unchecked")
public void run() {
- long overlapped = 0L;
boolean prepared = false;
boolean pending = false;
try {
begin();
- // substitute non-direct buffers
+ // substitute direct buffers
prepareBuffers();
prepared = true;
- // get an OVERLAPPED structure (from the cache or allocate)
- overlapped = ioCache.add(result);
-
// initiate read
- int n = read0(handle, numBufs, readBufferArray, overlapped);
+ int n = read0(fd, shadow, this);
if (n == IOStatus.UNAVAILABLE) {
// I/O is pending
pending = true;
@@ -499,8 +396,19 @@ class WindowsAsynchronousSocketChannelImpl
} else {
result.setResult((V)Integer.valueOf(-1));
}
+ }
+ // read completed immediately
+ if (n == 0) {
+ n = -1; // EOF
} else {
- throw new InternalError("Read completed immediately");
+ updateBuffers(n);
+ }
+ releaseBuffers();
+ enableReading();
+ if (scatteringRead) {
+ result.setResult((V)Long.valueOf(n));
+ } else {
+ result.setResult((V)Integer.valueOf(n));
}
} catch (Throwable x) {
// failed to initiate read
@@ -514,8 +422,6 @@ class WindowsAsynchronousSocketChannelImpl
} finally {
// release resources if I/O not pending
if (!pending) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
if (prepared)
releaseBuffers();
}
@@ -662,33 +568,26 @@ class WindowsAsynchronousSocketChannelImpl
/**
* Invoked prior to write to prepare the WSABUF array. Where necessary,
- * it substitutes non-direct buffers with direct buffers.
+ * it substitutes direct buffers with managed buffers.
*/
void prepareBuffers() {
shadow = new ByteBuffer[numBufs];
- long address = writeBufferArray;
for (int i=0; i<numBufs; i++) {
ByteBuffer src = bufs[i];
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
- long a;
- if (!(src instanceof DirectBuffer)) {
+ if (!src.hasArray()) {
// substitute with direct buffer
- ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
+ ByteBuffer bb = ByteBuffer.allocate(rem);
bb.put(src);
bb.flip();
src.position(pos); // leave heap buffer untouched for now
shadow[i] = bb;
- a = ((DirectBuffer)bb).address();
} else {
shadow[i] = src;
- a = ((DirectBuffer)src).address() + pos;
}
- unsafe.putAddress(address + OFFSETOF_BUF, a);
- unsafe.putInt(address + OFFSETOF_LEN, rem);
- address += SIZEOF_WSABUF;
}
}
@@ -727,17 +626,11 @@ class WindowsAsynchronousSocketChannelImpl
}
void releaseBuffers() {
- for (int i=0; i<numBufs; i++) {
- if (!(bufs[i] instanceof DirectBuffer)) {
- Util.releaseTemporaryDirectBuffer(shadow[i]);
- }
- }
}
@Override
//@SuppressWarnings("unchecked")
public void run() {
- long overlapped = 0L;
boolean prepared = false;
boolean pending = false;
boolean shutdown = false;
@@ -745,13 +638,11 @@ class WindowsAsynchronousSocketChannelImpl
try {
begin();
- // substitute non-direct buffers
+ // substitute direct buffers
prepareBuffers();
prepared = true;
- // get an OVERLAPPED structure (from the cache or allocate)
- overlapped = ioCache.add(result);
- int n = write0(handle, numBufs, writeBufferArray, overlapped);
+ int n = write0(fd, shadow, this);
if (n == IOStatus.UNAVAILABLE) {
// I/O is pending
pending = true;
@@ -763,7 +654,14 @@ class WindowsAsynchronousSocketChannelImpl
throw new ClosedChannelException();
}
// write completed immediately
- throw new InternalError("Write completed immediately");
+ updateBuffers(n);
+ releaseBuffers();
+ enableWriting();
+ if (gatheringWrite) {
+ result.setResult((V)Long.valueOf(n));
+ } else {
+ result.setResult((V)Integer.valueOf(n));
+ }
} catch (Throwable x) {
// write failed. Enable writing before releasing waiters.
enableWriting();
@@ -775,8 +673,6 @@ class WindowsAsynchronousSocketChannelImpl
} finally {
// release resources if I/O not pending
if (!pending) {
- if (overlapped != 0L)
- ioCache.remove(overlapped);
if (prepared)
releaseBuffers();
}
@@ -899,16 +795,16 @@ class WindowsAsynchronousSocketChannelImpl
private static native void initIDs();
- private static native int connect0(long socket, boolean preferIPv6,
- InetAddress remote, int remotePort, long overlapped) throws IOException;
+ private static native int connect0(FileDescriptor fd, boolean preferIPv6,
+ InetAddress remote, int remotePort, Iocp.ResultHandler handler) throws IOException;
- private static native void updateConnectContext(long socket) throws IOException;
+ private static native void updateConnectContext(FileDescriptor fd) throws IOException;
- private static native int read0(long socket, int count, long addres, long overlapped)
+ private static native int read0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
throws IOException;
- private static native int write0(long socket, int count, long address,
- long overlapped) throws IOException;
+ private static native int write0(FileDescriptor fd, ByteBuffer[] bufs, Iocp.ResultHandler handler)
+ throws IOException;
private static native void shutdown0(long socket, int how) throws IOException;